catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Network library for easily building IPv4/IPv6 network daemons and clients

/Conn.c (ce307c8127733a93d036be69555ce39f5d5c0a6d) (32589 bytes) (mode 100644) (type blob)

/*
 * Author: Catalin(ux) M BOIE <catab at embedromix.ro>
 * Date: 2004
 * Description:	Some functions to help writing network servers and clients,
 *		both ipv4 and ipv6.
 * Licence: LGPL
 */

#include "Conn.h"
#include "Conn_engine_poll.h"
#include "Conn_engine_epoll.h"


/* Internal variables */
/* Engine */
/*
 * Engine identifier compared against the requested one in Conn_engine_set().
 * NOTE(review): nothing in this part of the file assigns it - confirm.
 */
static unsigned int		Conn_engine;
/* Set to 1 by Conn_init() when the build defines POLL_FOUND */
static unsigned int		Conn_engine_poll_found;
/* Set to 1 by Conn_init() when the build defines EPOLL_FOUND */
static unsigned int		Conn_engine_epoll_found;

/*
 * Engine functions.
 * Dispatch pointers loaded by Conn_engine_set() with the poll or the epoll
 * implementation; the rest of the file calls the engine through them.
 */
static int (*Conn_engine_init)(void);
static int (*Conn_engine_shutdown)(void);
/* Called by Conn_grow() with the new total number of cells */
static int (*Conn_engine_grow)(unsigned int);
static int (*Conn_engine_add_obj)(struct Conn *);
static int (*Conn_engine_del_obj)(struct Conn *);
/* Called after the 'events' field of a connection was changed */
static int (*Conn_engine_chg_obj)(struct Conn *);
/* Receives a timeout and the callback to invoke with (slot, revents) */
static int (*Conn_engine_poll)(int, void (*cb)(const unsigned int slot, const int revents));
/* Called by Conn_move_slot() after the cells were exchanged */
static int (*Conn_engine_move_slot)(const unsigned int dst,
		const unsigned int src);


/* Functions */

/*
 * Set preferred engine.
 *
 * engine: CONN_ENGINE_POLL or CONN_ENGINE_EPOLL.
 *
 * Loads the engine specific event bits into the CONN_POLL* variables and
 * points the Conn_engine_* function pointers to the engine implementation.
 *
 * Returns 0 on success (also when the engine is already the selected one)
 * and -1, with Conn_error filled in, when the engine is unknown or was not
 * detected.
 */
int Conn_engine_set(const unsigned int engine)
{
	/* NOTE(review): assumes no valid engine id equals the initial 0 - confirm */
	if (engine == Conn_engine)
		return 0;

	if (engine == CONN_ENGINE_POLL) {
		if (Conn_engine_poll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine POLL. Probably not supported.");
			return -1;
		}

		#ifdef POLL_FOUND
		/* Set variables */
		CONN_POLLIN = POLLIN;
		CONN_POLLOUT = POLLOUT;
		CONN_POLLPRI = POLLPRI;
		CONN_POLLERR = POLLERR;
		CONN_POLLHUP = POLLHUP;
		CONN_POLLNVAL = POLLNVAL;
		CONN_POLLRDNORM = POLLRDNORM;
		CONN_POLLRDBAND = POLLRDBAND;
		/* Set functions */
		Conn_engine_init = Conn_poll_init;
		Conn_engine_shutdown = Conn_poll_shutdown;
		Conn_engine_grow = Conn_poll_grow;
		Conn_engine_add_obj = Conn_poll_add_obj;
		Conn_engine_del_obj = Conn_poll_del_obj;
		Conn_engine_chg_obj = Conn_poll_chg_obj;
		Conn_engine_poll = Conn_poll_poll;
		Conn_engine_move_slot = Conn_poll_move_slot;
		#endif
	} else if (engine == CONN_ENGINE_EPOLL) {
		if (Conn_engine_epoll_found == 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot use engine EPOLL. Probably not supported.");
			return -1;
		}

		#ifdef EPOLL_FOUND
		/* Set variables */
		CONN_POLLIN = EPOLLIN;
		CONN_POLLOUT = EPOLLOUT;
		CONN_POLLPRI = EPOLLPRI;
		CONN_POLLERR = EPOLLERR;
		CONN_POLLHUP = EPOLLHUP;
		CONN_POLLNVAL = 0; /* not defined for epoll */
		CONN_POLLRDNORM = 0; /* not defined for epoll */
		CONN_POLLRDBAND = 0; /* not defined for epoll */
		/* Set functions */
		Conn_engine_init = Conn_epoll_init;
		Conn_engine_shutdown = Conn_epoll_shutdown;
		Conn_engine_grow = Conn_epoll_grow;
		Conn_engine_add_obj = Conn_epoll_add_obj;
		Conn_engine_del_obj = Conn_epoll_del_obj;
		Conn_engine_chg_obj = Conn_epoll_chg_obj;
		Conn_engine_poll = Conn_epoll_poll;
		Conn_engine_move_slot = Conn_epoll_move_slot;
		#endif
	} else {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot use engine %u because is not supported.", engine);
		return -1;
	}

	/*
	 * Remember the selection; without this the shortcut at the top always
	 * compares against the initial value.
	 */
	Conn_engine = engine;

	return 0;
}

/*
 * Reads a value from proc.
 *
 * buf/buf_size: destination buffer; always NUL terminated on return (when
 *	buf_size is not 0).
 * file: path of the file to read.
 *
 * At most buf_size - 1 bytes are read with a single read() call and the
 * trailing newlines are removed. If the file cannot be opened or read, buf
 * receives "ERROR_OPEN!" or "ERROR_READ!" instead.
 */
static void Conn_read_proc(char *buf, const size_t buf_size, const char *file)
{
	int fd;
	ssize_t n;

	/* No room even for the terminator; also avoids buf_size - 1 wrapping */
	if ((buf == NULL) || (buf_size == 0))
		return;

	fd = open(file, O_RDONLY);
	if (fd == -1) {
		snprintf(buf, buf_size, "ERROR_OPEN!");
		return;
	}

	n = read(fd, buf, buf_size - 1);
	if (n == -1) {
		snprintf(buf, buf_size, "ERROR_READ!");
	} else {
		/* Terminate right after the data (n == 0 gives an empty string) */
		buf[n] = '\0';

		/* Strip only real trailing newlines and never look before buf[0] */
		while ((n > 0) && (buf[n - 1] == '\n')) {
			n--;
			buf[n] = '\0';
		}
	}

	close(fd);
}

/*
 * Returns a one-line summary of a few kernel network settings
 * (somaxconn, tcp_max_tw_buckets, tcp_fin_timeout), as read from /proc.
 *
 * The returned pointer refers to a static buffer that is overwritten by
 * the next call.
 */
