catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easily building IPv4/IPv6 network daemons and clients

/Conn.c (3101607ebe0288de591acda2d558579b4c54ccac) (39966 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux aka Dino) BOIE <catab at umbrella.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn.h"


/* Visible variables */

/*
 * Library-wide callbacks. In the code of this file the accept, data, close
 * and error callbacks are used as fall-backs when the connection has no
 * per-connection callback (cb_accept, cb_data, cb_close, cb_error) set.
 */
void		(*Conn_accept_cb)(struct Conn *C) = NULL;
void		(*Conn_recv_cb)(struct Conn *C) = NULL;
void		(*Conn_send_cb)(struct Conn *C) = NULL;
void		(*Conn_data_cb)(struct Conn *C) = NULL;
void		(*Conn_close_cb)(struct Conn *C) = NULL;
void		(*Conn_trigger_cb)(struct Conn *C) = NULL;
void		(*Conn_error_cb)(struct Conn *C) = NULL;
void		(*Conn_connected_cb)(struct Conn *C) = NULL;

/*
 * Optional hooks used by the status functions: the first supplies extra
 * text appended to every HTML row, the second extra text for the HTML
 * table header.
 */
char		*(*Conn_status_slot_html_cb)(struct Conn *C);
char		*(*Conn_status_cb)(void);


/* Statistic maintained by Conn_alloc() */
unsigned int		Conn_max_reached = 0;
/* Initial size of the per-connection buffers and minimum growth step */
unsigned int		Conn_default_ibuf = 128;
unsigned int		Conn_default_obuf = 128;
/* Upper limit for the size of the per-connection buffers */
unsigned int		Conn_max_ibuf = 4096000;
unsigned int		Conn_max_obuf = 4096000;

/* Max bytes enqueued on one send/recv call (0 disables the cap) */
unsigned int		Conn_max_send = 32 * 1024;
unsigned int		Conn_max_recv = 32 * 1024;

/* Number of slots currently in use */
unsigned int		Conn_no = 0;
/* Maximum number of connections (0 means no limit) */
unsigned int		Conn_max = 0;
/* Incremented for every accepted connection */
unsigned long		Conn_total = 0;
/* Seconds part of the time recorded by Conn_init() */
unsigned int		Conn_start = 0;
/* Incremented when a connect is requested or a reconnect is scheduled */
unsigned int		Conn_pending = 0;
/* Cached current time, used for timestamps and logging */
struct timeval		Conn_now;
unsigned short		Conn_level = 0;			/* debug level */

/* Internal variables */
/* Text of the last error, returned by Conn_strerror() */
static char			Conn_error[512];
/* poll() descriptors; entry i belongs to Conns[i] */
static struct pollfd		*Conn_pfds = NULL;
static struct Conn		*Conns = NULL;
static unsigned int		Conn_inited = 0;
/* Non-zero while master sockets are polled for new connections */
static unsigned int		Conn_accept_is_allowed;
/* Number of entries allocated in Conn_pfds/Conns */
static unsigned int		Conn_allocated = 0;
/* Next connection id to hand out */
static unsigned long long	Conn_id = 1;

/* Log destination (NULL means stderr) */
static FILE		*Conn_Log = NULL;
/* Log level used for the bandwidth related messages */
static int		debug_band = 11;

/* Functions */

/*
 * Difference between two timeval structures (t1 - t2), in milliseconds.
 *
 * The whole difference is computed in microseconds using 64-bit arithmetic
 * and only then converted, so:
 *  - the seconds part cannot overflow where time_t is 32 bits wide;
 *  - a borrow from the microseconds part is handled exactly (1.000000 -
 *    0.999999 gives 0 ms, not 1 ms).
 * The result is truncated toward zero.
 */
long long Conn_time_diff(const struct timeval *t1, const struct timeval *t2)
{
	long long usec;

	usec = ((long long) t1->tv_sec - (long long) t2->tv_sec) * 1000000LL
		+ ((long long) t1->tv_usec - (long long) t2->tv_usec);

	return usec / 1000;
}

/*
 * Returns the text of the last error recorded by the library.
 * The pointer refers to a static buffer that later errors overwrite;
 * the caller must not free it.
 */
char *Conn_strerror(void)
{
	char *last_error = Conn_error;

	return last_error;
}

/*
 * Puts a file descriptor in non-blocking mode.
 * Returns -1 if the current flags cannot be read, else the result of the
 * fcntl(F_SETFL) call.
 */
static int Conn_setnonblock(int fd)
{
	long	mode;

	mode = fcntl(fd, F_GETFL, 0);
	if (mode == -1)
		return -1;

	/* keep every flag already set, only add O_NONBLOCK */
	return fcntl(fd, F_SETFL, mode | O_NONBLOCK);
}

/*
 * Internal printf-like logger.
 * The message is written only if 'level' does not exceed Conn_level.
 * Output goes to Conn_Log, or to stderr if no log file was set, and is
 * prefixed with the cached time Conn_now ("seconds.microseconds ").
 */
static void Log(unsigned short level, char *format, ...)
{
	va_list	args;
	FILE *sink;

	/* too verbose for the current debug level */
	if (Conn_level < level)
		return;

	sink = (Conn_Log != NULL) ? Conn_Log : stderr;

	fprintf(sink, "%ld.%06ld ",
		Conn_now.tv_sec, Conn_now.tv_usec);

	va_start(args, format);
	vfprintf(sink, format, args);
	va_end(args);
}

/*
 * Builds a printable copy of a buffer, mainly for logging.
 *
 * Printable ASCII bytes (32..126) are copied as they are; every other byte,
 * including DEL (127), is written as "[xx]" where xx is its value in
 * lowercase hex.
 *
 * buf_src: bytes to dump (need not be NUL terminated)
 * len_src: number of bytes to dump
 *
 * Returns a malloc'ed, NUL terminated string that the caller must free.
 * On error (negative length, size too big, out of memory) a strdup'ed
 * message is returned instead; that can be NULL if strdup fails too.
 */
char *Conn_dump(char *buf_src, int len_src)
{
	static const char hex[] = "0123456789abcdef";
	size_t i, j, len;
	char *buf_dst;
	unsigned char c;

	if (len_src < 0)
		return strdup("[Error: len < 0]");

	Log(30, "\tConn_dump(%p, len=%d)\n",
		(void *) buf_src, len_src);

	len = (size_t) len_src;

	/* Worst case every byte becomes 4 chars; refuse sizes that would wrap */
	if (len > ((size_t) -1 - 1) / 4)
		return strdup("Memory allocation error1!");

	buf_dst = malloc(len * 4 + 1);
	if (buf_dst == NULL)
		return strdup("Memory allocation error1!");

	j = 0;
	for (i = 0; i < len; i++) {
		c = (unsigned char) buf_src[i];
		if ((c < 32) || (c >= 127)) {
			buf_dst[j++] = '[';
			buf_dst[j++] = hex[c >> 4];
			buf_dst[j++] = hex[c & 0x0f];
			buf_dst[j++] = ']';
		} else {
			buf_dst[j++] = (char) c;
		}
	}

	buf_dst[j] = '\0';

	return buf_dst;
}

/*
 * Returns the content of a buffer as a string of lowercase hex digits,
 * two digits per input byte.
 *
 * The result is malloc'ed and must be freed by the caller. For a negative
 * length or on allocation failure a strdup'ed error text is returned.
 */
char *Conn_dumphex(char *buf_src, int len_src)
{
	static const char digits[] = "0123456789abcdef";
	char *out, *w;
	int pos;
	unsigned char byte;

	if (len_src < 0)
		return strdup("[Error: len < 0]");

	Log(30, "\tConn_dumphex(%p, len=%d)\n",
		buf_src, len_src);

	out = malloc(len_src * 2 + 1);
	if (out == NULL)
		return strdup("Memory allocation error1!");

	w = out;
	for (pos = 0; pos < len_src; pos++) {
		byte = buf_src[pos];
		/* high nibble first, then low nibble */
		*w++ = digits[byte >> 4];
		*w++ = digits[byte & 0x0f];
	}
	*w = '\0';

	return out;
}

/*
 * Configures logging.
 * f: stream that receives the log messages (NULL makes Log() use stderr)
 * debug: maximum level of the messages that are written
 */
void Conn_debug(FILE *f, unsigned short debug)
{
	Conn_level = debug;
	Conn_Log = f;
}

/*
 * Grow Conn structures
 *
 * Enlarges both parallel arrays (Conn_pfds and Conns) by 128 entries and
 * zeroes the new entries.
 *
 * Returns 1 on success, 0 if memory could not be obtained. On failure
 * Conn_allocated is left unchanged and both arrays remain valid.
 */
