/*
* Author: Catalin(ux) M BOIE <catab at embedromix.ro>
* Date: 2004
* Description: Some functions to help writing network servers and clients,
* both ipv4 and ipv6.
* Licence: LGPL
*/
#include "Conn_config.h"
#include <sys/timerfd.h>
#include <stdarg.h>
#include <unistd.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/time.h>
#include <netinet/tcp.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include "Conn_intern.h"
#include "Conn_web.h"
/* ############## static ########### */
/*
 * epoll instance and event array; not referenced in this part of the file
 * (presumably used by the main event loop - TODO confirm).
 */
static int Conn_epoll_fd;
static struct epoll_event Conn_epoll_events[CONN_EVENTS_SLOTS];
/* Not referenced in the visible code; presumably set when Conn_max is hit. */
static unsigned int Conn_max_reached = 0;
/*
 * Growth request passed to Conn_try_expand_buf() by the default recv
 * handler when the input buffer is full. Conn_default_obuf is presumably the
 * output-side counterpart (not referenced here - confirm).
 */
static unsigned int Conn_default_ibuf = 256;
static unsigned int Conn_default_obuf = 256;
/* Caps for buffer growth in Conn_try_expand_buf(); 0 means no cap. */
static unsigned int Conn_max_ibuf = 4096000;
static unsigned int Conn_max_obuf = 4096000;
/* Max bytes to enqueue on one send/recv call */
/* (the default send/recv handlers stop after this many bytes; 0 = no limit) */
static unsigned int Conn_max_send = 32 * 1024;
static unsigned int Conn_max_recv = 32 * 1024;
/* Busy connections; decremented (with Conn_work_to_do) by Conn_free_intern(). */
static unsigned int Conn_no = 0;
static unsigned int Conn_work_to_do = 0;
/* Reported by Conn_status(); their updates are outside this chunk. */
static unsigned int Conn_max = 0;
static unsigned long Conn_total = 0;
/* Start time in seconds; Conn_status() reports uptime relative to it. */
static unsigned int Conn_start = 0;
/* Slots scheduled for an auto-reconnect attempt (see Conn_free_intern()). */
static unsigned int Conn_pending = 0;
/* Per-thread cached current time: log timestamps, trecv/tsend, reconnect time. */
static __thread struct timeval Conn_now;
/* Log() prints a message only when its level is <= this value. */
static unsigned short Conn_debug_level = 0; /* debug level */
/* memory stuff */
/* Bytes added to input/output buffers by Conn_try_expand_buf(). */
static unsigned long long Conn_mem_buffers_in = 0;
static unsigned long long Conn_mem_buffers_out = 0;
#if 1
static unsigned long long Conn_mem_structs = 0;
#endif
/* Not referenced in the visible code (presumably legacy slot bookkeeping). */
static struct Conn *Conns = NULL;
static unsigned int Conn_inited = 0;
static unsigned int Conn_allocated = 0;
/* Presumably the next connection id to hand out - confirm. */
static unsigned long long Conn_id = 1;
/* Set by Conn_stop(). */
static unsigned int Conn_must_stop = 0;
/* Per-thread text of the last error (written e.g. by Conn_add_obj()). */
static __thread char Conn_error[512];
/* Per-thread prefix inserted into every log line (Conn_log_set_info()). */
static __thread char log_info[32];
/* Destination fd of Log(); 2 is stderr. Changed by Conn_debug(). */
static int Conn_log_fd = 2;
/* Log level used for the bandwidth-shaping messages in the send path. */
static unsigned short debug_band = 11;
static struct Conn_pool Conn_masters; /* Keeps track of listening sockets */
static struct Conn_pool Conn_free; /* Keeps track of free Conns */
/* NOTE(review): presumably whether SO_REUSEPORT is usable - confirm. */
static unsigned char Conn_reuseport_available;
/* Prototypes */
static void Conn_default_cbs_accept(struct Conn *C);
static void Conn_default_cbs_recv(struct Conn *C);
static void Conn_default_cbs_send(struct Conn *C);
static void Conn_default_cbs_data(struct Conn *C);
static void Conn_default_cbs_close(struct Conn *C);
static void Conn_default_cbs_trigger(struct Conn *C);
static void Conn_default_cbs_error(struct Conn *C);
static void Conn_default_cbs_connected(struct Conn *C);
static void Conn_default_cbs_accept_error(struct Conn *C);
/*
 * Built-in callback set. Presumably installed on connections that do not
 * provide their own callbacks - confirm with the allocation code.
 */
static struct Conn_cbs Conn_default_cbs =
{
.accept = Conn_default_cbs_accept,
.recv = Conn_default_cbs_recv,
.send = Conn_default_cbs_send,
.data = Conn_default_cbs_data,
.close = Conn_default_cbs_close,
.trigger = Conn_default_cbs_trigger,
.error = Conn_default_cbs_error,
.connected = Conn_default_cbs_connected,
.accept_error = Conn_default_cbs_accept_error
};
/* ############## wpool ############# */
/*
 * Takes one more reference on worker pool @wp.
 */
static void Conn_wpool_get(struct Conn_wpool *wp)
{
	wp->refs += 1;
}
/*
 * Drops one reference on worker pool @wp; the pool is destroyed when the
 * last reference goes away.
 */
static void Conn_wpool_put(struct Conn_wpool *wp)
{
	if (--wp->refs == 0)
		Conn_wpool_destroy(wp);
}
/*
 * Attaches connection @C to worker pool @wp, taking a pool reference.
 */
void Conn_set_wp(struct Conn *C, struct Conn_wpool *wp)
{
	C->wp = wp;
	Conn_wpool_get(wp);
}
/*
 * Detaches connection @C from worker pool @wp and releases the reference
 * taken by Conn_set_wp(); @wp may be destroyed as a result.
 */
void Conn_del_wp(struct Conn *C, struct Conn_wpool *wp)
{
	Conn_wpool_put(wp);
	C->wp = NULL;
}
/*
 * Logs (at level 0) the id and in_clients counter of worker @w.
 */
static void Conn_wpool_worker_stats(const struct Conn_wpool_worker *w)
{
	const char *fmt = "Worker %hu: in_clients: %llu\n";

	Log(0, fmt, w->id, w->in_clients);
}
/*
 * Creates a workers pool with @workers worker slots.
 *
 * Every slot gets its index as id; all other slot fields start zeroed.
 * In particular w->inited == 0, which Conn_wpool_destroy() checks before
 * closing w->control, and w->in_clients == 0. The pool gets its own epoll
 * fd and starts with refs == 0. No worker threads are started here.
 *
 * Returns the new pool, or NULL on failure.
 */
struct Conn_wpool *Conn_wpool_create(const unsigned short workers)
{
	struct Conn_wpool *ret;
	unsigned short i;

	ret = malloc(sizeof(*ret));
	if (ret == NULL) {
		Log(0, "Cannot alloc memory for wpool!\n");
		goto out;
	}

	/*
	 * calloc, not malloc: Conn_wpool_destroy() reads w->inited and would
	 * otherwise close a garbage w->control fd.
	 */
	ret->ww = calloc(workers, sizeof(*ret->ww));
	if ((ret->ww == NULL) && (workers > 0)) {
		Log(0, "Cannot alloc memory for workers!\n");
		goto free_ret;
	}

	/* TODO: what I am doing with this?! Signaling between works and pool? */
	ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (ret->epoll_fd == -1) {
		Log(0, "Cannot create epoll fd (%s)\n", strerror(errno));
		goto free_ww;
	}

	/* Number the worker slots */
	for (i = 0; i < workers; i++)
		ret->ww[i].id = i;

	ret->workers = workers;
	ret->next = 0;
	ret->refs = 0;
	Log(10, "\tPool created with success.\n");
	return ret;

free_ww:
	free(ret->ww);
free_ret:
	free(ret);
out:
	Log(5, "\tFailed to create wpool.\n");
	return NULL;
}
/*
 * Tears down worker pool @wp.
 *
 * Closes the control fd of every worker slot marked as inited (the worker
 * is expected to exit when it sees its control fd closed), then closes the
 * pool's epoll fd and frees the slots and the pool. Always returns 0.
 */
int Conn_wpool_destroy(struct Conn_wpool *wp)
{
	int idx = 0;

	while (idx < wp->workers) {
		struct Conn_wpool_worker *slot = &wp->ww[idx];

		idx++;
		if (slot->inited != 0)
			close(slot->control);
	}

	close(wp->epoll_fd);
	free(wp->ww);
	free(wp);
	return 0;
}
/* ########### pool ############## */
/*
 * Initializes connection pool @x as an empty list with no allocated
 * blocks. Both list ends are reset so no caller can see a stale tail.
 */
static void pool_init(struct Conn_pool *x)
{
	x->head = NULL;
	x->tail = NULL;
	x->allocated = 0;
	x->next_block = 0;
}
/*
 * Grows pool @x by @increment Conn structures (cells).
 *
 * The cells are allocated as one block, recorded in x->blocks, given empty
 * input/output buffers and appended to the pool's list.
 * Caller must have x->lock taken.
 *
 * Returns 0 on success and -1 when @increment is 0, the size computation
 * would overflow, or the allocation fails.
 * NOTE(review): x->blocks capacity is not checked here - confirm callers
 * bound the number of grow operations.
 */
__cold static int pool_grow(struct Conn_pool *x, const unsigned int increment)
{
	unsigned int i;
	struct Conn *p;

	Log(1, "%s Try to grow pool from %u to %u.\n",
		__func__, (unsigned int) x->allocated,
		(unsigned int) x->allocated + increment);

	/* Growing by 0 would leave no last cell to link from. */
	if (unlikely(increment == 0))
		return -1;
	if (unlikely(increment > ((size_t) -1) / sizeof(struct Conn)))
		return -1;

	p = malloc((size_t) increment * sizeof(*p));
	if (unlikely(p == NULL))
		return -1;

	x->blocks[x->next_block] = p;
	x->next_block++;

	/* Init the cells and chain them together; the last one ends the chain. */
	for (i = 0; i < increment; i++) {
		p[i].obuf_size = 0;
		p[i].ibuf_size = 0;
		p[i].obuf = NULL;
		p[i].ibuf = NULL;
		p[i].next = (i + 1 < increment) ? &p[i + 1] : NULL;
	}

	/* Append the new chain to the free pool */
	if (likely(x->head == NULL))
		x->head = &p[0];
	else
		x->tail->next = &p[0];
	x->tail = &p[increment - 1];
	x->allocated += increment;

	return 0;
}
/* ############## basic Conn stuff ############ */
#if 0
/*
 * Passes file descriptor @fd to the peer of @comm_fd as SCM_RIGHTS
 * ancillary data, together with one dummy data byte (man 3 cmsg).
 * Returns 0 on success, -1 if sendmsg() fails.
 */
static int Conn_send_fd(int comm_fd, int fd)
{
	union {
		struct cmsghdr align;
		char buf[CMSG_SPACE(sizeof(int))];
	} ctrl;
	unsigned char junk = 1;
	struct iovec iov = {
		.iov_base = &junk,
		.iov_len = 1,
	};
	struct msghdr m = {
		.msg_name = NULL,
		.msg_namelen = 0,
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = ctrl.buf,
		.msg_controllen = sizeof(ctrl.buf),
	};
	struct cmsghdr *cm;

	cm = CMSG_FIRSTHDR(&m);
	cm->cmsg_len = CMSG_LEN(sizeof(int));
	cm->cmsg_level = SOL_SOCKET;
	cm->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cm), &fd, sizeof(fd));

	if (sendmsg(comm_fd, &m, 0 /*flags*/) == -1) {
		Log(1, "%s: cannot send message: %s\n",
			__func__, strerror(errno));
		return -1;
	}

	Log(1, "%s: Sent fd %d using comm fd %d\n",
		__func__, fd, comm_fd);
	return 0;
}
/*
 * Receives a file descriptor passed over @comm_fd as SCM_RIGHTS ancillary
 * data (man 3 cmsg); one dummy data byte is read along with it.
 *
 * Returns the received fd, or -1 when recvmsg() fails or the message does
 * not carry exactly one SOL_SOCKET/SCM_RIGHTS int.
 */
static int Conn_recv_fd(int comm_fd)
{
	struct msghdr m;
	struct cmsghdr *cm;
	struct iovec iov;
	union {
		struct cmsghdr cm;
		char buf[CMSG_SPACE(sizeof(int))];
	} u;
	unsigned char junk;
	ssize_t r;
	int fd;

	memset(&m, 0, sizeof(m));
	memset(&u, 0, sizeof(u));
	iov.iov_base = &junk;
	iov.iov_len = 1;
	m.msg_control = u.buf;
	m.msg_controllen = sizeof(u.buf);
	m.msg_iov = &iov;
	m.msg_iovlen = 1;
	m.msg_name = NULL;
	m.msg_namelen = 0;

	r = recvmsg(comm_fd, &m, 0 /*flags*/);
	if (r == -1) {
		Log(1, "%s: Cannot receive msg: %s\n",
			__func__, strerror(errno));
		return -1;
	}
	Log(10, "DEBUG: recvmsg returned: %zd\n", r);

	cm = CMSG_FIRSTHDR(&m);
	if (!cm) {
		Log(1, "%s: Invalid first header.\n", __func__);
		return -1;
	}
	if ((cm->cmsg_level != SOL_SOCKET)
		|| (cm->cmsg_type != SCM_RIGHTS)) {
		Log(1, "%s: Invalid level/type: %d/%d\n",
			__func__, cm->cmsg_level, cm->cmsg_type);
		return -1;
	}
	/* Never read the payload before checking it really holds one int. */
	if (cm->cmsg_len != CMSG_LEN(sizeof(int))) {
		Log(1, "%s: Unexpected cmsg_len %zu\n",
			__func__, (size_t) cm->cmsg_len);
		return -1;
	}

	/* memcpy avoids alignment/aliasing assumptions about CMSG_DATA. */
	memcpy(&fd, CMSG_DATA(cm), sizeof(fd));
	Log(5, "%s: Received fd %d using comm_fd %d\n",
		__func__, fd, comm_fd);
	return fd;
}
#endif
#if 1
/*
 * Writes one log line to Conn_log_fd when @level <= Conn_debug_level.
 *
 * The line is "<sec>.<usec> <log_info> " followed by the formatted message,
 * truncated to fit a 512-byte buffer. errno is preserved across the call.
 * Callers log between a failing system call and reading errno, and both
 * vsnprintf() and write() may modify errno.
 */
__cold void Log(const unsigned short level, char *format, ...)
{
	va_list ap;
	size_t len;
	char line[512];
	int saved_errno;

	if (likely(level > Conn_debug_level))
		return;

	saved_errno = errno;

	snprintf(line, sizeof(line), "%ld.%06ld %s ",
		(long) Conn_now.tv_sec, (long) Conn_now.tv_usec, log_info);
	len = strlen(line);

	va_start(ap, format);
	vsnprintf(line + len, sizeof(line) - len, format, ap);
	va_end(ap);

	if (write(Conn_log_fd, line, strlen(line)) < 0) {
		/* Best effort: there is nowhere to report a failed log write. */
	}

	errno = saved_errno;
}
#else
inline void Log(const unsigned short level, char *format, ...)
{
	/* Logging compiled out: every message is dropped. */
	(void) level;
	(void) format;
}
#endif
/*
 * Sets the per-thread tag that Log() puts after the timestamp, e.g. to tell
 * workers apart. The tag is truncated to fit log_info.
 */
static void Conn_log_set_info(const char *info)
{
	/* "%s" keeps @info from being interpreted as a format string. */
	snprintf(log_info, sizeof log_info, "%s", info);
}
/*
 * Returns a newly allocated printable rendering of @len_src bytes at
 * @buf_src0.
 *
 * Printable ASCII bytes (32..126) are copied as-is. Every other byte,
 * including DEL (127), is shown as "[xx]" with two lowercase hex digits.
 * The caller must free() the result. On allocation failure or size
 * overflow a strdup()'d error message is returned instead.
 */
