catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Network library for easily building IPv4/IPv6 network daemons and clients

/Conn.c (b7e33ed4916a779af85b159240f2db042dcf5749) (25504 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux) M BOIE <catab at embedromix.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn.h"
#include "Conn_engine_poll.h"
#include "Conn_engine_epoll.h"


/* Internal variables */
/* Engine */
/* Engine identifier that Conn_engine_set() compares a request against. */
static unsigned int		Conn_engine;
/* Set to 1 by Conn_init() when POLL_FOUND / EPOLL_FOUND is defined. */
static unsigned int		Conn_engine_poll_found;
static unsigned int		Conn_engine_epoll_found;

/* Engine functions */
/*
 * Dispatch pointers: Conn_engine_set() points them at either the Conn_poll_*
 * or the Conn_epoll_* implementation. The rest of this file calls the engine
 * through these pointers.
 */
static int (*Conn_engine_init)(void);
static int (*Conn_engine_shutdown)(void);
static int (*Conn_engine_grow)(unsigned int);
static int (*Conn_engine_add_obj)(struct Conn *);
static int (*Conn_engine_del_obj)(struct Conn *);
static int (*Conn_engine_chg_obj)(struct Conn *);
/* Waits for events and invokes cb once per connection that has some. */
static int (*Conn_engine_poll)(int, void (*cb)(struct Conn *C, const int revents));
static void (*Conn_engine_move_slot)(const unsigned int dst,
		const unsigned int src);


/* Functions */

/*
 * Set prefered engine
 */
int Conn_engine_set(const unsigned int engine)
{
	if (engine == Conn_engine)
		return 0;

	if (engine == CONN_ENGINE_POLL) {
		if (Conn_engine_poll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine POLL. Probably not supported.");
			return -1;
		}

		#ifdef POLL_FOUND
		/* Set variables */
		CONN_POLLIN = POLLIN;
		CONN_POLLOUT = POLLOUT;
		CONN_POLLPRI = POLLPRI;
		CONN_POLLERR = POLLERR;
		CONN_POLLHUP = POLLHUP;
		CONN_POLLNVAL = POLLNVAL;
		CONN_POLLRDNORM = POLLRDNORM;
		CONN_POLLRDBAND = POLLRDBAND;
		/* Set functions */
		Conn_engine_init = Conn_poll_init;
		Conn_engine_shutdown = Conn_poll_shutdown;
		Conn_engine_grow = Conn_poll_grow;
		Conn_engine_add_obj = Conn_poll_add_obj;
		Conn_engine_del_obj = Conn_poll_del_obj;
		Conn_engine_chg_obj = Conn_poll_chg_obj;
		Conn_engine_poll = Conn_poll_poll;
		Conn_engine_move_slot = Conn_poll_move_slot;
		#endif
	} else if (engine == CONN_ENGINE_EPOLL) {
		if (Conn_engine_epoll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine EPOLL. Probably not supported.");
			return -1;
		}

		#ifdef EPOLL_FOUND
		/* Set variables */
		CONN_POLLIN = EPOLLIN;
		CONN_POLLOUT = EPOLLOUT;
		CONN_POLLPRI = EPOLLPRI;
		CONN_POLLERR = EPOLLERR;
		CONN_POLLHUP = EPOLLHUP;
		CONN_POLLNVAL = 0; /* not defined for epoll */
		CONN_POLLRDNORM = 0; /* not defined for epoll */
		CONN_POLLRDBAND = 0; /* not defined for epoll */
		/* Set functions */
		Conn_engine_init = Conn_epoll_init;
		Conn_engine_shutdown = Conn_epoll_shutdown;
		Conn_engine_grow = Conn_epoll_grow;
		Conn_engine_add_obj = Conn_epoll_add_obj;
		Conn_engine_del_obj = Conn_epoll_del_obj;
		Conn_engine_chg_obj = Conn_epoll_chg_obj;
		Conn_engine_poll = Conn_epoll_poll;
		Conn_engine_move_slot = Conn_epoll_move_slot;
		#endif
	} else {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot use engine %u because is not supported.", engine);
		return -1;
	}

	return 0;
}

/*
 * Initialize the library.
 *
 * max: stored in Conn_max; Conn_alloc() treats 0 as "no limit".
 *
 * Resets the global counters, picks an engine (EPOLL is preferred over POLL
 * when both were detected at build time) and initializes it.
 *
 * Returns 0 on success or when the library is already initialized,
 * -1 on error.
 */
