/*
* Author: Catalin(ux) M BOIE <catab at embedromix.ro>
* Date: 2004
* Description: Some functions to help writing network servers and clients,
* both ipv4 and ipv6.
* Licence: LGPL
*/
#include "Conn.h"
#include "Conn_engine_poll.h"
#include "Conn_engine_epoll.h"
/* Internal variables */
/* Engine: id of the selected engine; Conn_engine_set compares its argument against it */
static unsigned int Conn_engine;
/* Availability flags: Conn_init sets them to 1 under POLL_FOUND / EPOLL_FOUND */
static unsigned int Conn_engine_poll_found;
static unsigned int Conn_engine_epoll_found;
/*
 * Engine functions: dispatch pointers assigned by Conn_engine_set to the
 * Conn_poll_* or Conn_epoll_* implementations. They are zero-initialized
 * (file-scope statics), so they must not be called before an engine was
 * selected.
 */
static int (*Conn_engine_init)(void);
static int (*Conn_engine_shutdown)(void);
static int (*Conn_engine_grow)(unsigned int);
static int (*Conn_engine_add_obj)(struct Conn *);
static int (*Conn_engine_del_obj)(struct Conn *);
static int (*Conn_engine_chg_obj)(struct Conn *);
/* Waits for events; the callback receives the slot and the returned events */
static int (*Conn_engine_poll)(int, void (*cb)(const unsigned int slot, const int revents));
static int (*Conn_engine_move_slot)(const unsigned int dst,
const unsigned int src);
/* Functions */
/*
* Set prefered engine
*/
int Conn_engine_set(const unsigned int engine)
{
if (engine == Conn_engine)
return 0;
if (engine == CONN_ENGINE_POLL) {
if (Conn_engine_poll_found == 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot use engine POLL. Probably not supported.");
return -1;
}
#ifdef POLL_FOUND
/* Set variables */
CONN_POLLIN = POLLIN;
CONN_POLLOUT = POLLOUT;
CONN_POLLPRI = POLLPRI;
CONN_POLLERR = POLLERR;
CONN_POLLHUP = POLLHUP;
CONN_POLLNVAL = POLLNVAL;
CONN_POLLRDNORM = POLLRDNORM;
CONN_POLLRDBAND = POLLRDBAND;
/* Set functions */
Conn_engine_init = Conn_poll_init;
Conn_engine_shutdown = Conn_poll_shutdown;
Conn_engine_grow = Conn_poll_grow;
Conn_engine_add_obj = Conn_poll_add_obj;
Conn_engine_del_obj = Conn_poll_del_obj;
Conn_engine_chg_obj = Conn_poll_chg_obj;
Conn_engine_poll = Conn_poll_poll;
Conn_engine_move_slot = Conn_poll_move_slot;
#endif
} else if (engine == CONN_ENGINE_EPOLL) {
if (Conn_engine_epoll_found == 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot use engine EPOLL. Probably not supported.");
return -1;
}
#ifdef EPOLL_FOUND
/* Set variables */
CONN_POLLIN = EPOLLIN;
CONN_POLLOUT = EPOLLOUT;
CONN_POLLPRI = EPOLLPRI;
CONN_POLLERR = EPOLLERR;
CONN_POLLHUP = EPOLLHUP;
CONN_POLLNVAL = 0; /* not defined for epoll */
CONN_POLLRDNORM = 0; /* not defined for epoll */
CONN_POLLRDBAND = 0; /* not defined for epoll */
/* Set functions */
Conn_engine_init = Conn_epoll_init;
Conn_engine_shutdown = Conn_epoll_shutdown;
Conn_engine_grow = Conn_epoll_grow;
Conn_engine_add_obj = Conn_epoll_add_obj;
Conn_engine_del_obj = Conn_epoll_del_obj;
Conn_engine_chg_obj = Conn_epoll_chg_obj;
Conn_engine_poll = Conn_epoll_poll;
Conn_engine_move_slot = Conn_epoll_move_slot;
#endif
} else {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot use engine %u because is not supported.", engine);
return -1;
}
return 0;
}
/*
 * Reads a value from proc.
 *
 * buf/buf_size: destination buffer; always NUL-terminated on return
 *               (nothing is written if buf is NULL or buf_size is 0).
 * file: path of the file to read.
 *
 * At most buf_size - 1 bytes are read with a single read() call and the
 * trailing newlines are removed. On failure, buf receives "ERROR_OPEN!"
 * or "ERROR_READ!".
 */
static void Conn_read_proc(char *buf, const size_t buf_size, const char *file)
{
	int fd;
	ssize_t n;

	/* Without room for the terminator, buf_size - 1 would wrap around */
	if ((buf == NULL) || (buf_size == 0))
		return;

	fd = open(file, O_RDONLY);
	if (fd == -1) {
		snprintf(buf, buf_size, "ERROR_OPEN!");
		return;
	}

	n = read(fd, buf, buf_size - 1);
	close(fd);

	if (n == -1) {
		snprintf(buf, buf_size, "ERROR_READ!");
		return;
	}

	/* Terminate after the data, then strip only real trailing newlines */
	buf[n] = '\0';
	while ((n > 0) && (buf[n - 1] == '\n')) {
		n--;
		buf[n] = '\0';
	}
}
/*
 * Returns some important system values.
 *
 * Reads three network tunables from /proc and formats them as one line.
 * The returned pointer refers to a static buffer that is overwritten by
 * the next call.
 */