static unsigned int Conn_grow(void)
{
	const unsigned int step = 128;
	unsigned int alloc;
	struct pollfd *p;
	struct Conn *q;

	Log(10, "%s() Try to grow from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + 128);

	alloc = Conn_allocated + step;

	p = realloc(Conn_pfds, alloc * sizeof(struct pollfd));
	if (p == NULL)
		return 0;
	/*
	 * Adopt the new block right away: realloc may have moved the data and
	 * released the old block, so the old pointer must not be used again
	 * (and the new block must not be freed) even if the next step fails.
	 */
	Conn_pfds = p;

	q = realloc(Conns, alloc * sizeof(struct Conn));
	if (q == NULL)
		return 0;
	Conns = q;

	/* Only the fresh tail needs to be cleared */
	memset(Conn_pfds + Conn_allocated, 0, step * sizeof(struct pollfd));
	memset(Conns + Conn_allocated, 0, step * sizeof(struct Conn));

	Conn_allocated = alloc;

	return 1;
}

/*
 * Initializes the library.
 * max: maximum number of connections (Conn_alloc treats 0 as "no limit")
 *
 * Resets the counters, records the start time and allocates the first
 * batch of slots. Calling it again after a successful init does nothing.
 * Returns 0 on success, -1 if the slots cannot be allocated.
 */
int Conn_init(unsigned int max)
{
	if (Conn_inited == 1)
		return 0;

	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;

	Conn_max = max;
	Conn_no = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	Conn_allocated = 0;
	Conn_accept_is_allowed = 1;

	/* no error so far */
	Conn_error[0] = '\0';

	if (Conn_grow() == 0) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot grow anymore. Probably memory shortage.");
		return -1;
	}

	Conn_inited = 1;

	return 0;
}

/*
 * Enables (val != 0) or disables (val == 0) polling for new connections:
 * POLLIN is set or cleared on every slot of type Conn_type_MASTER.
 * Nothing is done if the requested value is already in effect.
 */
static void Conn_accept_allow(unsigned short val)
{
	unsigned int idx;

	if (val == Conn_accept_is_allowed)
		return;

	Log(10, "%s: Turn accept allow to %d (%s)\n",
		__FUNCTION__, val, val == 0 ? "off" : "on");

	for (idx = 0; idx < Conn_no; idx++) {
		if (Conns[idx].type == Conn_type_MASTER) {
			if (val != 0)
				Conn_pfds[idx].events |= POLLIN;
			else
				Conn_pfds[idx].events &= ~POLLIN;
		}
	}

	Conn_accept_is_allowed = val;
}

/* Alloc a Conn structure */
static struct Conn *Conn_alloc(void)
{
	unsigned int slot, growok;
	void *p;

	Log(10, "%s() Conn_no=%d Conn_max=%d\n",
		__FUNCTION__,
		Conn_no, Conn_max);

	if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	if (Conn_allocated == Conn_no) {
		growok = Conn_grow();
		if (growok == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}

	if (Conn_no > Conn_max_reached)
		Conn_max_reached = Conn_no;

	slot = Conn_no;

	Conns[slot].type = Conn_type_UNK;
	Conns[slot].state = CONN_STATE_FREE;
	Conns[slot].slot = slot;

	if (Conns[slot].ibuf_size < Conn_default_ibuf) {
		p = realloc(Conns[slot].ibuf, Conn_default_ibuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		Conns[slot].ibuf = p;
		Conns[slot].ibuf_size = Conn_default_ibuf;
	}
	Conns[slot].ibuf_head = 0;
	Conns[slot].ibuf_tail = 0;

	if (Conns[slot].obuf_size < Conn_default_obuf) {
		p = realloc(Conns[slot].obuf, Conn_default_obuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		Conns[slot].obuf = p;
		Conns[slot].obuf_size = Conn_default_obuf;
	}
	Conns[slot].obuf_head = 0;
	Conns[slot].obuf_tail = 0;

	Conns[slot].trecv = Conn_now;

	Conns[slot].bi = 0;
	Conns[slot].bo = 0;
	Conns[slot].private = NULL;

	/* bandwidth */
	Conns[slot].band_width = 0;
	Conns[slot].band_factor = 0;
	Conns[slot].band_tokens = 0;
	Conns[slot].band_lasttime = Conn_now;

	Conn_pfds[slot].fd = -1;
	Conn_pfds[slot].events = 0;
	Conn_pfds[slot].revents = 0;
	Conns[slot].state = CONN_STATE_EMPTY;

	Conns[slot].flags = 0;

	Conn_no++;

	Log(10, "\tFound free slot %u. Now Conn_no=%d\n",
		slot, Conn_no);

	if (Conn_no == Conn_max)
		Conn_accept_allow(0);

	return &Conns[slot];
}

/*
 * Requests the closing of a connection.
 * With an empty output buffer the connection is only marked
 * (CONN_ERROR_USERREQ); if data is still queued, a flag is set so the
 * closing is done after the data is sent.
 */
void Conn_close(struct Conn *C)
{
	int data_queued = (C->obuf_head != C->obuf_tail);

	if (data_queued) {
		C->flags |= CONN_FLAGS_CLOSE_AFTER_SEND;
		return;
	}

	C->error_state = CONN_ERROR_USERREQ;
}

/*
 * Describes the error stored in a connection as "<origin> (<strerror>)",
 * where origin is derived from C->error_state and the second part from
 * C->xerrno.
 * The result lives in a static buffer that the next call overwrites.
 */
static char *Conn_errno(struct Conn *C)
{
	static char buf[256];
	const char *origin = "?";

	switch (C->error_state) {
	case CONN_ERROR_USERREQ:
		origin = "user";
		break;
	case CONN_ERROR_POLL:
		origin = "poll";
		break;
	case CONN_ERROR_RECV:
		origin = "recv";
		break;
	case CONN_ERROR_SEND:
		origin = "send";
		break;
	case CONN_ERROR_SOCKET:
		origin = "socket";
		break;
	case CONN_ERROR_HANGUP:
		origin = "hangup";
		break;
	case CONN_ERROR_GETADDRINFO:
		origin = "lookup error";
		break;
	case CONN_ERROR_EXPIRED:
		origin = "expired";
		break;
	case CONN_ERROR_ACCEPT:
		origin = "accept";
		break;
	case CONN_ERROR_MEM:
		origin = "allocation failed";
		break;
	case CONN_ERROR_CONNECT:
		origin = "connect";
		break;
	case CONN_ERROR_READ_TIMEOUT:
		origin = "read timeout";
		break;
	case CONN_ERROR_CONN_TIMEOUT:
		origin = "conn timeout";
		break;
	default:
		/* unknown code, keep "?" */
		break;
	}

	snprintf(buf, sizeof(buf), "%s (%s)",
		origin, strerror(C->xerrno));

	return buf;
}

/*
 * Returns a short constant name for the state of a connection
 * ("BUG?" for a value that is not known).
 */
static char *Conn_state(struct Conn *C)
{
	char *name;

	switch (C->state) {
	case CONN_STATE_FREE:
		name = "FREE";
		break;
	case CONN_STATE_EMPTY:
		name = "EMPTY";
		break;
	case CONN_STATE_OPEN:
		name = "OPEN";
		break;
	case CONN_STATE_LISTEN:
		name = "LISTEN";
		break;
	case CONN_STATE_CONNECT_0:
		name = "CONN0";
		break;
	case CONN_STATE_CONNECT_a:
		name = "CONNa";
		break;
	case CONN_STATE_CONNECT_b:
		name = "CONNb";
		break;
	default:
		name = "BUG?";
		break;
	}

	return name;
}

/*
 * Tears down a connection: reports the error, runs the callbacks, closes
 * the descriptor and then either re-arms the slot for a reconnect or
 * releases it.
 *
 * When the slot is released and it is not the last one in use, the last
 * used entry is moved into it, so the slot index (and address) of that
 * other connection changes.
 */
static void Conn_free_intern(struct Conn *C)
{
	unsigned int slot;

	slot = C->slot;

	Log(7, "Cleaning-up slot in %s state id %llu [%s]...\n",
		Conn_state(C), C->id, Conn_errno(C));

	/* Make the reason available through Conn_strerror() */
	snprintf(Conn_error, sizeof(Conn_error),
		"%s", Conn_errno(C));

	/* A close requested by the user is not reported as an error */
	if (C->error_state != CONN_ERROR_USERREQ) {
		if (Conns[slot].cb_error)
			Conns[slot].cb_error(C);
		else if (Conn_error_cb)
			Conn_error_cb(C);
	}

	/* The close callback runs only for connections in OPEN state */
	if (Conns[slot].state == CONN_STATE_OPEN) {
		if (Conns[slot].cb_close)
			Conns[slot].cb_close(C);
		else if (Conn_close_cb)
			Conn_close_cb(&Conns[slot]);
	}

	if (Conn_pfds[slot].fd > -1) {
		close(Conn_pfds[slot].fd);
		Conn_pfds[slot].fd = -1;
	}

	/* Reset tsend, else we enter in a timeout error loop */
	Conns[slot].tsend.tv_sec = 0;
	Conns[slot].tsend.tv_usec = 0;

	Conn_pfds[slot].events = 0;
	Conn_pfds[slot].revents = 0;

	/* Reset the connection attempt time */
	Conns[slot].conn_syn.tv_sec = 0;
	Conns[slot].conn_syn.tv_usec = 0;

	if (Conns[slot].flags & CONN_FLAGS_AUTO_RECONNECT) {
		/*
		 * Keep the slot: schedule the next attempt 'delay' seconds
		 * from now and drop whatever is left in the buffers.
		 */
		Conns[slot].tryat = Conn_now.tv_sec + Conns[slot].delay;
		Conns[slot].error_state = 0;
		Conns[slot].state = CONN_STATE_CONNECT_0;

		Conns[slot].ibuf_head = 0;
		Conns[slot].ibuf_tail = 0;

		Conns[slot].obuf_head = 0;
		Conns[slot].obuf_tail = 0;

		Conn_pending++;
	} else {
		Conns[slot].type = Conn_type_UNK;
		Conns[slot].state = CONN_STATE_FREE;

		/*
		 * Keep the used slots contiguous: fill the hole with the
		 * last used entry (nothing to move if we are the last).
		 */
		if (slot < Conn_no - 1) {
			Log(10, "Move last pfd/Conn entry %d -> %d...\n",
				Conn_no - 1, slot);

			/* free old mem */
			if (Conns[slot].ibuf)
				free(Conns[slot].ibuf);
			if (Conns[slot].obuf)
				free(Conns[slot].obuf);

			Conn_pfds[slot] = Conn_pfds[Conn_no - 1];
			Conns[slot] = Conns[Conn_no - 1];
			Conns[slot].slot = slot;

			/*
			 * fix: the buffers now belong to the moved entry,
			 * the vacated one must not point to them anymore
			 */
			Conns[Conn_no - 1].ibuf = NULL;
			Conns[Conn_no - 1].ibuf_size = 0;
			Conns[Conn_no - 1].obuf = NULL;
			Conns[Conn_no - 1].obuf_size = 0;
		}

		Conn_no--;

		/* A slot is available again, accepting can resume */
		Conn_accept_allow(1);
	}
}

/*
 * Expand the requested buffer
 * what = 0 for out buffer, what = 1 for input buffer
 * needed = number of free bytes the caller wants after the tail
 *
 * The buffer grows by at least the default buffer size (or by 'needed' if
 * that is bigger), but never beyond Conn_max_obuf/Conn_max_ibuf, and it is
 * never shrunk.
 *
 * returns 0 if OK, -1 on error: negative 'needed', out of memory, or the
 * size limit does not allow 'needed' free bytes after the tail. On error
 * the buffer is left untouched.
 *
 * With needed = 0 the call succeeds even if the buffer is already at its
 * limit and could not grow at all.
 */
int Conn_try_expand_buf(struct Conn *C, int what, int needed)
{
	char *p;
	unsigned int slot, old_size, new_size, amount, limit, head, tail;

	if (needed < 0) {
		Log(3, "Cannot expand buffer: negative size requested!\n");
		return -1;
	}

	slot = C->slot;

	if (what == 0) {
		head = C->obuf_head;
		tail = C->obuf_tail;
		old_size = Conns[slot].obuf_size;
		amount = Conn_default_obuf;
		limit = Conn_max_obuf;
	} else {
		head = C->ibuf_head;
		tail = C->ibuf_tail;
		old_size = Conns[slot].ibuf_size;
		amount = Conn_default_ibuf;
		limit = Conn_max_ibuf;
	}

	Log(10, "\tTry to expand buffer on slot %u for [%s] needed=%d head=%u tail=%u.\n",
		slot, what == 0 ? "o" : "i", needed,
		head, tail);

	if (amount < (unsigned int) needed)
		amount = needed;

	/* Grow by 'amount', saturating at the limit (also avoids wrapping) */
	if ((old_size >= limit) || (amount > limit - old_size))
		new_size = limit;
	else
		new_size = old_size + amount;

	/* A limit lowered below the current size must not cut queued data */
	if (new_size < old_size)
		new_size = old_size;

	/* Reporting success without the requested room leads to overflows */
	if ((tail > new_size) || (new_size - tail < (unsigned int) needed)) {
		Log(3, "Cannot expand %sbuf: limit of %u bytes reached!\n",
			what == 0 ? "o" : "i", limit);
		return -1;
	}

	/* Already at the limit and nothing more was requested */
	if (new_size == old_size)
		return 0;

	if (what == 0) {
		p = realloc(Conns[slot].obuf, new_size);
		if (p == NULL) {
			Log(3, "Cannot realloc obuf!\n");
			return -1;
		}
		Conns[slot].obuf = p;
		Conns[slot].obuf_size = new_size;
	} else {
		p = realloc(Conns[slot].ibuf, new_size);
		if (p == NULL) {
			Log(3, "Cannot realloc ibuf!\n");
			return -1;
		}
		Conns[slot].ibuf = p;
		Conns[slot].ibuf_size = new_size;
	}

	Log(10, "\tSucces. Old/new size = %u/%u.\n",
		old_size, new_size);

	return 0;
}

/*
 * Appends 'count' bytes from 'buf' to the output buffer of a connection
 * and asks poll for POLLOUT so the data gets sent.
 *
 * The output buffer is expanded when needed; since the expansion is capped
 * by a size limit, the available room is checked again before copying.
 *
 * Returns 0 on success, -1 if C is NULL, 'count' is too big or the buffer
 * cannot hold the data. On failure nothing is enqueued.
 */
int Conn_enqueue(struct Conn *C, void *buf, size_t count)
{
	unsigned int slot;
	int needed;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	/* The expansion helper takes an int; refuse what does not fit */
	needed = (int) count;
	if ((needed < 0) || ((size_t) needed != count)) {
		Log(0, "ERROR: Too many bytes to enqueue (%zu).\n", count);
		return -1;
	}

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, needed);
		Log(0, "\tTry to enqueue %zu bytes to id %llu [%s]...\n",
			count, C->id, dump != NULL ? dump : "?");
		free(dump);
	}

	/* we cannot use pointers directly because they can change under us */
	slot = C->slot;

	if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
		if (Conn_try_expand_buf(&Conns[slot], 0, needed) != 0)
			return -1;

		/* The growth is capped, so success alone proves nothing */
		if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
			Log(3, "Cannot enqueue %zu bytes: output buffer is full!\n",
				count);
			return -1;
		}
	}

	memcpy(Conns[slot].obuf + Conns[slot].obuf_tail, buf, count);
	Conns[slot].obuf_tail += count;

	Conn_pfds[slot].events |= POLLOUT;

	return 0;
}

