/*
 * libwebsockets-test-server - libwebsockets test implementation
 *
 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * The person who associated a work with this deed has dedicated
 * the work to the public domain by waiving all of his or her rights
 * to the work worldwide under copyright law, including all related
 * and neighboring rights, to the extent allowed by law. You can copy,
 * modify, distribute and perform the work, even for commercial purposes,
 * all without asking permission.
 *
 * The test apps are intended to be adapted for use in your code, which
 * may be proprietary.  So unlike the library itself, they are licensed
 * Public Domain.
 *
 * Notice that the lws_pthread... locking apis are all zero-footprint
 * NOPs in the case LWS_MAX_SMP == 1, which is the default.  When lws
 * is built for multiple service threads though, they resolve to their
 * pthreads equivalents.
 */
25d4afb5ceSopenharmony_ci
26d4afb5ceSopenharmony_ci#if !defined (LWS_PLUGIN_STATIC)
27d4afb5ceSopenharmony_ci#if !defined(LWS_DLL)
28d4afb5ceSopenharmony_ci#define LWS_DLL
29d4afb5ceSopenharmony_ci#endif
30d4afb5ceSopenharmony_ci#if !defined(LWS_INTERNAL)
31d4afb5ceSopenharmony_ci#define LWS_INTERNAL
32d4afb5ceSopenharmony_ci#endif
33d4afb5ceSopenharmony_ci#include <libwebsockets.h>
34d4afb5ceSopenharmony_ci#endif
35d4afb5ceSopenharmony_ci
36d4afb5ceSopenharmony_ci#include <string.h>
37d4afb5ceSopenharmony_ci#include <stdlib.h>
38d4afb5ceSopenharmony_ci
/* number of struct a_message elements in each mirror instance's ring */
#define QUEUELEN 32
/* rx flow is disabled once free ring space drops below this many elements */
#define RXFLOW_MIN (4)
/* rx flow is re-enabled once free ring space is at least this many elements */
#define RXFLOW_MAX ((2 * QUEUELEN) / 3)

/* most mirror instances that may exist on one vhost at the same time */
#define MAX_MIRROR_INSTANCES 3
46d4afb5ceSopenharmony_ci
/* defined further down; the per-session data only holds a pointer to it */
struct mirror_instance;

/* Per-connection state for one wsi bound to the mirror protocol */
struct per_session_data__lws_mirror {
	/* the connection this session data belongs to */
	struct lws *wsi;
	/* mirror instance this connection has joined, NULL when not joined */
	struct mirror_instance *mi;
	/* next session bound to the same mirror instance */
	struct per_session_data__lws_mirror *same_mi_pss_list;
	/* this connection's own read position in the instance's ring */
	uint32_t tail;
};
55d4afb5ceSopenharmony_ci
/*
 * One element of the ring.  The payload buffer is allocated with LWS_PRE
 * bytes of headroom in front of the message bytes.
 */