char *Conn_sys(void)
{
	static char summary[512];
	static const char *const proc_files[3] = {
		"/proc/sys/net/core/somaxconn",
		"/proc/sys/net/ipv4/tcp_max_tw_buckets",
		"/proc/sys/net/ipv4/tcp_fin_timeout"
	};
	char values[3][16];
	unsigned int k;

	for (k = 0; k < 3; k++)
		Conn_read_proc(values[k], sizeof(values[k]), proc_files[k]);

	snprintf(summary, sizeof(summary),
		"net.core.somaxconn=%s"
		" net.ipv4.tcp_max_tw_buckets=%s"
		" net.ipv4.tcp_fin_timeout=%s",
		values[0], values[1], values[2]);

	return summary;
}
/*
 * Initializes the library state and selects a polling engine.
 *
 * max: maximum number of connections stored in Conn_max (Conn_alloc
 *      treats 0 as "no limit").
 * Returns 0 on success (or if already initialized), -1 on error.
 *
 * When both engines are compiled in, epoll is preferred over poll.
 */
int Conn_init(const unsigned int max)
{
	int ret;
	unsigned int engine = 0;
	unsigned int engine_found = 0;

	if (Conn_inited == 1)
		return 0;

	Conn_max = max;
	Conn_no = 0;
	Conn_work_to_do = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;
	Conn_accept_is_allowed = 1;
	Conn_accept_is_allowed_last = 1;
	Conn_allocated = 0;
	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

#ifdef POLL_FOUND
	engine = CONN_ENGINE_POLL;
	Conn_engine_poll_found = 1;
	engine_found = 1;
#endif

#ifdef EPOLL_FOUND
	engine = CONN_ENGINE_EPOLL;
	Conn_engine_epoll_found = 1;
	engine_found = 1;
#endif

	/* Built without any engine: do not pass a meaningless id further */
	if (engine_found == 0) {
		snprintf(Conn_error, sizeof(Conn_error),
			"No polling engine is available.");
		return -1;
	}

	ret = Conn_engine_set(engine);
	if (ret != 0)
		return -1;

	ret = Conn_engine_init();
	if (ret != 0)
		return -1;

	Conn_inited = 1;

	return 0;
}
/*
 * Shutdown Conn.
 *
 * Shuts down the engine, then releases the input/output buffers of every
 * allocated cell and the cell array itself.
 * Returns 0 on success or the negative value returned by the engine.
 */
int Conn_shutdown(void)
{
	int ret;
	unsigned int slot;

	Conn_inited = 0;

	ret = Conn_engine_shutdown();
	if (ret < 0)
		return ret;

	/*
	 * Free all buffers. Every allocated cell is visited (including the
	 * last one); with nothing allocated the loop does not run at all.
	 */
	for (slot = 0; slot < Conn_allocated; slot++) {
		free(Conns[slot].ibuf);
		free(Conns[slot].obuf);
	}

	free(Conns);

	/* Conn_grow calls realloc(Conns, ...): it must not see a stale pointer */
	Conns = NULL;
	Conn_allocated = 0;

	return 0;
}
/*
 * Appends data to the output buffer of a connection and asks the engine
 * to watch the connection for writability.
 *
 * C: destination connection (NULL is rejected).
 * buf/count: data to copy.
 * Returns count on success, -1 if C is NULL or the output buffer could
 * not be expanded.
 */
int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
{
	unsigned int slot;
	int r;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	slot = C->slot;

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, count);
		/* count is a size_t, so it needs %zu, not %d */
		Log(0, "\tTry to enqueue %zu bytes to slot=%u, id=%llu [%s]...\n",
			count, slot, Conns[slot].id, dump);
		free(dump);
	}

	/* Not enough room after the tail: try to get a bigger buffer */
	if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
		r = Conn_try_expand_buf(slot, 0, count);
		if (r != 0)
			return -1;
	}

	memcpy(Conns[slot].obuf + Conns[slot].obuf_tail, buf, count);
	Conns[slot].obuf_tail += count;

	Conns[slot].events |= CONN_POLLOUT;
	Conn_engine_chg_obj(&Conns[slot]);

	return (int) count;
}
/*
 * Cleans up the connection stored in 'slot'.
 *
 * Steps, in order: publish the connection error text in Conn_error, raise
 * the error (unless the close was requested by the user), call the close
 * callback, remove the descriptor from the engine and close it, reset the
 * timers, then either schedule a reconnect (CONN_FLAGS_AUTO_RECONNECT) or
 * mark the cell free and decrement the busy counters.
 *
 * The code indexes Conns[slot] for every access instead of caching a
 * pointer to the cell.
 */
static void Conn_free_intern(const unsigned int slot)
{
Log(11, "%s: Cleaning-up id %llu (slot=%u) in state %s [%s]...\n",
__FUNCTION__, Conns[slot].id, slot, Conn_state(&Conns[slot]),
Conn_errno(&Conns[slot]));
/* Make the error text of this connection available through Conn_error */
snprintf(Conn_error, sizeof(Conn_error),
"%s", Conn_errno(&Conns[slot]));
/* A close requested by the user is not reported as an error */
if (Conns[slot].error_state != CONN_ERROR_USERREQ)
Conn_error_raise(slot, 0);
/* The close callback runs only for the OPEN, LISTEN and ERROR states */
if ((Conns[slot].state == CONN_STATE_OPEN)
|| (Conns[slot].state == CONN_STATE_LISTEN)
|| (Conns[slot].state == CONN_STATE_ERROR)) {
/* The per-connection callback takes precedence over the global one */
if (Conns[slot].cb_close)
Conns[slot].cb_close(&Conns[slot]);
else if (Conn_close_cb)
Conn_close_cb(&Conns[slot]);
}
/* Unregister from the engine before closing the descriptor */
if (Conns[slot].fd > -1) {
Conn_engine_del_obj(&Conns[slot]);
close(Conns[slot].fd);
Conns[slot].fd = -1;
}
/* Reset tsend, else we enter in a timeout error loop */
Conns[slot].tsend.tv_sec = 0;
Conns[slot].tsend.tv_usec = 0;
/* Reset the connection attempt time */
Conns[slot].conn_syn.tv_sec = 0;
Conns[slot].conn_syn.tv_usec = 0;
/* Misc */
Conns[slot].error_state = 0;
if (Conns[slot].flags & CONN_FLAGS_AUTO_RECONNECT) {
/* Keep the cell: retry after 'delay' seconds with empty buffers */
Conns[slot].tryat = Conn_now.tv_sec + Conns[slot].delay;
Conns[slot].state = CONN_STATE_CONNECT_0;
Conns[slot].ibuf_head = 0;
Conns[slot].ibuf_tail = 0;
Conns[slot].obuf_head = 0;
Conns[slot].obuf_tail = 0;
/* Conn_poll calls Conn_trytoconnect while Conn_pending > 0 */
Conn_pending++;
} else {
Conns[slot].type = CONN_TYPE_UNK;
Conns[slot].state = CONN_STATE_FREE;
/* Allow connections */
Conn_accept_is_allowed = 1;
/* Decrement the number of busy connections */
Conn_no--;
Conn_work_to_do--;
}
}
/*
 * Grow Conn structures (cells).
 *
 * Asks the engine to grow first, then enlarges the Conns array by 128
 * zero-filled cells. Returns 0 on success, -1 if the engine or the
 * memory allocation failed (Conns is left untouched in that case).
 */
