catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easy building ipv4/ipv6 network daemons/clients

/Conn.c (bee2aa87c3f0e64f6e285d355b16d1625fb83266) (33594 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux) M BOIE <catab at embedromix.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn.h"
#include "Conn_engine_poll.h"
#include "Conn_engine_epoll.h"


/* Internal variables */
/* Engine */
static unsigned int		Conn_engine;
static unsigned int		Conn_engine_poll_found;
static unsigned int		Conn_engine_epoll_found;

/* Engine functions */
static int (*Conn_engine_init)(void);
static int (*Conn_engine_shutdown)(void);
static int (*Conn_engine_grow)(unsigned int);
static int (*Conn_engine_add_obj)(struct Conn *);
static int (*Conn_engine_del_obj)(struct Conn *);
static int (*Conn_engine_chg_obj)(struct Conn *);
static int (*Conn_engine_poll)(int, void (*cb)(const unsigned int slot, const int revents));
static int (*Conn_engine_move_slot)(const unsigned int dst,
		const unsigned int src);


/* Functions */

/*
 * Set prefered engine
 */
int Conn_engine_set(const unsigned int engine)
{
	if (engine == Conn_engine)
		return 0;

	if (engine == CONN_ENGINE_POLL) {
		if (Conn_engine_poll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine POLL. Probably not supported.");
			return -1;
		}

		#ifdef POLL_FOUND
		/* Set variables */
		CONN_POLLIN = POLLIN;
		CONN_POLLOUT = POLLOUT;
		CONN_POLLPRI = POLLPRI;
		CONN_POLLERR = POLLERR;
		CONN_POLLHUP = POLLHUP;
		CONN_POLLNVAL = POLLNVAL;
		CONN_POLLRDNORM = POLLRDNORM;
		CONN_POLLRDBAND = POLLRDBAND;
		/* Set functions */
		Conn_engine_init = Conn_poll_init;
		Conn_engine_shutdown = Conn_poll_shutdown;
		Conn_engine_grow = Conn_poll_grow;
		Conn_engine_add_obj = Conn_poll_add_obj;
		Conn_engine_del_obj = Conn_poll_del_obj;
		Conn_engine_chg_obj = Conn_poll_chg_obj;
		Conn_engine_poll = Conn_poll_poll;
		Conn_engine_move_slot = Conn_poll_move_slot;
		#endif
	} else if (engine == CONN_ENGINE_EPOLL) {
		if (Conn_engine_epoll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine EPOLL. Probably not supported.");
			return -1;
		}

		#ifdef EPOLL_FOUND
		/* Set variables */
		CONN_POLLIN = EPOLLIN;
		CONN_POLLOUT = EPOLLOUT;
		CONN_POLLPRI = EPOLLPRI;
		CONN_POLLERR = EPOLLERR;
		CONN_POLLHUP = EPOLLHUP;
		CONN_POLLNVAL = 0; /* not defined for epoll */
		CONN_POLLRDNORM = 0; /* not defined for epoll */
		CONN_POLLRDBAND = 0; /* not defined for epoll */
		/* Set functions */
		Conn_engine_init = Conn_epoll_init;
		Conn_engine_shutdown = Conn_epoll_shutdown;
		Conn_engine_grow = Conn_epoll_grow;
		Conn_engine_add_obj = Conn_epoll_add_obj;
		Conn_engine_del_obj = Conn_epoll_del_obj;
		Conn_engine_chg_obj = Conn_epoll_chg_obj;
		Conn_engine_poll = Conn_epoll_poll;
		Conn_engine_move_slot = Conn_epoll_move_slot;
		#endif
	} else {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot use engine %u because is not supported.", engine);
		return -1;
	}

	return 0;
}

/*
 * Reads a value from proc
 */
static void Conn_read_proc(char *buf, const size_t buf_size, const char *file)
{
	int fd;
	ssize_t n;

	fd = open(file, O_RDONLY);
	if (fd == -1) {
		snprintf(buf, buf_size, "ERROR_OPEN!");
		return;
	}

	n = read(fd, buf, buf_size - 1);
	if (n == -1) {
		snprintf(buf, buf_size, "ERROR_READ!");
	} else if (n == 0) {
		strcpy(buf, "");
	} else {
		buf[n - 1] = '\0';
		n--;

		while ((n >= 0) && (buf[n - 1] == '\n')) {
			buf[n - 1] = '\0';
			n--;
		}
	}

	close(fd);
}

/*
 * Returns some important system values
 */
char *Conn_sys(void)
{
	static char ret[512];
	char somaxconn[16];
	char tcp_max_tw_buckets[16];
	char tcp_fin_timeout[16];

	Conn_read_proc(somaxconn, sizeof(somaxconn),
		"/proc/sys/net/core/somaxconn");
	Conn_read_proc(tcp_max_tw_buckets, sizeof(tcp_max_tw_buckets),
		"/proc/sys/net/ipv4/tcp_max_tw_buckets");
	Conn_read_proc(tcp_fin_timeout, sizeof(tcp_fin_timeout),
		"/proc/sys/net/ipv4/tcp_fin_timeout");

	snprintf(ret, sizeof(ret), "net.core.somaxconn=%s"
		" net.ipv4.tcp_max_tw_buckets=%s"
		" net.ipv4.tcp_fin_timeout=%s",
		somaxconn, tcp_max_tw_buckets, tcp_fin_timeout);

	return ret;
}