char *Conn_sys(void)
{
	static const char *const files[3] = {
		"/proc/sys/net/core/somaxconn",
		"/proc/sys/net/ipv4/tcp_max_tw_buckets",
		"/proc/sys/net/ipv4/tcp_fin_timeout"
	};
	static char summary[512];
	char val[3][16];
	unsigned int i;

	/* Same order as the fields of the summary below */
	for (i = 0; i < 3; i++)
		Conn_read_proc(val[i], sizeof(val[i]), files[i]);

	snprintf(summary, sizeof(summary), "net.core.somaxconn=%s"
		" net.ipv4.tcp_max_tw_buckets=%s"
		" net.ipv4.tcp_fin_timeout=%s",
		val[0], val[1], val[2]);

	return summary;
}

int Conn_init(const unsigned int max)
{
	unsigned int ret;
	unsigned int engine;

	if (Conn_inited == 1)
		return 0;

	Conn_max = max;

	Conn_no = 0;
	Conn_work_to_do = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = Conn_now.tv_sec;
	Conn_accept_is_allowed = 1;
	Conn_accept_is_allowed_last = 1;
	Conn_allocated = 0;

	snprintf(Conn_error, sizeof(Conn_error), "%s", "");

	/*
	Conn_queue_init(&Conn_queue_free);
	*/

	#ifdef POLL_FOUND
	engine = CONN_ENGINE_POLL;
	Conn_engine_poll_found = 1;
	#endif

	#ifdef EPOLL_FOUND
	engine = CONN_ENGINE_EPOLL;
	Conn_engine_epoll_found = 1;
	#endif

	ret = Conn_engine_set(engine);
	if (ret != 0)
		return -1;

	ret = Conn_engine_init();
	if (ret != 0)
		return -1;

	Conn_inited = 1;

	return 0;
}

/*
 * Shutdown Conn
 */
int Conn_shutdown(void)
{
	int ret;
	unsigned int slot;

	Conn_inited = 0;

	ret = Conn_engine_shutdown();
	if (ret < 0)
		return ret;

	/* Free all buffers */
	for (slot = 0; slot < Conn_allocated - 1; slot++) {
		if (Conns[slot].ibuf)
			free(Conns[slot].ibuf);
		if (Conns[slot].obuf)
			free(Conns[slot].obuf);
	}

	free(Conns);

	return 0;
}

/*
 * Appends 'count' bytes from 'buf' to the output buffer of the connection
 * and asks the engine to watch the socket for writability.
 *
 * The output buffer is expanded (Conn_try_expand_buf) when the free space
 * after the tail is too small.
 *
 * Returns 'count' on success and -1 if C is NULL or the buffer could not be
 * expanded.
 */
int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
{
	int r;
	unsigned int slot;
	char *dump;

	if (C == NULL) {
		Log(0, "ERROR: Somebody try to enqueue something to a NULL conn.\n");
		return -1;
	}

	slot = C->slot;

	if (Conn_level >= 10) {
		dump = Conn_dump(buf, count);
		/* count is a size_t, so it needs %zu and not %d */
		Log(0, "\tTry to enqueue %zu bytes to slot=%u, id=%llu [%s]...\n",
			count, slot, Conns[slot].id, dump);
		free(dump);
	}

	if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
		r = Conn_try_expand_buf(slot, 0, count);
		if (r != 0)
			return -1;
	}

	memcpy(Conns[slot].obuf + Conns[slot].obuf_tail, buf, count);
	Conns[slot].obuf_tail += count;

	/* There is data to send now: ask for "writable" notifications */
	Conns[slot].events |= CONN_POLLOUT;
	Conn_engine_chg_obj(&Conns[slot]);

	return count;
}

/*
 * Cleans up the connection stored in cell 'slot': notifies the owner, closes
 * the socket and then either prepares the cell for a new connection attempt
 * (CONN_FLAGS_AUTO_RECONNECT) or marks it free.
 */
static void Conn_free_intern(const unsigned int slot)
{
	Log(11, "%s: Cleaning-up id %llu (slot %u) in state %s [%s]...\n",
		__FUNCTION__, Conns[slot].id, slot, Conn_state(&Conns[slot]),
		Conn_errno(&Conns[slot]));

	/* Publish the error text of this connection in the global Conn_error */
	snprintf(Conn_error, sizeof(Conn_error),
		"%s", Conn_errno(&Conns[slot]));

	/* Skipped when the close was requested by the user */
	if (Conns[slot].error_state != CONN_ERROR_USERREQ)
		Conn_error_raise(slot, 0);

	/* Only these states get a close notification */
	if ((Conns[slot].state == CONN_STATE_OPEN)
		|| (Conns[slot].state == CONN_STATE_LISTEN)
		|| (Conns[slot].state == CONN_STATE_ERROR)) {
		/* The per-connection callback wins over the global one */
		if (Conns[slot].cb_close)
			Conns[slot].cb_close(&Conns[slot]);
		else if (Conn_close_cb)
			Conn_close_cb(&Conns[slot]);
	}

	/* Unregister from the engine before closing the descriptor */
	if (Conns[slot].fd > -1) {
		Conn_engine_del_obj(&Conns[slot]);
		close(Conns[slot].fd);
		Conns[slot].fd = -1;
	}

	/* Reset tsend, else we enter in a timeout error loop */
	Conns[slot].tsend.tv_sec = 0;
	Conns[slot].tsend.tv_usec = 0;

	/* Reset the connection attempt time */
	Conns[slot].conn_syn.tv_sec = 0;
	Conns[slot].conn_syn.tv_usec = 0;

	/* Misc */
	Conns[slot].error_state = 0;

	if (Conns[slot].flags & CONN_FLAGS_AUTO_RECONNECT) {
		/* Keep the cell: schedule a new attempt after 'delay' seconds */
		Conns[slot].tryat = Conn_now.tv_sec + Conns[slot].delay;
		Conns[slot].state = CONN_STATE_CONNECT_0;

		/* Drop any buffered data of the old connection */
		Conns[slot].ibuf_head = 0;
		Conns[slot].ibuf_tail = 0;

		Conns[slot].obuf_head = 0;
		Conns[slot].obuf_tail = 0;

		/* Makes Conn_poll() call Conn_trytoconnect() */
		Conn_pending++;
	} else {
		Conns[slot].type = CONN_TYPE_UNK;
		Conns[slot].state = CONN_STATE_FREE;

		/* Allow connections */
		Conn_accept_is_allowed = 1;

		/* Decrement the number of busy connections */
		Conn_no--;
		Conn_work_to_do--;
	}
}

/*
 * Grow Conn structures (cells).
 *
 * Adds room for 128 more cells: the engine is asked to grow first, then the
 * Conns array is reallocated and the new cells are zeroed.
 *
 * Returns 0 on success and -1 if the engine or realloc failed.
 */
