catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easy building ipv4/ipv6 network daemons/clients

/Conn.c (d6754c4bc9f1550b546b24e613edeeaa0f76b172) (81042 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux) M BOIE <catab at embedromix.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn_config.h"

#include <sys/timerfd.h>
#include <stdarg.h>
#include <unistd.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/time.h>
#include <netinet/tcp.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>

#include "Conn_intern.h"
#include "Conn_web.h"


#define CONN_OUT	0
#define CONN_IN		1

/* ############## static ########### */
/* File-scope state of the library; none of it is visible outside this file. */
static int			Conn_epoll_fd;
static struct epoll_event	Conn_epoll_events[CONN_EVENTS_SLOTS];

static unsigned int		Conn_max_reached = 0;
/* Bytes requested from Conn_try_expand_buf() when the input buffer is full */
static unsigned int		Conn_default_ibuf = 256;
static unsigned int		Conn_default_obuf = 256;
/* Upper limits (bytes) applied by Conn_try_expand_buf(); 0 means no limit */
static unsigned int		Conn_max_ibuf = 4096000;
static unsigned int		Conn_max_obuf = 4096000;

/* Max bytes to enqueue on one send/recv call */
static unsigned int		Conn_max_send = 32 * 1024;
static unsigned int		Conn_max_recv = 32 * 1024;

/* Conn_no and Conn_work_to_do are decremented when a Conn is released */
static unsigned int		Conn_no = 0;
static unsigned int		Conn_work_to_do = 0;
static unsigned int		Conn_max = 0;
static unsigned long		Conn_total = 0;
static unsigned int		Conn_start = 0;
/* Incremented each time a Conn is prepared for auto-reconnect */
static unsigned int		Conn_pending = 0;
/* Per-thread "current time"; used for log timestamps and trecv/tsend stamps */
static __thread struct timeval	Conn_now;
static unsigned short		Conn_debug_level = 0;		/* debug level */
/* memory stuff */
/* Running totals of bytes added to input/output buffers by expansions */
static unsigned long long	Conn_mem_buffers_in = 0;
static unsigned long long	Conn_mem_buffers_out = 0;
#if 1
static unsigned long long	Conn_mem_structs = 0;
#endif

static struct Conn		*Conns = NULL;
static unsigned int		Conn_inited = 0;
static unsigned int		Conn_allocated = 0;
static unsigned long long	Conn_id = 1;
/* Set to 1 by Conn_stop() */
static unsigned int		Conn_must_stop = 0;

/* Per-thread buffer holding the last error message */
static __thread char		Conn_error[512];
/* Per-thread tag inserted in every log line (see Conn_log_set_info) */
static __thread char		log_info[32];

/* Destination fd for Log(); defaults to stderr */
static int			Conn_log_fd = 2;
/* Log level used for the bandwidth ("BAND:") messages */
static unsigned short		debug_band = 11;
static struct Conn_pool		Conn_masters; /* Keeps track of listening sockets */
static struct Conn_pool		Conn_free; /* Keeps track of free Conns */
static struct Conn_pool		Conn_connect_list; /* Keeps track of connecting sockets */

static unsigned char		Conn_reuseport_available;


/* Prototypes */
/* Forward declarations of the default callbacks referenced by the table below. */
static void Conn_default_cbs_accept(struct Conn *C);
static void Conn_default_cbs_recv(struct Conn *C);
static void Conn_default_cbs_send(struct Conn *C);
static void Conn_default_cbs_data(struct Conn *C);
static void Conn_default_cbs_close(struct Conn *C);
static void Conn_default_cbs_trigger(struct Conn *C);
static void Conn_default_cbs_error(struct Conn *C);
static void Conn_default_cbs_connected(struct Conn *C);
static void Conn_default_cbs_accept_error(struct Conn *C);

/*
 * Default callback set: one handler per event kind.
 * There is no default for worker_start (left NULL).
 */
static struct Conn_cbs Conn_default_cbs =
{
	.accept		=	Conn_default_cbs_accept,
	.recv		=	Conn_default_cbs_recv,
	.send		=	Conn_default_cbs_send,
	.data		=	Conn_default_cbs_data,
	.close		=	Conn_default_cbs_close,
	.trigger	=	Conn_default_cbs_trigger,
	.error		=	Conn_default_cbs_error,
	.connected	=	Conn_default_cbs_connected,
	.accept_error	=	Conn_default_cbs_accept_error,
	.worker_start	=	NULL
};


/* ############## wpool ############# */
/*
 * Takes a reference on a workers pool (increments wp->refs).
 * A NULL @wp is ignored, mirroring Conn_wpool_put().
 */
static void Conn_wpool_get(struct Conn_wpool *wp)
{
	if (!wp)
		return;

	wp->refs++;
}

/*
 * Decrements usage
 */
static void Conn_wpool_put(struct Conn_wpool *wp)
{
	if (!wp)
		return;

	wp->refs--;
	if (wp->refs == 0)
		Conn_wpool_destroy(wp);
}

/*
 * Attaches workers pool @wp to @C.
 * A reference is taken on the pool before the pointer is stored.
 */
void Conn_set_wp(struct Conn *C, struct Conn_wpool *wp)
{
	/* reference first, publish second */
	Conn_wpool_get(wp);

	C->wp = wp;
}

/*
 * Detaches @C from its workers pool.
 * The Conn's pool pointer is cleared, then the reference on @wp is dropped
 * (which may destroy the pool).
 */
void Conn_del_wp(struct Conn *C, struct Conn_wpool *wp)
{
	/* forget the pool before releasing it */
	C->wp = NULL;

	Conn_wpool_put(wp);
}

/*
 * Logs (at level 0) the id of worker @w and its in_clients counter.
 */
static void Conn_wpool_worker_stats(const struct Conn_wpool_worker *w)
{
	Log(0, "Worker %hu: in_clients: %llu\n", w->id, w->in_clients);
}

/*
 * Creates a workers pool with @workers worker slots.
 *
 * The worker array is zero-initialised, so every slot starts with
 * inited == 0; Conn_wpool_destroy() relies on that flag to decide whether
 * a slot owns a control fd that must be closed.
 *
 * Returns the new pool (with a reference count of 0) or NULL on error.
 */
struct Conn_wpool *Conn_wpool_create(const unsigned short workers)
{
	struct Conn_wpool *ret;
	unsigned short i;
	struct Conn_wpool_worker *ww;

	ret = malloc(sizeof(struct Conn_wpool));
	if (ret == NULL) {
		Log(0, "Cannot alloc memory for wpool!\n");
		goto out;
	}

	/* calloc: slots must not carry garbage in 'inited'/'control' */
	ret->ww = calloc(workers, sizeof(struct Conn_wpool_worker));
	if (ret->ww == NULL) {
		Log(0, "Cannot alloc memory for workers!\n");
		goto free_ret;
	}

	/* TODO: what I am doing with this?! Signaling between works and pool? */
	ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (ret->epoll_fd == -1) {
		Log(0, "Cannot create epoll fd (%s)\n", strerror(errno));
		goto free_ww;
	}

	/* Only the ids are assigned here; no thread is started in this function */
	for (i = 0; i < workers; i++) {
		ww = &ret->ww[i];
		ww->id = i;
	}

	ret->workers = workers;
	ret->next = 0;
	ret->refs = 0;
	Log(10, "\tPool created with success.\n");
	return ret;

	free_ww:
	free(ret->ww);

	free_ret:
	free(ret);

	out:
	Log(5, "\tFailed to create wpool.\n");
	return NULL;
}

/*
 * Tears down a workers pool: closes the control fd of every initialised
 * worker, closes the pool's epoll fd and releases the memory.
 * Always returns 0.
 */
int Conn_wpool_destroy(struct Conn_wpool *wp)
{
	int idx;

	for (idx = 0; idx < wp->workers; idx++) {
		struct Conn_wpool_worker *cur = &wp->ww[idx];

		/* child will exit if control is closed */
		if (cur->inited != 0)
			close(cur->control);
	}

	close(wp->epoll_fd);
	free(wp->ww);
	free(wp);

	return 0;
}


/* ########### pool ############## */
/*
 * Inits a conn pool: empty list, no cells allocated, no blocks recorded.
 * The tail is cleared as well so that a (re)initialised pool never keeps a
 * stale tail pointer next to a NULL head.
 */
static void pool_init(struct Conn_pool *x)
{
	x->head = NULL;
	x->tail = NULL;
	x->allocated = 0;
	x->next_block = 0;
}

/*
 * Grow Conn structures (cells): allocates @increment cells as one block,
 * records the block in x->blocks and appends the cells to the pool's list.
 * Caller must have x->lock taken.
 *
 * Returns 0 on success, -1 if @increment is 0 or memory cannot be obtained.
 * NOTE(review): x->blocks[] is indexed without a bound check; its capacity
 * is not visible here - confirm callers cannot exceed it.
 */
__cold static int pool_grow(struct Conn_pool *x, const unsigned int increment)
{
	unsigned int i;
	struct Conn *p, *prev;

	Log(1, "%s Try to grow pool from %u to %u.\n",
		__func__, x->allocated, x->allocated + increment);

	/* No cells means no "last element" to terminate below; refuse. */
	if (unlikely(increment == 0))
		return -1;

	p = malloc(increment * sizeof(struct Conn));
	if (unlikely(p == NULL))
		return -1;

	i = x->next_block;
	x->blocks[i] = p;
	x->next_block++;

	/* Add them to free pool */
	prev = NULL;
	for (i = 0; i < increment; i++) {
		/* We are forced to do some inits here */
		p[i].obuf_size = 0;
		p[i].ibuf_size = 0;
		p[i].obuf = NULL;
		p[i].ibuf = NULL;

		/* Chain the previous cell to this one */
		if (likely(prev))
			prev->next = &p[i];

		prev = &p[i];
	}
	/* Now, 'prev' contain last element; we will reuse it. */
	prev->next = NULL;

	/* Splice the new chain at the end of the existing list */
	if (likely(x->head == NULL))
		x->head = &p[0];
	else
		x->tail->next = &p[0];
	x->tail = prev;

	x->allocated += increment;

	return 0;
}


/* ############## basic Conn stuff ############ */
#if 0
/*
 * Sends a fd using comm_fd as a communication medium
 * man 3 cmsg
 *
 * One junk byte is sent as regular payload; @fd travels as SCM_RIGHTS
 * ancillary data.
 * Returns 0 on success, -1 on error.
 */
static int Conn_send_fd(int comm_fd, int fd)
{
	struct msghdr m = { 0 };
	struct cmsghdr *cm;
	struct iovec iov;
	int r;
	union {
		struct cmsghdr align;
		char buf[CMSG_SPACE(sizeof(int))];
	} u;
	unsigned char junk = 1;

	/* Do not hand uninitialised padding to the kernel */
	memset(&u, 0, sizeof(u));

	iov.iov_base = &junk;
	iov.iov_len = 1;

	m.msg_name = NULL;
	m.msg_namelen = 0;
	m.msg_control = u.buf;
	m.msg_controllen = sizeof(u.buf);
	m.msg_iov = &iov;
	m.msg_iovlen = 1;

	cm = CMSG_FIRSTHDR(&m);
	cm->cmsg_len = CMSG_LEN(sizeof(int));
	cm->cmsg_level = SOL_SOCKET;
	cm->cmsg_type = SCM_RIGHTS;
	/* CMSG_DATA() is not guaranteed to be aligned for int: copy bytes */
	memcpy(CMSG_DATA(cm), &fd, sizeof(fd));

	do {
		r = sendmsg(comm_fd, &m, 0 /*flags*/);
	} while ((r == -1) && (errno == EINTR));
	if (r == -1) {
		Log(1, "%s: cannot send message: %s\n",
			__func__, strerror(errno));
		return -1;
	}

	Log(1, "%s: Sent fd %d using comm fd %d\n",
		__func__, fd, comm_fd);

	return 0;
}

/*
 * Receive a fd using comm_fd as a communication medium
 * man 3 cmsg
 *
 * Expects one junk payload byte plus a single SCM_RIGHTS control message
 * carrying exactly one int.
 * Returns the received fd, or -1 on error (including peer shutdown).
 */
static int Conn_recv_fd(int comm_fd)
{
	ssize_t r;
	struct msghdr m;
	struct cmsghdr *cm;
	struct iovec iov;
	int fd;
	union {
		struct cmsghdr cm;
		char buf[CMSG_SPACE(sizeof(int))];
	} u;
	unsigned char junk;

	/* All fields the kernel reads must be defined */
	memset(&m, 0, sizeof(m));
	memset(&u, 0, sizeof(u));

	iov.iov_base = &junk;
	iov.iov_len = 1;

	m.msg_control = u.buf;
	m.msg_controllen = sizeof(u.buf);
	m.msg_iov = &iov;
	m.msg_iovlen = 1;

	do {
		r = recvmsg(comm_fd, &m, 0 /*flags*/);
	} while ((r == -1) && (errno == EINTR));
	if (r == -1) {
		Log(1, "%s: Cannot receive msg: %s\n",
			__func__, strerror(errno));
		return -1;
	}
	if (r == 0) {
		Log(1, "%s: Peer closed comm fd %d\n", __func__, comm_fd);
		return -1;
	}
	Log(10, "DEBUG: recvmsg returned: %zd\n", r);

	cm = CMSG_FIRSTHDR(&m);
	if (!cm) {
		Log(1, "%s: Invalid first header.\n", __func__);
		return -1;
	}
	if ((cm->cmsg_level != SOL_SOCKET)
		|| (cm->cmsg_type != SCM_RIGHTS)) {
		Log(1, "%s: Invalid level/type: %d/%d\n",
			__func__, cm->cmsg_level, cm->cmsg_type);
		return -1;
	}
	/* Make sure the payload really is one int before reading it */
	if (cm->cmsg_len != CMSG_LEN(sizeof(int))) {
		Log(1, "%s: Invalid control message length.\n", __func__);
		return -1;
	}

	/* CMSG_DATA() is not guaranteed to be aligned for int: copy bytes */
	memcpy(&fd, CMSG_DATA(cm), sizeof(fd));
	Log(5, "%s: Received fd %d using comm_fd %d\n",
		__func__, fd, comm_fd);

	return fd;
}
#endif

#if 1
/*
 * Writes one log line to Conn_log_fd if @level does not exceed the current
 * debug level. The line is "<sec>.<usec> <log_info> " followed by the
 * printf-style expansion of @format; it is truncated to the size of the
 * local line buffer.
 *
 * errno is preserved: callers routinely inspect errno after logging a
 * failure, and write() would otherwise be free to overwrite it.
 */
__cold void Log(const unsigned short level, char *format, ...)
{
	va_list	ap;
	size_t len, done;
	char line[4096];
	int saved_errno;

	if (likely(level > Conn_debug_level))
		return;

	saved_errno = errno;

	/* time_t/suseconds_t are not guaranteed to be 'long': cast for %ld */
	snprintf(line, sizeof(line), "%ld.%06ld %s ",
		(long) Conn_now.tv_sec, (long) Conn_now.tv_usec, log_info);
	len = strlen(line);

	va_start(ap, format);
	vsnprintf(line + len, sizeof(line) - len, format, ap);
	va_end(ap);
	len = strlen(line);

	/* Best effort: retry on EINTR/short writes, give up on other errors */
	done = 0;
	while (done < len) {
		ssize_t w;

		w = write(Conn_log_fd, line + done, len - done);
		if ((w == -1) && (errno == EINTR))
			continue;
		if (w <= 0)
			break;
		done += (size_t) w;
	}

	errno = saved_errno;
}
#else
/* Logging compiled out: accepts the same arguments and does nothing. */
inline void Log(const unsigned short level, char *format, ...)
{
	(void) level;
	(void) format;
}
#endif

/*
 * Stores @info (truncated to fit) in the per-thread log_info tag, which
 * Log() inserts in every line so that workers can be told apart.
 */
static void Conn_log_set_info(const char *info)
{
	const size_t room = sizeof(log_info);

	snprintf(log_info, room, "%s", info);
}

/*
 * Builds a printable representation of @len_src bytes starting at
 * @buf_src0: printable ASCII (32..126) is copied as is, every other byte
 * becomes "[xx]" (two lowercase hex digits).
 *
 * Returns a malloc'ed, NUL-terminated string the caller must free. If the
 * output cannot be allocated, a duplicated error text is returned instead
 * (still to be freed; may be NULL if even that allocation fails).
 */
