catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easy building ipv4/ipv6 network daemons/clients

/Conn.c (d85b0e9c52392d2114fd5a46fde6b95469ed4e68) (25523 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux) M BOIE <catab at embedromix.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn.h"
#include "Conn_engine_poll.h"
#include "Conn_engine_epoll.h"


/* Internal variables */
/* Engine */
static unsigned int		Conn_engine;
static unsigned int		Conn_engine_poll_found;
static unsigned int		Conn_engine_epoll_found;

/* Engine functions */
static int (*Conn_engine_init)(void);
static int (*Conn_engine_shutdown)(void);
static int (*Conn_engine_grow)(unsigned int);
static int (*Conn_engine_add_obj)(struct Conn *);
static int (*Conn_engine_del_obj)(struct Conn *);
static int (*Conn_engine_chg_obj)(struct Conn *);
static int (*Conn_engine_poll)(int, void (*cb)(struct Conn *C, const int revents));
static void (*Conn_engine_move_slot)(const unsigned int dst,
		const unsigned int src);


/* Functions */

/*
 * Set prefered engine
 */
int Conn_engine_set(const unsigned int engine)
{
	if (engine == Conn_engine)
		return 0;

	if (engine == CONN_ENGINE_POLL) {
		if (Conn_engine_poll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine POLL. Probably not supported.");
			return -1;
		}

		#ifdef POLL_FOUND
		/* Set variables */
		CONN_POLLIN = POLLIN;
		CONN_POLLOUT = POLLOUT;
		CONN_POLLPRI = POLLPRI;
		CONN_POLLERR = POLLERR;
		CONN_POLLHUP = POLLHUP;
		CONN_POLLNVAL = POLLNVAL;
		CONN_POLLRDNORM = POLLRDNORM;
		CONN_POLLRDBAND = POLLRDBAND;
		/* Set functions */
		Conn_engine_init = Conn_poll_init;
		Conn_engine_shutdown = Conn_poll_shutdown;
		Conn_engine_grow = Conn_poll_grow;
		Conn_engine_add_obj = Conn_poll_add_obj;
		Conn_engine_del_obj = Conn_poll_del_obj;
		Conn_engine_chg_obj = Conn_poll_chg_obj;
		Conn_engine_poll = Conn_poll_poll;
		Conn_engine_move_slot = Conn_poll_move_slot;
		#endif
	} else if (engine == CONN_ENGINE_EPOLL) {
		if (Conn_engine_epoll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine EPOLL. Probably not supported.");
			return -1;
		}

		#ifdef EPOLL_FOUND
		/* Set variables */
		CONN_POLLIN = EPOLLIN;
		CONN_POLLOUT = EPOLLOUT;
		CONN_POLLPRI = EPOLLPRI;
		CONN_POLLERR = EPOLLERR;
		CONN_POLLHUP = EPOLLHUP;
		CONN_POLLNVAL = 0; /* not defined for epoll */
		CONN_POLLRDNORM = 0; /* not defined for epoll */
		CONN_POLLRDBAND = 0; /* not defined for epoll */
		/* Set functions */
		Conn_engine_init = Conn_epoll_init;
		Conn_engine_shutdown = Conn_epoll_shutdown;
		Conn_engine_grow = Conn_epoll_grow;
		Conn_engine_add_obj = Conn_epoll_add_obj;
		Conn_engine_del_obj = Conn_epoll_del_obj;
		Conn_engine_chg_obj = Conn_epoll_chg_obj;
		Conn_engine_poll = Conn_epoll_poll;
		Conn_engine_move_slot = Conn_epoll_move_slot;
		#endif
	} else {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot use engine %u because is not supported.", engine);
		return -1;
	}

	return 0;
}

int Conn_init(const unsigned int max)
{
	unsigned int ret;
	unsigned int engine;

	if (Conn_inited == 1)
		return 0;

	Conn_max = max;

	Conn_no = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;
	Conn_accept_is_allowed = 1;
	Conn_accept_is_allowed_last = 1;
	Conn_allocated = 0;

	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

	/*
	Conn_queue_init(&Conn_queue_free);
	*/

	#ifdef POLL_FOUND
	engine = CONN_ENGINE_POLL;
	Conn_engine_poll_found = 1;
	#endif

	#ifdef EPOLL_FOUND
	engine = CONN_ENGINE_EPOLL;
	Conn_engine_epoll_found = 1;
	#endif

	ret = Conn_engine_set(engine);
	if (ret != 0)
		return -1;

	ret = Conn_engine_init();
	if (ret != 0)
		return -1;

	Conn_inited = 1;

	return 0;
}