char *Conn_dump(const void *buf_src0, const size_t len_src)
{
	static const char hex[] = "0123456789abcdef";
	const unsigned char *buf_src = buf_src0;
	char *buf_dst;
	size_t i, j;

	Log(30, "\tConn_dump(%p, len=%zu)\n",
		buf_src0, len_src);

	/* Worst case is 4 output chars per byte plus the terminator. */
	if (len_src > (((size_t) -1) - 1) / 4)
		return strdup("Memory allocation error1!");
	buf_dst = malloc(len_src * 4 + 1);
	if (buf_dst == NULL)
		return strdup("Memory allocation error1!");

	j = 0;
	for (i = 0; i < len_src; i++) {
		const unsigned char c = buf_src[i];

		if ((c < 32) || (c >= 127)) {
			buf_dst[j++] = '[';
			buf_dst[j++] = hex[c >> 4];
			buf_dst[j++] = hex[c & 0x0f];
			buf_dst[j++] = ']';
		} else {
			buf_dst[j++] = (char) c;
		}
	}
	buf_dst[j] = '\0';

	return buf_dst;
}
/*
 * Returns a newly allocated string with the @len_src bytes at @buf_src0
 * rendered as two lowercase hex digits each, with no separators.
 *
 * The caller must free() the result. On allocation failure or size
 * overflow a strdup()'d error message is returned instead.
 */
char *Conn_dumphex(const void *buf_src0, const size_t len_src)
{
	static const char hex[] = "0123456789abcdef";
	const unsigned char *buf_src = buf_src0;
	char *buf_dst;
	size_t i;

	Log(30, "\tConn_dumphex(%p, len=%zu)\n",
		buf_src0, len_src);

	/* Two output chars per byte plus the terminator must not overflow. */
	if (len_src > (((size_t) -1) - 1) / 2)
		return strdup("Memory allocation error1!");
	buf_dst = malloc(len_src * 2 + 1);
	if (buf_dst == NULL)
		return strdup("Memory allocation error1!");

	for (i = 0; i < len_src; i++) {
		buf_dst[2 * i] = hex[buf_src[i] >> 4];
		buf_dst[2 * i + 1] = hex[buf_src[i] & 0x0f];
	}
	buf_dst[2 * len_src] = '\0';

	return buf_dst;
}
/*
 * Sends log output to @fd and sets the debug threshold to @debug
 * (messages with a level <= @debug are printed).
 */
void Conn_debug(int fd, const unsigned short debug)
{
	Conn_debug_level = debug;
	Conn_log_fd = fd;
}
/*
 * Renders epoll event mask @ev as letters into @ret:
 * I=EPOLLIN P=EPOLLPRI O=EPOLLOUT E=EPOLLERR H=EPOLLHUP h=EPOLLRDHUP.
 * @ret must hold at least 7 bytes (6 letters + NUL).
 * Do not use it yet, it sucks (the paras).
 */
static void Conn_poll_status(const unsigned int ev, char *ret)
{
	static const struct {
		unsigned int bit;
		char tag;
	} map[] = {
		{ EPOLLIN, 'I' },
		{ EPOLLPRI, 'P' },
		{ EPOLLOUT, 'O' },
		{ EPOLLERR, 'E' },
		{ EPOLLHUP, 'H' },
		{ EPOLLRDHUP, 'h' },
	};
	size_t k;
	char *out = ret;

	for (k = 0; k < sizeof(map) / sizeof(map[0]); k++) {
		if (ev & map[k].bit)
			*out++ = map[k].tag;
	}
	*out = '\0';
}
/*
 * Short name of the socket domain of @C ("IPv4", "IPv6", "PACKET" or "?").
 */
static char *Conn_domain(const struct Conn *C)
{
	char *name = "?";

	if (C->sock_domain == PF_INET)
		name = "IPv4";
	else if (C->sock_domain == PF_INET6)
		name = "IPv6";
	else if (C->sock_domain == PF_PACKET)
		name = "PACKET";

	return name;
}
/*
 * Short name of the socket type of @C ("stream", "dgram", "raw" or "?").
 */
static char *Conn_type(const struct Conn *C)
{
	char *name = "?";

	if (C->sock_type == SOCK_STREAM)
		name = "stream";
	else if (C->sock_type == SOCK_DGRAM)
		name = "dgram";
	else if (C->sock_type == SOCK_RAW)
		name = "raw";

	return name;
}
/*
 * Short name of the socket protocol of @C; only IPPROTO_IP is named.
 */
static char *Conn_get_socket_protocol(const struct Conn *C)
{
	return (C->sock_protocol == IPPROTO_IP) ? "IP" : "?";
}
/*
 * Short name of the Conn slot type of @C ("unk", "master", "p2p" or "?").
 */
static char *Conn_socktype(const struct Conn *C)
{
	char *name = "?";

	if (C->type == CONN_TYPE_UNK)
		name = "unk";
	else if (C->type == CONN_TYPE_MASTER)
		name = "master";
	else if (C->type == CONN_TYPE_P2P)
		name = "p2p";

	return name;
}
/* ############ Conn Callbacks ########### */
/*
 * Default accept callback: logs the remote endpoint of the new connection.
 */
static void Conn_default_cbs_accept(struct Conn *C)
{
	char *raddr = Conn_addr_remote(C);
	int rport = Conn_port_remote(C);

	Log(10, "%llu %s A conn was accepted from %s/%d\n",
		C->id, __func__, raddr, rport);
}
/*
 * Makes room for at least @needed more bytes after the tail of a buffer.
 * @what: 0 = output buffer (obuf), anything else = input buffer (ibuf).
 *
 * Data is not moved; only the space after the tail counts. The buffer
 * grows by at least 128 bytes and never beyond Conn_max_obuf /
 * Conn_max_ibuf (0 = no limit). The added bytes are accounted in
 * Conn_mem_buffers_out / Conn_mem_buffers_in.
 *
 * Returns 0 when at least @needed bytes are free after the tail, and -1
 * when the buffer cannot grow that much (limit reached or realloc failure).
 * On failure the buffer is left untouched.
 */
__cold static int Conn_try_expand_buf(struct Conn *C, const char what,
	const unsigned int needed)
{
	char *p;
	unsigned int hm;
	unsigned int old_size, amount, head, tail;
	unsigned int max_buf;
	char *pbuf;

	if (what == 0) {
		head = C->obuf_head;
		tail = C->obuf_tail;
		old_size = C->obuf_size;
		max_buf = Conn_max_obuf;
		pbuf = C->obuf;
	} else {
		head = C->ibuf_head;
		tail = C->ibuf_tail;
		old_size = C->ibuf_size;
		max_buf = Conn_max_ibuf;
		pbuf = C->ibuf;
	}

	/* Do we have enough room? */
	if (old_size - tail >= needed)
		return 0;

	Log(10, "\tTry to expand buffer for [%s] have=%u needed=%u"
		" head=%u tail=%u.\n",
		what == 0 ? "O" : "I", old_size, needed, head, tail);

	amount = needed - (old_size - tail);
	/* Do not alloc less than 128 bytes */
	if (amount < 128)
		amount = 128;

	/* Saturate instead of wrapping around (matters when there is no cap). */
	if (amount > (unsigned int) -1 - old_size)
		hm = (unsigned int) -1;
	else
		hm = old_size + amount;
	if ((max_buf > 0) && (hm > max_buf))
		hm = max_buf;

	/* Seems we are not allowed to grow larger */
	if (hm <= old_size) {
		Log(1, "Cannot obtain needed byte(s):"
			" hm(%u) <= old_size(%u)!\n",
			hm, old_size);
		return -1;
	}

	/*
	 * The cap may allow growth, but not enough. Returning 0 here would
	 * let a caller that writes @needed bytes run past the end of the
	 * buffer.
	 */
	if (hm - tail < needed) {
		Log(1, "Cannot obtain %u byte(s): limit %u reached"
			" (size=%u tail=%u)!\n",
			needed, hm, old_size, tail);
		return -1;
	}

	p = realloc(pbuf, hm);
	if (p == NULL) {
		Log(1, "Cannot realloc pbuf to %u byte(s)!\n", hm);
		return -1;
	}

	if (what == 0) {
		C->obuf = p;
		C->obuf_size = hm;
		Conn_mem_buffers_out += hm - old_size;
	} else {
		C->ibuf = p;
		C->ibuf_size = hm;
		Conn_mem_buffers_in += hm - old_size;
	}

	return 0;
}
/*
 * Consumes @bytes from the head of the input buffer of @C. When everything
 * queued has been consumed, head and tail both rewind to 0.
 */
void Conn_eat(struct Conn *C, const unsigned int bytes)
{
	C->ibuf_head += bytes;
	if (C->ibuf_head >= C->ibuf_tail)
		C->ibuf_head = C->ibuf_tail = 0;

	Log(10, "%llu %s(bytes=%u) head=%u tail=%u qlen=%u\n",
		C->id, __func__, bytes, C->ibuf_head, C->ibuf_tail,
		Conn_iqlen(C));
}
/*
 * Discards everything in the input buffer of @C.
 */
void Conn_eatall(struct Conn *C)
{
	C->ibuf_tail = C->ibuf_head = 0;
}
static char *Conn_state(const struct Conn *C)
{
switch (C->state) {
case CONN_STATE_FREE: return "FREE";
case CONN_STATE_EMPTY: return "EMPTY";
case CONN_STATE_OPEN: return "OPEN";
case CONN_STATE_LISTEN: return "LISTEN";
case CONN_STATE_CONNECT_0: return "CONN0";
case CONN_STATE_CONNECT_a: return "CONNa";
case CONN_STATE_CONNECT_b: return "CONNb";
case CONN_STATE_ERROR: return "ERROR";
default: return "BUG?";
}
}
/*
* Raise an error. It is just a little helper.
*/
static void Conn_error_raise(struct Conn *C, const int err)
{
if (err != 0)
C->xerrno = err;
if (C->cbs.error)
C->cbs.error(C);
}
/*
 * Tears down connection @C.
 *
 * - Raises the error callback unless the close was user-requested.
 * - Calls the close callback for OPEN/LISTEN/ERROR connections.
 * - Closes the fd and frees the web request.
 * - For a master, drops its worker-pool reference.
 *
 * With auto_reconnect the slot is reset (buffers emptied, timers cleared)
 * and scheduled for a new attempt at now + C->delay. Otherwise:
 * - the slot is marked FREE and the busy counters are decremented;
 * - a master is unlinked from Conn_masters;
 * - the private data is freed;
 * - the slot is pushed on its worker's free list.
 */
__hot static void Conn_free_intern(struct Conn *C)
{
	struct Conn *p, *prev;
	int was_master;

	if (unlikely(Conn_debug_level > 9)) {
		Log(0, "%llu %s Cleaning-up fd=%d in state %s [%s]...\n",
			C->id, __func__, C->fd, Conn_state(C), Conn_errno(C));
	}

	if (C->error_state != CONN_ERROR_USERREQ)
		Conn_error_raise(C, 0);

	if ((C->state == CONN_STATE_OPEN)
		|| (C->state == CONN_STATE_LISTEN)
		|| (C->state == CONN_STATE_ERROR)) {
		if (C->cbs.close)
			C->cbs.close(C);
	}

	if (likely(C->fd > -1)) {
		Log(10, "\tClosing fd %d\n", C->fd);
		close(C->fd);
		C->fd = -1;
	}

	C->web = NULL;
	free(C->web_req);
	C->web_req = NULL;

	/*
	 * Remember the type now: the non-reconnect path below resets it to
	 * CONN_TYPE_UNK. The old code tested it after the reset, so masters
	 * were never unlinked from Conn_masters.
	 */
	was_master = (C->type == CONN_TYPE_MASTER);
	if (unlikely(was_master) && (C->wp != NULL))
		Conn_wpool_put(C->wp);

	if (unlikely(C->auto_reconnect == 1)) {
		/* Reset tsend, else we enter in a timeout error loop */
		C->tsend.tv_sec = 0;
		C->tsend.tv_usec = 0;
		/* Reset the connection attempt time */
		C->conn_syn.tv_sec = 0;
		C->conn_syn.tv_usec = 0;
		/* Misc */
		C->error_state = 0;
		C->tryat = (unsigned int) Conn_now.tv_sec + C->delay;
		C->state = CONN_STATE_CONNECT_0;
		C->ibuf_head = 0;
		C->ibuf_tail = 0;
		C->obuf_head = 0;
		C->obuf_tail = 0;
		Conn_pending++;
		return;
	}

	C->type = CONN_TYPE_UNK;
	C->state = CONN_STATE_FREE;
	/* Decrement the number of busy connections */
	Conn_no--;
	Conn_work_to_do--;

	/* Remove from masters list */
	if (unlikely(was_master)) {
		Log(10, "\t### Remove from Conn_masters\n");
		prev = NULL;
		for (p = Conn_masters.head; p != NULL; prev = p, p = p->next) {
			if (p != C)
				continue;
			if (prev == NULL)
				Conn_masters.head = p->next;
			else
				prev->next = p->next;
			if (Conn_masters.tail == C)
				Conn_masters.tail = prev;
			break;
		}
		C->next = NULL;
	}

	free(C->private);
	C->private = NULL;

	/* Add it to free list of the worker pool */
	if (likely(C->worker)) {
		struct Conn_wpool_worker *worker = C->worker;

		C->next = NULL;
		C->worker = NULL; /* TODO: not needed */
		if (unlikely(worker->free.head == NULL)) {
			worker->free.head = C;
			worker->free.tail = C;
		} else {
			C->next = worker->free.head;
			worker->free.head = C;
		}
	}
}
/*
 * Requests closing of @C.
 *
 * With an empty output buffer the connection is torn down immediately as
 * a user request. Otherwise shutdown_after_send is set and sending is
 * kicked, so the write side is shut down once the queue drains.
 */
__hot void Conn_close(struct Conn *C)
{
	Log(10, "%llu %s Mark for closing (head=%u tail=%u)\n",
		C->id, __func__, C->obuf_head, C->obuf_tail);

	if (C->obuf_head != C->obuf_tail) {
		C->shutdown_after_send = 1;
		Log(9, "\tSet SHUTDOWN_AFTER_SEND;"
			" We have data in out buffer; kick sending.\n");
		/*
		 * TODO: send and receive are sensitive; C->cbs.send is not
		 * used here on purpose.
		 */
		Conn_default_cbs_send(C);
		return;
	}

	Log(15, "\tNothing to send, call free_intern\n");
	C->error_state = CONN_ERROR_USERREQ;
	Conn_free_intern(C);
}
/*
 * Asks the library to stop by raising Conn_must_stop.
 */
void Conn_stop(void)
{
	Conn_must_stop = 1U;
}
/*
 * Socket descriptor of @C (-1 once closed).
 */
static int Conn_get_fd(const struct Conn *C)
{
	const int fd = C->fd;

	return fd;
}
/*
 * Copies into @tv the time at which data was last received on @C.
 */
void Conn_last_time(const struct Conn *C, struct timeval *tv)
{
	tv->tv_sec = C->trecv.tv_sec;
	tv->tv_usec = C->trecv.tv_usec;
}
/*
 * Sets tunable @var of connection @C to @val.
 *
 * Most parameters are stored in the Conn struct. CONN_PARA_IBUF and
 * CONN_PARA_OBUF set the kernel SO_RCVBUF / SO_SNDBUF socket options;
 * failures there are logged (level 1) rather than silently dropped.
 * Unknown parameters are logged and ignored.
 */
