catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easy building ipv4/ipv6 network daemons/clients
List of commits:
Subject Hash Author Date (UTC)
Checkpoint before switching to processes not threads de43b387557dde215ab1210838d396e7e7b22c4f Catalin(ux) M. BOIE 2015-01-14 04:13:00
Wpools work now\! f875d6bea1777c3a290bf9bb1aa047f26c935a63 Catalin(ux) M. BOIE 2013-11-13 20:51:13
WIP 1d246f2130d4acf8c267e82051b250a623da6870 Catalin(ux) M. BOIE 2013-10-16 19:57:04
WIP 6387026db3ce7983e610887565a282f4124d4092 Catalin(ux) M. BOIE 2013-10-14 20:06:49
WIP 15f063f9d34d3f7b7ae9d9e83f59b4077515122b Catalin(ux) M. BOIE 2010-09-30 22:01:30
Switch licence to LGPLv3+; Do not stupidly close master socket. d3b1c4ccd591627e7faa0eeaaa3b2bc1ee20709e Catalin(ux) M. BOIE 2013-08-14 04:09:36
Duilder fixes. Removed -O0. Fixed spec file. d8a03dced52e918b6f66a05dfd64a3c75c07c91b Catalin(ux) M. BOIE 2011-12-14 09:12:55
Fixed a stupid logging bug (invalid number of parameters). e7d4c38d0130a142ac6c409c63d63201d2af08e2 Catalin(ux) M. BOIE 2010-12-22 16:46:47
Ignore all Changelog files. fa45b63d3db958228f44bcb3d6431d60f94d1147 Catalin(ux) M. BOIE 2010-12-22 16:46:16
Added mailmap file. a57dcfd6bdf6c8c86161cf7ce3fff942a714a2b9 Catalin(ux) M. BOIE 2010-12-22 16:46:04
WIP a69db41578de7ded49d656b7ea7cfae76c6695d9 Catalin(ux) M. BOIE 2010-09-30 22:01:30
Be more verbose in try_expand_buf and error out connection when cannot expand. e95ac8d7e5015958d3594862c6183b63bab80d4a Catalin(ux) M. BOIE 2010-09-30 20:58:08
Ignore xbind1 compiled example. 089b68cf9fc5c16ee7d8136b28a52ec4dd139c78 Catalin(ux) M. BOIE 2010-08-23 19:42:01
Removed direct access to Conn structure in s.c example. f6215273602571ef2c98479bdba930ebe312cbc9 Catalin(ux) M. BOIE 2010-08-23 19:38:33
Bumped the version to 1.0.32. f158fca2cf9f3285ae761cd4ce30b01911b84385 Catalin(ux) M. BOIE 2010-06-23 21:30:47
The cache for epoll_ctl, has to take also the slot in consideration. ed378c16927c707feee481b2cd0ea7fbdf257d9d Catalin(ux) M. BOIE 2010-06-09 19:18:03
After calling getsockname, set cache as clean. 199f0fd96b064fd1e63f5773b1e7ab58e7d91303 Catalin(ux) M. BOIE 2010-06-08 18:30:20
Do not try to call getsockname if connection is not opened. e9853b5b5a01df3c5e07f6fe4607a68fd7181b39 Catalin(ux) M. BOIE 2010-06-08 18:28:49
Supports kernels below 2.6.9 (epoll_ctl). 97919a022fae39bdefc3f7e4c50526fb473dfd34 Catalin(ux) M. BOIE 2010-06-08 18:06:35
Cosmetic logging. ad806920255f8ca5f9281b6b6a3a53edf2d5088f Catalin(ux) M. BOIE 2010-06-08 17:35:45
Commit de43b387557dde215ab1210838d396e7e7b22c4f - Checkpoint before switching to processes not threads
Author: Catalin(ux) M. BOIE
Author date (UTC): 2015-01-14 04:13
Committer name: Catalin(ux) M. BOIE
Committer date (UTC): 2015-01-14 04:13
Parent(s): f875d6bea1777c3a290bf9bb1aa047f26c935a63
Signing key:
Tree: b9fc563f9efb3fb3f741b86867d5cf80dea87453
File Lines added Lines deleted
Conn.c 816 392
Conn.h 9 0
Conn.spec.in 1 1
Conn_config.h.in 2 7
Makefile.in 14 5
TODO 269 18
configure 1 1
examples/Makefile 10 4
examples/wpool2.c 1 1
examples/ws1.c 48 3
tests/.gitignore 1 0
tests/Makefile 1 1
tests/ws_double.expect 1 0
tests/ws_double.sh 11 0
File Conn.c changed (mode: 100644) (index 2a7b800..4a588a1)
10 10 #include "Conn.h" #include "Conn.h"
11 11
12 12 #include <sys/epoll.h> #include <sys/epoll.h>
13 #include <sys/timerfd.h>
13 14 #include <stdarg.h> #include <stdarg.h>
14 15 #include <string.h> #include <string.h>
15 16 #include <unistd.h> #include <unistd.h>
 
... ... enum CONN_ERROR {
65 66 CONN_ERROR_INTERNAL CONN_ERROR_INTERNAL
66 67 }; };
67 68
69 struct Conn_pool
70 {
71 struct Conn *head, *tail;
72 unsigned int allocated;
73 unsigned short next_block;
74 void *blocks[4096]; /* TODO: make it dynamic */
75 };
76
68 77 struct Conn_wpool_worker struct Conn_wpool_worker
69 78 { {
70 79 int epoll_fd; int epoll_fd;
 
... ... struct Conn_wpool_worker
74 83 pthread_t tid; pthread_t tid;
75 84 struct Conn *conns_head, *conns_tail; struct Conn *conns_head, *conns_tail;
76 85 unsigned short id; /* Used for logging */ unsigned short id; /* Used for logging */
86 int timer;
77 87
78 88 /* Keep track of free structures */ /* Keep track of free structures */
79 struct Conn *free_head, *free_tail;
80 unsigned int free_count;
89 struct Conn_pool free;
81 90
82 91 /* stats */ /* stats */
83 92 unsigned long mem_structs; /* How much bytes are used for conns */ unsigned long mem_structs; /* How much bytes are used for conns */
93 unsigned long long in_clients; /* How many clients connected */
94 unsigned long long false_accepts; /* How many times we got EAGAIN on accept */
84 95 }; };
85 96
86 97 struct Conn_wpool struct Conn_wpool
 
... ... struct Conn
113 124 struct Conn *next; struct Conn *next;
114 125
115 126 int fd; int fd;
116 unsigned int revents; /* I/O events */ /* TODO: really needed? */
117 127
118 128 unsigned char type; unsigned char type;
119 129
 
... ... struct Conn
127 137 unsigned int obuf_size, obuf_head, obuf_tail; unsigned int obuf_size, obuf_head, obuf_tail;
128 138
129 139 struct timeval trecv, tsend; /* last time we saw an receive/send */ struct timeval trecv, tsend; /* last time we saw an receive/send */
130 time_t idle; /* idle time allowed */
140 unsigned int idle; /* idle time allowed TODO: ms? */
131 141 unsigned int read_timeout; /* Max timeout for receiving an answer (milliseconds) */ unsigned int read_timeout; /* Max timeout for receiving an answer (milliseconds) */
132 142
133 143 struct timeval conn_syn; /* Time when a connection was initiated */ struct timeval conn_syn; /* Time when a connection was initiated */
134 144 unsigned int conn_timeout; /* Timeout for connection (milliseconds) */ unsigned int conn_timeout; /* Timeout for connection (milliseconds) */
135 145
136 time_t last_trigger; /* last trigger was at */
137 time_t trigger; /* Frequency of wakeup a connection */
146 unsigned int last_trigger; /* last trigger was at TODO: make it an offset from start. */
147 unsigned int trigger; /* Frequency of wakeup a connection */
138 148
139 149 int sock_domain; int sock_domain;
140 150 int sock_type; int sock_type;
 
... ... struct Conn
142 152
143 153 char addr[40], bind_addr[40]; char addr[40], bind_addr[40];
144 154 int port, bind_port; int port, bind_port;
145 unsigned long long via; /* "via" is the connection id via a client was accepted */
146 155
147 156 unsigned long long bi, bo; unsigned long long bi, bo;
148 157
 
... ... struct Conn
151 160 /* reconnect stuff */ /* reconnect stuff */
152 161 unsigned int retries; unsigned int retries;
153 162 unsigned int delay; /* delay between reconnects */ unsigned int delay; /* delay between reconnects */
154 time_t tryat; /* when we go to CONNECT_a state */
163 unsigned int tryat; /* when we go to CONNECT_a state */
155 164
156 165 int xerrno; int xerrno;
157 166
 
... ... struct Conn
163 172 unsigned char shutdown_after_send:1; unsigned char shutdown_after_send:1;
164 173 unsigned char local_dirty:1; unsigned char local_dirty:1;
165 174 unsigned char remote_dirty:1; unsigned char remote_dirty:1;
166 unsigned char accept_pending:1;
167 unsigned char is_freed:1; /* 1 is is freed */
168 175
169 176 /* bandwidth stuff */ /* bandwidth stuff */
170 177 struct timeval band_lasttime; /* last time tokens were added */ struct timeval band_lasttime; /* last time tokens were added */
 
... ... struct Conn
180 187
181 188 /* wpool */ /* wpool */
182 189 struct Conn_wpool *wp; /* Worker pool associated with this conn */ struct Conn_wpool *wp; /* Worker pool associated with this conn */
183 struct Conn_wpool_worker *ww; /* used to link C in active list of a worker */
190 struct Conn_wpool_worker *worker; /* used to link C in active list of a worker */
191
192 /* web server */
193 struct Conn_ws *ws;
184 194 }; };
185 195
186 struct Conn_pool
196 /* For web server */
197 #define CONN_WS_PATH 0
198 #define CONN_WS_SCRIPT 1
199
200 #define CONN_WS_REQ_GET 1
201 #define CONN_WS_REQ_POST 2
202
203 struct Conn_ws_url
187 204 { {
188 pthread_spinlock_t lock;
189 struct Conn *head, *tail;
190 unsigned int allocated;
191 unsigned short next_block;
192 void *blocks[4096]; /* TODO: make it dynamic */
205 struct Conn_ws_url *next;
206 char *url;
207 char *path;
208 void (*cb)(struct Conn *);
209 unsigned char type:1; /* CONN_WS_* */
193 210 }; };
194 211
212 struct Conn_ws
213 {
214 struct Conn_ws_url *urls;
215 unsigned char req_type:2; /* CONN_WS_REQ_* */
216 char *url; /* requested url */
217 char *paras; /* parameters */
218 unsigned char http_protocol; /* 10, 11, 20 etc. */
219 };
195 220
196 221 /* For parsing */ /* For parsing */
197 222 struct Conn_split_cell struct Conn_split_cell
 
... ... static unsigned int Conn_max = 0;
229 254 static unsigned long Conn_total = 0; static unsigned long Conn_total = 0;
230 255 static unsigned int Conn_start = 0; static unsigned int Conn_start = 0;
231 256 static unsigned int Conn_pending = 0; static unsigned int Conn_pending = 0;
232 static struct timeval Conn_now;
233 static unsigned short Conn_debug_level = 0; /* debug level */
257 static __thread struct timeval Conn_now;
258 static unsigned short Conn_debug_level = 0; /* debug level */
234 259 /* memory stuff */ /* memory stuff */
235 260 static unsigned long long Conn_mem_buffers_in = 0; static unsigned long long Conn_mem_buffers_in = 0;
236 261 static unsigned long long Conn_mem_buffers_out = 0; static unsigned long long Conn_mem_buffers_out = 0;
 