int Conn_init(const unsigned int max)
{
	int ret;
	unsigned int engine = 0;
	unsigned int engine_found = 0;

	if (Conn_inited == 1)
		return 0;

	Conn_max = max;

	Conn_no = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;
	Conn_accept_is_allowed = 1;
	Conn_accept_is_allowed_last = 1;
	Conn_allocated = 0;

	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

	/*
	Conn_queue_init(&Conn_queue_free);
	*/

	#ifdef POLL_FOUND
	engine = CONN_ENGINE_POLL;
	engine_found = 1;
	Conn_engine_poll_found = 1;
	#endif

	#ifdef EPOLL_FOUND
	engine = CONN_ENGINE_EPOLL;
	engine_found = 1;
	Conn_engine_epoll_found = 1;
	#endif

	/* Without this check 'engine' would be used uninitialized. */
	if (engine_found == 0) {
		snprintf(Conn_error, sizeof(Conn_error),
			"No engine available. Probably not supported.");
		return -1;
	}

	ret = Conn_engine_set(engine);
	if (ret != 0)
		return -1;

	ret = Conn_engine_init();
	if (ret != 0)
		return -1;

	Conn_inited = 1;

	return 0;
}

/*
 * Shutdown Conn
 */
int Conn_shutdown(void)
{
	int ret;
	unsigned int i;

	Conn_inited = 0;

	ret = Conn_engine_shutdown();
	if (ret < 0)
		return ret;

	/* Free all buffers */
	for (i = 0; i < Conn_allocated - 1; i++) {
		if (Conns[i].ibuf)
			free(Conns[i].ibuf);
		if (Conns[i].obuf)
			free(Conns[i].obuf);
	}

	free(Conns);

	return 0;
}

/*
 * Append count bytes from buf to the output buffer of C and ask the engine
 * to watch the connection for writability.
 *
 * The output buffer is expanded through Conn_try_expand_buf() when the free
 * space after obuf_tail is smaller than count.
 *
 * Returns 0 on success, -1 if C is NULL or the buffer cannot be expanded.
 */
int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
{
	int r;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, count);
		/* count is a size_t, so it needs %zu (not %d). */
		Log(0, "\tTry to enqueue %zu bytes to slot=%u, id=%llu [%s]...\n",
			count, C->slot, C->id, dump);
		free(dump);
	}

	if (C->obuf_size - C->obuf_tail < count) {
		r = Conn_try_expand_buf(C, 0, count);
		if (r != 0)
			return -1;
	}

	memcpy(C->obuf + C->obuf_tail, buf, count);
	C->obuf_tail += count;

	C->events |= CONN_POLLOUT;
	Conn_engine_chg_obj(C);

	return 0;
}

/*
 * Tear down connection C after an error or a close request.
 *
 * Copies Conn_errno(C) into Conn_error, runs the error callback (unless the
 * error state is CONN_ERROR_USERREQ) and, for slots in OPEN or LISTEN state,
 * the close callback. A per-connection callback takes precedence over the
 * global one. The slot is then removed from the engine and its descriptor
 * is closed.
 *
 * With CONN_FLAGS_AUTO_RECONNECT the slot is recycled: it goes back to
 * CONN_STATE_CONNECT_0 with a retry time of now + C->delay, its buffers are
 * emptied and it is counted as pending again. Otherwise the slot is marked
 * CONN_STATE_FREE.
 */
static void Conn_free_intern(struct Conn *C)
{
	Log(7, "%s: Cleaning-up slot in %s state, slot=%u, id=%llu [%s]...\n",
		__FUNCTION__, Conn_state(C), C->slot, C->id, Conn_errno(C));

	/* Publish the reason in the global error string. */
	snprintf(Conn_error, sizeof(Conn_error),
		"%s", Conn_errno(C));

	/* A close requested by the user is not reported as an error. */
	if (C->error_state != CONN_ERROR_USERREQ) {
		if (C->cb_error)
			C->cb_error(C);
		else if (Conn_error_cb)
			Conn_error_cb(C);
	}

	/* The state is read after the error callback has run. */
	if ((C->state == CONN_STATE_OPEN) || (C->state == CONN_STATE_LISTEN)) {
		if (C->cb_close)
			C->cb_close(C);
		else if (Conn_close_cb)
			Conn_close_cb(C);
	}

	Conn_engine_del_obj(C);

	if (C->fd > -1) {
		close(C->fd);
		C->fd = -1;
	}

	/* Reset tsend, else we enter in a timeout error loop */
	C->tsend.tv_sec = 0;
	C->tsend.tv_usec = 0;

	/* Reset the connection attempt time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	if (C->flags & CONN_FLAGS_AUTO_RECONNECT) {
		/* Keep the slot and schedule a new connect attempt. */
		C->tryat = Conn_now.tv_sec + C->delay;
		C->error_state = 0;
		C->state = CONN_STATE_CONNECT_0;

		C->ibuf_head = 0;
		C->ibuf_tail = 0;

		C->obuf_head = 0;
		C->obuf_tail = 0;

		Conn_pending++;
	} else {
		/* Conn_poll() compacts slots found in CONN_STATE_FREE. */
		C->type = Conn_type_UNK;
		C->state = CONN_STATE_FREE;
	}
}

/*
 * Enlarge the array of Conn cells by a fixed step.
 *
 * The engine is grown first, then the cell array; the freshly added cells
 * are zero-filled. Returns 0 on success, -1 if either step fails.
 */