void Conn_set(struct Conn *C, const unsigned int var, const int val)
{
	switch (var) {
	case CONN_PARA_AUTO_RECONNECT:
		C->auto_reconnect = (val == 0) ? 0 : 1;
		break;
	case CONN_PARA_RECONNECT_DELAY:
		C->delay = (unsigned int) val;
		break;
	case CONN_PARA_IDLE_TIME:
		C->idle = (unsigned int) val;
		break;
	case CONN_PARA_READ_TIMEOUT:
		C->read_timeout = (unsigned int) val;
		break;
	case CONN_PARA_CONN_TIMEOUT:
		C->conn_timeout = (unsigned int) val;
		break;
	case CONN_PARA_TRIGGER:
		C->trigger = (unsigned int) val;
		C->last_trigger = 0;
		break;
	case CONN_PARA_IBUF:
		if (setsockopt(Conn_get_fd(C), SOL_SOCKET, SO_RCVBUF,
			&val, sizeof(val)) != 0)
			Log(1, "%llu %s Cannot set SO_RCVBUF=%d (%s)\n",
				C->id, __func__, val, strerror(errno));
		break;
	case CONN_PARA_OBUF:
		if (setsockopt(Conn_get_fd(C), SOL_SOCKET, SO_SNDBUF,
			&val, sizeof(val)) != 0)
			Log(1, "%llu %s Cannot set SO_SNDBUF=%d (%s)\n",
				C->id, __func__, val, strerror(errno));
		break;
	default:
		Log(1, "%llu %s Unknown parameter %u ignored\n",
			C->id, __func__, var);
		break;
	}
}
/*
 * Registers C->fd with epoll instance @epoll_fd for @events, using @C as
 * the event cookie.
 *
 * Returns 0 on success, or the errno of the failed epoll_ctl(). On failure
 * that errno is also stored in C->xerrno and its text in Conn_error.
 */
static inline int Conn_add_obj(int epoll_fd, struct Conn *C,
	const unsigned int events)
{
	struct epoll_event ev;
	int xerrno;

	memset(&ev, 0, sizeof(ev));
	ev.data.ptr = C;
	ev.events = events;

	if (likely(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, C->fd, &ev) == 0)) {
		Log(10, "%s fd %d added with success on epoll_fd %d\n",
			__func__, C->fd, epoll_fd);
		return 0;
	}

	/* Capture errno before anything (Log included) can overwrite it. */
	xerrno = errno;
	Log(1, "%s Could not add fd %d to epoll_fd %d, events 0x%x (%s)!\n",
		__func__, C->fd, epoll_fd, events, strerror(xerrno));
	C->xerrno = xerrno;
	snprintf(Conn_error, sizeof(Conn_error), "%s",
		strerror(xerrno));
	return xerrno;
}
/*
 * Changes the epoll event mask of C->fd in @epoll_fd to @events.
 *
 * Returns 0 on success, or the errno of the failed epoll_ctl(). On failure
 * that errno is also stored in C->xerrno and its text in Conn_error.
 */
static inline int Conn_change_obj(int epoll_fd, struct Conn *C,
	const unsigned int events)
{
	struct epoll_event ev;
	int xerrno;

	memset(&ev, 0, sizeof(ev));
	ev.data.ptr = C;
	ev.events = events;

	if (likely(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, C->fd, &ev) == 0))
		return 0;

	/* Capture errno before anything (Log included) can overwrite it. */
	xerrno = errno;
	Log(1, "%s Could not change fd %d to epoll_fd %d, events 0x%x (%s)!\n",
		__func__, C->fd, epoll_fd, events, strerror(xerrno));
	C->xerrno = xerrno;
	snprintf(Conn_error, sizeof(Conn_error), "%s",
		strerror(xerrno));
	return xerrno;
}
/*
 * Called when EPOLLIN present on a fd.
 *
 * Reads into the input buffer until recv() would block, returns less than
 * requested, or Conn_max_recv bytes were read in this call (0 = no limit).
 * The buffer grows by Conn_default_ibuf whenever it is full. If it cannot
 * grow, error_state becomes CONN_ERROR_MEM and the function returns
 * without calling the data callback.
 *
 * Received data goes to Conn_web_dispatch() when C->web is set, otherwise
 * to the data callback. Errors and remote hangup are handled only AFTER
 * that, so the callback never runs on a connection that Conn_free_intern()
 * already tore down. On hangup:
 * - with an empty output queue, the connection is freed;
 * - otherwise close_after_send is set and only EPOLLOUT is watched, so
 *   queued output (including any reply the callback just queued) is
 *   flushed first.
 */
__hot static void Conn_default_cbs_recv(struct Conn *C)
{
	ssize_t n;
	unsigned int xfer_in_this_call;
	unsigned long long id;
	int r, xerrno;
	char *dump, call_callback, hangup, failed;

	Log(10, "%llu %s fd=%d head=%u tail=%u size=%u...\n",
		C->id, __func__, C->fd,
		C->ibuf_head, C->ibuf_tail, C->ibuf_size);

	call_callback = 0;
	hangup = 0;
	failed = 0;
	xerrno = 0;
	xfer_in_this_call = 0;
	while (1) {
		unsigned int max;

		if ((Conn_max_recv > 0) && (xfer_in_this_call >= Conn_max_recv)) {
			Log(3, "\tRecv limit reached. Add to extra queue.\n");
			break;
		}

		if (unlikely(C->ibuf_tail == C->ibuf_size)) {
			r = Conn_try_expand_buf(C, 1, Conn_default_ibuf);
			if (unlikely(r != 0)) {
				C->error_state = CONN_ERROR_MEM;
				return;
			}
		}

		max = C->ibuf_size - C->ibuf_tail;
		if ((Conn_max_recv > 0) && (max > Conn_max_recv))
			max = Conn_max_recv;

		do {
			n = recv(C->fd, C->ibuf + C->ibuf_tail, max, 0);
			xerrno = errno;
		} while (unlikely((n == -1) && (xerrno == EINTR)));

		if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK))) {
			Log(10, "\tEAGAIN received. Wait next round.\n");
			break;
		}

		if (unlikely(n < 0)) {
			Log(0, "\tError receiving [%s]\n", strerror(xerrno));
			failed = 1;
			break;
		}

		if (unlikely(n == 0)) {
			hangup = 1;
			break;
		}

		Log(10, "\tReceived %zd byte(s)\n", n);
		call_callback = 1;
		if (unlikely(Conn_debug_level >= 10)) {
			dump = Conn_dump(C->ibuf + C->ibuf_tail, (size_t) n);
			Log(0, "\tReceived: %s\n", dump);
			free(dump);
		}
		C->trecv = Conn_now;
		C->ibuf_tail += (unsigned int) n;
		C->bi += (unsigned int) n;
		xfer_in_this_call += (unsigned int) n;

		if ((size_t) n < max) {
			Log(3, "\tReaded less(%zd < %u) than what we requested\n",
				n, max);
			break;
		}
	}

	id = C->id;
	if (likely(call_callback == 1)) {
		Log(10, "\t%s\n", C->web ? "We have ww attached" : "We have no ww attached");
		if (C->web)
			Conn_web_dispatch(C);
		else if (C->cbs.data)
			C->cbs.data(C);
	}

	if (likely(!failed && !hangup))
		return;

	/*
	 * The callback may already have closed this slot (fd becomes -1) or
	 * even reused it; never tear it down twice.
	 * NOTE(review): assumes a reused slot gets a new id - confirm.
	 */
	if (call_callback && ((C->fd == -1) || (C->id != id)))
		return;

	if (failed) {
		C->error_state = CONN_ERROR_RECV;
		C->xerrno = xerrno;
		Conn_free_intern(C);
		return;
	}

	/* Remote hangup */
	Log(5, "\tWe received 0 bytes; oqlen=%lu\n", Conn_oqlen(C));
	if (Conn_oqlen(C) == 0) {
		/* TODO: Why we do not shutdown? Maybe we just sent data. */
		/* Nothing to send, we can close the connection */
		Log(10, "\tRemote hangup and nothing in out"
			" buffer. Close.\n");
		C->state = CONN_STATE_ERROR;
		C->error_state = CONN_ERROR_HANGUP;
		Conn_free_intern(C);
	} else if (C->close_after_send == 0) {
		Log(10, "\tRemote closed sending side."
			" Set CLOSE_AFTER_SEND and ignore POLLIN\n");
		C->close_after_send = 1;
		Conn_change_obj(C->worker->epoll_fd, C, EPOLLOUT);
	}
}
/*
 * Default EPOLLOUT handler; Conn_close() also calls it to kick sending.
 *
 * Sends queued output until send() would block, sends less than
 * requested, or the bandwidth token bucket is empty. Each send() is capped
 * at Conn_max_send bytes (0 = no cap) and, when C->band_width > 0, at
 * C->band_tokens.
 *
 * With an empty output queue:
 * - close_after_send frees the connection;
 * - shutdown_after_send shuts down the write side, freeing the connection
 *   if shutdown() fails.
 * A send() error or a 0-byte send frees the connection with
 * CONN_ERROR_SEND.
 */
__hot static void Conn_default_cbs_send(struct Conn *C)
{
	ssize_t n;
	int xerrno, r;
	char *dump;

	Log(5, "%llu %s C=%p next=%p fd=%d head=%u tail=%u size=%u\n",
		C->id, __func__, (void *) C, (void *) C->next, C->fd,
		C->obuf_head, C->obuf_tail, C->obuf_size);

	while (1) {
		unsigned int max, count;
		char *buf;

		buf = C->obuf + C->obuf_head;
		count = Conn_oqlen(C);
		if (unlikely(count == 0)) {
			Log(5, "\tEmpty output buffer.\n");
			if (unlikely(C->close_after_send == 1)) {
				Log(9, "\tCLOSE_AFTER_SEND is set"
					", close connection\n");
				/* TODO: this is not really an error; switch to other code */
				C->state = CONN_STATE_ERROR;
				C->error_state = CONN_ERROR_USERREQ;
				Conn_free_intern(C);
				return;
			}
			if (C->shutdown_after_send == 1) {
				Log(9, "\tSHUTDOWN_AFTER_SEND is set"
					", shutdown write\n");
				C->state = CONN_STATE_ERROR;
				r = shutdown(C->fd, SHUT_WR);
				if (likely(r == 0)) {
					/* TODO: this is not really an error; switch to other code */
					C->error_state = CONN_ERROR_USERREQ;
					return;
				}
				Log(1, "\tshutdown returned error [%s]!\n",
					strerror(errno));
				/* TODO: We must set a proper C->state */
				C->error_state = CONN_ERROR_HANGUP;
				Conn_free_intern(C);
				return;
			}
			return; /* TODO: not clear if we could be here. */
		}

		max = count;
		if ((Conn_max_send > 0) && (max > Conn_max_send))
			max = Conn_max_send;

		/* bandwidth */
		if (unlikely(C->band_width > 0)) {
			if (max > C->band_tokens)
				max = C->band_tokens;
			if (max == 0) {
				Log(debug_band, "\tBAND: Suspend 100ms"
					" the C (no tokens)!\n");
				break;
			}
		}

		Log(10, "\tsend(fd=%d, buf (head=%u, tail=%u)"
			", max=%u (count=%u), 0)...\n",
			C->fd, C->obuf_head,
			C->obuf_tail, max, count);

		do {
			n = send(C->fd, buf, max, 0);
			xerrno = errno;
		} while (unlikely((n == -1) && (xerrno == EINTR)));

		if ((n == -1) && ((xerrno == EAGAIN) || (xerrno == EWOULDBLOCK))) {
			Log(10, "\tEAGAIN received. Wait next round.\n");
			break;
		}

		Log(10, "\tSent %zd byte(s) [head=%u tail=%u]\n",
			n, C->obuf_head, C->obuf_tail);
		if (unlikely(n <= 0)) {
			/* xerrno, not errno: the Log() above may have changed errno. */
			Log(0, "%llu %s Error in sending [%s]\n",
				C->id, __func__, strerror(xerrno));
			C->error_state = CONN_ERROR_SEND;
			C->xerrno = xerrno;
			Conn_free_intern(C);
			break;
		}

		if (unlikely(Conn_debug_level >= 10)) {
			dump = Conn_dump(buf, (size_t) n);
			Log(0, "\tSent: %s\n", dump);
			free(dump);
		}

		C->tsend = Conn_now;
		if ((size_t) n < count) {
			C->obuf_head += (unsigned int) n;
		} else {
			C->obuf_head = 0;
			C->obuf_tail = 0;
		}
		C->bo += (unsigned int) n;

		if (C->band_width > 0) {
			/* n <= max <= band_tokens here, so no underflow. */
			C->band_tokens -= (unsigned int) n;
			Log(debug_band, "\tBAND: Remove %zd tokens -> %u...\n",
				n, C->band_tokens);
		}

		if ((size_t) n < max) {
			Log(1, "\tSent less than what we requested."
				" Drop EPOLLOUT and wait for signal."
				" Break loop.\n");
			break;
		}
	}
}
/* Default data callback: only logs that data arrived. */
static void Conn_default_cbs_data(struct Conn *C)
{
	Log(10, "%llu %s Data received\n",
		C->id, __func__);
}
/* Default close callback: only logs the close. */
__hot static void Conn_default_cbs_close(struct Conn *C)
{
	Log(9, "%llu %s Closing conn\n",
		C->id, __func__);
}
/* Default trigger callback: only logs the trigger. */
static void Conn_default_cbs_trigger(struct Conn *C)
{
	Log(10, "%llu %s Trigger on conn\n",
		C->id, __func__);
}
/*
 * Default error callback: logs the current library error text.
 * TODO: put 0
 */
static void Conn_default_cbs_error(struct Conn *C)
{
	char *why = Conn_strerror();

	Log(10, "%llu %s Error on conn (%s)\n",
		C->id, __func__, why);
}
/* Default connected callback: only logs the successful connect. */
static void Conn_default_cbs_connected(struct Conn *C)
{
	Log(10, "%llu %s We are connected\n",
		C->id, __func__);
}
static void Conn_default_cbs_accept_error(struct Conn *C)
{
Log(10, "%llu %s Error accepting conn (id %llu) (%s)\n",
C->id, __func__, Conn_strerror());
}
#if 0
/*
 * Sets O_NONBLOCK on @fd while keeping its other file status flags
 * (O_APPEND, O_ASYNC, ...). F_SETFL with a fixed value would wipe them.
 * Returns 0 on success, -1 on error (errno set by fcntl()).
 */
