1/*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25#include "private-lib-core.h"
26
27int
28_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
29{
30#if !defined(LWS_WITH_EVENT_LIBS)
31	volatile struct lws_context_per_thread *vpt;
32#endif
33	struct lws_context_per_thread *pt;
34	struct lws_context *context;
35	int ret = 0, pa_events;
36	struct lws_pollfd *pfd;
37	int sampled_tid, tid;
38
39	if (!wsi)
40		return 0;
41
42	assert(wsi->position_in_fds_table == LWS_NO_FDS_POS ||
43	       wsi->position_in_fds_table >= 0);
44
45	if (wsi->position_in_fds_table == LWS_NO_FDS_POS)
46		return 0;
47
48	if (((volatile struct lws *)wsi)->handling_pollout &&
49	    !_and && _or == LWS_POLLOUT) {
50		/*
51		 * Happening alongside service thread handling POLLOUT.
52		 * The danger is when he is finished, he will disable POLLOUT,
53		 * countermanding what we changed here.
54		 *
55		 * Instead of changing the fds, inform the service thread
56		 * what happened, and ask it to leave POLLOUT active on exit
57		 */
58		((volatile struct lws *)wsi)->leave_pollout_active = 1;
59		/*
60		 * by definition service thread is not in poll wait, so no need
61		 * to cancel service
62		 */
63
64		lwsl_wsi_debug(wsi, "using leave_pollout_active");
65
66		return 0;
67	}
68
69	context = wsi->a.context;
70	pt = &context->pt[(int)wsi->tsi];
71
72	assert(wsi->position_in_fds_table < (int)pt->fds_count);
73
74#if !defined(LWS_WITH_EVENT_LIBS)
75	/*
76	 * This only applies when we use the default poll() event loop.
77	 *
78	 * BSD can revert pa->events at any time, when the kernel decides to
79	 * exit from poll().  We can't protect against it using locking.
80	 *
81	 * Therefore we must check first if the service thread is in poll()
82	 * wait; if so, we know we must be being called from a foreign thread,
83	 * and we must keep a strictly ordered list of changes we made instead
84	 * of trying to apply them, since when poll() exits, which may happen
85	 * at any time it would revert our changes.
86	 *
87	 * The plat code will apply them when it leaves the poll() wait
88	 * before doing anything else.
89	 */
90
91	vpt = (volatile struct lws_context_per_thread *)pt;
92
93	vpt->foreign_spinlock = 1;
94	lws_memory_barrier();
95
96	if (vpt->inside_poll) {
97		struct lws_foreign_thread_pollfd *ftp, **ftp1;
98		/*
99		 * We are certainly a foreign thread trying to change events
100		 * while the service thread is in the poll() wait.
101		 *
102		 * Create a list of changes to be applied after poll() exit,
103		 * instead of trying to apply them now.
104		 */
105		ftp = lws_malloc(sizeof(*ftp), "ftp");
106		if (!ftp) {
107			vpt->foreign_spinlock = 0;
108			lws_memory_barrier();
109			ret = -1;
110			goto bail;
111		}
112
113		ftp->_and = _and;
114		ftp->_or = _or;
115		ftp->fd_index = wsi->position_in_fds_table;
116		ftp->next = NULL;
117
118		lws_pt_lock(pt, __func__);
119
120		/* place at END of list to maintain order */
121		ftp1 = (struct lws_foreign_thread_pollfd **)
122						&vpt->foreign_pfd_list;
123		while (*ftp1)
124			ftp1 = &((*ftp1)->next);
125
126		*ftp1 = ftp;
127		vpt->foreign_spinlock = 0;
128		lws_memory_barrier();
129
130		lws_pt_unlock(pt);
131
132		lws_cancel_service_pt(wsi);
133
134		return 0;
135	}
136
137	vpt->foreign_spinlock = 0;
138	lws_memory_barrier();
139#endif
140
141#if !defined(__linux__) && !defined(WIN32)
142	/* OSX couldn't see close on stdin pipe side otherwise; WSAPOLL
143	 * blows up if we give it POLLHUP
144	 */
145	_or |= LWS_POLLHUP;
146#endif
147
148	pfd = &pt->fds[wsi->position_in_fds_table];
149	pa->fd = wsi->desc.sockfd;
150	lwsl_wsi_debug(wsi, "fd %d events %d -> %d", pa->fd, pfd->events,
151						(pfd->events & ~_and) | _or);
152	pa->prev_events = pfd->events;
153	pa->events = pfd->events = (short)((pfd->events & ~_and) | _or);
154
155	if (wsi->mux_substream)
156		return 0;
157
158#if defined(LWS_WITH_EXTERNAL_POLL)
159
160	if (wsi->a.vhost &&
161	    wsi->a.vhost->protocols[0].callback(wsi,
162			    	    	      LWS_CALLBACK_CHANGE_MODE_POLL_FD,
163					      wsi->user_space, (void *)pa, 0)) {
164		ret = -1;
165		goto bail;
166	}
167#endif
168
169	if (context->event_loop_ops->io) {
170		if (_and & LWS_POLLIN)
171			context->event_loop_ops->io(wsi,
172					LWS_EV_STOP | LWS_EV_READ);
173
174		if (_or & LWS_POLLIN)
175			context->event_loop_ops->io(wsi,
176					LWS_EV_START | LWS_EV_READ);
177
178		if (_and & LWS_POLLOUT)
179			context->event_loop_ops->io(wsi,
180					LWS_EV_STOP | LWS_EV_WRITE);
181
182		if (_or & LWS_POLLOUT)
183			context->event_loop_ops->io(wsi,
184					LWS_EV_START | LWS_EV_WRITE);
185	}
186
187	/*
188	 * if we changed something in this pollfd...
189	 *   ... and we're running in a different thread context
190	 *     than the service thread...
191	 *       ... and the service thread is waiting ...
192	 *         then cancel it to force a restart with our changed events
193	 */
194	pa_events = pa->prev_events != pa->events;
195	pfd->events = (short)pa->events;
196
197	if (pa_events) {
198		if (lws_plat_change_pollfd(context, wsi, pfd)) {
199			lwsl_wsi_info(wsi, "failed");
200			ret = -1;
201			goto bail;
202		}
203		sampled_tid = pt->service_tid;
204		if (sampled_tid && wsi->a.vhost) {
205			tid = wsi->a.vhost->protocols[0].callback(wsi,
206				     LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
207			if (tid == -1) {
208				ret = -1;
209				goto bail;
210			}
211			if (tid != sampled_tid)
212				lws_cancel_service_pt(wsi);
213		}
214	}
215
216bail:
217	return ret;
218}
219
220#if defined(LWS_WITH_SERVER)
221/*
222 * Enable or disable listen sockets on this pt globally...
223 * it's modulated according to the pt having space for a new accept.
224 */
225static void
226lws_accept_modulation(struct lws_context *context,
227		      struct lws_context_per_thread *pt, int allow)
228{
229	struct lws_vhost *vh = context->vhost_list;
230	struct lws_pollargs pa1;
231
232	while (vh) {
233		lws_start_foreach_dll(struct lws_dll2 *, d,
234				      lws_dll2_get_head(&vh->listen_wsi)) {
235			struct lws *wsi = lws_container_of(d, struct lws,
236							   listen_list);
237
238			_lws_change_pollfd(wsi, allow ? 0 : LWS_POLLIN,
239						allow ? LWS_POLLIN : 0, &pa1);
240		} lws_end_foreach_dll(d);
241
242		vh = vh->vhost_next;
243	}
244}
245#endif
246
247#if _LWS_ENABLED_LOGS & LLL_WARN
248void
249__dump_fds(struct lws_context_per_thread *pt, const char *s)
250{
251	unsigned int n;
252
253	lwsl_cx_warn(pt->context, "fds_count %u, %s", pt->fds_count, s);
254
255	for (n = 0; n < pt->fds_count; n++) {
256		struct lws *wsi = wsi_from_fd(pt->context, pt->fds[n].fd);
257
258		lwsl_cx_warn(pt->context, "  %d: fd %d, wsi %s, pos_in_fds: %d",
259			n + 1, pt->fds[n].fd, lws_wsi_tag(wsi),
260			wsi ? wsi->position_in_fds_table : -1);
261	}
262}
263#else
264#define __dump_fds(x, y)
265#endif
266
267int
268__insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
269{
270#if defined(LWS_WITH_EXTERNAL_POLL)
271	struct lws_pollargs pa = { wsi->desc.sockfd, LWS_POLLIN, 0 };
272#endif
273	struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
274	int ret = 0;
275
276//	__dump_fds(pt, "pre insert");
277
278	lws_pt_assert_lock_held(pt);
279
280	lwsl_wsi_debug(wsi, "tsi=%d, sock=%d, pos-in-fds=%d",
281			wsi->tsi, wsi->desc.sockfd, pt->fds_count);
282
283	if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) {
284		lwsl_cx_err(context, "Too many fds (%d vs %d)", context->max_fds,
285				context->fd_limit_per_thread);
286		return 1;
287	}
288
289#if !defined(_WIN32)
290	if (!wsi->a.context->max_fds_unrelated_to_ulimit &&
291	    wsi->desc.sockfd - lws_plat_socket_offset() >= (int)context->max_fds) {
292		lwsl_cx_err(context, "Socket fd %d is too high (%d) offset %d",
293			 wsi->desc.sockfd, context->max_fds,
294			 lws_plat_socket_offset());
295		return 1;
296	}
297#endif
298
299	assert(wsi);
300
301#if defined(LWS_WITH_NETLINK)
302	assert(wsi->event_pipe || wsi->a.vhost || wsi == pt->context->netlink);
303#else
304	assert(wsi->event_pipe || wsi->a.vhost);
305#endif
306	assert(lws_socket_is_valid(wsi->desc.sockfd));
307
308#if defined(LWS_WITH_EXTERNAL_POLL)
309
310	if (wsi->a.vhost &&
311	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
312					   wsi->user_space, (void *) &pa, 1))
313		return -1;
314#endif
315
316	if (insert_wsi(context, wsi))
317		return -1;
318	pt->count_conns++;
319	wsi->position_in_fds_table = (int)pt->fds_count;
320
321	pt->fds[wsi->position_in_fds_table].fd = wsi->desc.sockfd;
322	pt->fds[wsi->position_in_fds_table].events = LWS_POLLIN;
323#if defined(LWS_WITH_EXTERNAL_POLL)
324	pa.events = pt->fds[pt->fds_count].events;
325#endif
326
327	lws_plat_insert_socket_into_fds(context, wsi);
328
329#if defined(LWS_WITH_EXTERNAL_POLL)
330
331	/* external POLL support via protocol 0 */
332	if (wsi->a.vhost &&
333	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD,
334					   wsi->user_space, (void *) &pa, 0))
335		ret =  -1;
336#endif
337#if defined(LWS_WITH_SERVER)
338	/* if no more room, defeat accepts on this service thread */
339	if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1)
340		lws_accept_modulation(context, pt, 0);
341#endif
342
343#if defined(LWS_WITH_EXTERNAL_POLL)
344	if (wsi->a.vhost &&
345	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
346					   wsi->user_space, (void *)&pa, 1))
347		ret = -1;
348#endif
349
350//	__dump_fds(pt, "post insert");
351
352	return ret;
353}
354
355/* requires pt lock */
356
357int
358__remove_wsi_socket_from_fds(struct lws *wsi)
359{
360	struct lws_context *context = wsi->a.context;
361#if defined(LWS_WITH_EXTERNAL_POLL)
362	struct lws_pollargs pa = { wsi->desc.sockfd, 0, 0 };
363#endif
364	struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
365	struct lws *end_wsi;
366	int v, m, ret = 0;
367
368	lws_pt_assert_lock_held(pt);
369
370//	__dump_fds(pt, "pre remove");
371
372#if !defined(_WIN32)
373	if (!wsi->a.context->max_fds_unrelated_to_ulimit &&
374	    wsi->desc.sockfd - lws_plat_socket_offset() > (int)context->max_fds) {
375		lwsl_wsi_err(wsi, "fd %d too high (%d)",
376				   wsi->desc.sockfd,
377				   context->max_fds);
378
379		return 1;
380	}
381#endif
382#if defined(LWS_WITH_EXTERNAL_POLL)
383	if (wsi->a.vhost && wsi->a.vhost->protocols &&
384	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
385					   wsi->user_space, (void *)&pa, 1))
386		return -1;
387#endif
388
389	__lws_same_vh_protocol_remove(wsi);
390
391	/* the guy who is to be deleted's slot index in pt->fds */
392	m = wsi->position_in_fds_table;
393
394	/* these are the only valid possibilities for position_in_fds_table */
395	assert(m == LWS_NO_FDS_POS || (m >= 0 && (unsigned int)m < pt->fds_count));
396
397	if (context->event_loop_ops->io)
398		context->event_loop_ops->io(wsi, LWS_EV_STOP | LWS_EV_READ |
399							       LWS_EV_WRITE);
400/*
401	lwsl_notice("%s: wsi=%s, skt=%d, fds pos=%d, end guy pos=%d, endfd=%d\n",
402		  __func__, lws_wsi_tag(wsi), wsi->desc.sockfd, wsi->position_in_fds_table,
403		  pt->fds_count, pt->fds[pt->fds_count - 1].fd); */
404
405	if (m != LWS_NO_FDS_POS) {
406		char fixup = 0;
407
408		assert(pt->fds_count && (unsigned int)m != pt->fds_count);
409
410		/* deletion guy's lws_lookup entry needs nuking */
411		delete_from_fd(context, wsi->desc.sockfd);
412
413		if ((unsigned int)m != pt->fds_count - 1) {
414			/* have the last guy take up the now vacant slot */
415			pt->fds[m] = pt->fds[pt->fds_count - 1];
416			fixup = 1;
417		}
418
419		pt->fds[pt->fds_count - 1].fd = -1;
420
421		/* this decrements pt->fds_count */
422		lws_plat_delete_socket_from_fds(context, wsi, m);
423		pt->count_conns--;
424		if (fixup) {
425			v = (int) pt->fds[m].fd;
426			/* old end guy's "position in fds table" is now the
427			 * deletion guy's old one */
428			end_wsi = wsi_from_fd(context, v);
429			if (!end_wsi) {
430				lwsl_wsi_err(wsi, "no wsi for fd %d pos %d, "
431						  "pt->fds_count=%d",
432						  (int)pt->fds[m].fd, m,
433						  pt->fds_count);
434				// assert(0);
435			} else
436				end_wsi->position_in_fds_table = m;
437		}
438
439		/* removed wsi has no position any more */
440		wsi->position_in_fds_table = LWS_NO_FDS_POS;
441
442#if defined(LWS_WITH_EXTERNAL_POLL)
443		/* remove also from external POLL support via protocol 0 */
444		if (lws_socket_is_valid(wsi->desc.sockfd) && wsi->a.vhost &&
445		    wsi->a.vhost->protocols[0].callback(wsi,
446						        LWS_CALLBACK_DEL_POLL_FD,
447						        wsi->user_space,
448						        (void *) &pa, 0))
449			ret = -1;
450#endif
451	}
452
453#if defined(LWS_WITH_SERVER)
454	if (!context->being_destroyed &&
455	    /* if this made some room, accept connects on this thread */
456	    (unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
457		lws_accept_modulation(context, pt, 1);
458#endif
459
460#if defined(LWS_WITH_EXTERNAL_POLL)
461	if (wsi->a.vhost &&
462	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
463					      wsi->user_space, (void *) &pa, 1))
464		ret = -1;
465#endif
466
467//	__dump_fds(pt, "post remove");
468
469	return ret;
470}
471
472int
473__lws_change_pollfd(struct lws *wsi, int _and, int _or)
474{
475	struct lws_context *context;
476	struct lws_pollargs pa;
477	int ret = 0;
478
479	if (!wsi || (!wsi->a.protocol && !wsi->event_pipe) ||
480	    wsi->position_in_fds_table == LWS_NO_FDS_POS)
481		return 0;
482
483	context = lws_get_context(wsi);
484	if (!context)
485		return 1;
486
487#if defined(LWS_WITH_EXTERNAL_POLL)
488	if (wsi->a.vhost &&
489	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
490					      wsi->user_space, (void *) &pa, 0))
491		return -1;
492#endif
493
494	ret = _lws_change_pollfd(wsi, _and, _or, &pa);
495
496#if defined(LWS_WITH_EXTERNAL_POLL)
497	if (wsi->a.vhost &&
498	    wsi->a.vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
499					   wsi->user_space, (void *) &pa, 0))
500		ret = -1;
501#endif
502
503	return ret;
504}
505
506int
507lws_change_pollfd(struct lws *wsi, int _and, int _or)
508{
509	struct lws_context_per_thread *pt;
510	int ret = 0;
511
512	pt = &wsi->a.context->pt[(int)wsi->tsi];
513
514	lws_pt_lock(pt, __func__);
515	ret = __lws_change_pollfd(wsi, _and, _or);
516	lws_pt_unlock(pt);
517
518	return ret;
519}
520
521int
522lws_callback_on_writable(struct lws *wsi)
523{
524	struct lws *w = wsi;
525
526	if (lwsi_state(wsi) == LRS_SHUTDOWN)
527		return 0;
528
529	if (wsi->socket_is_permanently_unusable)
530		return 0;
531
532	if (lws_rops_fidx(wsi->role_ops, LWS_ROPS_callback_on_writable)) {
533		int q = lws_rops_func_fidx(wsi->role_ops,
534					   LWS_ROPS_callback_on_writable).
535						      callback_on_writable(wsi);
536		if (q)
537			return 1;
538		w = lws_get_network_wsi(wsi);
539	} else
540		if (w->position_in_fds_table == LWS_NO_FDS_POS) {
541			lwsl_wsi_debug(wsi, "failed to find socket %d",
542					    wsi->desc.sockfd);
543			return -1;
544		}
545
546	if (__lws_change_pollfd(w, 0, LWS_POLLOUT))
547		return -1;
548
549	return 1;
550}
551
552
553/*
554 * stitch protocol choice into the vh protocol linked list
555 * We always insert ourselves at the start of the list
556 *
557 * X <-> B
558 * X <-> pAn <-> pB
559 *
560 * Illegal to attach more than once without detach inbetween
561 */
562void
563lws_same_vh_protocol_insert(struct lws *wsi, int n)
564{
565	lws_context_lock(wsi->a.context, __func__);
566	lws_vhost_lock(wsi->a.vhost);
567
568	lws_dll2_remove(&wsi->same_vh_protocol);
569	lws_dll2_add_head(&wsi->same_vh_protocol,
570			  &wsi->a.vhost->same_vh_protocol_owner[n]);
571
572	wsi->bound_vhost_index = (uint8_t)n;
573
574	lws_vhost_unlock(wsi->a.vhost);
575	lws_context_unlock(wsi->a.context);
576}
577
578void
579__lws_same_vh_protocol_remove(struct lws *wsi)
580{
581	if (wsi->a.vhost && wsi->a.vhost->same_vh_protocol_owner)
582		lws_dll2_remove(&wsi->same_vh_protocol);
583}
584
585void
586lws_same_vh_protocol_remove(struct lws *wsi)
587{
588	if (!wsi->a.vhost)
589		return;
590
591	lws_context_lock(wsi->a.context, __func__);
592	lws_vhost_lock(wsi->a.vhost);
593
594	__lws_same_vh_protocol_remove(wsi);
595
596	lws_vhost_unlock(wsi->a.vhost);
597	lws_context_unlock(wsi->a.context);
598}
599
600
601int
602lws_callback_on_writable_all_protocol_vhost(const struct lws_vhost *vhost,
603				           const struct lws_protocols *protocol)
604{
605	struct lws *wsi;
606	int n;
607
608	if (protocol < vhost->protocols ||
609	    protocol >= (vhost->protocols + vhost->count_protocols)) {
610		lwsl_vhost_err((struct lws_vhost *)vhost,
611			       "protocol %p is not from vhost %p (%p - %p)",
612			       protocol, vhost->protocols, vhost,
613				  (vhost->protocols + vhost->count_protocols));
614
615		return -1;
616	}
617
618	n = (int)(protocol - vhost->protocols);
619
620	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
621			lws_dll2_get_head(&vhost->same_vh_protocol_owner[n])) {
622		wsi = lws_container_of(d, struct lws, same_vh_protocol);
623
624		assert(wsi->a.protocol == protocol);
625		lws_callback_on_writable(wsi);
626
627	} lws_end_foreach_dll_safe(d, d1);
628
629	return 0;
630}
631
632int
633lws_callback_on_writable_all_protocol(const struct lws_context *context,
634				      const struct lws_protocols *protocol)
635{
636	struct lws_vhost *vhost;
637	int n;
638
639	if (!context)
640		return 0;
641
642	vhost = context->vhost_list;
643
644	while (vhost) {
645		for (n = 0; n < vhost->count_protocols; n++)
646			if (protocol->callback ==
647			     vhost->protocols[n].callback &&
648			    !strcmp(protocol->name, vhost->protocols[n].name))
649				break;
650		if (n != vhost->count_protocols)
651			lws_callback_on_writable_all_protocol_vhost(
652				vhost, &vhost->protocols[n]);
653
654		vhost = vhost->vhost_next;
655	}
656
657	return 0;
658}
659