catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easy building ipv4/ipv6 network daemons/clients

/Conn.c (c1b7536b872342827edf09313a4f34eddc11464a) (30346 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux) M BOIE <catab at embedromix.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn.h"
#include "Conn_engine_poll.h"
#include "Conn_engine_epoll.h"


/* Internal variables */
/* Engine */
static unsigned int		Conn_engine;
static unsigned int		Conn_engine_poll_found;
static unsigned int		Conn_engine_epoll_found;

/* Engine functions */
static int (*Conn_engine_init)(void);
static int (*Conn_engine_shutdown)(void);
static int (*Conn_engine_grow)(unsigned int);
static int (*Conn_engine_add_obj)(struct Conn *);
static int (*Conn_engine_del_obj)(struct Conn *);
static int (*Conn_engine_chg_obj)(struct Conn *);
static int (*Conn_engine_poll)(int, void (*cb)(struct Conn *C, const int revents));
static int (*Conn_engine_move_slot)(const unsigned int dst,
		const unsigned int src);


/* Functions */

/*
 * Set prefered engine
 */
int Conn_engine_set(const unsigned int engine)
{
	if (engine == Conn_engine)
		return 0;

	if (engine == CONN_ENGINE_POLL) {
		if (Conn_engine_poll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine POLL. Probably not supported.");
			return -1;
		}

		#ifdef POLL_FOUND
		/* Set variables */
		CONN_POLLIN = POLLIN;
		CONN_POLLOUT = POLLOUT;
		CONN_POLLPRI = POLLPRI;
		CONN_POLLERR = POLLERR;
		CONN_POLLHUP = POLLHUP;
		CONN_POLLNVAL = POLLNVAL;
		CONN_POLLRDNORM = POLLRDNORM;
		CONN_POLLRDBAND = POLLRDBAND;
		/* Set functions */
		Conn_engine_init = Conn_poll_init;
		Conn_engine_shutdown = Conn_poll_shutdown;
		Conn_engine_grow = Conn_poll_grow;
		Conn_engine_add_obj = Conn_poll_add_obj;
		Conn_engine_del_obj = Conn_poll_del_obj;
		Conn_engine_chg_obj = Conn_poll_chg_obj;
		Conn_engine_poll = Conn_poll_poll;
		Conn_engine_move_slot = Conn_poll_move_slot;
		#endif
	} else if (engine == CONN_ENGINE_EPOLL) {
		if (Conn_engine_epoll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine EPOLL. Probably not supported.");
			return -1;
		}

		#ifdef EPOLL_FOUND
		/* Set variables */
		CONN_POLLIN = EPOLLIN;
		CONN_POLLOUT = EPOLLOUT;
		CONN_POLLPRI = EPOLLPRI;
		CONN_POLLERR = EPOLLERR;
		CONN_POLLHUP = EPOLLHUP;
		CONN_POLLNVAL = 0; /* not defined for epoll */
		CONN_POLLRDNORM = 0; /* not defined for epoll */
		CONN_POLLRDBAND = 0; /* not defined for epoll */
		/* Set functions */
		Conn_engine_init = Conn_epoll_init;
		Conn_engine_shutdown = Conn_epoll_shutdown;
		Conn_engine_grow = Conn_epoll_grow;
		Conn_engine_add_obj = Conn_epoll_add_obj;
		Conn_engine_del_obj = Conn_epoll_del_obj;
		Conn_engine_chg_obj = Conn_epoll_chg_obj;
		Conn_engine_poll = Conn_epoll_poll;
		Conn_engine_move_slot = Conn_epoll_move_slot;
		#endif
	} else {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot use engine %u because is not supported.", engine);
		return -1;
	}

	return 0;
}

/*
 * Reads a value from proc
 */
static void Conn_read_proc(char *buf, const size_t buf_size, const char *file)
{
	int fd;
	ssize_t n;

	fd = open(file, O_RDONLY);
	if (fd == -1) {
		snprintf(buf, buf_size, "ERROR_OPEN!");
		return;
	}

	n = read(fd, buf, buf_size - 1);
	if (n == -1) {
		snprintf(buf, buf_size, "ERROR_READ!");
	} else if (n == 0) {
		strcpy(buf, "");
	} else {
		buf[n - 1] = '\0';
		n--;

		while ((n >= 0) && (buf[n - 1] == '\n')) {
			buf[n - 1] = '\0';
			n--;
		}
	}

	close(fd);
}

