1/*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25#include "private-lib-core.h"
26
27/*
28 * per pending event
29 */
typedef struct lws_seq_event {
	/* links this event into its sequencer's seq_event_owner list */
	struct lws_dll2			seq_event_list;

	/* opaque pointers handed unchanged to the sequencer callback */
	void				*data;
	void				*aux;
	/* which event this is; passed to the sequencer callback */
	lws_seq_events_t		e;
} lws_seq_event_t;
37
38/*
39 * per sequencer
40 */
typedef struct lws_sequencer {
	/* links this sequencer into its pt's seq_owner list */
	struct lws_dll2			seq_list;

	/* armed by lws_seq_timeout_us(); queues LWSSEQ_TIMED_OUT when it fires */
	lws_sorted_usec_list_t		sul_timeout;
	/* armed each time an event is queued; delivers events to cb */
	lws_sorted_usec_list_t		sul_pending;

	/* list of lws_seq_event_t still waiting to be delivered */
	struct lws_dll2_owner		seq_event_owner;
	/* the per-thread context this sequencer was created on */
	struct lws_context_per_thread	*pt;
	/* user callback that receives every event */
	lws_seq_event_cb		cb;
	/* pointer copied from the creation info; the string is not duplicated */
	const char			*name;
	/* pointer copied from the creation info */
	const lws_retry_bo_t		*retry;

	/* lws_now_usecs() sampled in lws_seq_create() */
	lws_usec_t			time_created;
	lws_usec_t			timeout; /* 0 or time we timeout */

	/* set at the start of destruction; makes lws_seq_queue_event() refuse */
	uint8_t				going_down:1;
	/* copied from the creation info; used as the pt_sul_owner[] index */
	uint8_t				wakesuspend:1;
} lws_seq_t;
59
/*
 * Pending-event count above which lws_seq_queue_event() logs an error.  It is
 * diagnostic only: the event is still queued.
 */