char *Conn_dump(const void *buf_src0, const size_t len_src)
{
	static const char hex[] = "0123456789abcdef";
	size_t i, j;
	char *buf_dst;
	const unsigned char *buf_src = buf_src0;

	Log(30, "\tConn_dump(%p, len=%zu)\n",
		buf_src, len_src);

	/* Worst case is 4 output chars per byte; refuse sizes that overflow */
	if (len_src > ((size_t) -1 - 1) / 4)
		return strdup("Memory allocation error1!");

	buf_dst = malloc(len_src * 4 + 1);
	if (buf_dst == NULL)
		return strdup("Memory allocation error1!");

	j = 0;
	for (i = 0; i < len_src; i++) {
		const unsigned char c = buf_src[i];

		/* 127 (DEL) is a control character too */
		if ((c < 32) || (c > 126)) {
			buf_dst[j++] = '[';
			buf_dst[j++] = hex[c >> 4];
			buf_dst[j++] = hex[c & 0x0f];
			buf_dst[j++] = ']';
		} else {
			buf_dst[j++] = (char) c;
		}
	}

	buf_dst[j] = '\0';

	return buf_dst;
}

/*
 * Converts @len_src bytes starting at @buf_src0 to a lowercase hex string
 * (two characters per byte).
 *
 * Returns a malloc'ed, NUL-terminated string the caller must free. If the
 * output cannot be allocated (or its size would overflow), a heap copy of
 * an error text is returned instead (may be NULL if that fails as well).
 */
char *Conn_dumphex(const void *buf_src0, const size_t len_src)
{
	static const char hex[] = "0123456789abcdef";
	static const char oom[] = "Memory allocation error1!";
	const unsigned char *buf_src = buf_src0;
	char *buf_dst = NULL;
	size_t i, j;

	/* 2 * len + 1 must fit in size_t, else the buffer would be too small */
	if (len_src <= ((size_t) -1 - 1) / 2)
		buf_dst = malloc(len_src * 2 + 1);

	if (buf_dst == NULL) {
		buf_dst = malloc(sizeof(oom));
		if (buf_dst != NULL)
			memcpy(buf_dst, oom, sizeof(oom));
		return buf_dst;
	}

	j = 0;
	for (i = 0; i < len_src; i++) {
		const unsigned char c = buf_src[i];

		buf_dst[j++] = hex[c >> 4];
		buf_dst[j++] = hex[c & 0x0f];
	}

	buf_dst[j] = '\0';

	return buf_dst;
}

/*
 * Hex digit to value.
 * 'A'-'F' and 'a'-'f' map to 10..15; any other character is returned as its
 * distance from '0' (no validation is performed).
 */
static unsigned char hex_digit(const char x)
{
	int base;

	if ((x >= 'a') && (x <= 'f'))
		base = 'a' - 10;
	else if ((x >= 'A') && (x <= 'F'))
		base = 'A' - 10;
	else
		base = '0';

	return (unsigned char) (x - base);
}

/*
 * Converts a hexa string into binary form.
 * @out/@out_size: destination buffer and its capacity in bytes.
 * @hex: NUL-terminated string made of pairs of hex digits.
 *
 * Returns -1 on error (output too small, odd number of characters or a
 * character that is not a hex digit), else the number of bytes written.
 * On error a description is stored in Conn_error.
 */
int Conn_hex2bin(unsigned char *out, const size_t out_size, const char *hex)
{
	unsigned int i = 0;

	while (*hex != '\0') {
		if (i == out_size) {
			snprintf(Conn_error, sizeof(Conn_error),
				"out buffer too small (%zu)", out_size);
			return -1;
		}

		if (hex[1] == '\0') {
			snprintf(Conn_error, sizeof(Conn_error),
				"odd number of hexa chars");
			return -1;
		}

		/* Reject garbage instead of silently producing wrong bytes */
		if (!isxdigit((unsigned char) hex[0])
			|| !isxdigit((unsigned char) hex[1])) {
			snprintf(Conn_error, sizeof(Conn_error),
				"invalid hexa char");
			return -1;
		}

		out[i++] = (unsigned char) (hex_digit(hex[0]) * 16
			+ hex_digit(hex[1]));
		hex += 2;
	}

	return (int) i;
}

/*
 * Configures logging: @debug becomes the maximum level that Log() emits
 * and @fd the descriptor the lines are written to.
 */
void Conn_debug(int fd, const unsigned short debug)
{
	Conn_debug_level = debug;
	Conn_log_fd = fd;
}

/*
 * Do not use it yet, it sucks (the paras)
 *
 * Writes into @ret one letter per epoll flag set in @ev, in the order
 * I(n) P(ri) O(ut) E(rr) H(up) h(rdhup), followed by a NUL.
 * @ret must have room for at least 7 bytes.
 */
static void Conn_poll_status(const unsigned int ev, char *ret)
{
	static const struct {
		unsigned int	mask;
		char		letter;
	} map[] = {
		{ EPOLLIN,	'I' },
		{ EPOLLPRI,	'P' },
		{ EPOLLOUT,	'O' },
		{ EPOLLERR,	'E' },
		{ EPOLLHUP,	'H' },
		{ EPOLLRDHUP,	'h' },
	};
	char *out = ret;
	size_t k;

	for (k = 0; k < sizeof(map) / sizeof(map[0]); k++) {
		if (ev & map[k].mask)
			*out++ = map[k].letter;
	}

	*out = '\0';
}

/* Maps the socket domain of @C to a short constant label ("?" if unknown). */
static char *Conn_domain(const struct Conn *C)
{
	if (C->sock_domain == PF_INET)
		return "IPv4";
	if (C->sock_domain == PF_INET6)
		return "IPv6";
	if (C->sock_domain == PF_PACKET)
		return "PACKET";

	return "?";
}

/* Maps the socket type of @C to a short constant label ("?" if unknown). */
static char *Conn_type(const struct Conn *C)
{
	if (C->sock_type == SOCK_STREAM)
		return "stream";
	if (C->sock_type == SOCK_DGRAM)
		return "dgram";
	if (C->sock_type == SOCK_RAW)
		return "raw";

	return "?";
}

/* Returns "IP" when the socket protocol of @C is IPPROTO_IP, "?" otherwise. */
static char *Conn_get_socket_protocol(const struct Conn *C)
{
	return (C->sock_protocol == IPPROTO_IP) ? "IP" : "?";
}

/* Maps the Conn type (unknown/master/p2p) of @C to a short constant label. */
static char *Conn_socktype(const struct Conn *C)
{
	if (C->type == CONN_TYPE_UNK)
		return "unk";
	if (C->type == CONN_TYPE_MASTER)
		return "master";
	if (C->type == CONN_TYPE_P2P)
		return "p2p";

	return "?";
}

/* ############ Conn Callbacks ########### */
/* Default 'accept' callback: logs the remote address/port at level 10. */
static void Conn_default_cbs_accept(struct Conn *C)
{
	char *peer_addr = Conn_addr_remote(C);
	const int peer_port = Conn_port_remote(C);

	Log(10, "%llu %s A conn was accepted from %s/%d\n",
		C->id, __func__, peer_addr, peer_port);
}

/*
 * Makes sure the selected buffer of @C has at least @needed free bytes
 * after its tail, growing it with realloc when necessary.
 * @what: CONN_OUT selects the output buffer, anything else the input one.
 *
 * Growth is at least 128 bytes and is clipped to the configured maximum
 * (when that maximum is not 0).
 * Returns 0 if OK, -1 if the buffer cannot grow or realloc fails.
 */
__cold static int Conn_try_expand_buf(struct Conn *C, const char what,
	const unsigned int needed)
{
	const int is_out = (what == CONN_OUT);
	unsigned int rd, wr, cur_size, limit;
	unsigned int grow, new_size;
	char *cur_buf, *new_buf;

	/* Snapshot the buffer we were asked about */
	rd = is_out ? C->obuf_head : C->ibuf_head;
	wr = is_out ? C->obuf_tail : C->ibuf_tail;
	cur_size = is_out ? C->obuf_size : C->ibuf_size;
	limit = is_out ? Conn_max_obuf : Conn_max_ibuf;
	cur_buf = is_out ? C->obuf : C->ibuf;

	/* Do we have enough room? */
	if (cur_size - wr >= needed)
		return 0;

	Log(10, "\tTry to expand buffer for [%s] have=%d needed=%d"
		" head=%u tail=%u.\n",
		is_out ? "O" : "I", cur_size, cur_size + needed, rd, wr);

	/* Missing bytes, but never grow by less than 128 */
	grow = needed - (cur_size - wr);
	if (grow < 128)
		grow = 128;

	new_size = cur_size + grow;
	if ((limit > 0) && (new_size > limit))
		new_size = limit;

	/* Seems we are not allowed to grow larger */
	if (new_size <= cur_size) {
		Log(1, "Cannot obtain needed byte(s):"
			" hm(%u) <= old_size(%u)!\n",
			new_size, cur_size);
		return -1;
	}

	new_buf = realloc(cur_buf, new_size);
	if (new_buf == NULL) {
		Log(1, "Cannot realloc pbuf to %u byte(s)!\n", new_size);
		return -1;
	}

	/* Publish the new buffer and account for the extra memory */
	if (is_out) {
		C->obuf = new_buf;
		C->obuf_size = new_size;
		Conn_mem_buffers_out += new_size - cur_size;
	} else {
		C->ibuf = new_buf;
		C->ibuf_size = new_size;
		Conn_mem_buffers_in += new_size - cur_size;
	}

	return 0;
}

/*
 * Consumes @bytes from the front of the input buffer of @C.
 * When the head reaches (or passes) the tail, both indexes are rewound
 * to 0 so the whole buffer becomes available again.
 */
void Conn_eat(struct Conn *C, const unsigned int bytes)
{
	C->ibuf_head += bytes;

	/* Nothing left unread: restart from the beginning of the buffer */
	if (!(C->ibuf_head < C->ibuf_tail)) {
		C->ibuf_tail = 0;
		C->ibuf_head = 0;
	}

	Log(10, "%llu %s(bytes=%u) head=%u tail=%u qlen=%u\n",
		C->id, __func__, bytes, C->ibuf_head, C->ibuf_tail,
		Conn_iqlen(C));
}

/*
 * Discards everything stored in the input buffer of @C by rewinding both
 * the head and the tail index to 0.
 */
void Conn_eatall(struct Conn *C)
{
	C->ibuf_tail = 0;
	C->ibuf_head = 0;
}

/* Maps the state of @C to a short constant label ("BUG?" if unknown). */
static char *Conn_state(const struct Conn *C)
{
	if (C->state == CONN_STATE_FREE)
		return "FREE";
	if (C->state == CONN_STATE_EMPTY)
		return "EMPTY";
	if (C->state == CONN_STATE_OPEN)
		return "OPEN";
	if (C->state == CONN_STATE_LISTEN)
		return "LISTEN";
	if (C->state == CONN_STATE_CONNECT_0)
		return "CONN0";
	if (C->state == CONN_STATE_CONNECT_a)
		return "CONNa";
	if (C->state == CONN_STATE_CONNECT_b)
		return "CONNb";
	if (C->state == CONN_STATE_ERROR)
		return "ERROR";

	return "BUG?";
}

/*
 * Raise an error. It is just a little helper.
 */
static void Conn_error_raise(struct Conn *C, const int err)
{
	if (err != 0)
		C->xerrno = err;

	if (C->cbs.error)
		C->cbs.error(C);
}

/*
 * Releases a Conn: runs the error/close callbacks, closes the fd and drops
 * per-connection resources. Then either
 *  - prepares the Conn for a new connection attempt (auto_reconnect), or
 *  - marks it FREE, unlinks it from Conn_masters when it was a master and
 *    pushes it on the free list of its worker (if it has one).
 *
 * The "was master" decision is taken before C->type is reset; testing
 * C->type after the reset can never match and would leave closed masters
 * linked in Conn_masters.
 */
__hot static void Conn_free_intern(struct Conn *C)
{
	struct Conn *p, *prev;
	int was_master;

	if (unlikely(Conn_debug_level > 9)) {
		Log(0, "%llu %s Cleaning-up fd=%d in state %s [%s]...\n",
			C->id, __func__, C->fd, Conn_state(C), Conn_errno(C));
	}

	/* A close requested by the user is not reported as an error */
	if (C->error_state != CONN_ERROR_USERREQ)
		Conn_error_raise(C, 0);

	if ((C->state == CONN_STATE_OPEN)
		|| (C->state == CONN_STATE_LISTEN)
		|| (C->state == CONN_STATE_ERROR)) {
		if (C->cbs.close)
			C->cbs.close(C);
	}

	if (likely(C->fd > -1)) {
		Log(10, "\tClosing fd %d\n", C->fd);
		close(C->fd);
		C->fd = -1;
	}

	C->web = NULL;
	if (C->web_req) {
		free(C->web_req);
		C->web_req = NULL;
	}

	/* Remember the role now; C->type is reset below */
	was_master = (C->type == CONN_TYPE_MASTER);
	if (unlikely(was_master)) {
		Conn_wpool_put(C->wp);
		/* The reference is gone; do not keep a stale pointer */
		C->wp = NULL;
	}

	if (unlikely(C->auto_reconnect == 1)) {
		Log(20, "Preparing for re-connect\n");

		/* Reset tsend, else we enter in a timeout error loop */
		C->tsend.tv_sec = 0;
		C->tsend.tv_usec = 0;

		/* Reset the connection attempt time */
		C->conn_syn.tv_sec = 0;
		C->conn_syn.tv_usec = 0;

		/* Misc */
		C->error_state = 0;

		C->tryat = (unsigned int) Conn_now.tv_sec + C->delay;
		C->state = CONN_STATE_CONNECT_0;

		C->ibuf_head = 0;
		C->ibuf_tail = 0;

		C->obuf_head = 0;
		C->obuf_tail = 0;

		Conn_pending++;
	} else {
		C->type = CONN_TYPE_UNK;
		C->state = CONN_STATE_FREE;

		/* Decrement the number of busy connections */
		Conn_no--;
		Conn_work_to_do--;

		/* Remove from masters list */
		if (unlikely(was_master)) {
			Log(10, "\t### Remove from Conn_masters\n");
			prev = NULL;
			p = Conn_masters.head;
			while (p) {
				if (p != C) {
					prev = p;
					p = p->next;
					continue;
				}

				if (prev == NULL)
					Conn_masters.head = p->next;
				else
					prev->next = p->next;

				if (Conn_masters.tail == C)
					Conn_masters.tail = prev;
				break;
			}

			C->next = NULL;
		}

		if (C->private) {
			free(C->private);
			C->private = NULL;
		}

		/* Add it to free list of the worker pool */
		if (likely(C->worker)) {
			struct Conn_wpool_worker *worker = C->worker;

			C->next = NULL;
			C->worker = NULL; /* TODO: not needed */
			if (unlikely(worker->free.head == NULL)) {
				worker->free.head = C;
				worker->free.tail = C;
			} else {
				/* Push at the front of the free list */
				C->next = worker->free.head;
				worker->free.head = C;
			}
		}
	}
}

/*
 * Closes @C on behalf of the user.
 * With an empty output buffer the Conn is released immediately; otherwise
 * shutdown_after_send is set and the default send routine is kicked.
 */
__hot void Conn_close(struct Conn *C)
{
	Log(10, "%llu %s Mark for closing (head=%u tail=%u)\n",
		C->id, __func__, C->obuf_head, C->obuf_tail);

	if (C->obuf_head != C->obuf_tail) {
		/* Pending output: flush first, shut down afterwards */
		C->shutdown_after_send = 1;
		Log(9, "\tSet SHUTDOWN_AFTER_SEND;"
			" We have data in out buffer; kick sending.\n");
		/* TODO: send and receive are sensitive.
		if (likely(C->cbs.send))
			C->cbs.send(C);
		*/
		Conn_default_cbs_send(C);
		return;
	}

	Log(15, "\tNothing to send, call free_intern\n");
	C->error_state = CONN_ERROR_USERREQ;
	Conn_free_intern(C);
}

/*
 * Requests the library to stop by raising the Conn_must_stop flag.
 */
void Conn_stop(void)
{
	Conn_must_stop = 1U;
}

/*
 * Accessor for the file descriptor stored in @C.
 */
static int Conn_get_fd(const struct Conn *C)
{
	const int fd = C->fd;

	return fd;
}

/*
 * Copies into @tv the time stamp of the last receive on @C (C->trecv).
 */
void Conn_last_time(const struct Conn *C, struct timeval *tv)
{
	tv->tv_sec = C->trecv.tv_sec;
	tv->tv_usec = C->trecv.tv_usec;
}

/*
 * Set some internal parameters
 * @var selects the parameter (CONN_PARA_*), @val is its new value.
 *
 * CONN_PARA_IBUF / CONN_PARA_OBUF change the kernel receive/send buffer of
 * the socket; a failure there is logged (the function has no way to return
 * it). An unknown @var is logged and otherwise ignored.
 */