... ... static void Conn_default_cbs_trigger(struct Conn *C);
264 289 static void Conn_default_cbs_error(struct Conn *C); static void Conn_default_cbs_error(struct Conn *C);
265 290 static void Conn_default_cbs_connected(struct Conn *C); static void Conn_default_cbs_connected(struct Conn *C);
266 291 static void Conn_default_cbs_accept_error(struct Conn *C); static void Conn_default_cbs_accept_error(struct Conn *C);
267 int Conn_wpool_enqueue(struct Conn_wpool *wp, struct Conn *C);
268 292
269 293 static struct Conn_cbs Conn_default_cbs = static struct Conn_cbs Conn_default_cbs =
270 294 { {
 
... ... static struct Conn_cbs Conn_default_cbs =
281 305
282 306
283 307 /* ############## misc ############# */ /* ############## misc ############# */
284 #if 0
308 #if 1
285 309 __cold void Log(const unsigned short level, char *format, ...) __cold void Log(const unsigned short level, char *format, ...)
286 310 { {
287 311 va_list ap; va_list ap;
 
... ... __cold void Log(const unsigned short level, char *format, ...)
299 323 vsnprintf(line + len, sizeof(line) - len, format, ap); vsnprintf(line + len, sizeof(line) - len, format, ap);
300 324 va_end(ap); va_end(ap);
301 325
302 //pthread_spin_lock(&Log_lock);
326 pthread_spin_lock(&Log_lock);
303 327 write(Conn_log_fd, line, strlen(line)); write(Conn_log_fd, line, strlen(line));
304 //pthread_spin_unlock(&Log_lock);
328 pthread_spin_unlock(&Log_lock);
305 329 } }
306 330 #else #else
307 __cold inline void Log(const unsigned short level, char *format, ...)
331 inline void Log(const unsigned short level, char *format, ...)
308 332 { {
309 333 return; return;
310 334 } }
311 335 #endif #endif
312 336
313 __hot static inline void my_pthread_spin_lock(pthread_spinlock_t *lock)
337 static inline void my_pthread_spin_lock(pthread_spinlock_t *lock)
314 338 { {
315 339 int r; int r;
316 340
 
... ... __hot static inline void my_pthread_spin_lock(pthread_spinlock_t *lock)
322 346 } }
323 347 } }
324 348
325 __hot static inline void my_pthread_spin_unlock(pthread_spinlock_t *lock)
349 static inline void my_pthread_spin_unlock(pthread_spinlock_t *lock)
326 350 { {
327 351 int r; int r;
328 352
 
... ... static void Conn_sys(void)
485 509 somaxconn, tcp_max_tw_buckets, tcp_fin_timeout); somaxconn, tcp_max_tw_buckets, tcp_fin_timeout);
486 510 } }
487 511
512 /*
513 * Adds an fd to poll system
514 */
515 static inline int Conn_add_obj(int epoll_fd, struct Conn *C,
516 const unsigned int events)
517 {
518 int ret;
519 struct epoll_event ev;
520
521 memset(&ev, 0, sizeof(struct epoll_event));
522 ev.data.ptr = C;
523 ev.events = events;
524 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, C->fd, &ev);
525 if (likely(ret == 0))
526 return 0;
527
528 Log(1, "%s Could not add fd %d to epoll_fd %d, events 0x%x (%s)!\n",
529 __func__, C->fd, epoll_fd, events, strerror(errno));
530 C->xerrno = errno;
531 snprintf(Conn_error, sizeof(Conn_error),
532 strerror(C->xerrno));
533 return C->xerrno;
534 }
535
536 /*
537 * Change events for a fd
538 */
539 static inline int Conn_change_obj(int epoll_fd, struct Conn *C,
540 const unsigned int events)
541 {
542 int ret;
543 struct epoll_event ev;
544
545 memset(&ev, 0, sizeof(struct epoll_event));
546 ev.data.ptr = C;
547 ev.events = events;
548 ret = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, C->fd, &ev);
549 if (likely(ret == 0))
550 return 0;
551
552 Log(1, "%s Could not change fd %d to epoll_fd %d, events 0x%x (%s)!\n",
553 __func__, C->fd, epoll_fd, events, strerror(errno));
554 C->xerrno = errno;
555 snprintf(Conn_error, sizeof(Conn_error),
556 strerror(C->xerrno));
557 return C->xerrno;
558 }
559
560
488 561 /* ############## split ############ */ /* ############## split ############ */
489 562 /* /*
490 563 * Cut from right, in place, the chars specified in @chars. * Cut from right, in place, the chars specified in @chars.
 
... ... void Conn_del_wp(struct Conn *C, struct Conn_wpool *wp)
777 850 } }
778 851
779 852
853 /* ############ Web server functions ########### */
854
855 /*
856 * Allocates a web server structure (Conn_ws)
857 */
858 static struct Conn_ws *Conn_ws_alloc(void)
859 {
860 struct Conn_ws *ret;
861
862 ret = (struct Conn_ws *) malloc(sizeof(struct Conn_ws));
863 if (ret == NULL)
864 return NULL;
865
866 ret->urls = NULL;
867
868 return ret;
869 }
870
871 /*
872 * Frees a Conn_ws structure
873 */
874 void Conn_ws_free(struct Conn_ws *ws)
875 {
876 struct Conn_ws_url *u, *next;
877
878 u = ws->urls;
879 while (u) {
880 next = u->next;
881 free(u->url);
882 if (u->type == CONN_WS_PATH)
883 free(u->path);
884 free(u);
885 u = next;
886 }
887
888 free(ws);
889 }
890
891 /*
892 * Creates a web server and attach it to a C
893 * Returns 0 if OK, -1 on error.
894 */
895 int Conn_ws_create(struct Conn *C)
896 {
897 struct Conn_ws *ws;
898
899 Log(10, "%s", __func__);
900
901 ws = Conn_ws_alloc();
902 if (!ws)
903 return -1;
904
905 if (C->ws)
906 Conn_ws_free(C->ws);
907
908 C->ws = ws;
909 Log(10, "%s: attached!", __func__);
910
911 return 0;
912 }
913
914 /*
915 * Returns the string request type
916 */
917 static char *Conn_ws_req_type(const unsigned char req_type)
918 {
919 switch (req_type) {
920 case CONN_WS_REQ_GET: return "GET";
921 case CONN_WS_REQ_POST: return "POST";
922 }
923
924 return "UNK";
925 }
926
927 /*
928 * Attach a function to an URL
929 * Example: Conn_ws_script(C, "/cgi-bin/script1", function_script1)
930 */
931 int Conn_ws_script(struct Conn *C, const char *url, void(*cb)(struct Conn *C))
932 {
933 struct Conn_ws_url *u, *p;
934
935 u = (struct Conn_ws_url *) malloc(sizeof(struct Conn_ws_url));
936 if (!u)
937 return -1;
938
939 u->type = CONN_WS_SCRIPT;
940 u->next = NULL;
941
942 u->url = strdup(url);
943 if (!u->url) {
944 free(u);
945 return -1;
946 }
947 u->cb = cb;
948
949 if (C->ws->urls == NULL) {
950 C->ws->urls = u;
951 } else {
952 p = C->ws->urls;
953 while (p->next)
954 p = p->next;
955 p->next = u;
956 }
957
958 return 0;
959 }
960
961 /*
962 * Attach a dir/file to an URL
963 * Example: Conn_ws_path(C, "/img", "/home/images/v1")
964 */
965 int Conn_ws_path(struct Conn *C, const char *url, const char *path)
966 {
967 struct Conn_ws_url *u, *p;
968
969 u = (struct Conn_ws_url *) malloc(sizeof(struct Conn_ws_url));
970 if (!u)
971 return -1;
972
973 u->type = CONN_WS_PATH;
974 u->next = NULL;
975
976 u->url = strdup(url);
977 if (!u->url) {
978 free(u);
979 return -1;
980 }
981
982 u->path = strdup(path);
983 if (!u->path) {
984 free(u->url);
985 free(u);
986 return -1;
987 }
988
989 if (C->ws->urls == NULL) {
990 C->ws->urls = u;
991 } else {
992 p = C->ws->urls;
993 while (p->next)
994 p = p->next;
995 p->next = u;
996 }
997
998 return 0;
999 }
1000
1001 /*
1002 * Loads a file and outputs it to a connection
1003 */
1004 static int Conn_ws_read_file(struct Conn *C, const char *path)
1005 {
1006 /*
1007 We have a problem here. If file is big, we need to fill a buffer
1008 and return. when buffer is getting low, refill.
1009 */
1010
1011 return 0;
1012 }
1013
1014 static void Conn_ws_dispatch_path(struct Conn *C, struct Conn_ws_url *u)
1015 {
1016 char tmp[128], body[128];
1017
1018 Log(2, "%s: req_type=[%s] url=[%s] http_protcol=%hhu\n",
1019 __func__, Conn_ws_req_type(C->ws->req_type), C->ws->url,
1020 C->ws->http_protocol);
1021
1022 if (C->ws->req_type != CONN_WS_REQ_GET) {
1023 /* TODO: send some error */
1024 return;
1025 }
1026
1027 snprintf(body, sizeof(body), "path");
1028 snprintf(tmp, sizeof(tmp),
1029 "HTTP/1.1 200 OK\r\n"
1030 "Content-Length: %zu\r\n"
1031 "\r\n"
1032 "%s",
1033 strlen(body), body);
1034 Conn_enqueue(C, tmp, strlen(tmp));
1035 }
1036
1037 /*
1038 * Dispatch a web server request
1039 */
1040 static void Conn_ws_dispatch(struct Conn *C)
1041 {
1042 struct Conn_ws_url *u = NULL;
1043 char *sep, *url, *paras = "";
1044 char tmp[128];
1045 unsigned char req_type = 0, match;
1046 unsigned char http_protocol = 11, tmp8;
1047
1048 url = Conn_ibuf(C);
1049 Log(1, "%s [%s]\n", __func__, url);
1050
1051 sep = Conn_strstr(C, "\r\n\r\n");
1052 if (sep == NULL)
1053 return;
1054
1055 if (strncmp(url, "GET ", 4) == 0) {
1056 req_type = CONN_WS_REQ_GET;
1057 url += 4;
1058 } else if (strncmp(url, "POST ", 5) == 0) {
1059 req_type = CONN_WS_REQ_POST;
1060 url += 5;
1061 }
1062
1063 if (unlikely(req_type == 0)) {
1064 /* Invalid request */
1065 snprintf(tmp, sizeof(tmp), "HTTP/1.1 500 Invalid request\r\n"
1066 "\r\n"
1067 "Invalid request");
1068 Conn_enqueue(C, tmp, strlen(tmp));
1069 return;
1070 }
1071
1072 sep = strchr(url, ' ');
1073 if (sep)
1074 *sep = '\0';
1075 Log(2, "%s: Request req_type=%s [%s]!\n",
1076 __func__, Conn_ws_req_type(req_type), url);
1077
1078 /* TODO: parse HTTP type */
1079 if (sep) {
1080 sep = strstr(sep + 1, "HTTP/");
1081 if (sep) {
1082 sep += 5;
1083 tmp8 = sep[0] - '0';
1084 if ((sep[1] == '.') && (sep[2] != '\0'))
1085 http_protocol = tmp8 * 10 + sep[2] - '0';
1086 }
1087 }
1088
1089 sep = strchr(url, '?');
1090 if (sep) {
1091 *sep = '\0';
1092 paras = sep + 1;
1093 }
1094
1095 u = C->ws->urls;
1096 while (u) {
1097 Log(2, "Comparing [%s] with [%s]...\n", url, u->url);
1098
1099 if (u->type == CONN_WS_PATH)
1100 match = strncmp(url, u->url, strlen(u->url));
1101 else
1102 match = strcmp(url, u->url);
1103
1104 if (match == 0) {
1105 C->ws->req_type = req_type;
1106 C->ws->url = url;
1107 C->ws->paras = paras;
1108 C->ws->http_protocol = http_protocol;
1109
1110 if (u->type == CONN_WS_PATH)
1111 Conn_ws_dispatch_path(C, u);
1112 else
1113 u->cb(C);
1114
1115 /* TODO: also, if not keepalive is present */
1116 if (C->ws->http_protocol == 10)
1117 Conn_close(C);
1118
1119 /* TODO: start timer to close connection */
1120
1121 break;
1122 }
1123 u = u->next;
1124 }
1125
1126 if (unlikely(!u)) {
1127 /* not found */
1128 snprintf(tmp, sizeof(tmp), "HTTP/1.1 404 Not found\r\n"
1129 "\r\n"
1130 "Path not found");
1131 Conn_enqueue(C, tmp, strlen(tmp));
1132 }
1133 }
1134
1135
780 1136 /* ########### conn ############## */ /* ########### conn ############## */
781 1137 /* /*
782 1138 * Inits a conn pool * Inits a conn pool
 
... ... static void pool_init(struct Conn_pool *x)
786 1142 x->head = NULL; x->head = NULL;
787 1143 x->allocated = 0; x->allocated = 0;
788 1144 x->next_block = 0; x->next_block = 0;
789 pthread_spin_init(&x->lock, PTHREAD_PROCESS_PRIVATE);
790 1145 } }
791 1146
792 1147 /* /*
 
... ... static void Conn_error_raise(struct Conn *C, const int err)
855 1210 C->cbs.error(C); C->cbs.error(C);
856 1211 } }
857 1212
1213 #if 0
858 1214 /* set noblocking */ /* set noblocking */
859 1215 static int Conn_setnonblock(const int fd) static int Conn_setnonblock(const int fd)
860 1216 { {
861 1217 return fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR); return fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR);
862 1218 } }
1219 #endif
863 1220
864 1221 /* /*
865 1222 * Expand the requested buffer * Expand the requested buffer
 
... ... static void Conn_speed(char *dst, const unsigned int dst_len,
1171 1528 static char *Conn_status_slot(struct Conn *C) static char *Conn_status_slot(struct Conn *C)
1172 1529 { {
1173 1530 static char tmp[1024]; static char tmp[1024];
1174 char pollr[16];
1175 1531 char speedi[32], speedo[32]; char speedi[32], speedo[32];
1176 1532 unsigned int dT, si, so; unsigned int dT, si, so;
1177 1533 char flags[128], flags_prefix[3], flags_postfix[2]; char flags[128], flags_prefix[3], flags_postfix[2];
 
... ... static char *Conn_status_slot(struct Conn *C)
1208 1564
1209 1565 strcat(flags, flags_postfix); strcat(flags, flags_postfix);
1210 1566
1211 Conn_poll_status(C->revents, pollr);
1212
1213 1567 dT = Conn_now.tv_sec - C->start; dT = Conn_now.tv_sec - C->start;
1214 1568 if (dT == 0) if (dT == 0)
1215 1569 dT = 1; dT = 1;
 
... ... static char *Conn_status_slot(struct Conn *C)
1242 1596 " %s/%s/%s" " %s/%s/%s"
1243 1597 " %s %s" " %s %s"
1244 1598 " %s/%d <-> %s/%d" " %s/%d <-> %s/%d"
1245 " via=%llu [%s] IO=%llu/%llu"
1599 " IO=%llu/%llu"
1246 1600 " BS=%u/%u S=%s/%s" " BS=%u/%u S=%s/%s"
1247 1601 " T=%ld bw=%u f=%u tk=%u next=%p" " T=%ld bw=%u f=%u tk=%u next=%p"
1248 1602 "%s", "%s",
 
... ... static char *Conn_status_slot(struct Conn *C)
1251 1605 Conn_get_socket_protocol(C), Conn_get_socket_protocol(C),
1252 1606 Conn_socktype(C), Conn_state(C), Conn_socktype(C), Conn_state(C),
1253 1607 local_addr, local_port, remote_addr, remote_port, local_addr, local_port, remote_addr, remote_port,
1254 C->via, pollr, C->bi, C->bo,
1608 C->bi, C->bo,
1255 1609 C->ibuf_size, C->obuf_size, speedi, speedo, C->ibuf_size, C->obuf_size, speedi, speedo,
1256 1610 Conn_now.tv_sec - C->start, Conn_now.tv_sec - C->start,
1257 1611 C->band_width, C->band_factor, C->band_tokens, C->band_width, C->band_factor, C->band_tokens,
 
... ... static char *Conn_status_slot(struct Conn *C)
1261 1615 } }
1262 1616
1263 1617 #if 0 #if 0
1618 // TODO: pollr was removed, remove also from header
1264 1619 static char *Conn_status_slot_html(struct Conn *C) static char *Conn_status_slot_html(struct Conn *C)
1265 1620 { {
1266 1621 static char tmp[1024]; static char tmp[1024];
1267 char pollr[16], *ext = "";
1622 char *ext = "";
1268 1623 char speedi[32], speedo[32]; char speedi[32], speedo[32];
1269 1624 unsigned int dT, si, so; unsigned int dT, si, so;
1270 1625
1271 Conn_poll_status(C->revents, pollr);
1272
1273 1626 dT = Conn_now.tv_sec - C->start; dT = Conn_now.tv_sec - C->start;
1274 1627 if (dT == 0) if (dT == 0)
1275 1628 dT = 1; dT = 1;
 
... ... static char *Conn_status_slot_html(struct Conn *C)
1289 1642 "<td>%s</td><td>%s</td><td>%s</td>" "<td>%s</td><td>%s</td><td>%s</td>"
1290 1643 "<td>%s</td><td>%s</td>" "<td>%s</td><td>%s</td>"
1291 1644 "<td>%s/%d</td>" "<td>%s/%d</td>"
1292 "<td>%llu</td><td>%s</td><td>%llu / %llu</td>"
1645 "<td>%llu / %llu</td>"
1293 1646 "<td>%u / %u</td><td>%s / %s</td><td>%ld</td>" "<td>%u / %u</td><td>%s / %s</td><td>%ld</td>"
1294 1647 "<td>%u</td><td>%u</td><td>%u</td>" "<td>%u</td><td>%u</td><td>%u</td>"
1295 1648 "%s", "%s",
1296 1649 C->id, C->fd, C->id, C->fd,
1297 1650 Conn_domain(C), Conn_type(C), Conn_get_socket_protocol(C), Conn_domain(C), Conn_type(C), Conn_get_socket_protocol(C),
1298 1651 Conn_socktype(C), Conn_state(C), Conn_socktype(C), Conn_state(C),
1299 C->addr, C->port, C->via, pollr, C->bi, C->bo,
1652 C->addr, C->port, C->bi, C->bo,
1300 1653 C->ibuf_size, C->obuf_size, C->ibuf_size, C->obuf_size,
1301 1654 speedi, speedo, Conn_now.tv_sec - C->start, speedi, speedo, Conn_now.tv_sec - C->start,
1302 1655 C->band_width, C->band_factor, C->band_tokens, C->band_width, C->band_factor, C->band_tokens,
 
... ... char *Conn_status(const unsigned int flags)
1356 1709 strcat(buf, "<td>SType</td>"); strcat(buf, "<td>SType</td>");
1357 1710 strcat(buf, "<td>State</td>"); strcat(buf, "<td>State</td>");
1358 1711 strcat(buf, "<td>Addr/port</td>"); strcat(buf, "<td>Addr/port</td>");
1359 strcat(buf, "<td>Via</td>");
1360 strcat(buf, "<td>Pollr</td>");
1361 1712 strcat(buf, "<td>BI/BO</td>"); strcat(buf, "<td>BI/BO</td>");
1362 1713 strcat(buf, "<td>BUF I/O</td>"); strcat(buf, "<td>BUF I/O</td>");
1363 1714 strcat(buf, "<td>Speed I/O</td>"); strcat(buf, "<td>Speed I/O</td>");
 
... ... char *Conn_status(const unsigned int flags)
1387 1738 if (flags & 1) if (flags & 1)
1388 1739 strcat(buf, "<tr bgcolor=\"ffffff\">\n"); strcat(buf, "<tr bgcolor=\"ffffff\">\n");
1389 1740
1390 Conn_poll_status(C->revents, pollr);
1391
1392 1741 if ((flags & 1) == 0) if ((flags & 1) == 0)
1393 1742 per_slot = Conn_status_slot(slot); per_slot = Conn_status_slot(slot);
1394 1743 else else
 
... ... unsigned int Conn_qlen(const struct Conn *C)
1447 1796 return Conn_iqlen(C); return Conn_iqlen(C);
1448 1797 } }
1449 1798
1450 #if 0
1451 TODO - really needed?
1452 1799 /* /*
1453 1800 * Returns 1 if we can ignore this connection * Returns 1 if we can ignore this connection
1801 * TODO - really needed?
1454 1802 */ */
1455 1803 static int Conn_ignore(struct Conn *C) static int Conn_ignore(struct Conn *C)
1456 1804 { {
1457 1805 if (C->error_state > 0) if (C->error_state > 0)
1458 1806 return 1; return 1;
1459 1807
1460 if (C->is_freed == 1)
1808 if (C->state == CONN_STATE_FREE)
1461 1809 return 1; return 1;
1462 1810
1463 1811 return 0; return 0;
1464 1812 } }
1465 #endif
1466 1813
1467 1814 #if 0 #if 0
1468 1815 /* /*
 
... ... __hot static void Conn_free_intern(struct Conn *C)
1813 2160 if (unlikely(C->type == CONN_TYPE_MASTER)) if (unlikely(C->type == CONN_TYPE_MASTER))
1814 2161 Conn_wpool_put(C->wp); Conn_wpool_put(C->wp);
1815 2162
1816 /* Reset tsend, else we enter in a timeout error loop */
1817 C->tsend.tv_sec = 0;
1818 C->tsend.tv_usec = 0;
2163 if (unlikely(C->auto_reconnect == 1)) {
2164 /* Reset tsend, else we enter in a timeout error loop */
2165 C->tsend.tv_sec = 0;
2166 C->tsend.tv_usec = 0;
1819 2167
1820 /* Reset the connection attempt time */
1821 C->conn_syn.tv_sec = 0;
1822 C->conn_syn.tv_usec = 0;
2168 /* Reset the connection attempt time */
2169 C->conn_syn.tv_sec = 0;
2170 C->conn_syn.tv_usec = 0;
1823 2171
1824 /* Misc */
1825 C->error_state = 0;
1826 C->revents = 0;
2172 /* Misc */
2173 C->error_state = 0;
1827 2174
1828 if (unlikely(C->auto_reconnect == 1)) {
1829 2175 C->tryat = Conn_now.tv_sec + C->delay; C->tryat = Conn_now.tv_sec + C->delay;
1830 2176 C->state = CONN_STATE_CONNECT_0; C->state = CONN_STATE_CONNECT_0;
1831 2177
 
... ... __hot static void Conn_free_intern(struct Conn *C)
1837 2183
1838 2184 Conn_pending++; Conn_pending++;
1839 2185 } else { } else {
1840 C->is_freed = 1;
1841 2186 C->type = CONN_TYPE_UNK; C->type = CONN_TYPE_UNK;
1842 2187 C->state = CONN_STATE_FREE; C->state = CONN_STATE_FREE;
1843 2188
 
... ... __hot static void Conn_free_intern(struct Conn *C)
1871 2216 } }
1872 2217
1873 2218 /* Add it to free list of the worker pool */ /* Add it to free list of the worker pool */
1874 if (likely(C->ww)) {
1875 struct Conn_wpool_worker *ww = C->ww;
2219 if (likely(C->worker)) {
2220 struct Conn_wpool_worker *worker = C->worker;
1876 2221
1877 //Log(0, "%p: ww->free before: %p/%p\n",
1878 // C, ww->free_head, ww->free_tail);
2222 //Log(0, "%p: worker->free before: %p/%p\n",
2223 // C, worker->free_head, worker->free_tail);
1879 2224 C->next = NULL; C->next = NULL;
1880 C->ww = NULL;
1881 if (unlikely(ww->free_head == NULL))
1882 ww->free_head = C;
1883 else
1884 ww->free_tail->next = C;
1885 ww->free_tail = C;
1886 //Log(0, "%p: ww->free after: %p/%p ww->free->next=%p\n",
1887 // C, ww->free_head, ww->free_tail, ww->free_head->next);
1888 //if ((ww->free_head == ww->free_tail) && (ww->free_head->next != NULL)) {
2225 C->worker = NULL; /* TODO: not needed */
2226 if (unlikely(worker->free.head == NULL)) {
2227 worker->free.head = C;
2228 worker->free.tail = C;
2229 } else {
2230 C->next = worker->free.head;
2231 worker->free.head = C;
2232 }
2233 //Log(0, "%p: worker->free after: %p/%p worker->free->next=%p\n",
2234 // C, ww->free.head, worker->free.tail, worker->free.head->next);
2235 //if ((worker->free.head == worker->free.tail) && (worker->free.head->next != NULL)) {
1889 2236 // Log(0, "### List is corrupted!\n"); // Log(0, "### List is corrupted!\n");
1890 2237 // abort(); // abort();
1891 2238 //} //}
1892 ww->free_count++;
1893 2239 } }
1894 2240 } }
1895 2241 } }
 
