<?php
// This is called by cron, and is persistent.
// It takes care of fast caching, like memcache.
// It will receive signals using a UNIX socket.
error_reporting(E_ALL);
ini_set("track_errors", "On");
set_time_limit(0);
// Increment this if we need to restart this daemon (protocol changes etc.)
$rg_cache_version = 36;
$_s = microtime(TRUE);
require_once("/etc/rocketgit/config.php");
$INC = dirname(__FILE__) . "/../inc";
require_once($INC . "/init.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/struct.inc.php");
require_once($INC . "/cache.inc.php");
require_once($INC . "/repo.inc.php");
require_once($INC . "/prof.inc.php");
require_once($INC . "/mr.inc.php");
require_once($INC . "/keys.inc.php");
require_once($INC . "/user.inc.php");
require_once($INC . "/bug.inc.php");
require_once($INC . "/fixes.inc.php");
require_once($INC . "/ver.php");
function rg_destroy($k, &$conn_table)
{
rg_log("Destroying key $k...");
if (isset($conn_table['r'][$k]))
unset($conn_table['r'][$k]);
if (isset($conn_table['w'][$k]))
unset($conn_table['w'][$k]);
// TODO: seems socket is already closed
if (isset($conn_table['conns'][$k]['socket']))
if (is_resource($conn_table['conns'][$k]['socket']))
socket_close($conn_table['conns'][$k]['socket']);
unset($conn_table['conns'][$k]);
}
function rg_handle_command($k, &$conn_table, $cmd)
{
//rg_log("rg_handle_command: k=$k, cmd=$cmd");
$s = &$conn_table['conns'][$k];
$a = explode(' ', $cmd, 4);
$buf = "ER Invalid command\n";
$no_wait = FALSE;
while (1) {
// We must have at least 2 parameters: cmd and flags and I
if (!isset($a[2]))
break;
$cmd = trim($a[0]);
$flags = trim($a[1]);
if (strncmp($flags, 'F=', 2) != 0) {
rg_log('Invalid command (no flags): $cmd');
break;
}
$flags = substr($flags, 2);
if (strstr($flags, 'W'))
$no_wait = TRUE;
$id = trim($a[2]);
if (strncmp($id, 'I=', 2) != 0) {
rg_log('Invalid command (no id): $cmd');
break;
}
$id = substr($id, 2);
/* From here, commands with no parameters */
/* none yet */
/* From here, at least 1 para */
if (!isset($a[3]))
break;
$para1 = trim($a[3]);
if (strcmp($cmd, "SET") == 0) {
$ns_var_value = explode("=", $para1, 2);
if (!isset($ns_var_value[1]))
break;
$ns_var = trim($ns_var_value[0]);
$value = trim($ns_var_value[1]);
$value = unserialize(stripcslashes($value));
if ($value !== FALSE) {
rg_cache_core_set("normal::" . $ns_var, $value);
$buf = 'OK ' . $id . "\n";
} else {
$buf = 'ER ' . $id . ' cannot unserialize data' . "\n";
}
break;
}
if (strcmp($cmd, "MERGE") == 0) {
$ns_var_value = explode("=", $para1, 2);
if (!isset($ns_var_value[1]))
break;
$ns_var = trim($ns_var_value[0]);
$value = trim($ns_var_value[1]);
$value = unserialize(stripcslashes($value));
if ($value !== FALSE) {
$ret = rg_cache_core_merge("normal::" . $ns_var,
$value);
if ($ret === FALSE)
$buf = 'ER ' . $id . ' NOT_FOUND' . "\n";
else
$buf = 'OK ' . $id . "\n";
} else {
$buf = 'ER ' . $id . 'cannot unserialize data' . "\n";
}
break;
}
if (strcmp($cmd, "INC") == 0) {
$v = rg_cache_core_inc("normal::" . $para1);
$buf = 'OK ' . $id . ' v=' . $v . "\n";
break;
}
if (strcmp($cmd, "GET") == 0) {
$ret = rg_cache_core_get("normal::" . $para1);
if ($ret === FALSE)
$buf = 'ER ' . $id . ' NOT_FOUND' . "\n";
else
$buf = 'OK ' . $id . ' ' . rg_cache_prepare($ret) . "\n";
break;
}
if (strcmp($cmd, "UNSET") == 0) {
$ret = rg_cache_core_unset("normal::" . $para1);
if ($ret === FALSE)
$buf = 'ER ' . $id . ' NOT_FOUND' . "\n";
else
$buf = 'OK ' . $id . "\n";
break;
}
if (strcmp($cmd, "ADUMP") == 0) {
$ret = rg_cache_core_adump("normal::" . $para1);
if ($ret === FALSE)
$buf = 'ER ' . $id . ' NOT_FOUND' . "\n";
else
$buf = 'OK ' . $id . ' ' . rg_cache_prepare($ret) . "\n";
break;
}
if (strcmp($cmd, "APUSH") == 0) {
$ns_var_value = explode("=", $para1, 2);
if (!isset($ns_var_value[1]))
break;
$ns_var = trim($ns_var_value[0]);
$value = trim($ns_var_value[1]);
$value = unserialize(stripcslashes($value));
if ($value !== FALSE) {
rg_cache_core_apush("normal::" . $ns_var, $value);
$buf = 'OK ' . $id . "\n";
} else {
$buf = 'ER ' . $id . ' cannot unserialize data' . "\n";
}
break;
}
if (strcmp($cmd, "APOP") == 0) {
$ret = rg_cache_core_apop("normal::" . $para1);
if ($ret === FALSE)
$buf = 'ER ' . $id . ' NOT_FOUND' . "\n";
else
$buf = 'OK ' . $id . ' ' . $ret . "\n";
break;
}
if (strcmp($cmd, "ASHIFT") == 0) {
$ret = rg_cache_core_ashift("normal::" . $para1);
if ($ret === FALSE)
$buf = 'ER ' . $id . ' NOT_FOUND' . "\n";
else
$buf = 'OK ' . $id . ' ' . $ret . "\n";
break;
}
if (strcmp($cmd, "SHUTDOWN") == 0) {
rg_log('Shutting down...');
exit(0);
}
if (strcmp($cmd, "SLEEP") == 0) {
rg_log('Sleeping...');
$buf = 'OK ' . $id . "\n";
sleep(3);
}
break;
}
if ($no_wait === FALSE) {
rg_log($k . ':DEBUG: enqueue: ' . $buf);
$s['send'] .= $buf;
$conn_table['w'][$k] = $s['socket'];
}
}
function rg_handle_recv($k, &$conn_table)
{
//rg_log("handle_recv on key $k...");
$s = &$conn_table['conns'][$k];
$ret = @socket_recv($s['socket'], $buf, 32 * 4096, 0);
if ($ret === FALSE) {
rg_log("Error in recv (" . socket_strerror(socket_last_error()) . ")");
rg_destroy($k, $conn_table);
return;
}
if ($ret === 0) {
rg_log("Remote closed the connection (received 0).");
rg_destroy($k, $conn_table);
return;
}
rg_log($k . ':DEBUG: received: ' . $buf);
$s['recv'] .= $buf;
$s['close_at'] = time() + 30;
while (1) {
$pos = strpos($s['recv'], "\n");
if ($pos === FALSE)
return;
$cmd = substr($s['recv'], 0, $pos);
rg_handle_command($k, $conn_table, $cmd);
$s['recv'] = substr($s['recv'], $pos + 1);
}
}
function rg_handle_send($k, &$conn_table)
{
//rg_log("Sending on key $k...");
$s = &$conn_table['conns'][$k];
$ret = @socket_send($s['socket'], $s['send'], strlen($s['send']), 0);
if ($ret === FALSE) {
rg_log("Cannot send (" . socket_strerror(socket_last_error()) . ")");
rg_destroy($k, $conn_table);
return;
}
$s['send'] = substr($s['send'], $ret);
if (empty($s['send']))
unset($conn_table['w'][$k]);
}
function rg_handle_new($client, &$conn_table)
{
socket_set_nonblock($client);
$key = intval($client);
$conn_table['conns'][$key] = array(
"socket" => $client,
"recv" => "",
"send" => "",
"close_at" => time() + 30
);
$conn_table['r'][$key] = $client;
//rg_log("Added client with key $key.");
/* This way I can enforce the connecting user to be apache/rocketgit
but is seems it does not work correctly in PHP...
$t = socket_get_option($client, SOL_SOCKET, 17 //SO_PEERCRED);
rg_log_ml('PEERCRED: ' . print_r($t, TRUE));
*/
}
function rg_handle_idle(&$conn_table)
{
$now = time();
foreach ($conn_table['conns'] as $key => $info) {
if (!isset($info['close_at'])) {
rg_internal_error('close_at is not set!');
continue;
}
if ($info['close_at'] < $now) {
rg_log("Destroy $key because has been too much time idle.");
rg_destroy($key, $conn_table);
}
}
}
rg_prof_start("MAIN");
rg_log_set_file($rg_log_dir . "/cache.log");
rg_log_set_sid("000000"); // to spread the logs
rg_log("Start (ver=$rocketgit_version)...");
// Remove the socket, else we will get error
if (file_exists($rg_cache_socket))
unlink($rg_cache_socket);
$master = socket_create(AF_UNIX, SOCK_STREAM, 0);
if ($master === FALSE) {
rg_internal_error("Cannot create events socket!");
exit(1);
}
$r = socket_bind($master, $rg_cache_socket);
if ($r === FALSE) {
rg_internal_error("Cannot bind socket!");
exit(1);
}
$r = socket_listen($master, 128);
if ($r === FALSE) {
rg_internal_error("Cannot set queue length on socket!");
exit(1);
}
socket_set_nonblock($master);
$r = chmod($rg_cache_socket, 0600);
if ($r === FALSE) {
rg_internal_error("Cannot set rights on cache socket!");
exit(1);
}
$conn_table = array("r" => array(), "w" => array(), "conns" => array());
$conn_table['r']['master'] = $master;
do {
rg_log_buffer_clear();
//rg_log_ml("conn_table: " . print_r($conn_table, TRUE));
$r2 = $conn_table['r']; $w2 = $conn_table['w']; $ex = array();
$r = @socket_select($r2, $w2, $ex, 5);
if ($r === FALSE)
rg_fatal("Cannot select (" . socket_strerror(socket_last_error()) . ")!");
if ($r > 0) {
//if (!empty($r2))
// rg_log_ml('r2: ' . print_r($r2, TRUE));
//if (!empty($w2))
// rg_log_ml('w2: ' . print_r($w2, TRUE));
//if (!empty($ex))
// rg_log_ml('ex: ' . print_r($ex, TRUE));
foreach ($r2 as $key => $socket) {
if (strcmp($key, "master") == 0) {
$client = @socket_accept($socket);
if ($client === FALSE) {
rg_log("Connection seems broken!");
continue;
}
rg_handle_new($client, $conn_table);
continue;
}
rg_handle_recv($key, $conn_table);
}
foreach ($w2 as $key => $sock) {
if (isset($conn_table['conns'][$key]))
rg_handle_send($key, $conn_table);
}
foreach ($ex as $key => $sock)
rg_destroy($key, $conn_table);
}
// TODO: if we activate this, we broke the connection with events.php
//rg_handle_idle($conn_table);
} while (1);
socket_close($master);
rg_log("Exiting...");
rg_prof_end("MAIN");
rg_prof_log();
?>