/*
 * Shutdown Conn
 */
int Conn_shutdown(void)
{
	int ret;
	unsigned int i;

	Conn_inited = 0;

	ret = Conn_engine_shutdown();
	if (ret < 0)
		return ret;

	/* Free all buffers */
	for (i = 0; i < Conn_allocated - 1; i++) {
		if (Conns[i].ibuf)
			free(Conns[i].ibuf);
		if (Conns[i].obuf)
			free(Conns[i].obuf);
	}

	free(Conns);

	return 0;
}

int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
{
	unsigned int r;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, count);
		Log(0, "\tTry to enqueue %d bytes to slot=%u, id=%llu [%s]...\n",
			count, C->slot, C->id, dump);
		free(dump);
	}

	if (C->obuf_size - C->obuf_tail < count) {
		r = Conn_try_expand_buf(C, 0, count);
		if (r != 0)
			return -1;
	}

	memcpy(C->obuf + C->obuf_tail, buf, count);
	C->obuf_tail += count;

	C->events |= CONN_POLLOUT;
	Conn_engine_chg_obj(C);

	return 0;
}

static void Conn_free_intern(struct Conn *C)
{
	Log(7, "%s: Cleaning-up slot in %s state, slot=%u, id=%llu [%s]...\n",
		__FUNCTION__, Conn_state(C), C->slot, C->id, Conn_errno(C));

	snprintf(Conn_error, sizeof(Conn_error),
		"%s", Conn_errno(C));

	if (C->error_state != CONN_ERROR_USERREQ) {
		if (C->cb_error)
			C->cb_error(C);
		else if (Conn_error_cb)
			Conn_error_cb(C);
	}

	if ((C->state == CONN_STATE_OPEN) || (C->state == CONN_STATE_LISTEN)) {
		if (C->cb_close)
			C->cb_close(C);
		else if (Conn_close_cb)
			Conn_close_cb(C);
	}

	Conn_engine_del_obj(C);

	if (C->fd > -1) {
		close(C->fd);
		C->fd = -1;
	}

	/* Reset tsend, else we enter in a timeout error loop */
	C->tsend.tv_sec = 0;
	C->tsend.tv_usec = 0;

	/* Reset the connection attempt time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	/* Misc */
	C->error_state = 0;

	if (C->flags & CONN_FLAGS_AUTO_RECONNECT) {
		C->tryat = Conn_now.tv_sec + C->delay;
		C->state = CONN_STATE_CONNECT_0;

		C->ibuf_head = 0;
		C->ibuf_tail = 0;

		C->obuf_head = 0;
		C->obuf_tail = 0;

		Conn_pending++;
	} else {
		C->type = Conn_type_UNK;
		C->state = CONN_STATE_FREE;
	}
}

/*
 * Grow Conn structures (cells)
 */
