1/* 2 * lws System Message Distribution 3 * 4 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com> 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining a copy 7 * of this software and associated documentation files (the "Software"), to 8 * deal in the Software without restriction, including without limitation the 9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 * sell copies of the Software, and to permit persons to whom the Software is 11 * furnished to do so, subject to the following conditions: 12 * 13 * The above copyright notice and this permission notice shall be included in 14 * all copies or substantial portions of the Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22 * IN THE SOFTWARE. 23 */ 24 25#include "private-lib-core.h" 26#include <assert.h> 27 28/* comment me to remove extra debug and sanity checks */ 29// #define LWS_SMD_DEBUG 30 31 32#if defined(LWS_SMD_DEBUG) 33#define lwsl_smd lwsl_notice 34#else 35#define lwsl_smd(_s, ...) 36#endif 37 38void * 39lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len) 40{ 41 lws_smd_msg_t *msg; 42 43 /* only allow it if someone wants to consume this class of event */ 44 45 if (!(ctx->smd._class_filter & _class)) { 46 lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants", 47 (unsigned int)_class); 48 return NULL; 49 } 50 51 assert(len <= LWS_SMD_MAX_PAYLOAD); 52 53 54 /* 55 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind 56 * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload 57 */ 58 msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len, 59 __func__); 60 if (!msg) 61 return NULL; 62 63 memset(msg, 0, sizeof(*msg)); 64 msg->timestamp = lws_now_usecs(); 65 msg->length = (uint16_t)len; 66 msg->_class = _class; 67 68 return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF; 69} 70 71void 72lws_smd_msg_free(void **ppay) 73{ 74 lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) - 75 LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); 76 77 /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */ 78 lws_free(msg); 79 *ppay = NULL; 80} 81 82#if defined(LWS_SMD_DEBUG) 83static void 84lws_smd_dump(lws_smd_t *smd) 85{ 86 int n = 1; 87 88 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 89 smd->owner_messages.head) { 90 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); 91 92 lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n", 93 n++, msg, msg->refcount, 94 (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000), 95 msg->length, msg->_class, 96 (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF); 97 98 } lws_end_foreach_dll_safe(p, p1); 99 100 n = 1; 101 lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) { 102 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 103 104 lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n", 105 n++, pr, pr->tail, pr->_class_filter); 106 } lws_end_foreach_dll(p); 107} 108#endif 109 110static int 111_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg) 112{ 113 return !!(msg->_class & pr->_class_filter); 114} 115 116/* 117 * Figure out what to set the initial refcount for the message to 118 */ 119 120static int 121_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg, 122 struct lws_smd_peer *exc) 123{ 124 struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd); 125 int interested = 0; 126 127 lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { 128 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 129 130 if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg)) 131 /* 132 * This peer wants to consume it 133 */ 134 interested++; 135 136 } lws_end_foreach_dll(p); 137 138 return interested; 139} 140 141static int 142_lws_smd_class_mask_union(lws_smd_t *smd) 143{ 144 uint32_t mask = 0; 145 146 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 147 smd->owner_peers.head) { 148 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 149 150 mask |= pr->_class_filter; 151 152 } lws_end_foreach_dll_safe(p, p1); 153 154 smd->_class_filter = mask; 155 156 return 0; 157} 158 159/* Call with message lock held */ 160 161static void 162_lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg) 163{ 164 /* 165 * We think we gave the message to everyone and can destroy it. 166 * Sanity check that no peer holds a pointer to this guy 167 */ 168 169 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 170 smd->owner_peers.head) { 171 lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list); 172 173 if (xpr->tail == msg) { 174 lwsl_cx_err(cx, "peer %p has msg %p " 175 "we are about to destroy as tail", xpr, msg); 176#if !defined(LWS_PLAT_FREERTOS) 177 assert(0); 178#endif 179 } 180 181 } lws_end_foreach_dll_safe(p, p1); 182 183 /* 184 * We have fully delivered the message now, it 185 * can be unlinked and destroyed 186 */ 187 lwsl_cx_info(cx, "destroy msg %p", msg); 188 lws_dll2_remove(&msg->list); 189 lws_free(msg); 190} 191 192/* 193 * This is wanting to be threadsafe, limiting the apis we can call 194 */ 195 196int 197_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) 198{ 199 lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) - 200 LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); 201 202 if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) { 203 lwsl_cx_warn(ctx, "rejecting message on queue depth %d", 204 (int)ctx->smd.owner_messages.count); 205 /* reject the message due to max queue depth reached */ 206 return 1; 207 } 208 209 if (!ctx->smd.delivering && 210 lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */ 211 return 1; /* For Coverity */ 212 213 if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */ 214 goto bail; 215 216 msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested( 217 &ctx->smd, msg, exc); 218 if (!msg->refcount) { 219 /* possible, condsidering exc and no other participants */ 220 lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ 221 222 lws_free(msg); 223 if (!ctx->smd.delivering) 224 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ 225 226 return 0; 227 } 228 229 msg->exc = exc; 230 231 /* let's add him on the queue... */ 232 233 lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages); 234 235 /* 236 * Any peer with no active tail needs to check our class to see if we 237 * should become his tail 238 */ 239 240 lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { 241 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 242 243 if (pr != exc && 244 !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) { 245 pr->tail = msg; 246 /* tail message has to actually be of interest to the peer */ 247 assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); 248 } 249 250 } lws_end_foreach_dll(p); 251 252#if defined(LWS_SMD_DEBUG) 253 lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__, 254 msg, msg->refcount, ctx->smd.owner_messages.count); 255 lws_smd_dump(&ctx->smd); 256#endif 257 258 lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ 259 260bail: 261 if (!ctx->smd.delivering) 262 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ 263 264 /* we may be happening from another thread context */ 265 lws_cancel_service(ctx); 266 267 return 0; 268} 269 270/* 271 * This is wanting to be threadsafe, limiting the apis we can call 272 */ 273 274int 275lws_smd_msg_send(struct lws_context *ctx, void *pay) 276{ 277 return _lws_smd_msg_send(ctx, pay, NULL); 278} 279 280/* 281 * This is wanting to be threadsafe, limiting the apis we can call 282 */ 283 284int 285lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class, 286 const char *format, ...) 287{ 288 lws_smd_msg_t *msg; 289 va_list ap; 290 void *p; 291 int n; 292 293 if (!(ctx->smd._class_filter & _class)) 294 /* 295 * There's nobody interested in messages of this class atm. 296 * Don't bother generating it, and act like all is well. 297 */ 298 return 0; 299 300 va_start(ap, format); 301 n = vsnprintf(NULL, 0, format, ap); 302 va_end(ap); 303 if (n > LWS_SMD_MAX_PAYLOAD) 304 /* too large to send */ 305 return 1; 306 307 p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2); 308 if (!p) 309 return 1; 310 msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - 311 sizeof(*msg)); 312 msg->length = (uint16_t)n; 313 va_start(ap, format); 314 vsnprintf((char *)p, (unsigned int)n + 2, format, ap); 315 va_end(ap); 316 317 /* 318 * locks taken and released in here 319 */ 320 321 if (lws_smd_msg_send(ctx, p)) { 322 lws_smd_msg_free(&p); 323 return 1; 324 } 325 326 return 0; 327} 328 329#if defined(LWS_WITH_SECURE_STREAMS) 330int 331lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len, 332 lws_smd_class_t _class, const char *format, ...) 333{ 334 char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN; 335 va_list ap; 336 int n; 337 338 if (*len < LWS_SMD_SS_RX_HEADER_LEN) 339 return 1; 340 341 lws_ser_wu64be(buf, _class); 342 lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */ 343 344 va_start(ap, format); 345 n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap); 346 va_end(ap); 347 348 if (n > LWS_SMD_MAX_PAYLOAD || 349 (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN) 350 /* too large to send */ 351 return 1; 352 353 *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n; 354 355 lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class, 356 (unsigned int)n); 357 358 return 0; 359} 360 361/* 362 * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can 363 * call through to with the payload it received from the proxy. It will then 364 * forward the recieved SMD message to all local (same-context) participants 365 * that are interested in that class (except ones with callback skip_cb, so 366 * we don't loop). 367 */ 368 369static int 370_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag, 371 struct lws_smd_peer *pr, const uint8_t *buf, size_t len) 372{ 373 lws_smd_class_t _class; 374 lws_smd_msg_t *msg; 375 void *p; 376 377 if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF) 378 return 1; 379 380 if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF) 381 return 1; 382 383 _class = (lws_smd_class_t)lws_ser_ru64be(buf); 384 385 if (_class == LWSSMDCL_METRICS) { 386 387 } 388 389 /* only locally forward messages that we care about in this process */ 390 391 if (!(ctx->smd._class_filter & _class)) 392 /* 393 * There's nobody interested in messages of this class atm. 394 * Don't bother generating it, and act like all is well. 395 */ 396 return 0; 397 398 p = lws_smd_msg_alloc(ctx, _class, len); 399 if (!p) 400 return 1; 401 402 msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - 403 sizeof(*msg)); 404 msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF); 405 /* adopt the original source timestamp, not time we forwarded it */ 406 msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8); 407 408 /* copy the message payload in */ 409 memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length); 410 411 /* 412 * locks taken and released in here 413 */ 414 415 if (_lws_smd_msg_send(ctx, p, pr)) { 416 /* we couldn't send it after all that... */ 417 lws_smd_msg_free(&p); 418 419 return 1; 420 } 421 422 lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__, 423 tag, (unsigned int)_class, msg->length, 424 (unsigned long long)msg->timestamp); 425 426 return 0; 427} 428 429int 430lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len) 431{ 432 struct lws_ss_handle *h = (struct lws_ss_handle *) 433 (((char *)ss_user) - sizeof(*h)); 434 struct lws_context *ctx = lws_ss_get_context(h); 435 436 return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len); 437} 438 439#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API) 440int 441lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len) 442{ 443 struct lws_sspc_handle *h = (struct lws_sspc_handle *) 444 (((char *)ss_user) - sizeof(*h)); 445 struct lws_context *ctx = lws_sspc_get_context(h); 446 447 return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len); 448} 449#endif 450 451#endif 452 453/* 454 * Peers that deregister need to adjust the refcount of messages they would 455 * have been interested in, but didn't take delivery of yet 456 */ 457 458static void 459_lws_smd_peer_destroy(lws_smd_peer_t *pr) 460{ 461 lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, 462 owner_peers); 463 464 if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */ 465 return; /* For Coverity */ 466 467 lws_dll2_remove(&pr->list); 468 469 /* 470 * We take the approach to adjust the refcount of every would-have-been 471 * delivered message we were interested in 472 */ 473 474 while (pr->tail) { 475 476 lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next, 477 lws_smd_msg_t, list); 478 479 if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) { 480 if (!--pr->tail->refcount) 481 _lws_smd_msg_destroy(pr->ctx, smd, pr->tail); 482 } 483 484 pr->tail = m1; 485 } 486 487 lws_free(pr); 488 489 lws_mutex_unlock(smd->lock_messages); /* messages ------- */ 490} 491 492static lws_smd_msg_t * 493_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) 494{ 495 lws_dll2_t *tail = &pr->tail->list; 496 lws_smd_msg_t *msg; 497 498 do { 499 tail = tail->next; 500 if (!tail) 501 return NULL; 502 503 msg = lws_container_of(tail, lws_smd_msg_t, list); 504 if (msg->exc != pr && 505 _lws_smd_msg_peer_interested_in_msg(pr, msg)) 506 return msg; 507 } while (1); 508 509 return NULL; 510} 511 512/* 513 * Delivers only one message to the peer and advances the tail, or sets to NULL 514 * if no more filtered queued messages. Returns nonzero if tail non-NULL. 515 * 516 * For Proxied SS, only asks for writeable and does not advance or change the 517 * tail. 518 * 519 * This is done so if multiple messages queued, we don't get a situation where 520 * one participant gets them all spammed, then the next etc. Instead they are 521 * delivered round-robin. 522 * 523 * Requires peer lock, may take message lock 524 */ 525 526static int 527_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr) 528{ 529 lws_smd_msg_t *msg; 530 531 if (!pr->tail) 532 return 0; 533 534 msg = lws_container_of(pr->tail, lws_smd_msg_t, list); 535 536 537 lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, refc %d, to peer %p", 538 (unsigned int)msg->_class, (int)msg->length, 539 (int)msg->refcount, pr); 540 541 pr->cb(pr->opaque, msg->_class, msg->timestamp, 542 ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF, 543 (size_t)msg->length); 544 545 assert(msg->refcount); 546 547 /* 548 * If there is one, move forward to the next queued 549 * message that meets the filters of this peer 550 */ 551 pr->tail = _lws_smd_msg_next_matching_filter(pr); 552 553 /* tail message has to actually be of interest to the peer */ 554 assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); 555 556 if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */ 557 return 1; /* For Coverity */ 558 559 if (!--msg->refcount) 560 _lws_smd_msg_destroy(ctx, &ctx->smd, msg); 561 lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ 562 563 return !!pr->tail; 564} 565 566/* 567 * Called when the event loop could deliver messages synchronously, eg, on 568 * entry to idle 569 */ 570 571int 572lws_smd_msg_distribute(struct lws_context *ctx) 573{ 574 char more; 575 576 /* commonly, no messages and nothing to do... */ 577 578 if (!ctx->smd.owner_messages.count) 579 return 0; 580 581 ctx->smd.delivering = 1; 582 583 do { 584 more = 0; 585 if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */ 586 return 1; /* For Coverity */ 587 588 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 589 ctx->smd.owner_peers.head) { 590 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 591 592 more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr)); 593 594 } lws_end_foreach_dll_safe(p, p1); 595 596 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ 597 } while (more); 598 599 ctx->smd.delivering = 0; 600 601 return 0; 602} 603 604struct lws_smd_peer * 605lws_smd_register(struct lws_context *ctx, void *opaque, int flags, 606 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb) 607{ 608 lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__); 609 610 if (!pr) 611 return NULL; 612 613 pr->cb = cb; 614 pr->opaque = opaque; 615 pr->_class_filter = _class_filter; 616 pr->ctx = ctx; 617 618 if (!ctx->smd.delivering && 619 lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */ 620 lws_free(pr); 621 return NULL; /* For Coverity */ 622 } 623 624 /* 625 * Let's lock the message list before adding this peer... because... 626 */ 627 628 if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */ 629 lws_free(pr); 630 pr = NULL; 631 goto bail1; /* For Coverity */ 632 } 633 634 lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers); 635 636 /* update the global class mask union to account for new peer mask */ 637 _lws_smd_class_mask_union(&ctx->smd); 638 639 /* 640 * Now there's a new peer added, any messages we have stashed will try 641 * to deliver to this guy too, if he's interested in that class. So we 642 * have to update the message refcounts for queued messages-he's- 643 * interested-in accordingly. 644 */ 645 646 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 647 ctx->smd.owner_messages.head) { 648 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); 649 650 if (_lws_smd_msg_peer_interested_in_msg(pr, msg)) 651 msg->refcount++; 652 653 } lws_end_foreach_dll_safe(p, p1); 654 655 /* ... ok we are done adding the peer */ 656 657 lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ 658 659 lwsl_cx_info(ctx, "peer %p (count %u) registered", pr, 660 (unsigned int)ctx->smd.owner_peers.count); 661 662bail1: 663 if (!ctx->smd.delivering) 664 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ 665 666 return pr; 667} 668 669void 670lws_smd_unregister(struct lws_smd_peer *pr) 671{ 672 lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers); 673 674 if (!smd->delivering && 675 lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */ 676 return; /* For Coverity */ 677 lwsl_cx_notice(pr->ctx, "destroying peer %p", pr); 678 _lws_smd_peer_destroy(pr); 679 if (!smd->delivering) 680 lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */ 681} 682 683int 684lws_smd_message_pending(struct lws_context *ctx) 685{ 686 int ret = 1; 687 688 /* 689 * First cheaply check the common case no messages pending, so there's 690 * definitely nothing for this tsi or anything else 691 */ 692 693 if (!ctx->smd.owner_messages.count) 694 return 0; 695 696 /* 697 * If there are any messages, check their age and expire ones that 698 * have been hanging around too long 699 */ 700 701 if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */ 702 return 1; /* For Coverity */ 703 if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */ 704 goto bail; /* For Coverity */ 705 706 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 707 ctx->smd.owner_messages.head) { 708 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); 709 710 if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) { 711 lwsl_cx_warn(ctx, "timing out queued message %p", 712 msg); 713 714 /* 715 * We're forcibly yanking this guy, we can expect that 716 * there might be peers that point to it as their tail. 717 * 718 * In that case, move their tails on to the next guy 719 * they are interested in, if any. 720 */ 721 722 lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1, 723 ctx->smd.owner_peers.head) { 724 lws_smd_peer_t *pr = lws_container_of(pp, 725 lws_smd_peer_t, list); 726 727 if (pr->tail == msg) 728 pr->tail = _lws_smd_msg_next_matching_filter(pr); 729 730 } lws_end_foreach_dll_safe(pp, pp1); 731 732 /* 733 * No peer should fall foul of the peer tail checks 734 * when destroying the message now. 735 */ 736 737 _lws_smd_msg_destroy(ctx, &ctx->smd, msg); 738 } 739 } lws_end_foreach_dll_safe(p, p1); 740 741 lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ 742 743 /* 744 * Walk the peer list 745 */ 746 747 lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { 748 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 749 750 if (pr->tail) 751 goto bail; 752 753 } lws_end_foreach_dll(p); 754 755 /* 756 * There's no message pending that we need to handle 757 */ 758 759 ret = 0; 760 761bail: 762 lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */ 763 764 return ret; 765} 766 767int 768_lws_smd_destroy(struct lws_context *ctx) 769{ 770 /* stop any message creation */ 771 772 ctx->smd._class_filter = 0; 773 774 /* 775 * Walk the message list, destroying them 776 */ 777 778 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 779 ctx->smd.owner_messages.head) { 780 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); 781 782 lws_dll2_remove(&msg->list); 783 lws_free(msg); 784 785 } lws_end_foreach_dll_safe(p, p1); 786 787 /* 788 * Walk the peer list, destroying them 789 */ 790 791 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, 792 ctx->smd.owner_peers.head) { 793 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); 794 795 pr->tail = NULL; /* we just nuked all the messages, ignore */ 796 _lws_smd_peer_destroy(pr); 797 798 } lws_end_foreach_dll_safe(p, p1); 799 800 lws_mutex_destroy(ctx->smd.lock_messages); 801 lws_mutex_destroy(ctx->smd.lock_peers); 802 803 return 0; 804} 805