/*
 * Returns some important system values
 */
char *Conn_sys(void)
{
	static char ret[512];
	char somaxconn[16];
	char tcp_max_tw_buckets[16];
	char tcp_fin_timeout[16];

	Conn_read_proc(somaxconn, sizeof(somaxconn),
		"/proc/sys/net/core/somaxconn");
	Conn_read_proc(tcp_max_tw_buckets, sizeof(tcp_max_tw_buckets),
		"/proc/sys/net/ipv4/tcp_max_tw_buckets");
	Conn_read_proc(tcp_fin_timeout, sizeof(tcp_fin_timeout),
		"/proc/sys/net/ipv4/tcp_fin_timeout");

	snprintf(ret, sizeof(ret), "net.core.somaxconn=%s"
		" net.ipv4.tcp_max_tw_buckets=%s"
		" net.ipv4.tcp_fin_timeout=%s",
		somaxconn, tcp_max_tw_buckets, tcp_fin_timeout);

	return ret;
}

int Conn_init(const unsigned int max)
{
	unsigned int ret;
	unsigned int engine;

	if (Conn_inited == 1)
		return 0;

	Conn_max = max;

	Conn_no = 0;
	Conn_work_to_do = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;
	Conn_accept_is_allowed = 1;
	Conn_accept_is_allowed_last = 1;
	Conn_allocated = 0;

	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

	/*
	Conn_queue_init(&Conn_queue_free);
	*/

	#ifdef POLL_FOUND
	engine = CONN_ENGINE_POLL;
	Conn_engine_poll_found = 1;
	#endif

	#ifdef EPOLL_FOUND
	engine = CONN_ENGINE_EPOLL;
	Conn_engine_epoll_found = 1;
	#endif

	ret = Conn_engine_set(engine);
	if (ret != 0)
		return -1;

	ret = Conn_engine_init();
	if (ret != 0)
		return -1;

	Conn_inited = 1;

	return 0;
}

/*
 * Shutdown Conn
 */
int Conn_shutdown(void)
{
	int ret;
	unsigned int i;

	Conn_inited = 0;

	ret = Conn_engine_shutdown();
	if (ret < 0)
		return ret;

	/* Free all buffers */
	for (i = 0; i < Conn_allocated - 1; i++) {
		if (Conns[i].ibuf)
			free(Conns[i].ibuf);
		if (Conns[i].obuf)
			free(Conns[i].obuf);
	}

	free(Conns);

	return 0;
}

int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
{
	unsigned int r;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, count);
		Log(0, "\tTry to enqueue %d bytes to slot=%u, id=%llu [%s]...\n",
			count, C->slot, C->id, dump);
		free(dump);
	}

	if (C->obuf_size - C->obuf_tail < count) {
		r = Conn_try_expand_buf(C, 0, count);
		if (r != 0)
			return -1;
	}

	memcpy(C->obuf + C->obuf_tail, buf, count);
	C->obuf_tail += count;

	C->events |= CONN_POLLOUT;
	Conn_engine_chg_obj(C);

	return count;
}

static void Conn_free_intern(struct Conn *C)
{
	Log(11, "%s: Cleaning-up id %llu (slot %u) in state %s [%s]...\n",
		__FUNCTION__, C->id, C->slot, Conn_state(C), Conn_errno(C));

	snprintf(Conn_error, sizeof(Conn_error),
		"%s", Conn_errno(C));

	if (C->error_state != CONN_ERROR_USERREQ)
		Conn_error_raise(C, 0);

	if ((C->state == CONN_STATE_OPEN)
		|| (C->state == CONN_STATE_LISTEN)
		|| (C->state == CONN_STATE_ERROR)) {
		if (C->cb_close)
			C->cb_close(C);
		else if (Conn_close_cb)
			Conn_close_cb(C);
	}

	if (C->fd > -1) {
		Conn_engine_del_obj(C);
		close(C->fd);
		C->fd = -1;
	}

	/* Reset tsend, else we enter in a timeout error loop */
	C->tsend.tv_sec = 0;
	C->tsend.tv_usec = 0;

	/* Reset the connection attempt time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	/* Misc */
	C->error_state = 0;

	if (C->flags & CONN_FLAGS_AUTO_RECONNECT) {
		C->tryat = Conn_now.tv_sec + C->delay;
		C->state = CONN_STATE_CONNECT_0;

		C->ibuf_head = 0;
		C->ibuf_tail = 0;

		C->obuf_head = 0;
		C->obuf_tail = 0;

		Conn_pending++;
	} else {
		C->type = CONN_TYPE_UNK;
		C->state = CONN_STATE_FREE;

		/* Allow connections */
		Conn_accept_is_allowed = 1;

		/* Decrement the number of busy connections */
		Conn_no--;
		Conn_work_to_do--;
	}
}