int Conn_init(const unsigned int max)
{
	unsigned int ret;
	unsigned int engine;

	if (Conn_inited == 1)
		return 0;

	Conn_max = max;

	Conn_no = 0;
	Conn_work_to_do = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;
	Conn_accept_is_allowed = 1;
	Conn_accept_is_allowed_last = 1;
	Conn_allocated = 0;

	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

	/*
	Conn_queue_init(&Conn_queue_free);
	*/

	#ifdef POLL_FOUND
	engine = CONN_ENGINE_POLL;
	Conn_engine_poll_found = 1;
	#endif

	#ifdef EPOLL_FOUND
	engine = CONN_ENGINE_EPOLL;
	Conn_engine_epoll_found = 1;
	#endif

	ret = Conn_engine_set(engine);
	if (ret != 0)
		return -1;

	ret = Conn_engine_init();
	if (ret != 0)
		return -1;

	Conn_inited = 1;

	return 0;
}

/*
 * Shutdown Conn
 */
int Conn_shutdown(void)
{
	int ret;
	unsigned int slot;

	Conn_inited = 0;

	ret = Conn_engine_shutdown();
	if (ret < 0)
		return ret;

	/* Free all buffers */
	Log(5, "Freeing %u slots...\n", Conn_allocated);
	for (slot = 0; slot < Conn_allocated - 1; slot++) {
		if (Conns[slot].ibuf)
			free(Conns[slot].ibuf);
		if (Conns[slot].obuf)
			free(Conns[slot].obuf);

		if (Conns[slot].fd != -1) {
			Log(6, "Closing slot %u, fd %d...\n",
			    slot, Conns[slot].fd);
			close(Conns[slot].fd);
		}
	}

	free(Conns);

	return 0;
}

int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
{
	unsigned int r, slot;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	slot = C->slot;

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, count);
		Log(0, "\tTry to enqueue %d bytes to slot=%u, id=%llu [%s]...\n",
			count, slot, Conns[slot].id, dump);
		free(dump);
	}

	if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
		r = Conn_try_expand_buf(slot, 0, count);
		if (r != 0)
			return -1;
	}

	memcpy(Conns[slot].obuf + Conns[slot].obuf_tail, buf, count);
	Conns[slot].obuf_tail += count;

	Conns[slot].events |= CONN_POLLOUT;
	Conn_engine_chg_obj(&Conns[slot]);

	return count;
}

static void Conn_free_intern(const unsigned int slot)
{
	Log(11, "%s: Cleaning-up id %llu (slot=%u, fd=%d) in state %s [%s]...\n",
		__FUNCTION__, Conns[slot].id, slot, Conns[slot].fd,
		Conn_state(&Conns[slot]), Conn_errno(&Conns[slot]));

	if (Conns[slot].error_state != CONN_ERROR_USERREQ)
		Conn_error_raise(slot, 0);

	if ((Conns[slot].state == CONN_STATE_OPEN)
		|| (Conns[slot].state == CONN_STATE_LISTEN)
		|| (Conns[slot].state == CONN_STATE_ERROR)) {
		if (Conns[slot].cb_close)
			Conns[slot].cb_close(&Conns[slot]);
		else if (Conn_close_cb)
			Conn_close_cb(&Conns[slot]);
	}

	if (Conns[slot].fd > -1) {
		close(Conns[slot].fd);
		Conns[slot].fd = -1;
	}

	/* Reset tsend, else we enter in a timeout error loop */
	Conns[slot].tsend.tv_sec = 0;
	Conns[slot].tsend.tv_usec = 0;

	/* Reset the connection attempt time */
	Conns[slot].conn_syn.tv_sec = 0;
	Conns[slot].conn_syn.tv_usec = 0;

	/* Misc */
	Conns[slot].error_state = 0;

	if (Conns[slot].flags & CONN_FLAGS_AUTO_RECONNECT) {
		Conns[slot].tryat = Conn_now.tv_sec + Conns[slot].delay;
		Conns[slot].state = CONN_STATE_CONNECT_0;

		Conns[slot].ibuf_head = 0;
		Conns[slot].ibuf_tail = 0;

		Conns[slot].obuf_head = 0;
		Conns[slot].obuf_tail = 0;

		Conn_pending++;
	} else {
		Conns[slot].type = CONN_TYPE_UNK;
		Conns[slot].state = CONN_STATE_FREE;

		/* Allow connections */
		Conn_accept_is_allowed = 1;

		/* Decrement the number of busy connections */
		Conn_no--;
		Conn_work_to_do--;
	}
}

/*
 * Grow Conn structures (cells)
 */
