<?php
// Build-job dispatcher daemon.
// This is called by cron, and is persistent.
// It takes care of build jobs.
// It can be run on the same machine as the webserver.

// Report every class of PHP error.
error_reporting(E_ALL);
// NOTE(review): the 'track_errors' ini setting was deprecated in PHP 7.2 and
// removed in PHP 8.0 - confirm whether anything still depends on it.
ini_set("track_errors", "On");
// Remove the execution time limit; the script loops until told to stop.
set_time_limit(0);
// Start timestamp (float seconds). It is not read in the visible part of
// this script - presumably consumed by one of the included files; verify.
$_s = microtime(TRUE);

// Site configuration; $rg_builder_port, $rg_builder_bind, $rg_log_dir and
// $rg_sql used below are not defined in this script - presumably they come
// from this file.
require_once("/etc/rocketgit/config.php");

// Directory holding the project include files, relative to this script.
$INC = dirname(__FILE__) . "/../inc";
require_once($INC . "/init.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/struct.inc.php");
require_once($INC . "/events.inc.php");
require_once($INC . "/repo.inc.php");
require_once($INC . "/prof.inc.php");
require_once($INC . "/mr.inc.php");
require_once($INC . "/keys.inc.php");
require_once($INC . "/user.inc.php");
require_once($INC . "/bug.inc.php");
require_once($INC . "/fixes.inc.php");
require_once($INC . "/plan.inc.php");
require_once($INC . "/admin.inc.php");
require_once($INC . "/ver.php");
require_once($INC . "/builder.inc.php");
require_once($INC . "/conn.inc.php");
require_once($INC . "/workers.inc.php");

// A port of 0 makes the script exit right away with a success status
// (presumably this is how the builder is disabled - confirm in the config).
if ($rg_builder_port == 0)
exit(0);
/*
 * Connection-close hook.
 *
 * xnew() stores this function's name in the 'func_destroy' field of every
 * accepted connection. Its only job is to keep the global count of
 * connected workers in sync: one connection gone, one worker less.
 *
 * $key - index of the closing connection (not used here)
 */
function xdestroy($key)
{
	global $workers;

	--$workers;
}
/*
 * New-client hook.
 *
 * The main script registers this function as 'func_new' on the 'master'
 * connection and passes the database handle as its argument. It prepares
 * the per-connection state: which function parses incoming lines, which
 * one is called on close, the database handle, and the fact that the peer
 * is neither authenticated nor the listening (master) socket.
 *
 * $key - index of the new connection in the global $rg_conns
 * $arg - value stored as the connection's 'db' field
 */
function xnew($key, $arg)
{
	global $rg_conns, $workers;

	$initial_state = array(
		'func_cmd' => 'xdispatch',
		'func_destroy' => 'xdestroy',
		'db' => $arg,
		'auth' => 0,
		'is_master' => 0
	);
	foreach ($initial_state as $field => $value)
		$rg_conns[$key][$field] = $value;

	// One more connected worker
	$workers += 1;
}
/*
 * Dispatch a command from a worker
 *
 * xnew() registers this function as 'func_cmd' for every worker connection.
 *
 * $key  - index of the connection in the global $rg_conns
 * $line - one protocol line: a 4 byte command followed by a payload that is
 *         a serialized PHP array with C-style backslash escapes
 *
 * Commands handled:
 *   'ANN ' - worker announce: validates clock, owner, registration and HMAC
 *            signature, then marks the connection as authenticated
 *   'STA ' - worker started the job with id $u['id']
 *   'DON ' - worker finished the job with id $u['id'] (or failed, when
 *            $u['error'] is present); on success a 'DRE ' reply is queued
 *
 * Anything other than 'ANN ' is refused until the connection is
 * authenticated. Malformed payloads and failed announces get an 'ERR ...'
 * line and the connection is shut down. Nothing is returned.
 */