static int Conn_setnonblock(const int fd)
{
	int flags;

	flags = fcntl(fd, F_GETFL);
	if (flags == -1)
		return -1;
	if (flags & O_NONBLOCK)
		return 0;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
#endif
/*
 * Refreshes the cached textual address and port of @C from its socket.
 * @flags: bit0 == 0 => local side (C->bind_addr / C->bind_port, via
 *         getsockname()); bit0 == 1 => peer side (C->addr / C->port, via
 *         getpeername()).
 *
 * Nothing happens when the matching dirty flag is clear or the slot is
 * FREE/EMPTY. Only PF_INET and PF_INET6 are handled.
 *
 * Returns 0 on success (or when nothing had to be done) and -1 on error.
 * When the socket query or the text conversion fails, the cached address
 * becomes "?" and stays dirty so a later call retries.
 */
static int Conn_set_address(struct Conn *C, const int flags)
{
	int err;
	struct sockaddr *psa;
	struct sockaddr_in sa4;
	struct sockaddr_in6 sa6;
	socklen_t sa_len;
	char *paddr;
	socklen_t addr_size;
	int *pport;
	const void *src;
	int port;
	const int peer = flags & 1;

	/* Test if we need to regenerate. */
	if (peer ? (C->remote_dirty == 0) : (C->local_dirty == 0))
		return 0;
	if ((C->state == CONN_STATE_FREE) || (C->state == CONN_STATE_EMPTY))
		return 0;

	switch (C->sock_domain) {
	case PF_INET:
		psa = (struct sockaddr *) &sa4;
		sa_len = sizeof(sa4);
		break;
	case PF_INET6:
		psa = (struct sockaddr *) &sa6;
		sa_len = sizeof(sa6);
		break;
	default:
		return -1;
	}

	if (peer) {
		paddr = C->addr;
		addr_size = sizeof(C->addr);
		pport = &C->port;
		err = getpeername(C->fd, psa, &sa_len);
	} else {
		paddr = C->bind_addr;
		addr_size = sizeof(C->bind_addr);
		pport = &C->bind_port;
		err = getsockname(C->fd, psa, &sa_len);
	}
	if (err != 0) {
		snprintf(paddr, addr_size, "?");
		return -1;
	}

	if (C->sock_domain == PF_INET) {
		src = &sa4.sin_addr;
		port = ntohs(sa4.sin_port);
	} else {
		src = &sa6.sin6_addr;
		port = ntohs(sa6.sin6_port);
	}

	/* Do not mark the cache clean if the conversion failed. */
	if (inet_ntop(C->sock_domain, src, paddr, addr_size) == NULL) {
		snprintf(paddr, addr_size, "?");
		return -1;
	}
	*pport = port;

	/* Remove dirty flag */
	if (peer)
		C->remote_dirty = 0;
	else
		C->local_dirty = 0;

	return 0;
}
/*
 * Local address of @C as text, refreshed from the socket when stale.
 */
char *Conn_addr_local(struct Conn *C)
{
	(void) Conn_set_address(C, 0);
	return C->bind_addr;
}
/*
 * Peer address of @C as text, refreshed from the socket when stale.
 */
char *Conn_addr_remote(struct Conn *C)
{
	(void) Conn_set_address(C, 1);
	return C->addr;
}
/*
 * Local port of @C, refreshed from the socket when stale.
 */
int Conn_port_local(struct Conn *C)
{
	(void) Conn_set_address(C, 0);
	return C->bind_port;
}
/*
 * Peer port of @C, refreshed from the socket when stale.
 */
int Conn_port_remote(struct Conn *C)
{
	(void) Conn_set_address(C, 1);
	return C->port;
}
/*
 * Returns the address family (AF_INET / AF_INET6) of the numeric address
 * text @addr. No DNS lookup is done (AI_NUMERICHOST). On failure, returns
 * -1 and stores the reason in Conn_error.
 */
int Conn_addr_family(const char *addr)
{
	struct addrinfo hints = {
		.ai_flags = AI_NUMERICHOST,
		.ai_family = AF_UNSPEC,
		.ai_socktype = 0,
		.ai_protocol = 0,
		.ai_canonname = NULL,
		.ai_addr = NULL,
		.ai_next = NULL,
	};
	struct addrinfo *res = NULL;
	int rc, family;

	rc = getaddrinfo(addr, NULL, &hints, &res);
	if (rc == 0) {
		family = res->ai_family;
		freeaddrinfo(res);
		return family;
	}

	snprintf(Conn_error, sizeof(Conn_error),
		"getaddrinfo error on %s (%s)",
		addr, gai_strerror(rc));
	if (res)
		freeaddrinfo(res);
	return -1;
}
/*
 * Formats @speed (bytes per second) into @dst with two decimals, scaled
 * with decimal prefixes: "Bps" below 1000, "KBps" below 1000000,
 * "MBps" otherwise.
 */
static void Conn_speed(char *dst, const unsigned int dst_len,
	const unsigned int speed)
{
	static const char *const fmts[] = { "%.2fBps", "%.2fKBps", "%.2fMBps" };
	float value = (float) speed;
	unsigned int unit = 0;

	if (speed >= 1000) {
		value = value / 1000;
		unit = 1;
		if (speed >= 1000 * 1000) {
			value = value / 1000;
			unit = 2;
		}
	}

	snprintf(dst, dst_len, fmts[unit], value);
}
/*
 * Builds a one-line text description of connection @C for Conn_status().
 *
 * The line holds: pointer, id and fd; socket domain/type/protocol; slot
 * type and state; local and remote endpoint; bytes in/out; buffer sizes;
 * average speeds; age; bandwidth settings; the next pointer; and a
 * bracketed list of the autoreconnect / close_after_send /
 * shutdown_after_send flags.
 *
 * Both cached addresses are refreshed first. The result lives in a static
 * buffer that the next call overwrites (not reentrant).
 */
static char *Conn_status_slot(struct Conn *C)
{
	static char tmp[1024];
	char flags[128];
	char speedi[32], speedo[32];
	const char *sep = " [";
	int have_flags = 0;
	char *local_addr = "-";
	char *remote_addr = "-";
	int local_port = 0;
	int remote_port = 0;
	unsigned int elapsed;
	size_t k;
	const struct {
		int set;
		const char *name;
	} extra[] = {
		{ C->close_after_send != 0, "close_after_send" },
		{ C->shutdown_after_send != 0, "shutdown_after_send" },
	};

	flags[0] = '\0';
	if (C->auto_reconnect) {
		char reconnect[64];

		snprintf(reconnect, sizeof(reconnect), "autoreconnect_in_%ld/%u",
			(C->tryat == 0) ? 0 : C->tryat - Conn_now.tv_sec,
			C->delay);
		strcat(flags, sep);
		strcat(flags, reconnect);
		sep = " ";
		have_flags = 1;
	}
	for (k = 0; k < sizeof(extra) / sizeof(extra[0]); k++) {
		if (!extra[k].set)
			continue;
		strcat(flags, sep);
		strcat(flags, extra[k].name);
		sep = " ";
		have_flags = 1;
	}
	if (have_flags)
		strcat(flags, "]");

	elapsed = (unsigned int) (Conn_now.tv_sec - C->start);
	if (elapsed == 0)
		elapsed = 1;
	Conn_speed(speedi, sizeof(speedi), (unsigned int) (C->bi / elapsed));
	Conn_speed(speedo, sizeof(speedo), (unsigned int) (C->bo / elapsed));

	Conn_set_address(C, 0);
	Conn_set_address(C, 1);

	switch (C->type) {
	case CONN_TYPE_MASTER:
		local_addr = C->bind_addr;
		local_port = C->bind_port;
		break;
	case CONN_TYPE_P2P:
		if (C->bind_addr[0] != '\0') {
			local_addr = C->bind_addr;
			local_port = C->bind_port;
		}
		remote_addr = C->addr;
		remote_port = C->port;
		break;
	default:
		break;
	}

	snprintf(tmp, sizeof(tmp), "%p id=%llu fd=%d"
		" %s/%s/%s"
		" %s %s"
		" %s/%d <-> %s/%d"
		" IO=%llu/%llu"
		" BS=%u/%u S=%s/%s"
		" T=%ld bw=%u f=%u tk=%u next=%p"
		"%s",
		C, C->id, C->fd,
		Conn_domain(C), Conn_type(C),
		Conn_get_socket_protocol(C),
		Conn_socktype(C), Conn_state(C),
		local_addr, local_port, remote_addr, remote_port,
		C->bi, C->bo,
		C->ibuf_size, C->obuf_size, speedi, speedo,
		Conn_now.tv_sec - C->start,
		C->band_width, C->band_factor, C->band_tokens,
		C->next, flags);

	return tmp;
}
#if 1
/*
 * Builds the HTML table cells (no <tr>) describing connection @C for
 * Conn_status(). Uses the cached peer address as-is. The result lives in a
 * static buffer that the next call overwrites (not reentrant).
 */
static char *Conn_status_slot_html(struct Conn *C)
{
	static char tmp[1024];
	char speedi[32], speedo[32];
	/* TODO: let a Conn_status_slot_html_cb hook fill extra cells here. */
	char *ext = "";
	unsigned int secs;

	secs = (unsigned int) (Conn_now.tv_sec - C->start);
	if (secs == 0)
		secs = 1;
	Conn_speed(speedi, sizeof(speedi), (unsigned int) (C->bi / secs));
	Conn_speed(speedo, sizeof(speedo), (unsigned int) (C->bo / secs));

	snprintf(tmp, sizeof(tmp), "<td>%llu</td><td>%d</td>"
		"<td>%s</td><td>%s</td><td>%s</td>"
		"<td>%s</td><td>%s</td>"
		"<td>%s/%d</td>"
		"<td>%llu / %llu</td>"
		"<td>%u / %u</td><td>%s / %s</td><td>%ld</td>"
		"<td>%u</td><td>%u</td><td>%u</td>"
		"%s",
		C->id, C->fd,
		Conn_domain(C), Conn_type(C), Conn_get_socket_protocol(C),
		Conn_socktype(C), Conn_state(C),
		C->addr, C->port, C->bi, C->bo,
		C->ibuf_size, C->obuf_size,
		speedi, speedo, Conn_now.tv_sec - C->start,
		C->band_width, C->band_factor, C->band_tokens,
		ext);

	return tmp;
}
#endif
#if 1
/*
 * Appends @s to the report buffer @buf (capacity @size bytes, current
 * string length *@len). A piece that does not fit entirely is dropped, so
 * the report gets truncated instead of overflowing @buf.
 */
static void Conn_status_append(char *buf, const size_t size, size_t *len,
	const char *s)
{
	size_t n = strlen(s);

	if (*len + n >= size)
		return;
	memcpy(buf + *len, s, n + 1);
	*len += n;
}

/*
 * Builds a status report: global counters, one line (or html table row)
 * per non-free connection on the masters list, then total speed/bytes of
 * the P2P connections among them.
 * @flags: bit 0 set => html output.
 * Returns a malloc'ed string the caller must free(); on failure a
 * strdup'ed error message is returned instead.
 */
char *Conn_status(const unsigned int flags)
{
	const int html = (flags & 1) != 0;
	int tmp_len;
	size_t len = 0, size;
	char tmp[512];
	char *buf, *per_slot, *ext = "";
	char speedi[32], speedo[32];
	unsigned long long bi = 0, bo = 0, dT = 0;
	struct Conn *C;

	/*
	 * Fixed part (summary lines + html header) plus one row per
	 * connection: an html row is up to 1023 chars plus its tags.
	 * Conn_status_append() bounds every write anyway.
	 */
	size = 2048 + ((size_t) Conn_no + 1) * 1088;
	buf = malloc(size);
	if (!buf)
		return strdup("No enough memory!");
	buf[0] = '\0';

	gettimeofday(&Conn_now, NULL);
	tmp_len = snprintf(tmp, sizeof(tmp),
		"Conn_pending=%u Conn_no/Conn_max=%u/%u Conn_total=%lu"
		" Conn_uptime=%lus Conn_allocated=%u Conn_work_to_do=%u"
		" Conn_mem_structs=%llu Conn_mem_buffers_in/out=%llu/%llu\n",
		Conn_pending, Conn_no, Conn_max, Conn_total,
		(unsigned long) (Conn_now.tv_sec - Conn_start),
		Conn_allocated, Conn_work_to_do,
		Conn_mem_structs, Conn_mem_buffers_in, Conn_mem_buffers_out);
	if (tmp_len < 0) {
		free(buf);
		return strdup("snprintf error");
	}
	Conn_status_append(buf, size, &len, tmp);

#if 0
	TODO
	if (flags & 1)
		if (Conn_status_cb)
			ext = Conn_status_cb();
#endif

	if (html) {
		Conn_status_append(buf, size, &len,
			"<table border=\"0\" cellspacing=\"1\" cellpadding=\"3\" bgcolor=\"#aaaaaa\">\n"
			"<tr bgcolor=\"ffffff\">\n"
			"<td>ID</td>"
			"<td>FD</td>"
			"<td>Dom</td>"
			"<td>Type</td>"
			"<td>Protocol</td>"
			"<td>SType</td>"
			"<td>State</td>"
			"<td>Addr/port</td>"
			"<td>BI/BO</td>"
			"<td>BUF I/O</td>"
			"<td>Speed I/O</td>"
			"<td>Elap (s)</td>"
			"<td>Band</td>"
			"<td>F</td>"
			"<td>Tks</td>");
		Conn_status_append(buf, size, &len, ext);
		Conn_status_append(buf, size, &len, "</tr>\n");
	} else {
		Conn_status_append(buf, size, &len, ext);
	}

	/* The for-increment also runs on 'continue', so free slots are
	 * skipped without looping forever on the same entry. */
	for (C = Conn_masters.head; C; C = C->next) {
		if (C->state == CONN_STATE_FREE)
			continue;
		if (C->type == CONN_TYPE_P2P) {
			bi += C->bi;
			bo += C->bo;
			dT += (unsigned int) (Conn_now.tv_sec - C->start);
		}
		per_slot = html ? Conn_status_slot_html(C) : Conn_status_slot(C);
		if (html)
			Conn_status_append(buf, size, &len, "<tr bgcolor=\"ffffff\">\n");
		Conn_status_append(buf, size, &len, per_slot);
		Conn_status_append(buf, size, &len, "\n");
		if (html)
			Conn_status_append(buf, size, &len, "</tr>\n");
	}

	if (html)
		Conn_status_append(buf, size, &len, "</table>\n");
	if (dT == 0)
		dT = 1;
	Conn_speed(speedi, sizeof(speedi), (unsigned int) (bi / dT));
	Conn_speed(speedo, sizeof(speedo), (unsigned int) (bo / dT));
	tmp_len = snprintf(tmp, sizeof(tmp), "Total speed I/O: %s/%s"
		" Total bytes I/O: %llu/%llu\n",
		speedi, speedo, bi, bo);
	if (tmp_len < 0) {
		free(buf);
		return strdup("snprintf error");
	}
	Conn_status_append(buf, size, &len, tmp);
	return buf;
}
#endif
/*
 * Attaches an opaque user pointer to @C; read it back with
 * Conn_get_private().
 */
void Conn_set_private(struct Conn *C, void *priv)
{
	void **slot = &C->private;

	*slot = priv;
}
/*
 * Returns the user pointer stored with Conn_set_private() (or NULL as
 * reset when the Conn is allocated).
 */
void *Conn_get_private(struct Conn *C)
{
	void *priv = C->private;

	return priv;
}
/*
 * Number of received bytes still pending in the input buffer
 * (ibuf_tail - ibuf_head).
 */
unsigned int Conn_iqlen(const struct Conn *C)
{
	unsigned int pending = C->ibuf_tail - C->ibuf_head;

	return pending;
}
/*
 * Number of queued bytes not yet sent from the output buffer
 * (obuf_tail - obuf_head).
 */
unsigned int Conn_oqlen(const struct Conn *C)
{
	unsigned int pending = C->obuf_tail - C->obuf_head;

	return pending;
}
/*
 * Obsolete alias of Conn_iqlen(): bytes pending in the input buffer.
 */
unsigned int Conn_qlen(const struct Conn *C)
{
	unsigned int n = Conn_iqlen(C);

	return n;
}
/*
 * Returns 1 when @C must not be processed any further (it has an error
 * state set or its slot is free), 0 otherwise.
 * TODO - really needed?
 */
static int Conn_ignore(struct Conn *C)
{
	int has_error = C->error_state > 0;
	int is_free = C->state == CONN_STATE_FREE;

	return has_error || is_free;
}
#if 0
/*
 * Close a connection if it exceeded maximum idle time or got a timeout
 * (currently compiled out).
 *
 * Runs the periodic trigger callback every C->trigger seconds, then sets
 * C->error_state to one of CONN_ERROR_EXPIRED (idle for longer than
 * C->idle seconds since last receive), CONN_ERROR_READ_TIMEOUT (no reply
 * within C->read_timeout ms of the last send) or CONN_ERROR_CONN_TIMEOUT
 * (connect still pending after C->conn_timeout ms). It only marks the
 * error; closing is left to whoever inspects error_state.
 */
static void Conn_expire(struct Conn *C)
{
long long diff_ms;
if (C->trigger > 0) {
/* We do not trigger first time */
if (C->last_trigger == 0)
C->last_trigger = Conn_now.tv_sec;
/* Advance by exactly one period so ticks do not drift. */
if ((C->last_trigger > 0)
&& (C->last_trigger + C->trigger < Conn_now.tv_sec)) {
if (C->cbs.trigger)
C->cbs.trigger(C);
C->last_trigger = C->last_trigger + C->trigger;
}
}
/* Idle timeout is in seconds; read/connect timeouts are in ms. */
if ((C->idle > 0) && (C->trecv.tv_sec + C->idle < Conn_now.tv_sec)) {
C->error_state = CONN_ERROR_EXPIRED;
} else if ((C->read_timeout > 0) && (C->tsend.tv_sec > 0)
&& (C->tsend.tv_sec > C->trecv.tv_sec)) {
/* We sent something after the last receive: waiting for a reply. */
diff_ms = Conn_time_diff(&Conn_now, &C->tsend);
if (diff_ms > C->read_timeout) {
C->error_state = CONN_ERROR_READ_TIMEOUT;
}
} else if ((C->conn_timeout > 0) && (C->state == CONN_STATE_CONNECT_b)) {
diff_ms = Conn_time_diff(&Conn_now, &C->conn_syn);
if (diff_ms > C->conn_timeout) {
/* connection attempt expired */
C->error_state = CONN_ERROR_CONN_TIMEOUT;
}
}
}
#endif
/*
 * Disables Nagle's algorithm (TCP_NODELAY) on the connection socket.
 * Returns the setsockopt() result: 0 on success, -1 with errno set.
 */
int Conn_nodelay(const struct Conn *C)
{
	const int enable = 1;

	return setsockopt(C->fd, SOL_TCP, TCP_NODELAY, &enable, sizeof(enable));
}
/*
 * Drops the last @bytes bytes queued in the output buffer (undoing a
 * previous enqueue). Nothing happens when fewer than @bytes are queued, so
 * obuf_tail can never move before obuf_head (the previous condition was
 * inverted and allowed exactly that).
 */
void Conn_rollback(struct Conn *C, const unsigned int bytes)
{
	if (bytes <= C->obuf_tail - C->obuf_head)
		C->obuf_tail -= bytes;
}
/*
 * Returns a pointer to the first unconsumed byte of the input buffer.
 */
char *Conn_ibuf(const struct Conn *C)
{
	return &C->ibuf[C->ibuf_head];
}
/*
 * Returns a pointer to the first unsent byte of the output buffer.
 */
char *Conn_obuf(const struct Conn *C)
{
	return &C->obuf[C->obuf_head];
}
/*
 * Obsolete spelling of Conn_get_id(): returns the connection id.
 */
unsigned long long Conn_getid(const struct Conn *C)
{
	unsigned long long id = C->id;

	return id;
}
/*
 * Returns the id assigned to the connection when it was allocated.
 */
unsigned long long Conn_get_id(const struct Conn *C)
{
	unsigned long long id = C->id;

	return id;
}
/*
 * Returns the Conn with the given @id from the Conns array, or NULL.
 * NOTE(review): connections are handed out from pools in the visible code
 * and Conns is never allocated there, so this may always return NULL —
 * confirm. The NULL check prevents dereferencing a NULL Conns.
 */
struct Conn *Conn_get(const unsigned long long id)
{
	unsigned int i;

	if (Conns == NULL)
		return NULL;
	for (i = 0; i < Conn_no; i++)
		if (Conns[i].id == id)
			return &Conns[i];
	return NULL;
}
/*
* Set a callback
*/
int Conn_set_cb(struct Conn *C, const unsigned int cb_type,
void (*f)(struct Conn *))
{
Log(12, "%s %u %p\n", __func__, cb_type, f);
switch (cb_type) {
case CONN_CB_ACCEPT: C->cbs.accept = f; break;
case CONN_CB_RECV: C->cbs.recv = f; break;
case CONN_CB_SEND: C->cbs.send = f; break;
case CONN_CB_DATA: C->cbs.data = f; break;
case CONN_CB_CLOSE: C->cbs.close = f; break;
case CONN_CB_TRIGGER: C->cbs.trigger = f; break;
case CONN_CB_ERROR: C->cbs.error = f; break;
case CONN_CB_CONNECTED: C->cbs.connected = f; break;
case CONN_CB_ACCEPT_ERROR: C->cbs.accept_error = f; break;
default: return -1;
}
return 0;
}
/*
 * Search for @str in the unconsumed input buffer, starting @off bytes
 * after its head.
 * @flags: bit 0 == 1 => case insensitive
 * Returns a pointer to the match inside the buffer, or NULL if there is no
 * match or @off is past the queued data.
 */
char *Conn_ostrstr(struct Conn *C, const unsigned int off, const char *str,
	const unsigned int flags)
{
	unsigned int avail, len, i;
	size_t str_len;
	char *buf;
	int err;

	avail = C->ibuf_tail - C->ibuf_head;
	/* Without this check 'len' would wrap around and the scan below
	 * would read far past the buffer. */
	if (off > avail)
		return NULL;
	len = avail - off;
	buf = C->ibuf + C->ibuf_head + off;
	str_len = strlen(str);
	if (len < str_len)
		return NULL;
	for (i = 0; i <= len - str_len; i++) {
		if (flags & 1)
			err = strncasecmp(buf + i, str, str_len);
		else
			err = strncmp(buf + i, str, str_len);
		if (err == 0)
			return buf + i;
	}
	return NULL;
}
/*
 * Case-sensitive search for @str in the whole unconsumed input buffer.
 * Returns a pointer to the match, or NULL if there is none.
 */
char *Conn_strstr(struct Conn *C, const char *str)
{
	const unsigned int case_sensitive = 0;

	return Conn_ostrstr(C, 0, str, case_sensitive);
}
/*
 * Case-insensitive search for @str in the whole unconsumed input buffer.
 * Returns a pointer to the match, or NULL if there is none.
 */
char *Conn_strcasestr(struct Conn *C, const char *str)
{
	const unsigned int ignore_case = 1;

	return Conn_ostrstr(C, 0, str, ignore_case);
}
/*
 * Returns the first line of the input buffer as a '\0'-terminated string:
 * the '\n' is overwritten in place. The bytes are not consumed; the caller
 * must Conn_eat() them. Returns NULL when no complete line is queued.
 */
char *Conn_get_line(struct Conn *C)
{
	char *newline = Conn_ostrstr(C, 0, "\n", 1);

	if (newline == NULL)
		return NULL;
	newline[0] = '\0';
	return Conn_ibuf(C);
}
/*
 * Cut from the right, in place, every trailing char that appears in @chars.
 * A NULL or empty @s is left untouched. Works with indices so no pointer is
 * ever formed before the start of @s (the old e-- past s was UB).
 */
static void Conn_rtrim(char *s, const char *chars)
{
	size_t n;

	if (!s)
		return;
	n = strlen(s);
	while ((n > 0) && strchr(chars, s[n - 1]))
		s[--n] = '\0';
}
/*
 * Helper for building text line daemons: calls @cb once for every complete
 * '\n'-terminated line in the input buffer (with trailing '\r's removed),
 * then consumes that line.
 * The consumed size comes from the newline position, not strlen(), so a
 * line containing a NUL byte no longer leaves garbage that would be merged
 * into the next line.
 */
void Conn_for_every_line(struct Conn *C, void (*cb)(struct Conn *C, char *line))
{
	if (cb == NULL)
		return;
	for (;;) {
		char *line, *newline;
		unsigned int line_size;

		newline = Conn_ostrstr(C, 0, "\n", 0);
		if (newline == NULL)
			break;
		line = Conn_ibuf(C);
		/* Bytes up to and including the '\n'. */
		line_size = (unsigned int) (newline - line) + 1;
		*newline = '\0';
		Conn_rtrim(line, "\r");
		cb(C, line);
		Conn_eat(C, line_size);
	}
}
/*
 * Formats a line and puts it in the output queue (without kicking send).
 * Supported conversions: %%, %s (NULL prints "(null)"), %d, %u, %c; no
 * flags/width/precision. Any other conversion is a programming error and
 * aborts the process. A lone trailing '%' is ignored.
 * Returns -1 on enqueue error, 0 otherwise.
 */
int Conn_printf(struct Conn *C, const char *format, ...)
{
	va_list ap;
	int ret = 0;

	va_start(ap, format);
	while (likely(*format)) {
		char *s;
		int slen;
		unsigned long len;
		char tmp[64];
		int r;

		/* Find first formatting char */
		s = strchr(format, '%');
		if (unlikely(!s)) {
			r = Conn_enqueue_wait(C, format, (unsigned int) strlen(format));
			ret = r == -1 ? -1 : 0;
			break;
		}
		/* Literal text before the '%' */
		len = (unsigned long) (s - format);
		r = Conn_enqueue_wait(C, format, (unsigned int) len);
		/* TODO: Should we call error callback and close the connection? */
		if (unlikely(r == -1)) {
			ret = -1;
			break;
		}
		format = s + 1;
		s = tmp;
		switch (*format) {
		case '%':
			tmp[0] = '%';
			len = 1;
			break;
		case 's':
			s = va_arg(ap, char *);
			if (unlikely(s == NULL))
				s = "(null)";
			len = strlen(s);
			break;
		case 'd':
			slen = snprintf(tmp, sizeof(tmp), "%d", va_arg(ap, int));
			len = slen < 0 ? 0 : (unsigned long) slen;
			break;
		case 'u':
			slen = snprintf(tmp, sizeof(tmp), "%u", va_arg(ap, unsigned int));
			len = slen < 0 ? 0 : (unsigned long) slen;
			break;
		case 'c':
			tmp[0] = (char) va_arg(ap, int);
			len = 1;
			break;
		case '\0':
			/* '%' at the very end: nothing to print, loop ends */
			s = NULL;
			break;
		default:
			Log(0, "Unknown format [%c]!\n", *format);
			abort();
		}
		if (likely(s)) {
			r = Conn_enqueue_wait(C, s, (unsigned int) len);
			if (unlikely(r == -1)) {
				ret = -1;
				break;
			}
			format++;
		}
	}
	va_end(ap);
	return ret;
}
/*
 * This is called after taking a Conn from a free list to init its fields:
 * defaults (master, stream, PF_INET), empty buffers grown to at least the
 * default sizes, counters/flags reset, a new id, default callbacks.
 * Increments Conn_no (Conn_work_to_do is only incremented in commit).
 * Returns C, or NULL with Conn_error set if a buffer cannot be grown; in
 * that case Conn_no is unchanged.
 */
__hot static struct Conn *Conn_alloc_prepare(struct Conn *C)
{
	void *p;

	/* We assume that will be a master */
	C->type = CONN_TYPE_MASTER;
	C->state = CONN_STATE_EMPTY;
	C->error_state = CONN_ERROR_NO_ERROR;
	C->xerrno = 0;
	C->sock_protocol = 0;
	C->sock_domain = PF_INET;
	C->sock_type = SOCK_STREAM;
	if (unlikely(C->ibuf_size < Conn_default_ibuf)) {
		p = realloc(C->ibuf, Conn_default_ibuf);
		if (unlikely(p == NULL)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error2!");
			return NULL;
		}
		Conn_mem_buffers_in += Conn_default_ibuf - C->ibuf_size;
		C->ibuf = p;
		C->ibuf_size = Conn_default_ibuf;
	}
	C->ibuf_head = 0;
	C->ibuf_tail = 0;
	if (unlikely(C->obuf_size < Conn_default_obuf)) {
		p = realloc(C->obuf, Conn_default_obuf);
		if (unlikely(p == NULL)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Memory allocation error3!");
			return NULL;
		}
		Conn_mem_buffers_out += Conn_default_obuf - C->obuf_size;
		C->obuf = p;
		C->obuf_size = Conn_default_obuf;
	}
	C->obuf_head = 0;
	C->obuf_tail = 0;
	C->trecv = Conn_now;
	C->bi = 0;
	C->bo = 0;
	C->private = NULL;
	/* Reset syn time */
	C->conn_syn.tv_sec = 0;
	C->conn_syn.tv_usec = 0;
	/* flags */
	C->auto_reconnect = 0;
	C->close_after_send = 0;
	C->shutdown_after_send = 0;
	C->local_dirty = 1;
	C->remote_dirty = 1;
	/* bandwidth */
	C->band_width = 0;
	C->band_factor = 0;
	C->band_tokens = 0;
	C->band_lasttime = Conn_now;
	C->fd = -1;
	C->start = Conn_now.tv_sec;
	C->id = Conn_id++;
	/* TODO: really? Don't we borrow them from master listent conn?! */
	C->cbs = Conn_default_cbs;
	C->web = NULL;
	C->web_req = NULL;
	Conn_no++;
	/* High-water mark must include this connection, so update it after
	 * the increment (it used to lag one connection behind). */
	if (unlikely(Conn_no > Conn_max_reached))
		Conn_max_reached = Conn_no;
	/* Conn_work_to_do will not be incremented here, only in commit! */
	Log(10, "Allocated id %llu. Now Conn_no=%u.\n",
		C->id, Conn_no);
	return C;
}
/*
 * Allocates a Conn structure from the global free list (growing it when
 * empty). This is called by the user.
 * Returns NULL with Conn_error set when the Conn_max limit is reached or
 * memory is exhausted.
 */
__hot struct Conn *Conn_alloc(void)
{
	struct Conn *C;

	Log(10, "%s Conn_no=%u Conn_max=%u\n",
		__func__, Conn_no, Conn_max);
	if (unlikely((Conn_max > 0) && (Conn_no >= Conn_max))) {
		snprintf(Conn_error, sizeof(Conn_error),
			"Limit reached! Consider a raise of max connection number or put 0 for no limit.");
		return NULL;
	}
	if (unlikely(Conn_free.head == NULL)) {
		int r;

		r = pool_grow(&Conn_free, 4);
		if (unlikely(r != 0)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}
	/* Steal from free list */
	C = Conn_free.head;
	Conn_free.head = C->next;
	C->next = NULL;
	if (unlikely(Conn_alloc_prepare(C) == NULL)) {
		/* Buffer growth failed: give the slot back instead of losing it. */
		C->next = Conn_free.head;
		Conn_free.head = C;
		return NULL;
	}
	return C;
}
/*
 * Allocates a Conn structure from worker @w's free list (growing it by
 * CONN_BULK_ALLOC when empty) and binds it to @w.
 * Returns NULL with Conn_error set on memory shortage.
 */
__hot static struct Conn *Conn_alloc_worker(struct Conn_wpool_worker *w)
{
	struct Conn *C;

	if (unlikely(w->free.head == NULL)) {
		int r;

		r = pool_grow(&w->free, CONN_BULK_ALLOC);
		if (unlikely(r != 0)) {
			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot grow anymore. Probably memory shortage.");
			return NULL;
		}
	}
	/* Steal from free list */
	C = w->free.head;
	w->free.head = C->next;
	C->next = NULL;
	C->worker = w;
	if (unlikely(Conn_alloc_prepare(C) == NULL)) {
		/* Buffer growth failed: give the slot back instead of losing it. */
		C->next = w->free.head;
		w->free.head = C;
		return NULL;
	}
	return C;
}
/*
 * Accepts one pending connection on the listening Conn @C: allocates a P2P
 * Conn from C's worker pool, copies callbacks/socket settings from @C,
 * registers it in the worker epoll set and runs the accept callback.
 * "C" is shared with the other threads. TODO: make it less ugly.
 */
__hot static void Conn_accept(struct Conn *C)
{
	int fd, err;
	struct sockaddr *pca;
	struct sockaddr_in ca4;
	struct sockaddr_in6 ca6;
	socklen_t cax_len;
	struct Conn *X;

	if (unlikely(Conn_debug_level > 9)) {
		Log(0, "Accepting a connection via %s/%d, fd %d, type %s"
			", domain %s, protocol %s.\n",
			C->bind_addr, C->bind_port, C->fd, Conn_type(C),
			Conn_domain(C), Conn_get_socket_protocol(C));
	}
	switch (C->sock_domain) {
	case PF_INET:
		pca = (struct sockaddr *) &ca4;
		cax_len = sizeof(ca4);
		break;
	case PF_INET6:
		pca = (struct sockaddr *) &ca6;
		cax_len = sizeof(ca6);
		break;
	default:
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot deal with domain %d.",
			C->sock_domain);
		Conn_error_raise(C, EAFNOSUPPORT);
		return;
	}
again:
	fd = accept4(C->fd, pca, &cax_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
	if (unlikely(fd == -1)) {
		/* Nothing (more) to accept; both names are allowed by POSIX. */
		if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
			return;
		if (errno == EINTR)
			goto again;
		/* TODO: ratelimit */
		Log(2, "WARN: Cannot accept on fd %d [%s].\n",
			C->fd, strerror(errno));
		/*
		 * We must not raise an error here because we will close the
		 * master socket!
		 * TODO: We should signal it as a warning.
		 */
		/* Conn_error_raise(C, errno); */
		return;
	}
	X = Conn_alloc_worker(C->worker);
	if (unlikely(!X)) {
		/* TODO: really? We will close the master fd! */
		Conn_error_raise(C, ENOMEM);
		close(fd);
		return;
	}
	X->cbs = C->cbs;
	X->fd = fd;
	X->type = CONN_TYPE_P2P;
	X->state = CONN_STATE_OPEN;
	X->time_open = Conn_now;
	X->web = C->web;
	if (X->web) {
		X->web_req = malloc(sizeof(struct Conn_web_request));
		if (!X->web_req) {
			Conn_error_raise(C, ENOMEM);
			/*
			 * X was already taken from the pool: release it instead of
			 * only closing fd and leaking X. Conn_free_intern presumably
			 * closes X->fd, as the Conn_add_obj failure path below
			 * relies on. web is cleared so no web-request cleanup runs
			 * on the NULL web_req.
			 */
			X->web = NULL;
			Conn_free_intern(X);
			return;
		}
		memset(X->web_req, 0, sizeof(struct Conn_web_request));
	}
	/* We should replace them with a pointer to master C */
	Conn_set_socket_domain(X, C->sock_domain);
	Conn_set_socket_type(X, C->sock_type);
	Conn_set_socket_protocol(X, C->sock_protocol);
	X->local_dirty = 1;
	X->remote_dirty = 1;
	/*TODO err = Conn_add_obj(C->worker->epoll_fd, X, EPOLLIN | EPOLLRDHUP);*/
	err = Conn_add_obj(C->worker->epoll_fd, X, EPOLLIN);
	if (unlikely(err != 0)) {
		/* TODO: add a specific callback for enqueue errors? */
		Conn_error_raise(C, CONN_ERROR_INTERNAL);
		Conn_free_intern(X);
		return;
	}
	/* TODO: is possible to not be really needed
	Conn_nodelay(X);
	*/
#if 1
	if (X->cbs.accept)
		X->cbs.accept(X);
#else
	Conn_default_cbs_accept(X);
#endif
	Conn_work_to_do++;
	Conn_total++;
	/* TODO: not real correct! */
	C->worker->in_clients++;
	/* We must call accept till we get EAGAIN */
	/* TODO: seems is not a good idea afterall. We may starve the rest of connections.
	goto again;
	*/
}
/*
 * Handles the epoll events @revents reported for connection @C, in order:
 *  - EPOLLERR: set CONN_ERROR_POLL and free the connection;
 *  - EPOLLHUP: set CONN_ERROR_HANGUP and free the connection;
 *  - EPOLLOUT while CONN_STATE_CONNECT_b: the outgoing connect completed,
 *    switch to CONN_STATE_OPEN and run the default connected callback;
 *  - EPOLLIN: accept on a master, receive data otherwise;
 *  - EPOLLRDHUP: only logged for now;
 *  - EPOLLOUT while CONN_STATE_OPEN: try to send queued data.
 * Conn_ignore() is re-checked between steps because the recv handler may
 * presumably set an error state or free the connection.
 */
static inline void Conn_poll_cb(struct Conn *C, const unsigned int revents)
{
if (unlikely(Conn_debug_level > 5)) {
char poll_status[16];
Conn_poll_status(revents, poll_status);
Log(0, "%llu %s revents=%s\n",
C->id, __func__, poll_status);
Log(12, "\t%s\n", Conn_status_slot(C));
/* TODO: we should prefix the outout with a \t? */
}
/* We may receive several events on the same conn */
if (unlikely(C->state == CONN_STATE_FREE))
return;
if (unlikely(revents & EPOLLERR)) {
Log(0, "\tEPOLLERR!\n");
C->error_state = CONN_ERROR_POLL;
C->xerrno = 0; /* TODO: unknown error? (SO_ERROR could give the real one) */
Conn_free_intern(C);
/* TODO: CONN_ERROR_POLL is correct here? */
return;
}
if (unlikely(revents & EPOLLHUP)) {
Log(9, "\tEPOLLHUP!\n");
C->error_state = CONN_ERROR_HANGUP;
Conn_free_intern(C);
return;
}
/* First, test we have a new connection (writable while connecting) */
if (revents & EPOLLOUT) {
if (C->state == CONN_STATE_CONNECT_b) {
Log(9, "\tWe just established a connection.\n");
C->state = CONN_STATE_OPEN;
C->local_dirty = 1;
C->time_open = Conn_now;
/* TODO: indirect call costs a lot. Hm.
if (likely(C->cbs.connected))
C->cbs.connected(C);
*/
Conn_default_cbs_connected(C);
}
}
//if (Conn_ignore(C))
// return;
/* Second, test for input: new client on a master, data otherwise */
if (likely(revents & EPOLLIN)) {
if (unlikely(C->type == CONN_TYPE_MASTER)) {
Conn_accept(C);
} else {
Log(9, "\tWe have input\n");
/* TODO: use a callback or not?
if (likely(C->cbs.recv))
C->cbs.recv(C);
*/
Conn_default_cbs_recv(C);
}
}
/* The handlers above may have failed or freed C */
if (Conn_ignore(C))
return;
/* RDHUP may come with POLLIN, so it must be called after */
if (unlikely(revents & EPOLLRDHUP)) {
Log(9, "\tEPOLLRDHUP!\n");
/* TODO: What we do here? */
}
if (Conn_ignore(C))
return;
/* Finally, flush queued output if the socket is writable */
if (likely(revents & EPOLLOUT)) {
Log(9, "\tWe can send data (state=%hhu)...\n", C->state);
if (likely(C->state == CONN_STATE_OPEN)) {
/*
if (likely(C->cbs.send))
C->cbs.send(C);
*/
Conn_default_cbs_send(C);
}
}
}
/*
 * Placeholder for dumping statistics; it currently does nothing.
 */
static void Conn_stats(void)
{
	return;
}
/* ############## wpool 2 ############# */
/*
 * Waits on @epoll_fd (at most @e_size events, @timeout in milliseconds,
 * -1 = forever) and dispatches what arrives:
 *  - data.u64 == 1: control socket from the parent (not handled yet);
 *  - data.u64 == 2: worker timer tick, refreshes Conn_now;
 *  - anything else: a struct Conn pointer, passed to Conn_poll_cb().
 * Returns: -1 on error, 0 nothing to do, n (>0) number of events handled.
 */
static inline int Conn_dispatch_events(struct Conn_wpool_worker *w,
	int epoll_fd, struct epoll_event *e,
	const unsigned short e_size, const int timeout)
{
	int i, events;
	struct Conn *C;
	ssize_t r;

	Log(10, "%s timeout2=%dms e_size=%hu...\n", __func__,
		timeout, e_size);
	while (1) {
		events = epoll_wait(epoll_fd, e, e_size, timeout);
		if (unlikely(events < 0)) {
			if (errno == EINTR)
				continue;
			Log(1, "%s [%s]\n",
				__func__, strerror(errno));
			snprintf(Conn_error, sizeof(Conn_error), "epoll error (errno=%d) (%s)",
				errno, strerror(errno));
			return -1;
		}
		break;
	}
	if (events == 0)
		return 0;
	if (unlikely(Conn_debug_level > 8)) {
		char sevents[16];

		Log(0, "\tProcessing %d event(s):\n", events);
		for (i = 0; i < events; i++) {
			C = e[i].data.ptr;
			Conn_poll_status(e[i].events, sevents);
			Log(0, "\tptr=%p ### ev i=%d %s\n", (void *) C, i, sevents);
		}
	}
	for (i = 0; i < events; i++) {
		/*
		 * Special events were registered through data.u64, so test the
		 * same union member: data.fd only aliases its low half on
		 * little-endian machines.
		 */
		if (unlikely(e[i].data.u64 == 1)) {
			/* TODO: we may have another message type! Nothing is read
			 * here, so a level-triggered event will fire again. */
			Log(0, "We have a master signaling. TODO!\n");
			continue;
		}
		if (unlikely(e[i].data.u64 == 2)) {
			uint64_t ticks;

			r = read(w->timer_fd, &ticks, sizeof(ticks));
			if (r != (ssize_t) sizeof(ticks)) {
				Log(0, "\tCannot read timer fd (%s).\n", strerror(errno));
				continue;
			}
			//Log(10, "We have a timer tick (ticks %llu).\n", ticks);
			gettimeofday(&Conn_now, NULL);
			/* TODO: optimize here and try to deduct time! */
			continue;
		}
		Conn_poll_cb(e[i].data.ptr, e[i].events);
	}
	return events;
}
/*
* Start a worker
*/
static int Conn_wpool_start_worker(struct Conn *C, struct Conn_wpool_worker *w,
int listen_fd)
{
int r, pipes[2];
long cpus;
struct epoll_event ev;
cpu_set_t cpuset;
struct itimerspec new_value;
pid_t pid;
char tmp[32];
Log(10, "Starting worker (id=%hu, listen_fd=%d)...\n", w->id, listen_fd);
r = socketpair(AF_LOCAL, SOCK_STREAM, 0, pipes);
if (r != 0) {
Log(0, "Cannot call socketpair (%s)\n", strerror(errno));
return -1;
}
pid = fork();
if (pid == -1) {
Log(0, "%s: Cannot fork: %s!\n",
__func__, strerror(errno));
goto close_pipe;
}
if (pid != 0) { /* parent */
Log(10, "Started worker (id=%hu) pipe=%d\n",
w->id, pipes[0]);
close(pipes[1]);
w->control = pipes[0];
return 0;
}
/* Here we are in the child process */
snprintf(tmp, sizeof(tmp), "worker %hu", w->id);
Conn_log_set_info(tmp);
w->control = pipes[1];
close(pipes[0]);
C->fd = listen_fd;
if (C->type == CONN_TYPE_MASTER)
listen(C->fd, 4096);
/* TODO: if we have gaps, the allocation is not correct! */
cpus = sysconf(_SC_NPROCESSORS_ONLN);
Log(1, "%ld cpus found.\n", cpus);
CPU_ZERO(&cpuset);
CPU_SET((size_t)(w->id % cpus), &cpuset);
r = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
if (r != 0) {
Log(0, "%s: Cannot set affinity: %s\n",
__func__, strerror(errno));
} else {
Log(1, "%s: Affinity for worker %hu: cpu %u\n",
__func__, w->id, w->id % cpus);
}
C->worker = w;
w->conns_head = NULL;
pool_init(&w->free);
w->mem_structs = 0; /* TODO: really used? */
w->in_clients = 0;
w->pid = getpid();
w->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (w->epoll_fd == -1) {
Log(0, "Cannot create worker epoll (%s).\n", strerror(errno));
exit(1);
}
/* Register our end of the pipe to receive signaling from parent */
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.u64 = 1; /* TODO: Find a better id */
r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->control, &ev);
if (r != 0) {
Log(0, "Cannot add control fd to epoll (%s).\n", strerror(errno));
exit(1);
}
w->timer_fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
if (w->timer_fd == -1) {
Log(0, "Cannot create timer fd (%s).\n", strerror(errno));
exit(1);
}
/* arm timer - once per second */
new_value.it_value.tv_sec = 1;
new_value.it_value.tv_nsec = 0;
new_value.it_interval.tv_sec = 1;
new_value.it_interval.tv_nsec = 0;
r = timerfd_settime(w->timer_fd, 0 /* relative */, &new_value, NULL);
if (r == -1) {
Log(0, "Cannot arm timer (%s).\n", strerror(errno));
exit(1);
}
/* add timer to poll */
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.u64 = 2; /* TODO: Find a better id */
r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->timer_fd, &ev);
if (r != 0) {
Log(0, "Cannot add timer to epoll (%s).\n", strerror(errno));
exit(1);
}
/* Register C->fd */
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.ptr = C;
r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, C->fd, &ev);
if (r != 0) {
Log(0, "Cannot add listen_fd %d to epoll (%s).\n",
C->fd, strerror(errno));
exit(1);
}
w->inited = 1;
Log(10, "%s: worker prepared, epoll=%d control=%d fd=%d timer=%d\n",
__func__, w->epoll_fd, w->control, C->fd, w->timer_fd);
while (1) {
Conn_dispatch_events(w, w->epoll_fd, w->events,
CONN_EVENTS_SLOTS, -1);
//sched_yield();
//Conn_wpool_worker_stats(w);
}
exit(1);
close_pipe:
close(pipes[0]);
close(pipes[1]);
return -1;
}
/* ############## misc ############# */
/*
 * Reads (at most @buf_size - 1 bytes of) @file into @buf and strips the
 * trailing newlines. On failure @buf receives "ERROR_OPEN!" or
 * "ERROR_READ!". Meant for small /proc/sys values.
 * Previously the last byte was always chopped (even a digit) and an empty
 * remainder made the loop read buf[-1].
 */
