catalinux / Conn (public) (License: LGPLv2) (since 2016-03-01) (hash sha1)
Net library for easy building ipv4/ipv6 network daemons/clients
List of commits:
Subject Hash Author Date (UTC)
Lot of small changes that were need to fix epoll integration. 92877346e9cbecc7bea6f6d342e6d03475349878 Catalin(ux) M. BOIE 2009-02-26 12:20:07
Fixed the server example to output a \n terminated line. ed20ea84c9c6bd5dc9a3c066d03e4c8a049e1369 Catalin(ux) M. BOIE 2009-02-26 12:16:33
Added an accept_error callback to server example (s.c). 7014b75488c82c4b1f1c5b52cf7d4d2278719f4d Catalin(ux) M. BOIE 2009-02-26 12:15:56
Fixed exit from client example (c.c) to catch leaks. b912a406c59719919d3d98f4ca92cbbb3f986e99 Catalin(ux) M. BOIE 2009-02-26 12:15:08
Added queue stuff - not used for now. b913455733a88e63bea980e7490d4f36766e15fc Catalin(ux) M. BOIE 2009-02-26 10:50:56
Constify a lot of parameters and added more consistent logging. 8b15034895c7e63be9b9ca68e4b41c9a52618d10 Catalin(ux) M. BOIE 2009-02-26 10:46:05
Stuff in/out. 074bc88cec80384ade2f4309840a7d2c6f15a700 Catalin(ux) M. BOIE 2009-02-26 10:41:16
Remove 'slot' concept from anywhere I could. It was confusing. eacc7a0b5355ffa8f71dc07631209e2f805edd03 Catalin(ux) M. BOIE 2009-02-23 15:21:53
Use valgrind in examples to catch errors. 3520092ef3e946840bc13fd1891b22645cf085a5 Catalin(ux) M. BOIE 2009-02-23 15:21:23
Some cosmetic changes for examples. 2fc9e92d9fb24568a13355db191e5bf64d601ee8 Catalin(ux) M. BOIE 2009-02-23 15:21:02
More TODO entries added/removed. eb5a15672c8adf3c3da2425582b1e842c6127b13 Catalin(ux) M. BOIE 2009-02-23 15:20:04
Minor cosmetic changes. 5955b574710254e771bbf9d81a8712033c9f1464 Catalin(ux) M. BOIE 2009-02-23 11:32:20
Added support for multiple polling engines and added epoll engine. ecf8b9c40ddae857b9d7dd72dee6596c1689db3e Catalin(ux) M. BOIE 2009-02-23 10:02:39
Duilder updates. db4999d6737f84d17c3b9aa5863483c91d6ed1a7 Catalin(ux) M. BOIE 2008-12-08 14:17:02
Bump up the version to 1.0.11. c623ba1f66e79d758d88407b6224c3574d9fd730 Catalin(ux) M. BOIE 2008-07-16 06:38:33
Added Conn_for_every_line. e7b9ae7009aa9a5cafa1f0f4e3916729f69dbb6b Catalin(ux) M. BOIE 2008-07-15 14:38:29
Bump version to 1.0.10. 4919629756fe6236e2ff75e646e612ed534353e8 Catalin(ux) M. BOIE 2008-07-04 09:23:00
Updated duilder. a20257fddc85103b43978c64d9cd4a5d4c1451ea Catalin(ux) M. BOIE 2008-07-04 09:14:18
Added SRPM_POST_RUN duilder config variable. f7c007d745d147f285ad8bc0a9bdfea3a7d832c0 Catalin(ux) M. BOIE 2008-07-04 09:13:47
Changed my e-mail address and URL. 9353a5fd0cad934232e41b23ee10872f33df2da1 Catalin(ux) M. BOIE 2008-07-04 09:13:10
Commit 92877346e9cbecc7bea6f6d342e6d03475349878 - Lot of small changes that were need to fix epoll integration.
Signed-off-by: Catalin(ux) M. BOIE <catab@embedromix.ro>
Author: Catalin(ux) M. BOIE
Author date (UTC): 2009-02-26 12:20
Committer name: Catalin(ux) M. BOIE
Committer date (UTC): 2009-03-05 15:31
Parent(s): ed20ea84c9c6bd5dc9a3c066d03e4c8a049e1369
Signing key:
Tree: f4b25d5bcf7ce3e91d8176a239baf6bfd8a56172
File Lines added Lines deleted
Conn.c 236 182
Conn_engine_core.c 7 5
Conn_engine_core.h 3 1
Conn_engine_epoll.c 19 10
File Conn.c changed (mode: 100644) (index eca8f5b..be20c99)
... ... int Conn_init(const unsigned int max)
150 150 return 0; return 0;
151 151 } }
152 152
153 int Conn_enqueue(struct Conn *C, void *buf, size_t count)
153 /*
154 * Shutdown Conn
155 */
156 int Conn_shutdown(void)
157 {
158 int ret;
159 unsigned int i;
160
161 Conn_inited = 0;
162
163 ret = Conn_engine_shutdown();
164 if (ret < 0)
165 return ret;
166
167 /* Free all buffers */
168 for (i = 0; i < Conn_allocated - 1; i++) {
169 if (Conns[i].ibuf)
170 free(Conns[i].ibuf);
171 if (Conns[i].obuf)
172 free(Conns[i].obuf);
173 }
174
175 free(Conns);
176
177 return 0;
178 }
179
180 int Conn_enqueue(struct Conn *C, void *buf, const size_t count)
154 181 { {
155 unsigned int slot, r;
182 unsigned int r;
156 183 char *dump; char *dump;
157 184
158 185 if (C == NULL) { if (C == NULL) {
 
... ... int Conn_enqueue(struct Conn *C, void *buf, size_t count)
162 189
163 190 if (Conn_level >= 10) { if (Conn_level >= 10) {
164 191 dump = Conn_dump(buf, count); dump = Conn_dump(buf, count);
165 Log(0, "\tTry to enqueue %d bytes to id %llu [%s]...\n",
166 count, C->id, dump);
192 Log(0, "\tTry to enqueue %d bytes to slot=%u, id=%llu [%s]...\n",
193 count, C->slot, C->id, dump);
167 194 free(dump); free(dump);
168 195 } }
169 196
170 /* we cannot use pointers directly because they can change under us in 'expand' */
171 slot = C->slot;
172
173 if (Conns[slot].obuf_size - Conns[slot].obuf_tail < count) {
174 r = Conn_try_expand_buf(&Conns[slot], 0, count);
197 if (C->obuf_size - C->obuf_tail < count) {
198 r = Conn_try_expand_buf(C, 0, count);
175 199 if (r != 0) if (r != 0)
176 200 return -1; return -1;
177 201 } }
178 202
179 memcpy(Conns[slot].obuf + Conns[slot].obuf_tail, buf, count);
180 Conns[slot].obuf_tail += count;
203 memcpy(C->obuf + C->obuf_tail, buf, count);
204 C->obuf_tail += count;
181 205
182 Conns[slot].events |= CONN_POLLOUT;
183 Conn_engine_chg_obj(&Conns[slot]);
206 C->events |= CONN_POLLOUT;
207 Conn_engine_chg_obj(C);
184 208
185 209 return 0; return 0;
186 210 } }
187 211
188 212 static void Conn_free_intern(struct Conn *C) static void Conn_free_intern(struct Conn *C)
189 213 { {
190 unsigned int slot;
191
192 slot = C->slot;
193
194 Log(7, "Cleaning-up slot in %s state id %llu [%s]...\n",
195 Conn_state(C), C->id, Conn_errno(C));
214 Log(7, "Cleaning-up slot in %s state, slot=%u, id=%llu [%s]...\n",
215 Conn_state(C), C->slot, C->id, Conn_errno(C));
196 216
197 217 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
198 218 "%s", Conn_errno(C)); "%s", Conn_errno(C));
199 219
200 220 if (C->error_state != CONN_ERROR_USERREQ) { if (C->error_state != CONN_ERROR_USERREQ) {
201 if (Conns[slot].cb_error)
202 Conns[slot].cb_error(C);
221 if (C->cb_error)
222 C->cb_error(C);
203 223 else if (Conn_error_cb) else if (Conn_error_cb)
204 224 Conn_error_cb(C); Conn_error_cb(C);
205 225 } }
206 226
207 if (Conns[slot].state == CONN_STATE_OPEN) {
208 if (Conns[slot].cb_close)
209 Conns[slot].cb_close(C);
227 if (C->state == CONN_STATE_OPEN) {
228 if (C->cb_close)
229 C->cb_close(C);
210 230 else if (Conn_close_cb) else if (Conn_close_cb)
211 Conn_close_cb(&Conns[slot]);
231 Conn_close_cb(C);
212 232 } }
213 233
214 if (Conns[slot].fd > -1) {
215 close(Conns[slot].fd);
216 Conns[slot].fd = -1;
217 }
234 Conn_engine_del_obj(C);
218 235
219 Conn_engine_del_obj(&Conns[slot]);
236 if (C->fd > -1) {
237 close(C->fd);
238 C->fd = -1;
239 }
220 240
221 241 /* Reset tsend, else we enter in a timeout error loop */ /* Reset tsend, else we enter in a timeout error loop */
222 Conns[slot].tsend.tv_sec = 0;
223 Conns[slot].tsend.tv_usec = 0;
242 C->tsend.tv_sec = 0;
243 C->tsend.tv_usec = 0;
224 244
225 245 /* Reset the connection attempt time */ /* Reset the connection attempt time */
226 Conns[slot].conn_syn.tv_sec = 0;
227 Conns[slot].conn_syn.tv_usec = 0;
246 C->conn_syn.tv_sec = 0;
247 C->conn_syn.tv_usec = 0;
228 248
229 if (Conns[slot].flags & CONN_FLAGS_AUTO_RECONNECT) {
230 Conns[slot].tryat = Conn_now.tv_sec + Conns[slot].delay;
231 Conns[slot].error_state = 0;
232 Conns[slot].state = CONN_STATE_CONNECT_0;
249 if (C->flags & CONN_FLAGS_AUTO_RECONNECT) {
250 C->tryat = Conn_now.tv_sec + C->delay;
251 C->error_state = 0;
252 C->state = CONN_STATE_CONNECT_0;
233 253
234 Conns[slot].ibuf_head = 0;
235 Conns[slot].ibuf_tail = 0;
254 C->ibuf_head = 0;
255 C->ibuf_tail = 0;
236 256
237 Conns[slot].obuf_head = 0;
238 Conns[slot].obuf_tail = 0;
257 C->obuf_head = 0;
258 C->obuf_tail = 0;
239 259
240 260 Conn_pending++; Conn_pending++;
241 261 } else { } else {
242 Conns[slot].type = Conn_type_UNK;
243 Conns[slot].state = CONN_STATE_FREE;
244
245 /* TODO: add to free list */
246
247 if (slot < Conn_no - 1) {
248 /* free old mem */
249 if (Conns[slot].ibuf)
250 free(Conns[slot].ibuf);
251 if (Conns[slot].obuf)
252 free(Conns[slot].obuf);
253
254 Conn_engine_move_slot(slot, Conn_no - 1);
255 Conns[slot] = Conns[Conn_no - 1];
256 Conns[slot].slot = slot;
257
258 /* fixes */
259 Conns[Conn_no - 1].ibuf = NULL;
260 Conns[Conn_no - 1].ibuf_size = 0;
261 Conns[Conn_no - 1].obuf = NULL;
262 Conns[Conn_no - 1].obuf_size = 0;
263 }
264
265 if (Conn_no == Conn_max)
266 Conn_accept_is_allowed = 1;
267
268 Conn_no--;
262 C->type = Conn_type_UNK;
263 C->state = CONN_STATE_FREE;
269 264 } }
270 265 } }
271 266
 