void Conn_set(struct Conn *C, const unsigned int var, const int val)
{
	int fd;

	fd = Conn_get_fd(C);

	switch (var) {
	case CONN_PARA_AUTO_RECONNECT:
		/* normalise to 0/1 */
		C->auto_reconnect = (val == 0) ? 0 : 1;
		break;
	case CONN_PARA_RECONNECT_DELAY:
		C->delay = (unsigned int) val;
		break;
	case CONN_PARA_IDLE_TIME:
		C->idle = (unsigned int) val;
		break;
	case CONN_PARA_READ_TIMEOUT:
		C->read_timeout = (unsigned int) val;
		break;
	case CONN_PARA_CONN_TIMEOUT:
		C->conn_timeout = (unsigned int) val;
		break;
	case CONN_PARA_TRIGGER:
		C->trigger = (unsigned int) val;
		C->last_trigger = 0;
		break;
	case CONN_PARA_IBUF:
		if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) != 0)
			Log(1, "%llu %s Cannot set SO_RCVBUF to %d on fd %d (%s)\n",
				C->id, __func__, val, fd, strerror(errno));
		break;
	case CONN_PARA_OBUF:
		if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) != 0)
			Log(1, "%llu %s Cannot set SO_SNDBUF to %d on fd %d (%s)\n",
				C->id, __func__, val, fd, strerror(errno));
		break;
	default:
		Log(1, "%llu %s Unknown parameter %u\n",
			C->id, __func__, var);
		break;
	}
}

/*
 * Adds an fd to poll system
 * Registers C->fd on @epoll_fd for @events, with @C as the event payload.
 *
 * Returns 0 on success. On failure the errno of epoll_ctl is stored in
 * C->xerrno, its text in Conn_error, and the errno value is returned.
 */
static inline int Conn_add_obj(int epoll_fd, struct Conn *C,
	const unsigned int events)
{
	int ret;
	struct epoll_event ev;

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.data.ptr = C;
	ev.events = events;
	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, C->fd, &ev);
	if (likely(ret == 0)) {
		Log(10, "%s fd %d added with success on epoll_fd %d\n",
			__func__, C->fd, epoll_fd);
		return 0;
	}

	/* Capture errno first: logging performs I/O and may change it */
	C->xerrno = errno;
	Log(1, "%s Could not add fd %d to epoll_fd %d, events 0x%x (%s)!\n",
		__func__, C->fd, epoll_fd, events, strerror(C->xerrno));
	snprintf(Conn_error, sizeof(Conn_error), "%s",
		strerror(C->xerrno));
	return C->xerrno;
}

/*
 * Change events for a fd
 * Replaces the event mask of C->fd on @epoll_fd with @events.
 *
 * Returns 0 on success. On failure the errno of epoll_ctl is stored in
 * C->xerrno, its text in Conn_error, and the errno value is returned.
 */
static inline int Conn_change_obj(int epoll_fd, struct Conn *C,
	const unsigned int events)
{
	int ret;
	struct epoll_event ev;

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.data.ptr = C;
	ev.events = events;
	ret = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, C->fd, &ev);
	if (likely(ret == 0))
		return 0;

	/* Capture errno first: logging performs I/O and may change it */
	C->xerrno = errno;
	Log(1, "%s Could not change fd %d to epoll_fd %d, events 0x%x (%s)!\n",
		__func__, C->fd, epoll_fd, events, strerror(C->xerrno));
	snprintf(Conn_error, sizeof(Conn_error), "%s",
		strerror(C->xerrno));
	return C->xerrno;
}

/*
 * Called when EPOLLIN present on a fd.
 */
__hot static void Conn_default_cbs_recv(struct Conn *C)
{
	ssize_t n;
	unsigned int xfer_in_this_call;
	int r, xerrno;
	char *dump, call_callback;

	Log(10, "%llu %s fd=%d head=%u tail=%u size=%u...\n",
		C->id, __func__, C->fd,
		C->ibuf_head, C->ibuf_tail, C->ibuf_size);

	call_callback = 0;
	xfer_in_this_call = 0;
	while (1) {
		unsigned int max;

		if ((Conn_max_recv > 0) && (xfer_in_this_call >= Conn_max_recv)) {
			Log(3, "\tRecv limit reached. Add to extra queue.\n");
			break;
		}

		/* 1 is for a final \0 */
		if (unlikely(C->ibuf_tail + 1 >= C->ibuf_size)) {
			r = Conn_try_expand_buf(C, CONN_IN, Conn_default_ibuf);
			if (unlikely(r != 0)) {
				C->error_state = CONN_ERROR_MEM;
				return;
			}
		}

		max = C->ibuf_size - C->ibuf_tail - 1;
		if ((Conn_max_recv > 0) && (max > Conn_max_recv))
			max = Conn_max_recv;

		while (1) {
			n = recv(C->fd, C->ibuf + C->ibuf_tail, max, 0);
			if (unlikely((n == -1) && (errno == EINTR)))
				continue;
			xerrno = errno;
			break;
		}

		if ((n == -1) && (errno == EAGAIN)) {
			Log(10, "\tEAGAIN received. Wait next round.\n");
			break;
		}

		if (unlikely(n < 0)) {
			Log(0, "\tError receiving [%s]\n", strerror(errno));
			C->error_state = CONN_ERROR_RECV;
			C->xerrno = xerrno;
			Conn_free_intern(C);
			break;
		}

		if (unlikely(n == 0)) {
			Log(5, "\tWe received 0 bytes; oqlen=%lu\n", Conn_oqlen(C));
			if (Conn_oqlen(C) == 0) {
				/* TODO: Why we do not shutdown? Maybe we just
				 * sent data. */
				/* Nothing to send, we can close the connection */
				Log(10, "\tRemote hangup and nothing in out"
					" buffer. Close.\n");
				C->state = CONN_STATE_ERROR;
				C->error_state = CONN_ERROR_HANGUP;
				Conn_free_intern(C);
			} else if (C->close_after_send == 0) {
				Log(10, "\tRemote closed sending side."
					" Set CLOSE_AFTER_SEND and ignore POLLIN\n");
				C->close_after_send = 1;
				Conn_change_obj(C->worker->epoll_fd, C, EPOLLOUT);
			}
			break;
		}

		Log(10, "\tReceived %d byte(s)\n", n);
		call_callback = 1;

		if (unlikely(Conn_debug_level >= 10)) {
			dump = Conn_dump(C->ibuf + C->ibuf_tail, (unsigned int) n);
			Log(0, "\tReceived: %s\n", dump);
			free(dump);
		}

		C->trecv = Conn_now;
		C->ibuf_tail += (unsigned int) n;
		C->bi += (unsigned int) n;
		xfer_in_this_call += (unsigned int) n;

		C->ibuf[C->ibuf_tail] = '\0';

		if (n < max) {
			Log(23, "\tReaded less(%d < %d) than what we requested\n",
				n, max);
			break;
		}
	}

	if (likely(call_callback == 1)) {
		Log(10, "\t%s\n", C->web ? "We have ww attached" : "We have no ww attached");
		if (C->web)
			Conn_web_dispatch(C);
		else if (C->cbs.data)
			C->cbs.data(C);
	}
}

/*
 * Default 'send' routine: pushes the content of the output buffer to the
 * socket.
 *
 * Each round sends at most Conn_max_send bytes (further limited by the
 * bandwidth tokens when band_width is set). The loop ends when the socket
 * would block, on a short write, on error, when no tokens are left, or
 * when the buffer is empty. With an empty buffer the function honours
 * close_after_send / shutdown_after_send, else it switches the fd back to
 * EPOLLIN only.
 *
 * The errno of send() is kept in xerrno and used for every later decision
 * and message, because the Log() calls in between may modify errno.
 */
__hot static void Conn_default_cbs_send(struct Conn *C)
{
	ssize_t n;
	int xerrno, r;
	char *dump;

	Log(9, "%llu %s C=%p next=%p fd=%d head=%u tail=%u size=%u\n",
		C->id, __func__, C, C->next, C->fd, C->obuf_head,
		C->obuf_tail, C->obuf_size);

	while (1) {
		unsigned int max, count;
		char *buf;

		count = Conn_oqlen(C);
		if (unlikely(count == 0)) {
			Log(10, "\tEmpty output buffer.\n");
			if (unlikely(C->close_after_send == 1)) {
				Log(9, "\tCLOSE_AFTER_SEND is set"
					", close connection\n");
				/* TODO: this is not really an error; switch to other code */
				C->state = CONN_STATE_ERROR;
				C->error_state = CONN_ERROR_USERREQ;
				Conn_free_intern(C);
				return;
			}

			if (C->shutdown_after_send == 1) {
				Log(9, "\tSHUTDOWN_AFTER_SEND is set"
					", shutdown write\n");
				C->state = CONN_STATE_ERROR;
				r = shutdown(C->fd, SHUT_WR);
				if (likely(r == 0)) {
					/* TODO: this is not really an error; switch to other code */
					C->error_state = CONN_ERROR_USERREQ;
					return;
				}

				Log(1, "\tshutdown returned error [%s]!\n",
					strerror(errno));
				/* TODO: We must set a proper C->state */
				C->error_state = CONN_ERROR_HANGUP;
				Conn_free_intern(C);
				return;
			}

			Conn_change_obj(C->worker->epoll_fd, C, EPOLLIN);
			return; /* TODO: not clear if we could be here. */
		}

		max = count;
		if ((Conn_max_send > 0) && (max > Conn_max_send))
			max = Conn_max_send;

		/* bandwidth */
		if (unlikely(C->band_width > 0)) {
			if (max > C->band_tokens)
				max = C->band_tokens;
			if (max == 0) {
				Log(debug_band, "\tBAND: Suspend for 100ms"
					" (no tokens)!\n");
				break;
			}
		}

		Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u)"
			", max=%u (count=%u), 0)...\n",
			C->fd, C->obuf_head,
			C->obuf_tail, max, count);
		buf = C->obuf + C->obuf_head;
		/* Retry when interrupted by a signal; keep errno of the result */
		while (1) {
			n = send(C->fd, buf, max, MSG_NOSIGNAL);
			xerrno = errno;
			if (unlikely((n == -1) && (xerrno == EINTR)))
				continue;
			break;
		}

		if ((n == -1) && (xerrno == EAGAIN)) {
			Log(10, "\tEAGAIN received. Wait next round.\n");
			break;
		}

		Log(10, "\tSent %zd byte(s) [head=%u tail=%u]\n",
			n, C->obuf_head, C->obuf_tail);

		if (unlikely(n <= 0)) {
			Log(0, "%llu %s Error in sending [%s]\n",
				C->id, __func__, strerror(xerrno));
			C->error_state = CONN_ERROR_SEND;
			C->xerrno = xerrno;
			Conn_free_intern(C);
			break;
		}

		if (unlikely(Conn_debug_level >= 10)) {
			dump = Conn_dump(buf, (unsigned int) n);
			Log(0, "\tSent: %s\n", dump);
			free(dump);
		}

		C->tsend = Conn_now;
		/* n > 0 here, so the conversion is value preserving */
		if ((unsigned int) n < count) {
			C->obuf_head += (unsigned int) n;
		} else {
			/* Everything queued was sent: rewind the buffer */
			C->obuf_head = 0;
			C->obuf_tail = 0;
		}
		C->bo += (unsigned int) n;

		if (C->band_width > 0) {
			/* n <= max <= band_tokens, so this cannot underflow */
			C->band_tokens -= (unsigned int) n;
			Log(debug_band, "\tBAND: Remove %zd tokens -> %u...\n",
				n, C->band_tokens);
		}

		if ((unsigned int) n < max) {
			Log(1, "\tSent less than what we requested."
				" Drop EPOLLOUT and wait for signal."
				" Break loop.\n");
			break;
		}
	}
}

/* Default 'data' callback: only logs (level 10) that data arrived. */
static void Conn_default_cbs_data(struct Conn *C)
{
	const unsigned long long id = C->id;

	Log(10, "%llu %s Data received\n", id, __func__);
}

/* Default 'close' callback: only logs (level 9) that the conn is closing. */
__hot static void Conn_default_cbs_close(struct Conn *C)
{
	const unsigned long long id = C->id;

	Log(9, "%llu %s Closing conn\n", id, __func__);
}

/* Default 'trigger' callback: only logs (level 10) that a trigger fired. */
static void Conn_default_cbs_trigger(struct Conn *C)
{
	const unsigned long long id = C->id;

	Log(10, "%llu %s Trigger on conn\n", id, __func__);
}

static void Conn_default_cbs_error(struct Conn *C)
{
	/* TODO: put 0 */
	Log(10, "%llu %s Error on conn (%s)\n",
		C->id, __func__, Conn_strerror());
}

/* Default 'connected' callback: only logs (level 10) the new connection. */
static void Conn_default_cbs_connected(struct Conn *C)
{
	const unsigned long long id = C->id;

	Log(10, "%llu %s We are connected\n", id, __func__);
}

static void Conn_default_cbs_accept_error(struct Conn *C)
{
	Log(10, "%llu %s Error accepting conn (id %llu) (%s)\n",
		C->id, __func__, Conn_strerror());
}

#if 0
/*
 * Switches @fd to non-blocking mode.
 * The current status flags are read first and O_NONBLOCK is added to them,
 * so flags that are already set (e.g. O_APPEND) are preserved.
 * Returns 0 on success, -1 on error (errno set by fcntl).
 */
