<?php
// TODO: use 'shutdown_read'.
// Registry of all known connections, indexed by connection key. Each entry is
// the state array built by rg_conn_new() (socket, buffers, callbacks, counters).
$rg_conns = array();
// Sockets to watch, indexed by connection key: 'r' holds sockets watched for
// readability, 'w' sockets watched for writability. rg_conn_wait() passes
// copies of these two sets to socket_select().
$rg_events = array('r' => array(), 'w' => array());
/*
 * Converts a value into the form used for sending it over a socket.
 * This is a thin wrapper: all the work is delegated to rg_serialize(),
 * whose result is returned unchanged.
 */
function rg_conn_prepare($s)
{
	$wire = rg_serialize($s);

	return $wire;
}
/*
 * Marks a connection for shutdown. Nothing is closed here; only the
 * 'shutdown_read' / 'shutdown_write' flags of the connection are set.
 * $what: 0 = read, 1 = write, 2 = read+write
 * Does nothing if the key has no socket stored or the stored value is
 * neither a resource nor an object.
 */
function rg_conn_shutdown($key, $what)
{
	global $rg_conns;

	// Unknown connection, or no socket recorded for it: nothing to mark.
	if (!isset($rg_conns[$key]['socket']))
		return;

	// A socket may be a resource or an object; anything else is ignored.
	$sock = $rg_conns[$key]['socket'];
	if (!(is_resource($sock) || is_object($sock)))
		return;

	$both = ($what == 2);
	if ($both || ($what == 0))
		$rg_conns[$key]['shutdown_read'] = 1;
	if ($both || ($what == 1))
		$rg_conns[$key]['shutdown_write'] = 1;

	rg_log($key . ': marking socket for shutdown ' . $what);
}
/*
 * Destroys a connection: logs its byte counters, closes its socket, runs the
 * optional 'func_destroy' callback, removes the connection from the read and
 * write event sets and finally forgets its state.
 * If the connection has 'exit_on_close' set, the process exits with status 1
 * instead of returning.
 * An unknown key is reported through rg_internal_error() and otherwise ignored.
 */
function rg_conn_destroy($key)
{
	global $rg_conns;
	global $rg_events;

	if (!isset($rg_conns[$key])) {
		rg_internal_error('key not defined!');
		// There is nothing to tear down. Going on would only read
		// fields of a missing entry and try to close a non-socket.
		return;
	}

	rg_log($key . ': bytes_recv=' . $rg_conns[$key]['bytes_recv']
		. ' bytes_sent=' . $rg_conns[$key]['bytes_sent']);

	// Close only something that can be a socket (resource or object).
	// A single close is enough: a closed resource no longer passes
	// is_resource(), so a second guarded attempt could never run.
	if (isset($rg_conns[$key]['socket'])
	    && (is_resource($rg_conns[$key]['socket'])
	    || is_object($rg_conns[$key]['socket'])))
		@socket_close($rg_conns[$key]['socket']);

	// The callback runs while the entry is still registered.
	if (isset($rg_conns[$key]['func_destroy']))
		$rg_conns[$key]['func_destroy']($key);

	unset($rg_events['r'][$key]);
	unset($rg_events['w'][$key]);

	// empty() also covers entries that never had the flag set.
	if (!empty($rg_conns[$key]['exit_on_close'])) {
		rg_log($key . ': exit_on_close is set, so exit');
		exit(1);
	}

	unset($rg_conns[$key]);
}
/*
 * Registers a socket under $key: stores the initial connection state,
 * subscribes the socket for read events and switches it to non-blocking mode.
 * For every key except 'master' the peer address is requested from the
 * socket and the new connection is logged.
 */