... ... __hot void Conn_close(struct Conn *C)
1910 2256 C->shutdown_after_send = 1; C->shutdown_after_send = 1;
1911 2257 Log(9, "%p %s Set SHUTDOWN_AFTER_SEND;" Log(9, "%p %s Set SHUTDOWN_AFTER_SEND;"
1912 2258 " We have data in out buffer; kick sending.\n", C, __func__); " We have data in out buffer; kick sending.\n", C, __func__);
2259 /* TODO: send and receive are sensitive.
1913 2260 if (likely(C->cbs.send)) if (likely(C->cbs.send))
1914 2261 C->cbs.send(C); C->cbs.send(C);
2262 */
2263 Conn_default_cbs_send(C);
1915 2264 } }
1916 2265 } }
1917 2266
 
... ... void Conn_set(struct Conn *C, const unsigned int var, const int val)
1961 2310 } }
1962 2311 } }
1963 2312
1964 /*
1965 * Adds an fd to poll system
1966 */
1967 static int Conn_add_obj(int epoll_fd, struct Conn *C)
1968 {
1969 int ret;
1970 struct epoll_event ev;
1971
1972 memset(&ev, 0, sizeof(struct epoll_event));
1973 ev.data.ptr = C;
1974 ev.events = EPOLLIN | EPOLLRDHUP | EPOLLOUT | EPOLLET;
1975 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, C->fd, &ev);
1976 if (ret == -1) {
1977 C->xerrno = errno;
1978 snprintf(Conn_error, sizeof(Conn_error),
1979 strerror(C->xerrno));
1980 return C->xerrno;
1981 }
1982
1983 return 0;
1984 }
1985
1986 /*
1987 * Dispatch events to a callback
1988 * Returns: -1 on error, 0 nothing to do, n (>0) if some work was done
1989 * timeout is in miliseconds.
1990 */
1991 __hot static int Conn_dispatch_events(int epoll_fd, struct epoll_event *e,
1992 const unsigned short e_size, const int timeout,
1993 void (*cb)(struct Conn *C, unsigned int revents))
1994 {
1995 int i, events;
1996 struct Conn *C;
1997
1998 Log(10, "%s timeout2=%ums...\n", __func__, timeout);
1999 while (1) {
2000 events = epoll_wait(epoll_fd, e, e_size, timeout);
2001 if (unlikely((events == -1) && (errno == EINTR)))
2002 continue;
2003 break;
2004 };
2005
2006 if (unlikely(events < 0)) {
2007 Log(1, "%s e_size=%d [%s]\n",
2008 __func__, e_size, strerror(errno));
2009 snprintf(Conn_error, sizeof(Conn_error), "epoll error (%d/%d) (%s)",
2010 events, errno, strerror(errno));
2011 return -1;
2012 }
2013
2014 /* TODO: this must be protected by a lock or force per thread */
2015 /* Or just call gettimeofday when needed */
2016 gettimeofday(&Conn_now, NULL);
2017
2018 if (unlikely(Conn_debug_level > 8)) {
2019 Log(0, "Processing %d event(s)...\n", events);
2020 for (i = 0; i < events; i++) {
2021 C = e[i].data.ptr;
2022 Log(0, "%p ### ev %d %x\n", C, i, e[i].events);
2023 }
2024 }
2025 for (i = 0; i < events; i++) {
2026 C = e[i].data.ptr;
2027 cb(C, e[i].events);
2028 }
2029
2030 return events;
2031 }
2032
2033 2313 /* /*
2034 2314 * Grow Conn structures (cells) * Grow Conn structures (cells)
2035 2315 * Caller must have x->lock taken. * Caller must have x->lock taken.
 
... ... __cold static int pool_grow(struct Conn_pool *x, const unsigned int increment)
2081 2361 } }
2082 2362
2083 2363 /* /*
2084 * Allocs a Conn structure
2364 * This is called after allocation to init some fields.
2085 2365 */ */
2086 __hot struct Conn *Conn_alloc(void)
2366 __hot struct Conn *Conn_alloc_prepare(struct Conn *C)
2087 2367 { {
2088 struct Conn *C;
2089 unsigned int growok;
2090 2368 void *p; void *p;
2091 2369
2092 Log(10, "%s Conn_no=%d Conn_max=%d\n",
2093 __func__, Conn_no, Conn_max);
2094
2095 if (unlikely((Conn_max > 0) && (Conn_no >= Conn_max))) {
2096 snprintf(Conn_error, sizeof(Conn_error),
2097 "Limit reached! Consider a raise of max connection number or put 0 for no limit.");
2098 return NULL;
2099 }
2100
2101 my_pthread_spin_lock(&Conn_free.lock);
2102 if (unlikely(Conn_free.head == NULL)) {
2103 growok = pool_grow(&Conn_free, CONN_BULK_ALLOC);
2104 if (unlikely(growok != 0)) {
2105 snprintf(Conn_error, sizeof(Conn_error),
2106 "Cannot grow anymore. Probably memory shortage.");
2107 return NULL;
2108 }
2109 }
2110
2111 /* Steal from free list */
2112 C = Conn_free.head;
2113 Conn_free.head = C->next;
2114 C->next = NULL;
2115
2116 my_pthread_spin_unlock(&Conn_free.lock);
2117
2118 2370 if (unlikely(Conn_no > Conn_max_reached)) if (unlikely(Conn_no > Conn_max_reached))
2119 2371 Conn_max_reached = Conn_no; Conn_max_reached = Conn_no;
2120 2372
2121 C->type = CONN_TYPE_UNK;
2373 /* We assume that will be a master */
2374 C->type = CONN_TYPE_MASTER;
2122 2375 C->state = CONN_STATE_EMPTY; C->state = CONN_STATE_EMPTY;
2376 C->error_state = CONN_ERROR_NO_ERROR;
2377 C->xerrno = 0;
2378
2379 C->sock_protocol = 0;
2380 C->sock_domain = PF_INET;
2381 C->sock_type = SOCK_STREAM;
2123 2382
2124 2383 if (unlikely(C->ibuf_size < Conn_default_ibuf)) { if (unlikely(C->ibuf_size < Conn_default_ibuf)) {
2125 2384 p = realloc(C->ibuf, Conn_default_ibuf); p = realloc(C->ibuf, Conn_default_ibuf);
 
... ... __hot struct Conn *Conn_alloc(void)
2165 2424 C->shutdown_after_send = 0; C->shutdown_after_send = 0;
2166 2425 C->local_dirty = 1; C->local_dirty = 1;
2167 2426 C->remote_dirty = 1; C->remote_dirty = 1;
2168 C->accept_pending = 1;
2169 C->is_freed = 0;
2170 2427
2171 2428 /* bandwidth */ /* bandwidth */
2172 2429 C->band_width = 0; C->band_width = 0;
 
... ... __hot struct Conn *Conn_alloc(void)
2175 2432 C->band_lasttime = Conn_now; C->band_lasttime = Conn_now;
2176 2433
2177 2434 C->fd = -1; C->fd = -1;
2178 C->revents = 0;
2179 2435
2180 2436 C->start = Conn_now.tv_sec; C->start = Conn_now.tv_sec;
2181 2437
 
... ... __hot struct Conn *Conn_alloc(void)
2184 2440 /* TODO: really? Don't we borrow them from master listent conn?! */ /* TODO: really? Don't we borrow them from master listent conn?! */
2185 2441 C->cbs = Conn_default_cbs; C->cbs = Conn_default_cbs;
2186 2442
2443 C->ws = NULL;
2444
2187 2445 Conn_no++; Conn_no++;
2188 2446 /* Conn_work_to_do will not be incremented here, only in commit! */ /* Conn_work_to_do will not be incremented here, only in commit! */
2189 2447
 
... ... __hot struct Conn *Conn_alloc(void)
2193 2451 return C; return C;
2194 2452 } }
2195 2453
2454 /*
2455 * Allocates a Conn structure. This is called by the user.
2456 */
2457 __hot struct Conn *Conn_alloc(void)
2458 {
2459 struct Conn *C;
2460 int r;
2461
2462 Log(10, "%s Conn_no=%d Conn_max=%d\n",
2463 __func__, Conn_no, Conn_max);
2464
2465 if (unlikely((Conn_max > 0) && (Conn_no >= Conn_max))) {
2466 snprintf(Conn_error, sizeof(Conn_error),
2467 "Limit reached! Consider a raise of max connection number or put 0 for no limit.");
2468 return NULL;
2469 }
2470
2471 if (unlikely(Conn_free.head == NULL)) {
2472 r = pool_grow(&Conn_free, 4);
2473 if (unlikely(r != 0)) {
2474 snprintf(Conn_error, sizeof(Conn_error),
2475 "Cannot grow anymore. Probably memory shortage.");
2476 return NULL;
2477 }
2478 }
2479
2480 /* Steal from free list */
2481 C = Conn_free.head;
2482 Conn_free.head = C->next;
2483 C->next = NULL;
2484
2485 return Conn_alloc_prepare(C);
2486 }
2487
2488 /*
2489 * Allocates a Conn structure, worker version
2490 */
2491 __hot struct Conn *Conn_alloc_worker(struct Conn_wpool_worker *w)
2492 {
2493 struct Conn *C;
2494 int r;
2495
2496 if (unlikely(w->free.head == NULL)) {
2497 r = pool_grow(&w->free, CONN_BULK_ALLOC);
2498 if (unlikely(r != 0)) {
2499 snprintf(Conn_error, sizeof(Conn_error),
2500 "Cannot grow anymore. Probably memory shortage.");
2501 return NULL;
2502 }
2503 }
2504
2505 /* Steal from free list */
2506 C = w->free.head;
2507 w->free.head = C->next;
2508 C->next = NULL;
2509 C->worker = w;
2510
2511 return Conn_alloc_prepare(C);
2512 }
2513
2196 2514 /* /*
2197 2515 * Accepting a connection * Accepting a connection
2516 * "C" is shared with the other threads. TODO: make it less ugly.
2198 2517 */ */
2199 2518 __hot static void Conn_accept(struct Conn *C) __hot static void Conn_accept(struct Conn *C)
2200 2519 { {
 
... ... __hot static void Conn_accept(struct Conn *C)
2212 2531 Conn_domain(C), Conn_get_socket_protocol(C)); Conn_domain(C), Conn_get_socket_protocol(C));
2213 2532 } }
2214 2533
2215 switch(C->sock_domain) {
2534 switch (C->sock_domain) {
2216 2535 case PF_INET: case PF_INET:
2217 2536 pca = (struct sockaddr *) &ca4; pca = (struct sockaddr *) &ca4;
2218 2537 cax_len = sizeof(ca4); cax_len = sizeof(ca4);
 
... ... __hot static void Conn_accept(struct Conn *C)
2233 2552
2234 2553 again: again:
2235 2554 fd = accept4(C->fd, pca, &cax_len, SOCK_NONBLOCK | SOCK_CLOEXEC); fd = accept4(C->fd, pca, &cax_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
2236 if (fd == -1) {
2555 if (unlikely(fd == -1)) {
2556 if (errno == EAGAIN)
2557 return;
2558
2237 2559 if (unlikely(errno == EINTR)) if (unlikely(errno == EINTR))
2238 2560 goto again; goto again;
2239 2561
2240 if (unlikely(errno == EAGAIN)) {
2241 Log(10, "accept returned EAGAIN. Return\n");
2242 C->revents &= ~EPOLLIN;
2243 return;
2244 }
2245
2246 2562 /* TODO: ratelimit */ /* TODO: ratelimit */
2247 2563 Log(2, "WARN: Cannot accept on fd %d [%s].\n", Log(2, "WARN: Cannot accept on fd %d [%s].\n",
2248 2564 C->fd, strerror(errno)); C->fd, strerror(errno));
 
... ... __hot static void Conn_accept(struct Conn *C)
2255 2571 return; return;
2256 2572 } }
2257 2573
2258 X = Conn_alloc();
2574 X = Conn_alloc_worker(C->worker);
2259 2575 if (unlikely(!X)) { if (unlikely(!X)) {
2576 /* TODO: really? We will close the master fd! */
2260 2577 Conn_error_raise(C, ENOMEM); Conn_error_raise(C, ENOMEM);
2261 2578 close(fd); close(fd);
2262 2579 return; return;
 
... ... __hot static void Conn_accept(struct Conn *C)
2267 2584 X->type = CONN_TYPE_P2P; X->type = CONN_TYPE_P2P;
2268 2585 X->state = CONN_STATE_OPEN; X->state = CONN_STATE_OPEN;
2269 2586 X->time_open = Conn_now; X->time_open = Conn_now;
2270 X->via = C->id;
2587 X->ws = C->ws;
2271 2588
2589 /* We should replace them with a pointer to master C */
2272 2590 Conn_set_socket_domain(X, C->sock_domain); Conn_set_socket_domain(X, C->sock_domain);
2273 2591 Conn_set_socket_type(X, C->sock_type); Conn_set_socket_type(X, C->sock_type);
2274 2592 Conn_set_socket_protocol(X, C->sock_protocol); Conn_set_socket_protocol(X, C->sock_protocol);
2275 2593
2276 2594 X->local_dirty = 1; X->local_dirty = 1;
2277 2595 X->remote_dirty = 1; X->remote_dirty = 1;
2278 X->accept_pending = 1;
2279 2596
2280 Conn_nodelay(X);
2281
2282 err = Conn_wpool_enqueue(C->wp, X);
2597 /*TODO err = Conn_add_obj(C->worker->epoll_fd, X, EPOLLIN | EPOLLRDHUP);*/
2598 err = Conn_add_obj(C->worker->epoll_fd, X, EPOLLIN);
2283 2599 if (unlikely(err != 0)) { if (unlikely(err != 0)) {
2284 2600 /* TODO: add a specific callback for enqueue errors? */ /* TODO: add a specific callback for enqueue errors? */
2285 2601 Conn_error_raise(C, CONN_ERROR_INTERNAL); Conn_error_raise(C, CONN_ERROR_INTERNAL);
 
... ... __hot static void Conn_accept(struct Conn *C)
2287 2603 return; return;
2288 2604 } }
2289 2605
2606 /* TODO: is possible to not be really needed
2607 Conn_nodelay(X);
2608 */
2609
2610 #if 0
2611 if (X->cbs.accept)
2612 X->cbs.accept(X);
2613 #endif
2614 Conn_default_cbs_accept(X);
2615
2290 2616 Conn_work_to_do++; Conn_work_to_do++;
2291 2617
2292 2618 Conn_total++; Conn_total++;
2619 C->worker->in_clients++;
2293 2620
2294 2621 /* We must call accept till we get EAGAIN */ /* We must call accept till we get EAGAIN */
2622 /* TODO: seems is not a good idea afterall. We may starge the rest of connections.
2295 2623 goto again; goto again;
2624 */
2296 2625 } }
2297 2626
2298 2627 /* /*
2299 2628 * Callback that is called for every connection * Callback that is called for every connection
2300 2629 */ */
2301 __hot static void Conn_poll_cb(struct Conn *C, unsigned int revents)
2630 static inline void Conn_poll_cb(struct Conn *C, const unsigned int revents)
2302 2631 { {
2303 char poll_status1[16], poll_status2[16];
2632 char poll_status[16];
2304 2633
2305 2634 if (unlikely(Conn_debug_level > 5)) { if (unlikely(Conn_debug_level > 5)) {
2306 Conn_poll_status(C->revents, poll_status1);
2307 Conn_poll_status(revents, poll_status2);
2308 Log(0, "%p %s id %llu, revents=prev=%s/new=%s\n",
2309 C, __func__, C->id, poll_status1, poll_status2);
2635 Conn_poll_status(revents, poll_status);
2636 Log(0, "%p %s id %llu, revents=%s\n",
2637 C, __func__, C->id, poll_status);
2310 2638
2311 2639 Log(12, "%s\n", Conn_status_slot(C)); Log(12, "%s\n", Conn_status_slot(C));
2312 2640 } }
2313 2641
2314 C->revents |= revents;
2315
2316 2642 /* We may receive several events on the same conn */ /* We may receive several events on the same conn */
2317 2643 if (unlikely(C->state == CONN_STATE_FREE)) if (unlikely(C->state == CONN_STATE_FREE))
2318 2644 return; return;
2319 2645
2320 if (unlikely(C->accept_pending == 1)) {
2321 /* Add it to active connections - only if is a worker */
2322 if (likely(C->ww)) {
2323 #if 0
2324 TODO: seems we have a problem with the list management. Try to isolate it.
2325 Log(0, "%p ### conns=%p/%p\n", C, C->ww->conns_head,
2326 C->ww->conns_tail);
2327 if (C->ww->conns_head == NULL)
2328 C->ww->conns_head = C;
2329 else
2330 C->ww->conns_tail->next = C;
2331 C->ww->conns_tail = C;
2332 C->next = NULL;
2333 Log(0, "%p: ### after adding C. conns=%p/%p\n",
2334 C->ww->conns_head, C->ww->conns_tail);
2335 #endif
2336
2337 if (C->cbs.accept) {
2338 Log(12, "%p Accept cb was not called, call now\n", C);
2339 C->cbs.accept(C);
2340 }
2341 C->accept_pending = 0;
2342 }
2343 }
2344
2345 if (unlikely(C->revents & EPOLLERR)) {
2646 if (unlikely(revents & EPOLLERR)) {
2346 2647 Log(0, "%p EPOLLERR!\n", C); Log(0, "%p EPOLLERR!\n", C);
2347 2648 C->error_state = CONN_ERROR_POLL; C->error_state = CONN_ERROR_POLL;
2348 2649 C->xerrno = 0; /* TODO: unknown error? */ C->xerrno = 0; /* TODO: unknown error? */
 
... ... __hot static void Conn_poll_cb(struct Conn *C, unsigned int revents)
2351 2652 return; return;
2352 2653 } }
2353 2654
2354 if (unlikely(C->revents & EPOLLHUP)) {
2655 if (unlikely(revents & EPOLLHUP)) {
2355 2656 Log(9, "%p EPOLLHUP!\n", C); Log(9, "%p EPOLLHUP!\n", C);
2356 2657 C->error_state = CONN_ERROR_HANGUP; C->error_state = CONN_ERROR_HANGUP;
2357 2658 Conn_free_intern(C); Conn_free_intern(C);
 
... ... __hot static void Conn_poll_cb(struct Conn *C, unsigned int revents)
2359 2660 } }
2360 2661
2361 2662 /* First, test we have a new connection */ /* First, test we have a new connection */
2362 if (C->revents & EPOLLOUT) {
2663 if (revents & EPOLLOUT) {
2363 2664 if (C->state == CONN_STATE_CONNECT_b) { if (C->state == CONN_STATE_CONNECT_b) {
2364 2665 Log(9, "%p We just established a connection.\n", C); Log(9, "%p We just established a connection.\n", C);
2365 2666
 
... ... __hot static void Conn_poll_cb(struct Conn *C, unsigned int revents)
2367 2668 C->local_dirty = 1; C->local_dirty = 1;
2368 2669 C->time_open = Conn_now; C->time_open = Conn_now;
2369 2670
2671 /* TODO: indirect call costs a lot. Hm.
2370 2672 if (likely(C->cbs.connected)) if (likely(C->cbs.connected))
2371 2673 C->cbs.connected(C); C->cbs.connected(C);
2674 */
2675 Conn_default_cbs_connected(C);
2372 2676 } }
2373 2677 } }
2374 2678 //if (Conn_ignore(C)) //if (Conn_ignore(C))
2375 2679 // return; // return;
2376 2680
2377 2681 /* Second, test for hangup or input */ /* Second, test for hangup or input */
2378 if (likely(C->revents & EPOLLIN)) {
2379 Log(9, "%p We have input...\n", C);
2682 if (likely(revents & EPOLLIN)) {
2380 2683 if (unlikely(C->type == CONN_TYPE_MASTER)) { if (unlikely(C->type == CONN_TYPE_MASTER)) {
2381 2684 Conn_accept(C); Conn_accept(C);
2382 2685 } else { } else {
2686 Log(9, "%p We have input\n", C);
2687 /* TODO: use a callback or not?
2383 2688 if (likely(C->cbs.recv)) if (likely(C->cbs.recv))
2384 2689 C->cbs.recv(C); C->cbs.recv(C);
2690 */
2691 Conn_default_cbs_recv(C);
2385 2692 } }
2386 2693 } }
2387 //if (Conn_ignore(C))
2388 // return;
2694 if (Conn_ignore(C))
2695 return;
2389 2696
2390 2697 /* RDHUP may come with POLLIN, so it must be called after */ /* RDHUP may come with POLLIN, so it must be called after */
2391 if (unlikely(C->revents & EPOLLRDHUP)) {
2698 if (unlikely(revents & EPOLLRDHUP)) {
2392 2699 Log(9, "%p EPOLLRDHUP!\n", C); Log(9, "%p EPOLLRDHUP!\n", C);
2393 C->revents &= ~EPOLLIN;
2700 /* TODO: What we do here? */
2394 2701 } }
2702 if (Conn_ignore(C))
2703 return;
2395 2704
2396 if (likely(C->revents & EPOLLOUT)) {
2397 Log(9, "%p We can send data...\n", C);
2705 if (likely(revents & EPOLLOUT)) {
2706 Log(9, "%p We can send data (state=%u)...\n", C, C->state);
2398 2707 if (likely(C->state == CONN_STATE_OPEN)) { if (likely(C->state == CONN_STATE_OPEN)) {
2708 /*
2399 2709 if (likely(C->cbs.send)) if (likely(C->cbs.send))
2400 2710 C->cbs.send(C); C->cbs.send(C);
2711 */
2712 Conn_default_cbs_send(C);
2401 2713 } }
2402 2714 } }
2403 //if (Conn_ignore(C))
2404 // return;
2715 }
2716
2717 /*
2718 * Dispatch events to a callback
2719 * Returns: -1 on error, 0 nothing to do, n (>0) if some work was done
2720 * timeout is in miliseconds.
2721 */
2722 static inline int Conn_dispatch_events(struct Conn_wpool_worker *w,
2723 int epoll_fd, struct epoll_event *e,
2724 const unsigned short e_size, const int timeout)
2725 {
2726 int i, events, r;
2727 struct Conn *C;
2728 uint64_t ticks;
2729 char sevents[16];
2730
2731 Log(10, "%s timeout2=%dms...\n", __func__, timeout);
2732 while (1) {
2733 events = epoll_wait(epoll_fd, e, e_size, timeout);
2734 if (unlikely(events < 0)) {
2735 if (errno == EINTR)
2736 continue;
2737
2738 Log(1, "%s e_size=%d [%s]\n",
2739 __func__, e_size, strerror(errno));
2740 snprintf(Conn_error, sizeof(Conn_error), "epoll error (%d/%d) (%s)",
2741 events, errno, strerror(errno));
2742 return -1;
2743 }
2744
2745 break;
2746 }
2747
2748 if (events == 0)
2749 return 0;
2750
2751 if (unlikely(Conn_debug_level > 8)) {
2752 Log(0, "Processing %d event(s)...\n", events);
2753 for (i = 0; i < events; i++) {
2754 C = e[i].data.ptr;
2755 Conn_poll_status(e[i].events, sevents);
2756 Log(0, "%p ### ev i=%d %s\n", C, i, sevents);
2757 }
2758 }
2759 for (i = 0; i < events; i++) {
2760 /* Special events */
2761 if (unlikely(e[i].data.fd == 1)) {
2762 Log(0, "We have a master signaling.\n");
2763 continue;
2764 }
2765
2766 if (unlikely(e[i].data.fd == 2)) {
2767 r = read(w->timer, &ticks, sizeof(ticks));
2768 if (r != 8) {
2769 Log(0, "Cannot read timer fd (%s).\n", strerror(errno));
2770 continue;
2771 }
2772 Log(10, "We have a timer tick (ticks %llu).\n", ticks);
2773 gettimeofday(&Conn_now, NULL);
2774 /* TODO: optimeze here and try to deduct time! */
2775 continue;
2776 }
2777
2778 Conn_poll_cb(e[i].data.ptr, e[i].events);
2779 }
2780
2781 return events;
2405 2782 } }
2406 2783
2407 2784
2408 2785 /* ########### wpool ########### */ /* ########### wpool ########### */
2409 2786 /* TODO: Seems we have two wpool sections! */ /* TODO: Seems we have two wpool sections! */
2787 /*
2788 * Dumps some stats about a worker
2789 */
2790 static void Conn_wpool_worker_stats(const struct Conn_wpool_worker *w)
2791 {
2792 Log(0, "Worker %hu: in_clients: %llu false_accepts=%llu\n",
2793 w->id, w->in_clients, w->false_accepts);
2794 }
2795
2410 2796 /* /*
2411 2797 * Function that is executed by a worker. * Function that is executed by a worker.
2412 2798 * It accepts connections, receives and sends commands. * It accepts connections, receives and sends commands.
2413 2799 */ */
2414 void *Conn_wpool_worker_func(void *arg)
2800 static void *Conn_wpool_worker_func(void *arg)
2415 2801 { {
2416 2802 struct Conn_wpool_worker *w = arg; struct Conn_wpool_worker *w = arg;
2417 2803 char tmp[32]; char tmp[32];
 
... ... void *Conn_wpool_worker_func(void *arg)
2422 2808 Log(5, "Started work...\n"); Log(5, "Started work...\n");
2423 2809
2424 2810 while (1) { while (1) {
2425 Conn_dispatch_events(w->epoll_fd, w->events,
2426 CONN_EVENTS_SLOTS, 60000, Conn_poll_cb);
2427
2428 /* Send back to master the free connections */
2429 if (unlikely(w->free_count >= CONN_GIVE_BACK_LIMIT)) {
2430 if ((w->free_head == w->free_tail) & (w->free_head->next != NULL)) {
2431 Log(0, "### List is wrong! TODO\n");
2432 abort();
2433 }
2434
2435 my_pthread_spin_lock(&Conn_free.lock);
2436 if (Conn_free.head == NULL)
2437 Conn_free.head = w->free_head;
2438 else
2439 Conn_free.tail->next = w->free_head;
2440 Conn_free.tail = w->free_tail;
2441 my_pthread_spin_unlock(&Conn_free.lock);
2811 Conn_dispatch_events(w, w->epoll_fd, w->events,
2812 CONN_EVENTS_SLOTS, -1);
2442 2813
2443 w->free_head = NULL;
2444 w->free_count = 0;
2445 }
2814 //sched_yield();
2815 //Conn_wpool_worker_stats(w);
2446 2816 } }
2447 2817
2448 2818 return NULL; return NULL;
 
... ... void *Conn_wpool_worker_func(void *arg)
2451 2821 /* /*
2452 2822 * Destroy a worker * Destroy a worker
2453 2823 */ */
2454 void Conn_wpool_stop_worker(struct Conn_wpool_worker *w)
2824 static void Conn_wpool_stop_worker(struct Conn_wpool_worker *w)
2455 2825 { {
2456 2826 int r; int r;
2457 2827
2458 2828 if (w->inited == 0) if (w->inited == 0)
2459 2829 return; return;
2460 2830
2461 close(w->epoll_fd);
2462
2463 2831 close(w->pipe[0]); close(w->pipe[0]);
2464 2832 close(w->pipe[1]); close(w->pipe[1]);
2833 close(w->timer);
2834 close(w->epoll_fd);
2465 2835
2466 2836 r = pthread_cancel(w->tid); r = pthread_cancel(w->tid);
2467 2837 if (r != 0) { if (r != 0) {
 
... ... void Conn_wpool_stop_worker(struct Conn_wpool_worker *w)
2471 2841
2472 2842 /* /*
2473 2843 * Start a worker * Start a worker
2844 * master_fd is the fd associated with the listening socket.
2474 2845 */ */
2475 int Conn_wpool_start_worker(struct Conn_wpool_worker *w)
2846 static int Conn_wpool_start_worker(struct Conn_wpool_worker *w)
2476 2847 { {
2477 2848 int r, cpus; int r, cpus;
2478 2849 struct epoll_event ev; struct epoll_event ev;
2479 2850 cpu_set_t cpuset; cpu_set_t cpuset;
2480 2851 pthread_attr_t attr; pthread_attr_t attr;
2852 struct itimerspec new_value;
2481 2853
2482 2854 Log(10, "Creating worker %p (%hu)...\n", w, w->id); Log(10, "Creating worker %p (%hu)...\n", w, w->id);
2483 2855
2484 2856 w->conns_head = NULL; w->conns_head = NULL;
2485 w->free_head = NULL;
2486 w->free_tail = NULL; /* TODO: not needed */
2487 w->free_count = 0;
2857 pool_init(&w->free);
2488 2858 w->mem_structs = 0; /* TODO: really used? */ w->mem_structs = 0; /* TODO: really used? */
2859 w->in_clients = 0;
2860 w->false_accepts = 0;
2489 2861
2490 2862 w->epoll_fd = epoll_create1(EPOLL_CLOEXEC); w->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
2491 2863 if (w->epoll_fd == -1) { if (w->epoll_fd == -1) {
 
... ... int Conn_wpool_start_worker(struct Conn_wpool_worker *w)
2493 2865 goto out; goto out;
2494 2866 } }
2495 2867
2868 w->timer = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
2869 if (w->timer == -1) {
2870 Log(0, "Cannot create timer fd (%s).\n", strerror(errno));
2871 goto close_epoll;
2872 }
2873 /* arm timer - once per second */
2874 new_value.it_value.tv_sec = 1;
2875 new_value.it_value.tv_nsec = 0;
2876 new_value.it_interval.tv_sec = 1;
2877 new_value.it_interval.tv_nsec = 0;
2878 r = timerfd_settime(w->timer, 0 /* relative */, &new_value, NULL);
2879 if (r == -1) {
2880 Log(0, "Cannot arm timer (%s).\n", strerror(errno));
2881 goto close_timer;
2882 }
2883 /* add timer to poll */
2884 memset(&ev, 0, sizeof(struct epoll_event));
2885 ev.events = EPOLLIN;
2886 ev.data.u32 = 2; /* TODO: Find a better id */
2887 r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->timer, &ev);
2888 if (r != 0) {
2889 Log(0, "Cannot add timer to epoll (%s).\n", strerror(errno));
2890 goto close_timer;
2891 }
2892
2496 2893 r = socketpair(AF_LOCAL, SOCK_STREAM, 0, w->pipe); r = socketpair(AF_LOCAL, SOCK_STREAM, 0, w->pipe);
2497 2894 if (r != 0) { if (r != 0) {
2498 2895 Log(0, "Cannot call socketpair (%s)\n", strerror(errno)); Log(0, "Cannot call socketpair (%s)\n", strerror(errno));
2499 goto close_epoll;
2896 goto close_timer;
2500 2897 } }
2501
2502 2898 /* Register our end of the pipe to receive signaling from 'accept' thread */ /* Register our end of the pipe to receive signaling from 'accept' thread */
2503 2899 memset(&ev, 0, sizeof(struct epoll_event)); memset(&ev, 0, sizeof(struct epoll_event));
2504 2900 ev.events = EPOLLIN; ev.events = EPOLLIN;
2505 ev.data.u32 = 1; /* TODO: Found a better id */
2901 ev.data.u32 = 1; /* TODO: Find a better id */
2506 2902 r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->pipe[1], &ev); r = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, w->pipe[1], &ev);
2903 if (r != 0) {
2904 Log(0, "Cannot add pipe[1] to epoll (%s).\n", strerror(errno));
2905 goto close_pipe;
2906 }
2507 2907
2508 2908 r = pthread_attr_init(&attr); r = pthread_attr_init(&attr);
2509 2909 if (r != 0) { if (r != 0) {
 
... ... int Conn_wpool_start_worker(struct Conn_wpool_worker *w)
2549 2949 destroy_attr: destroy_attr:
2550 2950 pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
2551 2951
2952 close_timer:
2953 close(w->timer);
2954
2552 2955 close_epoll: close_epoll:
2553 2956 close(w->epoll_fd); close(w->epoll_fd);
2554 2957
 
... ... int Conn_wpool_start_worker(struct Conn_wpool_worker *w)
2558 2961
2559 2962 /* /*
2560 2963 * Creates a workers pool * Creates a workers pool
2964 * @M is the master struct. It is needed to enqueue the master fd to workers.
2561 2965 */ */
2562 2966 struct Conn_wpool *Conn_wpool_create(const unsigned short workers) struct Conn_wpool *Conn_wpool_create(const unsigned short workers)
2563 2967 { {
 
... ... struct Conn_wpool *Conn_wpool_create(const unsigned short workers)
2578 2982 goto free_ret; goto free_ret;
2579 2983 } }
2580 2984
2985 /* TODO: what I am doing with this?! Signaling between works and pool? */
2581 2986 ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC); ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
2582 2987 if (ret->epoll_fd == -1) { if (ret->epoll_fd == -1) {
2583 2988 Log(0, "Cannot create epoll fd (%s)\n", strerror(errno)); Log(0, "Cannot create epoll fd (%s)\n", strerror(errno));
 
... ... int Conn_wpool_destroy(struct Conn_wpool *wp)
2639 3044 return 0; return 0;
2640 3045 } }
2641 3046
2642 /*
2643 * Enqueues a connection to a workers pool
2644 */
2645 int Conn_wpool_enqueue(struct Conn_wpool *wp, struct Conn *C)
2646 {
2647 short i;
2648 int r;
2649
2650 /* Round robin algo */
2651 i = wp->next;
2652 wp->next = (wp->next + 1) % wp->workers;
2653
2654 /* We need it later to add C to active conns */
2655 C->ww = &wp->ws[i];
2656
2657 Log(10, "%p %s Enqueue C to worker %u\n", C, __func__, C->ww->id);
2658
2659 r = Conn_add_obj(C->ww->epoll_fd, C);
2660 if (unlikely(r != 0)) {
2661 Log(0, "Cannot enqueue fd %d to epoll_fd %d (%s)\n",
2662 C->fd, C->ww->epoll_fd, strerror(errno));
2663 return -1;
2664 }
2665
2666 return 0;
2667 }
2668
2669 3047
2670 3048 /* ############## conn 2 ############# */ /* ############## conn 2 ############# */
2671 3049 int Conn_init(const unsigned int max) int Conn_init(const unsigned int max)
 
... ... int Conn_init(const unsigned int max)
2703 3081 /* Log some info about this system */ /* Log some info about this system */
2704 3082 Conn_sys(); Conn_sys();
2705 3083
3084 Log(0, "sizeof(struct Conn)=%u\n", sizeof(struct Conn));
3085
2706 3086 return 0; return 0;
2707 3087 } }
2708 3088
 
... ... int Conn_shutdown(void)
2739 3119 } }
2740 3120
2741 3121 /* /*
2742 * Enqueues data but do not kick the sending
3122 * Enqueues data but does not kick the sending
2743 3123 */ */
2744 3124 __hot int Conn_enqueue_wait(struct Conn *C, const void *buf, const size_t count) __hot int Conn_enqueue_wait(struct Conn *C, const void *buf, const size_t count)
2745 3125 { {
 
... ... __hot int Conn_enqueue(struct Conn *C, const void *buf, const size_t count)
2774 3154
2775 3155 ret = Conn_enqueue_wait(C, buf, count); ret = Conn_enqueue_wait(C, buf, count);
2776 3156
2777 if (likely((ret > 0) && (C->cbs.send)))
2778 C->cbs.send(C);
3157 if (likely(ret > 0))
3158 Conn_default_cbs_send(C);
2779 3159
2780 3160 return ret; return ret;
2781 3161 } }
2782 3162
3163 /*
3164 * Kicks sending (after Conn_enqueue_wait).
3165 */
3166 __hot void Conn_kick(struct Conn *C)
3167 {
3168 Log(10, "%s id=%llu\n", __func__, C->id);
3169 Conn_default_cbs_send(C);
3170 }
3171
2783 3172 int Conn_set_socket_domain(struct Conn *C, const int domain) int Conn_set_socket_domain(struct Conn *C, const int domain)
2784 3173 { {
2785 3174 C->sock_domain = domain; C->sock_domain = domain;
 
... ... int Conn_set_socket_bind_addr(struct Conn *C, const char *addr)
2813 3202 int Conn_set_socket_addr(struct Conn *C, const char *addr) int Conn_set_socket_addr(struct Conn *C, const char *addr)
2814 3203 { {
2815 3204 snprintf(C->addr, sizeof(C->addr), "%s", addr); snprintf(C->addr, sizeof(C->addr), "%s", addr);
3205 C->type = CONN_TYPE_P2P;
2816 3206 return 0; return 0;
2817 3207 } }
2818 3208
 
... ... int Conn_set_socket_port(struct Conn *C, const int port)
2828 3218 return 0; return 0;
2829 3219 } }
2830 3220
3221 /*
3222 * Clones a Conn structure to be used for splitting between workers
3223 */
3224 __cold static struct Conn *Conn_clone(struct Conn *C)
3225 {
3226 struct Conn *ret;
3227
3228 ret = malloc(sizeof(struct Conn));
3229 if (!ret) {
3230 Log(1, "Cannot alloc memory for a clone!\n");
3231 return NULL;
3232 }
3233
3234 memcpy(ret, C, sizeof(struct Conn));
3235
3236 return ret;
3237 }
3238
2831 3239 /* /*
2832 3240 * Allocates socket and bind if asked * Allocates socket and bind if asked
2833 3241 * TODO: We should return -1 or free connection and call calbacks?! * TODO: We should return -1 or free connection and call calbacks?!
 
... ... int Conn_commit(struct Conn *C)
2842 3250 int do_listen = 1, do_connect = 0; int do_listen = 1, do_connect = 0;
2843 3251 int first_state = -1; int first_state = -1;
2844 3252 struct Conn_wpool *wp; struct Conn_wpool *wp;
3253 struct Conn_wpool_worker *w;
3254 struct Conn *clone;
2845 3255
2846 3256 Log(10, "%p %s\n", C, __func__); Log(10, "%p %s\n", C, __func__);
2847 3257
 
... ... int Conn_commit(struct Conn *C)
2851 3261
2852 3262 /* Try to figure what kind of socket is: client or master */ /* Try to figure what kind of socket is: client or master */
2853 3263 if (C->type == CONN_TYPE_UNK) { if (C->type == CONN_TYPE_UNK) {
2854 if (C->addr[0] != '\0') {
2855 C->type = CONN_TYPE_P2P;
2856 do_listen = 0;
2857 do_connect = 1;
2858 } else {
2859 C->type = CONN_TYPE_MASTER;
2860 if (C->bind_addr[0] == '\0') {
2861 /* Choose defaults because IP was not specified */
2862 switch (C->sock_domain) {
2863 case PF_INET:
2864 snprintf(C->bind_addr, sizeof(C->bind_addr),
2865 "0.0.0.0"); break;
2866 case PF_INET6:
2867 snprintf(C->bind_addr, sizeof(C->bind_addr),
2868 "::"); break;
2869 }
3264 snprintf(Conn_error, sizeof(Conn_error),
3265 "cannot detect Conn type");
3266 return -1;
3267 }
3268
3269 if (C->type == CONN_TYPE_P2P) {
3270 do_listen = 0;
3271 do_connect = 1;
3272 } else {
3273 if (C->bind_addr[0] == '\0') {
3274 /* Choose defaults because IP was not specified */
3275 switch (C->sock_domain) {
3276 case PF_INET:
3277 snprintf(C->bind_addr, sizeof(C->bind_addr),
3278 "0.0.0.0"); break;
3279 case PF_INET6:
3280 snprintf(C->bind_addr, sizeof(C->bind_addr),
3281 "::"); break;
2870 3282 } }
2871 3283 } }
2872 3284 } }
 
... ... int Conn_commit(struct Conn *C)
2932 3344 return -1; return -1;
2933 3345 } }
2934 3346
2935 C->fd = socket(C->sock_domain, C->sock_type, C->sock_protocol);
3347 C->fd = socket(C->sock_domain, C->sock_type | SOCK_NONBLOCK | SOCK_CLOEXEC,
3348 C->sock_protocol);
2936 3349 if (C->fd == -1) { if (C->fd == -1) {
3350 Log(1, "%p: %s: Could not create socket (%s)!\n",
3351 C, __func__, strerror(errno));
2937 3352 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
2938 3353 "Cannot create socket (%s, %s, %s) [%s]", "Cannot create socket (%s, %s, %s) [%s]",
2939 3354 Conn_domain(C), Conn_type(C), Conn_domain(C), Conn_type(C),
 
... ... int Conn_commit(struct Conn *C)
2941 3356 strerror(errno)); strerror(errno));
2942 3357 return -1; return -1;
2943 3358 } }
3359 Log(1, "%p: %s: Created socket %d!\n", C, __func__, C->fd);
2944 3360
2945 Conn_setnonblock(C->fd);
2946 Conn_nodelay(C);
3361 /*Conn_setnonblock(C->fd); TODO: if kernel < 2.6.27, we still do not
3362 have nonblock support. Check at runtime and make setnonblock as a no-op */
3363 Conn_nodelay(C); /* TODO: really needed? */
2947 3364
2948 3365 if (C->sock_domain == PF_INET6) { if (C->sock_domain == PF_INET6) {
2949 3366 #ifndef IPV6_V6ONLY #ifndef IPV6_V6ONLY
 
... ... int Conn_commit(struct Conn *C)
2972 3389 Conn_masters.tail = C; Conn_masters.tail = C;
2973 3390 } }
2974 3391
2975 C->revents = 0;
2976
2977 3392 if (do_listen == 1) if (do_listen == 1)
2978 3393 listen(C->fd, 4096); listen(C->fd, 4096);
2979 3394
 
... ... int Conn_commit(struct Conn *C)
2989 3404 Conn_set_wp(C, wp); Conn_set_wp(C, wp);
2990 3405 } }
2991 3406
2992 ret = Conn_add_obj(Conn_epoll_fd, C);
2993 if (ret != 0)
2994 goto out_free;
3407 /* Now, we must enqueue listening socket to workers */
3408 for (i = 0; i < C->wp->workers; i++) {
3409 w = &C->wp->ws[i];
3410 clone = Conn_clone(C);
3411 if (!clone)
3412 goto out_free;
3413 clone->worker = w;
3414
3415 Log(1, "%p: %s: Adding C->fd=%d to worker %hu\n",
3416 C, __func__, clone->fd, w->id);
3417 Conn_add_obj(w->epoll_fd, clone, EPOLLIN);
3418 }
2995 3419
2996 3420 if (do_connect == 1) if (do_connect == 1)
2997 3421 Conn_pending++; Conn_pending++;
 