static int Conn_grow(void)
{
	unsigned int new_total;
	struct Conn *cells;

	Log(10, "%s() Try to grow cells from %d to %d.\n",
		__FUNCTION__,
		Conn_allocated, Conn_allocated + 128);

	new_total = Conn_allocated + 128;

	if (Conn_engine_grow(new_total) != 0)
		return -1;

	cells = (struct Conn *) realloc(Conns, new_total * sizeof(struct Conn));
	if (cells == NULL)
		return -1;

	/* Only the freshly added tail must be cleared */
	memset(cells + Conn_allocated, 0, 128 * sizeof(struct Conn));

	Conns = cells;
	Conn_allocated = new_total;

	return 0;
}

/*
 * Allocs a Conn structure
 */
struct Conn *Conn_alloc(void)
{
	unsigned int growok;
	void *p;
	unsigned int slot;

	Log(10, "%s() Conn_no=%d Conn_max=%d\n",
		__FUNCTION__,
		Conn_no, Conn_max);

	if ((Conn_max > 0) && (Conn_no >= Conn_max)) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}

	if (Conn_allocated == Conn_no) {
		growok = Conn_grow();
		if (growok != 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}

	if (Conn_no > Conn_max_reached)
		Conn_max_reached = Conn_no;

	slot = Conn_no;
	Conns[slot].slot = slot;

	Conns[slot].type = CONN_TYPE_UNK;
	Conns[slot].state = CONN_STATE_EMPTY;

	if (Conns[slot].ibuf_size < Conn_default_ibuf) {
		p = realloc(Conns[slot].ibuf, Conn_default_ibuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		Conns[slot].ibuf = p;
		Conns[slot].ibuf_size = Conn_default_ibuf;
	}
	Conns[slot].ibuf_head = 0;
	Conns[slot].ibuf_tail = 0;

	if (Conns[slot].obuf_size < Conn_default_obuf) {
		p = realloc(Conns[slot].obuf, Conn_default_obuf);
		if (p == NULL) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		Conns[slot].obuf = p;
		Conns[slot].obuf_size = Conn_default_obuf;
	}
	Conns[slot].obuf_head = 0;
	Conns[slot].obuf_tail = 0;

	Conns[slot].trecv = Conn_now;

	Conns[slot].bi = 0;
	Conns[slot].bo = 0;
	Conns[slot].private = NULL;

	/* Reset syn time */
	Conns[slot].conn_syn.tv_sec = 0;
	Conns[slot].conn_syn.tv_usec = 0;

	/* bandwidth */
	Conns[slot].band_width = 0;
	Conns[slot].band_factor = 0;
	Conns[slot].band_tokens = 0;
	Conns[slot].band_lasttime = Conn_now;

	Conns[slot].fd = -1;
	Conns[slot].events = 0;
	Conns[slot].revents = 0;

	Conns[slot].flags = 0;

	Conns[slot].start = Conn_now.tv_sec;

	Conns[slot].id = Conn_id++;

	Conn_no++;
	/* Conn_work_to_do will not be incremented here, only in commit! */

	Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
		slot, Conns[slot].id, Conn_no);

	if (Conn_no == Conn_max)
		Conn_accept_is_allowed = 0;

	return &Conns[slot];
}

/*
 * Stores the socket domain (Conn_commit handles PF_INET, PF_INET6 and
 * PF_PACKET) in the connection. Always returns 0.
 */
int Conn_set_socket_domain(struct Conn *C, const int domain)
{
	struct Conn *conn = C;

	conn->sock_domain = domain;

	return 0;
}

/*
 * Stores the socket type (for example SOCK_STREAM or SOCK_DGRAM) in the
 * connection. Always returns 0.
 */
int Conn_set_socket_type(struct Conn *C, const int type)
{
	struct Conn *conn = C;

	conn->sock_type = type;

	return 0;
}

/*
 * Stores the protocol that Conn_commit passes as the third argument of
 * socket(). Always returns 0.
 */
int Conn_set_socket_protocol(struct Conn *C, const int protocol)
{
	struct Conn *conn = C;

	conn->sock_protocol = protocol;

	return 0;
}

/*
 * Set the address where to bind to.
 *
 * addr: textual address; it is copied (and truncated if needed) into the
 *	connection. NULL is stored as an empty string, which Conn_commit
 *	treats as "no bind address given".
 *
 * Always returns 0.
 */
int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
{
	/* Passing NULL for %s is undefined behavior */
	snprintf(C->bind_addr, sizeof(C->bind_addr), "%s",
		(addr != NULL) ? addr : "");
	return 0;
}

/*
 * Set address where to connect to.
 *
 * addr: textual address; it is copied (and truncated if needed) into the
 *	connection. NULL is stored as an empty string, which Conn_commit
 *	treats as "no remote address given".
 *
 * Always returns 0.
 */
int Conn_set_socket_addr(struct Conn *C, const char *addr)
{
	/* Passing NULL for %s is undefined behavior */
	snprintf(C->addr, sizeof(C->addr), "%s",
		(addr != NULL) ? addr : "");
	return 0;
}

/*
 * Stores the local port used by Conn_commit when binding (host byte order;
 * Conn_commit applies htons). Always returns 0.
 */
int Conn_set_socket_bind_port(struct Conn *C, const int port)
{
	struct Conn *conn = C;

	conn->bind_port = port;

	return 0;
}

/*
 * Stores the remote port to connect to (host byte order). Always returns 0.
 */
int Conn_set_socket_port(struct Conn *C, const int port)
{
	struct Conn *conn = C;

	conn->port = port;

	return 0;
}

/*
 * Allocates socket and bind if necessary.
 *
 * Uses the values stored by the Conn_set_socket_* functions: the socket is
 * created, made non-blocking, bound when a bind address is known, put in
 * listen mode for stream master sockets and registered with the engine.
 * A connection with a remote address becomes CONN_TYPE_P2P and the connect
 * itself is done later by Conn_trytoconnect(); one without becomes
 * CONN_TYPE_MASTER.
 *
 * Returns 0 on success and -1 on error.
 * NOTE(review): the early "return -1" paths leave the cell allocated and
 * Conn_work_to_do incremented; only the paths after socket creation call
 * Conn_free_intern().
 * TODO: We should return -1 or free connection and call callbacks?!
 */