function rg_conn_new($key, $socket)
{
	global $rg_conns;
	global $rg_events;

	$is_master = (strcmp($key, 'master') == 0);

	// '?' is the default; socket_getpeername() is asked to overwrite it.
	$peer_ip = '?';
	$peer_port ='?';
	if (!$is_master)
		@socket_getpeername($socket, $peer_ip, $peer_port);

	// Built field by field, in the order the fields are stored.
	$conn = array();
	$conn['socket'] = $socket;
	$conn['recv'] = '';
	$conn['send'] = '';
	$conn['itime'] = time();
	$conn['exit_on_close'] = 0;
	$conn['func_error'] = 'rg_conn_func_error';
	$conn['func_close'] = 'rg_conn_func_close';
	$conn['ip'] = $peer_ip;
	$conn['port'] = $peer_port;
	$conn['shutdown_read'] = 0;
	$conn['shutdown_write'] = 0;
	$conn['bytes_sent'] = 0;
	$conn['bytes_recv'] = 0;
	$rg_conns[$key] = $conn;

	// Every registered socket is watched for incoming data.
	$rg_events['r'][$key] = $socket;

	socket_set_nonblock($socket);

	if (!$is_master)
		rg_log($key . ': new connection from ' . $peer_ip . '/' . $peer_port . '.');
}
/*
 * Enqueues data to a socket: appends $buf to the connection's send buffer
 * and registers the socket in the write event set, so the data goes out when
 * the socket is reported writable.
 * The data is dropped (with a log entry) if the connection does not exist or
 * if it was already marked with shutdown(write).
 */
function rg_conn_enq($key, $buf)
{
	global $rg_conns;
	global $rg_events;

	// This must come before the reference below: taking a reference to a
	// missing key would silently create an empty entry, and the code after
	// it would then register a NULL "socket" in the write event set.
	if (!isset($rg_conns[$key])) {
		rg_log($key . ': cannot enq because the connection does not exist');
		return;
	}

	$s = &$rg_conns[$key];

	if ($s['shutdown_write'] == 1) {
		rg_log($key . ': cannot enq because we called shutdown(write)');
		return;
	}

	$s['send'] .= $buf;
	$rg_events['w'][$key] = $s['socket'];
}
/*
 * Called when a socket is ready to send data.
 * Tries to send the whole pending buffer with a single socket_send() call and
 * keeps whatever was not accepted for the next round. When the buffer is
 * fully drained, the socket is removed from the write event set and, if the
 * connection was marked with shutdown(write), the write side is shut down.
 * Returns the number of bytes sent, or FALSE if the key is unknown or the
 * send failed (in which case 'func_error' is called and the connection is
 * destroyed).
 */
function rg_conn_send($key)
{
	global $rg_conns;
	global $rg_events;

	if (!isset($rg_conns[$key])) {
		//rg_log($key . ': key not present');
		return FALSE;
	}

	$s = &$rg_conns[$key];

	//rg_log($key . ': SEND: ' . $s['send']);
	$r = @socket_send($s['socket'], $s['send'], strlen($s['send']), 0);
	if ($r === FALSE) {
		rg_log($key . ': Cannot send: ' . socket_strerror(socket_last_error()));
		$s['func_error']($key);
		rg_conn_destroy($key);
		return FALSE;
	}

	// Drop the part that went out. The cast keeps the buffer a string even
	// where substr() yields FALSE for "nothing left".
	$s['send'] = (string) substr($s['send'], $r);

	// Compare with '' exactly: empty() also treats a remaining "0" as
	// empty, which would drop the write event and strand that byte.
	if ($s['send'] === '') {
		unset($rg_events['w'][$key]);

		if ($s['shutdown_write'] == 1) {
			@socket_shutdown($s['socket'], 1 /*writing*/);
			rg_log($key . ': shutdown write');
		}
	}

	$s['bytes_sent'] += $r;

	return $r;
}
/*
 * Called when a socket is ready to be read.
 * - If the connection has a 'func_new' callback it is treated as a listening
 *   socket: one client is accepted, registered with rg_conn_new() under the
 *   integer id of its exported stream, and 'func_new' is called with that key
 *   and 'func_new_arg' (FALSE if not set).
 * - Otherwise up to 16 * 4096 bytes are read and appended to the receive
 *   buffer, and 'func_data' is called with the key and the whole buffer; it
 *   must return how many bytes it consumed, and those are dropped.
 * On a receive error 'func_error' is called, on a remote close 'func_close';
 * in both cases the connection is destroyed afterwards.
 * Returns FALSE if the key is unknown or the connection was destroyed,
 * nothing otherwise.
 */