static void Conn_read_proc(char *buf, const size_t buf_size, const char *file)
{
	int fd;
	ssize_t n;

	if (buf_size == 0)
		return;
	fd = open(file, O_RDONLY);
	if (fd == -1) {
		snprintf(buf, buf_size, "ERROR_OPEN!");
		return;
	}
	n = read(fd, buf, buf_size - 1);
	if (n == -1) {
		snprintf(buf, buf_size, "ERROR_READ!");
	} else {
		buf[n] = '\0';
		while ((n > 0) && (buf[n - 1] == '\n'))
			buf[--n] = '\0';
	}
	close(fd);
}
/*
 * Logs (level 1) a few kernel network limits read from /proc/sys:
 * somaxconn, tcp_max_tw_buckets and tcp_fin_timeout.
 */
static void Conn_sys(void)
{
	char max_backlog[16], tw_buckets[16], fin_timeout[16];

	Conn_read_proc(max_backlog, sizeof(max_backlog),
		"/proc/sys/net/core/somaxconn");
	Conn_read_proc(tw_buckets, sizeof(tw_buckets),
		"/proc/sys/net/ipv4/tcp_max_tw_buckets");
	Conn_read_proc(fin_timeout, sizeof(fin_timeout),
		"/proc/sys/net/ipv4/tcp_fin_timeout");
	Log(1, "net.core.somaxconn=%s"
		" net.ipv4.tcp_max_tw_buckets=%s"
		" net.ipv4.tcp_fin_timeout=%s\n",
		max_backlog, tw_buckets, fin_timeout);
}
/* ############## split ############ */
/*
 * Frees a structure returned by Conn_split(): every cell, the duplicated
 * line and the structure itself, then sets *@s to NULL so the caller's
 * handle does not dangle. @s or *@s may be NULL.
 */