/*
 * Creates a socket and registers it in a new slot.
 *
 * domain: PF_INET, PF_INET6 or PF_PACKET
 * type: socket type (SOCK_STREAM, SOCK_DGRAM, ...)
 * port: port to bind to on the wildcard address; for PF_PACKET it is
 *	used as the protocol number instead and nothing is bound
 *
 * PF_INET/PF_INET6 sockets are bound; stream sockets are also put in
 * listening mode (type MASTER, state LISTEN). Every other socket is a
 * CLIENT in state OPEN. The socket is always made non-blocking.
 *
 * Returns the new connection, or NULL on error (see Conn_strerror()).
 */
struct Conn *Conn_socket(int domain, int type, int port)
{
	struct Conn *C;
	int i, ret;
	struct sockaddr *psa = NULL;
	struct sockaddr_in sa;
	struct sockaddr_in6 sa6;
	int sock_len = 0;
	char *addr = "?";
	unsigned int slot;
	int do_bind = 1, do_listen;
	int protocol = 0;
	int first_state;

	/*
	 * Only stream sockets listen; anything else is usable as soon as it
	 * is bound. Deciding this up front guarantees that first_state is
	 * always set, whatever the type is.
	 */
	if (type == SOCK_STREAM) {
		do_listen = 1;
		first_state = CONN_STATE_LISTEN;
	} else {
		do_listen = 0;
		first_state = CONN_STATE_OPEN;
	}

	switch (domain) {
		case PF_INET:
			memset(&sa, 0, sizeof(sa));
			sa.sin_family = AF_INET;
			sa.sin_addr.s_addr = htonl(INADDR_ANY);
			sa.sin_port = htons(port);
			psa = (struct sockaddr *) &sa;
			sock_len = sizeof(sa);
			addr = "0.0.0.0";
			break;

		case PF_INET6:
			memset(&sa6, 0, sizeof(sa6));
			sa6.sin6_family = AF_INET6;
			/* inet_pton returns 1 on success, 0 or -1 otherwise */
			ret = inet_pton(AF_INET6, "::", &sa6.sin6_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(::) failed");
				return NULL;
			}
			sa6.sin6_port = htons(port);
			psa = (struct sockaddr *) &sa6;
			sock_len = sizeof(sa6);
			addr = "::";
			break;

		case PF_PACKET:
			do_bind = 0;
			do_listen = 0;
			protocol = htons(port);
			first_state = CONN_STATE_OPEN;
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Invalid domain [%d]!", domain);
			return NULL;
	}

	C = Conn_alloc();
	if (!C)
		return NULL;

	slot = C->slot;

	/* Start clean, same as the accept and connect paths do */
	C->error_state = 0;

