1/* 2 * libwebsockets - small server side websockets and web server implementation 3 * 4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com> 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining a copy 7 * of this software and associated documentation files (the "Software"), to 8 * deal in the Software without restriction, including without limitation the 9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 * sell copies of the Software, and to permit persons to whom the Software is 11 * furnished to do so, subject to the following conditions: 12 * 13 * The above copyright notice and this permission notice shall be included in 14 * all copies or substantial portions of the Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22 * IN THE SOFTWARE. 23 */ 24 25#include "private-lib-core.h" 26 27int 28_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa) 29{ 30#if !defined(LWS_WITH_EVENT_LIBS) 31 volatile struct lws_context_per_thread *vpt; 32#endif 33 struct lws_context_per_thread *pt; 34 struct lws_context *context; 35 int ret = 0, pa_events; 36 struct lws_pollfd *pfd; 37 int sampled_tid, tid; 38 39 if (!wsi) 40 return 0; 41 42 assert(wsi->position_in_fds_table == LWS_NO_FDS_POS || 43 wsi->position_in_fds_table >= 0); 44 45 if (wsi->position_in_fds_table == LWS_NO_FDS_POS) 46 return 0; 47 48 if (((volatile struct lws *)wsi)->handling_pollout && 49 !_and && _or == LWS_POLLOUT) { 50 /* 51 * Happening alongside service thread handling POLLOUT. 52 * The danger is when he is finished, he will disable POLLOUT, 53 * countermanding what we changed here. 54 * 55 * Instead of changing the fds, inform the service thread 56 * what happened, and ask it to leave POLLOUT active on exit 57 */ 58 ((volatile struct lws *)wsi)->leave_pollout_active = 1; 59 /* 60 * by definition service thread is not in poll wait, so no need 61 * to cancel service 62 */ 63 64 lwsl_wsi_debug(wsi, "using leave_pollout_active"); 65 66 return 0; 67 } 68 69 context = wsi->a.context; 70 pt = &context->pt[(int)wsi->tsi]; 71 72 assert(wsi->position_in_fds_table < (int)pt->fds_count); 73 74#if !defined(LWS_WITH_EVENT_LIBS) 75 /* 76 * This only applies when we use the default poll() event loop. 77 * 78 * BSD can revert pa->events at any time, when the kernel decides to 79 * exit from poll(). We can't protect against it using locking. 80 * 81 * Therefore we must check first if the service thread is in poll() 82 * wait; if so, we know we must be being called from a foreign thread, 83 * and we must keep a strictly ordered list of changes we made instead 84 * of trying to apply them, since when poll() exits, which may happen 85 * at any time it would revert our changes. 86 * 87 * The plat code will apply them when it leaves the poll() wait 88 * before doing anything else. 89 */ 90 91 vpt = (volatile struct lws_context_per_thread *)pt; 92 93 vpt->foreign_spinlock = 1; 94 lws_memory_barrier(); 95 96 if (vpt->inside_poll) { 97 struct lws_foreign_thread_pollfd *ftp, **ftp1; 98 /* 99 * We are certainly a foreign thread trying to change events 100 * while the service thread is in the poll() wait. 101 * 102 * Create a list of changes to be applied after poll() exit, 103 * instead of trying to apply them now. 104 */ 105 ftp = lws_malloc(sizeof(*ftp), "ftp"); 106 if (!ftp) { 107 vpt->foreign_spinlock = 0; 108 lws_memory_barrier(); 109 ret = -1; 110 goto bail; 111 } 112 113 ftp->_and = _and; 114 ftp->_or = _or; 115 ftp->fd_index = wsi->position_in_fds_table; 116 ftp->next = NULL; 117 118 lws_pt_lock(pt, __func__); 119 120 /* place at END of list to maintain order */ 121 ftp1 = (struct lws_foreign_thread_pollfd **) 122 &vpt->foreign_pfd_list; 123 while (*ftp1) 124 ftp1 = &((*ftp1)->next); 125 126 *ftp1 = ftp; 127 vpt->foreign_spinlock = 0; 128 lws_memory_barrier(); 129 130 lws_pt_unlock(pt); 131 132 lws_cancel_service_pt(wsi); 133 134 return 0; 135 } 136 137 vpt->foreign_spinlock = 0; 138 lws_memory_barrier(); 139#endif 140 141#if !defined(__linux__) && !defined(WIN32) 142 /* OSX couldn't see close on stdin pipe side otherwise; WSAPOLL 143 * blows up if we give it POLLHUP 144 */ 145 _or |= LWS_POLLHUP; 146#endif 147 148 pfd = &pt->fds[wsi->position_in_fds_table]; 149 pa->fd = wsi->desc.sockfd; 150 lwsl_wsi_debug(wsi, "fd %d events %d -> %d", pa->fd, pfd->events, 151 (pfd->events & ~_and) | _or); 152 pa->prev_events = pfd->events; 153 pa->events = pfd->events = (short)((pfd->events & ~_and) | _or); 154 155 if (wsi->mux_substream) 156 return 0; 157 158#if defined(LWS_WITH_EXTERNAL_POLL) 159 160 if (wsi->a.vhost && 161 wsi->a.vhost->protocols[0].callback(wsi, 162 LWS_CALLBACK_CHANGE_MODE_POLL_FD, 163 wsi->user_space, (void *)pa, 0)) { 164 ret = -1; 165 goto bail; 166 } 167#endif 168 169 if (context->event_loop_ops->io) { 170 if (_and & LWS_POLLIN) 171 context->event_loop_ops->io(wsi, 172 LWS_EV_STOP | LWS_EV_READ); 173 174 if (_or & LWS_POLLIN) 175 context->event_loop_ops->io(wsi, 176 LWS_EV_START | LWS_EV_READ); 177 178 if (_and & LWS_POLLOUT) 179 context->event_loop_ops->io(wsi, 180 LWS_EV_STOP | LWS_EV_WRITE); 181 182 if (_or & LWS_POLLOUT) 183 context->event_loop_ops->io(wsi, 184 LWS_EV_START | LWS_EV_WRITE); 185 } 186 187 /* 188 * if we changed something in this pollfd... 189 * ... and we're running in a different thread context 190 * than the service thread... 191 * ... and the service thread is waiting ... 192 * then cancel it to force a restart with our changed events 193 */ 194 pa_events = pa->prev_events != pa->events; 195 pfd->events = (short)pa->events; 196 197 if (pa_events) { 198 if (lws_plat_change_pollfd(context, wsi, pfd)) { 199 lwsl_wsi_info(wsi, "failed"); 200 ret = -1; 201 goto bail; 202 } 203 sampled_tid = pt->service_tid; 204 if (sampled_tid && wsi->a.vhost) { 205 tid = wsi->a.vhost->protocols[0].callback(wsi, 206 LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0); 207 if (tid == -1) { 208 ret = -1; 209 goto bail; 210 } 211 if (tid != sampled_tid) 212 lws_cancel_service_pt(wsi); 213 } 214 } 215 216bail: 217 return ret; 218} 219 220#if defined(LWS_WITH_SERVER) 221/* 222 * Enable or disable listen sockets on this pt globally... 223 * it's modulated according to the pt having space for a new accept. 224 */ 225static void 226lws_accept_modulation(struct lws_context *context, 227 struct lws_context_per_thread *pt, int allow) 228{ 229 struct lws_vhost *vh = context->vhost_list; 230 struct lws_pollargs pa1; 231 232 while (vh) { 233 lws_start_foreach_dll(struct lws_dll2 *, d, 234 lws_dll2_get_head(&vh->listen_wsi)) { 235 struct lws *wsi = lws_container_of(d, struct lws, 236 listen_list); 237 238 _lws_change_pollfd(wsi, allow ? 0 : LWS_POLLIN, 239 allow ? LWS_POLLIN : 0, &pa1); 240 } lws_end_foreach_dll(d); 241 242 vh = vh->vhost_next; 243 } 244} 245#endif 246 247#if _LWS_ENABLED_LOGS & LLL_WARN 248void 249__dump_fds(struct lws_context_per_thread *pt, const char *s) 250{ 251 unsigned int n; 252 253 lwsl_cx_warn(pt->context, "fds_count %u, %s", pt->fds_count, s); 254 255 for (n = 0; n < pt->fds_count; n++) { 256 struct lws *wsi = wsi_from_fd(pt->context, pt->fds[n].fd); 257 258 lwsl_cx_warn(pt->context, " %d: fd %d, wsi %s, pos_in_fds: %d", 259 n + 1, pt->fds[n].fd, lws_wsi_tag(wsi), 260 wsi ? wsi->position_in_fds_table : -1); 261 } 262} 263#else 264#define __dump_fds(x, y) 265#endif 266 267int 268__insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) 269{ 270#if defined(LWS_WITH_EXTERNAL_POLL) 271 struct lws_pollargs pa = { wsi->desc.sockfd, LWS_POLLIN, 0 }; 272#endif 273 struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; 274 int ret = 0; 275 276// __dump_fds(pt, "pre insert"); 277 278 lws_pt_assert_lock_held(pt); 279 280 lwsl_wsi_debug(wsi, "tsi=%d, sock=%d, pos-in-fds=%d", 281 wsi->tsi, wsi->desc.sockfd, pt->fds_count); 282 283 if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) { 284 lwsl_cx_err(context, "Too many fds (%d vs %d)", context->max_fds, 285 context->fd_limit_per_thread); 286 return 1; 287 } 288 289#if !defined(_WIN32) 290 if (!wsi->a.context->max_fds_unrelated_to_ulimit && 291 wsi->desc.sockfd - lws_plat_socket_offset() >= (int)context->max_fds) { 292 lwsl_cx_err(context, "Socket fd %d is too high (%d) offset %d", 293 wsi->desc.sockfd, context->max_fds, 294 lws_plat_socket_offset()); 295 return 1; 296 } 297#endif 298 299 assert(wsi); 300 301#if defined(LWS_WITH_NETLINK) 302 assert(wsi->event_pipe || wsi->a.vhost || wsi == pt->context->netlink); 303#else 304 assert(wsi->event_pipe || wsi->a.vhost); 305#endif 306 assert(lws_socket_is_valid(wsi->desc.sockfd)); 307 308#if defined(LWS_WITH_EXTERNAL_POLL) 309 310 if (wsi->a.vhost && 311 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, 312 wsi->user_space, (void *) &pa, 1)) 313 return -1; 314#endif 315 316 if (insert_wsi(context, wsi)) 317 return -1; 318 pt->count_conns++; 319 wsi->position_in_fds_table = (int)pt->fds_count; 320 321 pt->fds[wsi->position_in_fds_table].fd = wsi->desc.sockfd; 322 pt->fds[wsi->position_in_fds_table].events = LWS_POLLIN; 323#if defined(LWS_WITH_EXTERNAL_POLL) 324 pa.events = pt->fds[pt->fds_count].events; 325#endif 326 327 lws_plat_insert_socket_into_fds(context, wsi); 328 329#if defined(LWS_WITH_EXTERNAL_POLL) 330 331 /* external POLL support via protocol 0 */ 332 if (wsi->a.vhost && 333 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD, 334 wsi->user_space, (void *) &pa, 0)) 335 ret = -1; 336#endif 337#if defined(LWS_WITH_SERVER) 338 /* if no more room, defeat accepts on this service thread */ 339 if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1) 340 lws_accept_modulation(context, pt, 0); 341#endif 342 343#if defined(LWS_WITH_EXTERNAL_POLL) 344 if (wsi->a.vhost && 345 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, 346 wsi->user_space, (void *)&pa, 1)) 347 ret = -1; 348#endif 349 350// __dump_fds(pt, "post insert"); 351 352 return ret; 353} 354 355/* requires pt lock */ 356 357int 358__remove_wsi_socket_from_fds(struct lws *wsi) 359{ 360 struct lws_context *context = wsi->a.context; 361#if defined(LWS_WITH_EXTERNAL_POLL) 362 struct lws_pollargs pa = { wsi->desc.sockfd, 0, 0 }; 363#endif 364 struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; 365 struct lws *end_wsi; 366 int v, m, ret = 0; 367 368 lws_pt_assert_lock_held(pt); 369 370// __dump_fds(pt, "pre remove"); 371 372#if !defined(_WIN32) 373 if (!wsi->a.context->max_fds_unrelated_to_ulimit && 374 wsi->desc.sockfd - lws_plat_socket_offset() > (int)context->max_fds) { 375 lwsl_wsi_err(wsi, "fd %d too high (%d)", 376 wsi->desc.sockfd, 377 context->max_fds); 378 379 return 1; 380 } 381#endif 382#if defined(LWS_WITH_EXTERNAL_POLL) 383 if (wsi->a.vhost && wsi->a.vhost->protocols && 384 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, 385 wsi->user_space, (void *)&pa, 1)) 386 return -1; 387#endif 388 389 __lws_same_vh_protocol_remove(wsi); 390 391 /* the guy who is to be deleted's slot index in pt->fds */ 392 m = wsi->position_in_fds_table; 393 394 /* these are the only valid possibilities for position_in_fds_table */ 395 assert(m == LWS_NO_FDS_POS || (m >= 0 && (unsigned int)m < pt->fds_count)); 396 397 if (context->event_loop_ops->io) 398 context->event_loop_ops->io(wsi, LWS_EV_STOP | LWS_EV_READ | 399 LWS_EV_WRITE); 400/* 401 lwsl_notice("%s: wsi=%s, skt=%d, fds pos=%d, end guy pos=%d, endfd=%d\n", 402 __func__, lws_wsi_tag(wsi), wsi->desc.sockfd, wsi->position_in_fds_table, 403 pt->fds_count, pt->fds[pt->fds_count - 1].fd); */ 404 405 if (m != LWS_NO_FDS_POS) { 406 char fixup = 0; 407 408 assert(pt->fds_count && (unsigned int)m != pt->fds_count); 409 410 /* deletion guy's lws_lookup entry needs nuking */ 411 delete_from_fd(context, wsi->desc.sockfd); 412 413 if ((unsigned int)m != pt->fds_count - 1) { 414 /* have the last guy take up the now vacant slot */ 415 pt->fds[m] = pt->fds[pt->fds_count - 1]; 416 fixup = 1; 417 } 418 419 pt->fds[pt->fds_count - 1].fd = -1; 420 421 /* this decrements pt->fds_count */ 422 lws_plat_delete_socket_from_fds(context, wsi, m); 423 pt->count_conns--; 424 if (fixup) { 425 v = (int) pt->fds[m].fd; 426 /* old end guy's "position in fds table" is now the 427 * deletion guy's old one */ 428 end_wsi = wsi_from_fd(context, v); 429 if (!end_wsi) { 430 lwsl_wsi_err(wsi, "no wsi for fd %d pos %d, " 431 "pt->fds_count=%d", 432 (int)pt->fds[m].fd, m, 433 pt->fds_count); 434 // assert(0); 435 } else 436 end_wsi->position_in_fds_table = m; 437 } 438 439 /* removed wsi has no position any more */ 440 wsi->position_in_fds_table = LWS_NO_FDS_POS; 441 442#if defined(LWS_WITH_EXTERNAL_POLL) 443 /* remove also from external POLL support via protocol 0 */ 444 if (lws_socket_is_valid(wsi->desc.sockfd) && wsi->a.vhost && 445 wsi->a.vhost->protocols[0].callback(wsi, 446 LWS_CALLBACK_DEL_POLL_FD, 447 wsi->user_space, 448 (void *) &pa, 0)) 449 ret = -1; 450#endif 451 } 452 453#if defined(LWS_WITH_SERVER) 454 if (!context->being_destroyed && 455 /* if this made some room, accept connects on this thread */ 456 (unsigned int)pt->fds_count < context->fd_limit_per_thread - 1) 457 lws_accept_modulation(context, pt, 1); 458#endif 459 460#if defined(LWS_WITH_EXTERNAL_POLL) 461 if (wsi->a.vhost && 462 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, 463 wsi->user_space, (void *) &pa, 1)) 464 ret = -1; 465#endif 466 467// __dump_fds(pt, "post remove"); 468 469 return ret; 470} 471 472int 473__lws_change_pollfd(struct lws *wsi, int _and, int _or) 474{ 475 struct lws_context *context; 476 struct lws_pollargs pa; 477 int ret = 0; 478 479 if (!wsi || (!wsi->a.protocol && !wsi->event_pipe) || 480 wsi->position_in_fds_table == LWS_NO_FDS_POS) 481 return 0; 482 483 context = lws_get_context(wsi); 484 if (!context) 485 return 1; 486 487#if defined(LWS_WITH_EXTERNAL_POLL) 488 if (wsi->a.vhost && 489 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, 490 wsi->user_space, (void *) &pa, 0)) 491 return -1; 492#endif 493 494 ret = _lws_change_pollfd(wsi, _and, _or, &pa); 495 496#if defined(LWS_WITH_EXTERNAL_POLL) 497 if (wsi->a.vhost && 498 wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, 499 wsi->user_space, (void *) &pa, 0)) 500 ret = -1; 501#endif 502 503 return ret; 504} 505 506int 507lws_change_pollfd(struct lws *wsi, int _and, int _or) 508{ 509 struct lws_context_per_thread *pt; 510 int ret = 0; 511 512 pt = &wsi->a.context->pt[(int)wsi->tsi]; 513 514 lws_pt_lock(pt, __func__); 515 ret = __lws_change_pollfd(wsi, _and, _or); 516 lws_pt_unlock(pt); 517 518 return ret; 519} 520 521int 522lws_callback_on_writable(struct lws *wsi) 523{ 524 struct lws *w = wsi; 525 526 if (lwsi_state(wsi) == LRS_SHUTDOWN) 527 return 0; 528 529 if (wsi->socket_is_permanently_unusable) 530 return 0; 531 532 if (lws_rops_fidx(wsi->role_ops, LWS_ROPS_callback_on_writable)) { 533 int q = lws_rops_func_fidx(wsi->role_ops, 534 LWS_ROPS_callback_on_writable). 535 callback_on_writable(wsi); 536 if (q) 537 return 1; 538 w = lws_get_network_wsi(wsi); 539 } else 540 if (w->position_in_fds_table == LWS_NO_FDS_POS) { 541 lwsl_wsi_debug(wsi, "failed to find socket %d", 542 wsi->desc.sockfd); 543 return -1; 544 } 545 546 if (__lws_change_pollfd(w, 0, LWS_POLLOUT)) 547 return -1; 548 549 return 1; 550} 551 552 553/* 554 * stitch protocol choice into the vh protocol linked list 555 * We always insert ourselves at the start of the list 556 * 557 * X <-> B 558 * X <-> pAn <-> pB 559 * 560 * Illegal to attach more than once without detach inbetween 561 */ 562void 563lws_same_vh_protocol_insert(struct lws *wsi, int n) 564{ 565 lws_context_lock(wsi->a.context, __func__); 566 lws_vhost_lock(wsi->a.vhost); 567 568 lws_dll2_remove(&wsi->same_vh_protocol); 569 lws_dll2_add_head(&wsi->same_vh_protocol, 570 &wsi->a.vhost->same_vh_protocol_owner[n]); 571 572 wsi->bound_vhost_index = (uint8_t)n; 573 574 lws_vhost_unlock(wsi->a.vhost); 575 lws_context_unlock(wsi->a.context); 576} 577 578void 579__lws_same_vh_protocol_remove(struct lws *wsi) 580{ 581 if (wsi->a.vhost && wsi->a.vhost->same_vh_protocol_owner) 582 lws_dll2_remove(&wsi->same_vh_protocol); 583} 584 585void 586lws_same_vh_protocol_remove(struct lws *wsi) 587{ 588 if (!wsi->a.vhost) 589 return; 590 591 lws_context_lock(wsi->a.context, __func__); 592 lws_vhost_lock(wsi->a.vhost); 593 594 __lws_same_vh_protocol_remove(wsi); 595 596 lws_vhost_unlock(wsi->a.vhost); 597 lws_context_unlock(wsi->a.context); 598} 599 600 601int 602lws_callback_on_writable_all_protocol_vhost(const struct lws_vhost *vhost, 603 const struct lws_protocols *protocol) 604{ 605 struct lws *wsi; 606 int n; 607 608 if (protocol < vhost->protocols || 609 protocol >= (vhost->protocols + vhost->count_protocols)) { 610 lwsl_vhost_err((struct lws_vhost *)vhost, 611 "protocol %p is not from vhost %p (%p - %p)", 612 protocol, vhost->protocols, vhost, 613 (vhost->protocols + vhost->count_protocols)); 614 615 return -1; 616 } 617 618 n = (int)(protocol - vhost->protocols); 619 620 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, 621 lws_dll2_get_head(&vhost->same_vh_protocol_owner[n])) { 622 wsi = lws_container_of(d, struct lws, same_vh_protocol); 623 624 assert(wsi->a.protocol == protocol); 625 lws_callback_on_writable(wsi); 626 627 } lws_end_foreach_dll_safe(d, d1); 628 629 return 0; 630} 631 632int 633lws_callback_on_writable_all_protocol(const struct lws_context *context, 634 const struct lws_protocols *protocol) 635{ 636 struct lws_vhost *vhost; 637 int n; 638 639 if (!context) 640 return 0; 641 642 vhost = context->vhost_list; 643 644 while (vhost) { 645 for (n = 0; n < vhost->count_protocols; n++) 646 if (protocol->callback == 647 vhost->protocols[n].callback && 648 !strcmp(protocol->name, vhost->protocols[n].name)) 649 break; 650 if (n != vhost->count_protocols) 651 lws_callback_on_writable_all_protocol_vhost( 652 vhost, &vhost->protocols[n]); 653 654 vhost = vhost->vhost_next; 655 } 656 657 return 0; 658} 659