... ... static int Conn_grow(void)
306 301 */ */
307 302 static struct Conn *Conn_alloc(void) static struct Conn *Conn_alloc(void)
308 303 { {
309 unsigned int slot, growok;
304 struct Conn *C;
305 unsigned int growok;
310 306 void *p; void *p;
311 307
312 308 Log(10, "%s() Conn_no=%d Conn_max=%d\n", Log(10, "%s() Conn_no=%d Conn_max=%d\n",
 
... ... static struct Conn *Conn_alloc(void)
331 327 if (Conn_no > Conn_max_reached) if (Conn_no > Conn_max_reached)
332 328 Conn_max_reached = Conn_no; Conn_max_reached = Conn_no;
333 329
334 slot = Conn_no;
330 C = &Conns[Conn_no];
331 C->slot = Conn_no;
335 332
336 Conns[slot].type = Conn_type_UNK;
337 Conns[slot].state = CONN_STATE_FREE;
338 Conns[slot].slot = slot;
333 C->type = Conn_type_UNK;
334 C->state = CONN_STATE_FREE;
339 335
340 if (Conns[slot].ibuf_size < Conn_default_ibuf) {
341 p = realloc(Conns[slot].ibuf, Conn_default_ibuf);
336 if (C->ibuf_size < Conn_default_ibuf) {
337 p = realloc(C->ibuf, Conn_default_ibuf);
342 338 if (p == NULL) { if (p == NULL) {
343 339 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
344 340 "Memory allocation error2!"); "Memory allocation error2!");
345 341 return NULL; return NULL;
346 342 } }
347 Conns[slot].ibuf = p;
348 Conns[slot].ibuf_size = Conn_default_ibuf;
343 C->ibuf = p;
344 C->ibuf_size = Conn_default_ibuf;
349 345 } }
350 Conns[slot].ibuf_head = 0;
351 Conns[slot].ibuf_tail = 0;
346 C->ibuf_head = 0;
347 C->ibuf_tail = 0;
352 348
353 if (Conns[slot].obuf_size < Conn_default_obuf) {
354 p = realloc(Conns[slot].obuf, Conn_default_obuf);
349 if (C->obuf_size < Conn_default_obuf) {
350 p = realloc(C->obuf, Conn_default_obuf);
355 351 if (p == NULL) { if (p == NULL) {
356 352 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
357 353 "Memory allocation error3!"); "Memory allocation error3!");
358 354 return NULL; return NULL;
359 355 } }
360 Conns[slot].obuf = p;
361 Conns[slot].obuf_size = Conn_default_obuf;
356 C->obuf = p;
357 C->obuf_size = Conn_default_obuf;
362 358 } }
363 Conns[slot].obuf_head = 0;
364 Conns[slot].obuf_tail = 0;
359 C->obuf_head = 0;
360 C->obuf_tail = 0;
365 361
366 Conns[slot].trecv = Conn_now;
362 C->trecv = Conn_now;
367 363
368 Conns[slot].bi = 0;
369 Conns[slot].bo = 0;
370 Conns[slot].private = NULL;
364 C->bi = 0;
365 C->bo = 0;
366 C->private = NULL;
371 367
372 368 /* bandwidth */ /* bandwidth */
373 Conns[slot].band_width = 0;
374 Conns[slot].band_factor = 0;
375 Conns[slot].band_tokens = 0;
376 Conns[slot].band_lasttime = Conn_now;
369 C->band_width = 0;
370 C->band_factor = 0;
371 C->band_tokens = 0;
372 C->band_lasttime = Conn_now;
377 373
378 Conns[slot].fd = -1;
379 Conns[slot].events = 0;
380 Conns[slot].revents = 0;
381 Conns[slot].state = CONN_STATE_EMPTY;
374 C->fd = -1;
375 C->events = 0;
376 C->revents = 0;
377 C->state = CONN_STATE_EMPTY;
382 378
383 Conns[slot].flags = 0;
379 C->flags = 0;
384 380
385 381 Conn_no++; Conn_no++;
386 382
387 Log(10, "\tFound free slot %u. Now Conn_no=%d\n",
388 slot, Conn_no);
383 Log(10, "\tFound free slot=%u, id=%llu. Now Conn_no=%d\n",
384 C->slot, C->id, Conn_no);
389 385
390 386 if (Conn_no == Conn_max) if (Conn_no == Conn_max)
391 387 Conn_accept_is_allowed = 0; Conn_accept_is_allowed = 0;
392 388
393 return &Conns[slot];
389 return C;
394 390 } }
395 391
396 392 struct Conn *Conn_socket(const int domain, const int type, const int port) struct Conn *Conn_socket(const int domain, const int type, const int port)
 