	Conn_pfds[slot].fd = socket(domain, type, protocol);
	if (Conn_pfds[slot].fd == -1) {
		/* save errno before any other library call can change it */
		Conns[slot].xerrno = errno;
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%d, %d, %d) [%s]",
			domain, type, protocol,
			strerror(Conns[slot].xerrno));
		goto out;
	}
	Conn_pfds[slot].events = POLLIN;

	Conn_setnonblock(Conn_pfds[slot].fd);

	if (domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(Conn_pfds[slot].fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	/* Default, client */
	C->type = Conn_type_CLIENT;

	if (do_bind == 1) {
		i = 1;
		setsockopt(Conn_pfds[slot].fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(Conn_pfds[slot].fd, psa, sock_len);
		if (ret < 0) {
			Conns[slot].xerrno = errno;
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind [%s]",
				strerror(Conns[slot].xerrno));
			goto out;
		}

		if (do_listen == 1) {
			/* a master that does not listen would never accept */
			ret = listen(Conn_pfds[slot].fd, 128);
			if (ret < 0) {
				Conns[slot].xerrno = errno;
				snprintf(Conn_error, sizeof(Conn_error),
					"Cannot listen [%s]",
					strerror(Conns[slot].xerrno));
				goto out;
			}
			C->type = Conn_type_MASTER;
		}
	}

	C->state = first_state;

	C->start = Conn_now.tv_sec;

	/* Reset syn time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	C->id = Conn_id++;

	snprintf(Conns[slot].addr, sizeof(Conns[slot].addr), "%s", addr);
	Conns[slot].port = port;

	Conns[slot].sock_domain = domain;
	Conns[slot].sock_type = type;

	return &Conns[slot];

	out:
	/* Closes the descriptor (if any) and releases the slot */
	Conns[slot].error_state = CONN_ERROR_SOCKET;
	Conn_free_intern(&Conns[slot]);

	return NULL;
}

/*
 * Currently an empty stub: nothing is done with C.
 */
void Conn_new(struct Conn *C)
{
	(void) C;
}

/*
 * Accepts one pending connection on the listening connection C.
 *
 * The new connection gets its own slot (type CLIENT, state OPEN, polled
 * for input, non-blocking), inherits the socket domain/type of the
 * listener and records the listener's port in 'via'.
 *
 * Then the accept callback is run with the NEW connection as argument:
 * the listener's cb_accept if set, else the global Conn_accept_cb.
 *
 * Transient conditions (EAGAIN/EWOULDBLOCK/EINTR) make the function
 * return silently. Any other failure sets error_state on the listener and
 * runs its error callback (cb_error, else Conn_error_cb).
 */
void Conn_accept(struct Conn *C)
{
	int fd;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;
	unsigned int slot, Xslot;

	/* Work with indexes: Conn_alloc() may move the Conns array */
	slot = C->slot;
	switch(Conns[slot].sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
		break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
		break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				Conns[slot].sock_domain);
			Conns[slot].error_state = CONN_ERROR_SOCKET;
			goto report;
	}

	fd = accept(Conn_pfds[slot].fd, pca, &cax_len);
	if (fd == -1) {
		/* nothing to accept right now; poll will tell us again */
		if ((errno == EAGAIN) || (errno == EWOULDBLOCK)
			|| (errno == EINTR))
			return;

		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot accept: %s",
			strerror(errno));
		Conns[slot].error_state = CONN_ERROR_ACCEPT;
		goto report;
	}

	X = Conn_alloc();
	if (!X) {
		Log(0, "ERROR: Cannot alloc a slot!\n");
		close(fd);
		Conns[slot].error_state = CONN_ERROR_MEM;
		goto report;
	}

	Xslot = X->slot;

	switch (Conns[slot].sock_domain) {
		case PF_INET:
			inet_ntop(Conns[slot].sock_domain, &ca4.sin_addr, Conns[Xslot].addr, sizeof(Conns[Xslot].addr));
			Conns[Xslot].port = ntohs(ca4.sin_port);
			break;

		case PF_INET6:
			inet_ntop(Conns[slot].sock_domain, &ca6.sin6_addr, Conns[Xslot].addr, sizeof(Conns[Xslot].addr));
			Conns[Xslot].port = ntohs(ca6.sin6_port);
			break;
	}

	Conns[Xslot].type = Conn_type_CLIENT;
	Conns[Xslot].error_state = 0;
	Conns[Xslot].state = CONN_STATE_OPEN;
	Conns[Xslot].via = Conns[slot].port;
	Conn_pfds[Xslot].fd = fd;
	Conn_pfds[Xslot].events = POLLIN;
	Conns[Xslot].sock_domain = Conns[slot].sock_domain;
	Conns[Xslot].sock_type = Conns[slot].sock_type;
	Conns[Xslot].start = Conn_now.tv_sec;
	Conns[Xslot].id = Conn_id++;

	Conn_setnonblock(Conn_pfds[Xslot].fd);

	/*
	 * Both callbacks get the accepted connection; handing over the
	 * listener would leave the callback without a way to find it.
	 */
	if (Conns[slot].cb_accept)
		Conns[slot].cb_accept(&Conns[Xslot]);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(&Conns[Xslot]);

	Conn_total++;

	return;

	report:
	/* Tell the owner of the listening connection what went wrong */
	if (Conns[slot].cb_error)
		Conns[slot].cb_error(&Conns[slot]);
	else if (Conn_error_cb)
		Conn_error_cb(&Conns[slot]);
}

/*
 * Renders a poll event mask as an 8 character string.
 * Every position stands for one event and holds its letter if the event
 * is present in 'ev', else '_'. Order: I(n) P(ri) O(ut) E(rr) H(up)
 * (n)V(al) r(dnorm) R(dband).
 * 'ret' must have room for at least 9 bytes.
 */
static void Conn_poll_status(short ev, char *ret)
{
	static const struct {
		short	bit;
		char	mark;
	} map[] = {
		{ POLLIN,	'I' },
		{ POLLPRI,	'P' },
		{ POLLOUT,	'O' },
		{ POLLERR,	'E' },
		{ POLLHUP,	'H' },
		{ POLLNVAL,	'V' },
		{ POLLRDNORM,	'r' },
		{ POLLRDBAND,	'R' },
	};
	unsigned int k;

	strcpy(ret, "________");

	/* the index in the table is also the position in the string */
	for (k = 0; k < sizeof(map) / sizeof(map[0]); k++) {
		if (ev & map[k].bit)
			ret[k] = map[k].mark;
	}
}

/*
 * Returns a constant name for the socket domain of a connection
 * ("?" if the domain is not one of the known ones).
 */
static char *Conn_domain(struct Conn *C)
{
	char *name;

	switch (C->sock_domain) {
	case PF_INET:
		name = "IPv4";
		break;
	case PF_INET6:
		name = "IPv6";
		break;
	case PF_PACKET:
		name = "PACKET";
		break;
	default:
		name = "?";
		break;
	}

	return name;
}

/*
 * Returns a constant name for the socket type of a connection
 * ("?" if the type is not one of the known ones).
 */
static char *Conn_type(struct Conn *C)
{
	char *name;

	switch (C->sock_type) {
	case SOCK_STREAM:
		name = "stream";
		break;
	case SOCK_DGRAM:
		name = "dgram";
		break;
	case SOCK_RAW:
		name = "raw";
		break;
	default:
		name = "?";
		break;
	}

	return name;
}

/*
 * Returns a constant name for the role of a connection (master, client or
 * unknown); "?" for any other value.
 */
static char *Conn_socktype(struct Conn *C)
{
	char *name;

	switch (C->type) {
	case Conn_type_MASTER:
		name = "master";
		break;
	case Conn_type_CLIENT:
		name = "client";
		break;
	case Conn_type_UNK:
		name = "unk";
		break;
	default:
		name = "?";
		break;
	}

	return name;
}

/*
 * Formats a speed given in bytes per second in a human friendly way.
 * Values below 1000 are shown as Bps, below 1000000 as KBps and the rest
 * as MBps, always with two decimals (units are powers of 1000).
 * At most dst_len bytes are written in dst, terminator included.
 */
void Conn_speed(char *dst, unsigned int dst_len, unsigned int speed)
{
	const float rate = speed;

	if (speed >= 1000 * 1000) {
		snprintf(dst, dst_len, "%.2fMBps", rate / 1000 / 1000);
		return;
	}

	if (speed >= 1000) {
		snprintf(dst, dst_len, "%.2fKBps", rate / 1000);
		return;
	}

	snprintf(dst, dst_len, "%.2fBps", rate);
}

/*
 * Returns a two line, plain text description of a connection: slot, fd,
 * domain/type/role/state, address and port, poll masks, byte counters,
 * buffer sizes, average speeds, age and bandwidth settings.
 * The text lives in a static buffer that the next call overwrites.
 */
