<?php
$INC = isset($INC) ? $INC : dirname(__FILE__);
require_once($INC . '/events.inc.php');
require_once($INC . '/builder.inc.php');
require_once($INC . '/keys.inc.php');
require_once($INC . '/graph.inc.php');
$rg_worker_error = '';
function rg_worker_set_error($str)
{
global $rg_worker_error;
$rg_worker_error = $str;
rg_log($str);
}
function rg_worker_error()
{
global $rg_worker_error;
return $rg_worker_error;
}
/*
* Event functions
*/
$rg_worker_functions = array(
'worker_event_add' => 'rg_worker_event_add',
'worker_event_del' => 'rg_worker_event_del'
);
rg_event_register_functions($rg_worker_functions);
/*
* Event for adding a worker
*/
function rg_worker_event_add($db, $ev)
{
$ret = array();
$ret[] = array_merge($ev, array(
'category' => 'rg_keys_event_regen',
'prio' => 10)
);
return $ret;
}
/*
* Event for deleting a worker
*/
function rg_worker_event_del($db, $ev)
{
$ret = array();
$ret[] = array_merge($ev, array(
'category' => 'rg_keys_event_regen',
'prio' => 10)
);
return $ret;
}
/*
* Helper for sorting by time
*/
function rg_worker_sort_time_desc($a, $b)
{
if ($a['itime'] > $b['itime'])
return 1;
if ($a['itime'] < $b['itime'])
return -1;
return 0;
}
/*
* Cosmetic function (one element)
*/
function rg_worker_cosmetic_one(&$row)
{
$row['itime_nice'] = gmdate('Y-m-d H:i', $row['itime']);
if ($row['last_connect'] == 0)
$row['last_connect_nice'] = 'n/a';
else
$row['last_connect_nice'] = gmdate('Y-m-d H:i',
$row['last_connect']);
if (empty($row['env'])) {
$row['HTML:envs_nice'] = 'n/a';
} else {
$list = rg_unserialize($row['env']);
$t = array();
foreach ($list as $env => $env_info)
$t[] = rg_xss_safe($env);
sort($t);
$row['HTML:envs_nice'] = implode('<br />', $t);
}
}
/*
* Cosmetic function (array)
*/
function rg_worker_cosmetic(&$a)
{
foreach ($a as &$row)
rg_worker_cosmetic_one($row);
uasort($a, 'rg_worker_sort_time_desc');
}
/*
* Sorts the list of workers.
* It is used when selecting a server to send the job.
*/
function rg_worker_sort($a, $b)
{
if ($a['cost'] > $b['cost'])
return 1;
if ($a['cost'] < $b['cost'])
return -1;
return 0;
}
/*
* Sorts the list of workers by name.
*/
function rg_worker_sort_alpha($a, $b)
{
return strcmp($a, $b);
}
/*
* Loads the list of workers for a user
*/
function rg_worker_list($db, $uid)
{
rg_prof_start('worker_list');
rg_log_enter('worker_list uid=' . $uid);
$ret = FALSE;
while (1) {
$key = 'workers' . '::' . $uid;
$c = rg_cache_get($key);
if (($c !== FALSE) && isset($c['LOADED'])) {
$ret = $c['list'];
rg_worker_cosmetic($ret);
break;
}
$params = array('uid' => $uid);
$sql = 'SELECT * FROM workers'
. ' WHERE uid = @@uid@@';
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE) {
rg_worker_set_error('cannot load list');
break;
}
$ret = array();
while (($row = rg_sql_fetch_array($res))) {
$id = $row['id'];
$ret[$id] = $row;
}
rg_sql_free_result($res);
$a = array('LOADED' => 1, 'list' => $ret);
rg_cache_set($key, $a, RG_SOCKET_NO_WAIT);
rg_worker_cosmetic($ret);
break;
}
rg_log_exit();
rg_prof_end('worker_list');
return $ret;
}
/*
* Returns a list of workers, including the global ones
*/
function rg_worker_list_all($db, $uid)
{
$l1 = rg_worker_list($db, $uid);
if ($l1 === FALSE)
return FALSE;
$l2 = rg_worker_list($db, 0);
if ($l2 === FALSE)
return FALSE;
$ret = array();
foreach ($l1 as $id => $i)
$ret[$id] = $i;
foreach ($l2 as $id => $i)
$ret[$id] = $i;
uasort($ret, 'rg_worker_sort');
return $ret;
}
/*
* Searches for a worker, by name
* Returns -1 on error, 0 if not found or the info if found
*/
function rg_worker_find_by_id($db, $uid, $id)
{
rg_log_enter('worker_find_by_id uid=' . $uid . ' id=' . $id);
$ret = -1;
while (1) {
$wi = rg_worker_list($db, $uid);
if ($wi === FALSE)
break;
$ret = 0;
foreach ($wi as $_id => $i) {
if ($_id == $id) {
$ret = $i;
break;
}
}
break;
}
rg_log_exit();
return $ret;
}
/*
* Searches for a worker, by name
* Returns -1 on error, 0 if not found or the info if found
*/
function rg_worker_find_by_name($db, $uid, $name)
{
rg_log_enter('worker_find_by_name uid=' . $uid . ' name=' . $name);
$ret = -1;
while (1) {
$wi = rg_worker_list($db, $uid);
if ($wi === FALSE)
break;
$ret = 0;
foreach ($wi as $id => $i) {
rg_log('DEBUG: comparing with [' . $i['name'] . ']...');
if (strcasecmp($i['name'], $name) == 0) {
$ret = $i;
break;
}
}
break;
}
rg_log_exit();
return $ret;
}
/*
* Adds a new worker
*/
function rg_worker_add($db, $uid, $a)
{
rg_prof_start('worker_add');
rg_log_enter('worker_add');
//rg_log_ml('DEBUG: uid=' . $uid . ', a: ' . print_r($a, TRUE));
$ret = FALSE;
while (1) {
$params = $a;
$params['uid'] = $uid;
if ($a['id'] == 0) {
$params['itime'] = time();
$params['uname'] = '';
$params['host'] = '';
$params['arch'] = '';
$params['last_connect'] = '';
$params['last_ip'] = '';
$sql = 'INSERT INTO workers (itime, uid, name'
. ', key, who, cost, workers)'
. ' VALUES (@@itime@@, @@uid@@, @@name@@'
. ', @@key@@, @@who@@, @@cost@@, @@workers@@)'
. ' RETURNING id';
} else {
$sql = 'UPDATE workers SET name = @@name@@'
. ', cost = @@cost@@'
. ', workers = @@workers@@'
. ' WHERE uid = @@uid@@ AND id = @@id@@';
}
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE) {
rg_worker_set_error('cannot insert/update');
break;
}
$row = rg_sql_fetch_array($res);
rg_sql_free_result($res);
if ($a['id'] == 0)
$id = $row['id'];
else
$id = $a['id'];
$event = array(
'category' => 'worker_event_add',
'prio' => 50,
'ui' => array('uid' => $uid),
'add' => $a['id'] == 0 ? 1 : 0,
'id' => $id);
$r = rg_event_add($db, $event);
if ($r !== TRUE) {
rg_worker_set_error('cannot add event'
. ' (' . rg_event_error() . ')');
break;
}
$key = 'workers' . '::' . $uid . '::' . 'list' . '::' . $id;
unset($params['uid']);
$params['id'] = $id;
rg_cache_merge($key, $params, RG_SOCKET_NO_WAIT);
rg_event_signal_daemon('', 0);
$ret = TRUE;
break;
}
rg_log_exit();
rg_prof_end('worker_add');
return $ret;
}
/*
* Updates 'last_connect' and 'last_ip', 'uname', 'host', 'arch', 'env'
* and other fields.
* It is called from builder.php when a worker connects.
*/
function rg_worker_update($db, $uid, $id, $a)
{
rg_prof_start('worker_update');
rg_log_enter('worker_update');
rg_log_ml('DEBUG: a: ' . print_r($a, TRUE));
$ret = FALSE;
while (1) {
$params = $a;
$params['uid'] = $uid;
$params['id'] = $id;
$params['last_connect'] = time();
$params['env'] = rg_serialize($a['env']);
$ki = rg_keys_info($a['ssh_key']);
if ($ki['ok'] != 1) {
rg_worker_set_error('cannot parse ssh key: '
. rg_keys_error());
break;
}
$params['fingerprint_sha256'] = $ki['fingerprint_sha256'];
$r = rg_keys_weak($db, $ki);
if ($r['ok'] != 1) {
rg_worker_set_error('cannot detect weak ssh keys: '
. rg_keys_error());
break;
}
if ($r['weak'] == 1) {
rg_worker_set_error('ssh key is weak');
break;
}
$sql = 'UPDATE workers SET'
. ' name = @@name@@'
. ', last_connect = @@last_connect@@'
. ', last_ip = @@ip@@'
. ', uname = @@uname@@'
. ', host = @@host@@'
. ', arch = @@arch@@'
. ', env = @@env@@'
. ', ssh_key = @@ssh_key@@'
. ', fingerprint_sha256 = @@fingerprint_sha256@@'
. ' WHERE uid = @@uid@@'
. ' AND id = @@id@@';
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE) {
rg_worker_set_error('cannot update fields');
break;
}
rg_sql_free_result($res);
$event = array(
'category' => 'worker_event_add',
'prio' => 50,
'ui' => array('uid' => $uid),
'add' => 0,
'id' => $id);
$r = rg_event_add($db, $event);
if ($r !== TRUE) {
rg_worker_set_error('cannot add event'
. ' (' . rg_event_error() . ')');
break;
}
$key = 'workers' . '::' . $uid . '::' . 'list' . '::' . $id;
unset($params['uid']);
rg_cache_merge($key, $params, RG_SOCKET_NO_WAIT);
$ret = TRUE;
break;
}
rg_log_exit();
rg_prof_end('worker_update');
return $ret;
}
/*
* Returns a list of available environments
* It also order by cost.
*/
function rg_worker_environments($db, $uid)
{
rg_prof_start('worker_environments');
rg_log_enter('worker_environments');
$ret = FALSE;
while (1) {
$user_envs = array();
if ($uid > 0) {
$r = rg_worker_list($db, $uid);
if ($r === FALSE)
break;
}
$global_envs = rg_worker_list($db, 0);
if ($global_envs === FALSE)
break;
$r = array_merge($user_envs, $global_envs);
$ret = array();
foreach ($r as $id => $i) {
if (empty($i['env']))
continue;
$list = rg_unserialize($i['env']);
if ($list === FALSE)
continue;
foreach ($list as $env => $junk)
$ret[$env] = $env;
}
uksort($ret, 'rg_worker_sort_alpha');
//rg_log_ml('DEBUG: workers_environments: ret=' . print_r($ret, TRUE));
break;
}
rg_log_exit();
rg_prof_end('worker_environments');
return $ret;
}
/*
* Remove workers from database
*/
function rg_worker_remove($db, $uid, $list)
{
rg_prof_start('workers_remove');
rg_log_enter('workers_remove: list=' . rg_array2string($list));
$ret = FALSE;
while (1) {
$my_list = array();
foreach ($list as $key_id => $junk)
$my_list[] = sprintf('%u', $key_id);
$params = array('uid' => $uid);
$sql_list = implode(', ', $my_list);
$sql = 'DELETE FROM workers'
. ' WHERE uid = @@uid@@'
. ' AND id IN (' . $sql_list . ')';
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE) {
rg_worker_set_error('cannot delete api keys');
break;
}
rg_sql_free_result($res);
$event = array(
'category' => 'worker_event_del',
'prio' => 50,
'ui' => array('uid' => $uid),
'keys' => implode(',', $my_list));
$r = rg_event_add($db, $event);
if ($r !== TRUE) {
rg_worker_set_error('cannot add event'
. ' (' . rg_event_error() . ')');
break;
}
$key = 'workers' . '::' . $uid . '::' . 'list';
foreach ($my_list as $_key_id)
rg_cache_unset($key . '::' . $_key_id,
RG_SOCKET_NO_WAIT);
rg_event_signal_daemon('', 0);
$ret = TRUE;
break;
}
rg_log_exit();
rg_prof_end('workers_remove');
return $ret;
}
/*
* Insert worker stats
*/
function rg_worker_stats_insert($db, $worker_id, $ts, $a)
{
rg_prof_start('worker_stats_insert');
$ret = FALSE;
while (1) {
$params = array('itime' => $ts,
'worker_id' => $worker_id,
'data' => rg_serialize($a)
);
$sql = 'INSERT INTO worker_stats_' . gmdate('Y_m', $ts)
. ' (worker_id, itime, data)'
. ' VALUES (@@worker_id@@, @@itime@@, @@data@@)';
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE)
break;
rg_sql_free_result($res);
$ret = TRUE;
break;
}
rg_prof_end('worker_stats_insert');
return $ret;
}
/*
* Helper for rg_worker_data
*/
function rg_worker_data_decode_load($data)
{
$r = rg_unserialize($data);
if ($r === FALSE)
return FALSE;
return $r['load'];
}
/*
* Helper for rg_worker_data
*/
function rg_worker_data_decode_jobs($data)
{
$r = rg_unserialize($data);
if ($r === FALSE)
return FALSE;
return $r['jobs'];
}
/*
* Loads data for graphs
* @unit - interval on which a sum is made
*/
function rg_worker_data($db, $type, $worker_id, $start, $end, $unit, $mode)
{
$params = array(
'worker_id' => $worker_id,
'start' => $start,
'end' => $end
);
$q = 'SELECT data AS value, itime FROM worker_stats'
. ' WHERE worker_id = @@worker_id@@'
. ' AND itime >= @@start@@ AND itime <= @@end@@';
switch ($type) {
case 'load':
$func = 'rg_worker_data_decode_load';
break;
case 'jobs':
$func = 'rg_worker_data_decode_jobs';
break;
default:
rg_internal_error('invalid type');
return FALSE;
}
$ret = rg_graph_query($db, $start, $end, $unit, $mode, $q, $params,
$func);
if ($ret === FALSE)
rg_workers_set_error(rg_graph_error());
return $ret;
}
/*
* Generate graphs(s) for workers
* @r - the result of rg_worker_list function
* Returns a new 'r' augmented with the graphs.
*/
function rg_worker_graphs($db, $r)
{
rg_log_ml('DEBUG: worker_graphs: ' . print_r($r, TRUE));
$period = 'hour';
$start = gmmktime(0, 0, 0, gmdate('m') - 1, gmdate('d'), gmdate('Y'));
$end = gmmktime(0, 0, 0, gmdate('m'), gmdate('d') + 1, gmdate('Y')) - 1;
$subtitle = gmdate('Y-m-d H:i:s', $start)
. ' - ' . gmdate('Y-m-d H:i:s', $end) . " UTC\n";
// TODO: set this as default?
$a = array(
'bg_style' => 'fill: #bdf',
'h' => 150,
'title_style' => 'color: #fff',
'subtitle' => $subtitle,
'subtitle_style' => 'color: #00f; font-size: 8pt',
'data_style' => 'fill: #ffa',
'data_line_style' => 'fill: #f00',
'data_line_width' => 1,
'gap' => 0,
'footer_style' => 'font-color: #000; font-size: 8pt'
);
// TODO: it is not effcient to run the same query two times!
foreach ($r as $id => $wi) {
$b = $a;
$b['title'] = 'Worker [' . $wi['name'] . '] load';
$g = rg_worker_data($db, 'load', $id, $start, $end, $period, 'avg');
if ($g === FALSE) {
$r[$id]['graph_load'] = 'Error generating graph: '
. rg_worker_error() . '!';
} else {
$b['data'] = $g;
$r[$id]['HTML:graph_load'] = rg_graph($b);
}
$b = $a;
$b['title'] = 'Worker [' . $wi['name'] . '] jobs';
$g = rg_worker_data($db, 'jobs', $id, $start, $end, $period, 'sum');
if ($g === FALSE) {
$r[$id]['graph_jobs'] = 'Error generating graph: '
. rg_worker_error() . '!';
} else {
$b['data'] = $g;
$r[$id]['HTML:graph_jobs'] = rg_graph($b);
}
}
return $r;
}
/*
* Workers - add/edit
*/
function rg_worker_add_high_level($db, $rg, $op, $paras)
{
rg_prof_start('worker_add_high_level');
rg_log_enter('worker_add_high_level op=' . $op);
$ret = '';
$errmsg = array();
// All admins will have 'who' == 0
if ($rg['login_ui']['is_admin'] == 1)
$target_uid = 0;
else
$target_uid = $rg['login_ui']['uid'];
$rg['worker'] = array();
if (strcmp($op, 'add') == 0) {
$rg['worker']['id'] = 0;
$rg['worker']['name'] = '';
$rg['worker']['key'] = rg_id(32);
$rg['worker']['cost'] = 10;
$rg['worker']['workers'] = 1;
} else { // edit
if (isset($paras[0])) {
$id = intval($paras[0]);
$r = rg_worker_find_by_id($db, $target_uid, $id);
if ($r === -1)
$errmsg[] = 'error loading list; try again later';
else if ($r === 0)
$errmsg[] = 'invalid id for edit';
else
$rg['worker'] = $r;
} else {
$errmsg[] = 'no worker id received';
}
}
rg_log_ml('DEBUG: rg[worker]: ' . print_r($rg['worker'], TRUE));
$doit = rg_var_uint('doit');
while ($doit == 1) {
if ($rg['worker']['id'] == 0) {
$rg['worker']['id'] = rg_var_uint('worker::id');
$rg['worker']['key'] = trim(rg_var_str('worker::key'));
}
$rg['worker']['name'] = trim(rg_var_str('worker::name'));
$rg['worker']['cost'] = rg_var_uint('worker::cost');
$rg['worker']['workers'] = rg_var_uint('worker::workers');
if ($rg['worker']['id'] == 0) {
$len = strlen($rg['worker']['key']);
if ($len < 32) {
$errmsg[] = 'invalid key (len = ' . $len . ')';
break;
}
}
if (empty($rg['worker']['name'])) {
$errmsg[] = 'invalid name';
break;
}
if (!rg_valid_referer()) {
$errmsg[] = 'invalid referer; try again';
break;
}
if (!rg_token_valid($db, $rg, 'worker_add', FALSE)) {
$errmsg[] = 'invalid token; try again';
break;
}
$rg['worker']['who'] = $target_uid;
$r = rg_worker_add($db, $target_uid, $rg['worker']);
if ($r === FALSE) {
$errmsg[] = rg_worker_error();
break;
}
$ret .= rg_template('user/settings/workers/add_ok.html',
$rg, TRUE /*xss*/);
// Load default values for the next 'add' operation
$rg['worker']['id'] = 0;
$rg['worker']['name'] = '';
$rg['worker']['key'] = rg_id(32);
$rg['worker']['cost'] = 10;
$rg['worker']['workers'] = 1;
break;
}
$rg['HTML:errmsg'] = rg_template_errmsg($errmsg);
$rg['rg_form_token'] = rg_token_get($db, $rg, 'worker_add');
$ret .= rg_template('user/settings/workers/add.html',
$rg, TRUE /*xss*/);
rg_log_exit();
rg_prof_end('worker_add_high_level');
return $ret;
}
/*
* Workers - list
*/
function rg_worker_list_high_level($db, $rg, $paras)
{
rg_prof_start('worker_list_high_level');
rg_log_enter('worker_list_high_level');
$ret = '';
$errmsg = array();
$rg['HTML:status'] = '';
if ($rg['login_ui']['is_admin'] == 1)
$target_uid = 0;
else
$target_uid = $rg['login_ui']['uid'];
$doit = rg_var_uint('doit');
while ($doit == 1) {
if (!rg_valid_referer()) {
$errmsg[] = 'invalid referer; try again';
break;
}
if (!rg_token_valid($db, $rg, 'workers_list', FALSE)) {
$errmsg[] = 'invalid token; try again.';
break;
}
$list = rg_var_str('delete_ids');
$r = rg_worker_remove($db, $target_uid, $list);
if ($r !== TRUE) {
$errmsg[] = 'cannot delete: ' . rg_worker_error();
break;
}
$rg['HTML:status'] = rg_template(
'user/settings/workers/delete_ok.html',
$rg, TRUE /*xss*/);
break;
}
$r = rg_worker_list($db, $target_uid);
if ($r === FALSE) {
$rg['errmsg'] = rg_worker_error();
$ret .= rg_template('user/settings/workers/list_err.html',
$rg, TRUE /*xss*/);
} else {
$r = rg_worker_graphs($db, $r);
$rg['rg_form_token'] = rg_token_get($db, $rg, 'workers_list');
$rg['HTML:errmsg'] = rg_template_errmsg($errmsg);
$ret .= rg_template_table('user/settings/workers/list', $r, $rg);
}
rg_log_exit();
rg_prof_end('worker_list_high_level');
return $ret;
}
/*
* Deals with workers
*/
function rg_worker_high_level($db, &$rg, $paras)
{
rg_prof_start('worker_high_level');
rg_log_enter('worker_high_level');
$ret = '';
$op = empty($paras) ? 'list' : array_shift($paras);
$rg['menu']['worker'][$op] = 1;
$rg['HTML:menu_level2'] =
rg_template('user/settings/workers/menu.html', $rg, TRUE /*xss*/);
switch ($op) {
case 'edit':
case 'add':
$ret .= rg_worker_add_high_level($db, $rg, $op, $paras);
break;
case 'queue':
case 'queue_all':
$ret .= rg_builder_list_high_level($db, $rg, $op, $paras);
break;
default:
$ret .= rg_worker_list_high_level($db, $rg, $paras);
break;
}
$hints = array();
$hints[]['HTML:hint'] = rg_template('user/settings/workers/hints.html',
$rg, TRUE /*xss*/);
$ret .= rg_template_table('hints/list', $hints, $rg);
rg_log_exit();
rg_prof_end('worker_high_level');
return $ret;
}