static int Conn_setnonblock(const int fd)
{
	int flags;

	flags = fcntl(fd, F_GETFL);
	if (flags == -1)
		return -1;

	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
#endif

/*
 * Refreshes the cached textual address and port of one end of @C.
 * @flags: bit0==0 => local address, bit0==1 => peer address
 *
 * Nothing is done when the matching dirty flag is clear or the Conn is in
 * FREE/EMPTY state. Only PF_INET and PF_INET6 sockets are supported.
 * Returns 0 on success (or nothing to do), -1 on unsupported domain or if
 * the address cannot be obtained (the cached text is then set to "?").
 */
static int Conn_set_address(struct Conn *C, const int flags)
{
	const int want_peer = (flags & 1) != 0;
	struct sockaddr_in sa4;
	struct sockaddr_in6 sa6;
	struct sockaddr *psa;
	socklen_t sa_len, addr_size;
	char *paddr;
	int *pport;
	int err;

	/* Test if we need to regenerate. */
	if (want_peer ? (C->remote_dirty == 0) : (C->local_dirty == 0))
		return 0;

	if ((C->state == CONN_STATE_FREE) || (C->state == CONN_STATE_EMPTY))
		return 0;

	/* Pick the sockaddr flavour matching the socket domain */
	if (C->sock_domain == PF_INET) {
		psa = (struct sockaddr *) &sa4;
		sa_len = sizeof(struct sockaddr_in);
	} else if (C->sock_domain == PF_INET6) {
		psa = (struct sockaddr *) &sa6;
		sa_len = sizeof(struct sockaddr_in6);
	} else {
		return -1;
	}

	if (want_peer) {
		/* peer */
		paddr = C->addr;
		addr_size = sizeof(C->addr);
		pport = &C->port;
		err = getpeername(C->fd, psa, &sa_len);
	} else {
		/* local */
		paddr = C->bind_addr;
		addr_size = sizeof(C->bind_addr);
		pport = &C->bind_port;
		err = getsockname(C->fd, psa, &sa_len);
	}

	if (err != 0) {
		snprintf(paddr, addr_size, "?");
		return -1;
	}

	/* Other domains were rejected above, so this is IPv4 or IPv6 */
	if (C->sock_domain == PF_INET) {
		inet_ntop(C->sock_domain, &sa4.sin_addr,
			paddr, addr_size);
		*pport = ntohs(sa4.sin_port);
	} else {
		inet_ntop(C->sock_domain, &sa6.sin6_addr,
			paddr, addr_size);
		*pport = ntohs(sa6.sin6_port);
	}

	/* Remove dirty flag */
	if (want_peer)
		C->remote_dirty = 0;
	else
		C->local_dirty = 0;

	return 0;
}

/*
 * Returns the local address of @C as text (refreshed first if needed).
 * The string lives inside @C.
 */
char *Conn_addr_local(struct Conn *C)
{
	(void) Conn_set_address(C, 0);

	return C->bind_addr;
}

/*
 * Returns the peer address of @C as text (refreshed first if needed).
 * The string lives inside @C.
 */
char *Conn_addr_remote(struct Conn *C)
{
	(void) Conn_set_address(C, 1);

	return C->addr;
}

/*
 * Returns the local port of @C (refreshed first if needed).
 */
int Conn_port_local(struct Conn *C)
{
	(void) Conn_set_address(C, 0);

	return C->bind_port;
}

/*
 * Returns the peer port of @C (refreshed first if needed).
 */
int Conn_port_remote(struct Conn *C)
{
	(void) Conn_set_address(C, 1);

	return C->port;
}

/*
 * Returns the address family of the numeric address stored in @addr.
 * The text is parsed with getaddrinfo in AI_NUMERICHOST mode, so no name
 * resolution takes place.
 * On failure -1 is returned and a description is stored in Conn_error.
 */
int Conn_addr_family(const char *addr)
{
	struct addrinfo *res = NULL;
	struct addrinfo hints;
	int rc, family;

	/* Everything not set explicitly below stays 0/NULL */
	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_flags = AI_NUMERICHOST;

	rc = getaddrinfo(addr, NULL, &hints, &res);
	if (rc == 0) {
		family = res->ai_family;
		freeaddrinfo(res);
		return family;
	}

	snprintf(Conn_error, sizeof(Conn_error),
		"getaddrinfo error on %s (%s)",
		addr, gai_strerror(rc));
	if (res != NULL)
		freeaddrinfo(res);

	return -1;
}

/*
 * Formats @speed (bytes per second) into @dst (at most @dst_len bytes)
 * with two decimals and a Bps/KBps/MBps unit; the steps are powers of 1000.
 */
static void Conn_speed(char *dst, const unsigned int dst_len,
	const unsigned int speed)
{
	const float value = (float) speed;

	if (speed >= 1000 * 1000) {
		snprintf(dst, dst_len, "%.2fMBps", value / 1000 / 1000);
		return;
	}

	if (speed >= 1000) {
		snprintf(dst, dst_len, "%.2fKBps", value / 1000);
		return;
	}

	snprintf(dst, dst_len, "%.2fBps", value);
}

/*
 * Builds a one line, plain text description of connection @C: id, fd,
 * domain/type/protocol, socket type, state, local and remote endpoints,
 * byte counters, buffer sizes, average speeds, age, bandwidth fields and
 * the active flags.
 * Returns a pointer to a static buffer, so the result is overwritten by the
 * next call.
 */
static char *Conn_status_slot(struct Conn *C)
{
	static char tmp[1024];
	char speedi[32], speedo[32];
	unsigned int dT, si, so;
	char flags[128], flags_prefix[3], flags_postfix[2];
	char *local_addr, *remote_addr;
	int local_port, remote_port;

	/* flags */
	/*
	 * The flags are rendered as " [a b c]": the first flag is prefixed by
	 * " [", the next ones by " ", and "]" is added only if at least one
	 * flag was set.
	 */
	strcpy(flags, "");
	strcpy(flags_prefix, " [");
	strcpy(flags_postfix, "");

	if (C->auto_reconnect) {
		char flags_tmp[64];

		strcat(flags, flags_prefix);
		snprintf(flags_tmp, sizeof(flags_tmp), "autoreconnect_in_%ld/%u",
			(C->tryat == 0) ? 0 : C->tryat - Conn_now.tv_sec,
			C->delay);
		strcat(flags, flags_tmp);
		strcpy(flags_prefix, " ");
		strcpy(flags_postfix, "]");
	}
	if (C->close_after_send) {
		strcat(flags, flags_prefix);
		strcat(flags, "close_after_send");
		strcpy(flags_prefix, " ");
		strcpy(flags_postfix, "]");
	}
	if (C->shutdown_after_send) {
		strcat(flags, flags_prefix);
		strcat(flags, "shutdown_after_send");
		strcpy(flags_prefix, " ");
		strcpy(flags_postfix, "]");
	}

	strcat(flags, flags_postfix);

	/* Average speeds: bytes in/out divided by the age in seconds (min 1) */
	dT = (unsigned int) (Conn_now.tv_sec - C->start);
	if (dT == 0)
		dT = 1;
	si = (unsigned int)(C->bi / dT);
	so = (unsigned int)(C->bo / dT);

	Conn_speed(speedi, sizeof(speedi), si);
	Conn_speed(speedo, sizeof(speedo), so);

	Conn_set_address(C, 0);
	Conn_set_address(C, 1);

	/* "-"/0 is shown for an endpoint that does not apply to this type */
	local_addr = "-";
	local_port = 0;
	remote_addr = "-";
	remote_port = 0;
	if (C->type == CONN_TYPE_MASTER) {
		local_addr = C->bind_addr;
		local_port = C->bind_port;
	} else if (C->type == CONN_TYPE_P2P) {
		if (C->bind_addr[0] != '\0') {
			local_addr = C->bind_addr;
			local_port = C->bind_port;
		}
		remote_addr = C->addr;
		remote_port = C->port;
	}

	snprintf(tmp, sizeof(tmp), "%p id=%llu fd=%d"
		" %s/%s/%s"
		" %s %s"
		" %s/%d <-> %s/%d"
		" IO=%llu/%llu"
		" BS=%u/%u S=%s/%s"
		" T=%ld bw=%u f=%u tk=%u next=%p"
		"%s",
		C, C->id, C->fd,
		Conn_domain(C), Conn_type(C),
		Conn_get_socket_protocol(C),
		Conn_socktype(C), Conn_state(C),
		local_addr, local_port, remote_addr, remote_port,
		C->bi, C->bo,
		C->ibuf_size, C->obuf_size, speedi, speedo,
		Conn_now.tv_sec - C->start,
		C->band_width, C->band_factor, C->band_tokens,
		C->next, flags);

	return tmp;
}

#if 1
/*
 * Builds the HTML table cells (<td>...</td>) describing connection @C: id,
 * fd, domain, type, protocol, socket type, state, addr/port, byte counters,
 * buffer sizes, average speeds, age and bandwidth fields.
 * The surrounding <tr> is not generated here.
 * Returns a pointer to a static buffer, so the result is overwritten by the
 * next call.
 */
static char *Conn_status_slot_html(struct Conn *C)
{
	static char tmp[1024];
	char *ext = "";
	char speedi[32], speedo[32];
	unsigned int dT, si, so;

	/* Average speeds: bytes in/out divided by the age in seconds (min 1) */
	dT = (unsigned int) (Conn_now.tv_sec - C->start);
	if (dT == 0)
		dT = 1;
	si = (unsigned int)(C->bi / dT);
	so = (unsigned int)(C->bo / dT);

	Conn_speed(speedi, sizeof(speedi), si);
	Conn_speed(speedo, sizeof(speedo), so);

	/* Disabled hook: until it is enabled, 'ext' is always the empty string */
	#if 0
	TODO
	if (Conn_status_slot_html_cb)
		ext = Conn_status_slot_html_cb(C);
	#endif

	snprintf(tmp, sizeof(tmp), "<td>%llu</td><td>%d</td>"
		"<td>%s</td><td>%s</td><td>%s</td>"
		"<td>%s</td><td>%s</td>"
		"<td>%s/%d</td>"
		"<td>%llu / %llu</td>"
		"<td>%u / %u</td><td>%s / %s</td><td>%ld</td>"
		"<td>%u</td><td>%u</td><td>%u</td>"
		"%s",
		C->id, C->fd,
		Conn_domain(C), Conn_type(C), Conn_get_socket_protocol(C),
		Conn_socktype(C), Conn_state(C),
		C->addr, C->port, C->bi, C->bo,
		C->ibuf_size, C->obuf_size,
		speedi, speedo, Conn_now.tv_sec - C->start,
		C->band_width, C->band_factor, C->band_tokens,
		ext);

	return tmp;
}
#endif

#if 1
/*
 * Appends @str to @buf, which currently holds @*len bytes and can store at
 * most @max bytes plus the terminator. A string that does not fit is dropped
 * as a whole, so @buf is always '\0' terminated and is never overrun.
 */
static void Conn_status_append(char *buf, unsigned long *len,
	const unsigned long max, const char *str)
{
	size_t str_len;

	str_len = strlen(str);
	if (str_len > max - *len)
		return;

	memcpy(buf + *len, str, str_len + 1);
	*len += str_len;
}

/*
 * Builds a report with the global counters, one line (or HTML table row) for
 * every connection in the Conn_masters list and the total speed/bytes.
 * @flags: bit 0 == 1 => HTML output, else plain text.
 * Returns a malloc()'ed string that the caller must free(); on failure the
 * returned string holds an error message (also allocated, or NULL if even
 * strdup() fails).
 * Side effect: Conn_now is refreshed.
 */
char *Conn_status(const unsigned int flags)
{
	const int html = (flags & 1) != 0;
	int tmp_len;
	unsigned long len = 0, max;
	char tmp[512];
	char *buf, *per_slot, *ext = "";
	char speedi[32], speedo[32];
	unsigned long long bi, bo, dT;
	struct Conn *C;

	/*
	 * Room for the summary and the HTML header plus one slot description
	 * for every connection. Every append below is bounded by 'max', so an
	 * underestimation truncates the report instead of overflowing 'buf'.
	 */
	max = ((unsigned long) Conn_no + 2) * 1024 - 1;
	buf = malloc(max + 1);
	if (!buf)
		return strdup("No enough memory!");

	buf[0] = '\0';

	gettimeofday(&Conn_now, NULL);
	tmp_len = snprintf(tmp, sizeof(tmp),
		"Conn_pending=%u  Conn_no/Conn_max=%u/%u  Conn_total=%lu"
		"  Conn_uptime=%lus  Conn_allocated=%u  Conn_work_to_do=%u"
		"  Conn_mem_structs=%llu  Conn_mem_buffers_in/out=%llu/%llu\n",
		Conn_pending, Conn_no, Conn_max, Conn_total,
		Conn_now.tv_sec - Conn_start, Conn_allocated, Conn_work_to_do,
		Conn_mem_structs, Conn_mem_buffers_in, Conn_mem_buffers_out);
	if (tmp_len < 0) {
		free(buf);
		return strdup("snprintf error");
	}
	Conn_status_append(buf, &len, max, tmp);

	/* TODO: let a user callback (Conn_status_cb) provide 'ext' for html */

	if (html) {
		Conn_status_append(buf, &len, max,
			"<table border=\"0\" cellspacing=\"1\" cellpadding=\"3\" bgcolor=\"#aaaaaa\">\n"
			"<tr bgcolor=\"ffffff\">\n"
			"<td>ID</td>"
			"<td>FD</td>"
			"<td>Dom</td>"
			"<td>Type</td>"
			"<td>Protocol</td>"
			"<td>SType</td>"
			"<td>State</td>"
			"<td>Addr/port</td>"
			"<td>BI/BO</td>"
			"<td>BUF I/O</td>"
			"<td>Speed I/O</td>"
			"<td>Elap (s)</td>"
			"<td>Band</td>"
			"<td>F</td>"
			"<td>Tks</td>");
		Conn_status_append(buf, &len, max, ext);
		Conn_status_append(buf, &len, max, "</tr>\n");
	} else {
		Conn_status_append(buf, &len, max, ext);
	}

	bi = 0; bo = 0; dT = 0;
	/* The 'for' advances C also for skipped (free) slots */
	for (C = Conn_masters.head; C; C = C->next) {
		if (C->state == CONN_STATE_FREE)
			continue;

		if (C->type == CONN_TYPE_P2P) {
			bi += C->bi;
			bo += C->bo;
			dT += (unsigned int) (Conn_now.tv_sec - C->start);
		}

		if (html) {
			Conn_status_append(buf, &len, max,
				"<tr bgcolor=\"ffffff\">\n");
			per_slot = Conn_status_slot_html(C);
		} else {
			per_slot = Conn_status_slot(C);
		}

		Conn_status_append(buf, &len, max, per_slot);
		Conn_status_append(buf, &len, max, "\n");

		if (html)
			Conn_status_append(buf, &len, max, "</tr>\n");
	}

	if (html)
		Conn_status_append(buf, &len, max, "</table>\n");

	if (dT == 0)
		dT = 1;

	Conn_speed(speedi, sizeof(speedi), (unsigned int)(bi / dT));
	Conn_speed(speedo, sizeof(speedo), (unsigned int)(bo / dT));

	tmp_len = snprintf(tmp, sizeof(tmp), "Total speed I/O: %s/%s"
		" Total bytes I/O: %llu/%llu\n",
		speedi, speedo, bi, bo);
	if (tmp_len < 0) {
		free(buf);
		return strdup("snprintf error");
	}
	Conn_status_append(buf, &len, max, tmp);

	return buf;
}
#endif

/*
 * Attaches the user pointer @priv to connection @C (stored in C->private).
 */
void Conn_set_private(struct Conn *C, void *priv)
{
	void **slot = &C->private;

	*slot = priv;
}

/*
 * Returns the user pointer attached to connection @C (C->private).
 */
void *Conn_get_private(struct Conn *C)
{
	void *priv = C->private;

	return priv;
}

/*
 * Returns how many bytes are waiting in the input buffer of @C
 * (distance between ibuf_head and ibuf_tail).
 */
unsigned int Conn_iqlen(const struct Conn *C)
{
	const unsigned int queued = C->ibuf_tail - C->ibuf_head;

	return queued;
}

/*
 * Returns how many bytes are waiting in the output buffer of @C
 * (distance between obuf_head and obuf_tail).
 */
unsigned int Conn_oqlen(const struct Conn *C)
{
	const unsigned int queued = C->obuf_tail - C->obuf_head;

	return queued;
}

/*
 * Obsolete name for Conn_iqlen(): returns the number of bytes waiting in the
 * input buffer of @C.
 */
unsigned int Conn_qlen(const struct Conn *C)
{
	const unsigned int queued = Conn_iqlen(C);

	return queued;
}

/*
 * Tells if connection @C can be skipped: returns 1 when it has an error
 * recorded (error_state > 0) or when its slot is free, else 0.
 * TODO - really needed?
 */
static int Conn_ignore(struct Conn *C)
{
	return ((C->error_state > 0) || (C->state == CONN_STATE_FREE)) ? 1 : 0;
}

/*
 * Periodic housekeeping for connection @C, based on Conn_now:
 * - calls the trigger callback when the trigger period (C->trigger, compared
 *   against seconds) has passed since the last trigger;
 * - marks the connection (by setting C->error_state) as expired if it
 *   exceeded the maximum idle time, or as timed out if a read or connect
 *   timeout was reached.
 * The connection is not closed here; only error_state is set.
 */
static void Conn_expire(struct Conn *C)
{
	long long diff_ms;

	if (C->trigger > 0) {
		/* We do not trigger first time */
		if (C->last_trigger == 0)
			C->last_trigger = Conn_now.tv_sec;

		if ((C->last_trigger > 0)
			&& (C->last_trigger + C->trigger < Conn_now.tv_sec)) {
			if (C->cbs.trigger)
				C->cbs.trigger(C);
			/* Advance by one period, not to "now", to keep a fixed cadence */
			C->last_trigger = C->last_trigger + C->trigger;
		}
	}

	/* Only the first matching condition below is evaluated (else-if chain) */
	if ((C->idle > 0) && (C->trecv.tv_sec + C->idle < Conn_now.tv_sec)) {
		C->error_state = CONN_ERROR_EXPIRED;
	} else if ((C->read_timeout > 0) && (C->tsend.tv_sec > 0)
		&& (C->tsend.tv_sec > C->trecv.tv_sec)) {
		/* Something was sent after the last receive: measure the wait */
		diff_ms = Conn_time_diff(&Conn_now, &C->tsend);
		if (diff_ms > C->read_timeout) {
			C->error_state = CONN_ERROR_READ_TIMEOUT;
		}
	} else if ((C->conn_timeout > 0) && (C->state == CONN_STATE_CONNECT_b)) {
		diff_ms = Conn_time_diff(&Conn_now, &C->conn_syn);
		if (diff_ms > C->conn_timeout) {
			/* connection attempt expired */
			C->error_state = CONN_ERROR_CONN_TIMEOUT;
		}
	}
}

/*
 * Turns on TCP_NODELAY on the socket of @C.
 * Returns what setsockopt() returns.
 */
int Conn_nodelay(const struct Conn *C)
{
	const int enable = 1;

	return setsockopt(C->fd, SOL_TCP, TCP_NODELAY, &enable, sizeof(enable));
}

/*
 * Removes the last @bytes bytes from the output queue of @C.
 * Nothing is changed if fewer than @bytes bytes are queued.
 */
void Conn_rollback(struct Conn *C, const unsigned int bytes)
{
	/* Never move the tail before the head */
	if (C->obuf_tail - C->obuf_head >= bytes)
		C->obuf_tail -= bytes;
}

/*
 * Returns a pointer to the first unread byte of the input buffer of @C.
 */
char *Conn_ibuf(const struct Conn *C)
{
	return &C->ibuf[C->ibuf_head];
}

/*
 * Returns a pointer to the first pending byte of the output buffer of @C.
 */
char *Conn_obuf(const struct Conn *C)
{
	return &C->obuf[C->obuf_head];
}

/*
 * Returns the id of connection @C (obsolete).
 */
unsigned long long Conn_getid(const struct Conn *C)
{
	const unsigned long long id = C->id;

	return id;
}

/*
 * Returns the id of connection @C (obsolete).
 * NOTE(review): Conn_getid() carries the same "obsolete" mark; presumably
 * only one of the two is really obsolete - confirm.
 */
unsigned long long Conn_get_id(const struct Conn *C)
{
	const unsigned long long id = C->id;

	return id;
}

/*
 * Looks up a connection by @id in the first Conn_no entries of the Conns
 * array. Returns the matching entry or NULL if none has this id.
 * NOTE(review): assumes Conns is a valid array whenever Conn_no > 0 - confirm.
 */
struct Conn *Conn_get(const unsigned long long id)
{
	unsigned int slot;

	for (slot = 0; slot < Conn_no; slot++)
		if (Conns[slot].id == id)
			return &Conns[slot];

	return NULL;
}

/*
 * Set a callback
 */
int Conn_set_cb(struct Conn *C, const unsigned int cb_type,
	void (*f)(struct Conn *))
{
	Log(12, "%s %u %p\n", __func__, cb_type, f);
	switch (cb_type) {
	case CONN_CB_ACCEPT:		C->cbs.accept = f; break;
	case CONN_CB_RECV:		C->cbs.recv = f; break;
	case CONN_CB_SEND:		C->cbs.send = f; break;
	case CONN_CB_DATA:		C->cbs.data = f; break;
	case CONN_CB_CLOSE:		C->cbs.close = f; break;
	case CONN_CB_TRIGGER:		C->cbs.trigger = f; break;
	case CONN_CB_ERROR:		C->cbs.error = f; break;
	case CONN_CB_CONNECTED:		C->cbs.connected = f; break;
	case CONN_CB_ACCEPT_ERROR:	C->cbs.accept_error = f; break;
	case CONN_CB_WORKER_START:	C->cbs.worker_start = f; break;
	default:			return -1;
	}

	return 0;
}

/*
 * Searches for @str in the input buffer of @C, starting @off bytes after the
 * first unread byte.
 * Returns a pointer to the first match or NULL if there is none (also when
 * @off is past the end of the queued data).
 * @flags: bit 0 == 1 => case insensitive
 */
char *Conn_ostrstr(struct Conn *C, const unsigned int off, const char *str,
	const unsigned int flags)
{
	unsigned int avail, len, i;
	size_t str_len;
	char *buf;
	int err;

	/* Without this check 'len' would wrap and we would search past the data */
	avail = C->ibuf_tail - C->ibuf_head;
	if (off > avail)
		return NULL;

	len = avail - off;
	buf = C->ibuf + C->ibuf_head + off;
	str_len = strlen(str);

	if (len < str_len)
		return NULL;

	/* Every compare stays inside the queued data: i + str_len <= len */
	for (i = 0; i <= len - str_len; i++) {
		if (flags & 1)
			err = strncasecmp(buf + i, str, str_len);
		else
			err = strncmp(buf + i, str, str_len);
		if (err == 0)
			return buf + i;
	}

	return NULL;
}

/*
 * Case sensitive search for @str in the input buffer of @C.
 * Returns a pointer to the match or NULL if there is none.
 */
char *Conn_strstr(struct Conn *C, const char *str)
{
	const unsigned int case_sensitive = 0;

	return Conn_ostrstr(C, 0, str, case_sensitive);
}

/*
 * Case insensitive search for @str in the input buffer of @C.
 * Returns a pointer to the match or NULL if there is none.
 */
char *Conn_strcasestr(struct Conn *C, const char *str)
{
	const unsigned int case_insensitive = 1;

	return Conn_ostrstr(C, 0, str, case_insensitive);
}

/*
 * Extracts the first line from the input buffer of @C: the first '\n' is
 * replaced, in place, with '\0'.
 * Returns a pointer to the start of the input data or NULL if no complete
 * line is available yet.
 */
char *Conn_get_line(struct Conn *C)
{
	char *eol = Conn_ostrstr(C, 0, "\n", 1);

	if (eol == NULL)
		return NULL;

	/* Terminate the line where the newline was */
	*eol = '\0';

	return Conn_ibuf(C);
}

/*
 * Cuts from the right of @s, in place, every char found in @chars.
 * Nothing is done if @s or @chars is NULL or if @s is empty.
 */
static void Conn_rtrim(char *s, const char *chars)
{
	size_t len;

	if (!s || !chars)
		return;

	/*
	 * Work with a length instead of a pointer that walks backwards: this
	 * way nothing points before the start of @s when the whole string is
	 * trimmed.
	 */
	len = strlen(s);
	while ((len > 0) && (strchr(chars, s[len - 1]) != NULL))
		s[--len] = '\0';
}

/*
 * Helper for building text line daemons: calls @cb for every complete line
 * found in the input buffer of @C. The line is passed '\0' terminated and
 * without the trailing "\r"; after @cb returns, the line is removed from the
 * input buffer. Nothing is done if @cb is NULL.
 */
void Conn_for_every_line(struct Conn *C, void (*cb)(struct Conn *C, char *line))
{
	char *line;
	size_t consumed;

	if (!cb)
		return;

	/* TODO: Conn_get_line may also return the size */
	while ((line = Conn_get_line(C)) != NULL) {
		/* Text plus the terminator that replaced '\n'; taken before trimming */
		consumed = strlen(line) + 1;

		Conn_rtrim(line, "\r");

		cb(C, line);

		Conn_eat(C, (unsigned int) consumed);
	}
}

/*
 * Formats a line and puts it in the output queue of @C.
 * Only a small subset of printf is understood: %%, %s, %d and %c, without
 * any width/precision modifiers. A NULL string passed for %s is queued as
 * "(null)". Any other conversion is logged and the program is aborted.
 * Returns 0 on success or -1 on error.
 */
int Conn_printf(struct Conn *C, const char *format, ...)
{
	va_list ap;
	int ret;

	ret = 0;
	va_start(ap, format);
	while (likely(*format)) {
		char *s;
		ssize_t slen;
		unsigned long len;
		char tmp[64];
		int d, r;

		/* Find first formatting char */
		s = strchr(format, '%');
		if (unlikely(!s)) {
			/* No more conversions: queue the rest as it is */
			r = Conn_enqueue_wait(C, format, strlen(format));
			ret = r == -1 ? -1 : 0;
			break;
		}

		/* Queue the literal text found before the '%' */
		len = (long unsigned int) (s - format);
		r = Conn_enqueue_wait(C, format, (unsigned int) len);
		/* TODO: Should we call error callback and close the connection? */
		if (unlikely(r == -1)) {
			ret = -1;
			break;
		}

		format = s + 1;
		s = tmp;
		switch (*format) {
		case '%': tmp[0] = '%'; len = 1; break;

		case 's':
			s = va_arg(ap, char *);
			/* strlen(NULL) would crash; behave like glibc's printf */
			if (unlikely(!s))
				s = "(null)";
			len = strlen(s);
			break;

		case 'd':
			d = va_arg(ap, int);
			/* TODO: replace this with another function. Doh! */
			slen = snprintf(tmp, sizeof(tmp), "%d", d);
			if (slen < 0)
				len = 0;
			else
				len = (unsigned long) slen;
			break;

		case 'c':
			tmp[0] = (char) va_arg(ap, int);
			len = 1;
			break;

		/* A lone '%' at the end of the format: nothing to queue */
		case '\0': s = NULL; break;

		default:
			Log(0, "Unknown format [%c]!\n", *format);
			abort();
		}

		if (likely(s)) {
			r = Conn_enqueue_wait(C, s, (unsigned int) len);
			if (unlikely(r == -1)) {
				ret = -1;
				break;
			}
			/* Skip the conversion char */
			format++;
		}
	}
	va_end(ap);

	return ret;
}

/*
 * This is called after allocation to init some fields.
 * Resets @C to the state of a fresh connection: master type, empty state, no
 * error, PF_INET/SOCK_STREAM, empty in/out buffers (grown to the default
 * sizes if they are smaller), zeroed counters and flags, default callbacks,
 * fd = -1 and a new id.
 * Also updates the global counters: Conn_max_reached, Conn_mem_buffers_in/out,
 * Conn_id and Conn_no.
 * Returns @C or NULL if a buffer could not be allocated (message in
 * Conn_error); in that case Conn_no is not incremented.
 */
__hot static struct Conn *Conn_alloc_prepare(struct Conn *C)
{
	void *p;

	/* Note: checked before Conn_no is incremented at the end of the function */
	if (unlikely(Conn_no > Conn_max_reached))
		Conn_max_reached = Conn_no;

	/* We assume that will be a master */
	C->type = CONN_TYPE_MASTER;
	C->state = CONN_STATE_EMPTY;
	C->error_state = CONN_ERROR_NO_ERROR;
	C->xerrno = 0;

	C->sock_protocol = 0;
	C->sock_domain = PF_INET;
	C->sock_type = SOCK_STREAM;

	/* A buffer already at least as big as the default is kept as it is */
	if (unlikely(C->ibuf_size < Conn_default_ibuf)) {
		p = realloc(C->ibuf, Conn_default_ibuf);
		if (unlikely(p == NULL)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		Conn_mem_buffers_in += Conn_default_ibuf - C->ibuf_size;
		C->ibuf = p;
		C->ibuf_size = Conn_default_ibuf;
	}
	C->ibuf_head = 0;
	C->ibuf_tail = 0;

	if (unlikely(C->obuf_size < Conn_default_obuf)) {
		p = realloc(C->obuf, Conn_default_obuf);
		if (unlikely(p == NULL)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		Conn_mem_buffers_out += Conn_default_obuf - C->obuf_size;
		C->obuf = p;
		C->obuf_size = Conn_default_obuf;
	}
	C->obuf_head = 0;
	C->obuf_tail = 0;

	C->trecv = Conn_now;

	C->bi = 0;
	C->bo = 0;
	C->private = NULL;

	/* Reset syn time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	/* flags */
	C->auto_reconnect = 0;
	C->close_after_send = 0;
	C->shutdown_after_send = 0;
	C->local_dirty = 1;
	C->remote_dirty = 1;

	/* bandwidth */
	C->band_width = 0;
	C->band_factor = 0;
	C->band_tokens = 0;
	C->band_lasttime = Conn_now;

	C->fd = -1;

	C->start = Conn_now.tv_sec;

	C->id = Conn_id++;

	/* TODO: really? Don't we borrow them from master listen conn?! */
	C->cbs = Conn_default_cbs;

	C->web = NULL;
	C->web_req = NULL;

	C->wp = NULL;

	Conn_no++;
	/* Conn_work_to_do will not be incremented here, only in commit! */

	Log(10, "Allocated id %llu. Now Conn_no=%d.\n",
		C->id, Conn_no);

	return C;
}

/*
 * Allocates a Conn structure. This is called by the user.
 * The structure is taken from the global free list (Conn_free), which is
 * grown on demand, and is then initialized by Conn_alloc_prepare().
 * Returns NULL (message in Conn_error) if the Conn_max limit was reached or
 * if the free list cannot be grown.
 */
__hot struct Conn *Conn_alloc(void)
{
	struct Conn *C;

	Log(10, "%s Conn_no=%d Conn_max=%d\n",
		__func__, Conn_no, Conn_max);

	/* Conn_max == 0 means no limit */
	if (unlikely((Conn_max > 0) && (Conn_no >= Conn_max))) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	/* Refill the free list only when it is empty */
	if (unlikely(Conn_free.head == NULL)
		&& unlikely(pool_grow(&Conn_free, 4) != 0)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot grow anymore. Probably memory shortage.");
		return NULL;
	}

	/* Detach the first element of the free list */
	C = Conn_free.head;
	Conn_free.head = C->next;
	C->next = NULL;

	return Conn_alloc_prepare(C);
}

/*
 * Allocates a Conn structure, worker version: the structure is taken from
 * the free list of worker @w (grown by CONN_BULK_ALLOC when empty), is bound
 * to @w and then initialized by Conn_alloc_prepare().
 * Returns NULL (message in Conn_error) if the free list cannot be grown.
 */
__hot static struct Conn *Conn_alloc_worker(struct Conn_wpool_worker *w)
{
	struct Conn *C;

	/* Refill the free list only when it is empty */
	if (unlikely(w->free.head == NULL)
		&& unlikely(pool_grow(&w->free, CONN_BULK_ALLOC) != 0)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot grow anymore. Probably memory shortage.");
		return NULL;
	}

	/* Detach the first element of the free list */
	C = w->free.head;
	w->free.head = C->next;
	C->next = NULL;
	C->worker = w;

	return Conn_alloc_prepare(C);
}

/*
 * Accepting a connection
 * "C" is shared with the other threads. TODO: make it less ugly.
 *
 * Accepts at most one pending connection on the listening connection @C.
 * The new connection (X) is allocated from the pool of the worker of @C,
 * inherits the callbacks, the web settings and the socket parameters of @C,
 * is registered for EPOLLIN on the epoll fd of the worker and then the
 * accept callback is called for it.
 * On accept4() errors other than EAGAIN/EINTR only a warning is logged.
 */
__hot static void Conn_accept(struct Conn *C)
{
	int fd, err;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;

	if (unlikely(Conn_debug_level > 9)) {
		Log(0, "Accepting a connection via %s/%d, fd %d, type %s"
			", domain %s, protocol %s.\n",
			C->bind_addr, C->bind_port, C->fd, Conn_type(C),
			Conn_domain(C), Conn_get_socket_protocol(C));
	}

	/* Pick the peer address storage that matches the domain */
	switch (C->sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
			break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				C->sock_domain);
			Conn_error_raise(C, EAFNOSUPPORT);
			return;
	}

	again:
	fd = accept4(C->fd, pca, &cax_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
	if (unlikely(fd == -1)) {
		if (errno == EAGAIN)
			return;

		if (errno == EINTR)
			goto again;

		/* TODO: ratelimit */
		Log(2, "WARN: Cannot accept on fd %d [%s].\n",
			C->fd, strerror(errno));
		/*
		 * We must not raise an error here because we will close the
		 * master socket!
		 * TODO: We should signal it as a warning.
		 */
		/* Conn_error_raise(C, errno); */
		return;
	}

	X = Conn_alloc_worker(C->worker);
	if (unlikely(!X)) {
		/* TODO: really? We will close the master fd! */
		Conn_error_raise(C, ENOMEM);
		close(fd);
		return;
	}

	X->cbs = C->cbs;
	X->fd = fd;
	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_OPEN;
	X->time_open = Conn_now;
	if (C->web) {
		X->web_req = malloc(sizeof(struct Conn_web_request));
		if (!X->web_req) {
			Conn_error_raise(C, ENOMEM);
			/*
			 * X already owns fd: give the whole connection back
			 * instead of only closing the fd (this leaked X), the
			 * same way the Conn_add_obj() error path below does.
			 * X->web is still NULL here.
			 * NOTE(review): relies on Conn_free_intern() closing
			 * X->fd - confirm.
			 */
			Conn_free_intern(X);
			return;
		}
		memset(X->web_req, 0, sizeof(struct Conn_web_request));
	}
	X->web = C->web;

	/* We should replace them with a pointer to master C */
	Conn_set_socket_domain(X, C->sock_domain);
	Conn_set_socket_type(X, C->sock_type);
	Conn_set_socket_protocol(X, C->sock_protocol);

	X->local_dirty = 1;
	X->remote_dirty = 1;

	/*TODO err = Conn_add_obj(C->worker->epoll_fd, X, EPOLLIN | EPOLLRDHUP);*/
	err = Conn_add_obj(C->worker->epoll_fd, X, EPOLLIN);
	if (unlikely(err != 0)) {
		/* TODO: add a specific callback for enqueue errors? */
		Conn_error_raise(C, CONN_ERROR_INTERNAL);
		Conn_free_intern(X);
		return;
	}

	/* TODO: is possible to not be really needed
	Conn_nodelay(X);
	*/

	#if 1
	if (X->cbs.accept)
		X->cbs.accept(X);
	#else
	Conn_default_cbs_accept(X);
	#endif

	Conn_work_to_do++;

	Conn_total++;
	/* TODO: not real correct! */
	C->worker->in_clients++;

	/* We must call accept till we get EAGAIN */
	/* TODO: seems is not a good idea afterall. We may starve the rest of connections.
	goto again;
	*/
}

/*
 * Callback that is called for every connection
 * Handles the epoll events (@revents) reported for connection @C, in this
 * order: EPOLLERR and EPOLLHUP (the connection is freed), EPOLLOUT on a
 * connecting socket (connection established), EPOLLIN (accept for masters,
 * receive for the others), EPOLLRDHUP (only logged) and finally EPOLLOUT
 * (send, only if the connection is open).
 * Between the steps, Conn_ignore() is used to stop as soon as the connection
 * got an error or was freed.
 */
static inline void Conn_poll_cb(struct Conn *C, const unsigned int revents)
{
	if (unlikely(Conn_debug_level > 5)) {
		char poll_status[16];

		Conn_poll_status(revents, poll_status);
		Log(0, "%llu %s revents=%s\n",
			C->id, __func__, poll_status);

		Log(12, "\t%s\n", Conn_status_slot(C));
		/* TODO: we should prefix the output with a \t? */
	}

	/* We may receive several events on the same conn */
	if (unlikely(C->state == CONN_STATE_FREE))
		return;

	if (unlikely(revents & EPOLLERR)) {
		Log(0, "\tEPOLLERR!\n");
		C->error_state = CONN_ERROR_POLL;
		C->xerrno = 0; /* TODO: unknown error? */
		Conn_free_intern(C);
		/* TODO: CONN_ERROR_POLL is correct here? */
		return;
	}

	if (unlikely(revents & EPOLLHUP)) {
		Log(9, "\tEPOLLHUP!\n");
		C->error_state = CONN_ERROR_HANGUP;
		Conn_free_intern(C);
		return;
	}

	/* First, test we have a new connection */
	if (revents & EPOLLOUT) {
		/* Writable while connecting: the connect attempt has finished */
		if (C->state == CONN_STATE_CONNECT_b) {
			Log(9, "\tWe just established a connection.\n");

			C->state = CONN_STATE_OPEN;
			C->local_dirty = 1;
			C->time_open = Conn_now;

			/* TODO: indirect call costs a lot. Hm. */
			if (likely(C->cbs.connected))
				C->cbs.connected(C);
			else
				Conn_default_cbs_connected(C);
		}
	}
	//if (Conn_ignore(C))
	//	return;

	/* Second, test for hangup or input */
	if (likely(revents & EPOLLIN)) {
		/* Input on a master (listening) connection means a new client */
		if (unlikely(C->type == CONN_TYPE_MASTER)) {
			Conn_accept(C);
		} else {
			Log(9, "\tWe have input\n");
			/* TODO: use a callback or not?
			if (likely(C->cbs.recv))
				C->cbs.recv(C);
			*/
			Conn_default_cbs_recv(C);
		}
	}
	if (Conn_ignore(C))
		return;

	/* RDHUP may come with POLLIN, so it must be called after */
	if (unlikely(revents & EPOLLRDHUP)) {
		Log(9, "\tEPOLLRDHUP!\n");
		/* TODO: What we do here? */
	}
	if (Conn_ignore(C))
		return;

	if (likely(revents & EPOLLOUT)) {
		Log(9, "\tWe can send data (state=%hhu)...\n", C->state);
		if (likely(C->state == CONN_STATE_OPEN)) {
			/*
			if (likely(C->cbs.send))
				C->cbs.send(C);
			*/
			Conn_default_cbs_send(C);
		}
	}
}


/*
 * Dumps some statistics
 * Currently an empty placeholder: nothing is dumped.
 */
static void Conn_stats(void)
{
}


/* ############## wpool 2 ############# */
/*
 * Dispatch events to a callback
 * Waits at most @timeout milliseconds (-1 = forever) for events on @epoll_fd,
 * storing them in @e (which has @e_size slots), then processes every event:
 * - data.u64 == 1: signal from the master process (control fd of worker @w);
 * - data.u64 == 2: tick of the timer of worker @w; Conn_now is refreshed;
 * - anything else: data.ptr is a connection and Conn_poll_cb() is called.
 * Returns: -1 on error, 0 nothing to do, n (>0) if some work was done
 */
static inline int Conn_dispatch_events(struct Conn_wpool_worker *w,
	int epoll_fd, struct epoll_event *e,
	const unsigned short e_size, const int timeout)
{
	int i, events;
	struct Conn *C;
	ssize_t r;

	Log(10, "%s epoll_fd=%d timeout=%dms e_size=%hu...\n", __func__,
		epoll_fd, timeout, e_size);

	/* Retry when interrupted by a signal */
	do {
		events = epoll_wait(epoll_fd, e, e_size, timeout);
	} while (unlikely(events < 0) && (errno == EINTR));

	if (unlikely(events < 0)) {
		Log(1, "%s [%s]\n",
			__func__, strerror(errno));
		snprintf(Conn_error, sizeof(Conn_error), "epoll error (errno=%d) (%s)",
			errno, strerror(errno));
		return -1;
	}

	if (events == 0)
		return 0;

	if (unlikely(Conn_debug_level > 8)) {
		char sevents[16];

		Log(0, "\tProcessing %d event(s):\n", events);
		for (i = 0; i < events; i++) {
			C = e[i].data.ptr;
			Conn_poll_status(e[i].events, sevents);
			Log(0, "\tptr=%p ### ev i=%d %s\n", C, i, sevents);
		}
	}

	for (i = 0; i < events; i++) {
		/*
		 * Special events. The ids were stored through data.u64 when
		 * the fds were registered, so they are tested through the same
		 * union member: data.fd overlaps only a part of u64 and which
		 * part depends on the byte order.
		 */
		if (unlikely(e[i].data.u64 == 1)) {
			/* TODO: we may have another message type! */
			Log(0, "We have a master signaling. TODO!\n");
			exit(0);
		}

		if (unlikely(e[i].data.u64 == 2)) {
			uint64_t ticks;

			r = read(w->timer_fd, &ticks, sizeof(ticks));
			if (r != (ssize_t) sizeof(ticks)) {
				Log(0, "\tCannot read timer fd (%s).\n", strerror(errno));
				continue;
			}
			//Log(10, "We have a timer tick (ticks %llu).\n", ticks);
			gettimeofday(&Conn_now, NULL);
			/* TODO: optimize here and try to deduct time! */
			continue;
		}

		Conn_poll_cb(e[i].data.ptr, e[i].events);
	}

	return events;
}

/*
 * Start a worker
 */
static int Conn_wpool_start_worker(struct Conn *C, struct Conn_wpool_worker *w,
	int listen_fd)
{
	int r, pipes[2];
	long cpus;
	struct epoll_event ev;
	cpu_set_t cpuset;
	struct itimerspec new_value;
	pid_t pid;
	char tmp[32];

	Log(10, "Starting worker (id=%hu, listen_fd=%d)...\n", w->id, listen_fd);

	r = socketpair(AF_LOCAL, SOCK_STREAM, 0, pipes);
	if (r != 0) {
		Log(0, "Cannot call socketpair (%s)\n", strerror(errno));
		return -1;
	}

	pid = fork();
	if (pid == -1) {
		Log(0, "%s: Cannot fork: %s!\n",
			__func__, strerror(errno));
		goto close_pipe;
	}

	if (pid != 0) { /* parent */
		Log(10, "Started worker (id=%hu) pipe=%d\n",
			w->id, pipes[0]);
		close(pipes[1]);
		w->control = pipes[0];
		return 0;
	}

	/* Here we are in the child process */

	snprintf(tmp, sizeof(tmp), "worker %hu", w->id);
	Conn_log_set_info(tmp);
	w->control = pipes[1];
	close(pipes[0]);
	C->fd = listen_fd;

	if (C->type == CONN_TYPE_MASTER)
		listen(C->fd, 4096);

	/* TODO: if we have gaps, the allocation is not correct! */
	cpus = sysconf(_SC_NPROCESSORS_ONLN);
	Log(1, "%ld cpus found.\n", cpus);
	CPU_ZERO(&cpuset);
	CPU_SET((size_t)(w->id % cpus), &cpuset);
	r = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
	if (r != 0) {
		Log(0, "%s: Cannot set affinity: %s\n",
			__func__, strerror(errno));
	} else {
		Log(1, "%s: Affinity for worker %hu: cpu %u\n",
			__func__, w->id, w->id % cpus);
	}

	C->worker = w;

	w->conns_head = NULL;
	pool_init(&w->free);
	w->mem_structs = 0; /* TODO: really used? */
	w->in_clients = 0;
	w->pid = getpid();

	w->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (w->epoll_fd == -1) {
		Log(0, "Cannot create worker epoll (%s).\n", strerror(errno));
		exit(1);
	}

	/* Register our end of the pipe to receive signaling from parent */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.u64 = 1; /* TODO: Find a better id */
	r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->control, &ev);
	if (r != 0) {
		Log(0, "Cannot add control fd to epoll (%s).\n", strerror(errno));
		exit(1);
	}

	w->timer_fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
	if (w->timer_fd == -1) {
		Log(0, "Cannot create timer fd (%s).\n", strerror(errno));
		exit(1);
	}
	/* arm timer - once per second */
	new_value.it_value.tv_sec = 1;
	new_value.it_value.tv_nsec = 0;
	new_value.it_interval.tv_sec = 1;
	new_value.it_interval.tv_nsec = 0;
	r = timerfd_settime(w->timer_fd, 0 /* relative */, &new_value, NULL);
	if (r == -1) {
		Log(0, "Cannot arm timer (%s).\n", strerror(errno));
		exit(1);
	}
	/* add timer to poll */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.u64 = 2; /* TODO: Find a better id */
	r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->timer_fd, &ev);
	if (r != 0) {
		Log(0, "Cannot add timer to epoll (%s).\n", strerror(errno));
		exit(1);
	}

	/* Register C->fd */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.ptr = C;
	r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, C->fd, &ev);
	if (r != 0) {
		Log(0, "Cannot add listen_fd %d to epoll (%s).\n",
			C->fd, strerror(errno));
		exit(1);
	}

	w->inited = 1;

	Log(10, "%s: worker prepared, epoll=%d control=%d fd=%d timer=%d\n",
		__func__, w->epoll_fd, w->control, C->fd, w->timer_fd);

	if (C->cbs.worker_start)
		C->cbs.worker_start(C);

	while (1) {
		Conn_dispatch_events(w, w->epoll_fd, w->events,
			CONN_EVENTS_SLOTS, -1);

		//sched_yield();
		//Conn_wpool_worker_stats(w);
	}

	exit(1);

	close_pipe:
	close(pipes[0]);
	close(pipes[1]);

	return -1;
}