static int Conn_grow(void)
{
	enum { GROW_STEP = 128 };
	const unsigned int wanted = Conn_allocated + GROW_STEP;
	struct Conn *cells;

	Log(10, "%s() Try to grow cells from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + GROW_STEP);

	if (Conn_engine_grow(wanted) != 0)
		return -1;

	cells = realloc(Conns, wanted * sizeof(struct Conn));
	if (cells == NULL)
		return -1;

	/* Only the tail that was just added needs clearing. */
	memset(cells + Conn_allocated, 0, GROW_STEP * sizeof(struct Conn));

	Conns = cells;
	Conn_allocated = wanted;

	return 0;
}

/*
 * Allocs a Conn structure.
 *
 * Takes the cell at index Conn_no (growing the array when every allocated
 * cell is in use), makes sure its input/output buffers have at least the
 * default sizes and resets its per-connection fields.
 *
 * Returns the cell in CONN_STATE_EMPTY, or NULL (Conn_error set) when the
 * Conn_max limit is reached or memory cannot be obtained.
 */
static struct Conn *Conn_alloc(void)
{
	struct Conn *C;
	unsigned int growok;
	void *p;

	Log(10, "%s() Conn_no=%d Conn_max=%d\n",
		__FUNCTION__,
		Conn_no, Conn_max);

	if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	if (Conn_allocated == Conn_no) {
		growok = Conn_grow();
		if (growok != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}

	/*
	 * NOTE(review): recorded before Conn_no is incremented, so the value
	 * trails the number of cells in use by one - confirm this is intended.
	 */
	if (Conn_no > Conn_max_reached)
		Conn_max_reached = Conn_no;

	C = &Conns[Conn_no];
	C->slot = Conn_no;

	C->type = Conn_type_UNK;
	C->state = CONN_STATE_FREE;

	/*
	 * The cell may be a recycled one: Conn_free_intern() leaves
	 * error_state set when it frees a slot, and Conn_poll() tears down any
	 * slot whose error_state is > 0. Start clean.
	 */
	C->error_state = 0;
	C->xerrno = 0;

	/* Buffers of a recycled cell are kept if they are big enough. */
	if (C->ibuf_size < Conn_default_ibuf) {
		p = realloc(C->ibuf, Conn_default_ibuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		C->ibuf = p;
		C->ibuf_size = Conn_default_ibuf;
	}
	C->ibuf_head = 0;
	C->ibuf_tail = 0;

	if (C->obuf_size < Conn_default_obuf) {
		p = realloc(C->obuf, Conn_default_obuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		C->obuf = p;
		C->obuf_size = Conn_default_obuf;
	}
	C->obuf_head = 0;
	C->obuf_tail = 0;

	C->trecv = Conn_now;

	C->bi = 0;
	C->bo = 0;
	C->private = NULL;

	/* bandwidth */
	C->band_width = 0;
	C->band_factor = 0;
	C->band_tokens = 0;
	C->band_lasttime = Conn_now;

	C->fd = -1;
	C->events = 0;
	C->revents = 0;
	C->state = CONN_STATE_EMPTY;

	C->flags = 0;

	Conn_no++;

	Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
		C->slot, C->id, Conn_no);

	/* Limit reached: Conn_accept_allow() will stop the listeners. */
	if (Conn_no == Conn_max)
		Conn_accept_is_allowed = 0;

	return C;
}

/*
 * Create a socket for the given domain/type, optionally bind it to
 * addr/port and register it with the engine.
 *
 * domain: PF_INET, PF_INET6 or PF_PACKET.
 * type:   socket type; for PF_INET/PF_INET6 only SOCK_STREAM sockets are put
 *         in listening mode (connection type MASTER, state LISTEN), any
 *         other type is just bound (type CLIENT, state OPEN).
 * addr:   numeric address to bind to (required for PF_INET/PF_INET6).
 * port:   port to bind to; for PF_PACKET it is used as the protocol number.
 *
 * Returns the new connection or NULL on error (Conn_error is filled in).
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *C;
	int i, ret, xerrno;
	struct sockaddr *psa = NULL;
	struct sockaddr_in sa;
	struct sockaddr_in6 sa6;
	int sock_len = 0;
	int do_bind = 1, do_listen = 1;
	int protocol = 0;
	int first_state;

	switch (domain) {
		case PF_INET:
			if (addr == NULL) {
				snprintf(Conn_error, sizeof(Conn_error),
					"Missing address for domain [%d]!", domain);
				return NULL;
			}
			memset(&sa, 0, sizeof(sa));
			sa.sin_family = AF_INET;
			/* inet_pton returns 1 on success, 0 for a bad string. */
			ret = inet_pton(AF_INET, addr, &sa.sin_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", addr);
				return NULL;
			}
			sa.sin_port = htons(port);
			psa = (struct sockaddr *) &sa;
			sock_len = sizeof(sa);
			if (type != SOCK_STREAM)
				do_listen = 0;
			break;

		case PF_INET6:
			if (addr == NULL) {
				snprintf(Conn_error, sizeof(Conn_error),
					"Missing address for domain [%d]!", domain);
				return NULL;
			}
			memset(&sa6, 0, sizeof(sa6));
			sa6.sin6_family = AF_INET6;
			ret = inet_pton(AF_INET6, addr, &sa6.sin6_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", addr);
				return NULL;
			}
			sa6.sin6_port = htons(port);
			psa = (struct sockaddr *) &sa6;
			sock_len = sizeof(sa6);
			if (type != SOCK_STREAM)
				do_listen = 0;
			break;

		case PF_PACKET:
			do_bind = 0;
			protocol = htons(port);
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Invalid domain [%d]!", domain);
			return NULL;
	}

	/* Always defined, whatever the socket type is. */
	if ((do_bind == 1) && (do_listen == 1))
		first_state = CONN_STATE_LISTEN;
	else
		first_state = CONN_STATE_OPEN;

	C = Conn_alloc();
	if (!C)
		return NULL;

	/* Same reset Conn_connect() and Conn_accept() do on a new slot. */
	C->error_state = 0;

	C->fd = socket(domain, type, protocol);
	if (C->fd == -1) {
		/* Save errno before any other library call can change it. */
		xerrno = errno;
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%d, %d, %d) [%s]",
			domain, type, protocol, strerror(xerrno));
		C->xerrno = xerrno;
		goto out;
	}

	Conn_setnonblock(C->fd);

	if (domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(C->fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	/* Default, client */
	C->type = Conn_type_CLIENT;

	if (do_bind == 1) {
		i = 1;
		setsockopt(C->fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(C->fd, psa, sock_len);
		if (ret < 0) {
			xerrno = errno;
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind [%s]", strerror(xerrno));
			C->xerrno = xerrno;
			goto out;
		}

		if (do_listen == 1) {
			ret = listen(C->fd, 128);
			if (ret < 0) {
				xerrno = errno;
				snprintf(Conn_error, sizeof(Conn_error),
					"Cannot listen [%s]", strerror(xerrno));
				C->xerrno = xerrno;
				goto out;
			}
			C->type = Conn_type_MASTER;
		}
	}

	C->state = first_state;

	C->start = Conn_now.tv_sec;

	/* Reset syn time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	C->id = Conn_id++;

	/* addr may be NULL for PF_PACKET; never hand NULL to %s. */
	snprintf(C->addr, sizeof(C->addr), "%s", addr ? addr : "");
	C->port = port;

	C->sock_domain = domain;
	C->sock_type = type;

	C->events = CONN_POLLIN;
	C->revents = 0;

	ret = Conn_engine_add_obj(C);
	if (ret != 0) {
		C->xerrno = 0;
		goto out;
	}

	return C;

	out:
	C->error_state = CONN_ERROR_SOCKET;
	Conn_free_intern(C);

	return NULL;
}

/*
 * Convenience wrapper around Conn_socket_addr() that binds to the wildcard
 * address of the domain ("0.0.0.0" for PF_INET, "::" for PF_INET6). Any
 * other domain is forwarded with a NULL address.
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	const char *any_addr = NULL;

	if (domain == PF_INET)
		any_addr = "0.0.0.0";
	else if (domain == PF_INET6)
		any_addr = "::";

	return Conn_socket_addr(domain, type, any_addr, port);
}

/*
 * Prepare an outgoing connection to addr/port.
 *
 * Nothing is connected here: a slot is reserved, filled in and left in
 * CONN_STATE_CONNECT_a, and the pending counter is bumped so that
 * Conn_poll() runs Conn_trytoconnect() for it.
 *
 * Returns the new connection or NULL if no slot could be allocated.
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *conn;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	conn = Conn_alloc();
	if (conn == NULL)
		return NULL;

	/* Where to connect */
	snprintf(conn->addr, sizeof(conn->addr), "%s", addr);
	conn->port = port;
	conn->sock_domain = domain;
	conn->sock_type = type;

	/* Bookkeeping */
	conn->type = Conn_type_CLIENT;
	conn->state = CONN_STATE_CONNECT_a;
	conn->error_state = 0;
	conn->id = Conn_id++;
	conn->start = Conn_now.tv_sec;

	Conn_pending++;

	return conn;
}

/*
 * Report a failed accept on listener C through the per-connection callback
 * or, if none is set, through the global one.
 */
static void Conn_accept_failed(struct Conn *C)
{
	if (C->cb_accept_error)
		C->cb_accept_error(C);
	else if (Conn_accept_error_cb)
		Conn_accept_error_cb(C);
}

/*
 * Accept one incoming connection on listening connection C.
 *
 * The new descriptor gets its own slot (type CLIENT, state OPEN, watching
 * for input), is made non-blocking and is registered with the engine; then
 * the accept callback is invoked with the new connection. Failures are
 * reported through the accept-error callback of C.
 */
static void Conn_accept(struct Conn *C)
{
	int fd, xerrno;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;
	unsigned int Cslot;

	Log(10, "Accepting a connection via %s/%d, type %s, domain %s.\n",
		C->addr, C->port, Conn_type(C), Conn_domain(C));

	switch(C->sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
		break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
		break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				C->sock_domain);
			C->error_state = CONN_ERROR_SOCKET;
			if (C->cb_error)
				C->cb_error(C);
			else if (Conn_error_cb)
				Conn_error_cb(C);
			return;
	}

	fd = accept(C->fd, pca, &cax_len);
	if (fd == -1) {
		/* Keep errno: the calls below may overwrite it. */
		xerrno = errno;

		/* Nothing to accept right now; POSIX allows either value. */
		if ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK))
			return;

		/* TODO: ratelimit */
		Log(9, "WARN: Cannot accept on fd %d [%s].\n",
			C->fd, strerror(xerrno));
		Conn_accept_failed(C);
		return;
	}

	/* After calling Conn_alloc, pointer to slot can change, so C is not valid */
	Cslot = C->slot;
	X = Conn_alloc();
	C = &Conns[Cslot];
	if (!X) {
		Conn_accept_failed(C);
		close(fd);
		return;
	}

	switch (C->sock_domain) {
		case PF_INET:
			inet_ntop(C->sock_domain, &ca4.sin_addr, X->addr, sizeof(X->addr));
			X->port = ntohs(ca4.sin_port);
			break;

		case PF_INET6:
			inet_ntop(C->sock_domain, &ca6.sin6_addr, X->addr, sizeof(X->addr));
			X->port = ntohs(ca6.sin6_port);
			break;
	}

	X->type = Conn_type_CLIENT;
	X->error_state = 0;
	X->state = CONN_STATE_OPEN;
	X->via = C->id;
	X->fd = fd;
	X->sock_domain = C->sock_domain;
	X->sock_type = C->sock_type;
	X->start = Conn_now.tv_sec;
	X->id = Conn_id++;
	X->events = CONN_POLLIN;
	X->revents = 0;

	Conn_setnonblock(X->fd);

	if (Conn_engine_add_obj(X) != 0) {
		/*
		 * The engine does not watch this descriptor, so the connection
		 * could never make progress. Drop it: a slot left in
		 * CONN_STATE_FREE is reclaimed by the compacting pass of
		 * Conn_poll().
		 */
		close(X->fd);
		X->fd = -1;
		X->type = Conn_type_UNK;
		X->state = CONN_STATE_FREE;
		Conn_accept_failed(C);
		return;
	}

	if (C->cb_accept)
		C->cb_accept(X);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(X);

	Conn_total++;
}