... ... struct Conn *Conn_connect(const int domain, const int type, const char *addr,
546 542 if (!X) if (!X)
547 543 return NULL; return NULL;
548 544
549 Xslot = X->slot;
550
551 Conns[Xslot].type = Conn_type_CLIENT;
552 Conns[Xslot].error_state = 0;
553 Conns[Xslot].state = CONN_STATE_CONNECT_a;
554 snprintf(Conns[Xslot].addr, sizeof(Conns[Xslot].addr), "%s", addr);
555 Conns[Xslot].port = port;
556 Conns[Xslot].sock_domain = domain;
557 Conns[Xslot].sock_type = type;
558 Conns[Xslot].id = Conn_id++;
559 Conns[Xslot].start = Conn_now.tv_sec;
545 X->type = Conn_type_CLIENT;
546 X->error_state = 0;
547 X->state = CONN_STATE_CONNECT_a;
548 snprintf(X->addr, sizeof(X->addr), "%s", addr);
549 X->port = port;
550 X->sock_domain = domain;
551 X->sock_type = type;
552 X->id = Conn_id++;
553 X->start = Conn_now.tv_sec;
560 554
561 555 Conn_pending++; Conn_pending++;
562 556
563 return &Conns[Xslot];
557 return X;
564 558 } }
565 559
566 560 static void Conn_accept(struct Conn *C) static void Conn_accept(struct Conn *C)
 