#define QUEUE_SANITY_LIMIT 10
61
62static void
63lws_sul_seq_heartbeat_cb(lws_sorted_usec_list_t *sul)
64{
65	struct lws_context_per_thread *pt = lws_container_of(sul,
66			struct lws_context_per_thread, sul_seq_heartbeat);
67
68	/* send every sequencer a heartbeat message... it can ignore it */
69
70	lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
71				   lws_dll2_get_head(&pt->seq_owner)) {
72		lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);
73
74		/* queue the message to inform the sequencer */
75		lws_seq_queue_event(s, LWSSEQ_HEARTBEAT, NULL, NULL);
76
77	} lws_end_foreach_dll_safe(p, tp);
78
79	/* schedule the next one */
80
81	__lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED],
82			    &pt->sul_seq_heartbeat, LWS_US_PER_SEC);
83}
84
85int
86lws_seq_pt_init(struct lws_context_per_thread *pt)
87{
88	pt->sul_seq_heartbeat.cb = lws_sul_seq_heartbeat_cb;
89
90	/* schedule the first heartbeat */
91	__lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED],
92			    &pt->sul_seq_heartbeat, LWS_US_PER_SEC);
93
94	return 0;
95}
96
97lws_seq_t *
98lws_seq_create(lws_seq_info_t *i)
99{
100	struct lws_context_per_thread *pt = &i->context->pt[i->tsi];
101	lws_seq_t *seq = lws_zalloc(sizeof(*seq) + i->user_size, __func__);
102
103	if (!seq)
104		return NULL;
105
106	seq->cb = i->cb;
107	seq->pt = pt;
108	seq->name = i->name;
109	seq->retry = i->retry;
110	seq->wakesuspend = i->wakesuspend;
111
112	*i->puser = (void *)&seq[1];
113
114	/* add the sequencer to the pt */
115
116	lws_pt_lock(pt, __func__); /* ---------------------------------- pt { */
117
118	lws_dll2_add_tail(&seq->seq_list, &pt->seq_owner);
119
120	lws_pt_unlock(pt); /* } pt ------------------------------------------ */
121
122	seq->time_created = lws_now_usecs();
123
124	/* try to queue the creation cb */
125
126	if (lws_seq_queue_event(seq, LWSSEQ_CREATED, NULL, NULL)) {
127		lws_dll2_remove(&seq->seq_list);
128		lws_free(seq);
129
130		return NULL;
131	}
132
133	return seq;
134}
135
136static int
137seq_ev_destroy(struct lws_dll2 *d, void *user)
138{
139	lws_seq_event_t *seqe = lws_container_of(d, lws_seq_event_t,
140						 seq_event_list);
141
142	lws_dll2_remove(&seqe->seq_event_list);
143	lws_free(seqe);
144
145	return 0;
146}
147
148void
149lws_seq_destroy(lws_seq_t **pseq)
150{
151	lws_seq_t *seq = *pseq;
152
153	/* defeat another thread racing to add events while we are destroying */
154	seq->going_down = 1;
155
156	seq->cb(seq, (void *)&seq[1], LWSSEQ_DESTROYED, NULL, NULL);
157
158	lws_pt_lock(seq->pt, __func__); /* -------------------------- pt { */
159
160	lws_dll2_remove(&seq->seq_list);
161	lws_dll2_remove(&seq->sul_timeout.list);
162	lws_dll2_remove(&seq->sul_pending.list);
163	/* remove and destroy any pending events */
164	lws_dll2_foreach_safe(&seq->seq_event_owner, NULL, seq_ev_destroy);
165
166	lws_pt_unlock(seq->pt); /* } pt ---------------------------------- */
167
168
169	lws_free_set_NULL(seq);
170}
171
172void
173lws_seq_destroy_all_on_pt(struct lws_context_per_thread *pt)
174{
175	lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
176				   pt->seq_owner.head) {
177		lws_seq_t *s = lws_container_of(p, lws_seq_t,
178						      seq_list);
179
180		lws_seq_destroy(&s);
181
182	} lws_end_foreach_dll_safe(p, tp);
183}
184
185static void
186lws_seq_sul_pending_cb(lws_sorted_usec_list_t *sul)
187{
188	lws_seq_t *seq = lws_container_of(sul, lws_seq_t, sul_pending);
189	lws_seq_event_t *seqe;
190	struct lws_dll2 *dh;
191	int n;
192
193	if (!seq->seq_event_owner.count)
194		return;
195
196	/* events are only added at tail, so no race possible yet... */
197
198	dh = lws_dll2_get_head(&seq->seq_event_owner);
199	seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);
200
201	n = (int)seq->cb(seq, (void *)&seq[1], (int)seqe->e, seqe->data, seqe->aux);
202
203	/* ... have to lock here though, because we will change the list */
204
205	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
206
207	/* detach event from sequencer event list and free it */
208	lws_dll2_remove(&seqe->seq_event_list);
209	lws_free(seqe);
210	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
211
212	if (n) {
213		lwsl_info("%s: destroying seq '%s' by request\n", __func__,
214				seq->name);
215		lws_seq_destroy(&seq);
216	}
217}
218
219int
220lws_seq_queue_event(lws_seq_t *seq, lws_seq_events_t e, void *data, void *aux)
221{
222	lws_seq_event_t *seqe;
223
224	if (!seq || seq->going_down)
225		return 1;
226
227	seqe = lws_zalloc(sizeof(*seqe), __func__);
228	if (!seqe)
229		return 1;
230
231	seqe->e = e;
232	seqe->data = data;
233	seqe->aux = aux;
234
235	// lwsl_notice("%s: seq %s: event %d\n", __func__, seq->name, e);
236
237	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
238
239	if (seq->seq_event_owner.count > QUEUE_SANITY_LIMIT) {
240		lwsl_err("%s: more than %d events queued\n", __func__,
241			 QUEUE_SANITY_LIMIT);
242	}
243
244	lws_dll2_add_tail(&seqe->seq_event_list, &seq->seq_event_owner);
245
246	seq->sul_pending.cb = lws_seq_sul_pending_cb;
247	__lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend],
248			    &seq->sul_pending, 1);
249
250	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
251
252	return 0;
253}
254
255/*
256 * Check if wsi still extant, by peeking in the message queue for a
257 * LWSSEQ_WSI_CONN_CLOSE message about wsi.  (Doesn't need to do the same for
258 * CONN_FAIL since that will never have produced any messages prior to that).
259 *
260 * Use this to avoid trying to perform operations on wsi that have already
261 * closed but we didn't get to that message yet.
262 *
263 * Returns 0 if not closed yet or 1 if it has closed but we didn't process the
264 * close message yet.
265 */
266
267int
268lws_seq_check_wsi(lws_seq_t *seq, struct lws *wsi)
269{
270	lws_seq_event_t *seqe;
271	struct lws_dll2 *dh;
272
273	lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
274
275	dh = lws_dll2_get_head(&seq->seq_event_owner);
276	while (dh) {
277		seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);
278
279		if (seqe->e == LWSSEQ_WSI_CONN_CLOSE && seqe->data == wsi)
280			break;
281
282		dh = dh->next;
283	}
284
285	lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
286
287	return !!dh;
288}
289
290
291static void
292lws_seq_sul_timeout_cb(lws_sorted_usec_list_t *sul)
293{
294	lws_seq_t *s = lws_container_of(sul, lws_seq_t, sul_timeout);
295
296	lws_seq_queue_event(s, LWSSEQ_TIMED_OUT, NULL, NULL);
297}
298
299/* set us to LWS_SET_TIMER_USEC_CANCEL to remove timeout */
300
301int
302lws_seq_timeout_us(lws_seq_t *seq, lws_usec_t us)
303{
304	seq->sul_timeout.cb = lws_seq_sul_timeout_cb;
305	/* list is always at the very top of the sul */
306	__lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend],
307			(lws_sorted_usec_list_t *)&seq->sul_timeout.list, us);
308
309	return 0;
310}
311
312lws_seq_t *
313lws_seq_from_user(void *u)
314{
315	return &((lws_seq_t *)u)[-1];
316}
317
318const char *
319lws_seq_name(lws_seq_t *seq)
320{
321	return seq->name;
322}
323
324lws_usec_t
325lws_seq_us_since_creation(lws_seq_t *seq)
326{
327	return lws_now_usecs() - seq->time_created;
328}
329
330struct lws_context *
331lws_seq_get_context(lws_seq_t *seq)
332{
333	return seq->pt->context;
334}
335
336