<?php
// This is called by cron, and is persistent.
// It takes care of build jobs.
// It can be run on the same machine as the webserver.
// Report every class of PHP error/notice.
error_reporting(E_ALL);
// Turn on the 'track_errors' ini setting.
// NOTE(review): this setting is deprecated since PHP 7.2 and removed in
// PHP 8.0 - confirm it is still needed.
ini_set('track_errors', 'On');
// This script is persistent, so remove the execution time limit.
set_time_limit(0);
// Start timestamp (float seconds). It is not read in this file; presumably
// one of the included files uses it - TODO confirm.
$_s = microtime(TRUE);
require_once('/etc/rocketgit/config.php');
$INC = dirname(__FILE__) . '/../inc';
require_once($INC . '/init.inc.php');
require_once($INC . '/log.inc.php');
require_once($INC . '/sql.inc.php');
require_once($INC . '/struct.inc.php');
require_once($INC . '/events.inc.php');
require_once($INC . '/repo.inc.php');
require_once($INC . '/prof.inc.php');
require_once($INC . '/mr.inc.php');
require_once($INC . '/keys.inc.php');
require_once($INC . '/user.inc.php');
require_once($INC . '/bug.inc.php');
require_once($INC . '/fixes.inc.php');
require_once($INC . '/plan.inc.php');
require_once($INC . '/admin.inc.php');
require_once($INC . '/ver.php');
require_once($INC . '/builder.inc.php');
require_once($INC . '/conn.inc.php');
require_once($INC . '/workers.inc.php');
// A builder port of 0 means there is nothing to listen on: exit successfully.
if (0 == $rg_builder_port) {
	exit(0);
}
/*
 * Connection-close callback (installed as 'func_destroy' by xnew()).
 * Keeps the global count of connected workers in sync: one less.
 * $key - the connection key; not used here.
 */
function xdestroy($key)
{
	global $workers;

	--$workers;
}
/*
 * New-connection callback (installed as 'func_new' on the master socket).
 * Initializes the per-connection state, counts the worker and queues the
 * FEATURES message for the peer.
 * $key - key of the new connection in $rg_conns
 * $arg - the value registered as 'func_new_arg'; stored as the
 *	connection's 'db' entry
 */
function xnew($key, $arg)
{
	global $rg_conns, $workers, $features;

	// Every new peer starts unauthenticated and is not the master socket
	$initial = array(
		'func_data' => 'xdispatch',
		'func_destroy' => 'xdestroy',
		'db' => $arg,
		'auth' => 0,
		'is_master' => 0
	);
	foreach ($initial as $field => $value)
		$rg_conns[$key][$field] = $value;

	++$workers;

	// Tell the peer what this builder supports
	$hello = array('op' => 'FEATURES', 'features' => $features);
	rg_conn_enq($key, json_encode($hello) . "\n");
}
/*
 * Queue a JSON error reply ({"errstr": ...}) for a peer and, unless
 * $shutdown is FALSE, shut the connection down.
 */
function xdispatch_error($key, $errstr, $shutdown = TRUE)
{
	$err = array('errstr' => $errstr);
	rg_conn_enq($key, json_encode($err) . "\n");
	if ($shutdown)
		rg_conn_shutdown($key, 2);
}

/*
 * Dispatch a command from a worker (one json)
 * $key - key of the connection in $rg_conns
 * $data - one line (without the trailing newline) holding a JSON object
 *
 * Understood values of 'op':
 *	ANN - announce/authentication; must be signed with the worker key
 *	STA - the worker started the job 'id'
 *	DON - the worker finished the job 'id' ('error' set on failure)
 *	WORKER_STATS - statistics to be stored for the worker
 * Everything except ANN requires an authenticated connection.
 * Returns nothing; errors are reported to the peer as {"errstr": ...}.
 */