... ... static void Conn_trytoconnect(void)
3195 3619 Conn_setnonblock(C->fd); Conn_setnonblock(C->fd);
3196 3620
3197 3621 /* Need POLLOUT to signal when the connection was done. */ /* Need POLLOUT to signal when the connection was done. */
3198 C->events |= EPOLLIN | EPOLLRDHUP | EPOLLOUT | EPOLLET;
3199 ret = Conn_add_obj(Conn_epoll_fd, C);
3622 /*TODO ret = Conn_add_obj(Conn_epoll_fd, C, EPOLLIN | EPOLLRDHUP);*/
3623 ret = Conn_add_obj(Conn_epoll_fd, C, EPOLLIN);
3200 3624 if (ret != 0) { if (ret != 0) {
3201 3625 C->state = CONN_STATE_ERROR; C->state = CONN_STATE_ERROR;
3202 3626 C->error_state = CONN_ERROR_SOCKET; C->error_state = CONN_ERROR_SOCKET;
 
... ... int Conn_poll(const int timeout)
3296 3720 Conn_trytoconnect(); Conn_trytoconnect();
3297 3721 #endif #endif
3298 3722
3299 ret = Conn_dispatch_events(Conn_epoll_fd, Conn_epoll_events,
3300 CONN_EVENTS_SLOTS, timeout2, Conn_poll_cb);
3723 ret = Conn_dispatch_events(NULL, Conn_epoll_fd, Conn_epoll_events,
3724 CONN_EVENTS_SLOTS, timeout2);
3301 3725 if (unlikely(ret < 0)) if (unlikely(ret < 0))
3302 3726 return -1; return -1;
3303 3727
 
... ... int Conn_poll(const int timeout)
3356 3780 return ret; return ret;
3357 3781 } }
3358 3782
3783 /*
3784 * Dumps some statistics
3785 */
3786 void Conn_stats(void)
3787 {
3788 }
3359 3789
3360 3790 /* ############ Callbacks ########### */ /* ############ Callbacks ########### */
3361 3791 static void Conn_default_cbs_accept(struct Conn *C) static void Conn_default_cbs_accept(struct Conn *C)
 