char *Conn_status_slot(struct Conn *C)
{
	static char tmp[1024];
	char ev_wanted[16], ev_got[16];
	char rate_in[32], rate_out[32];
	unsigned int elapsed;

	/* average speed over the life of the connection (at least 1s) */
	elapsed = Conn_now.tv_sec - C->start;
	if (elapsed == 0)
		elapsed = 1;

	Conn_speed(rate_in, sizeof(rate_in), C->bi / elapsed);
	Conn_speed(rate_out, sizeof(rate_out), C->bo / elapsed);

	Conn_poll_status(Conn_pfds[C->slot].events, ev_wanted);
	Conn_poll_status(Conn_pfds[C->slot].revents, ev_got);

	snprintf(tmp, sizeof(tmp), "%4d fd%4d"
		" %4s %6s %5s %6s"
		" %39s/%-5d\n"
		"            Via%-5d [%s][%s] IO=%llu/%llu"
		" BS=%u/%u S=%s/%s"
		" T=%ld bw=%u f=%u tk=%u id=%llu\n",
		C->slot, Conn_pfds[C->slot].fd,
		Conn_domain(C), Conn_type(C), Conn_socktype(C), Conn_state(C),
		C->addr, C->port, C->via, ev_wanted, ev_got, C->bi, C->bo,
		C->ibuf_size, C->obuf_size, rate_in, rate_out,
		Conn_now.tv_sec - C->start,
		C->band_width, C->band_factor, C->band_tokens,
		C->id);

	return tmp;
}

/*
 * Returns the description of a connection as a sequence of HTML table
 * cells (<td>...</td>), in the column order used by Conn_status().
 *
 * If Conn_status_slot_html_cb is set, the text it returns is appended
 * after the standard cells; a NULL answer is treated as no extra text.
 *
 * The text lives in a static buffer that the next call overwrites.
 */
char *Conn_status_slot_html(struct Conn *C)
{
	static char tmp[1024];
	char polle[16], pollr[16], *ext = "";
	char speedi[32], speedo[32];
	unsigned int dT, si, so;

	Conn_poll_status(Conn_pfds[C->slot].events, polle);
	Conn_poll_status(Conn_pfds[C->slot].revents, pollr);

	/* average speed over the life of the connection (at least 1s) */
	dT = Conn_now.tv_sec - C->start;
	if (dT == 0)
		dT = 1;
	si = C->bi / dT;
	so = C->bo / dT;

	Conn_speed(speedi, sizeof(speedi), si);
	Conn_speed(speedo, sizeof(speedo), so);

	if (Conn_status_slot_html_cb) {
		ext = Conn_status_slot_html_cb(C);
		/* passing NULL for %s is undefined */
		if (ext == NULL)
			ext = "";
	}

	snprintf(tmp, sizeof(tmp), "<td>%llu</td><td>%d</td><td>%d</td>"
		"<td>%s</td><td>%s</td><td>%s</td><td>%s</td>"
		"<td>%s/%d</td>"
		"<td>%d</td><td>%s</td><td>%s</td><td>%llu / %llu</td>"
		"<td>%u / %u</td><td>%s / %s</td><td>%ld</td>"
		"<td>%u</td><td>%u</td><td>%u</td>"
		"%s\n",
		C->id, C->slot, Conn_pfds[C->slot].fd,
		Conn_domain(C), Conn_type(C), Conn_socktype(C), Conn_state(C),
		C->addr, C->port, C->via, polle, pollr, C->bi, C->bo,
		C->ibuf_size, C->obuf_size,
		speedi, speedo, Conn_now.tv_sec - C->start,
		C->band_width, C->band_factor, C->band_tokens,
		ext);

	return tmp;
}

/*
 * Appends 'text' to the growing string *buf (current length *len,
 * allocated size *cap), enlarging it as needed.
 * Returns 0 on success, -1 if memory cannot be obtained (the string built
 * so far stays valid). A NULL text appends nothing.
 */
static int Conn_status_append(char **buf, size_t *len, size_t *cap,
	const char *text)
{
	size_t add, need, ncap;
	char *p;

	if (text == NULL)
		return 0;

	add = strlen(text);
	need = *len + add + 1;
	if (need > *cap) {
		ncap = (*cap > 0) ? *cap : 1024;
		while (ncap < need)
			ncap *= 2;
		p = realloc(*buf, ncap);
		if (p == NULL)
			return -1;
		*buf = p;
		*cap = ncap;
	}

	/* copies the terminator too */
	memcpy(*buf + *len, text, add + 1);
	*len += add;

	return 0;
}

/*
 * Builds a report about the library and all its connections.
 *
 * flags: bit 1 = 1 - html (one table row per connection; the text
 *	returned by Conn_status_cb, if set, is added to the header row)
 *
 * The report has a line with the global counters, one entry for every
 * slot in use and a final line with the totals for the client
 * connections. Conn_now is refreshed as a side effect.
 *
 * Returns a malloc'ed string that the caller must free; if memory runs
 * out, a strdup'ed error text is returned instead.
 */
char *Conn_status(unsigned int flags)
{
	const int html = (flags & 1) != 0;
	unsigned int i;
	struct Conn *C;
	char tmp[512];
	char *buf = NULL, *ext = "";
	size_t len = 0, cap = 0;
	char speedi[32], speedo[32];
	unsigned long long bi, bo, dT;

	gettimeofday(&Conn_now, NULL);

	snprintf(tmp, sizeof(tmp), "Conn_pending=%d  Conn_no/Conn_max=%d/%d  Conn_total=%lu  Conn_uptime=%lus  Conn_allocated=%d\n",
		Conn_pending, Conn_no, Conn_max, Conn_total,
		(unsigned long) (Conn_now.tv_sec - Conn_start), Conn_allocated);
	if (Conn_status_append(&buf, &len, &cap, tmp) != 0)
		goto nomem;

	if (html) {
		if (Conn_status_cb)
			ext = Conn_status_cb();

		if (Conn_status_append(&buf, &len, &cap,
			"<table border=\"0\" cellspacing=\"1\" cellpadding=\"3\" bgcolor=\"#aaaaaa\">\n"
			"<tr bgcolor=\"ffffff\">\n"
			"<td>ID</td>"
			"<td>Slot</td>"
			"<td>FD</td>"
			"<td>Dom</td>"
			"<td>Type</td>"
			"<td>SType</td>"
			"<td>State</td>"
			"<td>Addr/port</td>"
			"<td>Via</td>"
			"<td>Polle</td>"
			"<td>Pollr</td>"
			"<td>BI/BO</td>"
			"<td>BUF I/O</td>"
			"<td>Speed I/O</td>"
			"<td>Elap (s)</td>"
			"<td>Band</td>"
			"<td>F</td>"
			"<td>Tks</td>") != 0)
			goto nomem;
		/* extra header cells supplied by the application */
		if (Conn_status_append(&buf, &len, &cap, ext) != 0)
			goto nomem;
		if (Conn_status_append(&buf, &len, &cap, "</tr>\n") != 0)
			goto nomem;
	}

	bi = 0; bo = 0; dT = 0;
	for (i = 0; i < Conn_no; i++) {
		C = &Conns[i];
		if (C->state == CONN_STATE_FREE)
			continue;

		/* only client connections count for the totals */
		if (C->type == Conn_type_CLIENT) {
			bi += C->bi;
			bo += C->bo;
			dT += Conn_now.tv_sec - C->start;
		}

		if (html) {
			if (Conn_status_append(&buf, &len, &cap,
				"<tr bgcolor=\"ffffff\">\n") != 0)
				goto nomem;
			if (Conn_status_append(&buf, &len, &cap,
				Conn_status_slot_html(C)) != 0)
				goto nomem;
			if (Conn_status_append(&buf, &len, &cap,
				"</tr>\n") != 0)
				goto nomem;
		} else {
			if (Conn_status_append(&buf, &len, &cap,
				Conn_status_slot(C)) != 0)
				goto nomem;
		}
	}

	if (html) {
		if (Conn_status_append(&buf, &len, &cap, "</table>\n") != 0)
			goto nomem;
	}

	if (dT == 0)
		dT = 1;

	Conn_speed(speedi, sizeof(speedi), bi / dT);
	Conn_speed(speedo, sizeof(speedo), bo / dT);

	snprintf(tmp, sizeof(tmp), "Total speed I/O: %s / %s."
		" Total bytes I/O: %llu / %llu\n",
		speedi, speedo, bi, bo);
	if (Conn_status_append(&buf, &len, &cap, tmp) != 0)
		goto nomem;

	return buf;

	nomem:
	free(buf);
	return strdup("No enough memory!");
}

/*
 * Returns how many bytes are waiting in the input buffer of a connection
 * (distance between the head and the tail of the buffer).
 */
unsigned int Conn_qlen(struct Conn *C)
{
	unsigned int queued = C->ibuf_tail - C->ibuf_head;

	return queued;
}

/*
 * Removes @bytes from the beginning of the input buffer.
 * When that consumes everything (or more than) the buffer holds, head and
 * tail are both reset to 0, so the buffer is used from its start again.
 */
void Conn_eat(struct Conn *C, unsigned int bytes)
{
	struct Conn *me = &Conns[C->slot];

	me->ibuf_head += bytes;

	/* nothing left: rewind */
	if (me->ibuf_head >= me->ibuf_tail) {
		me->ibuf_head = 0;
		me->ibuf_tail = 0;
	}

	Log(10, "Conn_eat(C, %u) head=%u tail=%u qlen=%u\n",
		bytes, C->ibuf_head, C->ibuf_tail,
		Conn_qlen(C));
}