static void Conn_accept_allow(void)
{
	unsigned int i;

	if (Conn_accept_is_allowed == Conn_accept_is_allowed_last)
		return;

	Log(10, "%s: Turning accept allow from %d to %d...\n",
		__FUNCTION__, Conn_accept_is_allowed_last,
		Conn_accept_is_allowed);

	for (i = 0; i < Conn_no; i++) {
		if (Conns[i].type != Conn_type_MASTER)
			continue;

		if (Conn_accept_is_allowed == 0)
			Conns[i].events &= ~CONN_POLLIN;
		else	
			Conns[i].events |= CONN_POLLIN;

		Conn_engine_chg_obj(&Conns[i]);
	}

	Conn_accept_is_allowed_last = Conn_accept_is_allowed;
}

/*
 * Refill the bandwidth tokens of a connection.
 *
 * Time is counted in 100 ms ticks since the last refill; every tick adds
 * band_width / 10 tokens, capped at band_factor * band_width. After a
 * refill the connection is watched for writability again.
 */
static void Conn_band_update(struct Conn *C)
{
	long ticks;

	/* Bandwidth limiting is off for this connection */
	if (C->band_width == 0)
		return;

	/* Elapsed microseconds first, then converted to 100 ms ticks */
	ticks = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
	ticks += Conn_now.tv_usec - C->band_lasttime.tv_usec;
	ticks /= 100000;

	/* Still inside the tick that was already credited */
	if (ticks == 0)
		return;

	/* The clock went backwards: credit a single tick */
	ticks = (ticks < 0) ? 1 : ticks;

	C->band_lasttime = Conn_now;
	C->band_tokens += ticks * C->band_width / 10;
	if (C->band_tokens > C->band_factor * C->band_width)
		C->band_tokens = C->band_factor * C->band_width;

	C->events |= CONN_POLLOUT;
	Conn_engine_chg_obj(C);

	Log(debug_band, "\t\tBAND: slot=%u, id=%llu, added tokens -> %u.\n",
		C->slot, C->id, C->band_tokens);
}