... ... static void Conn_default_cbs_accept(struct Conn *C)
3366 3796 } }
3367 3797 } }
3368 3798
3799 /*
3800 * Called when EPOLLIN present on a fd.
3801 */
3369 3802 __hot static void Conn_default_cbs_recv(struct Conn *C) __hot static void Conn_default_cbs_recv(struct Conn *C)
3370 3803 { {
3371 3804 ssize_t n; ssize_t n;
3372 3805 unsigned int max, xfer_in_this_call; unsigned int max, xfer_in_this_call;
3373 3806 int r, xerrno; int r, xerrno;
3374 3807 char *dump, call_callback; char *dump, call_callback;
3375 unsigned char add_to_extra_queue;
3376 3808
3377 3809 Log(10, "%p %s id %llu fd=%d head=%u tail=%u size=%u...\n", Log(10, "%p %s id %llu fd=%d head=%u tail=%u size=%u...\n",
3378 3810 C, __func__, C->id, C->fd, C, __func__, C->id, C->fd,
3379 3811 C->ibuf_head, C->ibuf_tail, C->ibuf_size); C->ibuf_head, C->ibuf_tail, C->ibuf_size);
3380 3812
3381 add_to_extra_queue = 0;
3382 3813 call_callback = 0; call_callback = 0;
3383 3814 xfer_in_this_call = 0; xfer_in_this_call = 0;
3384 3815 while (1) { while (1) {
3385 3816 if ((Conn_max_recv > 0) && (xfer_in_this_call >= Conn_max_recv)) { if ((Conn_max_recv > 0) && (xfer_in_this_call >= Conn_max_recv)) {
3386 3817 Log(3, "%p Recv limit reached. Add to extra queue.\n", C); Log(3, "%p Recv limit reached. Add to extra queue.\n", C);
3387 add_to_extra_queue = 1;
3388 3818 break; break;
3389 3819 } }
3390 3820
 
... ... __hot static void Conn_default_cbs_recv(struct Conn *C)
3392 3822 r = Conn_try_expand_buf(C, 1, Conn_default_ibuf); r = Conn_try_expand_buf(C, 1, Conn_default_ibuf);
3393 3823 if (unlikely(r != 0)) { if (unlikely(r != 0)) {
3394 3824 C->error_state = CONN_ERROR_MEM; C->error_state = CONN_ERROR_MEM;
3395 /* TODO: Just suspend connection for 1 second instead;
3396 add to extra queue? */
3397 3825 return; return;
3398 3826 } }
3399 3827 } }
 
... ... __hot static void Conn_default_cbs_recv(struct Conn *C)
3404 3832
3405 3833 while (1) { while (1) {
3406 3834 n = recv(C->fd, C->ibuf + C->ibuf_tail, max, 0); n = recv(C->fd, C->ibuf + C->ibuf_tail, max, 0);
3407 xerrno = errno;
3408 3835 if (unlikely((n == -1) && (errno == EINTR))) if (unlikely((n == -1) && (errno == EINTR)))
3409 3836 continue; continue;
3837 xerrno = errno;
3410 3838 break; break;
3411 };
3839 }
3412 3840
3413 3841 if ((n == -1) && (errno == EAGAIN)) { if ((n == -1) && (errno == EAGAIN)) {
3414 3842 Log(10, "%p EAGAIN received. Wait next round.\n", C); Log(10, "%p EAGAIN received. Wait next round.\n", C);
3415 C->revents &= ~EPOLLIN;
3416 3843 break; break;
3417 3844 } }
3418 3845
 
... ... __hot static void Conn_default_cbs_recv(struct Conn *C)
3425 3852 } }
3426 3853
3427 3854 if (unlikely(n == 0)) { if (unlikely(n == 0)) {
3428 Log(10, "%p Remote closed sending side. Set CLOSE_AFTER_SEND\n", C);
3429 C->close_after_send = 1;
3855 Log(5, "We received 0 bytes; oqlen=%lu\n", Conn_oqlen(C));
3856 if (Conn_oqlen(C) == 0) {
3857 /* TODO: Why we do not shutdown? Maybe we just
3858 * sent data. */
3859 /* Nothing to send, we can close the connection */
3860 Log(10, "%p Remote hangup and nothing in out"
3861 " buffer. Close.\n", C);
3862 C->state = CONN_STATE_ERROR;
3863 C->error_state = CONN_ERROR_HANGUP;
3864 Conn_free_intern(C);
3865 } else if (C->close_after_send == 0) {
3866 Log(10, "%p Remote closed sending side."
3867 " Set CLOSE_AFTER_SEND and ignore POLLIN\n", C);
3868 C->close_after_send = 1;
3869 Conn_change_obj(C->worker->epoll_fd, C, EPOLLOUT);
3870 }
3430 3871 break; break;
3431 3872 } }
3432 3873
 