static int Conn_grow(void)
{
	int ret;
	unsigned int alloc, increment = 128, i;
	struct Conn *p, *set;

	Log(10, "%s() Try to grow cells from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + 128);

	alloc = Conn_allocated + increment;

	ret = Conn_engine_grow(alloc);
	if (ret != 0)
		return -1;

	p = (struct Conn *) realloc(Conns, alloc * sizeof(struct Conn));
	if (p == NULL)
		return -1;

	Conn_mem_structs += increment * sizeof(struct Conn);

	set = p + Conn_allocated;
	memset(set, 0, increment * sizeof(struct Conn));
	/* Some inits */
	for (i = 0; i < 128; i++)
		set[i].fd = -1;

	Conns = p;
	Conn_allocated = alloc;

	return 0;
}

/*
 * Allocs a Conn structure
 */
struct Conn *Conn_alloc(void)
{
	unsigned int growok;
	void *p;
	unsigned int slot;

	Log(10, "%s() Conn_no=%d Conn_max=%d\n",
		__FUNCTION__,
		Conn_no, Conn_max);

	if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	if (Conn_allocated == Conn_no) {
		growok = Conn_grow();
		if (growok != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}

	if (Conn_no > Conn_max_reached)
		Conn_max_reached = Conn_no;

	slot = Conn_no;
	Conns[slot].slot = slot;

	Conns[slot].type = CONN_TYPE_UNK;
	Conns[slot].state = CONN_STATE_EMPTY;

	if (Conns[slot].ibuf_size < Conn_default_ibuf) {
		p = realloc(Conns[slot].ibuf, Conn_default_ibuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		Conn_mem_buffers_in += Conn_default_ibuf - Conns[slot].ibuf_size;
		Conns[slot].ibuf = p;
		Conns[slot].ibuf_size = Conn_default_ibuf;
	}
	Conns[slot].ibuf_head = 0;
	Conns[slot].ibuf_tail = 0;

	if (Conns[slot].obuf_size < Conn_default_obuf) {
		p = realloc(Conns[slot].obuf, Conn_default_obuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		Conn_mem_buffers_out += Conn_default_obuf - Conns[slot].obuf_size;
		Conns[slot].obuf = p;
		Conns[slot].obuf_size = Conn_default_obuf;
	}
	Conns[slot].obuf_head = 0;
	Conns[slot].obuf_tail = 0;

	Conns[slot].trecv = Conn_now;

	Conns[slot].bi = 0;
	Conns[slot].bo = 0;
	Conns[slot].private = NULL;

	/* Reset syn time */
	Conns[slot].conn_syn.tv_sec = 0;
	Conns[slot].conn_syn.tv_usec = 0;

	/* bandwidth */
	Conns[slot].band_width = 0;
	Conns[slot].band_factor = 0;
	Conns[slot].band_tokens = 0;
	Conns[slot].band_lasttime = Conn_now;

	Conns[slot].fd = -1;
	Conns[slot].events = 0;
	Conns[slot].revents = 0;

	Conns[slot].flags = 0;

	Conns[slot].start = Conn_now.tv_sec;

	Conns[slot].id = Conn_id++;

	Conn_no++;
	/* Conn_work_to_do will not be incremented here, only in commit! */

	Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
		slot, Conns[slot].id, Conn_no);

	if (Conn_no == Conn_max)
		Conn_accept_is_allowed = 0;

	return &Conns[slot];
}

int Conn_set_socket_domain(struct Conn *C, const int domain)
{
	C->sock_domain = domain;
	return 0;
}

int Conn_set_socket_type(struct Conn *C, const int type)
{
	C->sock_type = type;
	return 0;
}

int Conn_set_socket_protocol(struct Conn *C, const int protocol)
{
	C->sock_protocol = protocol;
	return 0;
}

/*
 * Set the address where to bind to
 */
int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
{
	snprintf(C->bind_addr, sizeof(C->bind_addr), "%s", addr);
	return 0;
}

/*
 * Set address where to connect to
 */
int Conn_set_socket_addr(struct Conn *C, const char *addr)
{
	snprintf(C->addr, sizeof(C->addr), "%s", addr);
	return 0;
}

int Conn_set_socket_bind_port(struct Conn *C, const int port)
{
	C->bind_port = port;
	return 0;
}

int Conn_set_socket_port(struct Conn *C, const int port)
{
	C->port = port;
	return 0;
}

/*
 * Allocates socket and bind if necesary
 * TODO: We should return -1 or free connection and call calbacks?!
 */
int Conn_commit(struct Conn *C)
{
	int i, ret;
	struct sockaddr *psa = NULL, *bind_psa = NULL;
	struct sockaddr_in sa, bind_sa;
	struct sockaddr_in6 sa6, bind_sa6;
	int sock_len = 0, bind_sock_len = 0;
	int do_bind = 1, do_listen = 1, do_connect = 0;
	int first_state;
	unsigned int slot;

	slot = C->slot;

	Log(10, "%s: slot=%u...\n", __FUNCTION__, slot);

	/* Be optimistical and increment here, in 'free' we will decrement. */
	/* So, in error case, we will only decrement this thing! */
	Conn_work_to_do++;

	/* Try to figure what kind of socket is: client or master */
	if (Conns[slot].type == CONN_TYPE_UNK) {
		if (strlen(Conns[slot].addr) > 0) {
			Conns[slot].type = CONN_TYPE_P2P;
			do_listen = 0;
			do_connect = 1;
		} else {
			Conns[slot].type = CONN_TYPE_MASTER;
			if (strlen(Conns[slot].bind_addr) == 0) {
				switch (Conns[slot].sock_domain) {
				case PF_INET:
					snprintf(Conns[slot].bind_addr, sizeof(Conns[slot].bind_addr),
						"0.0.0.0"); break;
				case PF_INET6:
					snprintf(Conns[slot].bind_addr, sizeof(Conns[slot].bind_addr),
						"::"); break;
				}
			}
		}
	}

	switch (Conns[slot].sock_domain) {
		case PF_INET:
			/* for connection socket */
			if (strlen(Conns[slot].addr) > 0) {
				memset(&sa, 0, sizeof(sa));
				sa.sin_family = AF_INET;
				ret = inet_pton(AF_INET, Conns[slot].addr, &sa.sin_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].addr);
					return -1;
				}
				sa.sin_port = htons(Conns[slot].port);
				psa = (struct sockaddr *) &sa;
				sock_len = sizeof(sa);
			}

			/* for binding socket */
			if (strlen(Conns[slot].bind_addr) > 0) {
				memset(&bind_sa, 0, sizeof(bind_sa));
				bind_sa.sin_family = AF_INET;
				ret = inet_pton(AF_INET, Conns[slot].bind_addr, &bind_sa.sin_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].bind_addr);
					return -1;
				}
				bind_sa.sin_port = htons(Conns[slot].bind_port);
				bind_psa = (struct sockaddr *) &bind_sa;
				bind_sock_len = sizeof(bind_sa);
			}

			if (Conns[slot].sock_type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (Conns[slot].sock_type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_INET6:
			/* for connection socket */
			if (strlen(Conns[slot].addr) > 0) {
				memset(&sa6, 0, sizeof(sa6));
				sa6.sin6_family = AF_INET6;
				ret = inet_pton(AF_INET6, Conns[slot].addr, &sa6.sin6_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].addr);
					return -1;
				}
				sa6.sin6_port = htons(Conns[slot].port);
				psa = (struct sockaddr *) &sa6;
				sock_len = sizeof(sa6);
			}

			/* for binding socket */
			if (strlen(Conns[slot].bind_addr) > 0) {
				memset(&bind_sa6, 0, sizeof(bind_sa6));
				bind_sa6.sin6_family = AF_INET6;
				ret = inet_pton(AF_INET6, Conns[slot].bind_addr, &bind_sa6.sin6_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].bind_addr);
					return -1;
				}
				bind_sa6.sin6_port = htons(Conns[slot].bind_port);
				bind_psa = (struct sockaddr *) &bind_sa6;
				bind_sock_len = sizeof(bind_sa6);
			}

			if (Conns[slot].sock_type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (Conns[slot].sock_type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_PACKET:
			do_bind = 0;
			do_listen = 0; /*TODO:check this!*/
			first_state = CONN_STATE_OPEN;
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Invalid domain [%d]!", Conns[slot].sock_domain);
			return -1;
	}

	Conns[slot].fd = socket(Conns[slot].sock_domain, Conns[slot].sock_type, Conns[slot].sock_protocol);
	if (Conns[slot].fd == -1) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%s, %s, %s) [%s]",
			Conn_domain(&Conns[slot]), Conn_type(&Conns[slot]),
			Conn_get_socket_protocol(&Conns[slot]),
			strerror(errno));
		return -1;
	}

	Conn_setnonblock(Conns[slot].fd);

	if (Conns[slot].sock_domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(Conns[slot].fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	if (strlen(Conns[slot].bind_addr) > 0) {
		i = 1;
		setsockopt(Conns[slot].fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(Conns[slot].fd, bind_psa, bind_sock_len);
		if (ret < 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind on %s/%d [%s]",
				Conns[slot].bind_addr, Conns[slot].bind_port, strerror(errno));
			goto out_free_fd;
		}
	}

	Conns[slot].events = CONN_POLLIN;
	Conns[slot].revents = 0;

	if (do_listen == 1)
		listen(Conns[slot].fd, 4096);

	if (do_connect == 1) {
		/*TODO:replace connect_a with OPEN?! */
		first_state = CONN_STATE_CONNECT_a;
		Conns[slot].events |= CONN_POLLOUT;
	}

	ret = Conn_engine_add_obj(&Conns[slot]);
	if (ret != 0)
		goto out_free_fd;

	if (do_connect == 1)
		Conn_pending++;

	Conns[slot].state = first_state;

	return 0;

	out_free_fd:
	Conn_free_intern(slot);

	return -1;
}

/*
 * OBSOLETE
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *C;
	int ret;

	C = Conn_alloc();
	if (C == NULL)
		return NULL;

	Conn_set_socket_domain(C, domain);
	Conn_set_socket_type(C, type);
	Conn_set_socket_bind_addr(C, addr);
	Conn_set_socket_bind_port(C, port);

	ret = Conn_commit(C);
	if (ret != 0)
		return NULL;

	return C;
}

/*
 * OBSOLETE
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	char *addr;

	switch (domain) {
		case PF_INET: addr = "0.0.0.0"; break;
		case PF_INET6: addr = "::"; break;
		default: addr = NULL;
	}

	return Conn_socket_addr(domain, type, addr, port);
}

/*
 * Connects an allocated socket
 * OBSOLETE!
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *X;
	int ret;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	X = Conn_alloc();
	if (!X)
		return NULL;

	Conn_set_socket_domain(X, domain);
	Conn_set_socket_type(X, type);
	Conn_set_socket_addr(X, addr);
	Conn_set_socket_port(X, port);
	ret = Conn_commit(X);
	if (ret != 0)
		return NULL;

	return X;
}

static void Conn_accept(const unsigned int slot)
{
	int fd, err;
	struct sockaddr *pca, *psa;
	struct sockaddr_in ca4, sa4;
	struct sockaddr_in6 ca6, sa6;
	socklen_t cax_len, sax_len;
	struct Conn *X;

	Log(10, "Accepting a connection on slot=%u via %s/%d, type %s, domain %s"
		", protocol %s.\n",
		slot, Conns[slot].bind_addr, Conns[slot].bind_port, Conn_type(&Conns[slot]),
		Conn_domain(&Conns[slot]), Conn_get_socket_protocol(&Conns[slot]));

	switch(Conns[slot].sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
			psa = (struct sockaddr *) &sa4;
			sax_len = sizeof(sa4);
		break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
			psa = (struct sockaddr *) &sa6;
			sax_len = sizeof(sa6);
		break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				Conns[slot].sock_domain);
			Conn_error_raise(slot, EAFNOSUPPORT);
			return;
	}

	fd = accept4(Conns[slot].fd, pca, &cax_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
	if (fd == -1) {
		if (errno == EAGAIN)
			return;

		/* TODO: ratelimit */
		Log(9, "WARN: Cannot accept on fd %d [%s].\n",
			Conns[slot].fd, strerror(errno));
		/*
		 * We must not raise an error here because we will close the
		 * master socket!
		 * TODO: We should signal it as a warning.
		 */
		/* Conn_error_raise(slot, errno); */
		return;
	}

	/* After calling Conn_alloc, pointer to slot can change, so C is not valid */
	X = Conn_alloc();
	if (!X) {
		Conn_error_raise(slot, ENOMEM);
		close(fd);
		return;
	}

	X->fd = fd;
	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_OPEN;
	X->time_open = Conn_now;
	X->via = Conns[slot].id;
	X->events = CONN_POLLIN | CONN_POLLOUT;

	Conn_set_socket_domain(X, Conns[slot].sock_domain);
	Conn_set_socket_type(X, Conns[slot].sock_type);
	Conn_set_socket_protocol(X, Conns[slot].sock_protocol);

	X->flags |= CONN_ADDR_LOCAL_DIRTY | CONN_ADDR_REMOTE_DIRTY;

	err = Conn_engine_add_obj(X);
	if (err != 0) {
		Conn_error_raise(slot, err);
		Conn_free_intern(X->slot);
		return;
	}

	if (Conns[slot].cb_accept)
		Conns[slot].cb_accept(X);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(X);

	Conn_work_to_do++;

	Conn_total++;
}