int Conn_commit(struct Conn *C)
{
	int i, ret;
	struct sockaddr *psa = NULL, *bind_psa = NULL;
	struct sockaddr_in sa, bind_sa;
	struct sockaddr_in6 sa6, bind_sa6;
	int sock_len = 0, bind_sock_len = 0;
	int do_bind = 1, do_listen = 1, do_connect = 0;
	/*
	 * Default for socket types that are neither SOCK_STREAM nor SOCK_DGRAM;
	 * the variable used to be read uninitialized in that case.
	 */
	int first_state = CONN_STATE_OPEN;
	unsigned int slot;

	slot = C->slot;

	Log(10, "%s: slot=%u...\n", __FUNCTION__, slot);

	/* Be optimistical and increment here, in 'free' we will decrement. */
	/* So, in error case, we will only decrement this thing! */
	Conn_work_to_do++;

	/* Try to figure what kind of socket is: client or master */
	if (Conns[slot].type == CONN_TYPE_UNK) {
		if (strlen(Conns[slot].addr) > 0) {
			Conns[slot].type = CONN_TYPE_P2P;
			do_listen = 0;
			do_connect = 1;
		} else {
			Conns[slot].type = CONN_TYPE_MASTER;
			/* A master without bind address listens on all addresses */
			if (strlen(Conns[slot].bind_addr) == 0) {
				switch (Conns[slot].sock_domain) {
				case PF_INET:
					snprintf(Conns[slot].bind_addr, sizeof(Conns[slot].bind_addr),
						"0.0.0.0"); break;
				case PF_INET6:
					snprintf(Conns[slot].bind_addr, sizeof(Conns[slot].bind_addr),
						"::"); break;
				}
			}
		}
	}

	switch (Conns[slot].sock_domain) {
		case PF_INET:
			/*
			 * for connection socket
			 * Only a hard inet_pton error is rejected here: the real
			 * resolution of 'addr' is done with getaddrinfo() in
			 * Conn_trytoconnect(), so a non-numeric value is let through.
			 */
			if (strlen(Conns[slot].addr) > 0) {
				memset(&sa, 0, sizeof(sa));
				sa.sin_family = AF_INET;
				ret = inet_pton(AF_INET, Conns[slot].addr, &sa.sin_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].addr);
					return -1;
				}
				sa.sin_port = htons(Conns[slot].port);
				psa = (struct sockaddr *) &sa;
				sock_len = sizeof(sa);
			}

			/* for binding socket */
			if (strlen(Conns[slot].bind_addr) > 0) {
				memset(&bind_sa, 0, sizeof(bind_sa));
				bind_sa.sin_family = AF_INET;
				ret = inet_pton(AF_INET, Conns[slot].bind_addr, &bind_sa.sin_addr);
				/*
				 * inet_pton returns 0 for a malformed address; accepting
				 * it would silently bind to the zeroed (wildcard) address.
				 */
				if (ret != 1) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].bind_addr);
					return -1;
				}
				bind_sa.sin_port = htons(Conns[slot].bind_port);
				bind_psa = (struct sockaddr *) &bind_sa;
				bind_sock_len = sizeof(bind_sa);
			}

			if (Conns[slot].sock_type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (Conns[slot].sock_type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_INET6:
			/* for connection socket (see the PF_INET comment) */
			if (strlen(Conns[slot].addr) > 0) {
				memset(&sa6, 0, sizeof(sa6));
				sa6.sin6_family = AF_INET6;
				ret = inet_pton(AF_INET6, Conns[slot].addr, &sa6.sin6_addr);
				if (ret < 0) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].addr);
					return -1;
				}
				sa6.sin6_port = htons(Conns[slot].port);
				psa = (struct sockaddr *) &sa6;
				sock_len = sizeof(sa6);
			}

			/* for binding socket */
			if (strlen(Conns[slot].bind_addr) > 0) {
				memset(&bind_sa6, 0, sizeof(bind_sa6));
				bind_sa6.sin6_family = AF_INET6;
				ret = inet_pton(AF_INET6, Conns[slot].bind_addr, &bind_sa6.sin6_addr);
				/* 0 = malformed address, do not bind to "::" by accident */
				if (ret != 1) {
					snprintf(Conn_error, sizeof(Conn_error),
						"inet_pton(%s) failed", Conns[slot].bind_addr);
					return -1;
				}
				bind_sa6.sin6_port = htons(Conns[slot].bind_port);
				bind_psa = (struct sockaddr *) &bind_sa6;
				bind_sock_len = sizeof(bind_sa6);
			}

			if (Conns[slot].sock_type == SOCK_STREAM) {
				first_state = CONN_STATE_LISTEN;
			} else if (Conns[slot].sock_type == SOCK_DGRAM) {
				do_listen = 0;
				first_state = CONN_STATE_OPEN;
			}
			break;

		case PF_PACKET:
			do_bind = 0;
			do_listen = 0; /*TODO:check this!*/
			first_state = CONN_STATE_OPEN;
			break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Invalid domain [%d]!", Conns[slot].sock_domain);
			return -1;
	}

	Conns[slot].fd = socket(Conns[slot].sock_domain, Conns[slot].sock_type, Conns[slot].sock_protocol);
	if (Conns[slot].fd == -1) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%s, %s, %s) [%s]",
			Conn_domain(&Conns[slot]), Conn_type(&Conns[slot]),
			Conn_get_socket_protocol(&Conns[slot]),
			strerror(errno));
		return -1;
	}

	Conn_setnonblock(Conns[slot].fd);

	/* An IPv6 socket must not take over the IPv4 address space */
	if (Conns[slot].sock_domain == PF_INET6) {
		#ifndef IPV6_V6ONLY
			#define IPV6_V6ONLY 26
		#endif
		i = 1;
		setsockopt(Conns[slot].fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}

	if (strlen(Conns[slot].bind_addr) > 0) {
		i = 1;
		setsockopt(Conns[slot].fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));

		ret = bind(Conns[slot].fd, bind_psa, bind_sock_len);
		if (ret < 0) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind on %s/%d [%s]",
				Conns[slot].bind_addr, Conns[slot].bind_port, strerror(errno));
			goto out_free_fd;
		}
	}

	Conns[slot].events = CONN_POLLIN;
	Conns[slot].revents = 0;

	if (do_listen == 1)
		listen(Conns[slot].fd, 4096);

	if (do_connect == 1) {
		/*TODO:replace connect_a with OPEN?! */
		first_state = CONN_STATE_CONNECT_a;
		/* Writability signals that the connect finished */
		Conns[slot].events |= CONN_POLLOUT;
	}

	ret = Conn_engine_add_obj(&Conns[slot]);
	if (ret != 0)
		goto out_free_fd;

	/* Makes Conn_poll() call Conn_trytoconnect() */
	if (do_connect == 1)
		Conn_pending++;

	Conns[slot].state = first_state;

	return 0;

	out_free_fd:
	Conn_free_intern(slot);

	return -1;
}

/*
 * OBSOLETE
 *
 * Allocates a connection, sets its domain, type, bind address and bind
 * port, then commits it.
 * Returns the connection or NULL if the allocation or the commit failed.
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *conn = Conn_alloc();

	if (conn == NULL)
		return NULL;

	Conn_set_socket_domain(conn, domain);
	Conn_set_socket_type(conn, type);
	Conn_set_socket_bind_addr(conn, addr);
	Conn_set_socket_bind_port(conn, port);

	if (Conn_commit(conn) != 0)
		return NULL;

	return conn;
}

/*
 * OBSOLETE
 *
 * Same as Conn_socket_addr(), binding to the wildcard address of the domain
 * ("0.0.0.0" for PF_INET, "::" for PF_INET6). For any other domain no bind
 * address is set.
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	const char *addr;

	switch (domain) {
		case PF_INET: addr = "0.0.0.0"; break;
		case PF_INET6: addr = "::"; break;
		/*
		 * An empty string means "no bind address"; NULL would end up
		 * as the argument of a "%s" conversion in the setter.
		 */
		default: addr = ""; break;
	}

	return Conn_socket_addr(domain, type, addr, port);
}