... ... static void Conn_accept(struct Conn *C)
571 565 struct sockaddr_in6 ca6; struct sockaddr_in6 ca6;
572 566 socklen_t cax_len; socklen_t cax_len;
573 567 struct Conn *X; struct Conn *X;
574 unsigned int slot, Xslot;
568 unsigned int Cslot;
575 569
576 570 Log(10, "Accepting a connection via %s/%d, type %s, domain %s.\n", Log(10, "Accepting a connection via %s/%d, type %s, domain %s.\n",
577 571 C->addr, C->port, Conn_type(C), Conn_domain(C)); C->addr, C->port, Conn_type(C), Conn_domain(C));
578 572
579 slot = C->slot;
580 switch(Conns[slot].sock_domain) {
573 switch(C->sock_domain) {
581 574 case PF_INET: case PF_INET:
582 575 pca = (struct sockaddr *) &ca4; pca = (struct sockaddr *) &ca4;
583 576 cax_len = sizeof(ca4); cax_len = sizeof(ca4);
 
... ... static void Conn_accept(struct Conn *C)
591 584 default: default:
592 585 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
593 586 "Cannot deal with domain %d.", "Cannot deal with domain %d.",
594 Conns[slot].sock_domain);
595 Conns[slot].error_state = CONN_ERROR_SOCKET;
596 if (Conns[slot].cb_error)
597 Conns[slot].cb_error(&Conns[slot]);
587 C->sock_domain);
588 C->error_state = CONN_ERROR_SOCKET;
589 if (C->cb_error)
590 C->cb_error(C);
598 591 else if (Conn_error_cb) else if (Conn_error_cb)
599 Conn_error_cb(&Conns[slot]);
592 Conn_error_cb(C);
600 593 return; return;
601 594 } }
602 595
603 fd = accept(Conns[slot].fd, pca, &cax_len);
596 fd = accept(C->fd, pca, &cax_len);
604 597 if (fd == -1) { if (fd == -1) {
605 598 if (errno == EAGAIN) if (errno == EAGAIN)
606 599 return; return;
607 600
608 snprintf(Conn_error, sizeof(Conn_error),
609 "Cannot accept: %s",
610 strerror(errno));
611 Conns[slot].error_state = CONN_ERROR_ACCEPT;
612 if (Conns[slot].cb_error)
613 Conns[slot].cb_error(&Conns[slot]);
614 else if (Conn_error_cb)
615 Conn_error_cb(&Conns[slot]);
601 /* TODO: ratelimit */
602 Log(9, "WARN: Cannot accept on fd %d [%s].\n",
603 C->fd, strerror(errno));
604 if (C->cb_accept_error)
605 C->cb_accept_error(C);
606 else if (Conn_accept_error_cb)
607 Conn_accept_error_cb(C);
616 608 return; return;
617 609 } }
618 610
611 /* After calling Conn_alloc, pointer to slot can change, so C is not valid */
612 Cslot = C->slot;
619 613 X = Conn_alloc(); X = Conn_alloc();
614 C = &Conns[Cslot];
620 615 if (!X) { if (!X) {
621 Log(0, "ERROR: Cannot alloc a slot!\n");
622 Conns[slot].error_state = CONN_ERROR_MEM;
623 if (Conns[slot].cb_error)
624 Conns[slot].cb_error(&Conns[slot]);
625 else if (Conn_error_cb)
626 Conn_error_cb(&Conns[slot]);
616 if (C->cb_accept_error)
617 C->cb_accept_error(C);
618 else if (Conn_accept_error_cb)
619 Conn_accept_error_cb(C);
627 620 close(fd); close(fd);
628 621 return; return;
629 622 } }
630 623
631 Xslot = X->slot;
632
633 switch (Conns[slot].sock_domain) {
624 switch (C->sock_domain) {
634 625 case PF_INET: case PF_INET:
635 inet_ntop(Conns[slot].sock_domain, &ca4.sin_addr, Conns[Xslot].addr, sizeof(Conns[Xslot].addr));
636 Conns[Xslot].port = ntohs(ca4.sin_port);
626 inet_ntop(C->sock_domain, &ca4.sin_addr, X->addr, sizeof(X->addr));
627 X->port = ntohs(ca4.sin_port);
637 628 break; break;
638 629
639 630 case PF_INET6: case PF_INET6:
640 inet_ntop(Conns[slot].sock_domain, &ca6.sin6_addr, Conns[Xslot].addr, sizeof(Conns[Xslot].addr));
641 Conns[Xslot].port = ntohs(ca6.sin6_port);
631 inet_ntop(C->sock_domain, &ca6.sin6_addr, X->addr, sizeof(X->addr));
632 X->port = ntohs(ca6.sin6_port);
642 633 break; break;
643 634 } }
644 635
645 Conns[Xslot].type = Conn_type_CLIENT;
646 Conns[Xslot].error_state = 0;
647 Conns[Xslot].state = CONN_STATE_OPEN;
648 Conns[Xslot].via = Conns[slot].port;
649 Conns[Xslot].fd = fd;
650 Conns[Xslot].sock_domain = Conns[slot].sock_domain;
651 Conns[Xslot].sock_type = Conns[slot].sock_type;
652 Conns[Xslot].start = Conn_now.tv_sec;
653 Conns[Xslot].id = Conn_id++;
654 Conns[Xslot].events = CONN_POLLIN;
655 Conns[Xslot].revents = 0;
636 X->type = Conn_type_CLIENT;
637 X->error_state = 0;
638 X->state = CONN_STATE_OPEN;
639 X->via = C->id;
640 X->fd = fd;
641 X->sock_domain = C->sock_domain;
642 X->sock_type = C->sock_type;
643 X->start = Conn_now.tv_sec;
644 X->id = Conn_id++;
645 X->events = CONN_POLLIN;
646 X->revents = 0;
656 647
657 Conn_setnonblock(Conns[Xslot].fd);
648 Conn_setnonblock(X->fd);
658 649
659 Conn_engine_add_obj(&Conns[Xslot]);
650 Conn_engine_add_obj(X);
660 651
661 if (Conns[slot].cb_accept)
662 Conns[slot].cb_accept(&Conns[slot]);
652 if (C->cb_accept)
653 C->cb_accept(X);
663 654 else if (Conn_accept_cb != NULL) else if (Conn_accept_cb != NULL)
664 Conn_accept_cb(&Conns[Xslot]);
655 Conn_accept_cb(X);
665 656
666 657 Conn_total++; Conn_total++;
667 658 } }
 