static void Conn_accept_allow(void)
{
	unsigned int slot;

	if (Conn_accept_is_allowed == Conn_accept_is_allowed_last)
		return;

	Log(10, "%s: Turning accept allow from %d to %d...\n",
		__FUNCTION__, Conn_accept_is_allowed_last,
		Conn_accept_is_allowed);

	for (slot = 0; slot < Conn_no; slot++) {
		if (Conns[slot].type != CONN_TYPE_MASTER)
			continue;

		if (Conn_accept_is_allowed == 0)
			Conns[slot].events &= ~CONN_POLLIN;
		else	
			Conns[slot].events |= CONN_POLLIN;

		Conn_engine_chg_obj(&Conns[slot]);
	}

	Conn_accept_is_allowed_last = Conn_accept_is_allowed;
}

/*
 * Add tokens to connection
 */
static void Conn_band_update(const unsigned int slot)
{
	long diff;

	/* no need */
	if (Conns[slot].band_width == 0)
		return;

	diff = (Conn_now.tv_sec - Conns[slot].band_lasttime.tv_sec) * 1000000;
	diff += Conn_now.tv_usec - Conns[slot].band_lasttime.tv_usec;
	diff /= 100000;

	/* already added in this hundred of milisecond? */
	if (diff == 0)
		return;

	/* take care of time skew */
	if (diff < 0)
		diff = 1;

	Conns[slot].band_lasttime = Conn_now;
	Conns[slot].band_tokens += diff * Conns[slot].band_width / 10;
	if (Conns[slot].band_tokens > Conns[slot].band_factor * Conns[slot].band_width)
		Conns[slot].band_tokens = Conns[slot].band_factor * Conns[slot].band_width;

	Conns[slot].events |= CONN_POLLOUT;
	Conn_engine_chg_obj(&Conns[slot]);

	Log(debug_band, "\t\tBAND: slot=%u, id=%llu, added tokens -> %u.\n",
		slot, Conns[slot].id, Conns[slot].band_tokens);
}