void Conn_split_free(struct Conn_split **s)
{
	struct Conn_split *p;
	struct Conn_split_cell *q, *next;

	if (!s || !*s)
		return;
	p = *s;
	for (q = p->head; q; q = next) {
		next = q->next;
		free(q);
	}
	free(p->line);
	free(p);
	*s = NULL;
}
/*
 * Splits @line0 into "left=right" pairs separated by spaces/tabs.
 * A right side may be quoted ("...") to contain spaces; the quotes are not
 * part of the value. A word without '=' gets an empty right side.
 * Returns a structure to release with Conn_split_free(), or NULL with
 * Conn_error set on allocation failure.
 */
struct Conn_split *Conn_split(const char *line0)
{
	char *p;
	struct Conn_split *ret;
	struct Conn_split_cell *q;
	char search_for;
	char *left, *right;
	unsigned int right_len;

	ret = calloc(1, sizeof(struct Conn_split));
	if (!ret) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot alloc memory for Conn_split!\n");
		return NULL;
	}
	/* Work on a private copy: separators are replaced by '\0' in place. */
	ret->line = strdup(line0);
	if (!ret->line) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot alloc memory for line duplication!\n");
		goto out_free;
	}
	Conn_rtrim(ret->line, "\r\n \t");
	/* do the spliting */
	p = ret->line;
	while (*p != '\0') {
		/* skip empty space */
		while ((*p == ' ') || (*p == '\t'))
			p++;
		if (*p == '\0')
			break;
		/* Init */
		right = "";
		right_len = 0;
		/* Building left */
		left = p;
		while ((*p != '\0') && (*p != '='))
			p++;
		if (*p != '\0') {
			*p = '\0';
			/* skip '=' */
			p++;
			/* Building right */
			search_for = ' ';
			if (*p == '"') {
				search_for = '"';
				p++;
			}
			/* Start after an opening quote so 'right' and 'right_len'
			 * describe the same characters. */
			right = p;
			while ((*p != '\0') && (*p != search_for)) {
				right_len++;
				p++;
			}
			if (*p != '\0') {
				*p = '\0';
				p++;
			}
		}
		/* alloc data and fill it */
		q = calloc(1, sizeof(struct Conn_split_cell));
		if (!q) {
			snprintf(Conn_error, sizeof(Conn_error),
				"cannot alloc memory!\n");
			goto out_free;
		}
		q->left = left;
		q->right = right;
		q->right_len = right_len;
		if (ret->head == NULL)
			ret->head = q;
		else
			ret->tail->next = q;
		ret->tail = q;
	}
	return ret;