/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 *
 * width:  tokens credited per second (Conn_band_update() adds width / 10
 *         per 100 ms; the internal send callback spends one token per byte
 *         sent). 0 disables the limit.
 * factor: burst multiplier; the token bucket holds at most factor * width
 *         tokens and starts full.
 *
 * Always returns 0.
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	Log(11, "\tConn_band: slot=%u, id=%llu, width=%u, factor=%u.\n",
		C->slot, C->id, width, factor);

	C->band_lasttime = Conn_now;
	C->band_width = width;
	C->band_factor = factor;
	C->band_tokens = factor * width;

	/*
	 * tv_sec/tv_usec are not int; cast to long so the arguments match
	 * the conversion specifiers on every platform.
	 */
	Log(debug_band, "\t\tBAND: lasttime=%ld.%06ld, width=%u, factor=%u, tokens=%u\n",
		(long) C->band_lasttime.tv_sec, (long) C->band_lasttime.tv_usec,
		C->band_width, C->band_factor, C->band_tokens);

	return 0;
}

/*
 * Abort the current connect attempt of C: log Conn_error, record the error
 * state, stop counting the slot as pending and release it.
 */
static void Conn_trytoconnect_failed(struct Conn *C, const int error_state)
{
	Log(9, "\t%s\n", Conn_error);
	C->error_state = error_state;

	/*
	 * The attempt is over, so the slot is no longer pending.
	 * Conn_free_intern() counts it again if it schedules an automatic
	 * reconnect; without this the counter could only grow.
	 */
	if (Conn_pending > 0)
		Conn_pending--;

	Conn_free_intern(C);
}