/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	unsigned int slot;

	slot = C->slot;

	Log(11, "\tConn_band: slot=%u, id=%llu, width=%u, factor=%u.\n",
		slot, Conns[slot].id, width, factor);

	Conns[slot].band_lasttime = Conn_now;
	Conns[slot].band_width = width;
	Conns[slot].band_factor = factor;
	Conns[slot].band_tokens = factor * width;

	Log(debug_band, "\t\tBAND: lasttime=%d.%06d, width=%u, factor=%u, tokens=%u\n",
		Conns[slot].band_lasttime.tv_sec, Conns[slot].band_lasttime.tv_usec,
		Conns[slot].band_width, Conns[slot].band_factor, Conns[slot].band_tokens);

	return 0;
}

static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	int i, ret;
	char port[8];

	Log(8, "%s: Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);

	for (i = Conn_no - 1; i >= 0; i--) {
		if (Conns[i].type != CONN_TYPE_P2P)
			continue;

		if ((Conns[i].state == CONN_STATE_CONNECT_0)
			&& (Conns[i].tryat <= Conn_now.tv_sec)) {
			Conns[i].state = CONN_STATE_CONNECT_a;
		}

		if (Conns[i].state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTrying to connect slot=%u, id=%llu, to %s/%d...\n",
			Conns[i].slot, Conns[i].id, Conns[i].addr, Conns[i].port);

		memset(&hints, 0, sizeof(hints));
		if (Conns[i].sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = Conns[i].sock_domain;
		hints.ai_socktype = Conns[i].sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = AI_ADDRCONFIG;
		snprintf(port, sizeof(port), "%d", Conns[i].port);
		res = NULL;
		ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
		if (ret != 0) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_GETADDRINFO;
			if (res)
				freeaddrinfo(res);
			continue;
		}

		if (Conns[i].fd == -1) {
			Conns[i].fd = socket(res->ai_family, res->ai_socktype, 0);
			if (Conns[i].fd == -1) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				Conns[i].xerrno = errno;
				freeaddrinfo(res);
				continue;
			}
			Log(10, "\tAllocated socket on fd %d\n",
				Conns[i].fd);

			Conn_setnonblock(Conns[i].fd);

			/* Need POLLOUT to signal when the connection was done. */
			Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT);
			ret = Conn_engine_add_obj(&Conns[i]);
			if (ret != 0) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				freeaddrinfo(res);
				continue;
			}
		}

		/* Set syn time */
		Conns[i].conn_syn = Conn_now;

		Log(11, "\tConnecting...\n");
		ret = connect(Conns[i].fd, res->ai_addr, res->ai_addrlen);
		if ((ret != 0) && (errno != EINPROGRESS)) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_CONNECT;
			Conns[i].xerrno = errno;
			freeaddrinfo(res);
			continue;
		}

		Conns[i].state = CONN_STATE_CONNECT_b;

		Conn_pending--;
		Conn_total++;

		freeaddrinfo(res);
	}

	Log(8, "%s: After, Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);
}

