1/* 2 * lws-minimal-http-server-sse 3 * 4 * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 * 9 * This demonstrates a minimal http server that can serve both normal static 10 * content and server-side event connections. 11 * 12 * To keep it simple, it serves the static stuff from the subdirectory 13 * "./mount-origin" of the directory it was started in. 14 * 15 * You can change that by changing mount.origin below. 16 */ 17 18#include <libwebsockets.h> 19#include <string.h> 20#include <stdlib.h> 21#include <signal.h> 22#if defined(WIN32) 23#define HAVE_STRUCT_TIMESPEC 24#if defined(pid_t) 25#undef pid_t 26#endif 27#endif 28#include <pthread.h> 29#include <time.h> 30 31/* one of these created for each message in the ringbuffer */ 32 33struct msg { 34 void *payload; /* is malloc'd */ 35 size_t len; 36}; 37 38/* 39 * Unlike ws, http is a stateless protocol. This pss only exists for the 40 * duration of a single http transaction. With http/1.1 keep-alive and http/2, 41 * that is unrelated to (shorter than) the lifetime of the network connection. 42 */ 43struct pss { 44 struct pss *pss_list; 45 struct lws *wsi; 46 uint32_t tail; 47}; 48 49/* one of these is created for each vhost our protocol is used with */ 50 51struct vhd { 52 struct lws_context *context; 53 struct lws_vhost *vhost; 54 const struct lws_protocols *protocol; 55 56 struct pss *pss_list; /* linked-list of live pss*/ 57 pthread_t pthread_spam[2]; 58 59 pthread_mutex_t lock_ring; /* serialize access to the ring buffer */ 60 struct lws_ring *ring; /* ringbuffer holding unsent messages */ 61 char finished; 62}; 63 64static int interrupted; 65 66#if defined(WIN32) 67static void usleep(unsigned long l) { Sleep(l / 1000); } 68#endif 69 70 71/* destroys the message when everyone has had a copy of it */ 72 73static void 74__minimal_destroy_message(void *_msg) 75{ 76 struct msg *msg = _msg; 77 78 free(msg->payload); 79 msg->payload = NULL; 80 msg->len = 0; 81} 82 83/* 84 * This runs under the "spam thread" thread context only. 85 * 86 * We spawn two threads that generate messages with this. 87 * 88 */ 89 90static void * 91thread_spam(void *d) 92{ 93 struct vhd *vhd = (struct vhd *)d; 94 struct msg amsg; 95 int len = 128, index = 1, n, whoami = 0; 96 97 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 98 if (pthread_equal(pthread_self(), vhd->pthread_spam[n])) 99 whoami = n + 1; 100 101 do { 102 /* don't generate output if nobody connected */ 103 if (!vhd->pss_list) 104 goto wait; 105 106 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 107 108 /* only create if space in ringbuffer */ 109 n = (int)lws_ring_get_count_free_elements(vhd->ring); 110 if (!n) { 111 lwsl_user("dropping!\n"); 112 goto wait_unlock; 113 } 114 115 amsg.payload = malloc((unsigned int)len); 116 if (!amsg.payload) { 117 lwsl_user("OOM: dropping\n"); 118 goto wait_unlock; 119 } 120 n = lws_snprintf((char *)amsg.payload, (unsigned int)len, 121 "%s: tid: %d, msg: %d", __func__, whoami, index++); 122 amsg.len = (unsigned int)n; 123 n = (int)lws_ring_insert(vhd->ring, &amsg, 1); 124 if (n != 1) { 125 __minimal_destroy_message(&amsg); 126 lwsl_user("dropping!\n"); 127 } else 128 /* 129 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED 130 * in the lws service thread context. 131 */ 132 lws_cancel_service(vhd->context); 133 134wait_unlock: 135 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 136 137wait: 138 /* rand() would make more sense but coverity shrieks */ 139 usleep((useconds_t)(100000 + (time(NULL) & 0xffff))); 140 141 } while (!vhd->finished); 142 143 lwsl_notice("thread_spam %d exiting\n", whoami); 144 145 pthread_exit(NULL); 146 147 return NULL; 148} 149 150 151static int 152callback_sse(struct lws *wsi, enum lws_callback_reasons reason, void *user, 153 void *in, size_t len) 154{ 155 struct pss *pss = (struct pss *)user; 156 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get( 157 lws_get_vhost(wsi), lws_get_protocol(wsi)); 158 uint8_t buf[LWS_PRE + LWS_RECOMMENDED_MIN_HEADER_SPACE], 159 *start = &buf[LWS_PRE], *p = start, 160 *end = &buf[sizeof(buf) - 1]; 161 const struct msg *pmsg; 162 void *retval; 163 int n; 164 165 switch (reason) { 166 167 /* --- vhost protocol lifecycle --- */ 168 169 case LWS_CALLBACK_PROTOCOL_INIT: 170 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 171 lws_get_protocol(wsi), sizeof(struct vhd)); 172 vhd->context = lws_get_context(wsi); 173 vhd->protocol = lws_get_protocol(wsi); 174 vhd->vhost = lws_get_vhost(wsi); 175 176 vhd->ring = lws_ring_create(sizeof(struct msg), 8, 177 __minimal_destroy_message); 178 if (!vhd->ring) 179 return 1; 180 181 pthread_mutex_init(&vhd->lock_ring, NULL); 182 183 /* start the content-creating threads */ 184 185 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 186 if (pthread_create(&vhd->pthread_spam[n], NULL, 187 thread_spam, vhd)) { 188 lwsl_err("thread creation failed\n"); 189 goto init_fail; 190 } 191 192 return 0; 193 194 case LWS_CALLBACK_PROTOCOL_DESTROY: 195 init_fail: 196 vhd->finished = 1; 197 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 198 pthread_join(vhd->pthread_spam[n], &retval); 199 200 if (vhd->ring) 201 lws_ring_destroy(vhd->ring); 202 203 pthread_mutex_destroy(&vhd->lock_ring); 204 return 0; 205 206 /* --- http connection lifecycle --- */ 207 208 case LWS_CALLBACK_HTTP: 209 /* 210 * `in` contains the url part after our mountpoint /sse, if any 211 * you can use this to determine what data to return and store 212 * that in the pss 213 */ 214 lwsl_info("%s: LWS_CALLBACK_HTTP: '%s'\n", __func__, 215 (const char *)in); 216 217 /* SSE requires a http OK response with this content-type */ 218 219 if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK, 220 "text/event-stream", 221 LWS_ILLEGAL_HTTP_CONTENT_LEN, 222 &p, end)) 223 return 1; 224 225 if (lws_finalize_write_http_header(wsi, start, &p, end)) 226 return 1; 227 228 /* add ourselves to the list of live pss held in the vhd */ 229 230 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); 231 pss->tail = lws_ring_get_oldest_tail(vhd->ring); 232 pss->wsi = wsi; 233 234 /* 235 * This tells lws we are no longer a normal http stream, 236 * but are an "immortal" (plus or minus whatever timeout you 237 * set on it afterwards) SSE stream. In http/2 case that also 238 * stops idle timeouts being applied to the network connection 239 * while this wsi is still open. 240 */ 241 lws_http_mark_sse(wsi); 242 243 /* write the body separately */ 244 245 lws_callback_on_writable(wsi); 246 247 return 0; 248 249 case LWS_CALLBACK_CLOSED_HTTP: 250 /* remove our closing pss from the list of live pss */ 251 252 lws_ll_fwd_remove(struct pss, pss_list, pss, vhd->pss_list); 253 return 0; 254 255 /* --- data transfer --- */ 256 257 case LWS_CALLBACK_HTTP_WRITEABLE: 258 259 lwsl_info("%s: LWS_CALLBACK_HTTP_WRITEABLE\n", __func__); 260 261 pmsg = lws_ring_get_element(vhd->ring, &pss->tail); 262 if (!pmsg) 263 break; 264 265 p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p), 266 "data: %s\x0d\x0a\x0d\x0a", 267 (const char *)pmsg->payload); 268 269 if (lws_write(wsi, (uint8_t *)start, lws_ptr_diff_size_t(p, start), 270 LWS_WRITE_HTTP) != lws_ptr_diff(p, start)) 271 return 1; 272 273 lws_ring_consume_and_update_oldest_tail( 274 vhd->ring, /* lws_ring object */ 275 struct pss, /* type of objects with tails */ 276 &pss->tail, /* tail of guy doing the consuming */ 277 1, /* number of payload objects being consumed */ 278 vhd->pss_list, /* head of list of objects with tails */ 279 tail, /* member name of tail in objects with tails */ 280 pss_list /* member name of next object in objects with tails */ 281 ); 282 283 if (lws_ring_get_element(vhd->ring, &pss->tail)) 284 /* come back as soon as we can write more */ 285 lws_callback_on_writable(pss->wsi); 286 287 return 0; 288 289 case LWS_CALLBACK_EVENT_WAIT_CANCELLED: 290 if (!vhd) 291 break; 292 /* 293 * let everybody know we want to write something on them 294 * as soon as they are ready 295 */ 296 lws_start_foreach_llp(struct pss **, ppss, vhd->pss_list) { 297 lws_callback_on_writable((*ppss)->wsi); 298 } lws_end_foreach_llp(ppss, pss_list); 299 return 0; 300 301 default: 302 break; 303 } 304 305 return lws_callback_http_dummy(wsi, reason, user, in, len); 306} 307 308static struct lws_protocols protocols[] = { 309 { "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 }, 310 { "sse", callback_sse, sizeof(struct pss), 0, 0, NULL, 0 }, 311 LWS_PROTOCOL_LIST_TERM 312}; 313 314/* override the default mount for /sse in the URL space */ 315 316static const struct lws_http_mount mount_sse = { 317 /* .mount_next */ NULL, /* linked-list "next" */ 318 /* .mountpoint */ "/sse", /* mountpoint URL */ 319 /* .origin */ NULL, /* protocol */ 320 /* .def */ NULL, 321 /* .protocol */ "sse", 322 /* .cgienv */ NULL, 323 /* .extra_mimetypes */ NULL, 324 /* .interpret */ NULL, 325 /* .cgi_timeout */ 0, 326 /* .cache_max_age */ 0, 327 /* .auth_mask */ 0, 328 /* .cache_reusable */ 0, 329 /* .cache_revalidate */ 0, 330 /* .cache_intermediaries */ 0, 331 /* .origin_protocol */ LWSMPRO_CALLBACK, /* dynamic */ 332 /* .mountpoint_len */ 4, /* char count */ 333 /* .basic_auth_login_file */ NULL, 334}; 335 336/* default mount serves the URL space from ./mount-origin */ 337 338static const struct lws_http_mount mount = { 339 /* .mount_next */ &mount_sse, /* linked-list "next" */ 340 /* .mountpoint */ "/", /* mountpoint URL */ 341 /* .origin */ "./mount-origin", /* serve from dir */ 342 /* .def */ "index.html", /* default filename */ 343 /* .protocol */ NULL, 344 /* .cgienv */ NULL, 345 /* .extra_mimetypes */ NULL, 346 /* .interpret */ NULL, 347 /* .cgi_timeout */ 0, 348 /* .cache_max_age */ 0, 349 /* .auth_mask */ 0, 350 /* .cache_reusable */ 0, 351 /* .cache_revalidate */ 0, 352 /* .cache_intermediaries */ 0, 353 /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ 354 /* .mountpoint_len */ 1, /* char count */ 355 /* .basic_auth_login_file */ NULL, 356}; 357 358void sigint_handler(int sig) 359{ 360 interrupted = 1; 361} 362 363int main(int argc, const char **argv) 364{ 365 struct lws_context_creation_info info; 366 struct lws_context *context; 367 const char *p; 368 int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE 369 /* for LLL_ verbosity above NOTICE to be built into lws, 370 * lws must have been configured and built with 371 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */ 372 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ 373 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ 374 /* | LLL_DEBUG */; 375 376 signal(SIGINT, sigint_handler); 377 378 if ((p = lws_cmdline_option(argc, argv, "-d"))) 379 logs = atoi(p); 380 381 lws_set_log_level(logs, NULL); 382 lwsl_user("LWS minimal http Server-Side Events + ring | visit http://localhost:7681\n"); 383 384 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ 385 info.port = 7681; 386 info.protocols = protocols; 387 info.mounts = &mount; 388 info.options = 389 LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; 390 391 context = lws_create_context(&info); 392 if (!context) { 393 lwsl_err("lws init failed\n"); 394 return 1; 395 } 396 397 while (n >= 0 && !interrupted) 398 n = lws_service(context, 0); 399 400 lws_context_destroy(context); 401 402 return 0; 403} 404