function xdispatch($key, $line)
{
	global $rg_conns;
	global $jobs;

	rg_log('Dispatch[' . $key . ']');

	$s = &$rg_conns[$key];

	$cmd = substr($line, 0, 4);
	$d = trim(substr($line, 4));
	$x = stripcslashes($d);

	// NOTE(review): unserialize() is applied to bytes received from the
	// network before the peer is authenticated. This allows PHP object
	// injection; consider a data-only format (JSON) or, on PHP >= 7.0,
	// the 'allowed_classes' => FALSE option. Left as is because changing
	// the wire format needs a matching change in the worker.
	$u = @unserialize($x);

	// Every command carries an array. Anything else (FALSE on parse
	// error, a scalar, an object) cannot be indexed safely below.
	if (!is_array($u)) {
		rg_conn_enq($key, 'ERR malformed command' . "\n");
		rg_conn_shutdown($key, 2);
		return;
	}

	rg_log_ml('cmd=' . $cmd . ' u: ' . print_r($u, TRUE));

	if (strcmp($cmd, 'ANN ') == 0) {
		// The signed value is the worker boot time; accept it only
		// inside a +/- 30 seconds window around our clock.
		$now = time();
		if (!isset($u['boot_time'])
		    || ($u['boot_time'] < $now - 30)
		    || ($u['boot_time'] > $now + 30)) {
			rg_log('boot_time is too old; abort');
			rg_conn_enq($key, 'ERR time not in sync between worker'
				. ' and server' . "\n");
			rg_conn_shutdown($key, 2);
			return;
		}

		// Lookup user first
		if (!isset($u['type']))
			$u['type'] = 'global';
		if (strcasecmp($u['type'], 'global') == 0) {
			// Global workers are not tied to a user
			$worker_uid = 0;
		} else if (!isset($u['user'])) {
			rg_log('user field is not present; abort');
			rg_conn_enq($key, 'ERR user not defined in conf file' . "\n");
			rg_conn_shutdown($key, 2);
			return;
		} else {
			$w_ui = rg_user_info($s['db'], 0, $u['user'], '');
			if ($w_ui['exists'] !== 1) {
				rg_log('invalid user; abort');
				rg_conn_enq($key, 'ERR invalid user' . "\n");
				rg_conn_shutdown($key, 2);
				return;
			}
			$worker_uid = $w_ui['uid'];
		}

		// Check if worker is registered
		$wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']);
		if ($wi === -1) {
			rg_log('cannot load worker info: '
				. rg_worker_error() . '; abort');
			rg_conn_enq($key, 'ERR internal error' . "\n");
			rg_conn_shutdown($key, 2);
			return;
		}
		if ($wi === 0) {
			rg_log('name [' . $u['name'] . '] not found; abort');
			$err = 'ERR builder name not found, add it in the web'
				. ' interface!' . "\n";
			rg_conn_enq($key, $err);
			rg_conn_shutdown($key, 2);
			return;
		}

		// Verify the HMAC of boot_time made with the worker's key.
		// A missing or non-string signature is simply a wrong one.
		// Use the constant-time comparison when the runtime has it.
		$sign = hash_hmac('sha512', $u['boot_time'], $wi['key']);
		$given = (isset($u['sign']) && is_string($u['sign']))
			? $u['sign'] : '';
		if (function_exists('hash_equals'))
			$sign_ok = hash_equals($sign, $given);
		else
			$sign_ok = (strcmp($sign, $given) == 0);
		if (!$sign_ok) {
			rg_log('signature is not ok [' . $sign . ']'
				. ' != [' . $given . ']');
			rg_conn_enq($key, 'ERR wrong signature' . "\n");
			rg_conn_shutdown($key, 2);
			return;
		}

		// Peer is trusted from now on; remember who it is
		$s['worker_id'] = $wi['id'];
		$s['worker_uid'] = $worker_uid;
		$s['ann'] = $u;
		$s['auth'] = 1;
		$s['active_jobs'] = 0;

		// Store what the worker told us about itself
		$a = array();
		$a['name'] = $u['name'];
		$a['uname'] = $u['uname'];
		$a['host'] = $u['host'];
		$a['arch'] = $u['arch'];
		$a['env'] = $u['env'];
		$a['ssh_key'] = $u['ssh_key'];
		$a['ip'] = rg_fix_ip($s['ip']);
		rg_worker_update($s['db'], $worker_uid, $wi['id'], $a);

		rg_log($key . ':Peer [' . $u['name'] . '] announce');
		return;
	}

	if ($s['auth'] != 1) {
		rg_log($key . ':Client not authenticated!');
		$a = array('error' => 'client not authenticated');
		$cmd = 'ERR ' . rg_conn_prepare($a) . "\n";
		rg_conn_enq($key, $cmd);
		return;
	}

	if (strcmp($cmd, 'STA ') == 0) {
		$jid = $u['id'];
		$jobs[$jid]['worker'] = $key;
		$jobs[$jid]['worker_name'] = $s['ann']['name'];
		$jobs[$jid]['worker_started'] = time();
		$s['active_jobs'] += 1;
		rg_log('Job started: ' . $jid);
		return;
	}

	if (strcmp($cmd, 'DON ') == 0) {
		$s['active_jobs'] -= 1;
		$jid = $u['id'];

		if (isset($u['error'])) {
			rg_log('job failed with error: ' . $u['error']);
			// Delay job and retry (on another worker)
			$jobs[$jid]['next_try'] = time() + 60;
			$k = $s['worker_id'];
			$jobs[$jid]['avoid'][$k] = 1;
			// Release the job; rg_process_job() skips any job
			// whose 'worker' is not empty, so without this the
			// retry would never take place.
			$jobs[$jid]['worker'] = '';
			return;
		}

		$r = rg_builder_done($s['db'], $jobs[$jid], $u['status']);
		if ($r === TRUE) {
			unset($jobs[$jid]);

			// Send DoneReceived - so client will delete the job.
			// The id is the one the worker reported.
			$a = array('id' => $jid);
			$cmd = 'DRE ' . rg_conn_prepare($a) . "\n";
			rg_conn_enq($key, $cmd);
		}
		return;
	}

	rg_log('Unknown command [' . $cmd . ']!');
}
/*
 * Try to hand a pending job to one of the connected workers.
 *
 * $db  - database handle, used to load the list of registered workers
 * $job - the job (by reference). Read: 'worker', 'next_try', 'uid',
 *        'avoid', 'env', 'cmds', 'packages', 'hook_id', 'url', 'head', 'id'.
 *        Written: 'next_try' (when the workers list cannot be loaded) or
 *        'worker', 'worker_started', 'worker_sent' (when the job is sent).
 *
 * The first authenticated connection that belongs to a suitable owner,
 * is registered, is not on the job's avoid list, has free capacity and
 * announced the job's environment gets a 'BLD ' command with the job.
 *
 * Returns nothing.
 */
