/*
* Author: Catalin(ux) M BOIE <catab at embedromix.ro>
* Date: 2004
* Description: Some functions to help writing network servers and clients,
* both ipv4 and ipv6.
* Licence: LGPL
*/
#include "Conn.h"
/* Visible variables */
void (*Conn_accept_cb)(struct Conn *C) = NULL;
void (*Conn_recv_cb)(struct Conn *C) = NULL;
void (*Conn_send_cb)(struct Conn *C) = NULL;
void (*Conn_data_cb)(struct Conn *C) = NULL;
void (*Conn_close_cb)(struct Conn *C) = NULL;
void (*Conn_trigger_cb)(struct Conn *C) = NULL;
void (*Conn_error_cb)(struct Conn *C) = NULL;
void (*Conn_connected_cb)(struct Conn *C) = NULL;
char *(*Conn_status_slot_html_cb)(struct Conn *C);
char *(*Conn_status_cb)(void);
unsigned int Conn_max_reached = 0;
unsigned int Conn_default_ibuf = 128;
unsigned int Conn_default_obuf = 128;
unsigned int Conn_max_ibuf = 4096000;
unsigned int Conn_max_obuf = 4096000;
/* Max bytes enqueued on one send/recv call */
unsigned int Conn_max_send = 32 * 1024;
unsigned int Conn_max_recv = 32 * 1024;
unsigned int Conn_no = 0;
unsigned int Conn_max = 0;
unsigned long Conn_total = 0;
unsigned int Conn_start = 0;
unsigned int Conn_pending = 0;
struct timeval Conn_now;
unsigned short Conn_level = 0; /* debug level */
/* Internal variables */
static char Conn_error[512];
static struct pollfd *Conn_pfds = NULL;
static struct Conn *Conns = NULL;
static unsigned int Conn_inited = 0;
static unsigned int Conn_accept_is_allowed;
static unsigned int Conn_allocated = 0;
static unsigned long long Conn_id = 1;
static FILE *Conn_Log = NULL;
static int debug_band = 11;
/* Functions */
/*
* Difference between two timeval strutures, in milliseconds
*/
long long Conn_time_diff(const struct timeval *t1, const struct timeval *t2)
{
return (t1->tv_sec - t2->tv_sec) * 1000
+ (t1->tv_usec - t2->tv_usec) / 1000;
}
char *Conn_strerror(void)
{
return Conn_error;
}
/* set noblocking */
static int Conn_setnonblock(int fd)
{
int ret;
long flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
return -1;
flags |= O_NONBLOCK;
ret = fcntl(fd, F_SETFL, flags);
return ret;
}
static void Log(unsigned short level, char *format, ...)
{
va_list ap;
FILE *out;
if (level > Conn_level)
return;
if (Conn_Log == NULL)
out = stderr;
else
out = Conn_Log;
fprintf(out, "%ld.%06ld ",
Conn_now.tv_sec, Conn_now.tv_usec);
va_start(ap, format);
vfprintf(out, format, ap);
va_end(ap);
}
char *Conn_dump(char *buf_src, int len_src)
{
int i, j;
char tmp[3];
char *buf_dst;
unsigned char c;
if (len_src < 0)
return strdup("[Error: len < 0]");
Log(30, "\tConn_dump(%p, len=%d)\n",
buf_src, len_src);
buf_dst = malloc(len_src * 4 + 1);
if (buf_dst == NULL)
return strdup("Memory allocation error1!");
j = 0;
for (i = 0; i < len_src; i++) {
c = buf_src[i];
if ((c < 32) || (c > 127)) {
buf_dst[j++] = '[';
snprintf(tmp, sizeof(tmp), "%02x", c);
buf_dst[j++] = tmp[0];
buf_dst[j++] = tmp[1];
buf_dst[j++] = ']';
} else {
buf_dst[j++] = c;
}
}
buf_dst[j] = '\0';
/*
Log(0, "%s ([%s], %d, [%s], %d\n",
__FUNCTION__, buf_src, len_src, buf_dst, len_dst);
*/
return buf_dst;
}
char *Conn_dumphex(char *buf_src, int len_src)
{
int i, j;
char tmp[3];
char *buf_dst;
unsigned char c;
if (len_src < 0)
return strdup("[Error: len < 0]");
Log(30, "\tConn_dumphex(%p, len=%d)\n",
buf_src, len_src);
buf_dst = malloc(len_src * 2 + 1);
if (buf_dst == NULL)
return strdup("Memory allocation error1!");
j = 0;
for (i = 0; i < len_src; i++) {
c = buf_src[i];
snprintf(tmp, sizeof(tmp), "%02x", c);
buf_dst[j++] = tmp[0];
buf_dst[j++] = tmp[1];
}
buf_dst[j] = '\0';
return buf_dst;
}
void Conn_debug(FILE *f, unsigned short debug)
{
Conn_Log = f;
Conn_level = debug;
}
/* Grow Conn structures */
static unsigned int Conn_grow(void)
{
unsigned int alloc;
struct pollfd *p, *set;
struct Conn *q, *set2;
Log(10, "%s() Try to grow from %d to %d.\n",
__FUNCTION__,
Conn_allocated, Conn_allocated + 128);
alloc = Conn_allocated + 128;
p = (struct pollfd *) realloc(Conn_pfds, alloc * sizeof(struct pollfd));
if (p == NULL)
return 0;
q = (struct Conn *) realloc(Conns, alloc * sizeof(struct Conn));
if (q == NULL) {
free(p);
return 0;
}
set = p + Conn_allocated;
memset(set, 0, 128 * sizeof(struct pollfd));
Conn_pfds = p;
set2 = q + Conn_allocated;
memset(set2, 0, 128 * sizeof(struct Conn));
Conns = q;
Conn_allocated = alloc;
return 1;
}
int Conn_init(unsigned int max)
{
unsigned int ret;
if (Conn_inited == 1)
return 0;
Conn_max = max;
Conn_no = 0;
Conn_total = 0;
Conn_max_reached = 0;
gettimeofday(&Conn_now, NULL);
Conn_start = Conn_now.tv_sec;
Conn_accept_is_allowed = 1;
Conn_allocated = 0;
snprintf(Conn_error, sizeof(Conn_error), "%s", "");
ret = Conn_grow();
if (ret == 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot grow anymore. Probably memory shortage.");
return -1;
}
Conn_inited = 1;
return 0;
}
static void Conn_accept_allow(unsigned short val)
{
unsigned int i;
if (Conn_accept_is_allowed == val)
return;
Log(10, "%s: Turn accept allow to %d (%s)\n",
__FUNCTION__, val, val == 0 ? "off" : "on");
for (i = 0; i < Conn_no; i++) {
if (Conns[i].type != Conn_type_MASTER)
continue;
if (val == 0)
Conn_pfds[i].events &= ~POLLIN;
else
Conn_pfds[i].events |= POLLIN;
}
Conn_accept_is_allowed = val;
}
/* Alloc a Conn structure */
static struct Conn *Conn_alloc(void)
{
unsigned int slot, growok;
void *p;
Log(10, "%s() Conn_no=%d Conn_max=%d\n",
__FUNCTION__,
Conn_no, Conn_max);
if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
snprintf(Conn_error, sizeof(Conn_error),
"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
return NULL;
}
if (Conn_allocated == Conn_no) {
growok = Conn_grow();
if (growok == 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot grow anymore. Probably memory shortage.");
return NULL;
}
}
if (Conn_no > Conn_max_reached)
Conn_max_reached = Conn_no;
slot = Conn_no;
Conns[slot].type = Conn_type_UNK;
Conns[slot].state = CONN_STATE_FREE;
Conns[slot].slot = slot;
if (Conns[slot].ibuf_size < Conn_default_ibuf) {
p = realloc(Conns[slot].ibuf, Conn_default_ibuf);
if (p == NULL) {
snprintf(Conn_error, sizeof(Conn_error),
"Memory allocation error2!");
return NULL;
}
Conns[slot].ibuf = p;
Conns[slot].ibuf_size = Conn_default_ibuf;
}
Conns[slot].ibuf_head = 0;
Conns[slot].ibuf_tail = 0;
if (Conns[slot].obuf_size < Conn_default_obuf) {
p = realloc(Conns[slot].obuf, Conn_default_obuf);
if (p == NULL) {
snprintf(Conn_error, sizeof(Conn_error),
"Memory allocation error3!");
return NULL;
}
Conns[slot].obuf = p;
Conns[slot].obuf_size = Conn_default_obuf;
}
Conns[slot].obuf_head = 0;
Conns[slot].obuf_tail = 0;
Conns[slot].trecv = Conn_now;
Conns[slot].bi = 0;
Conns[slot].bo = 0;
Conns[slot].private = NULL;
/* bandwidth */
Conns[slot].band_width = 0;
Conns[slot].band_factor = 0;
Conns[slot].band_tokens = 0;
Conns[slot].band_lasttime = Conn_now;
Conn_pfds[slot].fd = -1;
Conn_pfds[slot].events = 0;
Conn_pfds[slot].revents = 0;
Conns[slot].state = CONN_STATE_EMPTY;
Conns[slot].flags = 0;
Conn_no++;
Log(10, "\tFound free slot %u. Now Conn_no=%d\n",
slot, Conn_no);
if (Conn_no == Conn_max)
Conn_accept_allow(0);
return &Conns[slot];
}
/*
* If put buffer is empty, just mark for closing.
* If we have data, set the flag to do the closing after send.
*/
void Conn_close(struct Conn *C)
{
if (C->obuf_head == C->obuf_tail)
C->error_state = CONN_ERROR_USERREQ;
else
C->flags |= CONN_FLAGS_CLOSE_AFTER_SEND;
}
/*
* Returns string representation of errno code
*/
static char *Conn_errno(struct Conn *C)
{
static char buf[256];
char *is;
switch (C->error_state) {
case CONN_ERROR_USERREQ: is = "user"; break;
case CONN_ERROR_POLL: is = "poll"; break;
case CONN_ERROR_RECV: is = "recv"; break;
case CONN_ERROR_SEND: is = "send"; break;
case CONN_ERROR_SOCKET: is = "socket"; break;
case CONN_ERROR_HANGUP: is = "hangup"; break;
case CONN_ERROR_GETADDRINFO: is = "lookup error"; break;
case CONN_ERROR_EXPIRED: is = "expired"; break;
case CONN_ERROR_ACCEPT: is = "accept"; break;
case CONN_ERROR_MEM: is = "allocation failed"; break;
case CONN_ERROR_CONNECT: is = "connect"; break;
case CONN_ERROR_READ_TIMEOUT: is = "read timeout"; break;
case CONN_ERROR_CONN_TIMEOUT: is = "conn timeout"; break;
default: is = "?";
}
snprintf(buf, sizeof(buf), "%s (%s)",
is, strerror(C->xerrno));
return buf;
}
static char *Conn_state(struct Conn *C)
{
switch (C->state) {
case CONN_STATE_FREE: return "FREE";
case CONN_STATE_EMPTY: return "EMPTY";
case CONN_STATE_OPEN: return "OPEN";
case CONN_STATE_LISTEN: return "LISTEN";
case CONN_STATE_CONNECT_0: return "CONN0";
case CONN_STATE_CONNECT_a: return "CONNa";
case CONN_STATE_CONNECT_b: return "CONNb";
default: return "BUG?";
}
}
static void Conn_free_intern(struct Conn *C)
{
unsigned int slot;
slot = C->slot;
Log(7, "Cleaning-up slot in %s state id %llu [%s]...\n",
Conn_state(C), C->id, Conn_errno(C));
snprintf(Conn_error, sizeof(Conn_error),
"%s", Conn_errno(C));
if (C->error_state != CONN_ERROR_USERREQ) {
if (Conns[slot].cb_error)
Conns[slot].cb_error(C);
else if (Conn_error_cb)
Conn_error_cb(C);
}
if (Conns[slot].state == CONN_STATE_OPEN) {
if (Conns[slot].cb_close)
Conns[slot].cb_close(C);
else if (Conn_close_cb)
Conn_close_cb(&Conns[slot]);
}
if (Conn_pfds[slot].fd > -1) {
close(Conn_pfds[slot].fd);
Conn_pfds[slot].fd = -1;
}
/* Reset tsend, else we enter in a timeout error loop */
Conns[slot].tsend.tv_sec = 0;
Conns[slot].tsend.tv_usec = 0;
Conn_pfds[slot].events = 0;
Conn_pfds[slot].revents = 0;
/* Reset the connection attempt time */
Conns[slot].conn_syn.tv_sec = 0;
Conns[slot].conn_syn.tv_usec = 0;
if (Conns[slot].flags & CONN_FLAGS_AUTO_RECONNECT) {
Conns[slot].tryat = Conn_now.tv_sec + Conns[slot].delay;
Conns[slot].error_state = 0;
Conns[slot].state = CONN_STATE_CONNECT_0;
Conns[slot].ibuf_head = 0;
Conns[slot].ibuf_tail = 0;
Conns[slot].obuf_head = 0;
Conns[slot].obuf_tail = 0;
Conn_pending++;
} else {
Conns[slot].type = Conn_type_UNK;
Conns[slot].state = CONN_STATE_FREE;
if (slot < Conn_no - 1) {
Log(10, "Move last pfd/Conn entry %d -> %d...\n",
Conn_no - 1, slot);
/* free old mem */
if (Conns[slot].ibuf)
free(Conns[slot].ibuf);
if (Conns[slot].obuf)
free(Conns[slot].obuf);
Conn_pfds[slot] = Conn_pfds[Conn_no - 1];
Conns[slot] = Conns[Conn_no - 1];
Conns[slot].slot = slot;
/* fix */
Conns[Conn_no - 1].ibuf = NULL;
Conns[Conn_no - 1].ibuf_size = 0;
Conns[Conn_no - 1].obuf = NULL;
Conns[Conn_no - 1].obuf_size = 0;
}
Conn_no--;
Conn_accept_allow(1);
}
}
/*
* Expand the requested buffer
* what = 0 for out buffer, what = 1 for input buffer
* returns 0 if OK, -1 on error
*/
int Conn_try_expand_buf(struct Conn *C, int what, int needed)
{
char *p;
unsigned int hm;
unsigned int slot, old_size, amount, head, tail;
slot = C->slot;
if (what == 0) {
head = C->obuf_head;
tail = C->obuf_tail;
} else {
head = C->ibuf_head;
tail = C->ibuf_tail;
}
Log(10, "\tTry to expand buffer on slot %u for [%s] needed=%d head=%u tail=%u.\n",
slot, what == 0 ? "o" : "i", needed,
head, tail);
amount = needed;
if (what == 0) {
if (amount < Conn_default_obuf)
amount = Conn_default_obuf;
old_size = Conns[slot].obuf_size;
hm = Conns[slot].obuf_size + amount;
if (hm > Conn_max_obuf)
hm = Conn_max_obuf;
p = realloc(Conns[slot].obuf, hm);
if (p == NULL) {
Log(3, "Cannot realloc obuf!\n");
return -1;
}
Conns[slot].obuf = p;
Conns[slot].obuf_size = hm;
Log(10, "\tSucces. Old/new size = %u/%u.\n",
old_size, Conns[slot].obuf_size);
} else {
if (amount < Conn_default_ibuf)
amount = Conn_default_ibuf;
old_size = Conns[slot].ibuf_size;
hm = Conns[slot].ibuf_size + amount;
if (hm > Conn_max_ibuf)
hm = Conn_max_ibuf;
p = realloc(Conns[slot].ibuf, hm);
if (p == NULL) {
Log(3, "Cannot realloc ibuf!\n");
return -1;
}
Conns[slot].ibuf = p;
Conns[slot].ibuf_size = hm;
Log(10, "\tSucces. Old/new size = %u/%u.\n",
old_size, Conns[slot].ibuf_size);
}
return 0;
}
int Conn_enqueue(struct Conn *C, void *buf, size_t count)
{
unsigned int slot, r;
char *dump;
if (C == NULL) {
Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
return -1;
}
if (Conn_level >= 10) {
dump = Conn_dump(buf, count);
Log(0, "\tTry to enqueue %d bytes to id %llu [%s]...\n",
count, C->id, dump);
free(dump);
}
/* we cannot use pointers directly because they can change under us */
slot = C->slot;
if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
r = Conn_try_expand_buf(&Conns[slot], 0, count);
if (r != 0)
return -1;
}
memcpy(Conns[slot].obuf + Conns[slot].obuf_tail, buf, count);
Conns[slot].obuf_tail += count;
Conn_pfds[slot].events |= POLLOUT;
return 0;
}
struct Conn *Conn_socket(int domain, int type, int port)
{
struct Conn *C;
int i, ret;
struct sockaddr *psa = NULL;
struct sockaddr_in sa;
struct sockaddr_in6 sa6;
int sock_len = 0;
char *addr = "?";
unsigned int slot;
int do_bind = 1, do_listen = 1;
int protocol = 0;
int first_state;
switch (domain) {
case PF_INET:
memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = htonl(INADDR_ANY);
sa.sin_port = htons(port);
psa = (struct sockaddr *) &sa;
sock_len = sizeof(sa);
addr = "0.0.0.0";
if (type == SOCK_STREAM) {
first_state = CONN_STATE_LISTEN;
} else if (type == SOCK_DGRAM) {
do_listen = 0;
first_state = CONN_STATE_OPEN;
}
break;
case PF_INET6:
memset(&sa6, 0, sizeof(sa6));
sa6.sin6_family = AF_INET6;
ret = inet_pton(AF_INET6, "::", &sa6.sin6_addr);
if (ret < 0) {
snprintf(Conn_error, sizeof(Conn_error),
"inet_pton(::) failed");
return NULL;
}
sa6.sin6_port = htons(port);
psa = (struct sockaddr *) &sa6;
sock_len = sizeof(sa6);
addr = "::";
if (type == SOCK_STREAM) {
first_state = CONN_STATE_LISTEN;
} else if (type == SOCK_DGRAM) {
do_listen = 0;
first_state = CONN_STATE_OPEN;
}
break;
case PF_PACKET:
do_bind = 0;
protocol = htons(port);
first_state = CONN_STATE_OPEN;
break;
default:
snprintf(Conn_error, sizeof(Conn_error),
"Invalid domain [%d]!", domain);
return NULL;
}
C = Conn_alloc();
if (!C)
return NULL;
slot = C->slot;
Conn_pfds[slot].fd = socket(domain, type, protocol);
if (Conn_pfds[slot].fd == -1) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot create socket (%d, %d, %d) [%s]",
domain, type, protocol, strerror(errno));
Conns[slot].xerrno = errno;
goto out;
}
Conn_pfds[slot].events = POLLIN;
Conn_setnonblock(Conn_pfds[slot].fd);
if (domain == PF_INET6) {
#ifndef IPV6_V6ONLY
#define IPV6_V6ONLY 26
#endif
i = 1;
setsockopt(Conn_pfds[slot].fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
}
/* Default, client */
C->type = Conn_type_CLIENT;
if (do_bind == 1) {
i = 1;
setsockopt(Conn_pfds[slot].fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
ret = bind(Conn_pfds[slot].fd, psa, sock_len);
if (ret < 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot bind [%s]", strerror(errno));
Conns[slot].xerrno = errno;
goto out;
}
if (do_listen == 1) {
listen(Conn_pfds[slot].fd, 128);
C->type = Conn_type_MASTER;
}
}
C->state = first_state;
C->start = Conn_now.tv_sec;
/* Reset syn time */
C->conn_syn.tv_sec = 0;
C->conn_syn.tv_usec = 0;
C->id = Conn_id++;
snprintf(Conns[slot].addr, sizeof(Conns[slot].addr), "%s", addr);
Conns[slot].port = port;
Conns[slot].sock_domain = domain;
Conns[slot].sock_type = type;
return &Conns[slot];
out:
Conns[slot].error_state = CONN_ERROR_SOCKET;
Conn_free_intern(&Conns[slot]);
return NULL;
}
void Conn_new(struct Conn *C)
{
}
void Conn_accept(struct Conn *C)
{
int fd;
struct sockaddr *pca;
struct sockaddr_in ca4;
struct sockaddr_in6 ca6;
socklen_t cax_len;
struct Conn *X;
unsigned int slot, Xslot;
slot = C->slot;
switch(Conns[slot].sock_domain) {
case PF_INET:
pca = (struct sockaddr *) &ca4;
cax_len = sizeof(ca4);
break;
case PF_INET6:
pca = (struct sockaddr *) &ca6;
cax_len = sizeof(ca6);
break;
default:
snprintf(Conn_error, sizeof(Conn_error),
"Cannot deal with domain %d.",
Conns[slot].sock_domain);
Conns[slot].error_state = CONN_ERROR_SOCKET;
if (Conns[slot].cb_error)
Conns[slot].cb_error(&Conns[slot]);
else if (Conn_error_cb)
Conn_error_cb(&Conns[slot]);
return;
}
fd = accept(Conn_pfds[slot].fd, pca, &cax_len);
if (fd == -1) {
if (errno == EAGAIN)
return;
snprintf(Conn_error, sizeof(Conn_error),
"Cannot accept: %s",
strerror(errno));
Conns[slot].error_state = CONN_ERROR_ACCEPT;
if (Conns[slot].cb_error)
Conns[slot].cb_error(&Conns[slot]);
else if (Conn_error_cb)
Conn_error_cb(&Conns[slot]);
return;
}
X = Conn_alloc();
if (!X) {
Log(0, "ERROR: Cannot alloc a slot!\n");
Conns[slot].error_state = CONN_ERROR_MEM;
if (Conns[slot].cb_error)
Conns[slot].cb_error(&Conns[slot]);
else if (Conn_error_cb)
Conn_error_cb(&Conns[slot]);
close(fd);
return;
}
Xslot = X->slot;
switch (Conns[slot].sock_domain) {
case PF_INET:
inet_ntop(Conns[slot].sock_domain, &ca4.sin_addr, Conns[Xslot].addr, sizeof(Conns[Xslot].addr));
Conns[Xslot].port = ntohs(ca4.sin_port);
break;
case PF_INET6:
inet_ntop(Conns[slot].sock_domain, &ca6.sin6_addr, Conns[Xslot].addr, sizeof(Conns[Xslot].addr));
Conns[Xslot].port = ntohs(ca6.sin6_port);
break;
}
Conns[Xslot].type = Conn_type_CLIENT;
Conns[Xslot].error_state = 0;
Conns[Xslot].state = CONN_STATE_OPEN;
Conns[Xslot].via = Conns[slot].port;
Conn_pfds[Xslot].fd = fd;
Conn_pfds[Xslot].events = POLLIN;
Conns[Xslot].sock_domain = Conns[slot].sock_domain;
Conns[Xslot].sock_type = Conns[slot].sock_type;
Conns[Xslot].start = Conn_now.tv_sec;
Conns[Xslot].id = Conn_id++;
Conn_setnonblock(Conn_pfds[Xslot].fd);
if (Conns[slot].cb_accept)
Conns[slot].cb_accept(&Conns[slot]);
else if (Conn_accept_cb != NULL)
Conn_accept_cb(&Conns[Xslot]);
Conn_total++;
}
static void Conn_poll_status(short ev, char *ret)
{
strcpy(ret, "________");
if (ev & POLLIN) ret[0] = 'I';
if (ev & POLLPRI) ret[1] = 'P';
if (ev & POLLOUT) ret[2] = 'O';
if (ev & POLLERR) ret[3] = 'E';
if (ev & POLLHUP) ret[4] = 'H';
if (ev & POLLNVAL) ret[5] = 'V';
if (ev & POLLRDNORM) ret[6] = 'r';
if (ev & POLLRDBAND) ret[7] = 'R';
}
static char *Conn_domain(struct Conn *C)
{
switch (C->sock_domain) {
case PF_INET: return "IPv4";
case PF_INET6: return "IPv6";
case PF_PACKET: return "PACKET";
default: return "?";
}
}
static char *Conn_type(struct Conn *C)
{
switch (C->sock_type) {
case SOCK_STREAM: return "stream";
case SOCK_DGRAM: return "dgram";
case SOCK_RAW: return "raw";
default: return "?";
}
}
static char *Conn_socktype(struct Conn *C)
{
switch (C->type) {
case Conn_type_MASTER: return "master";
case Conn_type_CLIENT: return "client";
case Conn_type_UNK: return "unk";
default:
return "?";
}
}
/*
* Returns a nice speed
*/
void Conn_speed(char *dst, unsigned int dst_len, unsigned int speed)
{
float sp;
sp = speed;
if (speed < 1000)
snprintf(dst, dst_len, "%.2fBps", sp);
else if (speed < 1000 * 1000)
snprintf(dst, dst_len, "%.2fKBps", sp / 1000);
else
snprintf(dst, dst_len, "%.2fMBps", sp / 1000 / 1000);
}
char *Conn_status_slot(struct Conn *C)
{
static char tmp[1024];
char polle[16], pollr[16];
char speedi[32], speedo[32];
unsigned int dT, si, so;
Conn_poll_status(Conn_pfds[C->slot].events, polle);
Conn_poll_status(Conn_pfds[C->slot].revents, pollr);
dT = Conn_now.tv_sec - C->start;
if (dT == 0)
dT = 1;
si = C->bi / dT;
so = C->bo / dT;
Conn_speed(speedi, sizeof(speedi), si);
Conn_speed(speedo, sizeof(speedo), so);
snprintf(tmp, sizeof(tmp), "%4d fd%4d"
" %4s %6s %5s %6s"
" %39s/%-5d\n"
" Via%-5d [%s][%s] IO=%llu/%llu"
" BS=%u/%u S=%s/%s"
" T=%ld bw=%u f=%u tk=%u id=%llu\n",
C->slot, Conn_pfds[C->slot].fd,
Conn_domain(C), Conn_type(C), Conn_socktype(C), Conn_state(C),
C->addr, C->port, C->via, polle, pollr, C->bi, C->bo,
C->ibuf_size, C->obuf_size, speedi, speedo,
Conn_now.tv_sec - C->start,
C->band_width, C->band_factor, C->band_tokens,
C->id);
return tmp;
}
char *Conn_status_slot_html(struct Conn *C)
{
static char tmp[1024];
char polle[16], pollr[16], *ext = "";
char speedi[32], speedo[32];
unsigned int dT, si, so;
Conn_poll_status(Conn_pfds[C->slot].events, polle);
Conn_poll_status(Conn_pfds[C->slot].revents, pollr);
dT = Conn_now.tv_sec - C->start;
if (dT == 0)
dT = 1;
si = C->bi / dT;
so = C->bo / dT;
Conn_speed(speedi, sizeof(speedi), si);
Conn_speed(speedo, sizeof(speedo), so);
if (Conn_status_slot_html_cb)
ext = Conn_status_slot_html_cb(C);
snprintf(tmp, sizeof(tmp), "<td>%llu</td><td>%d</td><td>%d</td>"
"<td>%s</td><td>%s</td><td>%s</td><td>%s</td>"
"<td>%s/%d</td>"
"<td>%d</td><td>%s</td><td>%s</td><td>%llu / %llu</td>"
"<td>%u / %u</td><td>%s / %s</td><td>%ld</td>"
"<td>%u</td><td>%u</td><td>%u</td>"
"%s\n",
C->id, C->slot, Conn_pfds[C->slot].fd,
Conn_domain(C), Conn_type(C), Conn_socktype(C), Conn_state(C),
C->addr, C->port, C->via, polle, pollr, C->bi, C->bo,
C->ibuf_size, C->obuf_size,
speedi, speedo, Conn_now.tv_sec - C->start,
C->band_width, C->band_factor, C->band_tokens,
ext);
return tmp;
}
/* flags: bit 1 = 1 - html */
char *Conn_status(unsigned int flags)
{
unsigned int len = 0, i, max;
struct Conn *C;
char tmp[512], tmp_len;
char polle[16], pollr[16];
char *buf, *slot, *ext = "";
char speedi[32], speedo[32];
unsigned long long bi, bo, dT;
max = (Conn_no + 1) * 512 - 1;
buf = malloc(max + 1);
if (!buf)
return strdup("No enough memory!");
strcpy(buf, "");
gettimeofday(&Conn_now, NULL);
/* TODO: "len += " is incorrect */
tmp_len = snprintf(tmp, sizeof(tmp), "Conn_pending=%d Conn_no/Conn_max=%d/%d Conn_total=%lu Conn_uptime=%lus Conn_allocated=%d\n",
Conn_pending, Conn_no, Conn_max, Conn_total, Conn_now.tv_sec - Conn_start, Conn_allocated);
if (len + tmp_len < max) {
strcat(buf, tmp);
len += tmp_len;
}
if (flags & 1)
if (Conn_status_cb)
ext = Conn_status_cb();
if (flags & 1) {
strcat(buf, "<table border=\"0\" cellspacing=\"1\" cellpadding=\"3\" bgcolor=\"#aaaaaa\">\n");
strcat(buf, "<tr bgcolor=\"ffffff\">\n");
strcat(buf, "<td>ID</td>");
strcat(buf, "<td>Slot</td>");
strcat(buf, "<td>FD</td>");
strcat(buf, "<td>Dom</td>");
strcat(buf, "<td>Type</td>");
strcat(buf, "<td>SType</td>");
strcat(buf, "<td>State</td>");
strcat(buf, "<td>Addr/port</td>");
strcat(buf, "<td>Via</td>");
strcat(buf, "<td>Polle</td>");
strcat(buf, "<td>Pollr</td>");
strcat(buf, "<td>BI/BO</td>");
strcat(buf, "<td>BUF I/O</td>");
strcat(buf, "<td>Speed I/O</td>");
strcat(buf, "<td>Elap (s)</td>");
strcat(buf, "<td>Band</td>");
strcat(buf, "<td>F</td>");
strcat(buf, "<td>Tks</td>");
strcat(buf, ext);
strcat(buf, "</tr>\n");
} else {
strcat(buf, ext);
}
bi = 0; bo = 0; dT = 0;
for (i = 0; i < Conn_no; i++) {
C = &Conns[i];
if (C->state == CONN_STATE_FREE)
continue;
if (C->type == Conn_type_CLIENT) {
bi += C->bi;
bo += C->bo;
dT += Conn_now.tv_sec - C->start;
}
if (flags & 1)
strcat(buf, "<tr bgcolor=\"ffffff\">\n");
Conn_poll_status(Conn_pfds[C->slot].events, polle);
Conn_poll_status(Conn_pfds[C->slot].revents, pollr);
if ((flags & 1) == 0)
slot = Conn_status_slot(C);
else
slot = Conn_status_slot_html(C);
len += snprintf(tmp, sizeof(tmp), "%s", slot);
if (len < max)
strcat(buf, tmp);
if (flags & 1)
strcat(buf, "</tr>\n");
}
if (flags & 1)
strcat(buf, "</table>\n");
if (dT == 0)
dT = 1;
Conn_speed(speedi, sizeof(speedi), bi / dT);
Conn_speed(speedo, sizeof(speedo), bo / dT);
tmp_len = snprintf(tmp, sizeof(tmp), "Total speed I/O: %s / %s."
" Total bytes I/O: %llu / %llu\n",
speedi, speedo, bi, bo);
if (len + tmp_len < max) {
strcat(buf, tmp);
len += tmp_len;
}
return buf;
}
/*
* Returns the number of bytes in 'in' buffer
*/
unsigned int Conn_qlen(struct Conn *C)
{
return C->ibuf_tail - C->ibuf_head;
}
/*
* Eat @bytes from head of input buffer
*/
void Conn_eat(struct Conn *C, unsigned int bytes)
{
unsigned int slot;
slot = C->slot;
/* advance head */
Conns[slot].ibuf_head += bytes;
if (Conns[slot].ibuf_head >= Conns[slot].ibuf_tail) {
Conns[slot].ibuf_head = 0;
Conns[slot].ibuf_tail = 0;
}
Log(10, "Conn_eat(C, %u) head=%u tail=%u qlen=%u\n",
bytes, C->ibuf_head, C->ibuf_tail,
Conn_qlen(C));
}
/*
* Eat all input buffer
*/
void Conn_eatall(struct Conn *C)
{
Conn_eat(C, Conn_qlen(C));
}
/*
* Add tokens to connection
*/
void Conn_band_update(struct Conn *C)
{
unsigned int slot;
long diff;
/* no need */
if (C->band_width == 0)
return;
diff = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
diff += Conn_now.tv_usec - C->band_lasttime.tv_usec;
diff /= 100000;
/* already added in this hundred of milisecond? */
if (diff == 0)
return;
/* take care of time skew */
if (diff < 0)
diff = 1;
C->band_lasttime = Conn_now;
C->band_tokens += diff * C->band_width / 10;
if (C->band_tokens > C->band_factor * C->band_width)
C->band_tokens = C->band_factor * C->band_width;
slot = C->slot;
Conn_pfds[slot].events |= POLLOUT;
Log(debug_band, "\t\tBAND: Added tokens -> %u.\n",
C->band_tokens);
}
/*
* Set the bandwidth for a connection
* width is in 1b increments
*/
int Conn_band(struct Conn *C, unsigned int width, unsigned int factor)
{
Log(11, "\tConn_band(C, width=%u, factor=%u)\n",
width, factor);
C->band_lasttime = Conn_now;
C->band_width = width;
C->band_factor = factor;
C->band_tokens = factor * width;
Log(debug_band, "\t\tBAND: lasttime=%d.%06d, width=%u, factor=%u, tokens=%u\n",
C->band_lasttime.tv_sec, C->band_lasttime.tv_usec,
C->band_width, C->band_factor, C->band_tokens);
return 0;
}
static void Conn_send_cb_i(struct Conn *C)
{
ssize_t n;
unsigned int max;
int count;
unsigned int slot;
char *buf;
int xerrno;
char *dump;
Log(10, "Conn_send_cb_i id=%llu, slot=%u...\n",
C->id, C->slot);
slot = C->slot;
buf = Conns[slot].obuf + Conns[slot].obuf_head;
count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
if (count == 0) {
Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
return;
}
max = count;
if ((Conn_max_send > 0) && (max > Conn_max_send))
max = Conn_max_send;
/* bandwidth */
if (Conns[slot].band_width > 0) {
if (max > Conns[slot].band_tokens)
max = Conns[slot].band_tokens;
if (max == 0) {
Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n");
Conn_pfds[slot].events &= ~POLLOUT;
return;
}
}
again:
Log(10, "\tsend(fd%d, buf=%p (head=%u), max=%d (count=%d), 0)...\n",
Conn_pfds[slot].fd, buf, Conns[slot].obuf_head, max, count);
n = send(Conn_pfds[slot].fd, buf, max, 0);
xerrno = errno;
if ((n == -1) && (errno == EINTR))
goto again;
if ((n == -1) && (errno == EAGAIN))
return;
Log(10, "%s: Slot %u: FD%d Sent %d bytes [head=%d tail=%d]\n",
__FUNCTION__, slot, Conn_pfds[slot].fd,
n, Conns[slot].obuf_head, Conns[slot].obuf_tail);
if (Conn_level >= 10) {
dump = Conn_dump(buf, n);
Log(0, "\t%s\n", dump);
free(dump);
}
if (n > 0) {
Conns[slot].tsend = Conn_now;
if (n < count) {
Conns[slot].obuf_head += n;
} else {
Conns[slot].obuf_head = 0;
Conns[slot].obuf_tail = 0;
}
Conns[slot].bo += n;
if (C->band_width > 0) {
Conns[slot].band_tokens -= n;
Log(debug_band, "\t%s: BAND: Remove %d tokens -> %u...\n",
__FUNCTION__,
n, Conns[slot].band_tokens);
}
} else {
Log(0, "%s: Error in send [id %llu] [slot %u] [%s]\n",
__FUNCTION__,
Conns[slot].id, slot, strerror(errno));
Conns[slot].error_state = CONN_ERROR_SEND;
Conns[slot].xerrno = xerrno;
}
}
static void Conn_recv_cb_i(struct Conn *C)
{
ssize_t n;
unsigned int max;
unsigned int slot;
int r, xerrno;
char *dump;
Log(10, "Conn_recv_cb_i id=%llu, slot=%u...\n",
C->id, C->slot);
slot = C->slot;
if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
r = Conn_try_expand_buf(&Conns[slot], 1, 0);
if (r != 0) {
Log(1, "MEM: Cannot expand ibuf!\n");
return;
}
}
max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
if ((Conn_max_recv > 0) && (max > Conn_max_recv))
max = Conn_max_recv;
while (1) {
n = recv(Conn_pfds[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
if ((n == -1) && (errno == EINTR))
continue;
xerrno = errno;
break;
}
Log(10, "%s: Slot %u: FD%d Received %d bytes.\n",
__FUNCTION__, slot, Conn_pfds[slot].fd, n);
if (n > 0) {
if (Conn_level >= 10) {
dump = Conn_dump(Conns[slot].ibuf
+ Conns[slot].ibuf_tail, n);
Log(0, "\t%s\n", dump);
free(dump);
}
Conns[slot].ibuf_tail += n;
Conns[slot].bi += n;
Conns[slot].trecv = Conn_now;
if (Conns[slot].cb_data)
Conns[slot].cb_data(&Conns[slot]);
else if (Conn_data_cb)
Conn_data_cb(&Conns[slot]);
} else if (n == 0) {
Conns[slot].error_state = CONN_ERROR_HANGUP;
} else {
Log(4, "Error in recv id %llu slot %u [%s]\n",
Conns[slot].id, slot, strerror(errno));
Conns[slot].error_state = CONN_ERROR_RECV;
Conns[slot].xerrno = xerrno;
}
}
/*
* Set some internal parameters
*/
void Conn_set(struct Conn *C, int var, unsigned int val)
{
unsigned int slot;
int fd;
slot = C->slot;
fd = Conn_pfds[slot].fd;
switch (var) {
case CONN_PARA_AUTO_RECONNECT:
C->flags |= (val == 0) ? 0 : CONN_FLAGS_AUTO_RECONNECT;
break;
case CONN_PARA_RECONNECT_DELAY:
C->delay = val;
break;
case CONN_PARA_IDLE_TIME:
C->idle = val;
break;
case CONN_PARA_READ_TIMEOUT:
C->read_timeout = val;
break;
case CONN_PARA_CONN_TIMEOUT:
C->conn_timeout = val;
break;
case CONN_PARA_TRIGGER:
C->trigger = val;
break;
case CONN_PARA_IBUF:
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val));
break;
case CONN_PARA_OBUF:
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
break;
}
}
struct Conn *Conn_connect(int domain, int type, char *addr, int port)
{
struct Conn *X;
unsigned int Xslot;
Log(8, "%s(%s, %d)\n",
__FUNCTION__, addr, port);
X = Conn_alloc();
if (!X)
return NULL;
Xslot = X->slot;
Conns[Xslot].type = Conn_type_CLIENT;
Conns[Xslot].error_state = 0;
Conns[Xslot].state = CONN_STATE_CONNECT_a;
snprintf(Conns[Xslot].addr, sizeof(Conns[Xslot].addr), "%s", addr);
Conns[Xslot].port = port;
Conns[Xslot].sock_domain = domain;
Conns[Xslot].sock_type = type;
Conns[Xslot].id = Conn_id++;
Conns[Xslot].start = Conn_now.tv_sec;
Conn_pending++;
return &Conns[Xslot];
}
static void Conn_trytoconnect(void)
{
struct addrinfo hints;
struct addrinfo *res;
int i, ret;
char port[8];
Log(8, "%s() Conn_no=%d Conn_pending=%d\n",
__FUNCTION__, Conn_no, Conn_pending);
for (i = Conn_no - 1; i >= 0; i--) {
if (Conns[i].type != Conn_type_CLIENT)
continue;
if ((Conns[i].state == CONN_STATE_CONNECT_0)
&& (Conns[i].tryat <= Conn_now.tv_sec)) {
Conns[i].state = CONN_STATE_CONNECT_a;
}
if (Conns[i].state != CONN_STATE_CONNECT_a)
continue;
Log(9, "\tTry to connect to %s/%d...\n",
Conns[i].addr, Conns[i].port);
memset(&hints, 0, sizeof(hints));
if (Conns[i].sock_domain == 0)
hints.ai_family = PF_UNSPEC;
else
hints.ai_family = Conns[i].sock_domain;
hints.ai_socktype = Conns[i].sock_type;
/*hints.ai_flags = AI_NUMERICHOST;*/
hints.ai_flags = 0;
snprintf(port, sizeof(port), "%d", Conns[i].port);
res = NULL;
ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
if (ret != 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot call getaddrinfo [%s]", gai_strerror(ret));
Log(9, "\t%s\n", Conn_error);
Conns[i].error_state = CONN_ERROR_GETADDRINFO;
Conn_free_intern(&Conns[i]);
if (res)
freeaddrinfo(res);
continue;
}
if (Conn_pfds[i].fd == -1) {
Conn_pfds[i].fd = socket(res->ai_family, res->ai_socktype, 0);
if (Conn_pfds[i].fd == -1) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot create socket [%s]", strerror(errno));
Log(9, "\t%s\n", Conn_error);
Conns[i].error_state = CONN_ERROR_SOCKET;
Conn_free_intern(&Conns[i]);
freeaddrinfo(res);
continue;
}
Log(10, " Allocated socket %d\n",
Conn_pfds[i].fd);
Conn_pfds[i].events |= POLLIN | POLLOUT;
Conn_setnonblock(Conn_pfds[i].fd);
}
Log(9, "\tconnecting...\n");
/* Set syn time */
Conns[i].conn_syn = Conn_now;
ret = connect(Conn_pfds[i].fd, res->ai_addr, res->ai_addrlen);
if ((ret != 0) && (errno != EINPROGRESS)) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot connect [%d] [%s]", errno, strerror(errno));
Log(9, "\t%s\n", Conn_error);
Conns[i].error_state = CONN_ERROR_CONNECT;
Conn_free_intern(&Conns[i]);
freeaddrinfo(res);
continue;
}
Conns[i].state = CONN_STATE_CONNECT_b;
Conn_pending--;
Conn_total++;
freeaddrinfo(res);
}
Log(10, "%s() FINISH\n",
__FUNCTION__);
}
/*
* Returns 1 if we can ignore this connection */
static int Conn_ignore(struct Conn *C)
{
if (C->error_state > 0)
return 1;
return 0;
}
/*
* Close a connection if it exceeded maximum idle time or got a timeout
*/
static void Conn_expire(struct Conn *C)
{
long long diff_ms;
if ((C->trigger > 0)
&& (C->last_trigger + C->trigger < Conn_now.tv_sec)) {
C->last_trigger = Conn_now.tv_sec;
if (C->cb_trigger)
C->cb_trigger(C);
else if (Conn_trigger_cb)
Conn_trigger_cb(C);
}
if ((C->idle > 0) && (C->trecv.tv_sec + C->idle < Conn_now.tv_sec)) {
C->error_state = CONN_ERROR_EXPIRED;
} else if ((C->read_timeout > 0) && (C->tsend.tv_sec > 0)
&& (C->tsend.tv_sec > C->trecv.tv_sec)) {
diff_ms = Conn_time_diff(&Conn_now, &C->tsend);
if (diff_ms > C->read_timeout) {
C->error_state = CONN_ERROR_READ_TIMEOUT;
}
} else if ((C->conn_timeout > 0) && (C->state == CONN_STATE_CONNECT_b)) {
diff_ms = Conn_time_diff(&Conn_now, &C->conn_syn);
if (diff_ms > C->conn_timeout) {
/* connection attempt expired */
C->error_state = CONN_ERROR_CONN_TIMEOUT;
}
}
}
/*
* Returns: -1 on error, 0 nothing to do, 1 if some work was done
* timeout is in 1/1000 seconds increments.
*/
int Conn_poll(int timeout)
{
int i;
short rev;
char *stats;
int timeout2;
int events;
Log(11, "Conn_poll(timeout=%d Conn_no=%d)\n",
timeout, Conn_no);
if (Conn_no == 0)
return 0;
if (timeout == -1)
timeout2 = 500;
else
timeout2 = timeout;
loop:
if (Conn_pending > 0)
Conn_trytoconnect();
again:
events = poll(&Conn_pfds[0], Conn_no, timeout2);
if ((events == -1) && (errno == EINTR))
goto again;
if (events < 0) {
snprintf(Conn_error, sizeof(Conn_error),
"poll(%p, %d): %s",
(void *)Conn_pfds, Conn_no, strerror(errno));
return -1;
}
gettimeofday(&Conn_now, NULL);
if (Conn_level >= 11) {
stats = Conn_status(0);
Log(0, "%s\n", stats);
free(stats);
}
/* We do revers scan because of moving Conn objects */
Log(11, "Process %d events...\n",
events);
for (i = Conn_no - 1; i >= 0; i--) {
if (events > 0) {
rev = Conn_pfds[i].revents;
if (rev == 0)
continue;
events--;
if (rev & POLLHUP) {
Conns[i].error_state = CONN_ERROR_HANGUP;
}
if (rev & POLLERR) {
Conns[i].error_state = CONN_ERROR_POLL;
Conns[i].xerrno = 0;
}
if (rev & POLLNVAL) {
Log(0, "BUG NVAL!\n");
exit(1);
}
/* First, test we have a new connection */
if ((rev & POLLOUT) && (Conn_ignore(&Conns[i]) == 0)) {
/* We just established a connection */
if (Conns[i].state == CONN_STATE_CONNECT_b) {
if (Conns[i].cb_connected)
Conns[i].cb_connected(&Conns[i]);
else if (Conn_connected_cb)
Conn_connected_cb(&Conns[i]);
Conns[i].state = CONN_STATE_OPEN;
}
}
/* Second, test for error or input */
if ((rev & POLLIN) && (Conn_ignore(&Conns[i]) == 0)) {
if (Conns[i].type == Conn_type_MASTER) {
Conn_accept(&Conns[i]);
} else {
if (Conns[i].cb_recv)
Conns[i].cb_recv(&Conns[i]);
else if (Conn_recv_cb != NULL)
Conn_recv_cb(&Conns[i]);
else
Conn_recv_cb_i(&Conns[i]);
}
}
if ((rev & POLLOUT) && (Conn_ignore(&Conns[i]) == 0)) {
/* We can send data */
if (Conns[i].state == CONN_STATE_OPEN) {
if (Conns[i].cb_send)
Conns[i].cb_send(&Conns[i]);
else if (Conn_send_cb != NULL)
Conn_send_cb(&Conns[i]);
else
Conn_send_cb_i(&Conns[i]);
if (Conns[i].obuf_head == Conns[i].obuf_tail) {
Conn_pfds[i].events &= ~POLLOUT;
if (Conns[i].flags & CONN_FLAGS_CLOSE_AFTER_SEND)
Conns[i].error_state = CONN_ERROR_USERREQ;
}
}
}
}
/* test if it expired/timout */
Conn_expire(&Conns[i]);
if (Conns[i].error_state > 0) {
Conn_free_intern(&Conns[i]);
} else {
/* add tokens */
Conn_band_update(&Conns[i]);
}
}
if (timeout == -1)
goto loop;
else
return 1;
return 1;
}
/*
* Set NODELAY on socket
*/
int Conn_nodelay(struct Conn *C)
{
int i, fd;
fd = Conn_pfds[C->slot] . fd;
i = 1;
return setsockopt(fd, SOL_TCP, TCP_NODELAY, &i, sizeof(i));
}
void Conn_rollback(struct Conn *C, unsigned int bytes)
{
if (C->obuf_tail - C->obuf_head <= bytes)
C->obuf_tail -= bytes;
}
/*
* Search for str in active buffer from a given offset
* Returns pointer to string if match or NUll if doesn't.
*/
char *Conn_ostrstr(struct Conn *C, unsigned int off, char *str)
{
unsigned int len, str_len, i;
char *buf, *ret = NULL;
len = C->ibuf_tail - C->ibuf_head - off;
buf = C->ibuf + C->ibuf_head + off;
str_len = strlen(str);
if (len < str_len)
return NULL;
i = 0;
while (i <= len - str_len) {
if (strncmp(buf + i, str, str_len) == 0) {
ret = buf + i;
break;
}
i++;
}
return ret;
}
/*
* Search for str in active buffer
* Returns pointer to string if match or NUll if doesn't.
*/
char *Conn_strstr(struct Conn *C, char *str)
{
return Conn_ostrstr(C, 0, str);
}
/*
* Returns a pointer to current in buffer
*/
char *Conn_ibuf(struct Conn *C)
{
return C->ibuf + C->ibuf_head;
}
/*
* Returns a pointer to current out buffer
*/
char *Conn_obuf(struct Conn *C)
{
return C->obuf + C->obuf_head;
}
/*
* Returns the id of a connection
*/
unsigned long long Conn_getid(struct Conn *C)
{
return C->id;
}
/*
* Returns a Conn* searching for id
*/
struct Conn *Conn_get(unsigned long long id)
{
struct Conn *R = NULL;
int i;
for (i = Conn_no - 1; i >= 0; i--) {
if (Conns[i].id == id) {
R = &Conns[i];
break;
}
}
return R;
}
/*
* Returns the fd associated with C
*/
int Conn_get_fd(struct Conn *C)
{
if (C == NULL)
return -1;
return Conn_pfds[C->slot].fd;
}
/*
* Returns the timeval of the last packet
*/
void Conn_last_time(struct Conn *C, struct timeval *tv)
{
*tv = C->trecv;
}
/*
* Set a callback
*/
int Conn_set_cb(struct Conn *C, unsigned int type, void (*f)(struct Conn *))
{
switch (type) {
case CONN_CB_ACCEPT: C->cb_accept = f; break;
case CONN_CB_RECV: C->cb_recv = f; break;
case CONN_CB_SEND: C->cb_send = f; break;
case CONN_CB_DATA: C->cb_data = f; break;
case CONN_CB_CLOSE: C->cb_close = f; break;
case CONN_CB_TRIGGER: C->cb_trigger = f; break;
case CONN_CB_ERROR: C->cb_error = f; break;
case CONN_CB_CONNECTED: C->cb_connected = f; break;
default:
return -1;
}
return 0;
}
/*
* Returns a '\0' terminated line, modifying received buffer
*/
char *Conn_get_line(struct Conn *C)
{
char *cr;
cr = Conn_strstr(C, "\n");
if (!cr)
return NULL;
*cr = '\0';
return Conn_ibuf(C);
}
/*
* Helper help building text line daemons
*/
void Conn_for_every_line(struct Conn *C, int (*cb)(struct Conn *C, char *line))
{
int ret = 0;
char *line;
unsigned int line_size;
if (cb == NULL)
return;
while (1) {
line = Conn_get_line(C);
if (line == NULL)
break;
line_size = strlen(line) + 1;
ret = cb(C, line);
if (ret != 0)
break;
Conn_eat(C, line_size);
}
}