out_free:
	Conn_split_free(&ret);
	return NULL;
}
/*
 * Looks up key @left in @s. On a match returns its value and, when @size
 * is not NULL, stores the value length in *@size. Returns NULL if absent.
 */
char *Conn_split_get_size(const struct Conn_split *s, const char *left,
	unsigned int *size)
{
	struct Conn_split_cell *cell;

	for (cell = s->head; cell != NULL; cell = cell->next) {
		if (strcmp(left, cell->left) != 0)
			continue;
		if (size != NULL)
			*size = cell->right_len;
		return cell->right;
	}
	return NULL;
}
/*
 * Returns the value for key @l, or NULL when the key is absent.
 */
char *Conn_split_get_e(const struct Conn_split *s, const char *l)
{
	unsigned int *no_size = NULL;

	return Conn_split_get_size(s, l, no_size);
}
/*
 * Returns the value for key @l, or "" when the key is absent (never NULL).
 */
char *Conn_split_get(const struct Conn_split *s, const char *l)
{
	char *value = Conn_split_get_size(s, l, NULL);

	return value != NULL ? value : "";
}
/*
 * Returns the value for key @l parsed with strtoul() in @base, or 0 when
 * the key is absent.
 */
unsigned long Conn_split_get_ul(const struct Conn_split *s, const char *l,
	const unsigned int base)
{
	char *value = Conn_split_get_e(s, l);

	return value ? strtoul(value, NULL, (int) base) : 0;
}
/*
 * Returns the value for key @l parsed with strtoull() in @base, or 0 when
 * the key is absent.
 */
unsigned long long Conn_split_get_ull(const struct Conn_split *s, const char *l,
	unsigned int base)
{
	char *value = Conn_split_get_e(s, l);

	return value ? strtoull(value, NULL, (int) base) : 0;
}
/*
 * Returns the value for key @l parsed with strtod(), or 0 when the key is
 * absent.
 */
double Conn_split_get_d(const struct Conn_split *s, const char *l)
{
	char *value = Conn_split_get_e(s, l);

	return value ? strtod(value, NULL) : 0;
}
#if 0
/*
 * Returns 1 if every char of @s is alphanumeric according to isalnum()
 * (0-9a-zA-Z in the C locale). Else 0.
 */
static int Conn_alphanum(const char *s)
{
	const unsigned char *p;

	/* ctype functions need unsigned char values: a negative char is UB. */
	for (p = (const unsigned char *) s; *p != '\0'; p++)
		if (isalnum(*p) == 0)
			return 0;
	return 1;
}
#endif
/*
 * Returns the milliseconds elapsed between C->time_open and the cached
 * current time Conn_now.
 */
unsigned long long Conn_lifetime(struct Conn *C)
{
	long long elapsed_ms = Conn_time_diff(&Conn_now, &C->time_open);

	return (unsigned long long) elapsed_ms;
}
/*
 * Returns the last error message recorded by this library (per-thread
 * buffer, overwritten by later errors).
 */
char *Conn_strerror(void)
{
	char *msg = Conn_error;

	return msg;
}
/*
 * Formats (printf-style) a message into the per-thread error buffer read
 * by Conn_strerror(); it is truncated to the buffer size.
 */
void Conn_set_error(const char *format, ...)
{
	va_list args;

	va_start(args, format);
	(void) vsnprintf(Conn_error, sizeof(Conn_error), format, args);
	va_end(args);
}
/*
 * Difference t1 - t2 between two timeval structures, in milliseconds
 * (negative when t1 is earlier). The seconds are widened to long long
 * before scaling so the product cannot overflow where time_t is 32-bit.
 */
long long Conn_time_diff(const struct timeval *t1, const struct timeval *t2)
{
	long long sec_ms = ((long long) t1->tv_sec - (long long) t2->tv_sec) * 1000;
	long long usec_ms = ((long long) t1->tv_usec - (long long) t2->tv_usec) / 1000;

	return sec_ms + usec_ms;
}
/*
 * Returns "<error kind> (<strerror(xerrno)> or -)" describing why @C
 * failed. The result lives in a static buffer overwritten on each call.
 */
char *Conn_errno(const struct Conn *C)
{
	static char buf[256];
	const char *what;
	const char *detail;

	switch (C->error_state) {
	case CONN_ERROR_USERREQ:
		what = "user";
		break;
	case CONN_ERROR_POLL:
		what = "poll";
		break;
	case CONN_ERROR_RECV:
		what = "recv";
		break;
	case CONN_ERROR_SEND:
		what = "send";
		break;
	case CONN_ERROR_SOCKET:
		what = "socket";
		break;
	case CONN_ERROR_HANGUP:
		what = "hangup";
		break;
	case CONN_ERROR_GETADDRINFO:
		what = "lookup error";
		break;
	case CONN_ERROR_EXPIRED:
		what = "expired";
		break;
	case CONN_ERROR_ACCEPT:
		what = "accept";
		break;
	case CONN_ERROR_MEM:
		what = "allocation failed";
		break;
	case CONN_ERROR_CONNECT:
		what = "connect";
		break;
	case CONN_ERROR_READ_TIMEOUT:
		what = "read timeout";
		break;
	case CONN_ERROR_CONN_TIMEOUT:
		what = "conn timeout";
		break;
	case CONN_ERROR_INTERNAL:
		what = "internal error";
		break;
	default:
		what = "?";
		break;
	}
	detail = (C->xerrno > 0) ? strerror(C->xerrno) : "-";
	snprintf(buf, sizeof(buf), "%s (%s)", what, detail);
	return buf;
}
/*
 * Initializes the library: pools, counters, the main epoll fd, and probes
 * whether SO_REUSEPORT is available. @max limits simultaneous connections
 * (0 = no limit). Calling it again after success is a no-op returning 0.
 * Returns 0 on success, -1 on failure; on failure nothing stays open and
 * the library is not marked as initialized, so the call can be retried.
 */
int Conn_init(const unsigned int max)
{
	int sock, i, ret;

	if (Conn_inited == 1)
		return 0;
	Conn_log_set_info("main");
	/* TODO: masters does not need a full pool implementation */
	pool_init(&Conn_masters);
	pool_init(&Conn_free);
	Conn_max = max;
	Conn_no = 0;
	Conn_work_to_do = 0;
	Conn_total = 0;
	Conn_max_reached = 0;
	gettimeofday(&Conn_now, NULL);
	Conn_start = (unsigned int) Conn_now.tv_sec;
	Conn_allocated = 0;
	snprintf(Conn_error, sizeof(Conn_error), "%s", "");
	/* CLOEXEC, like the worker epoll instances. */
	Conn_epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (Conn_epoll_fd == -1) {
		Log(0, "Cannot create epoll fd (%s)\n", strerror(errno));
		return -1;
	}
	/* Probe socket, only used to test SO_REUSEPORT support. */
	sock = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
	if (sock == -1) {
		Log(0, "Cannot create a simple socket (%s)\n", strerror(errno));
		close(Conn_epoll_fd);
		Conn_epoll_fd = -1;
		return -1;
	}
	i = 1;
	ret = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i));
	if (ret == 0)
		Conn_reuseport_available = 1;
	close(sock);
	Log(10, "SO_REUSEPORT is %savailable\n",
		Conn_reuseport_available ? "" : "not ");
	/* Log some info about this system */
	Conn_sys();
	Log(0, "sizeof(struct Conn)=%zu\n", sizeof(struct Conn));
	/* Only now is everything in place. */
	Conn_inited = 1;
	return 0;
}
/*
 * Shutdown Conn: closes the main epoll fd and frees the Conns array.
 * Safe to call more than once or without a successful Conn_init(): the
 * epoll fd is only closed when it is valid (the static default is 0,
 * i.e. stdin) and the freed pointer is cleared to avoid a double free.
 * Always returns 0.
 */
int Conn_shutdown(void)
{
	if (Conn_inited) {
		close(Conn_epoll_fd);
		Conn_epoll_fd = -1;
	}
	Conn_inited = 0;
	/* Free all buffers */
#if 0
	TODO
	Log(5, "Freeing %u slots...\n", Conn_allocated);
	for (slot = 0; slot < Conn_allocated - 1; slot++) {
		if (Conns[slot].ibuf)
			free(Conns[slot].ibuf);
		if (Conns[slot].obuf)
			free(Conns[slot].obuf);
		if (Conns[slot].fd != -1) {
			Log(6, "Closing slot %u, fd %d...\n",
				slot, Conns[slot].fd);
			close(Conns[slot].fd);
		}
	}
#endif
	free(Conns);
	Conns = NULL;
	return 0;
}
/*
 * Appends @count bytes from @buf to the output buffer of @C, growing it
 * via Conn_try_expand_buf() when needed, but does not kick the sending.
 * Returns -1 if the buffer cannot be grown, 0 if ok.
 */
__hot int Conn_enqueue_wait(struct Conn *C, const void *buf, const unsigned int count)
{
	if (unlikely(Conn_debug_level >= 10)) {
		char *dump;

		dump = Conn_dump(buf, count);
		/* NOTE(review): guard in case Conn_dump can fail and return
		 * NULL — passing NULL to %s is undefined. */
		Log(0, "%llu %s Try to enqueue %u byte(s) [%s]...\n",
			C->id, __func__, count, dump ? dump : "");
		free(dump);
	}
	if (unlikely(C->obuf_size - C->obuf_tail < count)) {
		int r;

		r = Conn_try_expand_buf(C, 0, count);
		if (unlikely(r != 0))
			return -1;
	}
	memcpy(C->obuf + C->obuf_tail, buf, count);
	C->obuf_tail += count;
	return 0;
}
/*
 * Queues @count bytes (see Conn_enqueue_wait) and, if that worked, starts
 * sending right away. Returns the Conn_enqueue_wait() result.
 */
__hot int Conn_enqueue(struct Conn *C, const void *buf, const unsigned int count)
{
	int err = Conn_enqueue_wait(C, buf, count);

	if (unlikely(err != 0))
		return err;
	Conn_default_cbs_send(C);
	return 0;
}
/*
 * Starts sending data queued earlier with Conn_enqueue_wait().
 */
__hot void Conn_kick(struct Conn *C)
{
	unsigned long long id = C->id;

	Log(10, "%llu %s\n", id, __func__);
	Conn_default_cbs_send(C);
}
/*
 * Sets the address family (e.g. PF_INET, PF_INET6) passed to socket()
 * when the connection is committed. Always returns 0.
 */
int Conn_set_socket_domain(struct Conn *C, const int domain)
{
	int *field = &C->sock_domain;

	*field = domain;
	return 0;
}
/*
 * Sets the socket type (e.g. SOCK_STREAM, SOCK_DGRAM) passed to socket()
 * when the connection is committed. Always returns 0.
 */
int Conn_set_socket_type(struct Conn *C, const int type)
{
	int *field = &C->sock_type;

	*field = type;
	return 0;
}
/*
 * Sets the protocol argument passed to socket() when the connection is
 * committed (0 = default for the type). Always returns 0.
 */
int Conn_set_socket_protocol(struct Conn *C, const int protocol)
{
	int *field = &C->sock_protocol;

	*field = protocol;
	return 0;
}
/*
 * Sets the local address to bind to (copied, truncated to fit
 * C->bind_addr). Always returns 0.
 */
int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
{
	const size_t room = sizeof(C->bind_addr);

	snprintf(C->bind_addr, room, "%s", addr);
	return 0;
}
/*
 * Sets the remote address to connect to (copied, truncated to fit
 * C->addr) and marks the Conn as a client (CONN_TYPE_P2P).
 * Always returns 0.
 */
int Conn_set_socket_addr(struct Conn *C, const char *addr)
{
	const size_t room = sizeof(C->addr);

	snprintf(C->addr, room, "%s", addr);
	C->type = CONN_TYPE_P2P;
	return 0;
}
/*
 * Sets the local port to bind to (host byte order). Always returns 0.
 */
int Conn_set_socket_bind_port(struct Conn *C, const int port)
{
	C->bind_port = port;

	return 0;
}
/*
 * Sets the remote port to connect to. Always returns 0.
 */
int Conn_set_socket_port(struct Conn *C, const int port)
{
	C->port = port;

	return 0;
}
#if 0
/*
 * Clones a Conn structure to be used for splitting between workers
 * (currently compiled out).
 * Shallow copy: pointers inside the Conn (buffers, private, web...) end
 * up shared between original and clone. Returns a malloc'ed copy or NULL.
 */
__cold static struct Conn *Conn_clone(struct Conn *C)
{
struct Conn *ret;
ret = malloc(sizeof(struct Conn));
if (!ret) {
Log(1, "Cannot alloc memory for a clone!\n");
return NULL;
}
memcpy(ret, C, sizeof(struct Conn));
return ret;
}
#endif
/*
 * Prepares a socket to be used in Conn_commit: creates a non-blocking,
 * close-on-exec socket from C's domain/type/protocol, sets IPV6_V6ONLY for
 * IPv6 and, for masters, SO_REUSEADDR (+SO_REUSEPORT when available) and
 * binds it to @bind_psa.
 * Returns the fd, or -1 with Conn_error set; the fd is closed on failure.
 */
static int Conn_prepare_socket(struct Conn *C, struct sockaddr *bind_psa,
	socklen_t bind_sock_len)
{
	int fd, i;

	fd = socket(C->sock_domain, C->sock_type | SOCK_NONBLOCK | SOCK_CLOEXEC,
		C->sock_protocol);
	if (fd == -1) {
		Log(1, "%s: Could not create socket (%s)!\n",
			__func__, strerror(errno));
		snprintf(Conn_error, sizeof(Conn_error),
			"Cannot create socket (%s, %s, %s) [%s]",
			Conn_domain(C), Conn_type(C),
			Conn_get_socket_protocol(C),
			strerror(errno));
		return -1;
	}
	/* Conn_setnonblock(fd); TODO: if kernel < 2.6.27, we still do not
	 * have nonblock support. Check at runtime and make setnonblock as
	 * a no-op */
	if (C->sock_domain == PF_INET6) {
#ifndef IPV6_V6ONLY
#define IPV6_V6ONLY 26
#endif
		/* Best effort: keep v6 sockets from also grabbing v4. */
		i = 1;
		setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&i, sizeof(i));
	}
	if (C->type == CONN_TYPE_MASTER) {
		int ret;

		i = 1;
		setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
		if (Conn_reuseport_available) {
			i = 1;
			setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i));
		}
		ret = bind(fd, bind_psa, bind_sock_len);
		if (ret < 0) {
			int saved_errno = errno;

			snprintf(Conn_error, sizeof(Conn_error),
				"Cannot bind on %s/%d [%s]",
				C->bind_addr, C->bind_port, strerror(saved_errno));
			/* Do not leak the socket when bind fails. */
			close(fd);
			errno = saved_errno;
			return -1;
		}
	}
	Log(1, "%s: Prepared socket %d!\n", __func__, fd);
	return fd;
}
/*
 * Allocates socket and bind if asked.
 *
 * Commits a configured Conn: validates its type and domain, builds the
 * bind address (defaulting to the wildcard address for masters without
 * one), creates the main socket (C->fd), registers masters in
 * Conn_masters and hands the socket(s) to the worker pool. If C has no
 * worker pool, a single-worker pool is created. When
 * Conn_reuseport_available is set, each worker gets its own freshly bound
 * socket, which is closed here once Conn_wpool_start_worker() returns;
 * otherwise all workers share C->fd.
 *
 * Returns 0 on success. On failure Conn_error is filled in (where a message
 * is available), C is released with Conn_free_intern() and -1 is returned;
 * the caller must not use C afterwards.
 * TODO: We should return -1 or free connection and call callbacks?!
 */
