<?php
//
// This functions are used for background tasks: the tasks that the user should
// not wait to happen in the browser: e-mails, keyring regeneration etc.
// prio - lower = more important
//
require_once(__DIR__ . '/util2.inc.php');
require_once(__DIR__ . '/log.inc.php');
require_once(__DIR__ . '/sql.inc.php');
require_once(__DIR__ . '/prof.inc.php');
if (!isset($rg_event_socket))
$rg_event_socket = "/var/lib/rocketgit/sockets/event.sock";
$rg_event_error = "";
function rg_event_set_error($str)
{
global $rg_event_error;
$rg_event_error = $str;
rg_log($str);
}
function rg_event_error()
{
global $rg_event_error;
return $rg_event_error;
}
$rg_event_split_functions = array();
/*
* Register event functions.
*/
function rg_event_register_functions($functions)
{
global $rg_event_split_functions;
if (empty($functions))
return FALSE;
foreach ($functions as $type => $function)
$rg_event_split_functions[$type] = $function;
return TRUE;
}
/*
* Signals the daemon that there is some work to do
* @timeout == NULL=forever, 0=no_wait, else, wait @timeout ms
*/
function rg_event_signal_daemon($ev_id, $timeout)
{
global $rg_event_socket;
rg_prof_start('event_signal_daemon');
rg_log_enter('event_signal_daemon: event_id=[' . $ev_id . ']');
$ret = FALSE;
do {
if (empty($rg_event_socket)) {
rg_log_debug('rg_event_socket is not defined!');
break;
}
if ($timeout === NULL)
$s_timeout = "forever";
else if ($timeout === 0)
$s_timeout = "no_wait";
else
$s_timeout = $timeout . "ms";
if (empty($ev_id))
$buf = 'W' . "\n";
else
$buf = 'NOTIFY ' . $ev_id . "\n";
$flags = 0;
if ($timeout === 0)
$flags |= RG_SOCKET_NO_WAIT;
$ret = rg_socket($rg_event_socket, $buf, $timeout, 1, $flags);
} while (0);
rg_log_exit();
return $ret;
}
/*
* Inserts an event
* This function is called from all over the place to generate events
*/
function rg_event_add($db, $ev)
{
global $rg_log_sid;
rg_prof_start('event_add');
rg_log_enter('event_add category=' . $ev['category']);
//rg_log_debug('ev: ' . rg_array2string_short($ev));
if (!isset($ev['ui_login']))
$ev['ui_login'] = rg_ui_login();
if (!isset($ev['ip']))
$ev['ip'] = rg_ip();
if (!isset($ev['log_sid']))
$ev['log_sid'] = $rg_log_sid;
$ev['debug'] = rg_debug();
$ret = FALSE;
while (1) {
$now = time();
$prio = $ev['prio']; unset($ev['prio']);
$params = array("now" => $now,
"prio" => $prio,
"data" => rg_serialize($ev));
$sql = "INSERT INTO events (itime, prio, data)"
. " VALUES (@@now@@, @@prio@@, @@data@@)";
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE) {
rg_event_set_error("Could not add event (" . rg_sql_error() . ")");
break;
}
rg_sql_free_result($res);
$ret = TRUE;
break;
}
rg_log_exit();
rg_prof_end("event_add");
return $ret;
}
/*
* Process an event
*/
function rg_event_process($db, $ev_id, $event)
{
global $rg_event_split_functions;
rg_prof_start("event_process");
rg_log_enter('event_process: ev_id=' . $ev_id
. ' cat=' . $event['category']);
$ret = FALSE;
$rollback = FALSE;
do {
rg_debug_set($event['debug']);
rg_log_debug('ev: ' . rg_array2string_short($event));
if (isset($event['ui_login']))
rg_ui_login_set($event['ui_login']);
$category = $event['category'];
unset($event['category']);
if (!isset($rg_event_split_functions[$category])) {
rg_event_set_error("Cannot find event function [cat=$category]!");
break;
}
$r = rg_sql_begin($db);
if ($r !== TRUE)
break;
$rollback = TRUE;
$f = $rg_event_split_functions[$category];
rg_log_debug('Calling ' . $f . '...');
$evs = $f($db, $event);
if ($evs === FALSE) {
rg_event_set_error('Error in function ['
. $f . '] (category [' . $category . '])!');
break;
}
if (!is_array($evs)) {
rg_internal_error("evs is not array!");
break;
}
$err = FALSE;
foreach ($evs as $index => $ev) {
$r = rg_event_add($db, $ev);
if ($r !== TRUE) {
$err = TRUE;
break;
}
}
if ($err)
break;
$r = rg_sql_commit($db);
if ($r !== TRUE)
break;
$rollback = FALSE;
$ret = TRUE;
} while (0);
if ($rollback)
rg_sql_rollback($db);
rg_log_exit();
rg_prof_end("event_process");
return $ret;
}
/*
* Notifies all listeners when @ev_id is happening.
*/
function rg_event_notify(&$notify_list, $ev_id, $misc)
{
if (!isset($notify_list[$ev_id]))
return;
$buf = "DONE $ev_id $misc\n";
$buf_len = strlen($buf);
foreach ($notify_list[$ev_id] as $index => $info) {
if (isset($info['func'])) {
$info['func']($info['priv'], $buf);
continue;
}
$fd = $info['fd'];
rg_log("Notify [$ev_id] [fd=$fd]...");
$r = @socket_send($fd, $buf, $buf_len, 0);
if ($r === FALSE)
rg_log("Error in sending.");
}
return;
}
/*
* Process events queue
* reset id to 1 if queue is empty?
* Returns FALSE on error, else, the number of events processed
* @notify_list: Will be used to signal the finish of an event
*/
function rg_event_process_queue($db, &$notify_list)
{
rg_prof_start("event_process_queue");
rg_log_enter_debug('event_process_queue: notify_list: '
. rg_array2string($notify_list) . '.');
$ret = FALSE;
while (1) {
$now = time();
// We limit to be able to deal with high prio tasks
$sql = "SELECT * FROM events"
. " WHERE fail = 0"
. " AND next_try < " . $now
. " ORDER BY prio, id"
. " FOR UPDATE"
. " LIMIT 100";
$res = rg_sql_query($db, $sql);
if ($res === FALSE) {
rg_event_set_error("Cannot load job list"
. " (" . rg_sql_error() . ")");
break;
}
$no_of_events = rg_sql_num_rows($res);
while (($row = rg_sql_fetch_array($res))) {
$params = array('id' => $row['id']);
$sql = "UPDATE events SET fail = 1 WHERE id = @@id@@";
do {
$ev = rg_unserialize($row['data']);
if ($ev === FALSE)
break;
$ev['prio'] = $row['prio'];
$ev['itime'] = $row['itime'];
$r = rg_event_process($db, $row['id'], $ev);
if ($r !== TRUE) {
// When we are in debug, we want faster retries.
if ($ev['debug'] == 1)
break;
$sql = "UPDATE events"
. " SET tries = tries + 1"
. ", next_try = $now + tries * 600"
. " WHERE id = @@id@@";
break;
}
if (isset($ev['notification']))
rg_event_notify($notify_list, $ev['notification'], "");
$sql = "DELETE FROM events WHERE id = @@id@@";
} while (0);
$res2 = rg_sql_query_params($db, $sql, $params);
if ($res2 === FALSE) {
rg_event_set_error('internal error');
break;
}
rg_sql_free_result($res2);
}
rg_sql_free_result($res);
$ret = $no_of_events;
break;
}
rg_log_exit_debug();
rg_prof_end("event_process_queue");
return $ret;
}