/* ############## misc ############# */
/*
 * Reads a value from proc
 * Stores in @buf (of @buf_size bytes) the start of the content of @file,
 * '\0' terminated and without the trailing newline(s).
 * On failure, @buf receives "ERROR_OPEN!" or "ERROR_READ!".
 * Nothing is done if @buf_size is 0.
 */
static void Conn_read_proc(char *buf, const size_t buf_size, const char *file)
{
	int fd;
	ssize_t n;

	if (buf_size == 0)
		return;

	fd = open(file, O_RDONLY);
	if (fd == -1) {
		snprintf(buf, buf_size, "ERROR_OPEN!");
		return;
	}

	n = read(fd, buf, buf_size - 1);
	if (n == -1) {
		snprintf(buf, buf_size, "ERROR_READ!");
	} else {
		/*
		 * Terminate right after the data (n <= buf_size - 1), then
		 * drop only real newlines; 'n > 0' keeps us inside the buffer.
		 */
		buf[n] = '\0';
		while ((n > 0) && (buf[n - 1] == '\n'))
			buf[--n] = '\0';
	}

	close(fd);
}

/*
 * Logs (level 1) some system values that are important for a network
 * daemon, as read from /proc/sys: somaxconn, tcp_max_tw_buckets and
 * tcp_fin_timeout.
 */