/*
 * Grow Conn structures (cells)
 */
static int Conn_grow(void)
{
	int ret;
	unsigned int alloc;
	struct Conn *p, *set;

	Log(10, "%s() Try to grow cells from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + 128);

	alloc = Conn_allocated + 128;

	ret = Conn_engine_grow(alloc);
	if (ret != 0)
		return -1;

	p = (struct Conn *) realloc(Conns, alloc * sizeof(struct Conn));
	if (p == NULL)
		return -1;

	set = p + Conn_allocated;
	memset(set, 0, 128 * sizeof(struct Conn));
	Conns = p;

	Conn_allocated = alloc;

	return 0;
}

/*
 * Allocs a Conn structure
 */
struct Conn *Conn_alloc(void)
{
	struct Conn *C;
	unsigned int growok;
	void *p;

	Log(10, "%s() Conn_no=%d Conn_max=%d\n",
		__FUNCTION__,
		Conn_no, Conn_max);

	if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	if (Conn_allocated == Conn_no) {
		growok = Conn_grow();
		if (growok != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}

	if (Conn_no > Conn_max_reached)
		Conn_max_reached = Conn_no;

	C = &Conns[Conn_no];
	C->slot = Conn_no;

	C->type = CONN_TYPE_UNK;
	C->state = CONN_STATE_EMPTY;

	if (C->ibuf_size < Conn_default_ibuf) {
		p = realloc(C->ibuf, Conn_default_ibuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		C->ibuf = p;
		C->ibuf_size = Conn_default_ibuf;
	}
	C->ibuf_head = 0;
	C->ibuf_tail = 0;

	if (C->obuf_size < Conn_default_obuf) {
		p = realloc(C->obuf, Conn_default_obuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		C->obuf = p;
		C->obuf_size = Conn_default_obuf;
	}
	C->obuf_head = 0;
	C->obuf_tail = 0;

	C->trecv = Conn_now;

	C->bi = 0;
	C->bo = 0;
	C->private = NULL;

	/* Reset syn time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;

	/* bandwidth */
	C->band_width = 0;
	C->band_factor = 0;
	C->band_tokens = 0;
	C->band_lasttime = Conn_now;

	C->fd = -1;
	C->events = 0;
	C->revents = 0;

	C->flags = 0;

	C->start = Conn_now.tv_sec;

	C->id = Conn_id++;

	Conn_no++;
	/* Conn_work_to_do will not be incremented here, only in commit! */

	Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
		C->slot, C->id, Conn_no);

	if (Conn_no == Conn_max)
		Conn_accept_is_allowed = 0;

	return C;
}

int Conn_set_socket_domain(struct Conn *C, const int domain)
{
	C->sock_domain = domain;
	return 0;
}

int Conn_set_socket_type(struct Conn *C, const int type)
{
	C->sock_type = type;
	return 0;
}

int Conn_set_socket_protocol(struct Conn *C, const int protocol)
{
	C->sock_protocol = protocol;
	return 0;
}

/*
 * Set the address where to bind to
 */
int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
{
	snprintf(C->bind_addr, sizeof(C->bind_addr), "%s", addr);
	return 0;
}

/*
 * Set address where to connect to
 */
int Conn_set_socket_addr(struct Conn *C, const char *addr)
{
	snprintf(C->addr, sizeof(C->addr), "%s", addr);
	return 0;
}

int Conn_set_socket_bind_port(struct Conn *C, const int port)
{
	C->bind_port = port;
	return 0;
}

int Conn_set_socket_port(struct Conn *C, const int port)
{
	C->port = port;
	return 0;
}

/*
 * Allocates socket and bind if necesary
 * TODO: We should return -1 or free connection and call calbacks?!
 */
int Conn_commit(struct Conn *C)
{
	int i, ret;
	struct sockaddr *psa = NULL, *bind_psa = NULL;
	struct sockaddr_in sa, bind_sa;
	struct sockaddr_in6 sa6, bind_sa6;
	int sock_len = 0, bind_sock_len = 0;
	int do_bind = 1, do_listen = 1, do_connect = 0;
	int first_state;

	/* Try to figure what kind of socket is: client or master */
	if (C->type == CONN_TYPE_UNK) {
		if (strlen(C->addr) > 0) {
			C->type = CONN_TYPE_P2P;
			do_listen = 0;
			do_connect = 1;
		} else {
			C->type = CONN_TYPE_MASTER;
			if (strlen(C->bind_addr) == 0) {
				switch (C->sock_domain) {
				case PF_INET:
					snprintf(C->bind_addr, sizeof(C->bind_addr),
						"0.0.0.0"); break;
				case PF_INET6:
					snprintf(C->bind_addr, sizeof(C->bind_addr),
						"::"); break;
				}
			}
		}
	}

	switch (C->sock_domain) {
		case PF_INET:
			/* for connection socket */
			if (strlen(C->addr) > 0) {
				memset(&sa, 0, sizeof(sa));
				sa.sin_family = AF_INET;
				ret = inet_pton(AF_INET, C->addr, &sa.sin_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", C->addr);
					return -1;
				}
				sa.sin_port = htons(C->port);
				psa = (struct sockaddr *) &sa;
				sock_len = sizeof(sa);
			}

			/* for binding socket */
			if (strlen(C->bind_addr) > 0) {
				memset(&bind_sa, 0, sizeof(bind_sa));
				bind_sa.sin_family = AF_INET;
				ret = inet_pton(AF_INET, C->bind_addr, &bind_sa.sin_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", C->bind_addr);
					return -1;
				}
				bind_sa.sin_port = htons(C->bind_port);
				bind_psa = (struct sockaddr *) &bind_sa;
				bind_sock_len = sizeof(bind_sa);
			}

			if (C->sock_type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (C->sock_type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_INET6:
			/* for connection socket */
			if (strlen(C->addr) > 0) {
				memset(&sa6, 0, sizeof(sa6));
				sa6.sin6_family = AF_INET6;
				ret = inet_pton(AF_INET6, C->addr, &sa6.sin6_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", C->addr);
					return -1;
				}
				sa6.sin6_port = htons(C->port);
				psa = (struct sockaddr *) &sa6;
				sock_len = sizeof(sa6);
			}

			/* for binding socket */
			if (strlen(C->bind_addr) > 0) {
				memset(&bind_sa6, 0, sizeof(bind_sa6));
				bind_sa6.sin6_family = AF_INET6;
				ret = inet_pton(AF_INET6, C->bind_addr, &bind_sa6.sin6_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", C->bind_addr);
					return -1;
				}
				bind_sa6.sin6_port = htons(C->bind_port);
				bind_psa = (struct sockaddr *) &bind_sa6;
				bind_sock_len = sizeof(bind_sa6);
			}

			if (C->sock_type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (C->sock_type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_PACKET:
			do_bind = 0;
			do_listen = 0; /*TODO:check this!*/
			first_state = CONN_STATE_OPEN;
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Invalid domain [%d]!", C->sock_domain);
			return -1;
	}

	C->fd = socket(C->sock_domain, C->sock_type, C->sock_protocol);
	if (C->fd == -1) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%s, %s, %s) [%s]",
			Conn_domain(C), Conn_type(C),
			Conn_get_socket_protocol(C),
			strerror(errno));
		return -1;
	}

	Conn_setnonblock(C->fd);

	if (C->sock_domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(C->fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	if (strlen(C->bind_addr) > 0) {
		i = 1;
		setsockopt(C->fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(C->fd, bind_psa, bind_sock_len);
		if (ret < 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind on %s/%d [%s]",
				C->bind_addr, C->bind_port, strerror(errno));
			goto out_free_fd;
		}
	}

	C->events = CONN_POLLIN;
	C->revents = 0;

	if (do_listen == 1)
		listen(C->fd, 4096);

	if (do_connect == 1) {
		/*TODO:replace connect_a with OPEN?! */
		first_state = CONN_STATE_CONNECT_a;
		C->events |= CONN_POLLOUT;
	}

	ret = Conn_engine_add_obj(C);
	if (ret != 0)
		goto out_free_fd;

	if (do_connect == 1)
		Conn_pending++;

	C->state = first_state;

	Conn_work_to_do++;

	return 0;

	out_free_fd:
	Conn_free_intern(C);

	return -1;
}

/*
 * OBSOLETE
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *C;
	int ret;

	C = Conn_alloc();
	if (C == NULL)
		return NULL;

	Conn_set_socket_domain(C, domain);
	Conn_set_socket_type(C, type);
	Conn_set_socket_bind_addr(C, addr);
	Conn_set_socket_bind_port(C, port);

	ret = Conn_commit(C);
	if (ret != 0)
		return NULL;

	return C;
}

/*
 * OBSOLETE
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	char *addr;

	switch (domain) {
		case PF_INET: addr = "0.0.0.0"; break;
		case PF_INET6: addr = "::"; break;
		default: addr = NULL;
	}

	return Conn_socket_addr(domain, type, addr, port);
}

/*
 * Connects an allocated socket
 * OBSOLETE!
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *X;
	int ret;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	X = Conn_alloc();
	if (!X)
		return NULL;

	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_CONNECT_a;

	Conn_set_socket_domain(X, domain);
	Conn_set_socket_type(X, type);
	Conn_set_socket_addr(X, addr);
	Conn_set_socket_port(X, port);
	ret = Conn_commit(X);
	if (ret != 0)
		return NULL;

	return X;
}

static void Conn_accept(struct Conn *C)
{
	int fd, err;
	struct sockaddr *pca, *psa;
	struct sockaddr_in ca4, sa4;
	struct sockaddr_in6 ca6, sa6;
	socklen_t cax_len, sax_len;
	struct Conn *X;
	unsigned int Cslot;

	Log(10, "Accepting a connection via %s/%d, type %s, domain %s"
		", protocol %s.\n",
		C->bind_addr, C->bind_port, Conn_type(C), Conn_domain(C),
		Conn_get_socket_protocol(C));

	switch(C->sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
			psa = (struct sockaddr *) &sa4;
			sax_len = sizeof(sa4);
		break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
			psa = (struct sockaddr *) &sa6;
			sax_len = sizeof(sa6);
		break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				C->sock_domain);
			Conn_error_raise(C, EAFNOSUPPORT);
			return;
	}

	fd = accept(C->fd, pca, &cax_len);
	if (fd == -1) {
		if (errno == EAGAIN)
			return;

		/* TODO: ratelimit */
		Log(9, "WARN: Cannot accept on fd %d [%s].\n",
			C->fd, strerror(errno));
		Conn_error_raise(C, errno);
		return;
	}

	/* After calling Conn_alloc, pointer to slot can change, so C is not valid */
	Cslot = C->slot;
	X = Conn_alloc();
	C = &Conns[Cslot];
	if (!X) {
		Conn_error_raise(C, ENOMEM);
		close(fd);
		return;
	}

	X->fd = fd;
	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_OPEN;
	X->time_open = Conn_now;
	X->via = C->id;
	X->events = CONN_POLLIN;

	Conn_set_socket_domain(X, C->sock_domain);
	Conn_set_socket_type(X, C->sock_type);
	Conn_set_socket_protocol(X, C->sock_protocol);

	Conn_setnonblock(X->fd);

	Conn_set_address(X, 0);
	Conn_set_address(X, 1);

	err = Conn_engine_add_obj(X);
	if (err != 0) {
		Conn_error_raise(C, err);
		Conn_free_intern(X);
		return;
	}

	if (C->cb_accept)
		C->cb_accept(X);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(X);

	Conn_work_to_do++;

	Conn_total++;
}

static void Conn_accept_allow(void)
{
	unsigned int i;

	if (Conn_accept_is_allowed == Conn_accept_is_allowed_last)
		return;

	Log(10, "%s: Turning accept allow from %d to %d...\n",
		__FUNCTION__, Conn_accept_is_allowed_last,
		Conn_accept_is_allowed);

	for (i = 0; i < Conn_no; i++) {
		if (Conns[i].type != CONN_TYPE_MASTER)
			continue;

		if (Conn_accept_is_allowed == 0)
			Conns[i].events &= ~CONN_POLLIN;
		else	
			Conns[i].events |= CONN_POLLIN;

		Conn_engine_chg_obj(&Conns[i]);
	}

	Conn_accept_is_allowed_last = Conn_accept_is_allowed;
}

/*
 * Add tokens to connection
 */
static void Conn_band_update(struct Conn *C)
{
	long diff;

	/* no need */
	if (C->band_width == 0)
		return;

	diff = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
	diff += Conn_now.tv_usec - C->band_lasttime.tv_usec;
	diff /= 100000;

	/* already added in this hundred of milisecond? */
	if (diff == 0)
		return;

	/* take care of time skew */
	if (diff < 0)
		diff = 1;

	C->band_lasttime = Conn_now;
	C->band_tokens += diff * C->band_width / 10;
	if (C->band_tokens > C->band_factor * C->band_width)
		C->band_tokens = C->band_factor * C->band_width;

	C->events |= CONN_POLLOUT;
	Conn_engine_chg_obj(C);

	Log(debug_band, "\t\tBAND: slot=%u, id=%llu, added tokens -> %u.\n",
		C->slot, C->id, C->band_tokens);
}

/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	Log(11, "\tConn_band: slot=%u, id=%llu, width=%u, factor=%u.\n",
		C->slot, C->id, width, factor);

	C->band_lasttime = Conn_now;
	C->band_width = width;
	C->band_factor = factor;
	C->band_tokens = factor * width;

	Log(debug_band, "\t\tBAND: lasttime=%d.%06d, width=%u, factor=%u, tokens=%u\n",
		C->band_lasttime.tv_sec, C->band_lasttime.tv_usec,
		C->band_width, C->band_factor, C->band_tokens);

	return 0;
}

static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	int i, ret;
	char port[8];

	Log(8, "%s: Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);

	for (i = Conn_no - 1; i >= 0; i--) {
		if (Conns[i].type != CONN_TYPE_P2P)
			continue;

		if ((Conns[i].state == CONN_STATE_CONNECT_0)
			&& (Conns[i].tryat <= Conn_now.tv_sec)) {
			Conns[i].state = CONN_STATE_CONNECT_a;
		}

		if (Conns[i].state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTry to connect slot=%u, id=%llu, to %s/%d...\n",
			Conns[i].slot, Conns[i].id, Conns[i].addr, Conns[i].port);

		memset(&hints, 0, sizeof(hints));
		if (Conns[i].sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = Conns[i].sock_domain;
		hints.ai_socktype = Conns[i].sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = AI_ADDRCONFIG;
		snprintf(port, sizeof(port), "%d", Conns[i].port);
		res = NULL;
		ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
		if (ret != 0) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_GETADDRINFO;
			if (res)
				freeaddrinfo(res);
			continue;
		}

		if (Conns[i].fd == -1) {
			Conns[i].fd = socket(res->ai_family, res->ai_socktype, 0);
			if (Conns[i].fd == -1) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				Conns[i].xerrno = errno;
				freeaddrinfo(res);
				continue;
			}
			Log(10, "\tAllocated socket on fd %d\n",
				Conns[i].fd);

			Conn_setnonblock(Conns[i].fd);

			/* Need POLLOUT to signal when the connection was done. */
			Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT);
			ret = Conn_engine_add_obj(&Conns[i]);
			if (ret != 0) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				freeaddrinfo(res);
				continue;
			}
		}

		/* Set syn time */
		Conns[i].conn_syn = Conn_now;

		Log(11, "\tConnecting...\n");
		ret = connect(Conns[i].fd, res->ai_addr, res->ai_addrlen);
		if ((ret != 0) && (errno != EINPROGRESS)) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_CONNECT;
			Conns[i].xerrno = errno;
			freeaddrinfo(res);
			continue;
		}

		Conns[i].state = CONN_STATE_CONNECT_b;

		Conn_pending--;
		Conn_total++;

		freeaddrinfo(res);
	}
}