static int Conn_grow(void)
{
	enum { CONN_GROW_STEP = 128 };
	unsigned int wanted;
	struct Conn *bigger;

	Log(10, "%s() Try to grow cells from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + CONN_GROW_STEP);

	wanted = Conn_allocated + CONN_GROW_STEP;

	if (Conn_engine_grow(wanted) != 0)
		return -1;

	bigger = (struct Conn *) realloc(Conns, wanted * sizeof(struct Conn));
	if (bigger == NULL)
		return -1;

	/* Only the freshly added tail needs to be cleared */
	memset(bigger + Conn_allocated, 0, CONN_GROW_STEP * sizeof(struct Conn));

	Conns = bigger;
	Conn_allocated = wanted;

	return 0;
}
/*
* Allocs a Conn structure
*/
struct Conn *Conn_alloc(void)
{
unsigned int growok;
void *p;
unsigned int slot;
Log(10, "%s() Conn_no=%d Conn_max=%d\n",
__FUNCTION__,
Conn_no, Conn_max);
if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
snprintf(Conn_error, sizeof(Conn_error),
"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
return NULL;
}
if (Conn_allocated == Conn_no) {
growok = Conn_grow();
if (growok != 0) {
snprintf(Conn_error, sizeof(Conn_error),
"Cannot grow anymore. Probably memory shortage.");
return NULL;
}
}
if (Conn_no > Conn_max_reached)
Conn_max_reached = Conn_no;
slot = Conn_no;
Conns[slot].slot = slot;
Conns[slot].type = CONN_TYPE_UNK;
Conns[slot].state = CONN_STATE_EMPTY;
if (Conns[slot].ibuf_size < Conn_default_ibuf) {
p = realloc(Conns[slot].ibuf, Conn_default_ibuf);
if (p == NULL) {
snprintf(Conn_error, sizeof(Conn_error),
"Memory allocation error2!");
return NULL;
}
Conns[slot].ibuf = p;
Conns[slot].ibuf_size = Conn_default_ibuf;
}
Conns[slot].ibuf_head = 0;
Conns[slot].ibuf_tail = 0;
if (Conns[slot].obuf_size < Conn_default_obuf) {
p = realloc(Conns[slot].obuf, Conn_default_obuf);
if (p == NULL) {
snprintf(Conn_error, sizeof(Conn_error),
"Memory allocation error3!");
return NULL;
}
Conns[slot].obuf = p;
Conns[slot].obuf_size = Conn_default_obuf;
}
Conns[slot].obuf_head = 0;
Conns[slot].obuf_tail = 0;
Conns[slot].trecv = Conn_now;
Conns[slot].bi = 0;
Conns[slot].bo = 0;
Conns[slot].private = NULL;
/* Reset syn time */
Conns[slot].conn_syn.tv_sec = 0;
Conns[slot].conn_syn.tv_usec = 0;
/* bandwidth */
Conns[slot].band_width = 0;
Conns[slot].band_factor = 0;
Conns[slot].band_tokens = 0;
Conns[slot].band_lasttime = Conn_now;
Conns[slot].fd = -1;
Conns[slot].events = 0;
Conns[slot].revents = 0;
Conns[slot].flags = 0;
Conns[slot].start = Conn_now.tv_sec;
Conns[slot].id = Conn_id++;
Conn_no++;
/* Conn_work_to_do will not be incremented here, only in commit! */
Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
slot, Conns[slot].id, Conn_no);
if (Conn_no == Conn_max)
Conn_accept_is_allowed = 0;
return &Conns[slot];
}
/*
 * Stores the socket domain of a connection (Conn_commit handles PF_INET,
 * PF_INET6 and PF_PACKET). Always returns 0.
 */
int Conn_set_socket_domain(struct Conn *C, const int domain)
{
	C->sock_domain = domain;

	return 0;
}
/*
 * Stores the socket type of a connection (Conn_commit distinguishes
 * SOCK_STREAM from SOCK_DGRAM). Always returns 0.
 */
int Conn_set_socket_type(struct Conn *C, const int type)
{
	C->sock_type = type;

	return 0;
}
/*
 * Stores the protocol that Conn_commit passes as third argument to
 * socket(). Always returns 0.
 */
int Conn_set_socket_protocol(struct Conn *C, const int protocol)
{
	C->sock_protocol = protocol;

	return 0;
}
/*
 * Set the address where to bind to.
 *
 * The string is copied (and truncated if needed) into C->bind_addr.
 * A NULL addr is stored as an empty string, which Conn_commit treats as
 * "no bind address given". Always returns 0.
 */