static void Conn_sys(void)
{
	char max_conn[16];
	char tw_buckets[16];
	char fin_timeout[16];

	Conn_read_proc(max_conn, sizeof(max_conn),
		"/proc/sys/net/core/somaxconn");
	Conn_read_proc(tw_buckets, sizeof(tw_buckets),
		"/proc/sys/net/ipv4/tcp_max_tw_buckets");
	Conn_read_proc(fin_timeout, sizeof(fin_timeout),
		"/proc/sys/net/ipv4/tcp_fin_timeout");

	Log(1, "net.core.somaxconn=%s"
		" net.ipv4.tcp_max_tw_buckets=%s"
		" net.ipv4.tcp_fin_timeout=%s\n",
		max_conn, tw_buckets, fin_timeout);
}


/* ############## split ############ */
/*
 * Frees a structure returned by Conn_split(): all the cells, the duplicated
 * line and the structure itself. *@s is set to NULL, so the caller is not
 * left with a dangling pointer and a second call is harmless.
 * Nothing is done if @s or *@s is NULL.
 */
void Conn_split_free(struct Conn_split **s)
{
	struct Conn_split *p;
	struct Conn_split_cell *q, *next;

	if (!s)
		return;

	p = *s;
	if (!p)
		return;

	q = p->head;
	while (q) {
		next = q->next;
		free(q);
		q = next;
	}

	/* free(NULL) is a no-op, no need to test p->line */
	free(p->line);
	free(p);

	*s = NULL;
}

/*
 * Splits the text in @line0 into var=value pairs.
 * The pairs are separated by spaces/tabs; a value may be enclosed in double
 * quotes to contain spaces (the quotes are not part of the value). A word
 * without '=' becomes a pair with an empty value.
 * @line0 is not modified: the work is done on a private copy to which the
 * returned cells point.
 * Returns a structure that must be freed with Conn_split_free() or NULL on
 * error (message in Conn_error).
 */
struct Conn_split *Conn_split(const char *line0)
{
	char *p;
	struct Conn_split *ret;
	struct Conn_split_cell *q;
	char search_for;
	char *left, *right;
	unsigned int right_len;

	if (!line0) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot split a NULL line!\n");
		return NULL;
	}

	ret = calloc(1, sizeof(struct Conn_split));
	if (!ret) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot alloc memory for Conn_split!\n");
		return NULL;
	}

	ret->line = strdup(line0);
	if (!ret->line) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot alloc memory for line duplication!\n");
		goto out_free;
	}

	Conn_rtrim(ret->line, "\r\n \t");

	/* do the splitting */
	p = ret->line;
	while (*p != '\0') {
		/* skip empty space */
		while ((*p == ' ') || (*p == '\t'))
			p++;

		if (*p == '\0')
			break;

		/* Init */
		right = "";
		right_len = 0;

		/* Building left */
		left = p;
		while ((*p != '\0') && (*p != '='))
			p++;
		if (*p != '\0') {
			/* Cut 'left' at the '=' and skip it */
			*p = '\0';
			p++;

			/* A quoted value ends at the next quote, else at a space */
			search_for = ' ';
			if (*p == '"') {
				search_for = '"';
				p++;
			}

			/*
			 * Building right: it starts after the opening quote, so
			 * the value and right_len describe the same bytes.
			 */
			right = p;

			while ((*p != '\0') && (*p != search_for)) {
				right_len++;
				p++;
			}

			if (*p != '\0') {
				*p = '\0';
				p++;
			}
		}

		/* alloc data and fill it */
		q = calloc(1, sizeof(struct Conn_split_cell));
		if (!q) {
			snprintf(Conn_error, sizeof(Conn_error),
				"cannot alloc memory!\n");
			goto out_free;
		}
		q->left = left;
		q->right = right;
		q->right_len = right_len;

		/* Append the cell to the list */
		if (ret->head == NULL)
			ret->head = q;
		else
			ret->tail->next = q;

		ret->tail = q;
	}

	return ret;

	out_free:
	Conn_split_free(&ret);

	return NULL;
}

/*
 * Searches in @s the first pair whose name is @left.
 * Returns its value or NULL if the name is not found. If @size is not NULL
 * and the name is found, the length of the value is stored in *@size.
 */
char *Conn_split_get_size(const struct Conn_split *s, const char *left,
	unsigned int *size)
{
	struct Conn_split_cell *cell;

	for (cell = s->head; cell != NULL; cell = cell->next) {
		if (strcmp(left, cell->left) != 0)
			continue;

		if (size != NULL)
			*size = cell->right_len;
		return cell->right;
	}

	return NULL;
}

/*
 * Searches in @s the pair named @l and returns its value or NULL if the
 * name is not found.
 */
char *Conn_split_get_e(const struct Conn_split *s, const char *l)
{
	char *value = Conn_split_get_size(s, l, NULL);

	return value;
}

/*
 * Searches in @s the pair named @l and returns its value or "" if the name
 * is not found (never NULL).
 */
char *Conn_split_get(const struct Conn_split *s, const char *l)
{
	char *value = Conn_split_get_size(s, l, NULL);

	return value ? value : "";
}

/*
 * Returns the value of the pair named @l converted to unsigned long with
 * strtoul() in base @base, or 0 if the name is not found.
 */
unsigned long Conn_split_get_ul(const struct Conn_split *s, const char *l,
	const unsigned int base)
{
	const char *value = Conn_split_get_e(s, l);

	if (value == NULL)
		return 0;

	return strtoul(value, NULL, (int) base);
}