/*
 * Connects an allocated socket
 * OBSOLETE!
 *
 * Allocates a connection, sets its domain, type, remote address and remote
 * port, then commits it.
 * Returns the connection or NULL if the allocation or the commit failed.
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *conn;

	Log(8, "%s(%s, %d)\n",
		__FUNCTION__, addr, port);

	conn = Conn_alloc();
	if (conn == NULL)
		return NULL;

	Conn_set_socket_domain(conn, domain);
	Conn_set_socket_type(conn, type);
	Conn_set_socket_addr(conn, addr);
	Conn_set_socket_port(conn, port);

	return (Conn_commit(conn) != 0) ? NULL : conn;
}

/*
 * Accepts one connection on the listening socket stored in cell 'slot'.
 *
 * A new cell is allocated for the accepted socket, it inherits the socket
 * domain/type/protocol of the listening one, is registered with the engine
 * and the accept callback (per-connection first, then global) is called.
 * Errors are reported on the listening slot through Conn_error_raise().
 */
static void Conn_accept(const unsigned int slot)
{
	int fd, err, xerrno;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;

	Log(10, "Accepting a connection on slot %u via %s/%d, type %s, domain %s"
		", protocol %s.\n",
		slot, Conns[slot].bind_addr, Conns[slot].bind_port, Conn_type(&Conns[slot]),
		Conn_domain(&Conns[slot]), Conn_get_socket_protocol(&Conns[slot]));

	/* Pick a peer address buffer matching the socket domain */
	switch(Conns[slot].sock_domain) {
		case PF_INET:
			pca = (struct sockaddr *) &ca4;
			cax_len = sizeof(ca4);
		break;

		case PF_INET6:
			pca = (struct sockaddr *) &ca6;
			cax_len = sizeof(ca6);
		break;

		default:
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot deal with domain %d.",
				Conns[slot].sock_domain);
			Conn_error_raise(slot, EAFNOSUPPORT);
			return;
	}

	fd = accept(Conns[slot].fd, pca, &cax_len);
	if (fd == -1) {
		/* Save it: the Log call below may change errno */
		xerrno = errno;

		if (xerrno == EAGAIN)
			return;

		/* TODO: ratelimit */
		Log(9, "WARN: Cannot accept on fd %d [%s].\n",
			Conns[slot].fd, strerror(xerrno));
		Conn_error_raise(slot, xerrno);
		return;
	}

	/*
	 * After calling Conn_alloc, the Conns array can be moved, so cells
	 * are always accessed through their index.
	 */
	X = Conn_alloc();
	if (!X) {
		Conn_error_raise(slot, ENOMEM);
		close(fd);
		return;
	}

	X->fd = fd;
	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_OPEN;
	X->time_open = Conn_now;
	X->via = Conns[slot].id;
	X->events = CONN_POLLIN;

	Conn_set_socket_domain(X, Conns[slot].sock_domain);
	Conn_set_socket_type(X, Conns[slot].sock_type);
	Conn_set_socket_protocol(X, Conns[slot].sock_protocol);

	Conn_setnonblock(X->fd);

	Conn_set_address(X, 0);
	Conn_set_address(X, 1);

	/*
	 * Count the connection before registering it: Conn_free_intern() in
	 * the error path below decrements Conn_work_to_do, so incrementing
	 * only at the end left the counter unbalanced on failure.
	 */
	Conn_work_to_do++;

	err = Conn_engine_add_obj(X);
	if (err != 0) {
		Conn_error_raise(slot, err);
		Conn_free_intern(X->slot);
		return;
	}

	if (Conns[slot].cb_accept)
		Conns[slot].cb_accept(X);
	else if (Conn_accept_cb != NULL)
		Conn_accept_cb(X);

	Conn_total++;
}

static void Conn_accept_allow(void)
{
	unsigned int slot;

	if (Conn_accept_is_allowed == Conn_accept_is_allowed_last)
		return;

	Log(10, "%s: Turning accept allow from %d to %d...\n",
		__FUNCTION__, Conn_accept_is_allowed_last,
		Conn_accept_is_allowed);

	for (slot = 0; slot < Conn_no; slot++) {
		if (Conns[slot].type != CONN_TYPE_MASTER)
			continue;

		if (Conn_accept_is_allowed == 0)
			Conns[slot].events &= ~CONN_POLLIN;
		else	
			Conns[slot].events |= CONN_POLLIN;

		Conn_engine_chg_obj(&Conns[slot]);
	}

	Conn_accept_is_allowed_last = Conn_accept_is_allowed;
}

/*
 * Add tokens to connection
 */
static void Conn_band_update(const unsigned int slot)
{
	long diff;

	/* no need */
	if (Conns[slot].band_width == 0)
		return;

	diff = (Conn_now.tv_sec - Conns[slot].band_lasttime.tv_sec) * 1000000;
	diff += Conn_now.tv_usec - Conns[slot].band_lasttime.tv_usec;
	diff /= 100000;

	/* already added in this hundred of milisecond? */
	if (diff == 0)
		return;

	/* take care of time skew */
	if (diff < 0)
		diff = 1;

	Conns[slot].band_lasttime = Conn_now;
	Conns[slot].band_tokens += diff * Conns[slot].band_width / 10;
	if (Conns[slot].band_tokens > Conns[slot].band_factor * Conns[slot].band_width)
		Conns[slot].band_tokens = Conns[slot].band_factor * Conns[slot].band_width;

	Conns[slot].events |= CONN_POLLOUT;
	Conn_engine_chg_obj(&Conns[slot]);

	Log(debug_band, "\t\tBAND: slot=%u, id=%llu, added tokens -> %u.\n",
		slot, Conns[slot].id, Conns[slot].band_tokens);
}

/*
 * Set the bandwidth for a connection
 * width is in 1b increments
 *
 * width: bandwidth limit; 0 turns the limiting off (see Conn_band_update).
 * factor: the token bucket holds at most factor * width tokens; it starts
 *	full.
 *
 * Always returns 0.
 */
int Conn_band(struct Conn *C, const unsigned int width,
	const unsigned int factor)
{
	unsigned int slot;

	slot = C->slot;

	Log(11, "\tConn_band: slot=%u, id=%llu, width=%u, factor=%u.\n",
		slot, Conns[slot].id, width, factor);

	Conns[slot].band_lasttime = Conn_now;
	Conns[slot].band_width = width;
	Conns[slot].band_factor = factor;
	Conns[slot].band_tokens = factor * width;

	/* tv_sec/tv_usec are not plain ints: convert them and use %ld */
	Log(debug_band, "\t\tBAND: lasttime=%ld.%06ld, width=%u, factor=%u, tokens=%u\n",
		(long) Conns[slot].band_lasttime.tv_sec,
		(long) Conns[slot].band_lasttime.tv_usec,
		Conns[slot].band_width, Conns[slot].band_factor, Conns[slot].band_tokens);

	return 0;
}