static int Conn_grow(void)
{
	int ret;
	unsigned int alloc;
	struct Conn *p, *set;

	Log(10, "%s() Try to grow cells from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + 128);

	alloc = Conn_allocated + 128;

	ret = Conn_engine_grow(alloc);
	if (ret != 0)
		return -1;

	p = (struct Conn *) realloc(Conns, alloc * sizeof(struct Conn));
	if (p == NULL)
		return -1;

	set = p + Conn_allocated;
	memset(set, 0, 128 * sizeof(struct Conn));
	Conns = p;

	Conn_allocated = alloc;

	return 0;
}

/*
 * Allocs a Conn structure
 */
static struct Conn *Conn_alloc(void)
{
	struct Conn *C;
	unsigned int growok;
	void *p;

	Log(10, "%s() Conn_no=%d Conn_max=%d\n",
		__FUNCTION__,
		Conn_no, Conn_max);

	if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	if (Conn_allocated == Conn_no) {
		growok = Conn_grow();
		if (growok != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}

	if (Conn_no > Conn_max_reached)
		Conn_max_reached = Conn_no;

	C = &Conns[Conn_no];
	C->slot = Conn_no;

	C->type = Conn_type_UNK;
	C->state = CONN_STATE_FREE;

	if (C->ibuf_size < Conn_default_ibuf) {
		p = realloc(C->ibuf, Conn_default_ibuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		C->ibuf = p;
		C->ibuf_size = Conn_default_ibuf;
	}
	C->ibuf_head = 0;
	C->ibuf_tail = 0;

	if (C->obuf_size < Conn_default_obuf) {
		p = realloc(C->obuf, Conn_default_obuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		C->obuf = p;
		C->obuf_size = Conn_default_obuf;
	}
	C->obuf_head = 0;
	C->obuf_tail = 0;

	C->trecv = Conn_now;

	C->bi = 0;
	C->bo = 0;
	C->private = NULL;

	/* bandwidth */
	C->band_width = 0;
	C->band_factor = 0;
	C->band_tokens = 0;
	C->band_lasttime = Conn_now;

	C->fd = -1;
	C->events = 0;
	C->revents = 0;
	C->state = CONN_STATE_EMPTY;

	C->flags = 0;

	Conn_no++;

	Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
		C->slot, C->id, Conn_no);

	if (Conn_no == Conn_max)
		Conn_accept_is_allowed = 0;

	return C;
}

struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *C;
	int i, ret;
	struct sockaddr *psa = NULL;
	struct sockaddr_in sa;
	struct sockaddr_in6 sa6;
	int sock_len = 0;
	int do_bind = 1, do_listen = 1;
	int protocol = 0;
	int first_state;

	switch (domain) {
		case PF_INET:
			memset(&sa, 0, sizeof(sa));
			sa.sin_family = AF_INET;
			ret = inet_pton(AF_INET, addr, &sa.sin_addr);
			if (ret < 0) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", addr);
				return NULL;
			}
			sa.sin_port = htons(port);
			psa = (struct sockaddr *) &sa;
			sock_len = sizeof(sa);
			if (type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_INET6:
			memset(&sa6, 0, sizeof(sa6));
			sa6.sin6_family = AF_INET6;
			ret = inet_pton(AF_INET6, addr, &sa6.sin6_addr);
			if (ret < 0) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", addr);
				return NULL;
			}
			sa6.sin6_port = htons(port);
			psa = (struct sockaddr *) &sa6;
			sock_len = sizeof(sa6);
			if (type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_PACKET:
			do_bind = 0;
			protocol = htons(port);
			first_state = CONN_STATE_OPEN;
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Invalid domain [%d]!", domain);
			return NULL;
	}

	C = Conn_alloc();
	if (!C)
		return NULL;

	C->fd = socket(domain, type, protocol);
	if (C->fd == -1) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%d, %d, %d) [%s]",
			domain, type, protocol, strerror(errno));
		C->xerrno = errno;
		goto out;
	}

	Conn_setnonblock(C->fd);

	if (domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(C->fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	/* Default, client */
	C->type = Conn_type_CLIENT;

	if (do_bind == 1) {
		i = 1;
		setsockopt(C->fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(C->fd, psa, sock_len);
		if (ret < 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind [%s]", strerror(errno));
			C->xerrno = errno;
			goto out;
		}

		if (do_listen == 1) {
			listen(C->fd, 128);
			C->type = Conn_type_MASTER;
		}
	}

	C->state = first_state;

	C->start = Conn_now.tv_sec;

	/* Reset syn time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	C->id = Conn_id++;

	snprintf(C->addr, sizeof(C->addr), "%s", addr);
	C->port = port;

	C->sock_domain = domain;
	C->sock_type = type;

	C->events = CONN_POLLIN;
	C->revents = 0;

	ret = Conn_engine_add_obj(C);
	if (ret != 0) {
		C->xerrno = 0;
		goto out;
	}

	return C;

	out:
	C->error_state = CONN_ERROR_SOCKET;
	Conn_free_intern(C);

	return NULL;
}

struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	char *addr;

	switch (domain) {
		case PF_INET: addr = "0.0.0.0"; break;
		case PF_INET6: addr = "::"; break;
		default: addr = NULL;
	}

	return Conn_socket_addr(domain, type, addr, port);
}

struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *X;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	X = Conn_alloc();
	if (!X)
		return NULL;

	X->type = Conn_type_CLIENT;
	X->error_state = 0;
	X->state = CONN_STATE_CONNECT_a;
	snprintf(X->addr, sizeof(X->addr), "%s", addr);
	X->port = port;
	X->sock_domain = domain;
	X->sock_type = type;
	X->id = Conn_id++;
	X->start = Conn_now.tv_sec;

	Conn_pending++;

	return X;
}

static void Conn_accept(struct Conn *C)
{
	int fd;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;
	unsigned int Cslot;

	Log(10, "Accepting a connection via %s/%d, type %s, domain %s.\n",
		C->addr, C->port, Conn_type(C), Conn_domain(C));

	switch(C->sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
		break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
		break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				C->sock_domain);
			C->error_state = CONN_ERROR_SOCKET;
			if (C->cb_error)
				C->cb_error(C);
			else if (Conn_error_cb)
				Conn_error_cb(C);
			return;
	}

	fd = accept(C->fd, pca, &cax_len);
	if (fd == -1) {
		if (errno == EAGAIN)
			return;

		/* TODO: ratelimit */
		Log(9, "WARN: Cannot accept on fd %d [%s].\n",
			C->fd, strerror(errno));
		if (C->cb_accept_error)
			C->cb_accept_error(C);
		else if (Conn_accept_error_cb)
			Conn_accept_error_cb(C);
		return;
	}

	/* After calling Conn_alloc, pointer to slot can change, so C is not valid */
	Cslot = C->slot;
	X = Conn_alloc();
	C = &Conns[Cslot];
	if (!X) {
		if (C->cb_accept_error)
			C->cb_accept_error(C);
		else if (Conn_accept_error_cb)
			Conn_accept_error_cb(C);
		close(fd);
		return;
	}

	switch (C->sock_domain) {
		case PF_INET:
			inet_ntop(C->sock_domain, &ca4.sin_addr, X->addr, sizeof(X->addr));
			X->port = ntohs(ca4.sin_port);
			break;

		case PF_INET6:
			inet_ntop(C->sock_domain, &ca6.sin6_addr, X->addr, sizeof(X->addr));
			X->port = ntohs(ca6.sin6_port);
			break;
	}

	X->type = Conn_type_CLIENT;
	X->error_state = 0;
	X->state = CONN_STATE_OPEN;
	X->via = C->id;
	X->fd = fd;
	X->sock_domain = C->sock_domain;
	X->sock_type = C->sock_type;
	X->start = Conn_now.tv_sec;
	X->id = Conn_id++;
	X->events = CONN_POLLIN;
	X->revents = 0;

	Conn_setnonblock(X->fd);

	Conn_engine_add_obj(X);

	if (C->cb_accept)
		C->cb_accept(X);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(X);

	Conn_total++;
}

static void Conn_accept_allow(void)
{
	unsigned int i;

	if (Conn_accept_is_allowed == Conn_accept_is_allowed_last)
		return;

	Log(10, "%s: Turning accept allow from %d to %d...\n",
		__FUNCTION__, Conn_accept_is_allowed_last,
		Conn_accept_is_allowed);

	for (i = 0; i < Conn_no; i++) {
		if (Conns[i].type != Conn_type_MASTER)
			continue;

		if (Conn_accept_is_allowed == 0)
			Conns[i].events &= ~CONN_POLLIN;
		else	
			Conns[i].events |= CONN_POLLIN;

		Conn_engine_chg_obj(&Conns[i]);
	}

	Conn_accept_is_allowed_last = Conn_accept_is_allowed;
}

/*
 * Add tokens to connection
 */
static void Conn_band_update(struct Conn *C)
{
	long diff;

	/* no need */
	if (C->band_width == 0)
		return;

	diff = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
	diff += Conn_now.tv_usec - C->band_lasttime.tv_usec;
	diff /= 100000;

	/* already added in this hundred of milisecond? */
	if (diff == 0)
		return;

	/* take care of time skew */
	if (diff < 0)
		diff = 1;

	C->band_lasttime = Conn_now;
	C->band_tokens += diff * C->band_width / 10;
	if (C->band_tokens > C->band_factor * C->band_width)
		C->band_tokens = C->band_factor * C->band_width;

	C->events |= CONN_POLLOUT;
	Conn_engine_chg_obj(C);

	Log(debug_band, "\t\tBAND: slot=%u, id=%llu, added tokens -> %u.\n",
		C->slot, C->id, C->band_tokens);
}

/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	Log(11, "\tConn_band: slot=%u, id=%llu, width=%u, factor=%u.\n",
		C->slot, C->id, width, factor);

	C->band_lasttime = Conn_now;
	C->band_width = width;
	C->band_factor = factor;
	C->band_tokens = factor * width;

	Log(debug_band, "\t\tBAND: lasttime=%d.%06d, width=%u, factor=%u, tokens=%u\n",
		C->band_lasttime.tv_sec, C->band_lasttime.tv_usec,
		C->band_width, C->band_factor, C->band_tokens);

	return 0;
}