/*
 * Discards everything that is waiting in the input buffer.
 */
void Conn_eatall(struct Conn *C)
{
	unsigned int everything = Conn_qlen(C);

	Conn_eat(C, everything);
}

/*
 * Refills the bandwidth tokens of a connection.
 *
 * Does nothing if no bandwidth was set. Else, for every 100ms passed since
 * the last refill, a tenth of band_width is added, up to
 * band_factor * band_width tokens, and POLLOUT is requested again for the
 * connection.
 */
void Conn_band_update(struct Conn *C)
{
	long ticks;	/* time since the last refill, in 100ms units */

	/* no limit configured */
	if (C->band_width == 0)
		return;

	ticks = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
	ticks += Conn_now.tv_usec - C->band_lasttime.tv_usec;
	ticks /= 100000;

	/* still in the same 100ms interval as the previous refill */
	if (ticks == 0)
		return;

	/* the clock went backwards: count it as a single interval */
	if (ticks < 0)
		ticks = 1;

	C->band_lasttime = Conn_now;
	C->band_tokens += ticks * C->band_width / 10;
	if (C->band_tokens > C->band_factor * C->band_width)
		C->band_tokens = C->band_factor * C->band_width;

	Conn_pfds[C->slot].events |= POLLOUT;

	Log(debug_band, "\t\tBAND: Added tokens -> %u.\n",
		C->band_tokens);
}

/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 * factor multiplies width to give the maximum number of tokens that can
 * be accumulated; the connection starts with that many tokens.
 *
 * Always returns 0.
 */
int Conn_band(struct Conn *C, unsigned int width, unsigned int factor)
{
	Log(11, "\tConn_band(C, width=%u, factor=%u)\n",
		width, factor);

	C->band_lasttime = Conn_now;
	C->band_width = width;
	C->band_factor = factor;
	C->band_tokens = factor * width;

	/* the time fields are not ints: print them as long, like Log() does */
	Log(debug_band, "\t\tBAND: lasttime=%ld.%06ld, width=%u, factor=%u, tokens=%u\n",
		(long) C->band_lasttime.tv_sec, (long) C->band_lasttime.tv_usec,
		C->band_width, C->band_factor, C->band_tokens);

	return 0;
}

/*
 * Sends as much as allowed from the output buffer of a connection.
 *
 * The amount is limited by Conn_max_send (if not 0) and, when a bandwidth
 * is set, by the available tokens; with no tokens left POLLOUT is switched
 * off and nothing is sent.
 *
 * On success the head of the buffer advances (or the buffer is rewound if
 * everything queued was sent), the counters are updated and the used
 * tokens are removed. A send that would block is ignored; any other
 * failure is recorded in error_state/xerrno of the connection.
 */
static void Conn_send_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	int count;
	unsigned int slot;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i id=%llu, slot=%u...\n",
		C->id, C->slot);

	slot = C->slot;

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n"); 
			Conn_pfds[slot].events &= ~POLLOUT;
			return;
		}
	}

	/* retry if a signal interrupted the call */
	do {
		Log(10, "\tsend(fd%d, buf=%p (head=%u), max=%d (count=%d), 0)...\n",
			Conn_pfds[slot].fd, buf, Conns[slot].obuf_head, max, count);
		n = send(Conn_pfds[slot].fd, buf, max, 0);
		/* keep errno safe from the calls that follow */
		xerrno = errno;
	} while ((n == -1) && (xerrno == EINTR));

	/* the socket cannot take data right now; poll will call us again */
	if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)))
		return;

	/* n never exceeds max, so it fits in an int */
	Log(10, "%s: Slot %u: FD%d Sent %d bytes [head=%d tail=%d]\n",
		__FUNCTION__, slot, Conn_pfds[slot].fd,
		(int) n, Conns[slot].obuf_head, Conns[slot].obuf_tail);
	if (Conn_level >= 10) {
		dump = Conn_dump(buf, (int) n);
		Log(0, "\t%s\n", dump != NULL ? dump : "?");
		free(dump);
	}

	if (n > 0) {
		Conns[slot].tsend = Conn_now;
		if (n < count) {
			Conns[slot].obuf_head += n;
		} else {
			/* everything was sent: rewind the buffer */
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}

		Conns[slot].bo += n;
		if (C->band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %d tokens -> %u...\n",
				__FUNCTION__,
				(int) n, Conns[slot].band_tokens);
		}
	} else {
		/* report the error of send(), not of a later call */
		Log(0, "%s: Error in send [id %llu] [slot %u] [%s]\n",
			__FUNCTION__,
			Conns[slot].id, slot, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Reads available data from the socket into the input buffer of a
 * connection, expanding the buffer first if it is full. One call reads at
 * most Conn_max_recv bytes (if not 0).
 *
 * After a successful read the counters and the receive time are updated
 * and the data callback is run (cb_data of the connection, else
 * Conn_data_cb). A read of 0 bytes marks the connection as hung up. A read
 * that would block is ignored, the same way the send path does; any other
 * failure is recorded in error_state/xerrno.
 *
 * NOTE(review): if the buffer is full and cannot grow, recv() is called
 * with a length of 0 and its 0 result is then handled as a hang-up -
 * confirm that this is intended.
 */
static void Conn_recv_cb_i(struct Conn *C)
{
	ssize_t	n;
	unsigned int max;
	unsigned int slot;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i id=%llu, slot=%u...\n",
		C->id, C->slot);

	slot = C->slot;

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(&Conns[slot], 1, 0);
		if (r != 0) {
			Log(1, "MEM: Cannot expand ibuf!\n");
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	/* retry if a signal interrupted the call */
	do {
		n = recv(Conn_pfds[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		/* keep errno safe from the calls that follow */
		xerrno = errno;
	} while ((n == -1) && (xerrno == EINTR));

	/* nothing to read after all: not an error on a non-blocking socket */
	if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)))
		return;

	/* n never exceeds max, so it fits in an int */
	Log(10, "%s: Slot %u: FD%d Received %d bytes.\n",
		__FUNCTION__, slot, Conn_pfds[slot].fd, (int) n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf
				+ Conns[slot].ibuf_tail, (int) n);
			Log(0, "\t%s\n", dump != NULL ? dump : "?");
			free(dump);
		}

		Conns[slot].ibuf_tail += n;

		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		Conns[slot].error_state = CONN_ERROR_HANGUP;
	} else {
		/* report the error of recv(), not of a later call */
		Log(4, "Error in recv id %llu slot %u [%s]\n",
			Conns[slot].id, slot, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Set some internal parameters
 *
 * var: one of the CONN_PARA_* constants
 * val: new value
 *
 * CONN_PARA_AUTO_RECONNECT: a value different from 0 turns the automatic
 *	reconnect on, 0 turns it off
 * CONN_PARA_RECONNECT_DELAY, CONN_PARA_IDLE_TIME, CONN_PARA_READ_TIMEOUT,
 * CONN_PARA_CONN_TIMEOUT, CONN_PARA_TRIGGER: val is stored in the
 *	corresponding field of the connection
 * CONN_PARA_IBUF, CONN_PARA_OBUF: val is passed to the kernel as the size
 *	of the socket receive/send buffer
 *
 * Any other 'var' is ignored.
 */
void Conn_set(struct Conn *C, int var, unsigned int val)
{
	unsigned int slot;
	int fd;

	slot = C->slot;
	fd = Conn_pfds[slot].fd;

	switch (var) {
	case CONN_PARA_AUTO_RECONNECT:
		/* or-ing with 0 could never turn the flag off again */
		if (val == 0)
			C->flags &= ~CONN_FLAGS_AUTO_RECONNECT;
		else
			C->flags |= CONN_FLAGS_AUTO_RECONNECT;
		break;
	case CONN_PARA_RECONNECT_DELAY:
		C->delay = val;
		break;
	case CONN_PARA_IDLE_TIME:
		C->idle = val;
		break;
	case CONN_PARA_READ_TIMEOUT:
		C->read_timeout = val;
		break;
	case CONN_PARA_CONN_TIMEOUT:
		C->conn_timeout = val;
		break;
	case CONN_PARA_TRIGGER:
		C->trigger = val;
		break;
	case CONN_PARA_IBUF:
		setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val));
		break;
	case CONN_PARA_OBUF:
		setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
		break;
	default:
		/* unknown parameter: nothing to do */
		break;
	}
}

/*
 * Registers an outgoing connection to addr/port.
 *
 * Only a slot is prepared here (type CLIENT, state CONN_STATE_CONNECT_a)
 * and Conn_pending is incremented; no socket is created by this function.
 *
 * Returns the new connection or NULL if no slot could be allocated.
 */
struct Conn *Conn_connect(int domain, int type, char *addr, int port)
{
	struct Conn *X;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	X = Conn_alloc();
	if (X == NULL)
		return NULL;

	/* who we are */
	X->id = Conn_id++;
	X->type = Conn_type_CLIENT;
	X->start = Conn_now.tv_sec;

	/* where we go */
	snprintf(X->addr, sizeof(X->addr), "%s", addr);
	X->port = port;
	X->sock_domain = domain;
	X->sock_type = type;