/*
 * Returns the value of the pair named @l converted to unsigned long long
 * with strtoull() in base @base, or 0 if the name is not found.
 */
unsigned long long Conn_split_get_ull(const struct Conn_split *s, const char *l,
	unsigned int base)
{
	const char *value = Conn_split_get_e(s, l);

	if (value == NULL)
		return 0;

	return strtoull(value, NULL, (int) base);
}

/*
 * Returns the value of the pair named @l converted to double with strtod(),
 * or 0 if the name is not found.
 */
double Conn_split_get_d(const struct Conn_split *s, const char *l)
{
	const char *value = Conn_split_get_e(s, l);

	if (value == NULL)
		return 0;

	return strtod(value, NULL);
}

#if 0
/*
 * Returns 1 if the string contains only 0-9a-zA-Z (as decided by isalnum()),
 * else 0. An empty string gives 1.
 */
static int Conn_alphanum(const char *s)
{
	const unsigned char *p;

	/*
	 * isalnum() must receive a value representable as unsigned char; a
	 * plain char may be negative for bytes >= 0x80.
	 */
	for (p = (const unsigned char *) s; *p != '\0'; p++)
		if (isalnum(*p) == 0)
			return 0;

	return 1;
}
#endif

/*
 * Returns the lifetime of connection @C: the value computed by
 * Conn_time_diff() between Conn_now and the moment the connection was
 * opened (C->time_open).
 */
unsigned long long Conn_lifetime(struct Conn *C)
{
	const long long age = Conn_time_diff(&Conn_now, &C->time_open);

	return (unsigned long long) age;
}

/*
 * Returns the last error message stored by the library (Conn_error).
 */
char *Conn_strerror(void)
{
	char *msg = Conn_error;

	return msg;
}

/*
 * Stores a printf-style formatted message in the library error buffer
 * (Conn_error). The message is truncated to the size of that buffer.
 */
void Conn_set_error(const char *format, ...)
{
	va_list	ap;

	va_start(ap, format);
	vsnprintf(Conn_error, sizeof(Conn_error), format, ap);
	va_end(ap);
}

/*
 * Difference between two timeval structures (t1 - t2), in milliseconds.
 * The microseconds part is converted to milliseconds (truncating toward
 * zero) independently of the seconds part.
 */
long long Conn_time_diff(const struct timeval *t1, const struct timeval *t2)
{
	/*
	 * Widen to long long before multiplying: with a 32-bit time_t the
	 * multiplication by 1000 would overflow for differences of a few weeks.
	 */
	const long long sec = (long long) t1->tv_sec - (long long) t2->tv_sec;
	const long long usec = (long long) t1->tv_usec - (long long) t2->tv_usec;

	return sec * 1000 + usec / 1000;
}

/*
 * Builds a human readable description of the error recorded on a
 * connection, in the form "<error state> (<errno text>)". The errno text
 * comes from strerror(C->xerrno), or is "-" when xerrno is not positive.
 * The result lives in a static buffer that the next call overwrites.
 */
char *Conn_errno(const struct Conn *C)
{
	static char buf[256];
	const char *what;
	const char *detail;

	switch (C->error_state) {
	case CONN_ERROR_USERREQ:
		what = "user";
		break;
	case CONN_ERROR_POLL:
		what = "poll";
		break;
	case CONN_ERROR_RECV:
		what = "recv";
		break;
	case CONN_ERROR_SEND:
		what = "send";
		break;
	case CONN_ERROR_SOCKET:
		what = "socket";
		break;
	case CONN_ERROR_HANGUP:
		what = "hangup";
		break;
	case CONN_ERROR_GETADDRINFO:
		what = "lookup error";
		break;
	case CONN_ERROR_EXPIRED:
		what = "expired";
		break;
	case CONN_ERROR_ACCEPT:
		what = "accept";
		break;
	case CONN_ERROR_MEM:
		what = "allocation failed";
		break;
	case CONN_ERROR_CONNECT:
		what = "connect";
		break;
	case CONN_ERROR_READ_TIMEOUT:
		what = "read timeout";
		break;
	case CONN_ERROR_CONN_TIMEOUT:
		what = "conn timeout";
		break;
	case CONN_ERROR_INTERNAL:
		what = "internal error";
		break;
	default:
		what = "?";
		break;
	}

	if (C->xerrno > 0)
		detail = strerror(C->xerrno);
	else
		detail = "-";

	snprintf(buf, sizeof(buf), "%s (%s)", what, detail);

	return buf;
}

/*
 * Initializes the library: resets the pools and counters, records the
 * start time, creates the epoll descriptor and probes whether SO_REUSEPORT
 * can be set on a socket.
 * 'max' is stored in Conn_max.
 * Returns 0 on success (or if the library is already initialized) and
 * -1 on error; after an error the library is left uninitialized.
 */
int Conn_init(const unsigned int max)
{
	int sock, i, ret;

	if (Conn_inited == 1)
		return 0;

	Conn_log_set_info("main");

	/* TODO: masters does not need a full pool implementation */
	pool_init(&Conn_masters);
	pool_init(&Conn_free);
	pool_init(&Conn_connect_list);

	Conn_max = max;

	Conn_no = 0;
	Conn_work_to_do = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = (unsigned int) Conn_now.tv_sec;
	Conn_allocated = 0;

	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

	Conn_epoll_fd = epoll_create(32);
	if (Conn_epoll_fd == -1) {
		Log(0, "Cannot create epoll fd (%s)\n", strerror(errno));
		return -1;
	}

	Conn_inited = 1;

	/* Throw-away socket, used only to test for SO_REUSEPORT support */
	sock = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
	if (sock == -1) {
		Log(0, "Cannot create a simple socket (%s)\n", strerror(errno));
		/*
		 * Undo the partial initialization: without this the epoll fd
		 * leaks and a second Conn_init() would report success although
		 * this one failed.
		 */
		close(Conn_epoll_fd);
		Conn_epoll_fd = -1;
		Conn_inited = 0;
		return -1;
	}
	i = 1;
	ret = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i));
	if (ret >= 0)
		Conn_reuseport_available = 1;
	close(sock);
	Log(10, "SO_REUSEPORT is %savailable\n",
		Conn_reuseport_available ? "" : "not ");

	/* Log some info about this system */
	Conn_sys();

	/* sizeof yields size_t; cast so the argument matches %u */
	Log(0, "sizeof(struct Conn)=%u\n", (unsigned int) sizeof(struct Conn));

	return 0;
}

/*
 * Shutdown Conn: marks the library as not initialized, closes the epoll
 * descriptor and releases the Conns array.
 * Per-connection buffers and sockets are not released yet (see the TODO
 * below). Always returns 0.
 */
int Conn_shutdown(void)
{
	/*
	 * Only close the epoll fd if Conn_init() created one; otherwise
	 * Conn_epoll_fd may still hold its initial value 0 (or a stale number)
	 * and we would close a descriptor we do not own.
	 */
	if (Conn_inited == 1) {
		close(Conn_epoll_fd);
		Conn_epoll_fd = -1;
	}

	Conn_inited = 0;

	/* Free all buffers */
	#if 0
	TODO
	Log(5, "Freeing %u slots...\n", Conn_allocated);
	for (slot = 0; slot < Conn_allocated - 1; slot++) {
		if (Conns[slot].ibuf)
			free(Conns[slot].ibuf);
		if (Conns[slot].obuf)
			free(Conns[slot].obuf);

		if (Conns[slot].fd != -1) {
			Log(6, "Closing slot %u, fd %d...\n",
			    slot, Conns[slot].fd);
			close(Conns[slot].fd);
		}
	}
	#endif

	free(Conns);
	/* Do not leave a dangling pointer: a second shutdown would free twice */
	Conns = NULL;

	return 0;
}

/*
 * Enqueues data but does not kick the sending.
 * Appends 'count' bytes from 'buf' to the output buffer of C, asking
 * Conn_try_expand_buf() for more room when the free space at the tail is
 * too small.
 * Returns -1 on error (the buffer could not be expanded), 0 if ok.
 */
__hot int Conn_enqueue_wait(struct Conn *C, const void *buf, const unsigned int count)
{
	if (unlikely(Conn_debug_level >= 10)) {
		char *dump;

		dump = Conn_dump(buf, count);
		/*
		 * 'count' is unsigned, so it is printed with %u. The dump is
		 * guarded because a NULL pointer must not be passed to %s, in
		 * case Conn_dump() could not build one.
		 */
		Log(0, "%llu %s Try to enqueue %u byte(s) [%s]...\n",
			C->id, __func__, count, dump ? dump : "(null)");
		free(dump);
	}

	if (unlikely(C->obuf_size - C->obuf_tail < count)) {
		int r;

		r = Conn_try_expand_buf(C, CONN_OUT, count);
		if (unlikely(r != 0))
			return -1;
	}

	memcpy(C->obuf + C->obuf_tail, buf, count);
	C->obuf_tail += count;

	return 0;
}

/*
 * Enqueues data with Conn_enqueue_wait() and, if that worked, kicks the
 * sending right away.
 * Returns the result of Conn_enqueue_wait(): 0 if ok, -1 on error.
 */
__hot int Conn_enqueue(struct Conn *C, const void *buf, const unsigned int count)
{
	const int ret = Conn_enqueue_wait(C, buf, count);

	if (unlikely(ret != 0))
		return ret;

	Conn_default_cbs_send(C);

	return 0;
}

/*
 * Kicks sending (after Conn_enqueue_wait).
 * Logs the call at level 10 and then invokes Conn_default_cbs_send() on
 * the connection.
 */
__hot void Conn_kick(struct Conn *C)
{
	Log(10, "%llu %s\n", C->id, __func__);
	Conn_default_cbs_send(C);
}

/*
 * Stores the socket domain in the connection (C->sock_domain).
 * Conn_commit() accepts PF_INET, PF_INET6 and PF_PACKET here.
 * Always returns 0.
 */
int Conn_set_socket_domain(struct Conn *C, const int domain)
{
	C->sock_domain = domain;
	return 0;
}

/*
 * Stores the socket type (for example SOCK_STREAM or SOCK_DGRAM) in the
 * connection (C->sock_type). Always returns 0.
 */
int Conn_set_socket_type(struct Conn *C, const int type)
{
	C->sock_type = type;
	return 0;
}

/*
 * Stores the socket protocol in the connection (C->sock_protocol); the
 * value is later passed as the third argument of socket().
 * Always returns 0.
 */
int Conn_set_socket_protocol(struct Conn *C, const int protocol)
{
	C->sock_protocol = protocol;
	return 0;
}

/*
 * Set the address where to bind to.
 * The string is copied into C->bind_addr (truncated if it does not fit).
 * A NULL 'addr' is stored as an empty string, which Conn_commit() treats
 * as "no address specified". Always returns 0.
 */
int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
{
	/*
	 * Conn_socket() passes NULL for domains without a default address;
	 * handing NULL to "%s" is undefined behaviour.
	 */
	if (addr == NULL)
		addr = "";

	snprintf(C->bind_addr, sizeof(C->bind_addr), "%s", addr);
	return 0;
}

/*
 * Set address where to connect to.
 * The string is copied into C->addr (truncated if it does not fit) and the
 * connection is marked as CONN_TYPE_P2P. Always returns 0.
 * NOTE(review): 'addr' is passed to "%s", so it must not be NULL.
 */
int Conn_set_socket_addr(struct Conn *C, const char *addr)
{
	snprintf(C->addr, sizeof(C->addr), "%s", addr);
	C->type = CONN_TYPE_P2P;
	return 0;
}

/*
 * Stores the port to bind to (C->bind_port); Conn_commit() converts it
 * with htons() when building the bind address. Always returns 0.
 */
int Conn_set_socket_bind_port(struct Conn *C, const int port)
{
	C->bind_port = port;
	return 0;
}

/*
 * Stores the port to connect to (C->port); Conn_trytoconnect() passes it
 * to getaddrinfo() as the service. Always returns 0.
 */
int Conn_set_socket_port(struct Conn *C, const int port)
{
	C->port = port;
	return 0;
}

#if 0
/*
 * Returns a heap-allocated byte-for-byte copy of a Conn structure (meant
 * to be used for splitting between workers), or NULL if the allocation
 * fails. Pointers inside the structure are copied, not duplicated.
 */
__cold static struct Conn *Conn_clone(struct Conn *C)
{
	struct Conn *copy = malloc(sizeof(*copy));

	if (copy == NULL) {
		Log(1, "Cannot alloc memory for a clone!\n");
		return NULL;
	}

	memcpy(copy, C, sizeof(*copy));

	return copy;
}
#endif

/*
 * Prepares a socket to be used in Conn_commit.
 * Creates a non-blocking, close-on-exec socket using the domain, type and
 * protocol stored in C. IPv6 sockets are restricted to IPv6 only. For a
 * master connection, SO_REUSEADDR (and SO_REUSEPORT when available) is set
 * and the socket is bound to bind_psa/bind_sock_len.
 * Returns the new fd, or -1 on error with the reason in Conn_error; no
 * descriptor is left open on error.
 */
static int Conn_prepare_socket(struct Conn *C, struct sockaddr *bind_psa,
	socklen_t bind_sock_len)
{
	int fd, i, saved_errno;

	fd = socket(C->sock_domain, C->sock_type | SOCK_NONBLOCK | SOCK_CLOEXEC,
		C->sock_protocol);
	if (fd == -1) {
		/* Save errno: logging may change it before it is used again */
		saved_errno = errno;
		Log(1, "%s: Could not create socket (%s)!\n",
			__func__, strerror(saved_errno));
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%s, %s, %s) [%s]",
			Conn_domain(C), Conn_type(C),
			Conn_get_socket_protocol(C),
			strerror(saved_errno));
		return -1;
	}

	/* Conn_setnonblock(fd); TODO: if kernel < 2.6.27, we still do not
	 * have nonblock support. Check at runtime and make setnonblock as
	 * a no-op */

	if (C->sock_domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	if (C->type == CONN_TYPE_MASTER) {
		int ret;

		i = 1;
		setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		if (Conn_reuseport_available) {
			i = 1;
			setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i));
		}

		ret = bind(fd, bind_psa, bind_sock_len);
		if (ret < 0) {
			saved_errno = errno;
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind on %s/%d [%s]",
				C->bind_addr, C->bind_port,
				strerror(saved_errno));
			/* The caller only sees -1, so the fd must be closed here */
			close(fd);
			return -1;
		}
	}

	Log(1, "%s: Prepared socket %d!\n", __func__, fd);
	return fd;
}

/*
 * Allocates socket and bind if asked.
 *
 * - A connection of unknown type is rejected.
 * - A P2P (outgoing) connection is only appended to the connect list and
 *   marked as pending; the connect itself is attempted later.
 * - Otherwise the bind address is built (a default is chosen when none was
 *   set), the socket is prepared, masters are appended to the masters list
 *   and a listening socket is handed to every worker of the worker pool
 *   (a pool with one worker is created if the connection has none).
 *
 * Returns 0 on success, -1 on error (reason in Conn_error where set).
 * Errors detected before the socket is prepared return without touching C;
 * later errors release C through Conn_free_intern().
 * TODO: We should return -1 or free connection and call callbacks?!
 */