... ... __hot static void Conn_default_cbs_recv(struct Conn *C)
3447 3888 if (n < max) { if (n < max) {
3448 3889 Log(3, "%p Readed less(%d < %d) than what we requested." Log(3, "%p Readed less(%d < %d) than what we requested."
3449 3890 " Drop EPOLLIN and wait the signal.\n", C, n, max); " Drop EPOLLIN and wait the signal.\n", C, n, max);
3450 C->revents &= ~EPOLLIN;
3451 3891 break; break;
3452 3892 } }
3453 3893 } }
3454 3894
3455 if (likely((call_callback == 1) && (C->cbs.data)))
3456 C->cbs.data(C);
3457
3458 if (unlikely(add_to_extra_queue)) {
3459 Log(1, "%p Add to extra processing queue. TODO\n", C);
3895 if (likely(call_callback == 1)) {
3896 Log(10, "%s\n", C->ws ? "We have ws attached" : "no ws attached");
3897 if (C->ws)
3898 Conn_ws_dispatch(C);
3899 else if (C->cbs.data)
3900 C->cbs.data(C);
3460 3901 } }
3461 3902 } }
3462 3903
3463 3904 __hot static void Conn_default_cbs_send(struct Conn *C) __hot static void Conn_default_cbs_send(struct Conn *C)
3464 3905 { {
3465 3906 ssize_t n; ssize_t n;
3466 unsigned int max, xfer_in_this_call;
3907 unsigned int max;
3467 3908 int count, xerrno, r; int count, xerrno, r;
3468 3909 char *buf; char *buf;
3469 3910 char *dump; char *dump;
3470 unsigned char add_to_extra_queue;
3471 3911
3472 Log(10, "%p %p %s id %llu fd=%d head=%u tail=%u size=%u\n",
3473 C, C->next, __func__, C->id, C->fd, C->obuf_head, C->obuf_tail,
3474 C->obuf_size);
3912 Log(10, "%s: %p next=%p %s id %llu fd=%d head=%u tail=%u size=%u\n",
3913 __func__, C, C->next, __func__, C->id, C->fd, C->obuf_head,
3914 C->obuf_tail, C->obuf_size);
3475 3915
3476 add_to_extra_queue = 0;
3477 xfer_in_this_call = 0;
3478 3916 while (1) { while (1) {
3479 3917 buf = C->obuf + C->obuf_head; buf = C->obuf + C->obuf_head;
3480 3918 count = Conn_oqlen(C); count = Conn_oqlen(C);
3481 3919 if (unlikely(count == 0)) { if (unlikely(count == 0)) {
3482 Log(5, "%p Empty output buffer.\n", C);
3483 C->revents &= ~EPOLLOUT;
3920 Log(5, "%s: %p Empty output buffer.\n", __func__, C);
3484 3921 if (unlikely(C->close_after_send == 1)) { if (unlikely(C->close_after_send == 1)) {
3485 Log(9, "%p CLOSE_AFTER_SEND is set, close connection\n", C);
3922 Log(9, "%s: %p CLOSE_AFTER_SEND is set"
3923 ", close connection\n", __func__, C);
3486 3924 /* TODO: this is not really an error; switch to other code */ /* TODO: this is not really an error; switch to other code */
3487 3925 C->state = CONN_STATE_ERROR; C->state = CONN_STATE_ERROR;
3488 3926 C->error_state = CONN_ERROR_USERREQ; C->error_state = CONN_ERROR_USERREQ;
3489 3927 Conn_free_intern(C); Conn_free_intern(C);
3928 return;
3490 3929 } }
3491 3930
3492 3931 if (C->shutdown_after_send == 1) { if (C->shutdown_after_send == 1) {
3493 Log(9, "%p SHUTDOWN_AFTER_SEND is set, shutdown write\n", C);
3932 Log(9, "%s: %p SHUTDOWN_AFTER_SEND is set"
3933 ", shutdown write\n", __func__, C);
3494 3934 C->state = CONN_STATE_ERROR; C->state = CONN_STATE_ERROR;
3495 3935 r = shutdown(C->fd, SHUT_WR); r = shutdown(C->fd, SHUT_WR);
3496 if (unlikely(r != 0)) {
3497 Log(1, "%p: shutdown returned error [%s]!\n",
3498 C, strerror(errno));
3499 /* TODO: We must set a proper C->state */
3500 C->error_state = CONN_ERROR_HANGUP;
3501 Conn_free_intern(C);
3502 } else {
3936 if (likely(r == 0)) {
3503 3937 /* TODO: this is not really an error; switch to other code */ /* TODO: this is not really an error; switch to other code */
3504 3938 C->error_state = CONN_ERROR_USERREQ; C->error_state = CONN_ERROR_USERREQ;
3939 return;
3505 3940 } }
3506 }
3507 3941
3508 return;
3509 }
3942 Log(1, "%s: %p: shutdown returned error [%s]!\n",
3943 __func__, C, strerror(errno));
3944 /* TODO: We must set a proper C->state */
3945 C->error_state = CONN_ERROR_HANGUP;
3946 Conn_free_intern(C);
3947 return;
3948 }
3510 3949
3511 if ((Conn_max_send > 0) && (xfer_in_this_call >= Conn_max_send)) {
3512 Log(1, "%p Send limit reached. Add to extra queue.", C);
3513 add_to_extra_queue = 1;
3514 break;
3950 return; /* TODO: not clear if we could be here. */
3515 3951 } }
3516 3952
3517 3953 max = count; max = count;
 
... ... __hot static void Conn_default_cbs_send(struct Conn *C)
3523 3959 if (max > C->band_tokens) if (max > C->band_tokens)
3524 3960 max = C->band_tokens; max = C->band_tokens;
3525 3961 if (max == 0) { if (max == 0) {
3526 Log(debug_band, "%p BAND: Suspend 100ms the C (no tokens)!\n", C);
3962 Log(debug_band, "%s: %p BAND: Suspend 100ms"
3963 " the C (no tokens)!\n", __func__, C);
3527 3964 break; break;
3528 3965 } }
3529 3966 } }
3530 3967
3531 Log(10, "%p send(fd=%d, buf (head=%u, tail=%u), max=%d (count=%d), 0)...\n",
3532 C, C->fd, C->obuf_head,
3968 Log(10, "%s: %p send(fd=%d, buf (head=%u, tail=%u)"
3969 ", max=%d (count=%d), 0)...\n",
3970 __func__, C, C->fd, C->obuf_head,
3533 3971 C->obuf_tail, max, count); C->obuf_tail, max, count);
3534 3972 while (1) { while (1) {
3535 3973 n = send(C->fd, buf, max, 0); n = send(C->fd, buf, max, 0);
 
... ... __hot static void Conn_default_cbs_send(struct Conn *C)
3540 3978 } }
3541 3979
3542 3980 if ((n == -1) && (errno == EAGAIN)) { if ((n == -1) && (errno == EAGAIN)) {
3543 Log(10, "%p EAGAIN received. Wait next round.\n", C);
3544 C->revents &= ~EPOLLOUT;
3981 Log(10, "%s:%p EAGAIN received. Wait next round.\n",
3982 __func__, C);
3545 3983 break; break;
3546 3984 } }
3547 3985
3548 Log(10, "%p Sent %d bytes [head=%d tail=%d]\n",
3549 C, n, C->obuf_head, C->obuf_tail);
3986 Log(10, "%s: %p Sent %d bytes [head=%d tail=%d]\n",
3987 __func__, C, n, C->obuf_head, C->obuf_tail);
3550 3988
3551 3989 if (unlikely(n <= 0)) { if (unlikely(n <= 0)) {
3552 Log(0, "%p Error in sending [%s]\n",
3553 C, strerror(errno));
3990 Log(0, "%s: %p Error in sending [%s]\n",
3991 __func__, C, strerror(errno));
3554 3992 C->error_state = CONN_ERROR_SEND; C->error_state = CONN_ERROR_SEND;
3555 3993 C->xerrno = xerrno; C->xerrno = xerrno;
3556 3994 Conn_free_intern(C); Conn_free_intern(C);
 
... ... __hot static void Conn_default_cbs_send(struct Conn *C)
3559 3997
3560 3998 if (unlikely(Conn_debug_level >= 10)) { if (unlikely(Conn_debug_level >= 10)) {
3561 3999 dump = Conn_dump(buf, n); dump = Conn_dump(buf, n);
3562 Log(0, "%p %s\n", C, dump);
4000 Log(0, "%s: %p %s\n", __func__, C, dump);
3563 4001 free(dump); free(dump);
3564 4002 } }
3565 4003
 
... ... __hot static void Conn_default_cbs_send(struct Conn *C)
3571 4009 C->obuf_tail = 0; C->obuf_tail = 0;
3572 4010 } }
3573 4011 C->bo += n; C->bo += n;
3574 xfer_in_this_call += n;
3575 4012
3576 4013 if (C->band_width > 0) { if (C->band_width > 0) {
3577 4014 /* What if band_tokens < n?! */ /* What if band_tokens < n?! */
3578 4015 C->band_tokens -= n; C->band_tokens -= n;
3579 Log(debug_band, "%p BAND: Remove %d tokens -> %u...\n",
3580 C, n, C->band_tokens);
4016 Log(debug_band, "%s: %p BAND: Remove %d tokens"
4017 " -> %u...\n",
4018 __func__, C, n, C->band_tokens);
3581 4019 } }
3582 4020
3583 4021 if (n < max) { if (n < max) {
3584 Log(1, "%p Sent less than what we requested."
4022 Log(1, "%s: %p Sent less than what we requested."
3585 4023 " Drop EPOLLOUT and wait for signal." " Drop EPOLLOUT and wait for signal."
3586 " Break loop.\n", C);
3587 C->revents &= ~EPOLLOUT;
3588 break;
3589 }
3590
3591 /*
3592 if (n == max) {
3593 Log(0, "%p n == max (%d == %d). Break the loop.\n",
3594 C, n, max);
4024 " Break loop.\n", __func__, C);
3595 4025 break; break;
3596 4026 } }
3597 */
3598 }
3599
3600 if (add_to_extra_queue) {
3601 Log(1, "%p Add to extra processing queue. TODO\n", C);
3602 4027 } }
3603 4028 } }
3604 4029
 