static void Conn_send_cb_i(const unsigned int slot)
{
	ssize_t n;
	unsigned int max;
	int count;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i slot=%u, id=%llu, fd=%d"
		", head=%u, tail=%u, size=%u...\n",
		slot, Conns[slot].id, Conns[slot].fd,
		Conns[slot].obuf_head, Conns[slot].obuf_tail,
		Conns[slot].obuf_size);

	if (Conns[slot].obuf == NULL)
		abort();

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tEmpty output buffer!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n"); 
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
			return;
		}
	}

	again:
	Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u), max=%d (count=%d), 0)...\n",
		Conns[slot].fd, Conns[slot].obuf_head,
		Conns[slot].obuf_tail, max, count);
	n = send(Conns[slot].fd, buf, max, 0);
	xerrno = errno;
	if ((n == -1) && (errno == EINTR))
		goto again;

	if ((n == -1) && (errno == EAGAIN))
		return;

	Log(10, "\tSent %d bytes [head=%d tail=%d]\n",
		n, Conns[slot].obuf_head, Conns[slot].obuf_tail);
	if (Conn_level >= 10) {
		dump = Conn_dump(buf, n);
		Log(0, "\t%s\n", dump);
		free(dump);
	}

	if (n > 0) {
		Conns[slot].tsend = Conn_now;
		if (n < count) {
			Conns[slot].obuf_head += n;
		} else {
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}

		Conns[slot].bo += n;
		if (Conns[slot].band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %d tokens -> %u...\n",
				__FUNCTION__,
				n, Conns[slot].band_tokens);
		}
	} else {
		Log(0, "\tError in sending [%s]\n",
			strerror(errno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}

static void Conn_recv_cb_i(const unsigned int slot)
{
	ssize_t n;
	unsigned int max;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i: slot=%u, id=%llu, fd=%d"
		", head=%u, tail=%u, size=%u...\n",
		slot, Conns[slot].id, Conns[slot].fd,
		Conns[slot].ibuf_head, Conns[slot].ibuf_tail, Conns[slot].ibuf_size);

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(slot, 1, Conn_default_ibuf);
		if (r != 0) {
			Conns[slot].error_state = CONN_ERROR_MEM;
			/* TODO: Just suspend connection for 1 second instead */
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	while (1) {
		n = recv(Conns[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		if ((n == -1) && (errno == EINTR))
			continue;

		xerrno = errno;

		break;
	}

	Log(10, "\tReceived %d bytes.\n", n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf + Conns[slot].ibuf_tail, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].ibuf_tail += n;

		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		Log(10, "\tRemote closed sending side.\n");
		Conns[slot].error_state = CONN_ERROR_HANGUP;
		/* TODO: Maybe we should just cut INPUT and do not hangup */
	} else {
		Log(0, "\tError receiving [%s]\n",
			strerror(errno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Callback that is called for every connection
 */
static void Conn_poll_cb(const unsigned int slot, int revents)
{
	char poll_status[16];

	Conn_poll_status(revents, poll_status);
	Log(12, "\t%s: slot=%u, id=%llu, revents=%s.\n",
		__FUNCTION__, slot, Conns[slot].id, poll_status);
	Conns[slot].revents = revents;

	if (Conn_level >= 12)
		Log(12, "\t\t%s\n", Conn_status_slot(slot));

	/* We should not have events on a free cell */
	if (Conns[slot].state == CONN_STATE_FREE) {
		Log(12, "\t\tBUG! Events on a FREE slot!\n");
		return;
	}

	if (revents & CONN_POLLHUP) {
		Conns[slot].error_state = CONN_ERROR_HANGUP;
		/* TODO: Add it to the close list to speed it up */
	}

	if (revents & CONN_POLLERR) {
		Conns[slot].error_state = CONN_ERROR_POLL;
		Conns[slot].xerrno = 0; /* TODO: unknown error? */
		/* TODO: CONN_ERROR_POLL is correct here? */
	}

	/* First, test we have a new connection */
	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(slot) == 0)) {

		if (Conn_oqlen(&Conns[slot]) == 0) {
			/* Nothing to send */
			revents &= ~CONN_POLLOUT;
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
		}

		if (Conns[slot].state == CONN_STATE_CONNECT_b) {
			Log(12, "\t\tWe just established a connection.\n");

			Conns[slot].state = CONN_STATE_OPEN;

			Conns[slot].flags |= CONN_ADDR_LOCAL_DIRTY;

			Conns[slot].time_open = Conn_now;

			if (Conns[slot].cb_connected != NULL)
				Conns[slot].cb_connected(&Conns[slot]);
			else if (Conn_connected_cb)
				Conn_connected_cb(&Conns[slot]);
		}
	}

	/* Second, test for hangup or input */
	if ((revents & CONN_POLLIN)
		&& (Conn_ignore(slot) == 0)) {
		Log(12, "\t\tWe have input...\n");
		if (Conns[slot].type == CONN_TYPE_MASTER) {
			/* C pointer can change under us in Conn_accept->Conn_grow */
			Conn_accept(slot);
		} else {
			if (Conns[slot].cb_recv)
				Conns[slot].cb_recv(&Conns[slot]);
			else if (Conn_recv_cb != NULL)
				Conn_recv_cb(&Conns[slot]);
			else
				Conn_recv_cb_i(slot);
		}
	}

	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(slot) == 0)) {
		Log(12, "\t\tWe can send data...\n");
		if (Conns[slot].state == CONN_STATE_OPEN) {
			if (Conns[slot].cb_send)
				Conns[slot].cb_send(&Conns[slot]);
			else if (Conn_send_cb != NULL)
				Conn_send_cb(&Conns[slot]);
			else
				Conn_send_cb_i(slot);

			if (Conn_oqlen(&Conns[slot]) == 0) {
				if (Conns[slot].flags & CONN_FLAGS_CLOSE_AFTER_SEND) {
					Conns[slot].state = CONN_STATE_ERROR;
					Conns[slot].error_state = CONN_ERROR_USERREQ;
				} else {
					Conns[slot].events &= ~CONN_POLLOUT;
					Conn_epoll_chg_obj(&Conns[slot]);
				}
			}
		}
	}
}

/*
 * Moving an in-use slot over a free one for compacting reason.
 */
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
	struct Conn tmp;

	if (dst == src)
		return;

	Log(10, "\t\t%s: Moving id %llu from slot=%u to slot=%d...\n",
		__FUNCTION__, Conns[src].id, src, dst);

	/* We need to save old location because of usefull pointers. */
	tmp = Conns[dst];
	Conns[dst] = Conns[src];
	Conns[src] = tmp;

	Conns[dst].slot = dst;
	Conns[src].slot = src;

	Conn_engine_move_slot(dst, src);
}

/*
 * Does the polling of file descriptors.
 * Returns: -1 on error, 0 nothing to do, 1 if some work was done
 * timeout is in 1/1000 seconds increments.
 */
int Conn_poll(const int timeout)
{
	int ret;
	int timeout2;
	unsigned int slot, last;

	Log(9, "%s: timeout=%d Conn_no=%d, Conn_work_to_do=%u)\n",
		__FUNCTION__, timeout, Conn_no, Conn_work_to_do);

	if (timeout > 1000)
		timeout2 = 1000;
	else if (timeout == -1)
		timeout2 = 1000;
	else
		timeout2 = timeout;

	loop:
	if (Conn_must_stop == 1)
		return 0;

	if (Conn_work_to_do == 0) {
		Log(9, "%s: work_to_do is 0, so return 0!\n",
			__FUNCTION__);
		return 0;
	}

	if (Conn_pending > 0)
		Conn_trytoconnect();

	ret = Conn_engine_poll(timeout2, Conn_poll_cb);
	if (ret < 0)
		return -1;

	Log(9, "Do compacting, expiration and band stuff...\n");
	slot = 0;
	while (slot < Conn_no) {
		/*
		 * Save last position because Conn_free_intern
		 * decrements Conn_no.
		 */
		last = Conn_no - 1;

		/* Closing connection if it is in error state. */
		if (Conns[slot].error_state > 0) {
			Log(11, "\tSlot=%u in error [%s], move and free it.\n",
				slot, Conn_errno(&Conns[slot]));
			Conn_move_slot(slot, last);
			Conn_free_intern(last);
			continue;
		}

		/* No commit done yet */
		if (Conns[slot].state == CONN_STATE_EMPTY) {
			slot++;
			continue;
		}

		if (Conns[slot].state == CONN_STATE_FREE) {
			/* Must not happen! Probably stale events. */
			Log(9, "\tBUG! Slot is in FREE state and has events!\n");
			slot++;
			continue;
		}

		/* test if it expired/timeout */
		Conn_expire(slot);

		/* add tokens */
		Conn_band_update(slot);

		slot++;
	}

	/* Blocking accept if full queue or unblock if not */
	Conn_accept_allow();

	/* Any work left to do? */
	if (Conn_no == 0) {
		Log(10, "Nothing remained to poll for!\n");
		return 0;
	}

	if (timeout == -1)
		goto loop;

	return ret;
}

/*
 * Returns the lifetime of a connection
 * TODO: To be moved in 'core'?
 */
unsigned long long Conn_lifetime(struct Conn *C)
{
	unsigned int slot;

	slot = C->slot;

	return Conn_time_diff(&Conn_now, &Conns[slot].time_open);
}


Mode Type Size Ref File
100644 blob 94 2e97920b91646e1a8c2438ca375e2aaae22793fb .gitignore
100644 blob 169 c003c095218f64ad33aeb89987f61eb575557d96 .mailmap
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog.pre109
100644 blob 33594 bee2aa87c3f0e64f6e285d355b16d1625fb83266 Conn.c
100644 blob 1526 e10421bf8e376a6b479156ca7be599e3a7bc7727 Conn.h
100644 blob 835 3e3dff5bad664ce3c207f971f93c3a9125690e5e Conn.spec.in
100644 blob 95 709b9e660dcb7e8c7f17cda4e15cf3e37ec73839 Conn_config.h.in
100644 blob 32116 1ef5a31760df3e75df94dfc24bf9b82be345f074 Conn_engine_core.c
100644 blob 9809 1b67b229f712fc9f929bd50db99edd16dee93d24 Conn_engine_core.h
100644 blob 3960 51e1c1125e9dd4cbfdefa2dab9c1817a346e7827 Conn_engine_epoll.c
100644 blob 610 b8597ef7043fa9b6ccd58c0f484f040e8621cc95 Conn_engine_epoll.h
100644 blob 2761 8a866dbb20b17aa5efd9466ccacd74e25baa6923 Conn_engine_poll.c
100644 blob 597 183f7af0b0688200fa8f69527c03ee075c83df12 Conn_engine_poll.h
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 1300 3dca4a4b1231b9544475cabe887ceed92e014d17 Makefile.in
100644 blob 192 5b11bdfb23857d8588845465aef993b320596b44 README
100644 blob 3347 21c1e62573729f1d34b5be7ea2d9e20ffe0c1c65 TODO
100755 blob 23 d33bb6c4ecdce1390ce1db3c79ea3b93e22ea755 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
100755 blob 13311 a6e2825b35f915e6d64c2a981fa3b6266b2bf587 duilder
100644 blob 325 0397e6b6c1db3a3995fc8f73de7df276c7f83015 duilder.conf
040000 tree - 7b6e9c21cc34a982588af6cbbac0447b70a01ed6 examples
040000 tree - 751693d0803f700dd060788cc9383aa24b472267 tests
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main