	/* ready for the first connection attempt */
	X->error_state = 0;
	X->state = CONN_STATE_CONNECT_a;

	Conn_pending++;

	return X;
}

/*
 * Start the connect sequence for every client slot that is waiting for it.
 *
 * Slots are scanned from the last to the first. A client in state
 * CONN_STATE_CONNECT_0 whose 'tryat' time has been reached is promoted to
 * CONN_STATE_CONNECT_a; every client in CONN_STATE_CONNECT_a is then
 * resolved with getaddrinfo(), gets a socket if it has none and connect()
 * is called on it.
 * On any failure the text of the error is left in Conn_error, error_state
 * is set and the slot is handed to Conn_free_intern().
 * On success the slot goes to CONN_STATE_CONNECT_b, Conn_pending is
 * decremented and Conn_total incremented.
 *
 * NOTE(review): only the first entry returned by getaddrinfo() is tried.
 * NOTE(review): Conn_pending is not touched on the failure paths here;
 * whether Conn_free_intern() accounts for it is not visible in this
 * function - confirm.
 */
static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	int i, ret;
	char port[8];

	Log(8, "%s() Conn_no=%d Conn_pending=%d\n",
		__FUNCTION__, Conn_no, Conn_pending);

	for (i = Conn_no - 1; i >= 0; i--) {
		if (Conns[i].type != Conn_type_CLIENT)
			continue;

		/* A delayed attempt becomes due once 'tryat' is reached */
		if ((Conns[i].state == CONN_STATE_CONNECT_0)
			&& (Conns[i].tryat <= Conn_now.tv_sec)) {
			Conns[i].state = CONN_STATE_CONNECT_a;
		}

		if (Conns[i].state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTry to connect to %s/%d...\n",
			Conns[i].addr, Conns[i].port);

		/* Resolve the destination; domain 0 means "any family" */
		memset(&hints, 0, sizeof(hints));
		if (Conns[i].sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = Conns[i].sock_domain;
		hints.ai_socktype = Conns[i].sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = 0;
		/* getaddrinfo() wants the service (port) as a string */
		snprintf(port, sizeof(port), "%d", Conns[i].port);
		res = NULL;
		ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
		if (ret != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot call getaddrinfo [%s]", gai_strerror(ret));
			Log(9, "\t%s\n", Conn_error);
			Conns[i].error_state = CONN_ERROR_GETADDRINFO;
			Conn_free_intern(&Conns[i]);
			if (res)
				freeaddrinfo(res);
			continue;
		}

		/* Create the socket only if this slot does not have one yet */
		if (Conn_pfds[i].fd == -1) {
			Conn_pfds[i].fd = socket(res->ai_family, res->ai_socktype, 0);
			if (Conn_pfds[i].fd == -1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"Cannot create socket [%s]", strerror(errno));
				Log(9, "\t%s\n", Conn_error);
				Conns[i].error_state = CONN_ERROR_SOCKET;
				Conn_free_intern(&Conns[i]);
				freeaddrinfo(res);
				continue;
			}
			Log(10, "	Allocated socket %d\n",
				Conn_pfds[i].fd);
			/* Conn_poll() uses POLLOUT to detect the end of the connect */
			Conn_pfds[i].events |= POLLIN | POLLOUT;

			/* presumably switches the fd to non-blocking mode - see helper */
			Conn_setnonblock(Conn_pfds[i].fd);
		}

		Log(9, "\tconnecting...\n");
		/* Set syn time */
		Conns[i].conn_syn = Conn_now;
		ret = connect(Conn_pfds[i].fd, res->ai_addr, res->ai_addrlen);
		/* EINPROGRESS is not a failure: the connect is still pending */
		if ((ret != 0) && (errno != EINPROGRESS)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot connect [%d] [%s]", errno, strerror(errno));
			Log(9, "\t%s\n", Conn_error);
			Conns[i].error_state = CONN_ERROR_CONNECT;
			Conn_free_intern(&Conns[i]);
			freeaddrinfo(res);
			continue;
		}

		/* Connect started (or done): no longer pending */
		Conns[i].state = CONN_STATE_CONNECT_b;
		Conn_pending--;
		Conn_total++;

		freeaddrinfo(res);
	}

	Log(10, "%s() FINISH\n",
		__FUNCTION__);
}

/*
 * Tells if a connection must be skipped when events are dispatched.
 * Returns 1 if an error is already recorded on C (error_state > 0),
 * 0 otherwise.
 */
static int Conn_ignore(struct Conn *C)
{
	return (C->error_state > 0) ? 1 : 0;
}

/*
 * Periodic housekeeping for one connection.
 *
 * First fires the trigger callback (per-connection one, else the global
 * one) if a trigger interval is set and it elapsed since last_trigger.
 * Then marks the connection with an error state if, in this order of
 * precedence:
 *  - nothing was received for more than 'idle' seconds;
 *  - data was sent after the last receive and no answer arrived within
 *    'read_timeout' milliseconds;
 *  - a connect is in progress for more than 'conn_timeout' milliseconds.
 * Only the first applicable rule is evaluated.
 */
static void Conn_expire(struct Conn *C)
{
	void (*fire)(struct Conn *);
	long long elapsed_ms;

	if ((C->trigger > 0)
		&& (C->last_trigger + C->trigger < Conn_now.tv_sec)) {
		/* Remember the moment before running user code */
		C->last_trigger = Conn_now.tv_sec;

		fire = C->cb_trigger ? C->cb_trigger : Conn_trigger_cb;
		if (fire)
			fire(C);
	}

	/* Idle for too long? */
	if ((C->idle > 0) && (C->trecv.tv_sec + C->idle < Conn_now.tv_sec)) {
		C->error_state = CONN_ERROR_EXPIRED;
		return;
	}

	/* Waiting for an answer to something we sent? */
	if ((C->read_timeout > 0) && (C->tsend.tv_sec > 0)
		&& (C->tsend.tv_sec > C->trecv.tv_sec)) {
		elapsed_ms = Conn_time_diff(&Conn_now, &C->tsend);
		if (elapsed_ms > C->read_timeout)
			C->error_state = CONN_ERROR_READ_TIMEOUT;
		return;
	}

	/* Connect still in progress? */
	if ((C->conn_timeout > 0) && (C->state == CONN_STATE_CONNECT_b)) {
		elapsed_ms = Conn_time_diff(&Conn_now, &C->conn_syn);
		if (elapsed_ms > C->conn_timeout) {
			/* connection attempt expired */
			C->error_state = CONN_ERROR_CONN_TIMEOUT;
		}
	}
}

/*
 * Main event loop step: starts pending connects, waits for events with
 * poll() and dispatches them to the callbacks.
 *
 * timeout is in 1/1000 seconds increments. The special value -1 makes the
 * function loop forever, polling in slices of 500 ms; in that mode it only
 * returns when poll() fails.
 *
 * Returns: -1 on error (message stored in Conn_error), 0 if there is no
 * connection at all (Conn_no == 0), 1 after one processing pass.
 *
 * NOTE(review): 1 is returned after a pass even when poll() reported no
 * events, so "0 = nothing to do" only covers the Conn_no == 0 case.
 */