static void Conn_send_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	int count;
	unsigned int slot;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		C->slot, C->id, C->obuf_head, C->obuf_tail, C->obuf_size);

	slot = C->slot;

	if (Conns[slot].obuf == NULL)
		abort();

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n"); 
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
			return;
		}
	}

	again:
	Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u), max=%d (count=%d), 0)...\n",
		Conns[slot].fd, Conns[slot].obuf_head,
		Conns[slot].obuf_tail, max, count);
	n = send(Conns[slot].fd, buf, max, 0);
	xerrno = errno;
	if ((n == -1) && (errno == EINTR))
		goto again;

	if ((n == -1) && (errno == EAGAIN))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Sent %d bytes [head=%d tail=%d]\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd,
		n, Conns[slot].obuf_head, Conns[slot].obuf_tail);
	if (Conn_level >= 10) {
		dump = Conn_dump(buf, n);
		Log(0, "\t%s\n", dump);
		free(dump);
	}

	if (n > 0) {
		Conns[slot].tsend = Conn_now;
		if (n < count) {
			Conns[slot].obuf_head += n;
		} else {
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}

		Conns[slot].bo += n;
		if (C->band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %d tokens -> %u...\n",
				__FUNCTION__,
				n, Conns[slot].band_tokens);
		}
	} else {
		Log(0, "%s: Error in send (slot=%u, id=%llu) [%s]\n",
			__FUNCTION__,
			slot, Conns[slot].id, strerror(errno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}

static void Conn_recv_cb_i(struct Conn *C)
{
	ssize_t n;
	unsigned int max;
	unsigned int slot;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		C->slot, C->id, C->ibuf_head, C->ibuf_tail, C->ibuf_size);

	slot = C->slot;

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(&Conns[slot], 1, 0);
		if (r != 0) {
			Log(1, "MEM: Cannot expand ibuf!\n");
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	while (1) {
		n = recv(Conns[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		if ((n == -1) && (errno == EINTR))
			continue;

		xerrno = errno;

		break;
	}

	Log(10, "%s: slot=%u, id=%llu: fd=%d Received %d bytes.\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd, n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf
				+ Conns[slot].ibuf_tail, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].ibuf_tail += n;

		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		Conns[slot].error_state = CONN_ERROR_HANGUP;
	} else {
		Log(4, "Error in recv (slot=%u, id=%llu) [%s]\n",
			slot, Conns[slot].id, strerror(errno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Callback that is called for every connection
 */
static void Conn_poll_cb(struct Conn *C, int revents)
{
	unsigned int slot;

	Log(12, "%s: slot=%u, id=%llu, revents=%x.\n",
		__FUNCTION__, C->slot, C->id, revents);
	C->revents = revents;

	if (Conn_level >= 12)
		Log(12, "\t%s\n", Conn_status_slot(C));

	/* We should not have events on a free cell */
	if (C->state == CONN_STATE_FREE)
		abort();

	if (revents & CONN_POLLHUP) {
		C->error_state = CONN_ERROR_HANGUP;
		/* TODO: Add it to the close list to speed it up */
	}

	if (revents & CONN_POLLERR) {
		C->error_state = CONN_ERROR_POLL;
		C->xerrno = 0; /* TODO: unknown error? */
		/* TODO: CONN_ERROR_POLL is correct here? */
	}

	/* First, test we have a new connection */
	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(C) == 0)) {
		/* We just established a connection */
		if (C->state == CONN_STATE_CONNECT_b) {
			/*
			 * We do not need POLLOUT now - it was used only for
			 * connect completion.
			 */
			revents &= ~CONN_POLLOUT;
			C->events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(C);

			C->state = CONN_STATE_OPEN;

			Conn_set_address(C, 0);

			C->time_open = Conn_now;

			if (C->cb_connected != NULL)
				C->cb_connected(C);
			else if (Conn_connected_cb)
				Conn_connected_cb(C);

		}
	}

	/* Second, test for error or input */
	if ((revents & CONN_POLLIN)
		&& (Conn_ignore(C) == 0)) {
		if (C->type == CONN_TYPE_MASTER) {
			/* C pointer can change under us in Conn_accept->Conn_grow */
			slot = C->slot;
			Conn_accept(C);
			C = &Conns[slot];
		} else {
			if (C->cb_recv)
				C->cb_recv(C);
			else if (Conn_recv_cb != NULL)
				Conn_recv_cb(C);
			else
				Conn_recv_cb_i(C);
		}
	}

	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(C) == 0)) {
		/* We can send data */
		if (C->state == CONN_STATE_OPEN) {
			if (C->cb_send)
				C->cb_send(C);
			else if (Conn_send_cb != NULL)
				Conn_send_cb(C);
			else
				Conn_send_cb_i(C);

			if (C->obuf_head == C->obuf_tail) {
				C->events &= ~CONN_POLLOUT;
				Conn_epoll_chg_obj(C);
				if (C->flags & CONN_FLAGS_CLOSE_AFTER_SEND) {
					C->state = CONN_STATE_ERROR;
					C->error_state = CONN_ERROR_USERREQ;
				}
			}
		}
	}
}

/*
 * Moving a in-use slot over a free one for compacting reason.
 */
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
	struct Conn tmp;

	if (dst == src)
		return;

	Log(10, "\t%s: Moving id %llu from slot %u to slot %d...\n",
		__FUNCTION__, Conns[src].id, src, dst);

	/* We need to save old location because of usefull pointers */
	tmp = Conns[dst];
	Conns[dst] = Conns[src];
	Conns[src] = tmp;

	Conns[dst].slot = dst;

	Conn_engine_move_slot(dst, src);
}

/*
 * Does the polling of file descriptors.
 * Returns: -1 on error, 0 nothing to do, 1 if some work was done
 * timeout is in 1/1000 seconds increments.
 */
int Conn_poll(const int timeout)
{
	int ret;
	int timeout2;
	unsigned int i, last;

	Log(9, "%s: timeout=%d Conn_no=%d, Conn_work_to_do=%u)\n",
		__FUNCTION__, timeout, Conn_no, Conn_work_to_do);

	if (timeout == -1)
		timeout2 = 1000;
	else
		timeout2 = timeout;

	loop:
	if (Conn_must_stop == 1)
		return 0;

	if (Conn_work_to_do == 0) {
		Log(9, "%s: work_to_do is 0, so return 0!\n",
			__FUNCTION__);
		return 0;
	}

	if (Conn_pending > 0)
		Conn_trytoconnect();

	ret = Conn_engine_poll(timeout2, Conn_poll_cb);
	if (ret < 0)
		return -1;

	Log(9, "\tDo compacting, expiration and band stuff (%d event(s))...\n",
		ret);
	i = 0;
	while (i < Conn_no) {
		/*
		 * Save last position because Conn_free_intern
		 * decrements Conn_no
		 */
		last = Conn_no - 1;

		/* Closing connection if it is in error state */
		if (Conns[i].error_state > 0) {
			Log(11, "\tSlot %u in error, exchange with pos %u.\n",
				i, last);
			Conn_move_slot(i, last);
			Conn_free_intern(&Conns[last]);
			continue;
		}

		/* No commit done yet */
		if (Conns[i].state == CONN_STATE_EMPTY) {
			i++;
			continue;
		}

		if (Conns[i].state == CONN_STATE_FREE) {
			/* Must not happen! */
			abort();
		}

		/* test if it expired/timeout */
		Conn_expire(&Conns[i]);

		/* add tokens */
		Conn_band_update(&Conns[i]);

		i++;
	}

	/* Blocking accept if full queue or unblock if not */
	Conn_accept_allow();

	if (timeout == -1)
		goto loop;

	return ret;
}

/*
 * Returns the lifetime of a connection
 */
unsigned long long Conn_lifetime(struct Conn *C)
{
	return Conn_time_diff(&Conn_now, &C->time_open);
}


Mode Type Size Ref File
100644 blob 70 9964a59b5d89f394cc4250ed6d6ce67a5f0cd196 .gitignore
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog.pre109
100644 blob 30346 c1b7536b872342827edf09313a4f34eddc11464a Conn.c
100644 blob 1470 afedf88dd9c3b275e9c7d83d6572c2fcce60e50b Conn.h
100644 blob 726 64b1bad93a84f87c3e93fc24ac5341db691ea578 Conn.spec.in
100644 blob 66 68138d781ca754b15e14c687da91ee261b2c41f3 Conn_config.h.in
100644 blob 27985 34e68f838096277281cdb368ad1c3ecc5cd04bc1 Conn_engine_core.c
100644 blob 9091 94b3b4cf54424d7a6e01f8a441c1c048b79cd595 Conn_engine_core.h
100644 blob 3687 8ae607cf9e14a6f9f1d9f17f468294660986e845 Conn_engine_epoll.c
100644 blob 601 b7631d5fc5487b502f02679b0d679661f87b4da9 Conn_engine_epoll.h
100644 blob 2677 e885b694fc0f55200ee50936966a2e40744ebf9b Conn_engine_poll.c
100644 blob 588 b518b20ac383c00b72e96a77a882c6b7ee953f6f Conn_engine_poll.h
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 1311 3c820df3b36b4bc844c52bc8c7e86f7843b67bb6 Makefile.in
100644 blob 192 5b11bdfb23857d8588845465aef993b320596b44 README
100644 blob 2063 15e6dab8fc121a854413b547d8681c28ca408a2e TODO
100755 blob 23 d33bb6c4ecdce1390ce1db3c79ea3b93e22ea755 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
100755 blob 10814 7afe0037026192b59c4994780eafab5531fe8fe4 duilder
100644 blob 276 16e0a86bc917accbb9b122e106fb692f4624a680 duilder.conf
040000 tree - ba4a0b66aea6997503ff3d4bbe83e4a3f9a5ff2b examples
040000 tree - 751693d0803f700dd060788cc9383aa24b472267 tests
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main