xref: /third_party/libwebsockets/lib/system/smd/smd.c (revision d4afb5ce)
1/*
2 * lws System Message Distribution
3 *
4 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25#include "private-lib-core.h"
26#include <assert.h>
27
28/* comment me to remove extra debug and sanity checks */
29// #define LWS_SMD_DEBUG
30
31
32#if defined(LWS_SMD_DEBUG)
33#define lwsl_smd lwsl_notice
34#else
35#define lwsl_smd(_s, ...)
36#endif
37
38void *
39lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
40{
41	lws_smd_msg_t *msg;
42
43	/* only allow it if someone wants to consume this class of event */
44
45	if (!(ctx->smd._class_filter & _class)) {
46		lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants",
47				(unsigned int)_class);
48		return NULL;
49	}
50
51	assert(len <= LWS_SMD_MAX_PAYLOAD);
52
53
54	/*
55	 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
56	 * payload, ie,  msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
57	 */
58	msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
59			 __func__);
60	if (!msg)
61		return NULL;
62
63	memset(msg, 0, sizeof(*msg));
64	msg->timestamp = lws_now_usecs();
65	msg->length = (uint16_t)len;
66	msg->_class = _class;
67
68	return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
69}
70
71void
72lws_smd_msg_free(void **ppay)
73{
74	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
75				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
76
77	/* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
78	lws_free(msg);
79	*ppay = NULL;
80}
81
82#if defined(LWS_SMD_DEBUG)
83static void
84lws_smd_dump(lws_smd_t *smd)
85{
86	int n = 1;
87
88	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
89				   smd->owner_messages.head) {
90		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
91
92		lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
93			    n++, msg, msg->refcount,
94			    (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
95			    msg->length, msg->_class,
96			    (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);
97
98	} lws_end_foreach_dll_safe(p, p1);
99
100	n = 1;
101	lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
102		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
103
104		lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
105			    n++, pr, pr->tail, pr->_class_filter);
106	} lws_end_foreach_dll(p);
107}
108#endif
109
110static int
111_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
112{
113    return !!(msg->_class & pr->_class_filter);
114}
115
116/*
117 * Figure out what to set the initial refcount for the message to
118 */
119
120static int
121_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
122				     struct lws_smd_peer *exc)
123{
124	struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
125	int interested = 0;
126
127	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
128		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
129
130		if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
131			/*
132			 * This peer wants to consume it
133			 */
134			interested++;
135
136	} lws_end_foreach_dll(p);
137
138	return interested;
139}
140
141static int
142_lws_smd_class_mask_union(lws_smd_t *smd)
143{
144	uint32_t mask = 0;
145
146	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
147				   smd->owner_peers.head) {
148		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
149
150		mask |= pr->_class_filter;
151
152	} lws_end_foreach_dll_safe(p, p1);
153
154	smd->_class_filter = mask;
155
156	return 0;
157}
158
159/* Call with message lock held */
160
161static void
162_lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg)
163{
164	/*
165	 * We think we gave the message to everyone and can destroy it.
166	 * Sanity check that no peer holds a pointer to this guy
167	 */
168
169	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
170				   smd->owner_peers.head) {
171		lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);
172
173		if (xpr->tail == msg) {
174			lwsl_cx_err(cx, "peer %p has msg %p "
175				 "we are about to destroy as tail", xpr, msg);
176#if !defined(LWS_PLAT_FREERTOS)
177			assert(0);
178#endif
179		}
180
181	} lws_end_foreach_dll_safe(p, p1);
182
183	/*
184	 * We have fully delivered the message now, it
185	 * can be unlinked and destroyed
186	 */
187	lwsl_cx_info(cx, "destroy msg %p", msg);
188	lws_dll2_remove(&msg->list);
189	lws_free(msg);
190}
191
192/*
193 * This is wanting to be threadsafe, limiting the apis we can call
194 */
195
196int
197_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
198{
199	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
200				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
201
202	if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
203		lwsl_cx_warn(ctx, "rejecting message on queue depth %d",
204				  (int)ctx->smd.owner_messages.count);
205		/* reject the message due to max queue depth reached */
206		return 1;
207	}
208
209	if (!ctx->smd.delivering &&
210	    lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
211		return 1; /* For Coverity */
212
213	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
214		goto bail;
215
216	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
217							&ctx->smd, msg, exc);
218	if (!msg->refcount) {
219		/* possible, condsidering exc and no other participants */
220		lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
221
222		lws_free(msg);
223		if (!ctx->smd.delivering)
224			lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
225
226		return 0;
227	}
228
229	msg->exc = exc;
230
231	/* let's add him on the queue... */
232
233	lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
234
235	/*
236	 * Any peer with no active tail needs to check our class to see if we
237	 * should become his tail
238	 */
239
240	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
241		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
242
243		if (pr != exc &&
244                   !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
245			pr->tail = msg;
246			/* tail message has to actually be of interest to the peer */
247			assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
248		}
249
250	} lws_end_foreach_dll(p);
251
252#if defined(LWS_SMD_DEBUG)
253	lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
254		 msg, msg->refcount, ctx->smd.owner_messages.count);
255	lws_smd_dump(&ctx->smd);
256#endif
257
258	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
259
260bail:
261	if (!ctx->smd.delivering)
262		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
263
264	/* we may be happening from another thread context */
265	lws_cancel_service(ctx);
266
267	return 0;
268}
269
270/*
271 * This is wanting to be threadsafe, limiting the apis we can call
272 */
273
274int
275lws_smd_msg_send(struct lws_context *ctx, void *pay)
276{
277	return _lws_smd_msg_send(ctx, pay, NULL);
278}
279
280/*
281 * This is wanting to be threadsafe, limiting the apis we can call
282 */
283
284int
285lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
286		   const char *format, ...)
287{
288	lws_smd_msg_t *msg;
289	va_list ap;
290	void *p;
291	int n;
292
293	if (!(ctx->smd._class_filter & _class))
294		/*
295		 * There's nobody interested in messages of this class atm.
296		 * Don't bother generating it, and act like all is well.
297		 */
298		return 0;
299
300	va_start(ap, format);
301	n = vsnprintf(NULL, 0, format, ap);
302	va_end(ap);
303	if (n > LWS_SMD_MAX_PAYLOAD)
304		/* too large to send */
305		return 1;
306
307	p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
308	if (!p)
309		return 1;
310	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
311								sizeof(*msg));
312	msg->length = (uint16_t)n;
313	va_start(ap, format);
314	vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
315	va_end(ap);
316
317	/*
318	 * locks taken and released in here
319	 */
320
321	if (lws_smd_msg_send(ctx, p)) {
322		lws_smd_msg_free(&p);
323		return 1;
324	}
325
326	return 0;
327}
328
329#if defined(LWS_WITH_SECURE_STREAMS)
330int
331lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
332		      lws_smd_class_t _class, const char *format, ...)
333{
334	char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
335	va_list ap;
336	int n;
337
338	if (*len < LWS_SMD_SS_RX_HEADER_LEN)
339		return 1;
340
341	lws_ser_wu64be(buf, _class);
342	lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
343
344	va_start(ap, format);
345	n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
346	va_end(ap);
347
348	if (n > LWS_SMD_MAX_PAYLOAD ||
349	    (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
350		/* too large to send */
351		return 1;
352
353	*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;
354
355	lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class,
356			(unsigned int)n);
357
358	return 0;
359}
360
361/*
362 * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can
363 * call through to with the payload it received from the proxy.  It will then
364 * forward the recieved SMD message to all local (same-context) participants
365 * that are interested in that class (except ones with callback skip_cb, so
366 * we don't loop).
367 */
368
369static int
370_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
371		       struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
372{
373	lws_smd_class_t _class;
374	lws_smd_msg_t *msg;
375	void *p;
376
377	if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
378		return 1;
379
380	if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
381		return 1;
382
383	_class = (lws_smd_class_t)lws_ser_ru64be(buf);
384
385	if (_class == LWSSMDCL_METRICS) {
386
387	}
388
389	/* only locally forward messages that we care about in this process */
390
391	if (!(ctx->smd._class_filter & _class))
392		/*
393		 * There's nobody interested in messages of this class atm.
394		 * Don't bother generating it, and act like all is well.
395		 */
396		return 0;
397
398	p = lws_smd_msg_alloc(ctx, _class, len);
399	if (!p)
400		return 1;
401
402	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
403								sizeof(*msg));
404	msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
405	/* adopt the original source timestamp, not time we forwarded it */
406	msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);
407
408	/* copy the message payload in */
409	memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);
410
411	/*
412	 * locks taken and released in here
413	 */
414
415	if (_lws_smd_msg_send(ctx, p, pr)) {
416		/* we couldn't send it after all that... */
417		lws_smd_msg_free(&p);
418
419		return 1;
420	}
421
422	lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
423		    tag, (unsigned int)_class, msg->length,
424		    (unsigned long long)msg->timestamp);
425
426	return 0;
427}
428
429int
430lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
431{
432	struct lws_ss_handle *h = (struct lws_ss_handle *)
433					(((char *)ss_user) - sizeof(*h));
434	struct lws_context *ctx = lws_ss_get_context(h);
435
436	return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
437}
438
439#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
440int
441lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
442{
443	struct lws_sspc_handle *h = (struct lws_sspc_handle *)
444					(((char *)ss_user) - sizeof(*h));
445	struct lws_context *ctx = lws_sspc_get_context(h);
446
447	return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
448}
449#endif
450
451#endif
452
453/*
454 * Peers that deregister need to adjust the refcount of messages they would
455 * have been interested in, but didn't take delivery of yet
456 */
457
458static void
459_lws_smd_peer_destroy(lws_smd_peer_t *pr)
460{
461	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
462					  owner_peers);
463
464	if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */
465		return; /* For Coverity */
466
467	lws_dll2_remove(&pr->list);
468
469	/*
470	 * We take the approach to adjust the refcount of every would-have-been
471	 * delivered message we were interested in
472	 */
473
474	while (pr->tail) {
475
476		lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
477							lws_smd_msg_t, list);
478
479		if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
480			if (!--pr->tail->refcount)
481				_lws_smd_msg_destroy(pr->ctx, smd, pr->tail);
482		}
483
484		pr->tail = m1;
485	}
486
487	lws_free(pr);
488
489	lws_mutex_unlock(smd->lock_messages); /* messages ------- */
490}
491
492static lws_smd_msg_t *
493_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
494{
495	lws_dll2_t *tail = &pr->tail->list;
496	lws_smd_msg_t *msg;
497
498	do {
499		tail = tail->next;
500		if (!tail)
501			return NULL;
502
503		msg = lws_container_of(tail, lws_smd_msg_t, list);
504		if (msg->exc != pr &&
505		    _lws_smd_msg_peer_interested_in_msg(pr, msg))
506			return msg;
507	} while (1);
508
509	return NULL;
510}
511
512/*
513 * Delivers only one message to the peer and advances the tail, or sets to NULL
514 * if no more filtered queued messages.  Returns nonzero if tail non-NULL.
515 *
516 * For Proxied SS, only asks for writeable and does not advance or change the
517 * tail.
518 *
519 * This is done so if multiple messages queued, we don't get a situation where
520 * one participant gets them all spammed, then the next etc.  Instead they are
521 * delivered round-robin.
522 *
523 * Requires peer lock, may take message lock
524 */
525
526static int
527_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
528{
529	lws_smd_msg_t *msg;
530
531	if (!pr->tail)
532		return 0;
533
534	msg = lws_container_of(pr->tail, lws_smd_msg_t, list);
535
536
537	lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, refc %d, to peer %p",
538		    (unsigned int)msg->_class, (int)msg->length,
539		    (int)msg->refcount, pr);
540
541	pr->cb(pr->opaque, msg->_class, msg->timestamp,
542	       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
543	       (size_t)msg->length);
544
545	assert(msg->refcount);
546
547	/*
548	 * If there is one, move forward to the next queued
549	 * message that meets the filters of this peer
550	 */
551	pr->tail = _lws_smd_msg_next_matching_filter(pr);
552
553	/* tail message has to actually be of interest to the peer */
554	assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
555
556	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */
557		return 1; /* For Coverity */
558
559	if (!--msg->refcount)
560		_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
561	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
562
563	return !!pr->tail;
564}
565
566/*
567 * Called when the event loop could deliver messages synchronously, eg, on
568 * entry to idle
569 */
570
571int
572lws_smd_msg_distribute(struct lws_context *ctx)
573{
574	char more;
575
576	/* commonly, no messages and nothing to do... */
577
578	if (!ctx->smd.owner_messages.count)
579		return 0;
580
581	ctx->smd.delivering = 1;
582
583	do {
584		more = 0;
585		if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
586			return 1; /* For Coverity */
587
588		lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
589					   ctx->smd.owner_peers.head) {
590			lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
591
592			more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));
593
594		} lws_end_foreach_dll_safe(p, p1);
595
596		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
597	} while (more);
598
599	ctx->smd.delivering = 0;
600
601	return 0;
602}
603
604struct lws_smd_peer *
605lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
606		 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
607{
608	lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);
609
610	if (!pr)
611		return NULL;
612
613	pr->cb = cb;
614	pr->opaque = opaque;
615	pr->_class_filter = _class_filter;
616	pr->ctx = ctx;
617
618	if (!ctx->smd.delivering &&
619	    lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */
620			lws_free(pr);
621			return NULL; /* For Coverity */
622		}
623
624	/*
625	 * Let's lock the message list before adding this peer... because...
626	 */
627
628	if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */
629		lws_free(pr);
630		pr = NULL;
631		goto bail1; /* For Coverity */
632	}
633
634	lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);
635
636	/* update the global class mask union to account for new peer mask */
637	_lws_smd_class_mask_union(&ctx->smd);
638
639	/*
640	 * Now there's a new peer added, any messages we have stashed will try
641	 * to deliver to this guy too, if he's interested in that class.  So we
642	 * have to update the message refcounts for queued messages-he's-
643	 * interested-in accordingly.
644	 */
645
646	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
647				   ctx->smd.owner_messages.head) {
648		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
649
650		if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
651			msg->refcount++;
652
653	} lws_end_foreach_dll_safe(p, p1);
654
655	/* ... ok we are done adding the peer */
656
657	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
658
659	lwsl_cx_info(ctx, "peer %p (count %u) registered", pr,
660			(unsigned int)ctx->smd.owner_peers.count);
661
662bail1:
663	if (!ctx->smd.delivering)
664		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
665
666	return pr;
667}
668
669void
670lws_smd_unregister(struct lws_smd_peer *pr)
671{
672	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);
673
674	if (!smd->delivering &&
675	    lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */
676		return; /* For Coverity */
677	lwsl_cx_notice(pr->ctx, "destroying peer %p", pr);
678	_lws_smd_peer_destroy(pr);
679	if (!smd->delivering)
680		lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
681}
682
683int
684lws_smd_message_pending(struct lws_context *ctx)
685{
686	int ret = 1;
687
688	/*
689	 * First cheaply check the common case no messages pending, so there's
690	 * definitely nothing for this tsi or anything else
691	 */
692
693	if (!ctx->smd.owner_messages.count)
694		return 0;
695
696	/*
697	 * If there are any messages, check their age and expire ones that
698	 * have been hanging around too long
699	 */
700
701	if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */
702		return 1; /* For Coverity */
703	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
704		goto bail; /* For Coverity */
705
706	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
707				   ctx->smd.owner_messages.head) {
708		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
709
710		if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
711			lwsl_cx_warn(ctx, "timing out queued message %p",
712					msg);
713
714			/*
715			 * We're forcibly yanking this guy, we can expect that
716			 * there might be peers that point to it as their tail.
717			 *
718			 * In that case, move their tails on to the next guy
719			 * they are interested in, if any.
720			 */
721
722			lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
723						   ctx->smd.owner_peers.head) {
724				lws_smd_peer_t *pr = lws_container_of(pp,
725							lws_smd_peer_t, list);
726
727				if (pr->tail == msg)
728					pr->tail = _lws_smd_msg_next_matching_filter(pr);
729
730			} lws_end_foreach_dll_safe(pp, pp1);
731
732			/*
733			 * No peer should fall foul of the peer tail checks
734			 * when destroying the message now.
735			 */
736
737			_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
738		}
739	} lws_end_foreach_dll_safe(p, p1);
740
741	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
742
743	/*
744	 * Walk the peer list
745	 */
746
747	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
748		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
749
750		if (pr->tail)
751			goto bail;
752
753	} lws_end_foreach_dll(p);
754
755	/*
756	 * There's no message pending that we need to handle
757	 */
758
759	ret = 0;
760
761bail:
762	lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */
763
764	return ret;
765}
766
767int
768_lws_smd_destroy(struct lws_context *ctx)
769{
770	/* stop any message creation */
771
772	ctx->smd._class_filter = 0;
773
774	/*
775	 * Walk the message list, destroying them
776	 */
777
778	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
779				   ctx->smd.owner_messages.head) {
780		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
781
782		lws_dll2_remove(&msg->list);
783		lws_free(msg);
784
785	} lws_end_foreach_dll_safe(p, p1);
786
787	/*
788	 * Walk the peer list, destroying them
789	 */
790
791	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
792				   ctx->smd.owner_peers.head) {
793		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
794
795		pr->tail = NULL; /* we just nuked all the messages, ignore */
796		_lws_smd_peer_destroy(pr);
797
798	} lws_end_foreach_dll_safe(p, p1);
799
800	lws_mutex_destroy(ctx->smd.lock_messages);
801	lws_mutex_destroy(ctx->smd.lock_peers);
802
803	return 0;
804}
805