function xdispatch_one($key, $data)
{
	global $rg_conns;
	global $jobs;

	rg_log($key . ': dispatching');

	$s = &$rg_conns[$key];

	$u = @json_decode($data, TRUE);
	if ($u === NULL) {
		rg_log_ml('Cannot decode JSON: ' . json_last_error_msg());
		xdispatch_error($key,
			'cannot decode JSON:' . json_last_error_msg());
		return;
	}

	// The peer is not trusted yet: a valid JSON document may still be a
	// scalar or lack 'op', and passing that to strcmp() would raise
	// a warning or (PHP 8) a TypeError, killing this persistent process.
	if (!is_array($u) || !isset($u['op']) || !is_string($u['op'])) {
		rg_log($key . ': request is not an object with a string op');
		xdispatch_error($key, 'invalid request');
		return;
	}
	//rg_log_ml('DEBUG: u: ' . print_r($u, TRUE));

	if (strcmp($u['op'], 'ANN') == 0) {
		// Same reason as above: check the fields we feed to
		// hash_hmac/strcasecmp before using them.
		if (!isset($u['boot_time']) || !is_numeric($u['boot_time'])
		    || !isset($u['name']) || !is_string($u['name'])
		    || !isset($u['sign']) || !is_string($u['sign'])
		    || (isset($u['type']) && !is_string($u['type']))) {
			rg_log('announce has missing or invalid fields; abort');
			xdispatch_error($key, 'invalid announce');
			return;
		}

		// Accept only a +/- 30s window around our clock
		$now = time();
		if (($u['boot_time'] < $now - 30) || ($u['boot_time'] > $now + 30)) {
			rg_log('boot_time is too old; abort');
			xdispatch_error($key, 'boot_time is too old;'
				. ' time desync or replay attack?');
			return;
		}

		// Lookup user first
		if (!isset($u['type']))
			$u['type'] = 'global';
		if (strcasecmp($u['type'], 'global') == 0) {
			$worker_uid = 0;
		} else if (!isset($u['user']) || !is_string($u['user'])) {
			rg_log('user field is not present; abort');
			xdispatch_error($key, 'user not defined in conf file');
			return;
		} else {
			$w_ui = rg_user_info($s['db'], 0, $u['user'], '');
			if ($w_ui['exists'] !== 1) {
				rg_log('invalid user; abort');
				xdispatch_error($key, 'invalid user');
				return;
			}
			$worker_uid = $w_ui['uid'];
		}

		// Check if worker is registered
		$wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']);
		if ($wi === -1) {
			rg_log('cannot load worker info: '
				. rg_worker_error() . '; abort');
			xdispatch_error($key, 'internal error');
			return;
		}
		if ($wi === 0) {
			rg_log('name [' . $u['name'] . '] not found; abort');
			xdispatch_error($key, 'builder name not found, add it'
				. ' in the web interface');
			return;
		}

		// hash_equals: constant-time, so the comparison does not leak
		// how many leading characters of the signature were right.
		$sign = hash_hmac('sha512', (string) $u['boot_time'], $wi['key']);
		if (!hash_equals($sign, $u['sign'])) {
			rg_log('signature is not ok [' . $sign . ']'
				. ' != [' . $u['sign'] . ']');
			xdispatch_error($key, 'wrong signature');
			return;
		}

		$s['worker_id'] = $wi['id'];
		$s['worker_name'] = $wi['name'];
		$s['worker_uid'] = $worker_uid;
		$s['ann'] = $u;
		$s['auth'] = 1;
		$s['active_jobs'] = array();

		$a = array();
		$a['name'] = $u['name'];
		$a['uname'] = $u['uname'];
		$a['host'] = $u['host'];
		$a['arch'] = $u['arch'];
		$a['env'] = empty($u['env']) ? array() : $u['env'];
		$a['ssh_key'] = $u['ssh_key'];
		$a['ip'] = rg_fix_ip($s['ip']);
		$r = rg_worker_update($s['db'], $worker_uid, $wi['id'], $a);
		if ($r !== TRUE) {
			rg_log('cannot update worker: ' . rg_worker_error());
			xdispatch_error($key, rg_worker_error());
			return;
		}

		rg_log($key . ': peer [' . $u['name'] . '] announce processed.');
		return;
	}

	if ($s['auth'] != 1) {
		rg_log($key . ':Client not authenticated!');
		// The connection is deliberately kept open here
		xdispatch_error($key, 'client not authenticated', FALSE);
		return;
	}

	if (strcmp($u['op'], 'STA') == 0) {
		// The id is used as an array key: only scalars are valid
		if (!isset($u['id']) || !is_scalar($u['id'])) {
			rg_log($key . ': STA without a valid job id');
			xdispatch_error($key, 'invalid job id', FALSE);
			return;
		}

		$jid = $u['id'];
		$jobs[$jid]['worker'] = $key;
		$jobs[$jid]['worker_name'] = $s['ann']['name'];
		$jobs[$jid]['worker_started'] = time();
		if (!isset($s['active_jobs'][$jid]))
			$s['active_jobs'][$jid] = 1;
		rg_log('Job started: ' . $jid);
		return;
	}

	if (strcmp($u['op'], 'DON') == 0) {
		if (!isset($u['id']) || !is_scalar($u['id'])) {
			rg_log($key . ': DON without a valid job id');
			xdispatch_error($key, 'invalid job id', FALSE);
			return;
		}

		$jid = $u['id'];
		// Free the worker slot even if we know nothing about the job
		if (isset($s['active_jobs'][$jid]))
			unset($s['active_jobs'][$jid]);

		// Do not create a partial job entry, nor pass an undefined
		// job to rg_builder_done.
		if (!isset($jobs[$jid])) {
			rg_log('DON for unknown job [' . $jid . ']; ignoring');
			return;
		}

		if (isset($u['error'])) {
			rg_log('job failed with error: ' . $u['error']);
			// Delay job and retry (on another worker)
			$jobs[$jid]['next_try'] = time() + 60;
			$k = $s['worker_id'];
			$jobs[$jid]['avoid'][$k] = 1;
			// Release the job; else rg_process_job considers it
			// still in progress and it will never be retried.
			$jobs[$jid]['worker'] = '';
			return;
		}

		$r = rg_builder_done($s['db'], $jobs[$jid], $u['status']);
		if ($r === TRUE) {
			unset($jobs[$jid]);
			// Send DoneReceived - so the client will delete the job
			$a = array('op' => 'DRE', 'id' => $jid);
			rg_conn_enq($key, json_encode($a) . "\n");
		}
		return;
	}

	if (strcmp($u['op'], 'WORKER_STATS') == 0) {
		// Everything except 'op' and 'ts' is the stats payload
		$_x = $u; unset($_x['op']);
		$_ts = $_x['ts']; unset($_x['ts']);
		rg_worker_stats_insert($s['db'], $s['worker_id'], $_ts, $_x);
		return;
	}

	rg_log('Unknown command [' . $u['op'] . ']!');
	xdispatch_error($key, 'unknown op');
}
/*
 * Data callback for a worker connection.
 * Hands every complete (newline terminated) line found in $data to
 * xdispatch_one(), without the newline.
 * Returns the number of bytes consumed; a trailing partial line is not
 * consumed.
 */