... ... static void Conn_default_cbs_accept_error(struct Conn *C)
3634 4059 Log(10, "%p Error accepting conn (id %llu) (%s)\n", Log(10, "%p Error accepting conn (id %llu) (%s)\n",
3635 4060 C, Conn_getid(C), Conn_strerror()); C, Conn_getid(C), Conn_strerror());
3636 4061 } }
3637
File Conn.h changed (mode: 100644) (index 90f29fb..594181f)
... ... extern int Conn_enqueue_wait(struct Conn *C, const void *buf,
111 111 const size_t count); const size_t count);
112 112 extern int Conn_enqueue(struct Conn *C, const void *buf, extern int Conn_enqueue(struct Conn *C, const void *buf,
113 113 const size_t count); const size_t count);
114 void Conn_kick(struct Conn *C);
114 115 extern struct Conn *Conn_alloc(void); extern struct Conn *Conn_alloc(void);
115 116 extern int Conn_set_socket_domain(struct Conn *C, const int domain); extern int Conn_set_socket_domain(struct Conn *C, const int domain);
116 117 extern int Conn_set_socket_type(struct Conn *C, const int type); extern int Conn_set_socket_type(struct Conn *C, const int type);
 
... ... extern int Conn_band(struct Conn *C, const unsigned int width,
130 131 const unsigned int factor); const unsigned int factor);
131 132 extern int Conn_poll(const int timeout); extern int Conn_poll(const int timeout);
132 133
134 /* web server */
135 int Conn_ws_create(struct Conn *C);
136 int Conn_ws_script(struct Conn *C, const char *url,
137 void(*cb)(struct Conn *C));
138 int Conn_ws_path(struct Conn *C, const char *url,
139 const char *path);
140
141
133 142 #endif #endif
File Conn.spec.in changed (mode: 100644) (index 3e3dff5..a7255c2)
... ... rm -rf ${RPM_BUILD_ROOT}
35 35 %files %files
36 36 %attr (-,root,root) %attr (-,root,root)
37 37 %{_includedir}/* %{_includedir}/*
38 %{_libdir}/libConn.so
38 %{_libdir}/libConn.so.1
39 39 %{_libdir}/libConn.so.@VER@ %{_libdir}/libConn.so.@VER@
40 40 %doc README Changelog examples INSTALL %doc README Changelog examples INSTALL
41 41
File Conn_config.h.in changed (mode: 100644) (index 6a704fe..662c3f3)
14 14 /* /*
15 15 * How many events slots to expect from kernel in one epoll_wait call * How many events slots to expect from kernel in one epoll_wait call
16 16 */ */
17 #define CONN_EVENTS_SLOTS 128
18
19 /*
20 * How many free structures to give back in one locked section
21 */
22 #define CONN_GIVE_BACK_LIMIT 128
17 #define CONN_EVENTS_SLOTS 256
23 18
24 19 /* /*
25 20 * How many Conn structures to allocate once * How many Conn structures to allocate once
26 21 */ */
27 #define CONN_BULK_ALLOC 128
22 #define CONN_BULK_ALLOC 256
28 23
29 24 #if GCC_VERSION >= 40300 #if GCC_VERSION >= 40300
30 25 #define __hot __attribute__((__hot__)) #define __hot __attribute__((__hot__))
File Makefile.in changed (mode: 100644) (index 9da6a61..00b7e4f)
1 1 export CC := gcc export CC := gcc
2 2 export INCS += -I. export INCS += -I.
3 3 export LIBS += -lpthread export LIBS += -lpthread
4 export CFLAGS += -g -Wall -Wextra -pipe -fpic -O2
5 4 export OBJS += Conn.o export OBJS += Conn.o
6 5
6 export CFLAGS += -Wall -Wextra -pipe -O3 -g -ggdb
7 export CFLAGS += -Wconversion -Wcast-align -Wformat=2 -Wformat-security -fno-common
8 export CFLAGS += -Wmissing-prototypes -Wmissing-declarations -Wstrict-prototypes
9 export CFLAGS += -Wstrict-overflow -Wtrampolines -flto
10 export CFLAGS += -fstack-protector-all
11 export CFLAGS += -D _FORTIFY_SOURCES=2
12 export CFLAGS += -fstack-reuse=all -fbounds-check
13 export CFLAGS += -Wl,-z,noexecstack -Wl,-z,now -Wl,-z,relro -Wl,-O1
14
15
7 16 .PHONY: all .PHONY: all
8 17 all: libConn.so.@VER@ all: libConn.so.@VER@
9 18
10 19
11 20 Conn.o: Conn.c Conn.h Conn_config.h Conn.o: Conn.c Conn.h Conn_config.h
12 $(CC) $(CFLAGS) $(INCS) -c Conn.c
21 $(CC) $(CFLAGS) -fpic $(INCS) -c Conn.c
13 22
14 23 %.o: %.c %.o: %.c
15 24 gcc $(CFLAGS) $(INCS) $< -c gcc $(CFLAGS) $(INCS) $< -c
16 25
17 26 libConn.so.@VER@: $(OBJS) libConn.so.@VER@: $(OBJS)
18 $(CC) $(CFLAGS) $(INCS) -shared -Wl,-soname,libConn.so \
27 $(CC) $(CFLAGS) -fpic $(INCS) -shared -Wl,-soname,libConn.so.1 \
19 28 -o $@ $(OBJS) -lc $(LIBS) -o $@ $(OBJS) -lc $(LIBS)
20 ln -sf $@ libConn.so
29 ln -sf $@ libConn.so.1
21 30
22 31 libConn.a: $(OBJS) libConn.a: $(OBJS)
23 32 ar rcs libConn.a $(OBJS) ar rcs libConn.a $(OBJS)
 
... ... tests:
29 38
30 39 .PHONY: examples .PHONY: examples
31 40 examples: examples:
32 $(MAKE) -C examples
41 $(MAKE) -C examples build
33 42
34 43
35 44 .PHONY: clean .PHONY: clean
File TODO changed (mode: 100644) (index e3ddaa3..2ba4b60)
1 [ ] http://thread.gmane.org/gmane.linux.network/337836 - SO_INCOMING_CPU
2
3 [ ] Scenario: raspundem la un request, vedem ca nu mai avem nimic de trimis,
4 datele sint inca in buffer, apelam close => datele se pierd!
5 Trebuie sa facem shutdown!
6 [ ] De facut o schema cu starile prin care trece o conexiune, suspectez
7 ca atunci cind obuf e 0, nu fac shutdown in loc de close.
8 [ ] La http/1.1, default e sa nu inchida conexiunea.
9
10 == Devel point ==
11 [ ] Now I am working on simple web requests.
12 Static (/) and dynamic (/cgi?a=1).
13 [ ] We must send "HTTP/x.x code message" respecting incoming request.
14 Our API must deal with it.
15
1 16 == Some history == == Some history ==
2 2013-11-12: on r1: ~3850 req/s (-n50000 -c2 + taskset + nice) branch mispredict: 9% K:3.11.4-201
17 2014-04-02: Se pare ca bat gwan-ul. Cam 4300 vs 3700. Dar eu nu fac chiar tot
18 ce face el. Trec la un API pentru a crea un server web.
19 Sa vedem cam cum ar trebui sa arate.
20 C = Conn_alloc();
21 wp = Conn_wpool_create();
22 Conn_set_wp(C, wp);
23 Conn_commit(C);
24 while (1) {
25 Conn_poll(-1);
26 }
27
28 ws = Conn_ws_create(C);
29 Conn_ws_path(C, "/static", "/home/x/public_html");
30 Conn_ws_script(C, "/cgi-bin/script1", function_script1);
31
32 Sounds good.
33
34 Another thing: libConn - 40k, wpool2 - 10k!
35
36 2014-03-25:
37 Se pare ca syscall-urile mele dureaza mai mult decit ale lui.
38 Chiar nu am nici o explicatie. Cum naiba de se intimpla asta?
39 Oare se contorizeaza si cod-ul dintre syscall-uri?
40 Probabil ca se contorizeaza si asteptarea! Si atunci e corect.
41 Dar la shutdown ce explicatie am?!
42 Se pare ca dupa un 'ab', wpool2 nu se mai opreste din mincat CPU!
43
44 2014-03-25:
45 Concluzie: Eu petrec 87% din timp in epoll_wait! gwan doar 6!!!
46 Se pare ca vine EPOLLIN si EPOLLRDHUP si nu fac nimic!
47 Dezactivez EPOLLRDHUP! Wow! 5600 req/s!
48 Dar, se pare ca tot am 95% in epoll_wait. 112724 apeluri fata de 10k!
49 Tot multe! Se pare ca ma blochez undeva si nu mai progresez de acolo!
50
51 strace -c (-n20000 -c10):
52 gwan:
53 % time seconds usecs/call calls errors syscall
54 ------ ----------- ----------- --------- --------- ----------------
55 19.84 0.031245 1 60000 setsockopt
56 19.52 0.030749 2 20000 writev
57 18.32 0.028859 1 40000 20000 shutdown
58 16.58 0.026120 0 60000 20000 epoll_ctl
59 8.68 0.013667 1 20002 close
60 6.23 0.009811 0 20183 183 accept4
61 5.94 0.009357 1 10440 epoll_wait
62 4.88 0.007684 0 20042 20 read
63 0.00 0.000000 0 2 open
64 0.00 0.000000 0 12 stat
65 0.00 0.000000 0 2 fstat
66 0.00 0.000000 0 2 mmap
67 0.00 0.000000 0 1 mprotect
68 0.00 0.000000 0 2 munmap
69 ------ ----------- ----------- --------- --------- ----------------
70 100.00 0.157492 250688 40203 total
71
72 % time seconds usecs/call calls errors syscall
73 ------ ----------- ----------- --------- --------- ----------------
74 87.27 3.011111 46 65802 epoll_wait
75 3.17 0.109265 5 20000 sendto
76 2.37 0.081884 4 20146 145 accept4
77 2.28 0.078617 1 63925 recvfrom
78 1.90 0.065479 3 20005 epoll_ctl
79 1.75 0.060324 3 20000 shutdown
80 1.22 0.041954 2 20008 close
81 0.03 0.001000 500 2 socketpair
82 0.02 0.000535 20 27 19 open
83 0.00 0.000077 19 4 munmap
84 0.00 0.000038 5 7 read
85 0.00 0.000000 0 1 write
86 0.00 0.000000 0 5 fstat
87 0.00 0.000000 0 19 mmap
88 0.00 0.000000 0 12 mprotect
89 0.00 0.000000 0 4 brk
90 0.00 0.000000 0 2 rt_sigaction
91 0.00 0.000000 0 1 rt_sigprocmask
92 0.00 0.000000 0 1 1 access
93 0.00 0.000000 0 1 socket
94 0.00 0.000000 0 1 bind
95 0.00 0.000000 0 1 listen
96 0.00 0.000000 0 2 setsockopt
97 0.00 0.000000 0 2 clone
98 0.00 0.000000 0 1 execve
99 0.00 0.000000 0 1 getcwd
100 0.00 0.000000 0 1 getrlimit
101 0.00 0.000000 0 1 arch_prctl
102 0.00 0.000000 0 3 1 futex
103 0.00 0.000000 0 2 sched_setaffinity
104 0.00 0.000000 0 1 sched_getaffinity
105 0.00 0.000000 0 1 epoll_create
106 0.00 0.000000 0 1 set_tid_address
107 0.00 0.000000 0 3 set_robust_list
108 0.00 0.000000 0 3 epoll_create1
109 ------ ----------- ----------- --------- --------- ----------------
110 100.00 3.450284 229996 166 total
111
112
113 2014-03-24
114 Am adaugat cancel_disable.
115 ab -n20000 -c10 http://localhost:60000/100.html: 4763 req/sec sub perf
116 FARA PERF: 4374 req/sec WTF?!
117 perf report:
118 14.45% wpool2 [kernel.kallsyms] [k] ep_poll
119 13.84% wpool2 [kernel.kallsyms] [k] set_normalized_timespec
120 13.63% wpool2 [vdso] [.] 0x0000000000000cb0
121 6.24% wpool2 [kernel.kallsyms] [k] read_hpet
122 5.71% wpool2 [kernel.kallsyms] [k] select_estimate_accuracy
123 5.41% wpool2 libConn.so.1.0.33 [.] Conn_wpool_worker_func
124 Probabil ca apelez gettimeofday de prea multe ori. Da, se pare ca
125 0x0000000000000cb0 este gettimeofday.
126 Daca scot sched_yield, cu perf record am 4778 req/s
127 [pid 1787] SYS_mmap(0, 0x8000000, 0, 0x4022) = 0x7f5bb264f000
128 [pid 1787] SYS_munmap(0x7f5bb264f000, 26939392) = 0
129 [pid 1787] SYS_munmap(0x7f5bb8000000, 40169472) = 0
130 Se pare ca se face un mmap si apoi imediat munmap. WTF?!
131 Ulterior nu mai face.
132
133 Eu gwan
134 epoll_wait epoll_wait
135 accept4 accept4
136 mmap -
137 munmap -
138 munmap -
139 mprotect -
140 - setsockopt(NODELAY)
141 epoll_ctl epoll_ctl
142 epoll_wait epoll_wait
143 recvfrom read
144 - setsockopt(NODELAY)
145 - open
146 - fstat
147 - open,stat,fstat,mmap,read,close,munmap
148 sendto writev
149 shutdown shutdown
150 - setsockopt(NODELAY)
151 epoll_wait epoll_wait
152 - epoll_ctl(DEL)
153 - shutdown!
154 - epoll_ctl(DEL)!
155 close close
156 Clar pot mai bine de atit!
157
158 2014-03-11
159 Use pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
160 Maybe this way cancelation will not appear in perf reports.
161 2013-12-12
162 Concluzii:
163 - Eu stau mult mai mult timp in epoll_wait. Very strange. E vorba de 22 de secunde in plus!
164 - Eu chem accept4 cu 40000 mai mult! Fuck!
165 - Cum naiba eu chem de 50.000 ori shutdown, fara erori, iar el cheama de 100.000 si dureaza mai putin?!
166 - E incredibil cum reuseste. Doar daca syscall-urile mele sint intrerupte de prea multe ori.
167 - Macar eu fac de 3 ori mai putine setsockopt.
168 Next steps:
169 Nu mai chem accept4 inca o data, pentru ca veni cu notificarea.
170 Din ce in ce mai putin cred in EPOLLET. What a fuck?!
171 Decit sa fac un apel la accept, pe fiecare thread, mai bine apelez epoll_wait.
172
173 wpool2:
174 % time seconds usecs/call calls errors syscall
175 ------ ----------- ----------- --------- --------- ----------------
176 97.87 35.710386 187 191175 epoll_wait
177 1.26 0.459349 3 141576 91576 accept4
178 0.52 0.189052 4 50000 sendto
179 0.22 0.080552 2 50000 shutdown
180 0.06 0.023073 0 50000 close
181 0.03 0.010337 0 50306 recvfrom
182 0.03 0.009216 0 50000 epoll_ctl
183 0.02 0.007091 0 50000 setsockopt
184 ------ ----------- ----------- --------- --------- ----------------
185 100.00 36.489056 633057 91576 total
186
187 gwan:
188 % time seconds usecs/call calls errors syscall
189 ------ ----------- ----------- --------- --------- ----------------
190 97.60 13.464343 89 150462 epoll_wait
191 0.91 0.124850 2 50000 writev
192 0.49 0.067336 1 100000 50000 shutdown
193 0.44 0.060369 1 101351 51351 accept4
194 0.25 0.034678 0 150000 setsockopt
195 0.13 0.017909 0 150000 50000 epoll_ctl
196 0.11 0.015268 0 50123 4 read
197 0.07 0.009198 0 50002 close
198 ------ ----------- ----------- --------- --------- ----------------
199 100.00 13.795050 802006 151355 total
200
201
202 2013-12-10: after -O3
203 me
204 prof: -c2+5000 8%
205 -c2+50000 3719
206
207 2013-11-24: on r1 (after putting free structures in front of free list)
208 me
209 -c10+50000 5034
210 -c2+50000 3777 4700
211 -c1+50000 2973 3900
212
213 2013-11-21: on r1
214 me gwan
215 -c1+50000 1703 3206
216 -c2+50000 3619 4900
217 -c10+50000 3647 5028
218
219 2013-11-20: on r1 (after doing allocations per thread):
220 me
221 branch 1+5000 7
222 -c1+50000
223 -c2+50000 5093!broken?
224
225 2013-11-17: on r1 (after doing accept in all workers):
226 me me+Log
227 branch 1+5000 6.4!
228 -c1+50000 2180
229 -c2+50000 3400
230 2013-11-16: on r1:
231 me me+Log gwan
232 branch 1+5000 9.5%
233 -c1+50000: 3882 2094!
234 -c2+50000: 4100 4927
235 2013-11-13: on r1: ~4270 req/s (-n50000 -c2 + taskset + nice) branch mispredict: 8% K:3.11.4-201 so(Conn)=448 gwan:-c1:~2000 me-c1:3875
236 2013-11-12: on r1: ~3850 req/s (-n50000 -c2 + taskset + nice) branch mispredict: 9% K:3.11.4-201 so(Conn)=480
3 237
4 238 == SHOWSTOPPERS == == SHOWSTOPPERS ==
239 [ ] Call Conn_ws_free when freeing a Conn.
240 [ ] Make sure we compile with -O3
241 [ ] Should we call again accept or go to poll mode? I think we should go to poll.
242 [ ] Compile with -s to obtain profiling on assembly code.
243 [ ] We may get rid of NODELAY because we write and do shutdown. I hope
244 this is triggering a flush. To test.
245 [ ] Prima data, ar trebui sa ignor O pentru ca nu am cum sa am ceva in buffer.
246 [ ] Imi trebui un mecanism, preferabil fara locking, ca sa trimit statistici catre master.
247 Eventual doar la cerere, ca sa evit trafic inutil.
248 Dar, conexiunea pentru statistici, o sa vina pe un worker.
249 Probabil ca pot sa fac o semnalizare prin pipe. Copiez intr-un buffer
250 statisticile curente, apoi trimit pointer-ul prin pipe. Aste pentru update.
251 In momentul in care vine o cerere de statistici, trebuie sa le cer de la master
252 si apoi sa le servesc.
253 [ ] Stop using callbacks for send/receive to speed up operations.
254 [ ] We should do not call initial out hook. We can just try to send at first
255 kick and react to EAGAIN. Very probably we can send.
256 [ ] Probabil ca o sa avem structuri diferite pentru ce seteaza clientul
257 (Conn_alloc/commit) si alta pentru bookkeeping-ul intern.
258 [ ] Move main pollfd to all threads. Tis way they will be "equal" and every
259 core will be at full speed without migrations.
5 260 [ ] Check with gdb why we get a segmentation fault in line 2267. [ ] Check with gdb why we get a segmentation fault in line 2267.
6 [ ]
261 [ ] Limit the number of acepts to not starve read/write.
7 262
8 263 == HIGH PRIORITY == == HIGH PRIORITY ==
264 [ ] Should we do Conn_now per thread? It is updated from all worker threads!
265 [ ] Switch to libConn.so.1 at compile time to be able tu bump the version sometime.
266 [ ] Verify likely/unlikely. I suspect are not working correctly.
267 [ ] http://lwn.net/Articles/257209/
268 [ ] http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html
269 [ ] http://fasterdata.es.net/host-tuning/linux/
270 [ ] Investigate MSG_MORE when sending.
9 271 [ ] When init Conn, preallocate a 1 worker wp and set it and when user requests [ ] When init Conn, preallocate a 1 worker wp and set it and when user requests
10 272 another wp, just put(wp) and set the new one? Or at commit time? another wp, just put(wp) and set the new one? Or at commit time?
11 273 Use cases: want to alloc 1 core for a listen port and for other Use cases: want to alloc 1 core for a listen port and for other
12 274 many cores. many cores.
13 275 [ ] Split Conn_poll_cb into MASTER/NON_MASTER and do not make it callback [ ] Split Conn_poll_cb into MASTER/NON_MASTER and do not make it callback
14 276 but inline. but inline.
15 [ ] ->next pointer can be remove from struct Conn.
277 [ ] ->next pointer can be removed from struct Conn.
16 278 This way I can save a lot of space. This way I can save a lot of space.
17 279 [ ] Do not make the fd -1, is pointless. [ ] Do not make the fd -1, is pointless.
18 [ ] At start, dump also the size of Conn structure.
19 [ ] Cache getaddrinfo responses
20 280 [ ] Replace Conn_X with Conn_get_socket_X! [ ] Replace Conn_X with Conn_get_socket_X!
21 [ ] Use shutdown(2) before closing connection
281 [ ] Use shutdown(2) before closing connection. Done, but see the link.
22 282 http://www.developerweb.net/forum/archive/index.php/t-2940.html. http://www.developerweb.net/forum/archive/index.php/t-2940.html.
23 283 [ ] Switch all pointers to callbacks to a single callback with paramenters + [ ] Switch all pointers to callbacks to a single callback with paramenters +
24 284 a flag that will say for what type of callbacks to call the callback. a flag that will say for what type of callbacks to call the callback.
 