int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
{
	/* Passing NULL to "%s" is undefined behavior */
	snprintf(C->bind_addr, sizeof(C->bind_addr), "%s",
		(addr != NULL) ? addr : "");

	return 0;
}
/*
 * Set address where to connect to.
 *
 * The string is copied (and truncated if needed) into C->addr.
 * A NULL addr is stored as an empty string; Conn_commit treats a
 * connection with an empty addr as a listening (master) one.
 * Always returns 0.
 */
int Conn_set_socket_addr(struct Conn *C, const char *addr)
{
	/* Passing NULL to "%s" is undefined behavior */
	snprintf(C->addr, sizeof(C->addr), "%s",
		(addr != NULL) ? addr : "");

	return 0;
}
/*
 * Stores the local port used by Conn_commit when binding.
 * Always returns 0.
 */
int Conn_set_socket_bind_port(struct Conn *C, const int port)
{
	C->bind_port = port;

	return 0;
}
/*
 * Stores the remote port used when connecting (Conn_trytoconnect passes
 * it to getaddrinfo). Always returns 0.
 */
int Conn_set_socket_port(struct Conn *C, const int port)
{
	C->port = port;

	return 0;
}
/*
 * Allocates socket and bind if necessary.
 *
 * A connection of unknown type becomes a client (P2P) when a remote
 * address was set, otherwise a master (listening) one bound by default to
 * the wildcard address of its domain.
 *
 * The remote address is not parsed here: Conn_trytoconnect resolves it
 * with getaddrinfo when the connect is attempted.
 *
 * Returns 0 on success, -1 on error (reason in Conn_error). On a bind,
 * listen or engine failure the connection is released through
 * Conn_free_intern.
 *
 * TODO: We should return -1 or free connection and call callbacks?!
 */
int Conn_commit(struct Conn *C)
{
	int i, ret;
	struct sockaddr *bind_psa = NULL;
	struct sockaddr_in bind_sa;
	struct sockaddr_in6 bind_sa6;
	socklen_t bind_sock_len = 0;
	int do_bind = 1, do_listen = 1, do_connect = 0;
	/* Defined default: no path may store an indeterminate state */
	int first_state = CONN_STATE_OPEN;
	unsigned int slot;

	slot = C->slot;

	Log(10, "%s: slot=%u...\n", __FUNCTION__, slot);

	/* Be optimistical and increment here, in 'free' we will decrement. */
	/* So, in error case, we will only decrement this thing! */
	Conn_work_to_do++;

	/* Try to figure what kind of socket is: client or master */
	if (Conns[slot].type == CONN_TYPE_UNK) {
		if (strlen(Conns[slot].addr) > 0) {
			Conns[slot].type = CONN_TYPE_P2P;
			do_listen = 0;
			do_connect = 1;
		} else {
			Conns[slot].type = CONN_TYPE_MASTER;
			if (strlen(Conns[slot].bind_addr) == 0) {
				switch (Conns[slot].sock_domain) {
				case PF_INET:
					snprintf(Conns[slot].bind_addr, sizeof(Conns[slot].bind_addr),
						"0.0.0.0"); break;
				case PF_INET6:
					snprintf(Conns[slot].bind_addr, sizeof(Conns[slot].bind_addr),
						"::"); break;
				}
			}
		}
	}

	switch (Conns[slot].sock_domain) {
	case PF_INET:
		/* for binding socket */
		if (strlen(Conns[slot].bind_addr) > 0) {
			memset(&bind_sa, 0, sizeof(bind_sa));
			bind_sa.sin_family = AF_INET;
			/* inet_pton returns 1 on success and 0 for an invalid string */
			ret = inet_pton(AF_INET, Conns[slot].bind_addr, &bind_sa.sin_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", Conns[slot].bind_addr);
				return -1;
			}
			bind_sa.sin_port = htons(Conns[slot].bind_port);
			bind_psa = (struct sockaddr *) &bind_sa;
			bind_sock_len = sizeof(bind_sa);
		}
		break;

	case PF_INET6:
		/* for binding socket */
		if (strlen(Conns[slot].bind_addr) > 0) {
			memset(&bind_sa6, 0, sizeof(bind_sa6));
			bind_sa6.sin6_family = AF_INET6;
			ret = inet_pton(AF_INET6, Conns[slot].bind_addr, &bind_sa6.sin6_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", Conns[slot].bind_addr);
				return -1;
			}
			bind_sa6.sin6_port = htons(Conns[slot].bind_port);
			bind_psa = (struct sockaddr *) &bind_sa6;
			bind_sock_len = sizeof(bind_sa6);
		}
		break;

	case PF_PACKET:
		do_bind = 0;
		do_listen = 0; /*TODO:check this!*/
		break;

	default:
		snprintf(Conn_error, sizeof(Conn_error),
			"Invalid domain [%d]!", Conns[slot].sock_domain);
		return -1;
	}

	/* Only stream sockets of the inet domains start in the listen state */
	if (Conns[slot].sock_domain != PF_PACKET) {
		if (Conns[slot].sock_type == SOCK_STREAM) {
			first_state = CONN_STATE_LISTEN;
		} else {
			/* Datagram (and any other non-stream) sockets do not listen */
			do_listen = 0;
			first_state = CONN_STATE_OPEN;
		}
	}

	Conns[slot].fd = socket(Conns[slot].sock_domain, Conns[slot].sock_type, Conns[slot].sock_protocol);
	if (Conns[slot].fd == -1) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%s, %s, %s) [%s]",
			Conn_domain(&Conns[slot]), Conn_type(&Conns[slot]),
			Conn_get_socket_protocol(&Conns[slot]),
			strerror(errno));
		return -1;
	}

	Conn_setnonblock(Conns[slot].fd);

	if (Conns[slot].sock_domain == PF_INET6) {
#ifndef IPV6_V6ONLY
#define IPV6_V6ONLY 26
#endif
		i = 1;
		setsockopt(Conns[slot].fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	/* do_bind is cleared for domains where no address was prepared above */
	if ((do_bind == 1) && (strlen(Conns[slot].bind_addr) > 0)) {
		i = 1;
		setsockopt(Conns[slot].fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(Conns[slot].fd, bind_psa, bind_sock_len);
		if (ret < 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind on %s/%d [%s]",
				Conns[slot].bind_addr, Conns[slot].bind_port, strerror(errno));
			goto out_free_fd;
		}
	}

	Conns[slot].events = CONN_POLLIN;
	Conns[slot].revents = 0;

	if (do_listen == 1) {
		ret = listen(Conns[slot].fd, 4096);
		if (ret < 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot listen on %s/%d [%s]",
				Conns[slot].bind_addr, Conns[slot].bind_port, strerror(errno));
			goto out_free_fd;
		}
	}

	if (do_connect == 1) {
		/*TODO:replace connect_a with OPEN?! */
		first_state = CONN_STATE_CONNECT_a;
		/* POLLOUT signals the completion of the connect */
		Conns[slot].events |= CONN_POLLOUT;
	}

	ret = Conn_engine_add_obj(&Conns[slot]);
	if (ret != 0)
		goto out_free_fd;

	if (do_connect == 1)
		Conn_pending++;

	Conns[slot].state = first_state;

	return 0;

out_free_fd:
	Conn_free_intern(slot);

	return -1;
}
/*
 * OBSOLETE
 *
 * Allocates a connection, sets its domain, type, bind address and bind
 * port, then commits it. Returns the connection, or NULL if the
 * allocation or the commit failed.
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *conn = Conn_alloc();

	if (conn == NULL)
		return NULL;

	Conn_set_socket_domain(conn, domain);
	Conn_set_socket_type(conn, type);
	Conn_set_socket_bind_addr(conn, addr);
	Conn_set_socket_bind_port(conn, port);

	return (Conn_commit(conn) == 0) ? conn : NULL;
}
/*
 * OBSOLETE
 *
 * Same as Conn_socket_addr, using the wildcard address of the domain
 * ("0.0.0.0" for PF_INET, "::" for PF_INET6). For any other domain an
 * empty bind address is used.
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	const char *addr;

	switch (domain) {
	case PF_INET: addr = "0.0.0.0"; break;
	case PF_INET6: addr = "::"; break;
	/* Never NULL: the address ends up as a "%s" argument of snprintf */
	default: addr = ""; break;
	}

	return Conn_socket_addr(domain, type, addr, port);
}
/*
 * Connects an allocated socket
 * OBSOLETE!
 *
 * Allocates a connection, sets its domain, type, remote address and
 * remote port, then commits it. Returns the connection, or NULL if the
 * allocation or the commit failed.
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *conn;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	conn = Conn_alloc();
	if (conn == NULL)
		return NULL;

	Conn_set_socket_domain(conn, domain);
	Conn_set_socket_type(conn, type);
	Conn_set_socket_addr(conn, addr);
	Conn_set_socket_port(conn, port);

	if (Conn_commit(conn) != 0)
		return NULL;

	return conn;
}
/*
 * Accepts one connection on the listening socket stored in 'slot'.
 *
 * The new descriptor is placed in a freshly allocated cell, made
 * non-blocking, registered with the engine and announced through the
 * accept callback (per-connection first, global otherwise).
 *
 * The master is always accessed as Conns[slot]: Conn_alloc may move the
 * Conns array.
 */
