1/* 2 * libwebsockets - small server side websockets and web server implementation 3 * 4 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com> 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining a copy 7 * of this software and associated documentation files (the "Software"), to 8 * deal in the Software without restriction, including without limitation the 9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 * sell copies of the Software, and to permit persons to whom the Software is 11 * furnished to do so, subject to the following conditions: 12 * 13 * The above copyright notice and this permission notice shall be included in 14 * all copies or substantial portions of the Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22 * IN THE SOFTWARE. 23 */ 24 25#include "private-lib-core.h" 26 27/* 28 * per pending event 29 */ 30typedef struct lws_seq_event { 31 struct lws_dll2 seq_event_list; 32 33 void *data; 34 void *aux; 35 lws_seq_events_t e; 36} lws_seq_event_t; 37 38/* 39 * per sequencer 40 */ 41typedef struct lws_sequencer { 42 struct lws_dll2 seq_list; 43 44 lws_sorted_usec_list_t sul_timeout; 45 lws_sorted_usec_list_t sul_pending; 46 47 struct lws_dll2_owner seq_event_owner; 48 struct lws_context_per_thread *pt; 49 lws_seq_event_cb cb; 50 const char *name; 51 const lws_retry_bo_t *retry; 52 53 lws_usec_t time_created; 54 lws_usec_t timeout; /* 0 or time we timeout */ 55 56 uint8_t going_down:1; 57 uint8_t wakesuspend:1; 58} lws_seq_t; 59 60#define QUEUE_SANITY_LIMIT 10 61 62static void 63lws_sul_seq_heartbeat_cb(lws_sorted_usec_list_t *sul) 64{ 65 struct lws_context_per_thread *pt = lws_container_of(sul, 66 struct lws_context_per_thread, sul_seq_heartbeat); 67 68 /* send every sequencer a heartbeat message... it can ignore it */ 69 70 lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp, 71 lws_dll2_get_head(&pt->seq_owner)) { 72 lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list); 73 74 /* queue the message to inform the sequencer */ 75 lws_seq_queue_event(s, LWSSEQ_HEARTBEAT, NULL, NULL); 76 77 } lws_end_foreach_dll_safe(p, tp); 78 79 /* schedule the next one */ 80 81 __lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED], 82 &pt->sul_seq_heartbeat, LWS_US_PER_SEC); 83} 84 85int 86lws_seq_pt_init(struct lws_context_per_thread *pt) 87{ 88 pt->sul_seq_heartbeat.cb = lws_sul_seq_heartbeat_cb; 89 90 /* schedule the first heartbeat */ 91 __lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED], 92 &pt->sul_seq_heartbeat, LWS_US_PER_SEC); 93 94 return 0; 95} 96 97lws_seq_t * 98lws_seq_create(lws_seq_info_t *i) 99{ 100 struct lws_context_per_thread *pt = &i->context->pt[i->tsi]; 101 lws_seq_t *seq = lws_zalloc(sizeof(*seq) + i->user_size, __func__); 102 103 if (!seq) 104 return NULL; 105 106 seq->cb = i->cb; 107 seq->pt = pt; 108 seq->name = i->name; 109 seq->retry = i->retry; 110 seq->wakesuspend = i->wakesuspend; 111 112 *i->puser = (void *)&seq[1]; 113 114 /* add the sequencer to the pt */ 115 116 lws_pt_lock(pt, __func__); /* ---------------------------------- pt { */ 117 118 lws_dll2_add_tail(&seq->seq_list, &pt->seq_owner); 119 120 lws_pt_unlock(pt); /* } pt ------------------------------------------ */ 121 122 seq->time_created = lws_now_usecs(); 123 124 /* try to queue the creation cb */ 125 126 if (lws_seq_queue_event(seq, LWSSEQ_CREATED, NULL, NULL)) { 127 lws_dll2_remove(&seq->seq_list); 128 lws_free(seq); 129 130 return NULL; 131 } 132 133 return seq; 134} 135 136static int 137seq_ev_destroy(struct lws_dll2 *d, void *user) 138{ 139 lws_seq_event_t *seqe = lws_container_of(d, lws_seq_event_t, 140 seq_event_list); 141 142 lws_dll2_remove(&seqe->seq_event_list); 143 lws_free(seqe); 144 145 return 0; 146} 147 148void 149lws_seq_destroy(lws_seq_t **pseq) 150{ 151 lws_seq_t *seq = *pseq; 152 153 /* defeat another thread racing to add events while we are destroying */ 154 seq->going_down = 1; 155 156 seq->cb(seq, (void *)&seq[1], LWSSEQ_DESTROYED, NULL, NULL); 157 158 lws_pt_lock(seq->pt, __func__); /* -------------------------- pt { */ 159 160 lws_dll2_remove(&seq->seq_list); 161 lws_dll2_remove(&seq->sul_timeout.list); 162 lws_dll2_remove(&seq->sul_pending.list); 163 /* remove and destroy any pending events */ 164 lws_dll2_foreach_safe(&seq->seq_event_owner, NULL, seq_ev_destroy); 165 166 lws_pt_unlock(seq->pt); /* } pt ---------------------------------- */ 167 168 169 lws_free_set_NULL(seq); 170} 171 172void 173lws_seq_destroy_all_on_pt(struct lws_context_per_thread *pt) 174{ 175 lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp, 176 pt->seq_owner.head) { 177 lws_seq_t *s = lws_container_of(p, lws_seq_t, 178 seq_list); 179 180 lws_seq_destroy(&s); 181 182 } lws_end_foreach_dll_safe(p, tp); 183} 184 185static void 186lws_seq_sul_pending_cb(lws_sorted_usec_list_t *sul) 187{ 188 lws_seq_t *seq = lws_container_of(sul, lws_seq_t, sul_pending); 189 lws_seq_event_t *seqe; 190 struct lws_dll2 *dh; 191 int n; 192 193 if (!seq->seq_event_owner.count) 194 return; 195 196 /* events are only added at tail, so no race possible yet... */ 197 198 dh = lws_dll2_get_head(&seq->seq_event_owner); 199 seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list); 200 201 n = (int)seq->cb(seq, (void *)&seq[1], (int)seqe->e, seqe->data, seqe->aux); 202 203 /* ... have to lock here though, because we will change the list */ 204 205 lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */ 206 207 /* detach event from sequencer event list and free it */ 208 lws_dll2_remove(&seqe->seq_event_list); 209 lws_free(seqe); 210 lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */ 211 212 if (n) { 213 lwsl_info("%s: destroying seq '%s' by request\n", __func__, 214 seq->name); 215 lws_seq_destroy(&seq); 216 } 217} 218 219int 220lws_seq_queue_event(lws_seq_t *seq, lws_seq_events_t e, void *data, void *aux) 221{ 222 lws_seq_event_t *seqe; 223 224 if (!seq || seq->going_down) 225 return 1; 226 227 seqe = lws_zalloc(sizeof(*seqe), __func__); 228 if (!seqe) 229 return 1; 230 231 seqe->e = e; 232 seqe->data = data; 233 seqe->aux = aux; 234 235 // lwsl_notice("%s: seq %s: event %d\n", __func__, seq->name, e); 236 237 lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */ 238 239 if (seq->seq_event_owner.count > QUEUE_SANITY_LIMIT) { 240 lwsl_err("%s: more than %d events queued\n", __func__, 241 QUEUE_SANITY_LIMIT); 242 } 243 244 lws_dll2_add_tail(&seqe->seq_event_list, &seq->seq_event_owner); 245 246 seq->sul_pending.cb = lws_seq_sul_pending_cb; 247 __lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend], 248 &seq->sul_pending, 1); 249 250 lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */ 251 252 return 0; 253} 254 255/* 256 * Check if wsi still extant, by peeking in the message queue for a 257 * LWSSEQ_WSI_CONN_CLOSE message about wsi. (Doesn't need to do the same for 258 * CONN_FAIL since that will never have produced any messages prior to that). 259 * 260 * Use this to avoid trying to perform operations on wsi that have already 261 * closed but we didn't get to that message yet. 262 * 263 * Returns 0 if not closed yet or 1 if it has closed but we didn't process the 264 * close message yet. 265 */ 266 267int 268lws_seq_check_wsi(lws_seq_t *seq, struct lws *wsi) 269{ 270 lws_seq_event_t *seqe; 271 struct lws_dll2 *dh; 272 273 lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */ 274 275 dh = lws_dll2_get_head(&seq->seq_event_owner); 276 while (dh) { 277 seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list); 278 279 if (seqe->e == LWSSEQ_WSI_CONN_CLOSE && seqe->data == wsi) 280 break; 281 282 dh = dh->next; 283 } 284 285 lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */ 286 287 return !!dh; 288} 289 290 291static void 292lws_seq_sul_timeout_cb(lws_sorted_usec_list_t *sul) 293{ 294 lws_seq_t *s = lws_container_of(sul, lws_seq_t, sul_timeout); 295 296 lws_seq_queue_event(s, LWSSEQ_TIMED_OUT, NULL, NULL); 297} 298 299/* set us to LWS_SET_TIMER_USEC_CANCEL to remove timeout */ 300 301int 302lws_seq_timeout_us(lws_seq_t *seq, lws_usec_t us) 303{ 304 seq->sul_timeout.cb = lws_seq_sul_timeout_cb; 305 /* list is always at the very top of the sul */ 306 __lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend], 307 (lws_sorted_usec_list_t *)&seq->sul_timeout.list, us); 308 309 return 0; 310} 311 312lws_seq_t * 313lws_seq_from_user(void *u) 314{ 315 return &((lws_seq_t *)u)[-1]; 316} 317 318const char * 319lws_seq_name(lws_seq_t *seq) 320{ 321 return seq->name; 322} 323 324lws_usec_t 325lws_seq_us_since_creation(lws_seq_t *seq) 326{ 327 return lws_now_usecs() - seq->time_created; 328} 329 330struct lws_context * 331lws_seq_get_context(lws_seq_t *seq) 332{ 333 return seq->pt->context; 334} 335 336