int Conn_commit(struct Conn *C)
{
	int i, ret, listen_fd;
	struct sockaddr *bind_psa = NULL;
	struct sockaddr_in bind_sa;
	struct sockaddr_in6 bind_sa6;
	socklen_t bind_sock_len = 0;
	int do_connect = 0;
	unsigned char first_state = 0xff;
	struct Conn_wpool_worker *w;

	Log(10, "%s\n", __func__);

	/*
	 * Be optimistic and increment here; Conn_free_intern() decrements it.
	 * Every error path therefore goes through out_free, otherwise the
	 * counter would stay inflated and Conn_poll() would never see it
	 * drop to 0.
	 */
	Conn_work_to_do++;

	/* Try to figure what kind of socket is: client or master */
	if (C->type == CONN_TYPE_UNK) {
		snprintf(Conn_error, sizeof(Conn_error),
			"cannot detect Conn type");
		goto out_free;
	}

	if (C->type == CONN_TYPE_P2P) {
		do_connect = 1;
	} else if (C->bind_addr[0] == '\0') {
		/* Choose defaults because IP was not specified */
		switch (C->sock_domain) {
		case PF_INET:
			snprintf(C->bind_addr, sizeof(C->bind_addr), "0.0.0.0");
			break;
		case PF_INET6:
			snprintf(C->bind_addr, sizeof(C->bind_addr), "::");
			break;
		default:
			/* No wildcard default for other domains. */
			break;
		}
	}

	/*
	 * Build the bind address. inet_pton() returns 1 on success, 0 when the
	 * string is not a valid address for the family and -1 for an
	 * unsupported family: anything but 1 is an error, otherwise a typo
	 * would silently bind to the zeroed (wildcard) address.
	 * TODO: why not move this in prepare_socket? This way the socket
	 * could be created in the child.
	 */
	switch (C->sock_domain) {
	case PF_INET:
		if (C->bind_addr[0] != '\0') {
			memset(&bind_sa, 0, sizeof(bind_sa));
			bind_sa.sin_family = AF_INET;
			ret = inet_pton(AF_INET, C->bind_addr, &bind_sa.sin_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", C->bind_addr);
				goto out_free;
			}
			bind_sa.sin_port = htons(C->bind_port);
			bind_psa = (struct sockaddr *) &bind_sa;
			bind_sock_len = sizeof(bind_sa);
		}
		break;
	case PF_INET6:
		if (C->bind_addr[0] != '\0') {
			memset(&bind_sa6, 0, sizeof(bind_sa6));
			bind_sa6.sin6_family = AF_INET6;
			ret = inet_pton(AF_INET6, C->bind_addr, &bind_sa6.sin6_addr);
			if (ret != 1) {
				snprintf(Conn_error, sizeof(Conn_error),
					"inet_pton(%s) failed", C->bind_addr);
				goto out_free;
			}
			bind_sa6.sin6_port = htons(C->bind_port);
			bind_psa = (struct sockaddr *) &bind_sa6;
			bind_sock_len = sizeof(bind_sa6);
		}
		break;
	case PF_PACKET:
		/*do_bind = 0; TODO? */
		break;
	default:
		snprintf(Conn_error, sizeof(Conn_error),
			"Invalid domain [%d]!", C->sock_domain);
		goto out_free;
	}

	/*
	 * Initial state: packet sockets are open right away; for IP sockets
	 * it depends on the socket type (other types keep 0xff).
	 */
	if (C->sock_domain == PF_PACKET)
		first_state = CONN_STATE_OPEN;
	else if (C->sock_type == SOCK_STREAM)
		first_state = CONN_STATE_LISTEN;
	else if (C->sock_type == SOCK_DGRAM)
		first_state = CONN_STATE_OPEN;

	/* TODO: If we have workers, why would need to create a socket for master?! */
	ret = Conn_prepare_socket(C, bind_psa, bind_sock_len);
	if (ret == -1)
		goto out_free;
	C->fd = ret;

	if (C->type == CONN_TYPE_MASTER) {
		/*
		 * NOTE(review): C is linked into Conn_masters here, but a later
		 * failure frees it via out_free. Confirm Conn_free_intern()
		 * unlinks it; otherwise the list keeps a dangling pointer.
		 */
		if (Conn_masters.head == NULL)
			Conn_masters.head = C;
		else
			Conn_masters.tail->next = C;
		Conn_masters.tail = C;
	}

	if (do_connect == 1) {
		/* TODO: replace connect_a with OPEN?! */
		first_state = CONN_STATE_CONNECT_a;
	}
	C->state = first_state;

	if (!C->wp) {
		Log(5, "%s We do not have a worker pool, create one...\n", __func__);
		C->wp = Conn_wpool_create(1);
		if (!C->wp)
			goto out_free; /* TODO check if is correct! */
		Conn_set_wp(C, C->wp);
	}

	/* Now, we must enqueue listening socket to workers */
	for (i = 0; i < C->wp->workers; i++) {
		w = &C->wp->ww[i];
		/* TODO 'reuse' must be global, right? */
		if (Conn_reuseport_available) {
			Log(1, "%s: reuseport is available, create a new socket.\n",
				__func__);
			/*
			 * listen() is not done in prepare because it does not
			 * know whether it should listen.
			 */
			listen_fd = Conn_prepare_socket(C, bind_psa, bind_sock_len);
			if (listen_fd == -1)
				goto out_free;
		} else {
			Log(1, "%s: reuseport is NOT available, use master socket.\n",
				__func__);
			listen_fd = C->fd;
		}

		ret = Conn_wpool_start_worker(C, w, listen_fd);
		/*
		 * The per-worker socket is closed here whether or not the worker
		 * started, so a start failure no longer leaks it.
		 */
		if (Conn_reuseport_available)
			close(listen_fd);
		if (ret != 0)
			goto out_free;
	}

	if (do_connect == 1)
		Conn_pending++;

	return 0;

out_free:
	Conn_free_intern(C);
	return -1;
}
/*
 * OBSOLETE
 *
 * Convenience wrapper: allocates a Conn, configures domain, type and the
 * local bind address/port, then commits it.
 * Returns the committed Conn, or NULL if allocation or Conn_commit() fails.
 */
struct Conn *Conn_socket_addr(const int domain, const int type,
	const char *addr, const int port)
{
	struct Conn *conn = Conn_alloc();

	if (conn == NULL)
		return NULL;

	Conn_set_socket_domain(conn, domain);
	Conn_set_socket_type(conn, type);
	Conn_set_socket_bind_addr(conn, addr);
	Conn_set_socket_bind_port(conn, port);

	return (Conn_commit(conn) == 0) ? conn : NULL;
}
/*
 * OBSOLETE
 *
 * Like Conn_socket_addr() but binds to the wildcard address of @domain
 * ("0.0.0.0" for PF_INET, "::" for PF_INET6). Other domains pass a NULL
 * address through to Conn_socket_addr().
 */
struct Conn *Conn_socket(const int domain, const int type, const int port)
{
	const char *any_addr = NULL;

	if (domain == PF_INET)
		any_addr = "0.0.0.0";
	else if (domain == PF_INET6)
		any_addr = "::";

	return Conn_socket_addr(domain, type, any_addr, port);
}
/*
 * Connects an allocated socket
 * OBSOLETE!
 *
 * Allocates a Conn, sets it up as an outgoing (point-to-point) connection
 * to @addr/@port and commits it.
 * Returns the committed Conn, or NULL if allocation or Conn_commit() fails.
 */
struct Conn *Conn_connect(const int domain, const int type, const char *addr,
	const int port)
{
	struct Conn *peer;

	Log(8, "%s(%s, %d)\n", __func__, addr, port);

	peer = Conn_alloc();
	if (peer == NULL)
		return NULL;

	Conn_set_socket_domain(peer, domain);
	Conn_set_socket_type(peer, type);
	Conn_set_socket_addr(peer, addr);
	Conn_set_socket_port(peer, port);

	if (Conn_commit(peer) != 0)
		return NULL;

	return peer;
}
#if 0
/*
* Add tokens to connection.
* NOTE: currently compiled out (#if 0).
* Refills C->band_tokens based on the time elapsed since
* C->band_lasttime, counted in 100 ms units: each unit adds
* band_width / 10 tokens, capped at band_factor * band_width.
* Then invokes the send callback, if any, so queued data can go out.
*/
static void Conn_band_update(struct Conn *C)
{
long diff;
/* no need */
if (C->band_width == 0)
return;
/* Elapsed time in microseconds, then converted to 100 ms units. */
diff = (Conn_now.tv_sec - C->band_lasttime.tv_sec) * 1000000;
diff += Conn_now.tv_usec - C->band_lasttime.tv_usec;
diff /= 100000;
/* already added in this hundred of milisecond? */
if (diff == 0)
return;
/* take care of time skew: a clock going backwards counts as one unit */
if (diff < 0)
diff = 1;
C->band_lasttime = Conn_now;
C->band_tokens += diff * C->band_width / 10;
/* Cap the burst at band_factor * band_width tokens. */
if (C->band_tokens > C->band_factor * C->band_width)
C->band_tokens = C->band_factor * C->band_width;
if (C->cbs.send)
C->cbs.send(C);
Log(debug_band, "BAND: id %llu added tokens -> %u.\n",
C->id, C->band_tokens);
}
/*
* Set the bandwidth for a connection
* width is in 1b increments
* NOTE: currently compiled out (#if 0).
* Starts with a full bucket (factor * width tokens) and resets the refill
* timestamp to Conn_now. Always returns 0.
* NOTE(review): the debug Log prints timeval fields with %d/%06d; if
* re-enabled, cast tv_sec/tv_usec (e.g. to long with %ld) to match.
*/
int Conn_band(struct Conn *C, const unsigned int width,
const unsigned int factor)
{
Log(11, "%s id %llu width=%u factor=%u.\n",
__func__, C->id, width, factor);
C->band_lasttime = Conn_now;
C->band_width = width;
C->band_factor = factor;
C->band_tokens = factor * width;
Log(debug_band, "\t\tBAND: lasttime=%d.%06d, width=%u, factor=%u, tokens=%u\n",
C->band_lasttime.tv_sec, C->band_lasttime.tv_usec,
C->band_width, C->band_factor, C->band_tokens);
return 0;
}
#endif
#if 0
TODO
static void Conn_trytoconnect(void)
{
struct addrinfo hints;
struct addrinfo *res;
int i, ret;
char port[8];
struct Conn *C;
Log(8, "%s Conn_pending=%d\n",
__func__, Conn_pending);
C = Conn_connect_list.head;
while (C) {
if ((C->state == CONN_STATE_CONNECT_0)
&& (C->tryat <= Conn_now.tv_sec))
C->state = CONN_STATE_CONNECT_a;
if (C->state != CONN_STATE_CONNECT_a) {
C = C->next;
continue;
}
Log(9, "Trying to connect id %llu, to %s/%d...\n",
C->id, C->addr, C->port);
memset(&hints, 0, sizeof(hints));
if (C->sock_domain == 0)
hints.ai_family = PF_UNSPEC;
else
hints.ai_family = C->sock_domain;
hints.ai_socktype = C->sock_type;
hints.ai_flags = AI_ADDRCONFIG;
snprintf(port, sizeof(port), "%d", C->port);
res = NULL;
ret = getaddrinfo(C->addr, port, &hints, &res);
if (ret != 0) {
C->state = CONN_STATE_ERROR;
C->error_state = CONN_ERROR_GETADDRINFO;
if (res)
freeaddrinfo(res);
C = C->next;
continue;
}
if (C->fd == -1) {
C->fd = socket(res->ai_family, res->ai_socktype, 0);
if (C->fd == -1) {
C->state = CONN_STATE_ERROR;
C->error_state = CONN_ERROR_SOCKET;
C->xerrno = errno;
freeaddrinfo(res);
C = C->next;
continue;
}
Log(10, "\tAllocated socket on fd %d\n", C->fd);
Conn_setnonblock(C->fd);
/* Need POLLOUT to signal when the connection was done. */
/*TODO ret = Conn_add_obj(Conn_epoll_fd, C, EPOLLIN | EPOLLRDHUP);*/
ret = Conn_add_obj(Conn_epoll_fd, C, EPOLLIN);
if (ret != 0) {
C->state = CONN_STATE_ERROR;
C->error_state = CONN_ERROR_SOCKET;
freeaddrinfo(res);
C = C->next;
continue;
}
}
/* Set syn time */
C->conn_syn = Conn_now;
Log(11, "\tConnecting...\n");
ret = connect(C->fd, res->ai_addr, res->ai_addrlen);
if ((ret != 0) && (errno != EINPROGRESS)) {
C->state = CONN_STATE_ERROR;
C->error_state = CONN_ERROR_CONNECT;
C->xerrno = errno;
freeaddrinfo(res);
C = C->next;
continue;
}
C->state = CONN_STATE_CONNECT_b;
Conn_pending--;
Conn_total++;
freeaddrinfo(res);
C = C->next;
}
Log(8, "%s After, Conn_pending=%d\n", __func__, Conn_pending);
}
#endif
#if 0
/*
* Moving an in-use slot over a free one for compacting reason.
* NOTE: currently compiled out (#if 0); it operates on the Conns array.
* Swaps Conns[dst] and Conns[src] (a swap, not a plain copy), fixes both
* .slot fields and calls Conn_change_obj() on the moved entry
* (presumably to refresh its event registration; confirm).
*/
static void Conn_move_slot(const unsigned int dst, const unsigned int src)
{
struct Conn tmp;
if (dst == src)
return;
Log(10, "%s Moving id %llu from slot=%u to slot=%d...\n",
__func__, Conns[src].id, src, dst);
/* We need to save old location because of usefull pointers. */
tmp = Conns[dst];
Conns[dst] = Conns[src];
Conns[src] = tmp;
Conns[dst].slot = dst;
Conns[src].slot = src;
Conn_change_obj(&Conns[dst]);
}
#endif
/*
 * Does the polling of file descriptors.
 *
 * timeout is in 1/1000 seconds (milliseconds) increments. A single wait never
 * exceeds 5000 ms. With timeout == -1 the function keeps dispatching rounds
 * until one of the stop conditions below is met.
 *
 * Returns:
 *   -1 if Conn_dispatch_events() fails;
 *    0 if Conn_must_stop is set, Conn_work_to_do is 0 or Conn_no is 0;
 *    otherwise the value returned by the last Conn_dispatch_events() call.
 */
int Conn_poll(const int timeout)
{
	int ret;
	int timeout2;

	/* Conn_no and Conn_work_to_do are unsigned, hence %u. */
	Log(9, "%s timeout=%d Conn_no=%u Conn_work_to_do=%u\n",
		__func__, timeout, Conn_no, Conn_work_to_do);

	/* Wait at most 5s per round; "forever" (-1) is emulated by looping. */
	if (timeout == -1 || timeout > 5000)
		timeout2 = 5000;
	else
		timeout2 = timeout;

	for (;;) {
		if (unlikely(Conn_must_stop == 1))
			return 0;

		if (unlikely(Conn_work_to_do == 0)) {
			Log(9, "%s work_to_do is 0, so return 0!\n",
				__func__);
			return 0;
		}

		/*
		 * TODO: retry pending outgoing connects (Conn_pending >
		 * 0 -> Conn_trytoconnect()) before waiting; that code is
		 * currently compiled out.
		 */

		ret = Conn_dispatch_events(NULL, Conn_epoll_fd, Conn_epoll_events,
			CONN_EVENTS_SLOTS, timeout2);
		if (unlikely(ret < 0))
			return -1;

		/*
		 * TODO: after dispatching, free Conns in error state and run
		 * expiration (Conn_expire) and bandwidth token refill
		 * (Conn_band_update). The old slot-based version of this
		 * pass is compiled out and is not done here.
		 */

		/* Any work left to do? */
		if (Conn_no == 0) {
			Log(10, "Nothing remained to poll for!\n");
			return 0;
		}

		if (timeout != -1)
			return ret;
	}
}