static void Conn_accept(const unsigned int slot)
{
	int fd, err, xerrno;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;

	Log(10, "Accepting a connection on slot=%u via %s/%d, type %s, domain %s"
		", protocol %s.\n",
		slot, Conns[slot].bind_addr, Conns[slot].bind_port, Conn_type(&Conns[slot]),
		Conn_domain(&Conns[slot]), Conn_get_socket_protocol(&Conns[slot]));

	switch (Conns[slot].sock_domain) {
	case PF_INET:
		pca = (struct sockaddr *) &ca4;
		cax_len = sizeof(ca4);
		break;

	case PF_INET6:
		pca = (struct sockaddr *) &ca6;
		cax_len = sizeof(ca6);
		break;

	default:
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot deal with domain %d.",
			Conns[slot].sock_domain);
		Conn_error_raise(slot, EAFNOSUPPORT);
		return;
	}

	fd = accept(Conns[slot].fd, pca, &cax_len);
	if (fd == -1) {
		/* Save it now: the calls below may overwrite errno */
		xerrno = errno;

		/* Nothing to accept right now, or interrupted: not an error */
		if ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)
			|| (xerrno == EINTR))
			return;

		/* TODO: ratelimit */
		Log(9, "WARN: Cannot accept on fd %d [%s].\n",
			Conns[slot].fd, strerror(xerrno));
		Conn_error_raise(slot, xerrno);
		return;
	}

	/* After calling Conn_alloc, pointer to slot can change, so C is not valid */
	X = Conn_alloc();
	if (!X) {
		Conn_error_raise(slot, ENOMEM);
		close(fd);
		return;
	}

	X->fd = fd;
	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_OPEN;
	X->time_open = Conn_now;
	X->via = Conns[slot].id;
	X->events = CONN_POLLIN;

	Conn_set_socket_domain(X, Conns[slot].sock_domain);
	Conn_set_socket_type(X, Conns[slot].sock_type);
	Conn_set_socket_protocol(X, Conns[slot].sock_protocol);

	Conn_setnonblock(X->fd);

	Conn_set_address(X, 0);
	Conn_set_address(X, 1);

	/*
	 * Count the connection before it can be freed: Conn_free_intern
	 * decrements Conn_work_to_do, so the failure path below must find
	 * the increment already done.
	 */
	Conn_work_to_do++;

	err = Conn_engine_add_obj(X);
	if (err != 0) {
		Conn_error_raise(slot, err);
		Conn_free_intern(X->slot);
		return;
	}

	if (Conns[slot].cb_accept)
		Conns[slot].cb_accept(X);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(X);

	Conn_total++;
}
/*
 * Applies a change of Conn_accept_is_allowed to all master (listening)
 * connections: input events are removed from them when accepting is not
 * allowed and added back when it is. Does nothing if the flag did not
 * change since the last call.
 */