29 289 [ ] Alloc private area just after Conn structure. Add a function to set private [ ] Alloc private area just after Conn structure. Add a function to set private
30 290 size. size.
31 291 [ ] Set on master socket the needed in/out buffer sizes and inherit to accepted [ ] Set on master socket the needed in/out buffer sizes and inherit to accepted
32 ones. This is becuase we may need different buffers for different masters.
292 ones. This is because we may need different buffers for different masters.
33 293 [ ] Daca am luat HUP, nu mai trebuie sa permit parsarea in continuare! [ ] Daca am luat HUP, nu mai trebuie sa permit parsarea in continuare!
34 294 [ ] Ar trebui sa-l scoatem din lista de active C-ul caruia ii facem free. [ ] Ar trebui sa-l scoatem din lista de active C-ul caruia ii facem free.
35 [ ]
36 1383337000.648931 main Affinity for worker 1: 1
37 1383337003.109904 main pool_grow: Try to grow cells from 1024 to 2048.
38 1383337003.621690 main pool_grow: Try to grow cells from 2048 to 3072.
39 1383337004.153547 main pool_grow: Try to grow cells from 3072 to 4096.
40 1383337004.667851 main pool_grow: Try to grow cells from 4096 to 5120.
41 295 [ ] Align Conn structures to 8 bytes in allocations blocks. [ ] Align Conn structures to 8 bytes in allocations blocks.
42 296 [ ] Investigate the idea to put free buffers in front of the queue because [ ] Investigate the idea to put free buffers in front of the queue because
43 297 they are hot. they are hot.
44 [ ] Investigate the idea to keep the free list per thread to avoid cache trashing.
45 [ ] Seems that I do not call close on a C that already received HUP.
46 Seems I must not call shutdown(SHUT_WR).
47 [ ] Trebuie sa fac tcpdump -w.
48 298 [ ] [ ]
49 299
50 300
51 301 == LOW PRIORITY == == LOW PRIORITY ==
302 [ ] Use enums for enum types.
303 [ ] Cache getaddrinfo responses
52 304 [ ] Investigate moving TCP stack in userspace. [ ] Investigate moving TCP stack in userspace.
53 305 [ ] Conn_join(C1, C2) (Bridge 2 connections together for proxy stuff.) [ ] Conn_join(C1, C2) (Bridge 2 connections together for proxy stuff.)
54 306 [ ] See http://highscalability.com/blog/2012/9/10/russ-10-ingredient-recipe-for-making-1-million-tps-on-5k-har.html [ ] See http://highscalability.com/blog/2012/9/10/russ-10-ingredient-recipe-for-making-1-million-tps-on-5k-har.html
 
... ... Pentru a reduce numarul de conexiuni in TIME-WAIT:
117 369 === When we switch to Conn version 2 library === === When we switch to Conn version 2 library ===
118 370 [ ] Conn_socket will call Conn_socket_proto [ ] Conn_socket will call Conn_socket_proto
119 371 [ ] use enums! [ ] use enums!
120 [ ] All internal variables of Conn structure must not be exposed directly.
121 372 [ ] http://urbanairship.com/blog/2010/09/29/linux-kernel-tuning-for-c500k/ [ ] http://urbanairship.com/blog/2010/09/29/linux-kernel-tuning-for-c500k/
122 373 [ ] [ ]
File configure changed (mode: 100755) (index d33bb6c..92c4bc4)
1 1 #!/bin/bash #!/bin/bash
2 2
3 ./duilder
3 ./duilder "${@}"
File examples/Makefile changed (mode: 100644) (index d7b0c60..e31d8e3)
1 1 TARGETS := wpool1 wpool2 \ TARGETS := wpool1 wpool2 \
2 2 s c raw udp_s timeout trigger reconnect ntime blackhole_s blackhole_c \ s c raw udp_s timeout trigger reconnect ntime blackhole_s blackhole_c \
3 3 xbind xbind
4 TARGETS := wpool2 ws1
4 5
5 TARGETS := wpool2
6 .PHONY: all
7 all:
8 make -C .. examples
6 9
7 all: $(TARGETS)
10 build: $(TARGETS)
8 11
9 12 INCS += -I.. INCS += -I..
10 LIBS += -L.. -lConn
13 LIBS += -L.. -l:../libConn.so.1
11 14 OBJS := OBJS :=
12 DEPS := ../libConn.so $(OBJS)
15 DEPS := ../libConn.so.1 $(OBJS)
13 16
14 17 s: s.c $(DEPS) s: s.c $(DEPS)
15 18 gcc $(CFLAGS) $(INCS) s.c -o s $(LIBS) gcc $(CFLAGS) $(INCS) s.c -o s $(LIBS)
 
... ... wpool1: wpool1.c $(DEPS)
56 59 wpool2: wpool2.c $(DEPS) wpool2: wpool2.c $(DEPS)
57 60 gcc $(CFLAGS) $(INCS) $@.c -o $@ $(LIBS) gcc $(CFLAGS) $(INCS) $@.c -o $@ $(LIBS)
58 61
62 ws1: ws1.c $(DEPS)
63 gcc $(CFLAGS) $(INCS) $@.c -o $@ $(LIBS)
64
59 65 ma: ma.c ma: ma.c
60 66 gcc $(CFLAGS) $(INCS) $@.c -o $@ -lpthread gcc $(CFLAGS) $(INCS) $@.c -o $@ -lpthread
61 67
File examples/wpool2.c changed (mode: 100644) (index 7929424..3697e0b)
23 23 #include <Conn.h> #include <Conn.h>
24 24
25 25 /* Global variables */ /* Global variables */
26 static unsigned short debug = 2;
26 static unsigned short debug = 0;
27 27
28 28 static void data_cb(struct Conn *C) static void data_cb(struct Conn *C)
29 29 { {
File examples/ws1.c copied from file examples/wpool2.c (similarity 66%) (mode: 100644) (index 7929424..fd107cd)
1 1 /* /*
2 * Test workers pool - web server
2 * A real web server
3 3 */ */
4 4
5 5 #define _GNU_SOURCE #define _GNU_SOURCE
 
23 23 #include <Conn.h> #include <Conn.h>
24 24
25 25 /* Global variables */ /* Global variables */
26 static unsigned short debug = 2;
26 static unsigned short debug = 20;
27 27
28 28 static void data_cb(struct Conn *C) static void data_cb(struct Conn *C)
29 29 { {
 
... ... static void data_cb(struct Conn *C)
32 32 memset(body, 'X', 100); memset(body, 'X', 100);
33 33 body[100] = '\0'; body[100] = '\0';
34 34
35 Conn_printf(C, "HTTP/1.0 200 OK\r\n"
35 Conn_printf(C, "HTTP/1.1 200 OK\r\n"
36 36 "Server: wpool2\r\n" "Server: wpool2\r\n"
37 37 "Content-Length: 100\r\n" "Content-Length: 100\r\n"
38 38 "\r\n" "\r\n"
 
... ... static void data_cb(struct Conn *C)
40 40 Conn_close(C); Conn_close(C);
41 41 } }
42 42
43 void cgi1(struct Conn *C)
44 {
45 char body[4096];
46
47 Log(0, "%s\n", __func__);
48
49 memset(body, 'X', 100);
50 body[100] = '\0';
51
52 Conn_printf(C,
53 "HTTP/1.1 200 OK\r\n"
54 "Server: ws1\r\n"
55 "Connection: Keep-Alive\r\n"
56 "Content-Length: 100\r\n"
57 "Content-Type: text/plain; charset=UTF-8\r\n"
58 "\r\n"
59 "%s", body);
60
61 Conn_kick(C);
62 }
63
43 64 int main(void) int main(void)
44 65 { {
45 66 int ret; int ret;
 
... ... int main(void)
66 87 Conn_set_socket_bind_addr(C, "0.0.0.0"); Conn_set_socket_bind_addr(C, "0.0.0.0");
67 88 Conn_set_socket_bind_port(C, 60000); Conn_set_socket_bind_port(C, 60000);
68 89
90
91 /* Create a web server attached to C */
92 ret = Conn_ws_create(C);
93 if (ret != 0) {
94 printf("Cannot create web server!\n");
95 return 1;
96 }
97
98 ret = Conn_ws_path(C, "/", "/x");
99 if (ret != 0) {
100 printf("Cannot attach path /x!\n");
101 return 1;
102 }
103
104 /* When you access http://server/cgi?a=1, you will be redirected to function cgi1 */
105 ret = Conn_ws_script(C, "/cgi", cgi1);
106 if (ret != 0) {
107 printf("Cannot attach script cgi1!\n");
108 return 1;
109 }
110
111
69 112 /* Create a pool with 2 workers */ /* Create a pool with 2 workers */
70 113 wp = Conn_wpool_create(2); wp = Conn_wpool_create(2);
71 114 if (wp == NULL) { if (wp == NULL) {
 
... ... int main(void)
75 118
76 119 Conn_set_wp(C, wp); Conn_set_wp(C, wp);
77 120
121 #if 0
78 122 Conn_set_cb(C, CONN_CB_DATA, data_cb); Conn_set_cb(C, CONN_CB_DATA, data_cb);
79 123 Conn_set_cb(C, CONN_CB_ACCEPT, NULL); Conn_set_cb(C, CONN_CB_ACCEPT, NULL);
80 124 Conn_set_cb(C, CONN_CB_CLOSE, NULL); Conn_set_cb(C, CONN_CB_CLOSE, NULL);
125 #endif
81 126
82 127 ret = Conn_commit(C); ret = Conn_commit(C);
83 128 if (ret != 0) { if (ret != 0) {
File tests/.gitignore changed (mode: 100644) (index 9857323..64e9830)
1 1 *.run *.run
2 *.out
File tests/Makefile changed (mode: 100644) (index 0e29409..f2a8de6)
... ... t3.run: t3.c $(DEPS)
18 18
19 19 .PHONY: clean .PHONY: clean
20 20 clean: clean:
21 @-rm -f $(TARGETS) *.log core
21 @-rm -f $(TARGETS) *.log core *.out
File tests/ws_double.expect added (mode: 100644) (index 0000000..39dc8b0)
1 XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
File tests/ws_double.sh added (mode: 100755) (index 0000000..bf34704)
1 #!/bin/bash
2
3 wget --timeout=10 -d -O ws_double.out \
4 http://r1i:60000/cgi?a=1 \
5 http://r1i:60000/cgi?a=1
6
7 diff ws_double.expect ws_double.out &>/dev/null
8 if [ "${?}" != "0" ]; then
9 echo "ws_double test failed!"
10 exit 1
11 fi
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main