function rg_conn_recv($key)
{
	global $rg_conns;

	// An earlier callback in the same round may already have destroyed
	// this connection. Without this check the reference below would create
	// an empty entry and the code would operate on a non-socket.
	if (!isset($rg_conns[$key])) {
		//rg_log($key . ': key not present');
		return FALSE;
	}

	$s = &$rg_conns[$key];

	if (isset($s['func_new'])) {
		$client = @socket_accept($s['socket']);
		if ($client === FALSE) {
			rg_log($key . ': cannot accept: '
				. socket_strerror(socket_last_error()));
			return;
		}

		if (isset($s['func_new_arg']))
			$arg = $s['func_new_arg'];
		else
			$arg = FALSE;

		socket_set_nonblock($client);

		// The integer value of the exported stream is the new key.
		$_i = socket_export_stream($client);
		$c = intval($_i);
		rg_conn_new($c, $client);
		$s['func_new']($c, $arg);
		return;
	}

	$r = @socket_recv($s['socket'], $buf, 16 * 4096, 0);
	if ($r === FALSE) {
		rg_log($key . ': cannot receive: '
			. socket_strerror(socket_last_error()));
		$s['func_error']($key);
		rg_conn_destroy($key);
		return FALSE;
	}

	// Zero bytes read means the peer closed the connection.
	if ($r === 0) {
		rg_log($key . ': remote closed normally');
		$s['func_close']($key);
		rg_conn_destroy($key);
		return FALSE;
	}

	//rg_log($key . ': RECV: ' . $buf);
	$s['recv'] .= $buf;

	// The callback sees everything buffered so far and reports how much
	// of it can be discarded.
	$used = $s['func_data']($key, $s['recv']);
	$s['recv'] = substr($s['recv'], $used);

	$s['bytes_recv'] += $r;

	return;
}
/*
 * Waits up to $timeout seconds for activity on the registered sockets and
 * dispatches it: readable sockets go to rg_conn_recv(), writable sockets to
 * rg_conn_send(), sockets reported in the exception set to rg_conn_destroy().
 * Returns nothing. Returns immediately if no socket is registered or if
 * socket_select() fails (the failure is logged).
 */
function rg_conn_wait($timeout)
{
	global $rg_conns;
	global $rg_events;

	// socket_select() modifies its arguments, so work on copies.
	$r2 = $rg_events['r'];
	$w2 = $rg_events['w'];
	$e2 = array();

	// socket_select() rejects a call in which every set is empty, so
	// there is nothing to wait for.
	if (empty($r2) && empty($w2))
		return;

	$r = @socket_select($r2, $w2, $e2, $timeout);
	if ($r === FALSE) {
		rg_log('Cannot select: ' . socket_strerror(socket_last_error()));
		return;
	}

	if ($r === 0)
		return;

	// A callback may destroy any connection, including one that still has
	// an event pending in this round, so every key is re-checked before
	// it is dispatched.

	//if (!empty($r2))
	//	rg_log_ml('read events: ' . rg_array2string($r2));
	foreach ($r2 as $key => $sock) {
		if (isset($rg_conns[$key]))
			rg_conn_recv($key);
	}

	//if (!empty($w2))
	//	rg_log_ml('write events: ' . rg_array2string($w2));
	foreach ($w2 as $key => $sock) {
		if (isset($rg_conns[$key]))
			rg_conn_send($key);
	}

	//if (!empty($e2))
	//	rg_log_ml('error events: ' . rg_array2string($e2));
	foreach ($e2 as $key => $sock) {
		if (isset($rg_conns[$key]))
			rg_conn_destroy($key);
	}
}
/*
 * Default 'func_error' callback, installed by rg_conn_new().
 * It is invoked with the connection key when sending or receiving on the
 * socket fails, just before the connection is destroyed.
 * Intentionally does nothing.
 */
function rg_conn_func_error($key)
{
}
/*
 * Default 'func_close' callback, installed by rg_conn_new().
 * It is invoked with the connection key when a read reports that the remote
 * side closed the connection, just before the connection is destroyed.
 * Intentionally does nothing.
 */
function rg_conn_func_close($key)
{
}