/*
 * Starts the non-blocking connect for every P2P connection that waits for
 * it: the ones in state CONN_STATE_CONNECT_a and the ones in
 * CONN_STATE_CONNECT_0 whose retry time (tryat) has come.
 *
 * On success the connection goes to CONN_STATE_CONNECT_b; completion is
 * detected later in Conn_poll_cb(). On failure the connection is put in
 * CONN_STATE_ERROR with the matching error_state.
 * NOTE(review): the failure paths do not touch Conn_pending - confirm this
 * is intended.
 */
static void Conn_trytoconnect(void)
{
	struct addrinfo hints;
	struct addrinfo *res;
	int i, ret;
	char port[8];

	Log(8, "%s: Conn_pending=%d\n",
		__FUNCTION__, Conn_pending);

	/* Walk the busy cells from the last one to the first */
	for (i = Conn_no - 1; i >= 0; i--) {
		if (Conns[i].type != CONN_TYPE_P2P)
			continue;

		/* The waiting time before a new attempt is over */
		if ((Conns[i].state == CONN_STATE_CONNECT_0)
			&& (Conns[i].tryat <= Conn_now.tv_sec)) {
			Conns[i].state = CONN_STATE_CONNECT_a;
		}

		if (Conns[i].state != CONN_STATE_CONNECT_a)
			continue;

		Log(9, "\tTry to connect slot=%u, id=%llu, to %s/%d...\n",
			Conns[i].slot, Conns[i].id, Conns[i].addr, Conns[i].port);

		/* Resolve the remote address; a domain of 0 means "any family" */
		memset(&hints, 0, sizeof(hints));
		if (Conns[i].sock_domain == 0)
			hints.ai_family = PF_UNSPEC;
		else
			hints.ai_family = Conns[i].sock_domain;
		hints.ai_socktype = Conns[i].sock_type;
		/*hints.ai_flags = AI_NUMERICHOST;*/
		hints.ai_flags = AI_ADDRCONFIG;
		snprintf(port, sizeof(port), "%d", Conns[i].port);
		res = NULL;
		ret = getaddrinfo(Conns[i].addr, port, &hints, &res);
		if (ret != 0) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_GETADDRINFO;
			if (res)
				freeaddrinfo(res);
			continue;
		}

		/* Only the first address returned by getaddrinfo is tried */
		if (Conns[i].fd == -1) {
			/* No socket yet (Conn_free_intern sets fd to -1 when it closes one) */
			Conns[i].fd = socket(res->ai_family, res->ai_socktype, 0);
			if (Conns[i].fd == -1) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				Conns[i].xerrno = errno;
				freeaddrinfo(res);
				continue;
			}
			Log(10, "\tAllocated socket on fd %d\n",
				Conns[i].fd);

			Conn_setnonblock(Conns[i].fd);

			/* Need POLLOUT to signal when the connection was done. */
			Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT);
			ret = Conn_engine_add_obj(&Conns[i]);
			if (ret != 0) {
				Conns[i].state = CONN_STATE_ERROR;
				Conns[i].error_state = CONN_ERROR_SOCKET;
				freeaddrinfo(res);
				continue;
			}
		}

		/* Set syn time */
		Conns[i].conn_syn = Conn_now;

		Log(11, "\tConnecting...\n");
		ret = connect(Conns[i].fd, res->ai_addr, res->ai_addrlen);
		/* EINPROGRESS is the normal answer for a non-blocking socket */
		if ((ret != 0) && (errno != EINPROGRESS)) {
			Conns[i].state = CONN_STATE_ERROR;
			Conns[i].error_state = CONN_ERROR_CONNECT;
			Conns[i].xerrno = errno;
			freeaddrinfo(res);
			continue;
		}

		/* Connect started; Conn_poll_cb() handles CONNECT_b on POLLOUT */
		Conns[i].state = CONN_STATE_CONNECT_b;

		Conn_pending--;
		Conn_total++;

		freeaddrinfo(res);
	}
}

/*
 * Default "send" handler: tries to send the pending part of the output
 * buffer (between obuf_head and obuf_tail) of the connection in cell 'slot'.
 *
 * The amount sent in one call is limited by Conn_max_send (if not 0) and,
 * for bandwidth limited connections, by the available tokens; with no
 * tokens left the "writable" notifications are switched off until
 * Conn_band_update() adds tokens.
 * On error, error_state/xerrno of the connection are set.
 */