static void Conn_accept_allow(void)
{
	unsigned int idx;

	if (Conn_accept_is_allowed == Conn_accept_is_allowed_last)
		return;

	Log(10, "%s: Turning accept allow from %d to %d...\n",
		__FUNCTION__, Conn_accept_is_allowed_last,
		Conn_accept_is_allowed);

	for (idx = 0; idx < Conn_no; idx++) {
		if (Conns[idx].type == CONN_TYPE_MASTER) {
			if (Conn_accept_is_allowed != 0)
				Conns[idx].events |= CONN_POLLIN;
			else
				Conns[idx].events &= ~CONN_POLLIN;

			Conn_engine_chg_obj(&Conns[idx]);
		}
	}

	/* Remember what was applied, so the next call can detect a change */
	Conn_accept_is_allowed_last = Conn_accept_is_allowed;
}
/*
* Add tokens to connection
*/
static void Conn_band_update(const unsigned int slot)
{
long diff;
/* no need */
if (Conns[slot].band_width == 0)
return;
diff = (Conn_now.tv_sec - Conns[slot].band_lasttime.tv_sec) * 1000000;
diff += Conn_now.tv_usec - Conns[slot].band_lasttime.tv_usec;
diff /= 100000;
/* already added in this hundred of milisecond? */
if (diff == 0)
return;
/* take care of time skew */
if (diff < 0)
diff = 1;
Conns[slot].band_lasttime = Conn_now;
Conns[slot].band_tokens += diff * Conns[slot].band_width / 10;
if (Conns[slot].band_tokens > Conns[slot].band_factor * Conns[slot].band_width)
Conns[slot].band_tokens = Conns[slot].band_factor * Conns[slot].band_width;
Conns[slot].events |= CONN_POLLOUT;
Conn_engine_chg_obj(&Conns[slot]);
Log(debug_band, "\t\tBAND: slot=%u, id=%llu, added tokens -> %u.\n",
slot, Conns[slot].id, Conns[slot].band_tokens);
}
/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 *
 * The token bucket starts full (factor * width tokens), with the current
 * time as last refill time. Always returns 0.
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	unsigned int slot;

	slot = C->slot;

	Log(11, "\tConn_band: slot=%u, id=%llu, width=%u, factor=%u.\n",
		slot, Conns[slot].id, width, factor);

	Conns[slot].band_lasttime = Conn_now;
	Conns[slot].band_width = width;
	Conns[slot].band_factor = factor;
	Conns[slot].band_tokens = factor * width;

	/* The timeval fields are not ints: convert them to match the format */
	Log(debug_band, "\t\tBAND: lasttime=%ld.%06ld, width=%u, factor=%u, tokens=%u\n",
		(long) Conns[slot].band_lasttime.tv_sec,
		(long) Conns[slot].band_lasttime.tv_usec,
		Conns[slot].band_width, Conns[slot].band_factor, Conns[slot].band_tokens);

	return 0;
}
/*
 * Starts the pending outgoing connections.
 *
 * Client (P2P) connections waiting for a retry (CONNECT_0) are promoted
 * to CONNECT_a once their retry time arrived. For every connection in
 * CONNECT_a the address is resolved, a socket is created if needed and a
 * connect is started; the connection then moves to CONNECT_b, or to the
 * error state if a step failed.
 *
 * Conn_pending is decremented for every finished attempt, successful or
 * not: a failed connection is no longer pending, and Conn_free_intern
 * increments the counter again if a reconnect is scheduled.
 */
static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	int i, ret;
	char port[8];

	Log(8, "%s: Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);

	for (i = Conn_no - 1; i >= 0; i--) {
		if (Conns[i].type != CONN_TYPE_P2P)
			continue;

		if ((Conns[i].state == CONN_STATE_CONNECT_0)
			&& (Conns[i].tryat <= Conn_now.tv_sec)) {
			Conns[i].state = CONN_STATE_CONNECT_a;
		}

		if (Conns[i].state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTrying to connect slot=%u, id=%llu, to %s/%d...\n",
			Conns[i].slot, Conns[i].id, Conns[i].addr, Conns[i].port);

		memset(&hints, 0, sizeof(hints));
		if (Conns[i].sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = Conns[i].sock_domain;
		hints.ai_socktype = Conns[i].sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = AI_ADDRCONFIG;

		snprintf(port, sizeof(port), "%d", Conns[i].port);

		res = NULL;
		ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
		if (ret != 0) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_GETADDRINFO;
			goto attempt_done;
		}

		/* Conn_commit may already have created the socket */
		if (Conns[i].fd == -1) {
			Conns[i].fd = socket(res->ai_family, res->ai_socktype, 0);
			if (Conns[i].fd == -1) {
				Conns[i].xerrno = errno;
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				goto attempt_done;
			}
			Log(10, "\tAllocated socket on fd %d\n",
				Conns[i].fd);

			Conn_setnonblock(Conns[i].fd);

			/* Need POLLOUT to signal when the connection was done. */
			Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT);
			ret = Conn_engine_add_obj(&Conns[i]);
			if (ret != 0) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				goto attempt_done;
			}
		}

		/* Set syn time */
		Conns[i].conn_syn = Conn_now;

		Log(11, "\tConnecting...\n");
		ret = connect(Conns[i].fd, res->ai_addr, res->ai_addrlen);
		/* EINPROGRESS is the normal answer of a non-blocking connect */
		if ((ret != 0) && (errno != EINPROGRESS)) {
			Conns[i].xerrno = errno;
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_CONNECT;
			goto attempt_done;
		}

		Conns[i].state = CONN_STATE_CONNECT_b;
		Conn_total++;