function xdispatch($key, $data)
{
	$consumed = 0;

	for (;;) {
		$nl = strpos($data, "\n", $consumed);
		if ($nl === FALSE)
			break;

		xdispatch_one($key, substr($data, $consumed, $nl - $consumed));
		$consumed = $nl + 1;
	}

	return $consumed;
}
/*
 * Tries to hand a job to one of the connected workers.
 * $db - database handle, used to load the list of workers
 * $job - the job (by reference); on dispatch, 'worker', 'worker_started'
 *	and 'worker_sent' are set; if the workers list cannot be loaded,
 *	'next_try' is set so we retry later.
 * A worker is eligible if it announced itself, is global or owned by the
 * user of the job, is not in the job's 'avoid' list, has a free slot and
 * provides the environment requested by the job.
 * Returns nothing.
 */
function rg_process_job($db, &$job)
{
	global $rg_conns;

	// Job is already in progress?
	if (!empty($job['worker']))
		return;

	// Should we delay because of a previous fail?
	// 'next_try' is the earliest moment we are allowed to retry, so we
	// wait as long as it is still in the future.
	if (isset($job['next_try']) && ($job['next_try'] > time()))
		return;

	rg_log_ml('Processing job: ' . print_r($job, TRUE));

	if (!isset($job['request']))
		$req = $job;
	else
		$req = $job['request'];

	// Get the worker list, so we can sort it
	$workers_list = rg_worker_list_all($db, $req['uid']);
	if ($workers_list === FALSE) {
		rg_log('cannot load workers list: ' . rg_worker_error());
		$job['next_try'] = time() + 60;
		return;
	}
	//rg_log_ml('DEBUG: workers list: ' . print_r($workers_list, TRUE));

	// Trying to find a worker in the list of connections
	foreach ($rg_conns as $key => $i) {
		// The listening socket is not a worker
		if (strcmp($key, 'master') == 0)
			continue;

		if (!isset($i['ann'])) {
			rg_log('Conn ' . $key . ' has no announce.');
			// TODO: close after some time?
			continue;
		}

		if (empty($i['ann']['env'])) {
			rg_log('Conn ' . $key . ' has no environments.');
			continue;
		}

		// worker_uid 0 means a global worker, usable by any user
		if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $req['uid'])) {
			rg_log('uids do not match, try next');
			continue;
		}

		$k = $i['worker_id'];
		$name = $i['worker_name'];

		if (!isset($workers_list[$k])) {
			rg_internal_error('Worker ' . $name . ' not found'
				. ' in workers_list! Strange!');
			continue;
		}
		$wi = $workers_list[$k];

		if (isset($job['avoid'][$k])) {
			rg_log('We must avoid worker ' . $name);
			continue;
		}

		// If number of active jobs is == max workers, skip it
		// (a 'workers' value of 0 means no limit)
		$aj = count($i['active_jobs']);
		if ($wi['workers'] && ($aj >= $wi['workers'])) {
			rg_log('DEBUG: skip worker ' . $name . ' because'
				. ' active_jobs(' . $aj . ')'
				. ' >= workers(' . $wi['workers'] . ')');
			continue;
		}

		foreach ($i['ann']['env'] as $env => $junk) {
			if (strcasecmp($req['env'], $env) != 0) {
				rg_log('DEBUG: worker ' . $name
					. ': job env [' . $req['env'] . ']'
					. ' != worker [' . $env . ']');
				continue;
			}

			// Send only what is really needed
			$job2 = array();
			$job2['op'] = 'BLD';
			$job2['cmds'] = $req['cmds'];
			$job2['packages'] = $req['packages'];
			$job2['hook_id'] = $req['hook_id'];
			$job2['url'] = $req['url'];
			$job2['head'] = $req['head'];
			$job2['env'] = $req['env'];
			$job2['id'] = $job['id'];
			rg_conn_enq($key, json_encode($job2) . "\n");

			// TODO: get a confirmation? We get 'STA'.
			$job['worker'] = $key;
			$job['worker_started'] = 0;
			$job['worker_sent'] = time();
			rg_log_ml('DEBUG: After sending BLD: job: ' . print_r($job, TRUE));
			// TODO: after some time, if worker_started is still 0,
			// mark the 'worker' as '' to be able to go in other place
			// TODO: maybe the client must resync with server to
			// abort jobs already done on another host, to not
			// duplicate work
			return;
		}
	}

	rg_log('No workers found!');
}
// Start-up: profiling, log destination and the 'builder.lock' lock
rg_prof_start('MAIN');
rg_log_set_file($rg_log_dir . '/builder.log');
rg_log_set_sid('000000'); // to spread the logs
rg_lock_or_exit('builder.lock');
rg_log('Start (ver=' . $rocketgit_version . ')...');