int Conn_commit(struct Conn *C)
{
	int i, ret, listen_fd;
	struct sockaddr *bind_psa = NULL;
	struct sockaddr_in bind_sa;
	struct sockaddr_in6 bind_sa6;
	socklen_t bind_sock_len = 0;
	unsigned char first_state = 0xff;
	struct Conn_wpool_worker *w;

	Log(10, "%s\n", __func__);

	/* Be optimistical and increment here, in 'free' we will decrement. */
	/* So, in error case, we will only decrement this thing! */
	Conn_work_to_do++;

	/* Try to figure what kind of socket is: client or master */
	if (C->type == CONN_TYPE_UNK) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot detect Conn type");
		return -1;
	}

	if (C->type == CONN_TYPE_P2P) {
		if (Conn_connect_list.head == NULL)
			Conn_connect_list.head = C;
		else
			Conn_connect_list.tail->next = C;
		Conn_connect_list.tail = C;

		/* TODO:replace connect_a with OPEN?! */
		C->state = CONN_STATE_CONNECT_a;
		Conn_pending++;
		return 0;
	}

	/* TODO: we may want to bind even if we connect */
	if (C->bind_addr[0] == '\0') {
		/* Choose defaults because IP was not specified */
		switch (C->sock_domain) {
		case PF_INET:
			snprintf(C->bind_addr, sizeof(C->bind_addr),
				"0.0.0.0"); break;
		case PF_INET6:
			snprintf(C->bind_addr, sizeof(C->bind_addr),
				"::"); break;
		}
	}

	/* TODO: why not move this in prepare_socket?! */
	/* TODO: This way I can create the socket in the child */
	switch (C->sock_domain) {
	case PF_INET:
		/* for binding socket */
		if (C->bind_addr[0] != '\0') {
			memset(&bind_sa, 0, sizeof(bind_sa));
			bind_sa.sin_family = AF_INET;
			ret = inet_pton(AF_INET, C->bind_addr, &bind_sa.sin_addr);
			/*
			 * inet_pton() returns 1 on success, 0 when the string
			 * is not a valid address and -1 on error; anything
			 * but 1 must be rejected, else we would silently bind
			 * to the zeroed (wildcard) address.
			 */
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", C->bind_addr);
				return -1;
			}
			bind_sa.sin_port = htons(C->bind_port);
			bind_psa = (struct sockaddr *) &bind_sa;
			bind_sock_len = sizeof(bind_sa);
		}

		if (C->sock_type == SOCK_STREAM) {
			first_state = CONN_STATE_LISTEN;
		} else if (C->sock_type == SOCK_DGRAM) {
			first_state = CONN_STATE_OPEN;
		}
		break;

	case PF_INET6:
		/* for binding socket */
		if (C->bind_addr[0] != '\0') {
			memset(&bind_sa6, 0, sizeof(bind_sa6));
			bind_sa6.sin6_family = AF_INET6;
			ret = inet_pton(AF_INET6, C->bind_addr, &bind_sa6.sin6_addr);
			/* Same rule as for IPv4: only 1 means success */
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", C->bind_addr);
				return -1;
			}
			bind_sa6.sin6_port = htons(C->bind_port);
			bind_psa = (struct sockaddr *) &bind_sa6;
			bind_sock_len = sizeof(bind_sa6);
		}

		if (C->sock_type == SOCK_STREAM) {
			first_state = CONN_STATE_LISTEN;
		} else if (C->sock_type == SOCK_DGRAM) {
			first_state = CONN_STATE_OPEN;
		}
		break;

	case PF_PACKET:
		/*do_bind = 0; TODO? */
		first_state = CONN_STATE_OPEN;
		break;

	default:
		snprintf(Conn_error, sizeof(Conn_error),
			"Invalid domain [%d]!", C->sock_domain);
		return -1;
	}

	// TODO: If we have workers, why would need to create a socket
	// for master?!
	#if 1
	ret = Conn_prepare_socket(C, bind_psa, bind_sock_len);
	if (ret == -1)
		goto out_free;

	C->fd = ret;
	#else
	C->fd = -1;
	#endif

	if (C->type == CONN_TYPE_MASTER) {
		if (Conn_masters.head == NULL)
			Conn_masters.head = C;
		else
			Conn_masters.tail->next = C;
		Conn_masters.tail = C;
	}

	C->state = first_state;

	if (!C->wp) {
		Log(5, "%s We do not have a worker pool, create one...\n", __func__);
		C->wp = Conn_wpool_create(1);
		if (!C->wp)
			goto out_free; // TODO check if is correct!
		Conn_set_wp(C, C->wp);
	}

	/* Now, we must enqueue listening socket to workers */
	for (i = 0; i < C->wp->workers; i++) {
		w = &C->wp->ww[i];

		/* TODO 'reuse' must be global, right? */
		if (Conn_reuseport_available) {
			Log(1, "%s: reuseport is available, create a new socket.\n",
				__func__);
			listen_fd = Conn_prepare_socket(C, bind_psa, bind_sock_len);
			// TODO: why we do not do listen in prepare?
			// Because we do not know if we should listen.

			/* Do not hand an invalid descriptor to the worker */
			if (listen_fd == -1)
				goto out_free;
		} else {
			Log(1, "%s: reuseport is NOT available, use master socket.\n",
				__func__);
			listen_fd = C->fd;
		}

		ret = Conn_wpool_start_worker(C, w, listen_fd);
		if (ret != 0) {
			/* The per-worker socket is ours; do not leak it */
			if (Conn_reuseport_available)
				close(listen_fd);
			goto out_free;
		}

		if (Conn_reuseport_available)
			close(listen_fd);
	}

	return 0;

	out_free:
	Conn_free_intern(C);

	return -1;
}

/*
 * OBSOLETE
 * Convenience wrapper: allocates a connection, sets domain, type, bind
 * address and bind port on it, then commits it.
 * Returns the connection, or NULL if the allocation or the commit failed.
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *C = Conn_alloc();

	if (!C)
		return NULL;

	Conn_set_socket_domain(C, domain);
	Conn_set_socket_type(C, type);
	Conn_set_socket_bind_addr(C, addr);
	Conn_set_socket_bind_port(C, port);

	if (Conn_commit(C) != 0)
		return NULL;

	return C;
}

/*
 * OBSOLETE
 * Like Conn_socket_addr(), but binds to the wildcard address of the
 * domain: "0.0.0.0" for PF_INET, "::" for PF_INET6. For any other domain
 * a NULL address is passed on.
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	char *any_addr = NULL;

	if (domain == PF_INET)
		any_addr = "0.0.0.0";
	else if (domain == PF_INET6)
		any_addr = "::";

	return Conn_socket_addr(domain, type, any_addr, port);
}

/*
 * OBSOLETE!
 * Convenience wrapper for outgoing connections: allocates a connection,
 * sets domain, type, destination address and port on it, then commits it.
 * Returns the connection, or NULL if the allocation or the commit failed.
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *conn;

	Log(8, "%s(%s, %d)\n",
		__func__, addr, port);

	conn = Conn_alloc();
	if (conn == NULL)
		return NULL;

	Conn_set_socket_domain(conn, domain);
	Conn_set_socket_type(conn, type);
	Conn_set_socket_addr(conn, addr);
	Conn_set_socket_port(conn, port);

	if (Conn_commit(conn) != 0)
		return NULL;

	return conn;
}

/*
 * Add tokens to connection.
 * Does nothing when no bandwidth limit is set (band_width == 0) or when
 * less than 100 ms passed since the last refill. Otherwise the token count
 * grows by band_width / 10 for every elapsed 100 ms, is capped at
 * band_factor * band_width, and the 'send' callback (if any) is invoked.
 */
static void Conn_band_update(struct Conn *C)
{
	long diff;

	/* no need */
	if (C->band_width == 0)
		return;

	/* elapsed time since the last refill: microseconds first... */
	diff = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
	diff += Conn_now.tv_usec - C->band_lasttime.tv_usec;
	/* ...then converted to units of 100 ms */
	diff /= 100000;

	/* already added in this hundred of milliseconds? */
	if (diff == 0)
		return;

	/* take care of time skew: a clock that went back counts as one unit */
	if (diff < 0)
		diff = 1;

	C->band_lasttime = Conn_now;
	C->band_tokens += diff * C->band_width / 10;
	/* never accumulate more than band_factor * band_width tokens */
	if (C->band_tokens > C->band_factor * C->band_width)
		C->band_tokens = C->band_factor * C->band_width;

	/* new tokens are available, let the connection try to send */
	if (C->cbs.send)
		C->cbs.send(C);

	Log(debug_band, "BAND: id %llu added tokens -> %u.\n",
		C->id, C->band_tokens);
}

/*
 * Set the bandwidth for a connection.
 * width is in 1b increments; factor limits how many tokens may accumulate
 * (at most factor * width). The connection starts with a full bucket and
 * the refill clock is reset to the cached current time.
 * Always returns 0.
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	Log(11, "%s id %llu width=%u factor=%u.\n",
		__func__, C->id, width, factor);

	C->band_lasttime = Conn_now;
	C->band_width = width;
	C->band_factor = factor;
	C->band_tokens = factor * width;

	/*
	 * tv_sec/tv_usec are not 'int'; cast them to long and print with %ld
	 * so the arguments match the format.
	 */
	Log(debug_band, "\t\tBAND: lasttime=%ld.%06ld, width=%u, factor=%u, tokens=%u\n",
		(long) C->band_lasttime.tv_sec, (long) C->band_lasttime.tv_usec,
		C->band_width, C->band_factor, C->band_tokens);

	return 0;
}

/*
 * Walks the list of outgoing connections and starts a non-blocking
 * connect for every entry that is due: state CONNECT_a, or CONNECT_0 whose
 * retry time (tryat) has arrived. The destination is resolved with
 * getaddrinfo() and only the first result is used.
 * Failures are recorded on the connection (state, error_state and, where
 * available, xerrno). On success the connection moves to CONNECT_b and is
 * no longer counted as pending.
 */
static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	struct Conn *C;
	char port[8];
	int ret;

	Log(8, "%s Conn_pending=%d\n",
		__func__, Conn_pending);

	gettimeofday(&Conn_now, NULL);

	for (C = Conn_connect_list.head; C != NULL; C = C->next) {
		Log(20, "%s	id=%llu state=%s\n",
			__func__, C->id, Conn_state(C));

		/* A postponed attempt becomes due once its time has come */
		if ((C->state == CONN_STATE_CONNECT_0)
			&& (C->tryat <= Conn_now.tv_sec))
			C->state = CONN_STATE_CONNECT_a;

		if (C->state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "Trying to connect id %llu, to %s/%d...\n",
			C->id, C->addr, C->port);

		/* Resolve the destination */
		memset(&hints, 0, sizeof(hints));
		hints.ai_family = (C->sock_domain == 0) ? PF_UNSPEC : C->sock_domain;
		hints.ai_socktype = C->sock_type;
		hints.ai_flags = AI_ADDRCONFIG;
		snprintf(port, sizeof(port), "%d", C->port);

		res = NULL;
		ret = getaddrinfo(C->addr, port, &hints, &res);
		if (ret != 0) {
			C->state = CONN_STATE_ERROR;
			C->error_state = CONN_ERROR_GETADDRINFO;
			if (res)
				freeaddrinfo(res);
			continue;
		}

		/* Create and register the socket, if not done already */
		if (C->fd == -1) {
			C->fd = socket(res->ai_family,
				res->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
			if (C->fd == -1) {
				C->state = CONN_STATE_ERROR;
				C->error_state = CONN_ERROR_SOCKET;
				C->xerrno = errno;
				goto next;
			}
			Log(10, "\tAllocated socket on fd %d\n", C->fd);

			/* Need POLLOUT to signal when the connection was done. */
			/*TODO ret = Conn_add_obj(Conn_epoll_fd, C, EPOLLIN | EPOLLRDHUP);*/
			ret = Conn_add_obj(Conn_epoll_fd, C, EPOLLIN | EPOLLOUT);
			if (ret != 0) {
				C->state = CONN_STATE_ERROR;
				C->error_state = CONN_ERROR_SOCKET;
				goto next;
			}
		}

		/* Set syn time */
		C->conn_syn = Conn_now;

		Log(11, "\tConnecting...\n");
		ret = connect(C->fd, res->ai_addr, res->ai_addrlen);
		if ((ret != 0) && (errno != EINPROGRESS)) {
			C->state = CONN_STATE_ERROR;
			C->error_state = CONN_ERROR_CONNECT;
			C->xerrno = errno;
			goto next;
		}

		/* Connect is in progress (or already done) */
		C->state = CONN_STATE_CONNECT_b;

		Conn_pending--;
		Conn_total++;

		next:
		freeaddrinfo(res);
	}

	Log(8, "%s After, Conn_pending=%d\n", __func__, Conn_pending);
}

#if 0
/*
 * Moving an in-use slot over a free one for compacting reason.
 * The two entries of the Conns array are swapped (not just copied), their
 * 'slot' fields are fixed to match the new positions and
 * Conn_change_obj() is called for the destination entry.
 * Does nothing when dst and src are the same slot.
 */
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
	struct Conn tmp;

	if (dst == src)
		return;

	Log(10, "%s Moving id %llu from slot=%u to slot=%d...\n",
		__func__, Conns[src].id, src, dst);

	/* We need to save old location because of useful pointers. */
	tmp = Conns[dst];
	Conns[dst] = Conns[src];
	Conns[src] = tmp;

	/* each entry must know its own index again */
	Conns[dst].slot = dst;
	Conns[src].slot = src;

	Conn_change_obj(&Conns[dst]);
}
#endif

/*
 * Does the polling of file descriptors.
 * timeout is in 1/1000 seconds increments; values above 5000 and the
 * special value -1 are clamped to 5000 for a single wait. With -1 the
 * function keeps polling until one of the return conditions below is met.
 *
 * Each round: pending outgoing connections are tried, events are
 * dispatched, then every slot is checked for expiration and gets its
 * bandwidth tokens refilled.
 *
 * Returns: -1 on error, 0 if asked to stop, if there is nothing to do or
 * if no connection remained; otherwise the value returned by
 * Conn_dispatch_events().
 */
int Conn_poll(const int timeout)
{
	int ret;
	int timeout2;
	unsigned int slot;
	struct Conn *C;

	/* Conn_no is unsigned, print it with %u */
	Log(9, "%s timeout=%d Conn_no=%u Conn_work_to_do=%u\n",
		__func__, timeout, Conn_no, Conn_work_to_do);

	/* Wake up at least every 5 seconds, even for an "infinite" wait */
	if ((timeout > 5000) || (timeout == -1))
		timeout2 = 5000;
	else
		timeout2 = timeout;

	do {
		if (unlikely(Conn_must_stop == 1))
			return 0;

		if (unlikely(Conn_work_to_do == 0)) {
			Log(9, "%s work_to_do is 0, so return 0!\n",
				__func__);
			return 0;
		}

		if (Conn_pending > 0)
			Conn_trytoconnect();

		ret = Conn_dispatch_events(NULL, Conn_epoll_fd, Conn_epoll_events,
			CONN_EVENTS_SLOTS, timeout2);
		if (unlikely(ret < 0))
			return -1;

		Log(30, "Do compacting, expiration and band stuff...\n");
		/*
		 * Conn_no is re-read on every iteration, because expiring a
		 * connection may change it.
		 */
		for (slot = 0; slot < Conn_no; slot++) {
			C = &Conns[slot];
			/* Can only be NULL if Conns itself is NULL (slot 0) */
			if (!C)
				continue;

			/* No commit done yet */
			if (C->state == CONN_STATE_EMPTY)
				continue;

			if (C->state == CONN_STATE_FREE) {
				/* Must not happen! Probably stale events. */
				Log(9, "\tBUG! Slot is in FREE state and has events!\n");
				continue;
			}

			/* test if it expired/timeout */
			Conn_expire(C);

			/* add tokens */
			Conn_band_update(C);
		}

		/* Any work left to do? */
		if (Conn_no == 0) {
			Log(10, "Nothing remained to poll for!\n");
			return 0;
		}
	} while (timeout == -1);

	return ret;
}


Mode Type Size Ref File
100644 blob 129 38f2534580e0aace0e6a5b49d79ada2c2ca162be .exclude
100644 blob 162 30a78e3a392ae33217d139ce27c4e1ebd04aa6e0 .gitignore
100644 blob 169 c003c095218f64ad33aeb89987f61eb575557d96 .mailmap
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog.pre109
100644 blob 81042 d6754c4bc9f1550b546b24e613edeeaa0f76b172 Conn.c
100644 blob 5994 f3e90d8ed62f256e05fb74235620ba9cce31a823 Conn.h
100644 blob 905 884fa11125512f99984f40735af6c380330f871c Conn.spec.in
100644 blob 747 662c3f3fe8d0a3d23770631d7a0a260719d81e62 Conn_config.h.in
100644 blob 5546 7bc5f77db036d7714b28600ffd90ab4c5ee080e2 Conn_intern.h
100644 blob 12705 e9e694f33101b90d22c688ca44cb47c33c011032 Conn_web.c
100644 blob 93 4754320eef2b558b97b9c75bd01e545f102670b7 Conn_web.h
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 1261 aa9a4eb4cd37aedafb0d69ec4f9bf348841e5af5 Makefile.in
100644 blob 29 e214257f87a28e8fb0413b627cf7ee76ade2e94c Makefile.include.in
100644 blob 200 02d1f3eebc03e84013c23a35db5fe3b77f12f0cc README
100644 blob 19668 bf0f19113acc021dcdc92af4be32b2c737b362a7 TODO
100755 blob 30 92c4bc48245c00408cd7e1fd89bc1a03058f4ce4 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
100755 blob 18252 e2438615edba7066a730ed6a796a5302263f1f37 duilder
100644 blob 1414 e672a355b5777d14d4f58e36075ed1967aa15ba3 duilder.conf
040000 tree - b255bf8fe16832bbbb513181b1dc6ae9420deed1 examples
040000 tree - 5643f06c34660e576e6c5d0dee5ac74a2bf34f51 tests
Hints:
Before the first commit, do not forget to set up your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main