attempt_done:
		/* The attempt is over, whatever its result */
		Conn_pending--;

		if (res != NULL)
			freeaddrinfo(res);
	}

	Log(8, "%s: After, Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);
}
/*
 * Internal send handler: sends as much as allowed from the output buffer
 * of the connection stored in 'slot'.
 *
 * The amount is limited by Conn_max_send (if > 0) and, for connections
 * with a bandwidth limit, by the available tokens. Without tokens the
 * output events are disabled for the connection.
 * A failed send (other than EINTR/EAGAIN) or a send of 0 bytes sets the
 * CONN_ERROR_SEND error state.
 */
static void Conn_send_cb_i(const unsigned int slot)
{
	ssize_t n;
	unsigned int max;
	int count;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		slot, Conns[slot].id, Conns[slot].obuf_head, Conns[slot].obuf_tail,
		Conns[slot].obuf_size);

	if (Conns[slot].obuf == NULL)
		abort();

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n");
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
			return;
		}
	}

	/* Retry while interrupted by a signal */
	do {
		Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u), max=%d (count=%d), 0)...\n",
			Conns[slot].fd, Conns[slot].obuf_head,
			Conns[slot].obuf_tail, max, count);
		n = send(Conns[slot].fd, buf, max, 0);
		/* Save it now: the calls below may overwrite errno */
		xerrno = errno;
	} while ((n == -1) && (xerrno == EINTR));

	/* The socket cannot take data now: wait for the next output event */
	if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Sent %d bytes [head=%d tail=%d]\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd,
		(int) n, Conns[slot].obuf_head, Conns[slot].obuf_tail);

	/* Dump only what was really sent (n is -1 on error) */
	if ((Conn_level >= 10) && (n > 0)) {
		dump = Conn_dump(buf, n);
		Log(0, "\t%s\n", dump);
		free(dump);
	}

	if (n > 0) {
		Conns[slot].tsend = Conn_now;
		if (n < count) {
			Conns[slot].obuf_head += n;
		} else {
			/* Everything was sent: rewind the buffer */
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}
		Conns[slot].bo += n;

		if (Conns[slot].band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %d tokens -> %u...\n",
				__FUNCTION__,
				(int) n, Conns[slot].band_tokens);
		}
	} else {
		Log(0, "%s: Error in send (slot=%u, id=%llu) [%s]\n",
			__FUNCTION__,
			slot, Conns[slot].id, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}
/*
 * Internal receive handler: reads available data into the input buffer
 * of the connection stored in 'slot' and calls the data callback
 * (per-connection first, global otherwise).
 *
 * The input buffer is expanded when it is full. The amount read at once
 * is limited by Conn_max_recv (if > 0).
 * A read of 0 bytes sets the CONN_ERROR_HANGUP error state, a failed read
 * (other than EINTR/EAGAIN) sets CONN_ERROR_RECV.
 */
static void Conn_recv_cb_i(const unsigned int slot)
{
	ssize_t n;
	unsigned int max;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		slot, Conns[slot].id, Conns[slot].ibuf_head, Conns[slot].ibuf_tail, Conns[slot].ibuf_size);

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(slot, 1, 0);
		if (r != 0) {
			Log(1, "MEM: Cannot expand ibuf!\n");
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	/* Retry while interrupted by a signal */
	do {
		n = recv(Conns[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		/* Save it now: the calls below may overwrite errno */
		xerrno = errno;
	} while ((n == -1) && (xerrno == EINTR));

	/* Nothing to read on a non-blocking socket is not an error */
	if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Received %d bytes.\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd, (int) n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf + Conns[slot].ibuf_tail, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].ibuf_tail += n;
		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		Log(10, "Remote close sending side (slot=%u, id=%llu)\n",
			slot, Conns[slot].id);
		Conns[slot].error_state = CONN_ERROR_HANGUP;
	} else {
		Log(4, "Error in recv (slot=%u, id=%llu) [%s]\n",
			slot, Conns[slot].id, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}
/*
 * Callback that is called for every connection
 *
 * It is passed to the engine by Conn_poll, with the slot that has events
 * and the returned events. In order, it records hangup/error conditions,
 * completes a pending connect, handles input (accept for a master,
 * receive otherwise) and handles output.
 *
 * Every engine change goes through Conn_engine_chg_obj, so the callback
 * works with whichever engine was selected.
 *
 * The cell is always accessed as Conns[slot]: Conn_accept may move the
 * Conns array.
 */
static void Conn_poll_cb(const unsigned int slot, int revents)
{
	Log(12, "%s: slot=%u, id=%llu, revents=%x.\n",
		__FUNCTION__, slot, Conns[slot].id, revents);

	Conns[slot].revents = revents;

	if (Conn_level >= 12)
		Log(12, "\t%s\n", Conn_status_slot(slot));

	/* We should not have events on a free cell */
	if (Conns[slot].state == CONN_STATE_FREE)
		abort();

	if (revents & CONN_POLLHUP) {
		Conns[slot].error_state = CONN_ERROR_HANGUP;
		/* TODO: Add it to the close list to speed it up */
	}

	if (revents & CONN_POLLERR) {
		Conns[slot].error_state = CONN_ERROR_POLL;
		Conns[slot].xerrno = 0; /* TODO: unknown error? */
		/* TODO: CONN_ERROR_POLL is correct here? */
	}

	/* First, test we have a new connection */
	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(slot) == 0)) {
		/* We just established a connection */
		if (Conns[slot].state == CONN_STATE_CONNECT_b) {
			/*
			 * We do not need POLLOUT now - it was used only for
			 * connect completion.
			 */
			revents &= ~CONN_POLLOUT;
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);

			Conns[slot].state = CONN_STATE_OPEN;

			Conn_set_address(&Conns[slot], 0);

			Conns[slot].time_open = Conn_now;

			if (Conns[slot].cb_connected != NULL)
				Conns[slot].cb_connected(&Conns[slot]);
			else if (Conn_connected_cb)
				Conn_connected_cb(&Conns[slot]);
		}
	}

	/* Second, test for error or input */
	if ((revents & CONN_POLLIN)
		&& (Conn_ignore(slot) == 0)) {
		if (Conns[slot].type == CONN_TYPE_MASTER) {
			/* C pointer can change under us in Conn_accept->Conn_grow */
			Conn_accept(slot);
		} else {
			if (Conns[slot].cb_recv)
				Conns[slot].cb_recv(&Conns[slot]);
			else if (Conn_recv_cb != NULL)
				Conn_recv_cb(&Conns[slot]);
			else
				Conn_recv_cb_i(slot);
		}
	}

	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(slot) == 0)) {
		/* We can send data */
		if (Conns[slot].state == CONN_STATE_OPEN) {
			if (Conns[slot].cb_send)
				Conns[slot].cb_send(&Conns[slot]);
			else if (Conn_send_cb != NULL)
				Conn_send_cb(&Conns[slot]);
			else
				Conn_send_cb_i(slot);

			/* Output buffer drained: stop asking for output events */
			if (Conns[slot].obuf_head == Conns[slot].obuf_tail) {
				Conns[slot].events &= ~CONN_POLLOUT;
				/* Use the selected engine, not a specific one */
				Conn_engine_chg_obj(&Conns[slot]);
				if (Conns[slot].flags & CONN_FLAGS_CLOSE_AFTER_SEND) {
					Conns[slot].state = CONN_STATE_ERROR;
					Conns[slot].error_state = CONN_ERROR_USERREQ;
				}
			}
		}
	}
}
/*
* Moving an in-use slot over a free one for compacting reason.
*/
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
struct Conn tmp;
if (dst == src)
return;
Log(10, "\t%s: Moving id %llu from slot=%u to slot=%d...\n",
__FUNCTION__, Conns[src].id, src, dst);
/* We need to save old location because of usefull pointers */
tmp = Conns[dst];
Conns[dst] = Conns[src];
Conns[src] = tmp;
Conns[dst].slot = dst;
Conns[src].slot = src;
Conn_engine_move_slot(dst, src);
}
/*
 * Does the polling of file descriptors.
 * Returns: -1 on error, 0 nothing to do, 1 if some work was done
 * timeout is in 1/1000 seconds increments.
 *
 * The time passed to the engine is capped at 1000. A timeout of -1 makes
 * the function loop until a stop is requested or there is no more work.
 * After every engine call, connections in error are closed and the
 * others are checked for expiration and get their bandwidth tokens.
 */