/*
 * Start the pending outgoing connections.
 *
 * For every CLIENT slot that is in CONN_STATE_CONNECT_a (or in
 * CONN_STATE_CONNECT_0 with its retry time reached) the address is
 * resolved, a non-blocking socket is created and registered with the
 * engine, and connect() is issued. A slot whose connect was started moves
 * to CONN_STATE_CONNECT_b; completion is detected through POLLOUT in
 * Conn_poll_cb().
 */
static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	struct Conn *C;
	int i, ret;
	char port[8];

	Log(8, "%s: Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);

	for (i = Conn_no - 1; i >= 0; i--) {
		/* Looked up on every pass; C is not used after a failure. */
		C = &Conns[i];

		if (C->type != Conn_type_CLIENT)
			continue;

		/* A reconnect whose waiting time is over becomes a plain attempt. */
		if ((C->state == CONN_STATE_CONNECT_0)
			&& (C->tryat <= Conn_now.tv_sec)) {
			C->state = CONN_STATE_CONNECT_a;
		}

		if (C->state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTry to connect slot=%u, id=%llu, to %s/%d...\n",
			C->slot, C->id, C->addr, C->port);

		memset(&hints, 0, sizeof(hints));
		if (C->sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = C->sock_domain;
		hints.ai_socktype = C->sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = AI_ADDRCONFIG;
		snprintf(port, sizeof(port), "%d", C->port);
		res = NULL;
		ret = getaddrinfo(C->addr, port, &hints, &res);
		if (ret != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot call getaddrinfo [%s]", gai_strerror(ret));
			Conn_trytoconnect_failed(C, CONN_ERROR_GETADDRINFO);
			if (res)
				freeaddrinfo(res);
			continue;
		}

		if (C->fd == -1) {
			C->fd = socket(res->ai_family, res->ai_socktype, 0);
			if (C->fd == -1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"Cannot create socket [%s]", strerror(errno));
				Conn_trytoconnect_failed(C, CONN_ERROR_SOCKET);
				freeaddrinfo(res);
				continue;
			}
			Log(10, "\tAllocated socket %d\n",
				C->fd);

			Conn_setnonblock(C->fd);

			/* Need POLLOUT to signal when the connection was done. */
			C->events |= (CONN_POLLIN | CONN_POLLOUT);
			Conn_engine_add_obj(C);
		}

		Log(9, "\tConnecting...\n");
		/* Set syn time */
		C->conn_syn = Conn_now;
		ret = connect(C->fd, res->ai_addr, res->ai_addrlen);
		if ((ret != 0) && (errno != EINPROGRESS)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot connect [%d] [%s]",
				errno, strerror(errno));
			Conn_trytoconnect_failed(C, CONN_ERROR_CONNECT);
			freeaddrinfo(res);
			continue;
		}

		C->state = CONN_STATE_CONNECT_b;

		Conn_pending--;
		Conn_total++;

		freeaddrinfo(res);
	}
}

/*
 * Default send handler: push as much of the output buffer of C as allowed
 * to the socket.
 *
 * The amount sent in one call is limited by Conn_max_send (when > 0) and by
 * the bandwidth tokens of the connection; with no tokens left, output
 * polling is switched off until Conn_band_update() adds tokens. A failed
 * send (other than "try again") records CONN_ERROR_SEND and the errno
 * value in the connection.
 */