struct a_message {
	/* heap buffer: LWS_PRE headroom followed by the message bytes */
	void *payload;
	/* count of message bytes, not including the LWS_PRE headroom */
	size_t len;
};
61d4afb5ceSopenharmony_ci
62d4afb5ceSopenharmony_cistruct mirror_instance {
63d4afb5ceSopenharmony_ci	struct mirror_instance *next;
64d4afb5ceSopenharmony_ci	lws_pthread_mutex(lock) /* protects all mirror instance data */
65d4afb5ceSopenharmony_ci	struct per_session_data__lws_mirror *same_mi_pss_list;
66d4afb5ceSopenharmony_ci	/**< must hold the the per_vhost_data__lws_mirror.lock as well
67d4afb5ceSopenharmony_ci	 * to change mi list membership */
68d4afb5ceSopenharmony_ci	struct lws_ring *ring;
69d4afb5ceSopenharmony_ci	int messages_allocated;
70d4afb5ceSopenharmony_ci	char name[30];
71d4afb5ceSopenharmony_ci	char rx_enabled;
72d4afb5ceSopenharmony_ci};
73d4afb5ceSopenharmony_ci
74d4afb5ceSopenharmony_cistruct per_vhost_data__lws_mirror {
75d4afb5ceSopenharmony_ci	lws_pthread_mutex(lock) /* protects mi_list membership changes */
76d4afb5ceSopenharmony_ci	struct mirror_instance *mi_list;
77d4afb5ceSopenharmony_ci};
78d4afb5ceSopenharmony_ci
79d4afb5ceSopenharmony_ci
80d4afb5ceSopenharmony_ci/* enable or disable rx from all connections to this mirror instance */
81d4afb5ceSopenharmony_cistatic void
82d4afb5ceSopenharmony_ci__mirror_rxflow_instance(struct mirror_instance *mi, int enable)
83d4afb5ceSopenharmony_ci{
84d4afb5ceSopenharmony_ci	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
85d4afb5ceSopenharmony_ci			     pss, mi->same_mi_pss_list) {
86d4afb5ceSopenharmony_ci		lws_rx_flow_control(pss->wsi, enable);
87d4afb5ceSopenharmony_ci	} lws_end_foreach_ll(pss, same_mi_pss_list);
88d4afb5ceSopenharmony_ci
89d4afb5ceSopenharmony_ci	mi->rx_enabled = (char)enable;
90d4afb5ceSopenharmony_ci}
91d4afb5ceSopenharmony_ci
92d4afb5ceSopenharmony_ci/*
93d4afb5ceSopenharmony_ci * Find out which connection to this mirror instance has the longest number
94d4afb5ceSopenharmony_ci * of still unread elements in the ringbuffer and update the lws_ring "oldest
95d4afb5ceSopenharmony_ci * tail" with it.  Elements behind the "oldest tail" are freed and recycled for
96d4afb5ceSopenharmony_ci * new head content.  Elements after the "oldest tail" are still waiting to be
97d4afb5ceSopenharmony_ci * read by somebody.
98d4afb5ceSopenharmony_ci *
99d4afb5ceSopenharmony_ci * If the oldest tail moved on from before, check if it created enough space
100d4afb5ceSopenharmony_ci * in the queue to re-enable RX flow control for the mirror instance.
101d4afb5ceSopenharmony_ci *
102d4afb5ceSopenharmony_ci * Mark connections that are at the oldest tail as being on a 3s timeout to
103d4afb5ceSopenharmony_ci * transmit something, otherwise the connection will be closed.  Without this,
104d4afb5ceSopenharmony_ci * a choked or nonresponsive connection can block the FIFO from freeing up any
105d4afb5ceSopenharmony_ci * new space for new data.
106d4afb5ceSopenharmony_ci *
107d4afb5ceSopenharmony_ci * You can skip calling this if on your connection, before processing, the tail
108d4afb5ceSopenharmony_ci * was not equal to the current worst, ie,  if the tail you will work on is !=
109d4afb5ceSopenharmony_ci * lws_ring_get_oldest_tail(ring) then no need to call this when the tail
110d4afb5ceSopenharmony_ci * has changed; it wasn't the oldest so it won't change the oldest.
111d4afb5ceSopenharmony_ci *
112d4afb5ceSopenharmony_ci * Returns 0 if oldest unchanged or 1 if oldest changed from this call.
113d4afb5ceSopenharmony_ci */
114d4afb5ceSopenharmony_cistatic int
115d4afb5ceSopenharmony_ci__mirror_update_worst_tail(struct mirror_instance *mi)
116d4afb5ceSopenharmony_ci{
117d4afb5ceSopenharmony_ci	uint32_t wai, worst = 0, worst_tail = 0, oldest;
118d4afb5ceSopenharmony_ci	struct per_session_data__lws_mirror *worst_pss = NULL;
119d4afb5ceSopenharmony_ci
120d4afb5ceSopenharmony_ci	oldest = lws_ring_get_oldest_tail(mi->ring);
121d4afb5ceSopenharmony_ci
122d4afb5ceSopenharmony_ci	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
123d4afb5ceSopenharmony_ci			     pss, mi->same_mi_pss_list) {
124d4afb5ceSopenharmony_ci		wai = (uint32_t)lws_ring_get_count_waiting_elements(mi->ring,
125d4afb5ceSopenharmony_ci								&pss->tail);
126d4afb5ceSopenharmony_ci		if (wai >= worst) {
127d4afb5ceSopenharmony_ci			worst = wai;
128d4afb5ceSopenharmony_ci			worst_tail = pss->tail;
129d4afb5ceSopenharmony_ci			worst_pss = pss;
130d4afb5ceSopenharmony_ci		}
131d4afb5ceSopenharmony_ci	} lws_end_foreach_ll(pss, same_mi_pss_list);
132d4afb5ceSopenharmony_ci
133d4afb5ceSopenharmony_ci	if (!worst_pss)
134d4afb5ceSopenharmony_ci		return 0;
135d4afb5ceSopenharmony_ci
136d4afb5ceSopenharmony_ci	lws_ring_update_oldest_tail(mi->ring, worst_tail);
137d4afb5ceSopenharmony_ci	if (oldest == lws_ring_get_oldest_tail(mi->ring))
138d4afb5ceSopenharmony_ci		return 0;
139d4afb5ceSopenharmony_ci	/*
140d4afb5ceSopenharmony_ci	 * The oldest tail did move on.  Check if we should re-enable rx flow
141d4afb5ceSopenharmony_ci	 * for the mirror instance since we made some space now.
142d4afb5ceSopenharmony_ci	 */
143d4afb5ceSopenharmony_ci	if (!mi->rx_enabled && /* rx is disabled */
144d4afb5ceSopenharmony_ci	    lws_ring_get_count_free_elements(mi->ring) >= RXFLOW_MAX)
145d4afb5ceSopenharmony_ci		/* there is enough space, let's re-enable rx for our instance */
146d4afb5ceSopenharmony_ci		__mirror_rxflow_instance(mi, 1);
147d4afb5ceSopenharmony_ci
148d4afb5ceSopenharmony_ci	/* if nothing in queue, no timeout needed */
149d4afb5ceSopenharmony_ci	if (!worst)
150d4afb5ceSopenharmony_ci		return 1;
151d4afb5ceSopenharmony_ci
152d4afb5ceSopenharmony_ci	/*
153d4afb5ceSopenharmony_ci	 * The guy(s) with the oldest tail block the ringbuffer from recycling
154d4afb5ceSopenharmony_ci	 * the FIFO entries he has not read yet.  Don't allow those guys to
155d4afb5ceSopenharmony_ci	 * block the FIFO operation for very long.
156d4afb5ceSopenharmony_ci	 */
157d4afb5ceSopenharmony_ci	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
158d4afb5ceSopenharmony_ci			     pss, mi->same_mi_pss_list) {
159d4afb5ceSopenharmony_ci		if (pss->tail == worst_tail)
160d4afb5ceSopenharmony_ci			/*
161d4afb5ceSopenharmony_ci			 * Our policy is if you are the slowest connection,
162d4afb5ceSopenharmony_ci			 * you had better transmit something to help with that
163d4afb5ceSopenharmony_ci			 * within 3s, or we will hang up on you to stop you
164d4afb5ceSopenharmony_ci			 * blocking the FIFO for everyone else.
165d4afb5ceSopenharmony_ci			 */
166d4afb5ceSopenharmony_ci			lws_set_timeout(pss->wsi,
167d4afb5ceSopenharmony_ci					PENDING_TIMEOUT_USER_REASON_BASE, 3);
168d4afb5ceSopenharmony_ci	} lws_end_foreach_ll(pss, same_mi_pss_list);
169d4afb5ceSopenharmony_ci
170d4afb5ceSopenharmony_ci	return 1;
171d4afb5ceSopenharmony_ci}
172d4afb5ceSopenharmony_ci
173d4afb5ceSopenharmony_cistatic void
174d4afb5ceSopenharmony_ci__mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi)
175d4afb5ceSopenharmony_ci{
176d4afb5ceSopenharmony_ci	/* ask for WRITABLE callback for every wsi on this mi */
177d4afb5ceSopenharmony_ci	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
178d4afb5ceSopenharmony_ci			     pss, mi->same_mi_pss_list) {
179d4afb5ceSopenharmony_ci		lws_callback_on_writable(pss->wsi);
180d4afb5ceSopenharmony_ci	} lws_end_foreach_ll(pss, same_mi_pss_list);
181d4afb5ceSopenharmony_ci}
182d4afb5ceSopenharmony_ci
183d4afb5ceSopenharmony_cistatic void
184d4afb5ceSopenharmony_ci__mirror_destroy_message(void *_msg)
185d4afb5ceSopenharmony_ci{
186d4afb5ceSopenharmony_ci	struct a_message *msg = _msg;
187d4afb5ceSopenharmony_ci
188d4afb5ceSopenharmony_ci	free(msg->payload);
189d4afb5ceSopenharmony_ci	msg->payload = NULL;
190d4afb5ceSopenharmony_ci	msg->len = 0;
191d4afb5ceSopenharmony_ci}
192d4afb5ceSopenharmony_ci
193d4afb5ceSopenharmony_cistatic int
194d4afb5ceSopenharmony_cicallback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
195d4afb5ceSopenharmony_ci		    void *user, void *in, size_t len)
196d4afb5ceSopenharmony_ci{
197d4afb5ceSopenharmony_ci	struct per_session_data__lws_mirror *pss =
198d4afb5ceSopenharmony_ci			(struct per_session_data__lws_mirror *)user;
199d4afb5ceSopenharmony_ci	struct per_vhost_data__lws_mirror *v =
200d4afb5ceSopenharmony_ci			(struct per_vhost_data__lws_mirror *)
201d4afb5ceSopenharmony_ci			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
202d4afb5ceSopenharmony_ci						 lws_get_protocol(wsi));
203d4afb5ceSopenharmony_ci	char name[300], update_worst, sent_something, *pn = name;
204d4afb5ceSopenharmony_ci	struct mirror_instance *mi = NULL;
205d4afb5ceSopenharmony_ci	const struct a_message *msg;
206d4afb5ceSopenharmony_ci	struct a_message amsg;
207d4afb5ceSopenharmony_ci	uint32_t oldest_tail;
208d4afb5ceSopenharmony_ci	int n, count_mi = 0;
209d4afb5ceSopenharmony_ci
210d4afb5ceSopenharmony_ci	switch (reason) {
211d4afb5ceSopenharmony_ci	case LWS_CALLBACK_ESTABLISHED:
212d4afb5ceSopenharmony_ci		lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
213d4afb5ceSopenharmony_ci		if (!v) {
214d4afb5ceSopenharmony_ci			lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
215d4afb5ceSopenharmony_ci					lws_get_protocol(wsi),
216d4afb5ceSopenharmony_ci					sizeof(struct per_vhost_data__lws_mirror));
217d4afb5ceSopenharmony_ci			v = (struct per_vhost_data__lws_mirror *)
218d4afb5ceSopenharmony_ci					lws_protocol_vh_priv_get(lws_get_vhost(wsi),
219d4afb5ceSopenharmony_ci								 lws_get_protocol(wsi));
220d4afb5ceSopenharmony_ci			lws_pthread_mutex_init(&v->lock);
221d4afb5ceSopenharmony_ci		}
222d4afb5ceSopenharmony_ci
223d4afb5ceSopenharmony_ci		/*
224d4afb5ceSopenharmony_ci		 * mirror instance name... defaults to "", but if URL includes
225d4afb5ceSopenharmony_ci		 * "?mirror=xxx", will be "xxx"
226d4afb5ceSopenharmony_ci		 */
227d4afb5ceSopenharmony_ci
228d4afb5ceSopenharmony_ci		if (lws_get_urlarg_by_name_safe(wsi, "mirror", name,
229d4afb5ceSopenharmony_ci					        sizeof(name) - 1) < 0) {
230d4afb5ceSopenharmony_ci			lwsl_debug("get urlarg failed\n");
231d4afb5ceSopenharmony_ci			name[0] = '\0';
232d4afb5ceSopenharmony_ci		}
233d4afb5ceSopenharmony_ci
234d4afb5ceSopenharmony_ci		//lwsl_notice("%s: mirror name '%s'\n", __func__, pn);
235d4afb5ceSopenharmony_ci
236d4afb5ceSopenharmony_ci		/* is there already a mirror instance of this name? */
237d4afb5ceSopenharmony_ci
238d4afb5ceSopenharmony_ci		lws_pthread_mutex_lock(&v->lock); /* vhost lock { */
239d4afb5ceSopenharmony_ci
240d4afb5ceSopenharmony_ci		lws_start_foreach_ll(struct mirror_instance *, mi1,
241d4afb5ceSopenharmony_ci				     v->mi_list) {
242d4afb5ceSopenharmony_ci			count_mi++;
243d4afb5ceSopenharmony_ci			if (!strcmp(pn, mi1->name)) {
244d4afb5ceSopenharmony_ci				/* yes... we will join it */
245d4afb5ceSopenharmony_ci				mi = mi1;
246d4afb5ceSopenharmony_ci				break;
247d4afb5ceSopenharmony_ci			}
248d4afb5ceSopenharmony_ci		} lws_end_foreach_ll(mi1, next);
249d4afb5ceSopenharmony_ci
250d4afb5ceSopenharmony_ci		if (!mi) {
251d4afb5ceSopenharmony_ci
252d4afb5ceSopenharmony_ci			/* no existing mirror instance for name */
253d4afb5ceSopenharmony_ci			if (count_mi == MAX_MIRROR_INSTANCES) {
254d4afb5ceSopenharmony_ci				lws_pthread_mutex_unlock(&v->lock); /* } vh lock */
255d4afb5ceSopenharmony_ci				return -1;
256d4afb5ceSopenharmony_ci			}
257d4afb5ceSopenharmony_ci
258d4afb5ceSopenharmony_ci			/* create one with this name, and join it */
259d4afb5ceSopenharmony_ci			mi = malloc(sizeof(*mi));
260d4afb5ceSopenharmony_ci			if (!mi)
261d4afb5ceSopenharmony_ci				goto bail1;
262d4afb5ceSopenharmony_ci			memset(mi, 0, sizeof(*mi));
263d4afb5ceSopenharmony_ci			mi->ring = lws_ring_create(sizeof(struct a_message),
264d4afb5ceSopenharmony_ci						   QUEUELEN,
265d4afb5ceSopenharmony_ci						   __mirror_destroy_message);
266d4afb5ceSopenharmony_ci			if (!mi->ring) {
267d4afb5ceSopenharmony_ci				free(mi);
268d4afb5ceSopenharmony_ci				goto bail1;
269d4afb5ceSopenharmony_ci			}
270d4afb5ceSopenharmony_ci
271d4afb5ceSopenharmony_ci			mi->next = v->mi_list;
272d4afb5ceSopenharmony_ci			v->mi_list = mi;
273d4afb5ceSopenharmony_ci			lws_snprintf(mi->name, sizeof(mi->name) - 1, "%s", pn);
274d4afb5ceSopenharmony_ci			mi->rx_enabled = 1;
275d4afb5ceSopenharmony_ci
276d4afb5ceSopenharmony_ci			lws_pthread_mutex_init(&mi->lock);
277d4afb5ceSopenharmony_ci
278d4afb5ceSopenharmony_ci			lwsl_notice("Created new mi %p '%s'\n", mi, pn);
279d4afb5ceSopenharmony_ci		}
280d4afb5ceSopenharmony_ci
281d4afb5ceSopenharmony_ci		/* add our pss to list of guys bound to this mi */
282d4afb5ceSopenharmony_ci
283d4afb5ceSopenharmony_ci		lws_ll_fwd_insert(pss, same_mi_pss_list, mi->same_mi_pss_list);
284d4afb5ceSopenharmony_ci
285d4afb5ceSopenharmony_ci		/* init the pss */
286d4afb5ceSopenharmony_ci
287d4afb5ceSopenharmony_ci		pss->mi = mi;
288d4afb5ceSopenharmony_ci		pss->tail = lws_ring_get_oldest_tail(mi->ring);
289d4afb5ceSopenharmony_ci		pss->wsi = wsi;
290d4afb5ceSopenharmony_ci
291d4afb5ceSopenharmony_ci		lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
292d4afb5ceSopenharmony_ci		break;
293d4afb5ceSopenharmony_ci
294d4afb5ceSopenharmony_cibail1:
295d4afb5ceSopenharmony_ci		lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
296d4afb5ceSopenharmony_ci		return 1;
297d4afb5ceSopenharmony_ci
298d4afb5ceSopenharmony_ci	case LWS_CALLBACK_CLOSED:
299d4afb5ceSopenharmony_ci		/* detach our pss from the mirror instance */
300d4afb5ceSopenharmony_ci		mi = pss->mi;
301d4afb5ceSopenharmony_ci		if (!mi)
302d4afb5ceSopenharmony_ci			break;
303d4afb5ceSopenharmony_ci
304d4afb5ceSopenharmony_ci		lws_pthread_mutex_lock(&v->lock); /* vhost lock { */
305d4afb5ceSopenharmony_ci
306d4afb5ceSopenharmony_ci		/* remove our closing pss from its mirror instance list */
307d4afb5ceSopenharmony_ci		lws_ll_fwd_remove(struct per_session_data__lws_mirror,
308d4afb5ceSopenharmony_ci				  same_mi_pss_list, pss, mi->same_mi_pss_list);
309d4afb5ceSopenharmony_ci		pss->mi = NULL;
310d4afb5ceSopenharmony_ci
311d4afb5ceSopenharmony_ci		if (mi->same_mi_pss_list) {
312d4afb5ceSopenharmony_ci			/*
313d4afb5ceSopenharmony_ci			 * Still other pss using the mirror instance.  The pss
314d4afb5ceSopenharmony_ci			 * going away may have had the oldest tail, reconfirm
315d4afb5ceSopenharmony_ci			 * using the remaining pss what is the current oldest
316d4afb5ceSopenharmony_ci			 * tail.  If the oldest tail moves on, this call also
317d4afb5ceSopenharmony_ci			 * will re-enable rx flow control when appropriate.
318d4afb5ceSopenharmony_ci			 */
319d4afb5ceSopenharmony_ci			lws_pthread_mutex_lock(&mi->lock); /* mi lock { */
320d4afb5ceSopenharmony_ci			__mirror_update_worst_tail(mi);
321d4afb5ceSopenharmony_ci			lws_pthread_mutex_unlock(&mi->lock); /* } mi lock */
322d4afb5ceSopenharmony_ci			lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
323d4afb5ceSopenharmony_ci			break;
324d4afb5ceSopenharmony_ci		}
325d4afb5ceSopenharmony_ci
326d4afb5ceSopenharmony_ci		/* No more pss using the mirror instance... delete mi */
327d4afb5ceSopenharmony_ci
328d4afb5ceSopenharmony_ci		lws_start_foreach_llp(struct mirror_instance **,
329d4afb5ceSopenharmony_ci				pmi, v->mi_list) {
330d4afb5ceSopenharmony_ci			if (*pmi == mi) {
331d4afb5ceSopenharmony_ci				*pmi = (*pmi)->next;
332d4afb5ceSopenharmony_ci
333d4afb5ceSopenharmony_ci				lws_ring_destroy(mi->ring);
334d4afb5ceSopenharmony_ci				lws_pthread_mutex_destroy(&mi->lock);
335d4afb5ceSopenharmony_ci
336d4afb5ceSopenharmony_ci				free(mi);
337d4afb5ceSopenharmony_ci				break;
338d4afb5ceSopenharmony_ci			}
339d4afb5ceSopenharmony_ci		} lws_end_foreach_llp(pmi, next);
340d4afb5ceSopenharmony_ci
341d4afb5ceSopenharmony_ci		lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */
342d4afb5ceSopenharmony_ci		break;
343d4afb5ceSopenharmony_ci
344d4afb5ceSopenharmony_ci	case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY:
345d4afb5ceSopenharmony_ci		return 1; /* disallow compression */
346d4afb5ceSopenharmony_ci
347d4afb5ceSopenharmony_ci	case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */
348d4afb5ceSopenharmony_ci		if (!v) {
349d4afb5ceSopenharmony_ci			lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
350d4afb5ceSopenharmony_ci				lws_get_protocol(wsi),
351d4afb5ceSopenharmony_ci				sizeof(struct per_vhost_data__lws_mirror));
352d4afb5ceSopenharmony_ci			v = (struct per_vhost_data__lws_mirror *)
353d4afb5ceSopenharmony_ci				lws_protocol_vh_priv_get(lws_get_vhost(wsi),
354d4afb5ceSopenharmony_ci							 lws_get_protocol(wsi));
355d4afb5ceSopenharmony_ci			if (!v)
356d4afb5ceSopenharmony_ci				return 0;
357d4afb5ceSopenharmony_ci			lws_pthread_mutex_init(&v->lock);
358d4afb5ceSopenharmony_ci		}
359d4afb5ceSopenharmony_ci		break;
360d4afb5ceSopenharmony_ci
361d4afb5ceSopenharmony_ci	case LWS_CALLBACK_PROTOCOL_DESTROY:
362d4afb5ceSopenharmony_ci		lws_pthread_mutex_destroy(&v->lock);
363d4afb5ceSopenharmony_ci		break;
364d4afb5ceSopenharmony_ci
365d4afb5ceSopenharmony_ci	case LWS_CALLBACK_SERVER_WRITEABLE:
366d4afb5ceSopenharmony_ci		lws_pthread_mutex_lock(&pss->mi->lock); /* instance lock { */
367d4afb5ceSopenharmony_ci		oldest_tail = lws_ring_get_oldest_tail(pss->mi->ring);
368d4afb5ceSopenharmony_ci		update_worst = oldest_tail == pss->tail;
369d4afb5ceSopenharmony_ci		sent_something = 0;
370d4afb5ceSopenharmony_ci
371d4afb5ceSopenharmony_ci		do {
372d4afb5ceSopenharmony_ci			msg = lws_ring_get_element(pss->mi->ring, &pss->tail);
373d4afb5ceSopenharmony_ci			if (!msg)
374d4afb5ceSopenharmony_ci				break;
375d4afb5ceSopenharmony_ci
376d4afb5ceSopenharmony_ci			if (!msg->payload) {
377d4afb5ceSopenharmony_ci				lwsl_err("%s: NULL payload: worst = %d,"
378d4afb5ceSopenharmony_ci					 " pss->tail = %d\n", __func__,
379d4afb5ceSopenharmony_ci					 oldest_tail, pss->tail);
380d4afb5ceSopenharmony_ci				if (lws_ring_consume(pss->mi->ring, &pss->tail,
381d4afb5ceSopenharmony_ci						     NULL, 1))
382d4afb5ceSopenharmony_ci					continue;
383d4afb5ceSopenharmony_ci				break;
384d4afb5ceSopenharmony_ci			}
385d4afb5ceSopenharmony_ci
386d4afb5ceSopenharmony_ci			n = lws_write(wsi, (unsigned char *)msg->payload +
387d4afb5ceSopenharmony_ci				      LWS_PRE, msg->len, LWS_WRITE_TEXT);
388d4afb5ceSopenharmony_ci			if (n < 0) {
389d4afb5ceSopenharmony_ci				lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
390d4afb5ceSopenharmony_ci
391d4afb5ceSopenharmony_ci				goto bail2;
392d4afb5ceSopenharmony_ci			}
393d4afb5ceSopenharmony_ci			sent_something = 1;
394d4afb5ceSopenharmony_ci			lws_ring_consume(pss->mi->ring, &pss->tail, NULL, 1);
395d4afb5ceSopenharmony_ci
396d4afb5ceSopenharmony_ci		} while (!lws_send_pipe_choked(wsi));
397d4afb5ceSopenharmony_ci
398d4afb5ceSopenharmony_ci		/* if any left for us to send, ask for writeable again */
399d4afb5ceSopenharmony_ci		if (lws_ring_get_count_waiting_elements(pss->mi->ring,
400d4afb5ceSopenharmony_ci							&pss->tail))
401d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
402d4afb5ceSopenharmony_ci
403d4afb5ceSopenharmony_ci		if (!sent_something || !update_worst)
404d4afb5ceSopenharmony_ci			goto done1;
405d4afb5ceSopenharmony_ci
406d4afb5ceSopenharmony_ci		/*
407d4afb5ceSopenharmony_ci		 * We are no longer holding the oldest tail (since we sent
408d4afb5ceSopenharmony_ci		 * something.  So free us of the timeout related to hogging the
409d4afb5ceSopenharmony_ci		 * oldest tail.
410d4afb5ceSopenharmony_ci		 */
411d4afb5ceSopenharmony_ci		lws_set_timeout(pss->wsi, NO_PENDING_TIMEOUT, 0);
412d4afb5ceSopenharmony_ci		/*
413d4afb5ceSopenharmony_ci		 * If we were originally at the oldest fifo position of
414d4afb5ceSopenharmony_ci		 * all the tails, now we used some up we may have
415d4afb5ceSopenharmony_ci		 * changed the oldest fifo position and made some space.
416d4afb5ceSopenharmony_ci		 */
417d4afb5ceSopenharmony_ci		__mirror_update_worst_tail(pss->mi);
418d4afb5ceSopenharmony_ci
419d4afb5ceSopenharmony_cidone1:
420d4afb5ceSopenharmony_ci		lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */
421d4afb5ceSopenharmony_ci		break;
422d4afb5ceSopenharmony_ci
423d4afb5ceSopenharmony_cibail2:
424d4afb5ceSopenharmony_ci		lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */
425d4afb5ceSopenharmony_ci
426d4afb5ceSopenharmony_ci		return -1;
427d4afb5ceSopenharmony_ci
428d4afb5ceSopenharmony_ci	case LWS_CALLBACK_RECEIVE:
429d4afb5ceSopenharmony_ci		lws_pthread_mutex_lock(&pss->mi->lock); /* mi lock { */
430d4afb5ceSopenharmony_ci		n = (int)lws_ring_get_count_free_elements(pss->mi->ring);
431d4afb5ceSopenharmony_ci		if (!n) {
432d4afb5ceSopenharmony_ci			lwsl_notice("dropping!\n");
433d4afb5ceSopenharmony_ci			if (pss->mi->rx_enabled)
434d4afb5ceSopenharmony_ci				__mirror_rxflow_instance(pss->mi, 0);
435d4afb5ceSopenharmony_ci			goto req_writable;
436d4afb5ceSopenharmony_ci		}
437d4afb5ceSopenharmony_ci
438d4afb5ceSopenharmony_ci		amsg.payload = malloc(LWS_PRE + len);
439d4afb5ceSopenharmony_ci		amsg.len = len;
440d4afb5ceSopenharmony_ci		if (!amsg.payload) {
441d4afb5ceSopenharmony_ci			lwsl_notice("OOM: dropping\n");
442d4afb5ceSopenharmony_ci			goto done2;
443d4afb5ceSopenharmony_ci		}
444d4afb5ceSopenharmony_ci
445d4afb5ceSopenharmony_ci		memcpy((char *)amsg.payload + LWS_PRE, in, len);
446d4afb5ceSopenharmony_ci		if (!lws_ring_insert(pss->mi->ring, &amsg, 1)) {
447d4afb5ceSopenharmony_ci			__mirror_destroy_message(&amsg);
448d4afb5ceSopenharmony_ci			lwsl_notice("dropping!\n");
449d4afb5ceSopenharmony_ci			if (pss->mi->rx_enabled)
450d4afb5ceSopenharmony_ci				__mirror_rxflow_instance(pss->mi, 0);
451d4afb5ceSopenharmony_ci			goto req_writable;
452d4afb5ceSopenharmony_ci		}
453d4afb5ceSopenharmony_ci
454d4afb5ceSopenharmony_ci		if (pss->mi->rx_enabled &&
455d4afb5ceSopenharmony_ci		    lws_ring_get_count_free_elements(pss->mi->ring) <
456d4afb5ceSopenharmony_ci								    RXFLOW_MIN)
457d4afb5ceSopenharmony_ci			__mirror_rxflow_instance(pss->mi, 0);
458d4afb5ceSopenharmony_ci
459d4afb5ceSopenharmony_cireq_writable:
460d4afb5ceSopenharmony_ci		__mirror_callback_all_in_mi_on_writable(pss->mi);
461d4afb5ceSopenharmony_ci
462d4afb5ceSopenharmony_cidone2:
463d4afb5ceSopenharmony_ci		lws_pthread_mutex_unlock(&pss->mi->lock); /* } mi lock */
464d4afb5ceSopenharmony_ci		break;
465d4afb5ceSopenharmony_ci
466d4afb5ceSopenharmony_ci	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
467d4afb5ceSopenharmony_ci		lwsl_info("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n");
468d4afb5ceSopenharmony_ci		break;
469d4afb5ceSopenharmony_ci
470d4afb5ceSopenharmony_ci	default:
471d4afb5ceSopenharmony_ci		break;
472d4afb5ceSopenharmony_ci	}
473d4afb5ceSopenharmony_ci
474d4afb5ceSopenharmony_ci	return 0;
475d4afb5ceSopenharmony_ci}
476d4afb5ceSopenharmony_ci
/*
 * Initializer for this protocol's entry in a struct lws_protocols array:
 * protocol name, callback and per-session data size, followed by the rx
 * buffer size and three trailing members left at 0 / NULL.
 */
