<?php
//
// This functions are used for background tasks: the tasks that the user should
// not wait to happen in the browser: e-mails, keyring regeneration etc.
//
require_once($INC . "/util.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/state.inc.php");
require_once($INC . "/prof.inc.php");
if (!isset($rg_event_socket))
$rg_event_socket = "/var/lib/rocketgit/sockets/event.sock";
$rg_event_error = "";
function rg_event_set_error($str)
{
global $rg_event_error;
$rg_event_error = $str;
rg_log($str);
}
function rg_event_error()
{
global $rg_event_error;
return $rg_event_error;
}
$rg_event_split_functions = array();
/*
* Register event functions.
*/
function rg_event_register_functions($functions)
{
global $rg_event_split_functions;
if (empty($functions))
return FALSE;
foreach ($functions as $type => $function)
$rg_event_split_functions[$type] = $function;
return TRUE;
}
/*
* Signals the daemon that there is some work to do
* @timeout == NULL=forever, 0=no_wait, else, wait @timeout ms
*/
function rg_event_signal_daemon($ev_id, $timeout)
{
global $rg_event_socket;
if (empty($rg_event_socket)) {
rg_log('DEBUG: rg_event_socket is not defined!');
return TRUE;
}
if ($timeout === NULL)
$s_timeout = "forever";
else if ($timeout === 0)
$s_timeout = "no_wait";
else
$s_timeout = $timeout . "ms";
rg_prof_start("event_signal_daemon");
rg_log_enter("event_signal_daemon: event_id=[$ev_id] timeout=$s_timeout");
if (empty($ev_id))
$buf = "W\n";
else
$buf = "NOTIFY " . $ev_id . "\n";
$flags = 0;
if ($timeout === 0)
$flags |= RG_SOCKET_NO_WAIT;
$r = rg_socket($rg_event_socket, $buf, $timeout, 1, $flags);
rg_log_exit();
return $r;
}
/*
* Inserts an event
* This function is called from all over the place to generate events
*/
function rg_event_add($db, $event)
{
rg_prof_start("event_add");
rg_log_enter("event_add: event=" . rg_array2string($event));
if (!isset($event['ip']))
$event['ip'] = rg_var_str('REMOTE_ADDR');
$event['debug'] = rg_state_get($db, 'debug');
$ret = FALSE;
while (1) {
$now = time();
$prio = $event['prio']; unset($event['prio']);
$params = array("now" => $now,
"prio" => $prio,
"data" => rg_serialize($event));
$sql = "INSERT INTO events (itime, prio, data)"
. " VALUES (@@now@@, @@prio@@, @@data@@)";
$res = rg_sql_query_params($db, $sql, $params);
if ($res === FALSE) {
rg_event_set_error("Could not add event (" . rg_sql_error() . ")");
break;
}
rg_sql_free_result($res);
$ret = TRUE;
break;
}
rg_log_exit();
rg_prof_end("event_add");
return $ret;
}
/*
* Process an event
*/
function rg_event_process($db, $ev_id, $event)
{
global $rg_event_split_functions;
rg_prof_start("event_process");
rg_log_enter("event_process: ev_id=$ev_id"
. " event=" . rg_array2string($event));
$ret = FALSE;
while (1) {
$category = $event['category'];
unset($event['category']);
if (!isset($rg_event_split_functions[$category])) {
rg_event_set_error("Cannot find event function [cat=$category]!");
rg_log_ml("DEBUG: rg_event_split_functions="
. print_r($rg_event_split_functions, TRUE));
break;
}
$f = $rg_event_split_functions[$category];
rg_log("Calling $f...");
$evs = $f($db, $event);
if ($evs === FALSE) {
rg_event_set_error('Error in function ['
. $f . '] (category [' . $category . '])!');
break;
}
if (!is_array($evs)) {
rg_internal_error("evs is not array!");
break;
}
// TODO: here we must do a transaction?
$r = TRUE;
foreach ($evs as $index => $ev) {
$r = rg_event_add($db, $ev);
if ($r !== TRUE)
break;
}
if ($r !== TRUE)
break;
$ret = TRUE;
break;
}
rg_log_exit();
rg_prof_end("event_process");
return $ret;
}
/*
* Notifies all listeners when @ev_id is happening.
*/
function rg_event_notify(&$notify_list, $ev_id, $misc)
{
if (!isset($notify_list[$ev_id]))
return;
$buf = "DONE $ev_id $misc\n";
$buf_len = strlen($buf);
foreach ($notify_list[$ev_id] as $index => $info) {
if (isset($info['func'])) {
$info['func']($info['priv'], $buf);
continue;
}
$fd = $info['fd'];
rg_log("Notify [$ev_id] [fd=$fd]...");
$r = @socket_send($fd, $buf, $buf_len, 0);
if ($r === FALSE)
rg_log("Error in sending.");
}
return;
}
/*
* Process events queue
* reset id to 1 if queue is empty?
* Returns FALSE on error, else, the number of events processed
* @notify_list: Will be used to signal the finish of an event
*/
function rg_event_process_queue($db, &$notify_list)
{
rg_prof_start("event_process_queue");
rg_log_enter("event_process_queue: notify_list: "
. rg_array2string($notify_list));
$ret = FALSE;
while (1) {
$now = time();
// We limit to be able to deal with high prio tasks
$sql = "SELECT * FROM events"
. " WHERE fail = 0"
. " AND next_try < $now"
. " ORDER BY prio, id"
. " FOR UPDATE"
. " LIMIT 100";
$res = rg_sql_query($db, $sql);
if ($res === FALSE) {
rg_event_set_error("Cannot load job list"
. " (" . rg_sql_error() . ")");
break;
}
$no_of_events = rg_sql_num_rows($res);
while (($row = rg_sql_fetch_array($res))) {
$params = array('id' => $row['id']);
$sql = "UPDATE events SET fail = 1 WHERE id = @@id@@";
while (1) {
$ev = rg_unserialize($row['data']);
if ($ev === FALSE)
break;
$ev['prio'] = $row['prio'];
$ev['itime'] = $row['itime'];
$r = rg_event_process($db, $row['id'], $ev);
if ($r !== TRUE) {
if ($ev['debug'] == 1)
break;
$sql = "UPDATE events"
. " SET tries = tries + 1"
. ", next_try = $now + tries * 600"
. " WHERE id = @@id@@";
break;
}
if (isset($ev['notification']))
rg_event_notify($notify_list, $ev['notification'], "");
$sql = "DELETE FROM events WHERE id = @@id@@";
break;
}
$res2 = rg_sql_query_params($db, $sql, $params);
if ($res2 === FALSE) {
rg_event_set_error('internal error');
break;
}
rg_sql_free_result($res2);
}
rg_sql_free_result($res);
$ret = $no_of_events;
break;
}
rg_log_exit();
rg_prof_end("event_process_queue");
return $ret;
}
?>