static void Conn_send_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	int count;
	unsigned int slot;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		C->slot, C->id, C->obuf_head, C->obuf_tail, C->obuf_size);

	slot = C->slot;

	if (Conns[slot].obuf == NULL)
		abort();

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n");
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
			return;
		}
	}

	/* Retry when interrupted by a signal. */
	do {
		Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u), max=%u (count=%d), 0)...\n",
			Conns[slot].fd, Conns[slot].obuf_head,
			Conns[slot].obuf_tail, max, count);
		n = send(Conns[slot].fd, buf, max, 0);
		/* Captured at once: Log() may change errno. */
		xerrno = errno;
	} while ((n == -1) && (xerrno == EINTR));

	/* The socket cannot take data right now; wait for the next POLLOUT. */
	if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Sent %zd bytes [head=%d tail=%d]\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd,
		n, Conns[slot].obuf_head, Conns[slot].obuf_tail);

	if (n > 0) {
		/* Dump only what was really sent; n is -1 on error. */
		if (Conn_level >= 10) {
			dump = Conn_dump(buf, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].tsend = Conn_now;
		if (n < count) {
			Conns[slot].obuf_head += n;
		} else {
			/* Everything went out: rewind the buffer. */
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}

		Conns[slot].bo += n;
		if (Conns[slot].band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %zd tokens -> %u...\n",
				__FUNCTION__,
				n, Conns[slot].band_tokens);
		}
	} else {
		Log(0, "%s: Error in send (slot=%u, id=%llu) [%s]\n",
			__FUNCTION__,
			slot, Conns[slot].id, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Default receive handler: read from the socket of C into its input buffer.
 *
 * The buffer is expanded first when it is full; one recv() of at most
 * Conn_max_recv bytes (when > 0) is done. Received data is handed to the
 * data callback (per connection first, global otherwise). End of stream
 * records CONN_ERROR_HANGUP, a failed recv records CONN_ERROR_RECV and the
 * errno value.
 */
static void Conn_recv_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	unsigned int slot;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		C->slot, C->id, C->ibuf_head, C->ibuf_tail, C->ibuf_size);

	slot = C->slot;

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(&Conns[slot], 1, 0);
		if (r != 0) {
			Log(1, "MEM: Cannot expand ibuf!\n");
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	/* Retry when interrupted by a signal. */
	do {
		n = recv(Conns[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		/* Captured at once: Log() may change errno. */
		xerrno = errno;
	} while ((n == -1) && (xerrno == EINTR));

	/*
	 * Nothing to read on a non-blocking socket is not an error (the send
	 * path treats EAGAIN the same way); wait for the next POLLIN.
	 */
	if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK)))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Received %zd bytes.\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd, n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf
				+ Conns[slot].ibuf_tail, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].ibuf_tail += n;

		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		/* Orderly shutdown by the peer. */
		Conns[slot].error_state = CONN_ERROR_HANGUP;
	} else {
		Log(4, "Error in recv (slot=%u, id=%llu) [%s]\n",
			slot, Conns[slot].id, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Callback that is called for every connection
 */
static void Conn_poll_cb(struct Conn *C, int revents)
{
	unsigned int slot;

	Log(12, "%s: slot=%u, id=%llu, revents=%x.\n",
		__FUNCTION__, C->slot, C->id, revents);
	C->revents = revents;

	if (Conn_level >= 12)
		Log(12, "\t%s\n", Conn_status_slot(C));

	/* We should not have events on a free cell */
	if (C->state == CONN_STATE_FREE)
		abort();

	if (revents & CONN_POLLHUP) {
		C->error_state = CONN_ERROR_HANGUP;
		/* TODO: Add it to the close list to speed it up */
	}

	if (revents & CONN_POLLERR) {
		C->error_state = CONN_ERROR_POLL;
		C->xerrno = 0; /* TODO: unknown error? */
		/* TODO: CONN_ERROR_POLL is correct here? */
	}

	/* First, test we have a new connection */
	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(C) == 0)) {
		/* We just established a connection */
		if (C->state == CONN_STATE_CONNECT_b) {
			/*
			 * We do not need POLLOUT now - it was used only for
			 * connect completion.
			 */
			revents &= ~CONN_POLLOUT;
			C->events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(C);

			C->state = CONN_STATE_OPEN;

			if (C->cb_connected != NULL)
				C->cb_connected(C);
			else if (Conn_connected_cb)
				Conn_connected_cb(C);

		}
	}

	/* Second, test for error or input */
	if ((revents & CONN_POLLIN)
		&& (Conn_ignore(C) == 0)) {
		if (C->type == Conn_type_MASTER) {
			/* C pointer can change under us in Conn_accept->Conn_grow */
			slot = C->slot;
			Conn_accept(C);
			C = &Conns[slot];
		} else {
			if (C->cb_recv)
				C->cb_recv(C);
			else if (Conn_recv_cb != NULL)
				Conn_recv_cb(C);
			else
				Conn_recv_cb_i(C);
		}
	}

	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(C) == 0)) {
		/* We can send data */
		if (C->state == CONN_STATE_OPEN) {
			if (C->cb_send)
				C->cb_send(C);
			else if (Conn_send_cb != NULL)
				Conn_send_cb(C);
			else
				Conn_send_cb_i(C);

			if (C->obuf_head == C->obuf_tail) {
				C->events &= ~CONN_POLLOUT;
				Conn_epoll_chg_obj(C);
				if (C->flags & CONN_FLAGS_CLOSE_AFTER_SEND)
					C->error_state = CONN_ERROR_USERREQ;
			}
		}
	}
}