int Conn_poll(const int timeout)
{
	int events;
	int wait_ms;
	unsigned int cur, tail;

	Log(9, "%s: timeout=%d Conn_no=%d, Conn_work_to_do=%u)\n",
		__FUNCTION__, timeout, Conn_no, Conn_work_to_do);

	/* Both "forever" (-1) and long timeouts are served in 1000 steps */
	wait_ms = ((timeout > 1000) || (timeout == -1)) ? 1000 : timeout;

	do {
		if (Conn_must_stop == 1)
			return 0;

		if (Conn_work_to_do == 0) {
			Log(9, "%s: work_to_do is 0, so return 0!\n",
				__FUNCTION__);
			return 0;
		}

		if (Conn_pending > 0)
			Conn_trytoconnect();

		events = Conn_engine_poll(wait_ms, Conn_poll_cb);
		if (events < 0)
			return -1;

		Log(9, "\tDo compacting, expiration and band stuff (%d event(s))...\n",
			events);

		for (cur = 0; cur < Conn_no; ) {
			/*
			 * Save last position because Conn_free_intern
			 * decrements Conn_no
			 */
			tail = Conn_no - 1;

			/* Closing connection if it is in error state */
			if (Conns[cur].error_state > 0) {
				Log(11, "\tSlot=%u in error [%s], exchange with pos %u.\n",
					cur, Conn_errno(&Conns[cur]), tail);
				Conn_move_slot(cur, tail);
				Conn_free_intern(tail);
				/* The cell now at 'cur' was not examined yet */
				continue;
			}

			/* Cells without a commit are skipped */
			if (Conns[cur].state != CONN_STATE_EMPTY) {
				if (Conns[cur].state == CONN_STATE_FREE) {
					/* Must not happen! */
					abort();
				}

				/* test if it expired/timeout */
				Conn_expire(cur);

				/* add tokens */
				Conn_band_update(cur);
			}

			cur++;
		}

		/* Blocking accept if full queue or unblock if not */
		Conn_accept_allow();
	} while (timeout == -1);

	return events;
}
/*
 * Returns the lifetime of a connection
 *
 * The value is what Conn_time_diff gives for the current time (Conn_now)
 * and the time_open of the connection.
 */
unsigned long long Conn_lifetime(struct Conn *C)
{
	const unsigned int idx = C->slot;

	return Conn_time_diff(&Conn_now, &Conns[idx].time_open);
}