#define LWS_PLUGIN_PROTOCOL_MIRROR \
	{ \
		"lws-mirror-protocol", \
		callback_lws_mirror, \
		sizeof(struct per_session_data__lws_mirror), \
		4096, /* rx buf size must be >= permessage-deflate rx size */ \
		0, \
		NULL, \
		0 \
	}
484d4afb5ceSopenharmony_ci
485d4afb5ceSopenharmony_ci#if !defined (LWS_PLUGIN_STATIC)
486d4afb5ceSopenharmony_ci
487d4afb5ceSopenharmony_ciLWS_VISIBLE const struct lws_protocols lws_mirror_protocols[] = {
488d4afb5ceSopenharmony_ci	LWS_PLUGIN_PROTOCOL_MIRROR
489d4afb5ceSopenharmony_ci};
490d4afb5ceSopenharmony_ci
491d4afb5ceSopenharmony_ciLWS_VISIBLE const lws_plugin_protocol_t lws_mirror = {
492d4afb5ceSopenharmony_ci	.hdr = {
493d4afb5ceSopenharmony_ci		"lws mirror",
494d4afb5ceSopenharmony_ci		"lws_protocol_plugin",
495d4afb5ceSopenharmony_ci		LWS_BUILD_HASH,
496d4afb5ceSopenharmony_ci		LWS_PLUGIN_API_MAGIC
497d4afb5ceSopenharmony_ci	},
498d4afb5ceSopenharmony_ci
499d4afb5ceSopenharmony_ci	.protocols = lws_mirror_protocols,
500d4afb5ceSopenharmony_ci	.count_protocols = LWS_ARRAY_SIZE(lws_mirror_protocols),
501d4afb5ceSopenharmony_ci	.extensions = NULL,
502d4afb5ceSopenharmony_ci	.count_extensions = 0,
503d4afb5ceSopenharmony_ci};
504d4afb5ceSopenharmony_ci
505d4afb5ceSopenharmony_ci#endif
506