function rg_process_job($db, &$job)
{
	global $rg_conns;

	// Job is already in progress?
	if (!empty($job['worker']))
		return;

	// Should we delay because of a previous fail?
	// 'next_try' is the earliest moment a new attempt is allowed, so the
	// job must wait only while that moment is still in the future.
	if (isset($job['next_try']) && ($job['next_try'] > time()))
		return;

	rg_log_ml('Processing job: ' . print_r($job, TRUE));

	// Get the worker list (indexed by worker id), so we can sort it
	$workers_list = rg_worker_list_all($db, $job['uid']);
	if ($workers_list === FALSE) {
		rg_log('cannot load workers list: ' . rg_worker_error());
		$job['next_try'] = time() + 60;
		return;
	}
	//rg_log_ml('DEBUG: workers list: ' . print_r($workers_list, TRUE));

	// Trying to find a worker in the list of connections
	foreach ($rg_conns as $key => $i) {
		// The listening socket is not a worker
		if (strcmp($key, 'master') == 0)
			continue;

		if (!isset($i['ann'])) {
			rg_log('Conn ' . $key . ' has no announce.');
			// TODO: close after some time?
			continue;
		}

		if (empty($i['ann']['env'])) {
			rg_log('Conn ' . $key . ' has no environments.');
			continue;
		}

		// A worker with uid 0 is global; any other one serves only
		// the jobs of its owner
		if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $job['uid'])) {
			//rg_log('uid does not match, try next');
			continue;
		}

		$k = $i['worker_id'];
		if (!isset($workers_list[$k])) {
			rg_log('Worker ' . $k . ' not found in workers_list! Strange!');
			continue;
		}
		$wi = $workers_list[$k];

		// This worker already failed this job
		if (isset($job['avoid'][$k])) {
			rg_log('We must avoid worker ' . $k);
			continue;
		}

		rg_log('DEBUG: selected worker ' . $k);

		// If number of active jobs is == max workers, skip it
		if ($wi['workers'] <= $i['active_jobs']) {
			rg_log('DEBUG: workers=' . $wi['workers']
				. ' active_jobs=' . $i['active_jobs']);
			continue;
		}

		// Environment names are the keys of the announced 'env' array
		foreach ($i['ann']['env'] as $env => $junk) {
			if (strcasecmp($job['env'], $env) != 0) {
				//rg_log('DEBUG job env [' . $job['env'] . ']'
				//	. ' != worker [' . $env . ']');
				continue;
			}

			// Send only what is really needed
			$job2 = array();
			$job2['cmds'] = $job['cmds'];
			$job2['packages'] = $job['packages'];
			$job2['hook_id'] = $job['hook_id'];
			$job2['url'] = $job['url'];
			$job2['head'] = $job['head'];
			$job2['env'] = $job['env'];
			$job2['id'] = $job['id'];
			$cmd = 'BLD ' . rg_conn_prepare($job2) . "\n";
			rg_conn_enq($key, $cmd);
			// TODO: get a confirmation?

			$job['worker'] = $key;
			$job['worker_started'] = 0;
			$job['worker_sent'] = time();
			rg_log_ml('DEBUG: After sending BLD: job: ' . print_r($job, TRUE));
			// TODO: after some time, if worker_started is still 0,
			// mark the 'worker' as '' to be able to go in other place
			// TODO: maybe the client must resync with server to
			// abort jobs already done on another host, to not
			// duplicate work
			return;
		}
	}

	// We get here only when no connection accepted the job
	rg_log('No workers found!');
}
// ----- Startup: profiling, logging, database -----
rg_prof_start("MAIN");