... ... static void Conn_accept_allow(void)
697 688 */ */
698 689 static void Conn_band_update(struct Conn *C) static void Conn_band_update(struct Conn *C)
699 690 { {
700 unsigned int slot;
701 691 long diff; long diff;
702 692
703 693 /* no need */ /* no need */
 
... ... static void Conn_trytoconnect(void)
813 803
814 804 Conn_setnonblock(Conns[i].fd); Conn_setnonblock(Conns[i].fd);
815 805
806 /* Need POLLOUT to signal when the connection was done. */
816 807 Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT); Conns[i].events |= (CONN_POLLIN | CONN_POLLOUT);
817 808 Conn_engine_add_obj(&Conns[i]); Conn_engine_add_obj(&Conns[i]);
818 809 } }
 
... ... static void Conn_send_cb_i(struct Conn *C)
856 847
857 848 slot = C->slot; slot = C->slot;
858 849
850 if (Conns[slot].obuf == NULL)
851 abort();
852
859 853 buf = Conns[slot].obuf + Conns[slot].obuf_head; buf = Conns[slot].obuf + Conns[slot].obuf_head;
860 854 count = Conns[slot].obuf_tail - Conns[slot].obuf_head; count = Conns[slot].obuf_tail - Conns[slot].obuf_head;
861 855 if (count == 0) { if (count == 0) {
 
... ... static void Conn_recv_cb_i(struct Conn *C)
992 986
993 987 /* /*
994 988 * Callback that is called for every connection * Callback that is called for every connection
995 * TODO: Hm. For epoll seems that we do not scan all entries and this affects expiration!
996 989 */ */
997 static void Conn_poll_cb(struct Conn *C, const int revents)
990 static void Conn_poll_cb(struct Conn *C, int revents)
998 991 { {
999 Log(12, "%s: revents=%x.\n", __FUNCTION__, revents);
992 unsigned int slot;
993
994 Log(12, "%s: slot=%u, id=%llu, revents=%x.\n",
995 __FUNCTION__, C->slot, C->id, revents);
1000 996 C->revents = revents; C->revents = revents;
1001 997
1002 998 if (Conn_level >= 12) if (Conn_level >= 12)
1003 999 Log(12, "\t%s\n", Conn_status_slot(C)); Log(12, "\t%s\n", Conn_status_slot(C));
1004 1000
1005 if (C->revents & CONN_POLLHUP) {
1001 /* We should not have events on a free cell */
1002 if (C->state == CONN_STATE_FREE)
1003 abort();
1004
1005 if (revents & CONN_POLLHUP) {
1006 1006 C->error_state = CONN_ERROR_HANGUP; C->error_state = CONN_ERROR_HANGUP;
1007 1007 /* TODO: Add it to the close list to speed it up */ /* TODO: Add it to the close list to speed it up */
1008 1008 } }
1009 1009
1010 if (C->revents & CONN_POLLERR) {
1010 if (revents & CONN_POLLERR) {
1011 1011 C->error_state = CONN_ERROR_POLL; C->error_state = CONN_ERROR_POLL;
1012 1012 C->xerrno = 0; /* TODO: unknown error? */ C->xerrno = 0; /* TODO: unknown error? */
1013 1013 /* TODO: CONN_ERROR_POLL is correct here? */ /* TODO: CONN_ERROR_POLL is correct here? */
1014 1014 } }
1015 1015
1016 1016 /* First, test we have a new connection */ /* First, test we have a new connection */
1017 if ((C->revents & CONN_POLLOUT)
1017 if ((revents & CONN_POLLOUT)
1018 1018 && (Conn_ignore(C) == 0)) { && (Conn_ignore(C) == 0)) {
1019 1019 /* We just established a connection */ /* We just established a connection */
1020 1020 if (C->state == CONN_STATE_CONNECT_b) { if (C->state == CONN_STATE_CONNECT_b) {
1021 /*
1022 * We do not need POLLOUT now - it was used only for
1023 * connect completion.
1024 */
1025 revents &= ~CONN_POLLOUT;
1026 C->events &= ~CONN_POLLOUT;
1027 Conn_engine_chg_obj(C);
1028
1029 C->state = CONN_STATE_OPEN;
1030
1021 1031 if (C->cb_connected != NULL) if (C->cb_connected != NULL)
1022 1032 C->cb_connected(C); C->cb_connected(C);
1023 1033 else if (Conn_connected_cb) else if (Conn_connected_cb)
1024 1034 Conn_connected_cb(C); Conn_connected_cb(C);
1025 C->state = CONN_STATE_OPEN;
1035
1026 1036 } }
1027 1037 } }
1028 1038
1029 1039 /* Second, test for error or input */ /* Second, test for error or input */
1030 if ((C->revents & CONN_POLLIN)
1040 if ((revents & CONN_POLLIN)
1031 1041 && (Conn_ignore(C) == 0)) { && (Conn_ignore(C) == 0)) {
1032 1042 if (C->type == Conn_type_MASTER) { if (C->type == Conn_type_MASTER) {
1043 /* C pointer can change under us in Conn_accept->Conn_grow */
1044 slot = C->slot;
1033 1045 Conn_accept(C); Conn_accept(C);
1046 C = &Conns[slot];
1034 1047 } else { } else {
1035 1048 if (C->cb_recv) if (C->cb_recv)
1036 1049 C->cb_recv(C); C->cb_recv(C);
 
... ... static void Conn_poll_cb(struct Conn *C, const int revents)
1041 1054 } }
1042 1055 } }
1043 1056
1044 if ((C->revents & CONN_POLLOUT)
1057 if ((revents & CONN_POLLOUT)
1045 1058 && (Conn_ignore(C) == 0)) { && (Conn_ignore(C) == 0)) {
1046 1059 /* We can send data */ /* We can send data */
1047 1060 if (C->state == CONN_STATE_OPEN) { if (C->state == CONN_STATE_OPEN) {
 
... ... static void Conn_poll_cb(struct Conn *C, const int revents)
1053 1066 Conn_send_cb_i(C); Conn_send_cb_i(C);
1054 1067
1055 1068 if (C->obuf_head == C->obuf_tail) { if (C->obuf_head == C->obuf_tail) {
1056 C->events &= ~EPOLLOUT;
1069 C->events &= ~CONN_POLLOUT;
1057 1070 Conn_epoll_chg_obj(C); Conn_epoll_chg_obj(C);
1058 1071 if (C->flags & CONN_FLAGS_CLOSE_AFTER_SEND) if (C->flags & CONN_FLAGS_CLOSE_AFTER_SEND)
1059 1072 C->error_state = CONN_ERROR_USERREQ; C->error_state = CONN_ERROR_USERREQ;
 
... ... static void Conn_poll_cb(struct Conn *C, const int revents)
1061 1074 } }
1062 1075 } }
1063 1076
1064 /* test if it expired/timout */
1065 Conn_expire(C);
1066
1067 if (C->error_state > 0) {
1068 /* TODO: put it on a list and before exit scan the list */
1077 /* Close it if in error state */
1078 if (C->error_state > 0)
1069 1079 Conn_free_intern(C); Conn_free_intern(C);
1070 } else {
1071 /* add tokens */
1072 Conn_band_update(C);
1073 }
1080 }
1081
1082 /*
1083 * Moving a in-use slot over a free one for compacting reason.
1084 */
1085 static void Conn_move_slot(const unsigned int dst, const unsigned int src)
1086 {
1087 struct Conn tmp;
1088
1089 if (dst == src)
1090 return;
1091
1092 Log(10, "%s: Moving slot=%u (id=%llu) over %d...\n",
1093 __FUNCTION__, src, Conns[src].id, dst);
1094
1095 tmp = Conns[dst];
1096 Conns[dst] = Conns[src];
1097 Conns[dst].slot = dst;
1098 Conns[src] = tmp;
1099
1100 Conn_engine_move_slot(dst, src);
1101
1102 Conn_no--;
1103
1104 /* We made some space, so accepting again connection */
1105 Conn_accept_is_allowed = 1;
1074 1106 } }
1075 1107
1076 1108 /* /*
1077 1109 * Returns: -1 on error, 0 nothing to do, 1 if some work was done * Returns: -1 on error, 0 nothing to do, 1 if some work was done
1078 1110 * timeout is in 1/1000 seconds increments. * timeout is in 1/1000 seconds increments.
1079 1111 */ */
1080 int Conn_poll(int timeout)
1112 int Conn_poll(const int timeout)
1081 1113 { {
1082 1114 int ret; int ret;
1083 1115 int timeout2; int timeout2;
1116 unsigned int i;
1117 struct Conn *C;
1084 1118
1085 1119 Log(11, "Conn_poll(timeout=%d Conn_no=%d)\n", Log(11, "Conn_poll(timeout=%d Conn_no=%d)\n",
1086 1120 timeout, Conn_no); timeout, Conn_no);
 
... ... int Conn_poll(int timeout)
1101 1135 if (ret < 0) if (ret < 0)
1102 1136 return -1; return -1;
1103 1137
1104 /* block accept if full queue */
1138 Log(9, "Compacting cells, do expiration and band stuff...\n");
1139 i = 0;
1140 while (i < Conn_no) {
1141 C = &Conns[i];
1142
1143 if (C->state == CONN_STATE_FREE) {
1144 Conn_move_slot(i, Conn_no - 1);
1145 i++;
1146 continue;
1147 }
1148
1149 /* test if it expired/timeout */
1150 Conn_expire(C);
1151
1152 /* add tokens */
1153 Conn_band_update(C);
1154
1155 i++;
1156 }
1157
1158 /* Blocking accept if full queue or unblock if not */
1105 1159 Conn_accept_allow(); Conn_accept_allow();
1106 1160
1107 1161 if (timeout == -1) if (timeout == -1)
File Conn_engine_core.c changed (mode: 100644) (index 5f5c3c1..8f32aa4)
... ... void (*Conn_close_cb)(struct Conn *C) = NULL;
18 18 void (*Conn_trigger_cb)(struct Conn *C) = NULL; void (*Conn_trigger_cb)(struct Conn *C) = NULL;
19 19 void (*Conn_error_cb)(struct Conn *C) = NULL; void (*Conn_error_cb)(struct Conn *C) = NULL;
20 20 void (*Conn_connected_cb)(struct Conn *C) = NULL; void (*Conn_connected_cb)(struct Conn *C) = NULL;
21 void (*Conn_accept_error_cb)(struct Conn *C) = NULL;
21 22
22 23 char *(*Conn_status_slot_html_cb)(const struct Conn *C); char *(*Conn_status_slot_html_cb)(const struct Conn *C);
23 24 char *(*Conn_status_cb)(void); char *(*Conn_status_cb)(void);
 
... ... char *Conn_status_slot_html(const struct Conn *C)
416 417 char speedi[32], speedo[32]; char speedi[32], speedo[32];
417 418 unsigned int dT, si, so; unsigned int dT, si, so;
418 419
419 Conn_poll_status(Conns[C->slot].events, polle);
420 Conn_poll_status(Conns[C->slot].revents, pollr);
420 Conn_poll_status(C->events, polle);
421 Conn_poll_status(C->revents, pollr);
421 422
422 423 dT = Conn_now.tv_sec - C->start; dT = Conn_now.tv_sec - C->start;
423 424 if (dT == 0) if (dT == 0)
 
... ... char *Conn_status(const unsigned int flags)
522 523 if (flags & 1) if (flags & 1)
523 524 strcat(buf, "<tr bgcolor=\"ffffff\">\n"); strcat(buf, "<tr bgcolor=\"ffffff\">\n");
524 525
525 Conn_poll_status(Conns[C->slot].events, polle);
526 Conn_poll_status(Conns[C->slot].revents, pollr);
526 Conn_poll_status(C->events, polle);
527 Conn_poll_status(C->revents, pollr);
527 528
528 529 if ((flags & 1) == 0) if ((flags & 1) == 0)
529 530 slot = Conn_status_slot(C); slot = Conn_status_slot(C);
 
... ... int Conn_set_cb(struct Conn *C, const unsigned int type, void (*f)(struct Conn *
738 739 case CONN_CB_TRIGGER: C->cb_trigger = f; break; case CONN_CB_TRIGGER: C->cb_trigger = f; break;
739 740 case CONN_CB_ERROR: C->cb_error = f; break; case CONN_CB_ERROR: C->cb_error = f; break;
740 741 case CONN_CB_CONNECTED: C->cb_connected = f; break; case CONN_CB_CONNECTED: C->cb_connected = f; break;
742 case CONN_CB_ACCEPT_ERROR: C->cb_accept_error = f; break;
741 743
742 744 default: default:
743 745 return -1; return -1;
 
... ... void Conn_eatall(struct Conn *C)
819 821 } }
820 822
821 823 /* /*
822 * If put buffer is empty, just mark for closing.
824 * If put buffer is empty, just mark for closing.
823 825 * If we have data, set the flag to do the closing after send. * If we have data, set the flag to do the closing after send.
824 826 */ */
825 827 void Conn_close(struct Conn *C) void Conn_close(struct Conn *C)
File Conn_engine_core.h changed (mode: 100644) (index e686ef5..754f621)
53 53 #define CONN_ERROR_HANGUP CONN_ERROR_START + 6 #define CONN_ERROR_HANGUP CONN_ERROR_START + 6
54 54 #define CONN_ERROR_GETADDRINFO CONN_ERROR_START + 7 #define CONN_ERROR_GETADDRINFO CONN_ERROR_START + 7
55 55 #define CONN_ERROR_EXPIRED CONN_ERROR_START + 8 #define CONN_ERROR_EXPIRED CONN_ERROR_START + 8
56 #define CONN_ERROR_ACCEPT CONN_ERROR_START + 9
56 #define CONN_ERROR_ACCEPT CONN_ERROR_START + 9 /* This is free TODO: check! */
57 57 #define CONN_ERROR_MEM CONN_ERROR_START + 10 #define CONN_ERROR_MEM CONN_ERROR_START + 10
58 58 #define CONN_ERROR_CONNECT CONN_ERROR_START + 11 #define CONN_ERROR_CONNECT CONN_ERROR_START + 11
59 59 #define CONN_ERROR_READ_TIMEOUT CONN_ERROR_START + 12 #define CONN_ERROR_READ_TIMEOUT CONN_ERROR_START + 12
 
86 86 #define CONN_CB_TRIGGER CONN_CB_START + 6 #define CONN_CB_TRIGGER CONN_CB_START + 6
87 87 #define CONN_CB_ERROR CONN_CB_START + 7 #define CONN_CB_ERROR CONN_CB_START + 7
88 88 #define CONN_CB_CONNECTED CONN_CB_START + 8 #define CONN_CB_CONNECTED CONN_CB_START + 8
89 #define CONN_CB_ACCEPT_ERROR CONN_CB_START + 9
89 90
90 91 /* Engine type */ /* Engine type */
91 92 #define CONN_ENGINE_POLL 1 #define CONN_ENGINE_POLL 1
 
... ... extern void (*Conn_close_cb)(struct Conn *C);
189 190 extern void (*Conn_trigger_cb)(struct Conn *C); extern void (*Conn_trigger_cb)(struct Conn *C);
190 191 extern void (*Conn_error_cb)(struct Conn *C); extern void (*Conn_error_cb)(struct Conn *C);
191 192 extern void (*Conn_connected_cb)(struct Conn *C); extern void (*Conn_connected_cb)(struct Conn *C);
193 extern void (*Conn_accept_error_cb)(struct Conn *C);
192 194
193 195 extern char *(*Conn_status_slot_html_cb)(const struct Conn *C); extern char *(*Conn_status_slot_html_cb)(const struct Conn *C);
194 196 extern char *(*Conn_status_cb)(void); extern char *(*Conn_status_cb)(void);
File Conn_engine_epoll.c changed (mode: 100644) (index 0bf0866..bd8a866)
... ... int Conn_epoll_init(void)
32 32
33 33 int Conn_epoll_shutdown(void) int Conn_epoll_shutdown(void)
34 34 { {
35 close(Conn_epoll_fd);
35 int ret;
36
37 free(Conn_epoll_events);
38 ret = close(Conn_epoll_fd);
39 if (ret != 0)
40 return -1;
36 41
37 42 return 0; return 0;
38 43 } }
 
... ... int Conn_epoll_add_obj(struct Conn *C)
78 83 struct epoll_event ev; struct epoll_event ev;
79 84
80 85 memset(&ev, 0, sizeof(ev)); memset(&ev, 0, sizeof(ev));
81 ev.data.ptr = C;
86 ev.data.ptr = (void *) C->slot;
82 87 ev.events = C->events; ev.events = C->events;
83 88 ret = epoll_ctl(Conn_epoll_fd, EPOLL_CTL_ADD, C->fd, &ev); ret = epoll_ctl(Conn_epoll_fd, EPOLL_CTL_ADD, C->fd, &ev);
84 89 if (ret == -1) { if (ret == -1) {
 
... ... int Conn_epoll_del_obj(struct Conn *C)
97 102 { {
98 103 int ret; int ret;
99 104
105 /* Was never registered */
106 if (C->fd == -1)
107 return 0;
108
100 109 ret = epoll_ctl(Conn_epoll_fd, EPOLL_CTL_DEL, C->fd, NULL); ret = epoll_ctl(Conn_epoll_fd, EPOLL_CTL_DEL, C->fd, NULL);
101 110 if (ret == -1) { if (ret == -1) {
102 111 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
 
... ... int Conn_epoll_chg_obj(struct Conn *C)
117 126
118 127 memset(&ev, 0, sizeof(ev)); memset(&ev, 0, sizeof(ev));
119 128 ev.events = C->events; ev.events = C->events;
120 ev.data.ptr = C;
129 ev.data.ptr = (void *) C->slot;
121 130 ret = epoll_ctl(Conn_epoll_fd, EPOLL_CTL_MOD, C->fd, &ev); ret = epoll_ctl(Conn_epoll_fd, EPOLL_CTL_MOD, C->fd, &ev);
122 131 if (ret != 0) { if (ret != 0) {
123 132 snprintf(Conn_error, sizeof(Conn_error), snprintf(Conn_error, sizeof(Conn_error),
 
... ... int Conn_epoll_poll(const int timeout2, void (*cb)(struct Conn *C,
137 146 const int revents)) const int revents))
138 147 { {
139 148 int i, events; int i, events;
149 struct Conn *C;
150 unsigned int slot;
140 151
141 152 again: again:
142 153 events = epoll_wait(Conn_epoll_fd, Conn_epoll_events, Conn_no, timeout2); events = epoll_wait(Conn_epoll_fd, Conn_epoll_events, Conn_no, timeout2);
 
... ... int Conn_epoll_poll(const int timeout2, void (*cb)(struct Conn *C,
158 169 Log(11, "Processing %d event(s)...\n", events); Log(11, "Processing %d event(s)...\n", events);
159 170 i = events - 1; i = events - 1;
160 171 do { do {
161 cb((struct Conn *) Conn_epoll_events[i].data.ptr,
162 Conn_epoll_events[i].events);
172 slot = (unsigned int) Conn_epoll_events[i].data.ptr;
173 C = &Conns[slot];
174 cb(C, Conn_epoll_events[i].events);
163 175
164 176 i--; i--;
165 177 } while (i >= 0); } while (i >= 0);
 
... ... int Conn_epoll_poll(const int timeout2, void (*cb)(struct Conn *C,
172 184 */ */
173 185 void Conn_epoll_move_slot(const unsigned int dst, const unsigned int src) void Conn_epoll_move_slot(const unsigned int dst, const unsigned int src)
174 186 { {
175 unsigned int tmp;
176
177 /* Nothing to do - shut up the compiler */
178 tmp = dst;
179 tmp = src;
187 /* Because slot was changed, update epoll info */
188 Conn_epoll_chg_obj(&Conns[dst]);
180 189 } }
181 190
182 191 #endif #endif
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/catalinux/Conn

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/catalinux/Conn

Clone this repository using git:
git clone git://git.rocketgit.com/user/catalinux/Conn

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main