static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	int i, ret;
	char port[8];

	Log(8, "%s: Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);

	for (i = Conn_no - 1; i >= 0; i--) {
		if (Conns[i].type != Conn_type_CLIENT)
			continue;

		if ((Conns[i].state == CONN_STATE_CONNECT_0)
			&& (Conns[i].tryat <= Conn_now.tv_sec)) {
			Conns[i].state = CONN_STATE_CONNECT_a;
		}

		if (Conns[i].state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTry to connect slot=%u, id=%llu, to %s/%d...\n",
			Conns[i].slot, Conns[i].id, Conns[i].addr, Conns[i].port);

		memset(&hints, 0, sizeof(hints));
		if (Conns[i].sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = Conns[i].sock_domain;
		hints.ai_socktype = Conns[i].sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = AI_ADDRCONFIG;
		snprintf(port, sizeof(port), "%d", Conns[i].port);
		res = NULL;
		ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
		if (ret != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot call getaddrinfo [%s]", gai_strerror(ret));
			Log(9, "\t%s\n", Conn_error);
			Conns[i].error_state = CONN_ERROR_GETADDRINFO;
			Conn_free_intern(&Conns[i]);
			if (res)
				freeaddrinfo(res);
			continue;
		}

		if (Conns[i].fd == -1) {
			Conns[i].fd = socket(res->ai_family, res->ai_socktype, 0);
			if (Conns[i].fd == -1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"Cannot create socket [%s]", strerror(errno));
				Log(9, "\t%s\n", Conn_error);
				Conns[i].error_state = CONN_ERROR_SOCKET;
				Conn_free_intern(&Conns[i]);
				freeaddrinfo(res);
				continue;
			}
			Log(10, "\tAllocated socket %d\n",
				Conns[i].fd);

			Conn_setnonblock(Conns[i].fd);

			/* Need POLLOUT to signal when the connection was done. */
			Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT);
			Conn_engine_add_obj(&Conns[i]);
		}

		Log(9, "\tConnecting...\n");
		/* Set syn time */
		Conns[i].conn_syn = Conn_now;
		ret = connect(Conns[i].fd, res->ai_addr, res->ai_addrlen);
		if ((ret != 0) && (errno != EINPROGRESS)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot connect [%d] [%s]",
				errno, strerror(errno));
			Log(9, "\t%s\n", Conn_error);
			Conns[i].error_state = CONN_ERROR_CONNECT;
			Conn_free_intern(&Conns[i]);
			freeaddrinfo(res);
			continue;
		}

		Conns[i].state = CONN_STATE_CONNECT_b;

		Conn_pending--;
		Conn_total++;

		freeaddrinfo(res);
	}
}