rg_log_set_file($rg_log_dir . "/builder.log");
rg_log_set_sid("000000"); // to spread the logs
rg_log("Start (ver=$rocketgit_version)...");

rg_sql_app("rg-builder");
$db = rg_sql_open($rg_sql);
if ($db === FALSE) {
	rg_internal_error("Cannot connect to database!");
	exit(1);
}

// Nothing to do if the database structure is not in the expected state
if (rg_struct_ok($db) === FALSE)
	exit(0);

// ----- Listening socket the workers connect to -----
$socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === FALSE) {
	rg_internal_error('Cannot create socket!');
	exit(1);
}

socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

if (@socket_bind($socket, $rg_builder_bind, $rg_builder_port) === FALSE) {
	rg_internal_error("Cannot bind socket!");
	exit(1);
}

if (@socket_listen($socket, 128) === FALSE) {
	rg_internal_error("Cannot set queue length on socket!");
	exit(1);
}

// Register the listening socket; every client accepted on it is set up
// by xnew(), which receives the database handle
rg_conn_new('master', $socket);
$master_setup = array(
	'exit_on_close' => 1,
	'func_new' => 'xnew',
	'func_new_arg' => $db,
	'is_master' => 1
);
foreach ($master_setup as $field => $value)
	$rg_conns['master'][$field] = $value;

// State shared with xnew()/xdestroy()/xdispatch() through 'global'
$jobs = array();	// known jobs, indexed by job id
$workers = 0;		// number of connected workers

// ----- Main loop -----
$original_mtime = @filemtime(__FILE__);
while (TRUE) {
	// We do not want stale entries!
	rg_cache_core_destroy();

	// Leave the loop when this file changes on disk, so an upgrade of
	// the software makes the script restart.
	clearstatcache();
	$mtime = @filemtime(__FILE__);
	if ($mtime != $original_mtime) {
		rg_log("mtime=$mtime, original_mtime=$original_mtime");
		rg_log('File changed. Exiting...');
		break;
	}

	// Jobs are looked at only while at least one worker is connected
	if ($workers > 0) {
		$loaded = rg_builder_load_jobs($db);
		if ($loaded['ok'] != 1) {
			rg_log('Cannot load jobs from database! Sleeping 30s...');
			sleep(30);
			continue;
		}

		$stop = FALSE;
		foreach ($loaded['list'] as $jid => $job) {
			// First time we see this job: add our bookkeeping
			if (!isset($jobs[$jid])) {
				$job['worker'] = '';
				$job['avoid'] = array(); // to avoid workers
				$jobs[$jid] = $job;
			}

			if (rg_process_job($db, $jobs[$jid]) === FALSE) {
				$stop = TRUE;
				break;
			}
		}
		if ($stop)
			break;
	}

	rg_log("Waiting for connections...");
	rg_conn_wait(10);
}

// ----- Shutdown -----
@socket_close($socket);
rg_log("Exiting...");

rg_prof_end("MAIN");
rg_prof_log();
?>