// Database connection; without it we cannot do anything
rg_sql_app('rg-builder');
$db = rg_sql_open($rg_sql);
if (FALSE === $db) {
	rg_internal_error('Cannot connect to database!');
	exit(1);
}
if (FALSE === rg_struct_ok($db)) {
	exit(0);
}

// Prepare the listening socket the workers connect to
$socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (FALSE === $socket) {
	rg_internal_error('Cannot create socket!');
	exit(1);
}
socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

$r = @socket_bind($socket, $rg_builder_bind, $rg_builder_port);
if (FALSE === $r) {
	rg_internal_error('Cannot bind socket!');
	exit(1);
}

$r = @socket_listen($socket, 128);
if (FALSE === $r) {
	rg_internal_error('Cannot set queue length on socket!');
	exit(1);
}

// Register the listening socket; new peers are set up by xnew()
rg_conn_new('master', $socket);
$rg_conns['master']['exit_on_close'] = 1;
$rg_conns['master']['func_new'] = 'xnew';
$rg_conns['master']['func_new_arg'] = $db;
$rg_conns['master']['is_master'] = 1;

// Jobs we know about, indexed by job id
$jobs = array();

// What features the builder supports
$features = array('allow_stats' => 1);

// Number of connected workers (maintained by xnew/xdestroy)
$workers = 0;

$original_mtime = @filemtime(__FILE__);
while (TRUE) {
	// We do not want stale entries!
	rg_cache_core_destroy();

	// Check our mtime so we can upgrade the software and this script
	// will restart.
	clearstatcache();
	$mtime = @filemtime(__FILE__);
	if ($mtime != $original_mtime) {
		rg_log('mtime=' . $mtime . ', original_mtime=' . $original_mtime);
		rg_log('File changed. Exiting...');
		break;
	}

	// Loading jobs makes sense only if somebody can run them
	if ($workers > 0) {
		$r = rg_builder_load_jobs($db, 'done = 0');
		if ($r['ok'] != 1) {
			rg_log('Cannot load jobs from database! Sleeping 30s...');
			sleep(30);
			continue;
		}

		$_r = TRUE;
		foreach ($r['list'] as $jid => $job) {
			// First time we see this job: add our bookkeeping fields
			if (!isset($jobs[$jid])) {
				$job['worker'] = '';
				$job['avoid'] = array(); // to avoid some workers
				$jobs[$jid] = $job;
			}

			$_r = rg_process_job($db, $jobs[$jid]);
			if ($_r === FALSE) {
				// Leave both the jobs loop and the main loop
				break 2;
			}
		}
	}

	rg_conn_wait(10);
}

@socket_close($socket);

rg_log('Exiting...');
rg_prof_end('MAIN');
rg_prof_log();
?>