static void Conn_send_cb_i(const unsigned int slot)
{
	ssize_t n;
	unsigned int max;
	int count;
	char *buf;
	int xerrno;
	char *dump;

	Log(10, "Conn_send_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		slot, Conns[slot].id, Conns[slot].obuf_head, Conns[slot].obuf_tail,
		Conns[slot].obuf_size);

	if (Conns[slot].obuf == NULL)
		abort();

	buf = Conns[slot].obuf + Conns[slot].obuf_head;
	count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
	if (count == 0) {
		Log(13, "\tConn_send_cb_i: Empty buffer! Strange!\n");
		return;
	}

	max = count;
	if ((Conn_max_send > 0) && (max > Conn_max_send))
		max = Conn_max_send;

	/* bandwidth */
	if (Conns[slot].band_width > 0) {
		if (max > Conns[slot].band_tokens)
			max = Conns[slot].band_tokens;
		if (max == 0) {
			Log(debug_band, "\tBAND: Suspend 100ms the C (no tokens)!\n");
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);
			return;
		}
	}

	again:
	Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u), max=%u (count=%d), 0)...\n",
		Conns[slot].fd, Conns[slot].obuf_head,
		Conns[slot].obuf_tail, max, count);
	n = send(Conns[slot].fd, buf, max, 0);
	/* Save errno at once: the Log calls below may change it */
	xerrno = errno;
	if ((n == -1) && (xerrno == EINTR))
		goto again;

	/* The socket is non-blocking: nothing could be sent now, try later */
	if ((n == -1) && (xerrno == EAGAIN))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Sent %zd bytes [head=%u tail=%u]\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd,
		n, Conns[slot].obuf_head, Conns[slot].obuf_tail);

	if (n > 0) {
		/* Dump only what was really sent; n may be -1 or 0 otherwise */
		if (Conn_level >= 10) {
			dump = Conn_dump(buf, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].tsend = Conn_now;
		if (n < count) {
			/* Partial send: the rest stays in the buffer */
			Conns[slot].obuf_head += n;
		} else {
			/* Everything was sent: the buffer is empty again */
			Conns[slot].obuf_head = 0;
			Conns[slot].obuf_tail = 0;
		}

		Conns[slot].bo += n;
		if (Conns[slot].band_width > 0) {
			Conns[slot].band_tokens -= n;
			Log(debug_band, "\t%s: BAND: Remove %zd tokens -> %u...\n",
				__FUNCTION__,
				n, Conns[slot].band_tokens);
		}
	} else {
		Log(0, "%s: Error in send (slot=%u, id=%llu) [%s]\n",
			__FUNCTION__,
			slot, Conns[slot].id, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_SEND;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Default "receive" handler: reads from the socket of the connection in
 * cell 'slot' into the free space at the end of its input buffer (which is
 * expanded first when full) and then calls the data callback
 * (per-connection first, then global).
 *
 * The amount read in one call is limited by Conn_max_recv (if not 0).
 * A return of 0 from recv marks the connection with CONN_ERROR_HANGUP, an
 * error marks it with CONN_ERROR_RECV.
 */
static void Conn_recv_cb_i(const unsigned int slot)
{
	ssize_t n;
	unsigned int max;
	int r, xerrno;
	char *dump;

	Log(10, "Conn_recv_cb_i slot=%u, id=%llu, head=%u, tail=%u, size=%u...\n",
		slot, Conns[slot].id, Conns[slot].ibuf_head, Conns[slot].ibuf_tail, Conns[slot].ibuf_size);

	if (Conns[slot].ibuf_tail == Conns[slot].ibuf_size) {
		r = Conn_try_expand_buf(slot, 1, 0);
		if (r != 0) {
			Log(1, "MEM: Cannot expand ibuf!\n");
			return;
		}
	}

	max = Conns[slot].ibuf_size - Conns[slot].ibuf_tail;
	if ((Conn_max_recv > 0) && (max > Conn_max_recv))
		max = Conn_max_recv;

	while (1) {
		n = recv(Conns[slot].fd, Conns[slot].ibuf + Conns[slot].ibuf_tail, max, 0);
		/* Save errno at once: the Log calls below may change it */
		xerrno = errno;
		if ((n == -1) && (xerrno == EINTR))
			continue;

		break;
	}

	/*
	 * The socket is non-blocking: "nothing to read right now" is not an
	 * error (same policy as in the send path), wait for the next event.
	 */
	if ((n == -1) && (xerrno == EAGAIN))
		return;

	Log(10, "%s: slot=%u, id=%llu: fd=%d Received %zd bytes.\n",
		__FUNCTION__, slot, Conns[slot].id, Conns[slot].fd, n);

	if (n > 0) {
		if (Conn_level >= 10) {
			dump = Conn_dump(Conns[slot].ibuf + Conns[slot].ibuf_tail, n);
			Log(0, "\t%s\n", dump);
			free(dump);
		}

		Conns[slot].ibuf_tail += n;

		Conns[slot].bi += n;
		Conns[slot].trecv = Conn_now;

		if (Conns[slot].cb_data)
			Conns[slot].cb_data(&Conns[slot]);
		else if (Conn_data_cb)
			Conn_data_cb(&Conns[slot]);
	} else if (n == 0) {
		Log(10, "Remote close sending side (slot=%u, id=%llu)\n",
			slot, Conns[slot].id);
		Conns[slot].error_state = CONN_ERROR_HANGUP;
	} else {
		Log(4, "Error in recv (slot=%u, id=%llu) [%s]\n",
			slot, Conns[slot].id, strerror(xerrno));
		Conns[slot].error_state = CONN_ERROR_RECV;
		Conns[slot].xerrno = xerrno;
	}
}

/*
 * Callback that is called for every connection that has events.
 *
 * slot: cell of the connection.
 * revents: events reported by the engine (CONN_POLL* bits).
 *
 * Order of processing: hangup/error flags, completion of a pending connect
 * (POLLOUT in state CONNECT_b), input (accept for master sockets, receive
 * for the others) and finally output. Each step is skipped when
 * Conn_ignore() says so for the slot.
 */
static void Conn_poll_cb(const unsigned int slot, int revents)
{
	Log(12, "%s: slot=%u, id=%llu, revents=%x.\n",
		__FUNCTION__, slot, Conns[slot].id, revents);
	Conns[slot].revents = revents;

	if (Conn_level >= 12)
		Log(12, "\t%s\n", Conn_status_slot(slot));

	/* We should not have events on a free cell */
	if (Conns[slot].state == CONN_STATE_FREE)
		abort();

	if (revents & CONN_POLLHUP) {
		Conns[slot].error_state = CONN_ERROR_HANGUP;
		/* TODO: Add it to the close list to speed it up */
	}

	if (revents & CONN_POLLERR) {
		Conns[slot].error_state = CONN_ERROR_POLL;
		Conns[slot].xerrno = 0; /* TODO: unknown error? */
		/* TODO: CONN_ERROR_POLL is correct here? */
	}

	/* First, test we have a new connection */
	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(slot) == 0)) {
		/* We just established a connection */
		if (Conns[slot].state == CONN_STATE_CONNECT_b) {
			/*
			 * We do not need POLLOUT now - it was used only for
			 * connect completion.
			 */
			revents &= ~CONN_POLLOUT;
			Conns[slot].events &= ~CONN_POLLOUT;
			Conn_engine_chg_obj(&Conns[slot]);

			Conns[slot].state = CONN_STATE_OPEN;

			Conn_set_address(&Conns[slot], 0);

			Conns[slot].time_open = Conn_now;

			if (Conns[slot].cb_connected != NULL)
				Conns[slot].cb_connected(&Conns[slot]);
			else if (Conn_connected_cb)
				Conn_connected_cb(&Conns[slot]);
		}
	}

	/* Second, test for error or input */
	if ((revents & CONN_POLLIN)
		&& (Conn_ignore(slot) == 0)) {
		if (Conns[slot].type == CONN_TYPE_MASTER) {
			/* C pointer can change under us in Conn_accept->Conn_grow */
			Conn_accept(slot);
		} else {
			if (Conns[slot].cb_recv)
				Conns[slot].cb_recv(&Conns[slot]);
			else if (Conn_recv_cb != NULL)
				Conn_recv_cb(&Conns[slot]);
			else
				Conn_recv_cb_i(slot);
		}
	}

	if ((revents & CONN_POLLOUT)
		&& (Conn_ignore(slot) == 0)) {
		/* We can send data */
		if (Conns[slot].state == CONN_STATE_OPEN) {
			if (Conns[slot].cb_send)
				Conns[slot].cb_send(&Conns[slot]);
			else if (Conn_send_cb != NULL)
				Conn_send_cb(&Conns[slot]);
			else
				Conn_send_cb_i(slot);

			/* Output buffer drained: stop asking for POLLOUT */
			if (Conns[slot].obuf_head == Conns[slot].obuf_tail) {
				Conns[slot].events &= ~CONN_POLLOUT;
				/*
				 * Go through the engine dispatch pointer; calling the
				 * epoll function directly was wrong for the poll engine.
				 */
				Conn_engine_chg_obj(&Conns[slot]);
				if (Conns[slot].flags & CONN_FLAGS_CLOSE_AFTER_SEND) {
					Conns[slot].state = CONN_STATE_ERROR;
					Conns[slot].error_state = CONN_ERROR_USERREQ;
				}
			}
		}
	}
}

/*
 * Moving a in-use slot over a free one for compacting reason.
 *
 * The two cells are exchanged (not just copied) so that the buffers owned
 * by the cell at 'dst' are kept and can be reused later. Both cells get
 * their 'slot' field updated to their new position, then the engine is told
 * about the move.
 */
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
	struct Conn tmp;

	if (dst == src)
		return;

	Log(10, "\t%s: Moving id %llu from slot %u to slot %u...\n",
		__FUNCTION__, Conns[src].id, src, dst);

	/* We need to save old location because of useful pointers */
	tmp = Conns[dst];
	Conns[dst] = Conns[src];
	Conns[src] = tmp;

	Conns[dst].slot = dst;
	/*
	 * The cell now at 'src' must not keep pointing to 'dst': code that
	 * goes through C->slot (a cell kept for auto-reconnect, for example)
	 * would otherwise touch the other connection.
	 */
	Conns[src].slot = src;

	Conn_engine_move_slot(dst, src);
}

