1/*
2 * lws-minimal-http-server-sse
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This demonstrates a minimal http server that can serve both normal static
10 * content and server-side event connections.
11 *
12 * To keep it simple, it serves the static stuff from the subdirectory
13 * "./mount-origin" of the directory it was started in.
14 *
15 * You can change that by changing mount.origin below.
16 */
17
18#include <libwebsockets.h>
19#include <string.h>
20#include <stdlib.h>
21#include <signal.h>
22#if defined(WIN32)
23#define HAVE_STRUCT_TIMESPEC
24#if defined(pid_t)
25#undef pid_t
26#endif
27#endif
28#include <pthread.h>
29#include <time.h>
30
31/* one of these created for each message in the ringbuffer */
32
33struct msg {
34	void *payload; /* is malloc'd */
35	size_t len;
36};
37
38/*
39 * Unlike ws, http is a stateless protocol.  This pss only exists for the
40 * duration of a single http transaction.  With http/1.1 keep-alive and http/2,
41 * that is unrelated to (shorter than) the lifetime of the network connection.
42 */
43struct pss {
44	struct pss *pss_list;
45	struct lws *wsi;
46	uint32_t tail;
47};
48
49/* one of these is created for each vhost our protocol is used with */
50
51struct vhd {
52	struct lws_context *context;
53	struct lws_vhost *vhost;
54	const struct lws_protocols *protocol;
55
56	struct pss *pss_list; /* linked-list of live pss*/
57	pthread_t pthread_spam[2];
58
59	pthread_mutex_t lock_ring; /* serialize access to the ring buffer */
60	struct lws_ring *ring; /* ringbuffer holding unsent messages */
61	char finished;
62};
63
64static int interrupted;
65
66#if defined(WIN32)
67static void usleep(unsigned long l) { Sleep(l / 1000); }
68#endif
69
70
71/* destroys the message when everyone has had a copy of it */
72
73static void
74__minimal_destroy_message(void *_msg)
75{
76	struct msg *msg = _msg;
77
78	free(msg->payload);
79	msg->payload = NULL;
80	msg->len = 0;
81}
82
83/*
84 * This runs under the "spam thread" thread context only.
85 *
86 * We spawn two threads that generate messages with this.
87 *
88 */
89
90static void *
91thread_spam(void *d)
92{
93	struct vhd *vhd = (struct vhd *)d;
94	struct msg amsg;
95	int len = 128, index = 1, n, whoami = 0;
96
97	for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
98		if (pthread_equal(pthread_self(), vhd->pthread_spam[n]))
99			whoami = n + 1;
100
101	do {
102		/* don't generate output if nobody connected */
103		if (!vhd->pss_list)
104			goto wait;
105
106		pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
107
108		/* only create if space in ringbuffer */
109		n = (int)lws_ring_get_count_free_elements(vhd->ring);
110		if (!n) {
111			lwsl_user("dropping!\n");
112			goto wait_unlock;
113		}
114
115		amsg.payload = malloc((unsigned int)len);
116		if (!amsg.payload) {
117			lwsl_user("OOM: dropping\n");
118			goto wait_unlock;
119		}
120		n = lws_snprintf((char *)amsg.payload, (unsigned int)len,
121			         "%s: tid: %d, msg: %d", __func__, whoami, index++);
122		amsg.len = (unsigned int)n;
123		n = (int)lws_ring_insert(vhd->ring, &amsg, 1);
124		if (n != 1) {
125			__minimal_destroy_message(&amsg);
126			lwsl_user("dropping!\n");
127		} else
128			/*
129			 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED
130			 * in the lws service thread context.
131			 */
132			lws_cancel_service(vhd->context);
133
134wait_unlock:
135		pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
136
137wait:
138		/* rand() would make more sense but coverity shrieks */
139		usleep((useconds_t)(100000 + (time(NULL) & 0xffff)));
140
141	} while (!vhd->finished);
142
143	lwsl_notice("thread_spam %d exiting\n", whoami);
144
145	pthread_exit(NULL);
146
147	return NULL;
148}
149
150
151static int
152callback_sse(struct lws *wsi, enum lws_callback_reasons reason, void *user,
153	     void *in, size_t len)
154{
155	struct pss *pss = (struct pss *)user;
156	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
157			lws_get_vhost(wsi), lws_get_protocol(wsi));
158	uint8_t buf[LWS_PRE + LWS_RECOMMENDED_MIN_HEADER_SPACE],
159		*start = &buf[LWS_PRE], *p = start,
160		*end = &buf[sizeof(buf) - 1];
161	const struct msg *pmsg;
162	void *retval;
163	int n;
164
165	switch (reason) {
166
167	/* --- vhost protocol lifecycle --- */
168
169	case LWS_CALLBACK_PROTOCOL_INIT:
170		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
171				lws_get_protocol(wsi), sizeof(struct vhd));
172		vhd->context = lws_get_context(wsi);
173		vhd->protocol = lws_get_protocol(wsi);
174		vhd->vhost = lws_get_vhost(wsi);
175
176		vhd->ring = lws_ring_create(sizeof(struct msg), 8,
177					    __minimal_destroy_message);
178		if (!vhd->ring)
179			return 1;
180
181		pthread_mutex_init(&vhd->lock_ring, NULL);
182
183		/* start the content-creating threads */
184
185		for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
186			if (pthread_create(&vhd->pthread_spam[n], NULL,
187					   thread_spam, vhd)) {
188				lwsl_err("thread creation failed\n");
189				goto init_fail;
190			}
191
192		return 0;
193
194	case LWS_CALLBACK_PROTOCOL_DESTROY:
195		init_fail:
196		vhd->finished = 1;
197		for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
198			pthread_join(vhd->pthread_spam[n], &retval);
199
200		if (vhd->ring)
201			lws_ring_destroy(vhd->ring);
202
203		pthread_mutex_destroy(&vhd->lock_ring);
204		return 0;
205
206	/* --- http connection lifecycle --- */
207
208	case LWS_CALLBACK_HTTP:
209		/*
210		 * `in` contains the url part after our mountpoint /sse, if any
211		 * you can use this to determine what data to return and store
212		 * that in the pss
213		 */
214		lwsl_info("%s: LWS_CALLBACK_HTTP: '%s'\n", __func__,
215			  (const char *)in);
216
217		/* SSE requires a http OK response with this content-type */
218
219		if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
220						"text/event-stream",
221						LWS_ILLEGAL_HTTP_CONTENT_LEN,
222						&p, end))
223			return 1;
224
225		if (lws_finalize_write_http_header(wsi, start, &p, end))
226			return 1;
227
228		/* add ourselves to the list of live pss held in the vhd */
229
230		lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
231		pss->tail = lws_ring_get_oldest_tail(vhd->ring);
232		pss->wsi = wsi;
233
234		/*
235		 * This tells lws we are no longer a normal http stream,
236		 * but are an "immortal" (plus or minus whatever timeout you
237		 * set on it afterwards) SSE stream.  In http/2 case that also
238		 * stops idle timeouts being applied to the network connection
239		 * while this wsi is still open.
240		 */
241		lws_http_mark_sse(wsi);
242
243		/* write the body separately */
244
245		lws_callback_on_writable(wsi);
246
247		return 0;
248
249	case LWS_CALLBACK_CLOSED_HTTP:
250		/* remove our closing pss from the list of live pss */
251
252		lws_ll_fwd_remove(struct pss, pss_list, pss, vhd->pss_list);
253		return 0;
254
255	/* --- data transfer --- */
256
257	case LWS_CALLBACK_HTTP_WRITEABLE:
258
259		lwsl_info("%s: LWS_CALLBACK_HTTP_WRITEABLE\n", __func__);
260
261		pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
262		if (!pmsg)
263			break;
264
265		p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
266				  "data: %s\x0d\x0a\x0d\x0a",
267				  (const char *)pmsg->payload);
268
269		if (lws_write(wsi, (uint8_t *)start, lws_ptr_diff_size_t(p, start),
270			      LWS_WRITE_HTTP) != lws_ptr_diff(p, start))
271			return 1;
272
273		lws_ring_consume_and_update_oldest_tail(
274			vhd->ring,	/* lws_ring object */
275			struct pss,	/* type of objects with tails */
276			&pss->tail,	/* tail of guy doing the consuming */
277			1,	/* number of payload objects being consumed */
278			vhd->pss_list,	/* head of list of objects with tails */
279			tail,	/* member name of tail in objects with tails */
280			pss_list /* member name of next object in objects with tails */
281		);
282
283		if (lws_ring_get_element(vhd->ring, &pss->tail))
284			/* come back as soon as we can write more */
285			lws_callback_on_writable(pss->wsi);
286
287		return 0;
288
289	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
290		if (!vhd)
291			break;
292		/*
293		 * let everybody know we want to write something on them
294		 * as soon as they are ready
295		 */
296		lws_start_foreach_llp(struct pss **, ppss, vhd->pss_list) {
297			lws_callback_on_writable((*ppss)->wsi);
298		} lws_end_foreach_llp(ppss, pss_list);
299		return 0;
300
301	default:
302		break;
303	}
304
305	return lws_callback_http_dummy(wsi, reason, user, in, len);
306}
307
308static struct lws_protocols protocols[] = {
309	{ "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 },
310	{ "sse", callback_sse, sizeof(struct pss), 0, 0, NULL, 0 },
311	LWS_PROTOCOL_LIST_TERM
312};
313
314/* override the default mount for /sse in the URL space */
315
316static const struct lws_http_mount mount_sse = {
317	/* .mount_next */		NULL,		/* linked-list "next" */
318	/* .mountpoint */		"/sse",		/* mountpoint URL */
319	/* .origin */			NULL,		/* protocol */
320	/* .def */			NULL,
321	/* .protocol */			"sse",
322	/* .cgienv */			NULL,
323	/* .extra_mimetypes */		NULL,
324	/* .interpret */		NULL,
325	/* .cgi_timeout */		0,
326	/* .cache_max_age */		0,
327	/* .auth_mask */		0,
328	/* .cache_reusable */		0,
329	/* .cache_revalidate */		0,
330	/* .cache_intermediaries */	0,
331	/* .origin_protocol */		LWSMPRO_CALLBACK, /* dynamic */
332	/* .mountpoint_len */		4,		  /* char count */
333	/* .basic_auth_login_file */	NULL,
334};
335
336/* default mount serves the URL space from ./mount-origin */
337
338static const struct lws_http_mount mount = {
339	/* .mount_next */		&mount_sse,	/* linked-list "next" */
340	/* .mountpoint */		"/",		/* mountpoint URL */
341	/* .origin */			"./mount-origin", /* serve from dir */
342	/* .def */			"index.html",	/* default filename */
343	/* .protocol */			NULL,
344	/* .cgienv */			NULL,
345	/* .extra_mimetypes */		NULL,
346	/* .interpret */		NULL,
347	/* .cgi_timeout */		0,
348	/* .cache_max_age */		0,
349	/* .auth_mask */		0,
350	/* .cache_reusable */		0,
351	/* .cache_revalidate */		0,
352	/* .cache_intermediaries */	0,
353	/* .origin_protocol */		LWSMPRO_FILE,	/* files in a dir */
354	/* .mountpoint_len */		1,		/* char count */
355	/* .basic_auth_login_file */	NULL,
356};
357
358void sigint_handler(int sig)
359{
360	interrupted = 1;
361}
362
363int main(int argc, const char **argv)
364{
365	struct lws_context_creation_info info;
366	struct lws_context *context;
367	const char *p;
368	int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
369			/* for LLL_ verbosity above NOTICE to be built into lws,
370			 * lws must have been configured and built with
371			 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
372			/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
373			/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
374			/* | LLL_DEBUG */;
375
376	signal(SIGINT, sigint_handler);
377
378	if ((p = lws_cmdline_option(argc, argv, "-d")))
379		logs = atoi(p);
380
381	lws_set_log_level(logs, NULL);
382	lwsl_user("LWS minimal http Server-Side Events + ring | visit http://localhost:7681\n");
383
384	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
385	info.port = 7681;
386	info.protocols = protocols;
387	info.mounts = &mount;
388	info.options =
389		LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;
390
391	context = lws_create_context(&info);
392	if (!context) {
393		lwsl_err("lws init failed\n");
394		return 1;
395	}
396
397	while (n >= 0 && !interrupted)
398		n = lws_service(context, 0);
399
400	lws_context_destroy(context);
401
402	return 0;
403}
404