int Conn_poll(int timeout)
{
	int i;
	short rev;
	char *stats;
	int timeout2;
	int events;

	Log(11, "Conn_poll(timeout=%d Conn_no=%d)\n",
		timeout, Conn_no);

	if (Conn_no == 0)
		return 0;

	/* In "forever" mode wake up periodically to run the timers */
	if (timeout == -1)
		timeout2 = 500;
	else
		timeout2 = timeout;

	loop:
	if (Conn_pending > 0)
		Conn_trytoconnect();

	again:
	events = poll(&Conn_pfds[0], Conn_no, timeout2);
	/* Interrupted by a signal: simply retry */
	if ((events == -1) && (errno == EINTR))
		goto again;

	if (events < 0) {
		snprintf(Conn_error, sizeof(Conn_error),
			"poll(%p, %d): %s",
			(void *)Conn_pfds, Conn_no, strerror(errno));
		return -1;
	}

	/* Refresh the cached current time used by all timers */
	gettimeofday(&Conn_now, NULL);

	if (Conn_level >= 11) {
		stats = Conn_status(0);
		Log(0, "%s\n", stats);
		free(stats);
	}

	/* We do a reverse scan because of moving Conn objects */
	Log(11, "Process %d events...\n",
		events);
	for (i = Conn_no - 1; i >= 0; i--) {
		/*
		 * 'events' counts the descriptors still to be handled; once it
		 * drops to 0 only the expire/free/band part below runs.
		 */
		if (events > 0) {
			rev = Conn_pfds[i].revents;

			/*
			 * NOTE(review): this 'continue' also skips Conn_expire()
			 * and Conn_band_update() for slots without events while
			 * other events are still outstanding.
			 */
			if (rev == 0)
				continue;

			events--;

			if (rev & POLLHUP) {
				Conns[i].error_state = CONN_ERROR_HANGUP;
			}

			/* Checked after POLLHUP, so POLLERR wins if both are set */
			if (rev & POLLERR) {
				Conns[i].error_state = CONN_ERROR_POLL;
				Conns[i].xerrno = 0;
			}

			/* An invalid fd in the poll set is treated as fatal */
			if (rev & POLLNVAL) {
				Log(0, "BUG NVAL!\n");
				exit(1);
			}

			/* First, test we have a new connection */
			if ((rev & POLLOUT) && (Conn_ignore(&Conns[i]) == 0)) {
				/* We just established a connection */
				if (Conns[i].state == CONN_STATE_CONNECT_b) {
					if (Conns[i].cb_connected)
						Conns[i].cb_connected(&Conns[i]);
					else if (Conn_connected_cb)
						Conn_connected_cb(&Conns[i]);
					Conns[i].state = CONN_STATE_OPEN;
				}
			}

			/* Second, test for error or input */
			if ((rev & POLLIN) && (Conn_ignore(&Conns[i]) == 0)) {
				if (Conns[i].type == Conn_type_MASTER) {
					/* Listening socket: input means a new client */
					Conn_accept(&Conns[i]);
				} else {
					/* Per-connection, then global, then built-in */
					if (Conns[i].cb_recv)
						Conns[i].cb_recv(&Conns[i]);
					else if (Conn_recv_cb != NULL)
						Conn_recv_cb(&Conns[i]);
					else
						Conn_recv_cb_i(&Conns[i]);
				}
			}

			if ((rev & POLLOUT) && (Conn_ignore(&Conns[i]) == 0)) {
				/* We can send data */
				if (Conns[i].state == CONN_STATE_OPEN) {
					/* Per-connection, then global, then built-in */
					if (Conns[i].cb_send)
						Conns[i].cb_send(&Conns[i]);
					else if (Conn_send_cb != NULL)
						Conn_send_cb(&Conns[i]);
					else
						Conn_send_cb_i(&Conns[i]);

					/* Output buffer drained: stop asking for POLLOUT */
					if (Conns[i].obuf_head == Conns[i].obuf_tail) {
						Conn_pfds[i].events &= ~POLLOUT;
						/* A deferred close was requested by the user */
						if (Conns[i].flags & CONN_FLAGS_CLOSE_AFTER_SEND)
							Conns[i].error_state = CONN_ERROR_USERREQ;
					}
				}
			}
		}

		/* test if it expired/timeout */
		Conn_expire(&Conns[i]);

		if (Conns[i].error_state > 0) {
			Conn_free_intern(&Conns[i]);
		} else {
			/* add tokens */
			Conn_band_update(&Conns[i]);
		}
	}

	if (timeout == -1)
		goto loop;
	else
		return 1;

	/* NOTE(review): not reachable, both branches above leave this point */
	return 1;
}

/*
 * Set TCP_NODELAY on the socket of a connection.
 * Returns what setsockopt() returns.
 *
 * NOTE(review): SOL_TCP is the Linux spelling of the option level;
 * IPPROTO_TCP is the name usually recommended for portability - confirm
 * the targeted platforms before changing it.
 */
int Conn_nodelay(struct Conn *C)
{
	int enable = 1;

	return setsockopt(Conn_pfds[C->slot].fd, SOL_TCP, TCP_NODELAY,
		&enable, sizeof(enable));
}

/*
 * Remove the last 'bytes' bytes from the end of the output buffer.
 *
 * The tail is moved back only if at least 'bytes' bytes are pending
 * between obuf_head and obuf_tail; otherwise nothing is changed, so the
 * tail can never end up before the head.
 */
void Conn_rollback(struct Conn *C, unsigned int bytes)
{
	if (C->obuf_tail - C->obuf_head >= bytes)
		C->obuf_tail -= bytes;
}

/*
 * Search for str in the active input buffer, starting 'off' bytes after
 * the current head.
 *
 * Only the bytes between ibuf_head + off and ibuf_tail are examined; the
 * comparison is bounded by the length of str, so the buffer does not need
 * a terminating NUL.
 *
 * Returns a pointer to the first match inside the buffer, or NULL if str
 * is not found or if 'off' points past the buffered data.
 */
char *Conn_ostrstr(struct Conn *C, unsigned int off, char *str)
{
	unsigned int avail, len, str_len, i;
	char *buf;

	/*
	 * Without this test an offset larger than the buffered data would
	 * wrap the unsigned subtraction below and make the scan run far
	 * outside of the buffer.
	 */
	avail = C->ibuf_tail - C->ibuf_head;
	if (off > avail)
		return NULL;

	len = avail - off;
	buf = C->ibuf + C->ibuf_head + off;
	str_len = strlen(str);

	if (len < str_len)
		return NULL;

	/* Last possible start position is len - str_len */
	for (i = 0; i <= len - str_len; i++) {
		if (strncmp(buf + i, str, str_len) == 0)
			return buf + i;
	}

	return NULL;
}

/*
 * Search for str in the active input buffer, from its beginning.
 * Same as Conn_ostrstr() with an offset of 0: returns a pointer to the
 * match or NULL if there is none.
 */
char *Conn_strstr(struct Conn *C, char *str)
{
	char *hit;

	hit = Conn_ostrstr(C, 0, str);

	return hit;
}

/*
 * Returns the address of the byte at ibuf_head inside the input buffer,
 * i.e. the start of the data not consumed yet.
 */
char *Conn_ibuf(struct Conn *C)
{
	return &C->ibuf[C->ibuf_head];
}

/*
 * Returns the address of the byte at obuf_head inside the output buffer.
 */
char *Conn_obuf(struct Conn *C)
{
	return &C->obuf[C->obuf_head];
}

/*
 * Returns the numeric id stored in the connection.
 */
unsigned long long Conn_getid(struct Conn *C)
{
	unsigned long long id = C->id;

	return id;
}

/*
 * Look up a connection by its id.
 * The table is scanned from the last used slot down to the first one.
 * Returns the matching connection or NULL if no slot carries this id.
 */
struct Conn *Conn_get(unsigned long long id)
{
	int idx = Conn_no - 1;

	while (idx >= 0) {
		if (Conns[idx].id == id)
			return &Conns[idx];

		idx--;
	}

	return NULL;
}


/*
 * Returns the file descriptor stored for the slot of C,
 * or -1 if C is NULL.
 */
int Conn_get_fd(struct Conn *C)
{
	return (C == NULL) ? -1 : Conn_pfds[C->slot].fd;
}

/*
 * Copies into *tv the time of the last receive recorded on C (trecv).
 */
void Conn_last_time(struct Conn *C, struct timeval *tv)
{
	tv->tv_sec = C->trecv.tv_sec;
	tv->tv_usec = C->trecv.tv_usec;
}

/*
 * Install a per-connection callback.
 *
 * C    - connection to change
 * type - which callback to set, one of the CONN_CB_* values
 * f    - function to store (stored as given)
 *
 * Returns 0 on success, -1 if type is not a known callback type.
 */
int Conn_set_cb(struct Conn *C, unsigned int type, void (*f)(struct Conn *))
{
	if (type == CONN_CB_ACCEPT)
		C->cb_accept = f;
	else if (type == CONN_CB_RECV)
		C->cb_recv = f;
	else if (type == CONN_CB_SEND)
		C->cb_send = f;
	else if (type == CONN_CB_DATA)
		C->cb_data = f;
	else if (type == CONN_CB_CLOSE)
		C->cb_close = f;
	else if (type == CONN_CB_TRIGGER)
		C->cb_trigger = f;
	else if (type == CONN_CB_ERROR)
		C->cb_error = f;
	else if (type == CONN_CB_CONNECTED)
		C->cb_connected = f;
	else
		return -1;

	return 0;
}


Mode Type Size Ref File
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog
100644 blob 39966 3101607ebe0288de591acda2d558579b4c54ccac Conn.c
100644 blob 7051 e2302f47f9beb6d949268fee80c918298c6701a1 Conn.h
100644 blob 717 1394784a1fafde8f15d3fba489c056bc045a8003 Conn.spec.in
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 498 9b760aafe85b71ce4d52b50f6b55ff73cff7bfc6 Makefile.head
100644 blob 710 570276f936ae7d28a85c1c29c92645f3ba6a9543 Makefile.in
100644 blob 462 1b23184ebfd6fb72c20de1e4b011f15aaffb214f Makefile.tail
100644 blob 4 c4846601e2d94d4ee8b2c17cc37c0a829c49d1e9 PROJECT
100644 blob 0 e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 PROJECT_EXCLUDE
100644 blob 3 8f92bfdd49766b1907d4aec8d3b0f9ed6129d0e6 PROJECT_REV
100644 blob 4 0702cb5bfbb0169457c00f947f4e5601f8cd77e7 PROJECT_TARGETS
100644 blob 6 b0f3d96f877256ed9ae03858ecc5185a989b1d1b PROJECT_VER
100644 blob 192 5b11bdfb23857d8588845465aef993b320596b44 README
100644 blob 1453 b9f9dab2d0bb2762dea406c9e87d34eb38a18d9d TODO
100755 blob 2315 11e05cc644a8af4277006af69d78277a143590c4 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
040000 tree - a4c21858806284281d1fe8479845411711694ebb examples
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main