/*
 * Does the polling of file descriptors.
 * Returns: -1 on error, 0 nothing to do, 1 if some work was done
 * timeout is in 1/1000 seconds increments.
 *
 * With a timeout of -1 the function polls in rounds of 1000 ms and returns
 * only when Conn_must_stop is 1, when there is no work left
 * (Conn_work_to_do is 0) or when the engine reports an error.
 * NOTE(review): the value returned in the normal case is whatever the
 * engine poll function returned (logged below as a number of events).
 */
int Conn_poll(const int timeout)
{
	int ret;
	int timeout2;
	unsigned int slot, last;

	Log(9, "%s: timeout=%d Conn_no=%d, Conn_work_to_do=%u)\n",
		__FUNCTION__, timeout, Conn_no, Conn_work_to_do);

	/* "Forever" is done as a sequence of one second polls */
	if (timeout == -1)
		timeout2 = 1000;
	else
		timeout2 = timeout;

	loop:
	if (Conn_must_stop == 1)
		return 0;

	if (Conn_work_to_do == 0) {
		Log(9, "%s: work_to_do is 0, so return 0!\n",
			__FUNCTION__);
		return 0;
	}

	/* Start the connects that are waiting */
	if (Conn_pending > 0)
		Conn_trytoconnect();

	/* Conn_poll_cb is called for every connection with events */
	ret = Conn_engine_poll(timeout2, Conn_poll_cb);
	if (ret < 0)
		return -1;

	Log(9, "\tDo compacting, expiration and band stuff (%d event(s))...\n",
		ret);
	slot = 0;
	while (slot < Conn_no) {
		/*
		 * Save last position because Conn_free_intern
		 * decrements Conn_no
		 */
		last = Conn_no - 1;

		/* Closing connection if it is in error state */
		if (Conns[slot].error_state > 0) {
			Log(11, "\tSlot %u in error, exchange with pos %u.\n",
				slot, last);
			/*
			 * The last busy cell takes the place of the broken one,
			 * which is cleaned up at the end of the busy area.
			 * 'slot' is not advanced: the cell moved here must be
			 * examined too.
			 */
			Conn_move_slot(slot, last);
			Conn_free_intern(last);
			continue;
		}

		/* No commit done yet */
		if (Conns[slot].state == CONN_STATE_EMPTY) {
			slot++;
			continue;
		}

		if (Conns[slot].state == CONN_STATE_FREE) {
			/* Must not happen! */
			abort();
		}

		/* test if it expired/timeout */
		Conn_expire(slot);

		/* add tokens */
		Conn_band_update(slot);

		slot++;
	}

	/* Blocking accept if full queue or unblock if not */
	Conn_accept_allow();

	if (timeout == -1)
		goto loop;

	return ret;
}

/*
 * Returns the lifetime of a connection: the difference, as computed by
 * Conn_time_diff(), between the current library time (Conn_now) and the
 * moment the connection was opened (time_open).
 */
unsigned long long Conn_lifetime(struct Conn *C)
{
	return Conn_time_diff(&Conn_now, &Conns[C->slot].time_open);
}


Mode Type Size Ref File
100644 blob 70 9964a59b5d89f394cc4250ed6d6ce67a5f0cd196 .gitignore
100644 blob 1945 fecf0e7a7e8580485101a179685aedc7e00affbb Changelog.pre109
100644 blob 32589 ce307c8127733a93d036be69555ce39f5d5c0a6d Conn.c
100644 blob 1470 afedf88dd9c3b275e9c7d83d6572c2fcce60e50b Conn.h
100644 blob 726 64b1bad93a84f87c3e93fc24ac5341db691ea578 Conn.spec.in
100644 blob 66 68138d781ca754b15e14c687da91ee261b2c41f3 Conn_config.h.in
100644 blob 30551 727bb838714a57b5005fb74de778a09b644c1250 Conn_engine_core.c
100644 blob 9204 f975897b64a208c33f380addd293832ff2fd24f6 Conn_engine_core.h
100644 blob 3661 999f465eb29de894b978bf8c5f200acc4c1e21b8 Conn_engine_epoll.c
100644 blob 610 b8597ef7043fa9b6ccd58c0f484f040e8621cc95 Conn_engine_epoll.h
100644 blob 2699 cdb9ad643a95b8777c608ef1ca8de10a3599ed4a Conn_engine_poll.c
100644 blob 597 183f7af0b0688200fa8f69527c03ee075c83df12 Conn_engine_poll.h
100644 blob 30 d987fa5df957830331139935d517009e2911b0cf INSTALL
100644 blob 25275 92b8903ff3fea7f49ef5c041b67a087bca21c5ec LICENSE
100644 blob 1311 3c820df3b36b4bc844c52bc8c7e86f7843b67bb6 Makefile.in
100644 blob 192 5b11bdfb23857d8588845465aef993b320596b44 README
100644 blob 2078 1425b11d02e3cd49be474d575a03d0698ce37228 TODO
100755 blob 23 d33bb6c4ecdce1390ce1db3c79ea3b93e22ea755 configure
040000 tree - d4c9c4a69c5cfa2a84316967185f1661b6817779 docs
100755 blob 10910 8fcd88850fe239f609c0d7bb7e09f5b9f853f1b2 duilder
100644 blob 276 1c2b695f44e694e60ac10db747d5c9093472859c duilder.conf
040000 tree - 44bbe0aba6bc0116534b304ab4a5446f282649c8 examples
040000 tree - 751693d0803f700dd060788cc9383aa24b472267 tests
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main