static void Conn_send_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	int count;
	unsigned int slot;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		C->slot, C->id, C->obuf_head, C->obuf_tail, C->obuf_size);

	slot = C->slot;

	if (Conns[slot].obuf == NULL)
		abort();

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n"); 
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
			return;
		}
	}

	again:
	Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u), max=%d (count=%d), 0)...\n",
		Conns[slot].fd, Conns[slot].obuf_head,
		Conns[slot].obuf_tail, max, count);
	n = send(Conns[slot].fd, buf, max, 0);
	xerrno = errno;
	if ((n == -1) && (errno == EINTR))
		goto again;

	if ((n == -1) && (errno == EAGAIN))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Sent %d bytes [head=%d tail=%d]\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd,
		n, Conns[slot].obuf_head, Conns[slot].obuf_tail);
	if (Conn_level >= 10) {
		dump = Conn_dump(buf, n);
		Log(0, "\t%s\n", dump);
		free(dump);
	}

	if (n > 0) {
		Conns[slot].tsend = Conn_now;
		if (n < count) {
			Conns[slot].obuf_head += n;
		} else {
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}

		Conns[slot].bo += n;
		if (C->band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %d tokens -> %u...\n",
				__FUNCTION__,
				n, Conns[slot].band_tokens);
		}
	} else {
		Log(0, "%s: Error in send (slot=%u, id=%llu) [%s]\n",
			__FUNCTION__,
			slot, Conns[slot].id, strerror(errno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}

static void Conn_recv_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	unsigned int slot;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		C->slot, C->id, C->ibuf_head, C->ibuf_tail, C->ibuf_size);

	slot = C->slot;

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(&Conns[slot], 1, 0);
		if (r != 0) {
			Log(1, "MEM: Cannot expand ibuf!\n");
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	while (1) {
		n = recv(Conns[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		if ((n == -1) && (errno == EINTR))
			continue;

		xerrno = errno;

		break;
	}

	Log(10, "%s: slot=%u, id=%llu: fd=%d Received %d bytes.\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd, n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf
				+ Conns[slot].ibuf_tail, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].ibuf_tail += n;

		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		Conns[slot].error_state = CONN_ERROR_HANGUP;
	} else {
		Log(4, "Error in recv (slot=%u, id=%llu) [%s]\n",
			slot, Conns[slot].id, strerror(errno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Callback that is called for every connection
 */
static void Conn_poll_cb(struct Conn *C, int revents)
{
	unsigned int slot;

	Log(12, "%s: slot=%u, id=%llu, revents=%x.\n",
		__FUNCTION__, C->slot, C->id, revents);
	C->revents = revents;

	if (Conn_level >= 12)
		Log(12, "\t%s\n", Conn_status_slot(C));

	/* We should not have events on a free cell */
	if (C->state == CONN_STATE_FREE)
		abort();

	if (revents & CONN_POLLHUP) {
		C->error_state = CONN_ERROR_HANGUP;
		/* TODO: Add it to the close list to speed it up */
	}

	if (revents & CONN_POLLERR) {
		C->error_state = CONN_ERROR_POLL;
		C->xerrno = 0; /* TODO: unknown error? */
		/* TODO: CONN_ERROR_POLL is correct here? */
	}

	/* First, test we have a new connection */
	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(C) == 0)) {
		/* We just established a connection */
		if (C->state == CONN_STATE_CONNECT_b) {
			/*
			 * We do not need POLLOUT now - it was used only for
			 * connect completion.
			 */
			revents &= ~CONN_POLLOUT;
			C->events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(C);

			C->state = CONN_STATE_OPEN;

			if (C->cb_connected != NULL)
				C->cb_connected(C);
			else if (Conn_connected_cb)
				Conn_connected_cb(C);

		}
	}

	/* Second, test for error or input */
	if ((revents & CONN_POLLIN)
		&& (Conn_ignore(C) == 0)) {
		if (C->type == Conn_type_MASTER) {
			/* C pointer can change under us in Conn_accept->Conn_grow */
			slot = C->slot;
			Conn_accept(C);
			C = &Conns[slot];
		} else {
			if (C->cb_recv)
				C->cb_recv(C);
			else if (Conn_recv_cb != NULL)
				Conn_recv_cb(C);
			else
				Conn_recv_cb_i(C);
		}
	}

	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(C) == 0)) {
		/* We can send data */
		if (C->state == CONN_STATE_OPEN) {
			if (C->cb_send)
				C->cb_send(C);
			else if (Conn_send_cb != NULL)
				Conn_send_cb(C);
			else
				Conn_send_cb_i(C);

			if (C->obuf_head == C->obuf_tail) {
				C->events &= ~CONN_POLLOUT;
				Conn_epoll_chg_obj(C);
				if (C->flags & CONN_FLAGS_CLOSE_AFTER_SEND)
					C->error_state = CONN_ERROR_USERREQ;
			}
		}
	}
}

/*
 * Moving a in-use slot over a free one for compacting reason.
 */
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
	struct Conn tmp;

	if (dst == src)
		return;

	Log(10, "%s: Moving slot=%u (id=%llu) over %d...\n",
		__FUNCTION__, src, Conns[src].id, dst);

	tmp = Conns[dst];
	Conns[dst] = Conns[src];
	Conns[dst].slot = dst;
	Conns[src] = tmp;

	Conn_engine_move_slot(dst, src);

	Conn_no--;

	/* We made some space, so accepting again connection */
	Conn_accept_is_allowed = 1;
}

/*
 * Returns: -1 on error, 0 nothing to do, 1 if some work was done
 * timeout is in 1/1000 seconds increments.
 */
int Conn_poll(const int timeout)
{
	int ret;
	int timeout2;
	unsigned int i;
	struct Conn *C;

	Log(11, "Conn_poll(timeout=%d Conn_no=%d)\n",
		timeout, Conn_no);

	if (Conn_no == 0)
		return 0;

	if (timeout == -1)
		timeout2 = 1000;
	else
		timeout2 = timeout;

	loop:
	if (Conn_pending > 0)
		Conn_trytoconnect();

	ret = Conn_engine_poll(timeout2, Conn_poll_cb);
	if (ret < 0)
		return -1;

	Log(15, "Do compacting, expiration and band stuff...\n");
	i = 0;
	while (i < Conn_no) {
		C = &Conns[i];

		/* Closing connection if it is in error state */
		if (C->error_state > 0)
			Conn_free_intern(C);

		if (C->state == CONN_STATE_FREE) {
			Conn_move_slot(i, Conn_no - 1);
			i++;
			continue;
		}

		/* test if it expired/timeout */
		Conn_expire(C);

		/* add tokens */
		Conn_band_update(C);

		i++;
	}

	/* Blocking accept if full queue or unblock if not */
	Conn_accept_allow();

	if (timeout == -1)
		goto loop;

	return ret;
}


Mode Type Size Ref File
100644 blob 70 9964a59b5d89f394cc4250ed6d6ce67a5f0cd196 .gitignore
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog.pre109
100644 blob 25523 d85b0e9c52392d2114fd5a46fde6b95469ed4e68 Conn.c
100644 blob 820 77f3d32a7beb5f3d5e00daa043634309d144d016 Conn.h
100644 blob 726 64b1bad93a84f87c3e93fc24ac5341db691ea578 Conn.spec.in
100644 blob 66 68138d781ca754b15e14c687da91ee261b2c41f3 Conn_config.h.in
100644 blob 23351 6a1675c08b466c03c54121a053cf00168da08a31 Conn_engine_core.c
100644 blob 9151 948c1652b906eeede45fa9020029a84b5de92a1f Conn_engine_core.h
100644 blob 3601 bd8a8669bbfbafe3af4b1d3291041ed64577b73a Conn_engine_epoll.c
100644 blob 602 b648aaf1ad3c79ac5b6f425989aa5fee9a0d9f30 Conn_engine_epoll.h
100644 blob 2589 264b9b69fcb9dbda1c562316594208c753e4bacd Conn_engine_poll.c
100644 blob 589 f897f7d70dd5b17256cb51dce4637f0a8cf6291d Conn_engine_poll.h
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 1311 3c820df3b36b4bc844c52bc8c7e86f7843b67bb6 Makefile.in
100644 blob 192 5b11bdfb23857d8588845465aef993b320596b44 README
100644 blob 1679 74fec409297402fe691c8cef3f775994f7d312f2 TODO
100755 blob 23 d33bb6c4ecdce1390ce1db3c79ea3b93e22ea755 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
100755 blob 10344 8acd6afdceefbb056b57e9d09a9943857800df8e duilder
100644 blob 276 f34bf151ee9b0b2e4731771041f3ee39ffe131db duilder.conf
040000 tree - 06c1a6f477d485f1d6d96b95466ba4338f426879 examples
040000 tree - 753d3d6e89c88cc27b99ef8babbf0f763a3287a7 tests
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main