/*
 * Moving a in-use slot over a free one for compacting reason.
 *
 * dst: index of the free slot being reclaimed.
 * src: index of the slot that takes its place (the caller passes the last
 *      slot in use).
 *
 * The two cells are swapped, so the free cell (with its buffers) ends up
 * at src, just past the new end of the used range, and the engine is told
 * about the move. The number of used slots is decremented in every case,
 * including dst == src, where the free slot already is the last one and
 * only has to be dropped from the count.
 */
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
	struct Conn tmp;

	if (dst != src) {
		Log(10, "%s: Moving slot=%u (id=%llu) over %d...\n",
			__FUNCTION__, src, Conns[src].id, dst);

		tmp = Conns[dst];
		Conns[dst] = Conns[src];
		Conns[dst].slot = dst;
		Conns[src] = tmp;

		Conn_engine_move_slot(dst, src);
	}

	/*
	 * Without the decrement in the dst == src case a freed last slot
	 * stayed counted as in use, which kept the Conn_max limit reached.
	 */
	Conn_no--;

	/* We made some space, so accepting again connection */
	Conn_accept_is_allowed = 1;
}

/*
 * Run one round of the event loop (or loop forever when timeout is -1).
 *
 * timeout is in 1/1000 seconds increments; with -1 the engine is polled
 * with a 1000 ms timeout and the round is repeated endlessly.
 *
 * A round starts pending connects, lets the engine dispatch events to
 * Conn_poll_cb(), then walks the used slots: slots in error are released,
 * free slots are compacted away, the others are checked for expiration and
 * get their bandwidth tokens refilled. Finally the accept permission is
 * applied to the listeners.
 *
 * Returns 0 when there is no connection at all, -1 when the engine reports
 * an error, otherwise the value returned by the engine.
 */
int Conn_poll(const int timeout)
{
	int ret;
	int wait_ms;
	unsigned int idx;
	struct Conn *cell;

	Log(11, "Conn_poll(timeout=%d Conn_no=%d)\n",
		timeout, Conn_no);

	if (Conn_no == 0)
		return 0;

	wait_ms = (timeout == -1) ? 1000 : timeout;

	do {
		if (Conn_pending > 0)
			Conn_trytoconnect();

		ret = Conn_engine_poll(wait_ms, Conn_poll_cb);
		if (ret < 0)
			return -1;

		Log(9, "Do compacting, expiration and band stuff...\n");
		for (idx = 0; idx < Conn_no; idx++) {
			cell = &Conns[idx];

			/* A slot in error state gets closed first */
			if (cell->error_state > 0)
				Conn_free_intern(cell);

			/* Fill the hole with the last used slot */
			if (cell->state == CONN_STATE_FREE) {
				Conn_move_slot(idx, Conn_no - 1);
				continue;
			}

			/* Expiration/timeout check */
			Conn_expire(cell);

			/* Bandwidth tokens */
			Conn_band_update(cell);
		}

		/* Block accepting when full, unblock it otherwise */
		Conn_accept_allow();
	} while (timeout == -1);

	return ret;
}


Mode Type Size Ref File
100644 blob 70 9964a59b5d89f394cc4250ed6d6ce67a5f0cd196 .gitignore
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog.pre109
100644 blob 25504 b7e33ed4916a779af85b159240f2db042dcf5749 Conn.c
100644 blob 820 77f3d32a7beb5f3d5e00daa043634309d144d016 Conn.h
100644 blob 726 64b1bad93a84f87c3e93fc24ac5341db691ea578 Conn.spec.in
100644 blob 66 68138d781ca754b15e14c687da91ee261b2c41f3 Conn_config.h.in
100644 blob 19609 ff4c1b30c147990867f91566c9ea7ccf5749a0c0 Conn_engine_core.c
100644 blob 8392 754f621be0df3227c91d89956acd27b4cb4def23 Conn_engine_core.h
100644 blob 3601 bd8a8669bbfbafe3af4b1d3291041ed64577b73a Conn_engine_epoll.c
100644 blob 602 b648aaf1ad3c79ac5b6f425989aa5fee9a0d9f30 Conn_engine_epoll.h
100644 blob 2589 264b9b69fcb9dbda1c562316594208c753e4bacd Conn_engine_poll.c
100644 blob 589 f897f7d70dd5b17256cb51dce4637f0a8cf6291d Conn_engine_poll.h
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 1255 976d5d795aae2df1b265bb7bb19f9c1761db98e3 Makefile.in
100644 blob 192 5b11bdfb23857d8588845465aef993b320596b44 README
100644 blob 1612 705e412028cffeccaec7303e23fa81e9e78ae6e5 TODO
100755 blob 23 d33bb6c4ecdce1390ce1db3c79ea3b93e22ea755 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
100755 blob 10344 8acd6afdceefbb056b57e9d09a9943857800df8e duilder
100644 blob 276 348b1930873b2a9aa3498c76af57a28cd62e79fa duilder.conf
040000 tree - 267133ee1eb21f456dd3840596a7941f761bb471 examples
Hints:
Before your first commit, do not forget to set up your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main