1/* 2 * ws protocol handler plugin for "lws-minimal" demonstrating multithread 3 * 4 * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 */ 9 10#if !defined (LWS_PLUGIN_STATIC) 11#define LWS_DLL 12#define LWS_INTERNAL 13#include <libwebsockets.h> 14#endif 15 16#include <string.h> 17#include <assert.h> 18 19/* one of these created for each message in the ringbuffer */ 20 21struct msg { 22 void *payload; /* is malloc'd */ 23 size_t len; 24}; 25 26/* 27 * One of these is created for each client connecting to us. 28 * 29 * It is ONLY read or written from the lws service thread context. 30 */ 31 32struct per_session_data__minimal { 33 struct per_session_data__minimal *pss_list; 34 struct lws *wsi; 35 uint32_t tail; 36}; 37 38/* 39 * One of these is created for each vhost our protocol is used with, that 40 * means it is a shared resource between the SMP threads and must be locked. 41 */ 42 43struct per_vhost_data__minimal { 44 struct lws_context *context; 45 struct lws_vhost *vhost; 46 const struct lws_protocols *protocol; 47 48 struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ 49 pthread_t pthread_spam[2]; 50 51 pthread_mutex_t lock_ring; /* serialize access to the ring buffer */ 52 struct lws_ring *ring; /* {lock_ring} ringbuffer holding unsent content */ 53 54 const char *config; 55 char finished; 56}; 57 58#if defined(WIN32) 59static void usleep(unsigned long l) { Sleep(l / 1000); } 60#endif 61 62/* 63 * This runs under both lws service and "spam threads" contexts. 64 * Access is serialized by vhd->lock_ring. 65 */ 66 67static void 68__minimal_destroy_message(void *_msg) 69{ 70 struct msg *msg = _msg; 71 72 free(msg->payload); 73 msg->payload = NULL; 74 msg->len = 0; 75} 76 77/* 78 * This runs under the "spam thread" thread context only. 79 * 80 * We spawn two threads that generate messages with this. 81 * 82 */ 83 84static void * 85thread_spam(void *d) 86{ 87 struct per_vhost_data__minimal *vhd = 88 (struct per_vhost_data__minimal *)d; 89 struct msg amsg; 90 int len = 128, index = 1, n, whoami = 0; 91 92 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 93 if (pthread_equal(pthread_self(), vhd->pthread_spam[n])) 94 whoami = n + 1; 95 96 do { 97 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 98 99 /* don't generate output if nobody connected */ 100 if (!vhd->pss_list) 101 goto wait_unlock; 102 103 /* only create if space in ringbuffer */ 104 n = (int)lws_ring_get_count_free_elements(vhd->ring); 105 if (!n) { 106 // lwsl_user("dropping!\n"); 107 goto wait_unlock; 108 } 109 110 amsg.payload = malloc((unsigned int)(LWS_PRE + len)); 111 if (!amsg.payload) { 112 lwsl_user("OOM: dropping\n"); 113 goto wait_unlock; 114 } 115 n = lws_snprintf((char *)amsg.payload + LWS_PRE, (unsigned int)len, 116 "%s: spam tid: %d, msg: %d", vhd->config, 117 whoami, index++); 118 amsg.len = (unsigned int)n; 119 n = (int)lws_ring_insert(vhd->ring, &amsg, 1); 120 if (n != 1) { 121 __minimal_destroy_message(&amsg); 122 // lwsl_user("dropping!\n"); 123 } else 124 /* 125 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED 126 * in the lws service thread context. 127 */ 128 lws_cancel_service(vhd->context); 129 130wait_unlock: 131 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 132 133 usleep(100000); 134 135 } while (!vhd->finished); 136 137 lwsl_notice("thread_spam %d exiting\n", whoami); 138 139 pthread_exit(NULL); 140 141 return NULL; 142} 143 144/* this runs under the lws service thread context only */ 145 146static int 147callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, 148 void *user, void *in, size_t len) 149{ 150 struct per_session_data__minimal *pss = 151 (struct per_session_data__minimal *)user; 152 struct per_vhost_data__minimal *vhd = 153 (struct per_vhost_data__minimal *) 154 lws_protocol_vh_priv_get(lws_get_vhost(wsi), 155 lws_get_protocol(wsi)); 156 const struct lws_protocol_vhost_options *pvo; 157 const struct msg *pmsg; 158 char temp[LWS_PRE + 256]; 159 void *retval; 160 int n, m, r = 0; 161 162 switch (reason) { 163 case LWS_CALLBACK_PROTOCOL_INIT: 164 /* create our per-vhost struct */ 165 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 166 lws_get_protocol(wsi), 167 sizeof(struct per_vhost_data__minimal)); 168 if (!vhd) 169 return 1; 170 171 pthread_mutex_init(&vhd->lock_ring, NULL); 172 173 /* recover the pointer to the globals struct */ 174 pvo = lws_pvo_search( 175 (const struct lws_protocol_vhost_options *)in, 176 "config"); 177 if (!pvo || !pvo->value) { 178 lwsl_err("%s: Can't find \"config\" pvo\n", __func__); 179 return 1; 180 } 181 vhd->config = pvo->value; 182 183 vhd->context = lws_get_context(wsi); 184 vhd->protocol = lws_get_protocol(wsi); 185 vhd->vhost = lws_get_vhost(wsi); 186 187 vhd->ring = lws_ring_create(sizeof(struct msg), 8, 188 __minimal_destroy_message); 189 if (!vhd->ring) { 190 lwsl_err("%s: failed to create ring\n", __func__); 191 return 1; 192 } 193 194 /* start the content-creating threads */ 195 196 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 197 if (pthread_create(&vhd->pthread_spam[n], NULL, 198 thread_spam, vhd)) { 199 lwsl_err("thread creation failed\n"); 200 r = 1; 201 goto init_fail; 202 } 203 break; 204 205 case LWS_CALLBACK_PROTOCOL_DESTROY: 206init_fail: 207 vhd->finished = 1; 208 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 209 pthread_join(vhd->pthread_spam[n], &retval); 210 211 if (vhd->ring) 212 lws_ring_destroy(vhd->ring); 213 214 pthread_mutex_destroy(&vhd->lock_ring); 215 break; 216 217 case LWS_CALLBACK_ESTABLISHED: 218 /* add ourselves to the list of live pss held in the vhd */ 219 pthread_mutex_lock(&vhd->lock_ring); 220 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); 221 pss->tail = lws_ring_get_oldest_tail(vhd->ring); 222 pss->wsi = wsi; 223 pthread_mutex_unlock(&vhd->lock_ring); 224 break; 225 226 case LWS_CALLBACK_CLOSED: 227 /* doesn't reference ring */ 228 pthread_mutex_lock(&vhd->lock_ring); 229 /* remove our closing pss from the list of live pss */ 230 lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, 231 pss, vhd->pss_list); 232 pthread_mutex_unlock(&vhd->lock_ring); 233 break; 234 235 case LWS_CALLBACK_SERVER_WRITEABLE: 236 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 237 238 pmsg = lws_ring_get_element(vhd->ring, &pss->tail); 239 if (!pmsg) { 240 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 241 242 break; 243 } 244 245 assert(pmsg->payload); 246 247 n = lws_snprintf(temp + LWS_PRE, sizeof(temp) - LWS_PRE, 248 "svc, %s", 249 (char *)pmsg->payload + LWS_PRE); 250 251 /* notice we allowed for LWS_PRE in the payload already */ 252 m = lws_write(wsi, (unsigned char *)temp + LWS_PRE, (unsigned int)n, 253 LWS_WRITE_TEXT); 254 if (m < n) { 255 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 256 257 lwsl_err("ERROR %d writing to ws socket\n", m); 258 return -1; 259 } 260 261 lws_ring_consume_and_update_oldest_tail( 262 vhd->ring, /* lws_ring object */ 263 struct per_session_data__minimal, /* type of objects with tails */ 264 &pss->tail, /* tail of guy doing the consuming */ 265 1, /* number of payload objects being consumed */ 266 vhd->pss_list, /* head of list of objects with tails */ 267 tail, /* member name of tail in objects with tails */ 268 pss_list /* member name of next object in objects with tails */ 269 ); 270 271 /* more to do? */ 272 if (lws_ring_get_element(vhd->ring, &pss->tail)) 273 /* come back as soon as we can write more */ 274 lws_callback_on_writable(pss->wsi); 275 276 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 277 278 break; 279 280 case LWS_CALLBACK_RECEIVE: 281 break; 282 283 case LWS_CALLBACK_EVENT_WAIT_CANCELLED: 284 // lwsl_notice("EVENT_WAIT_CANCELLED tsi %d\n", lws_wsi_tsi(wsi)); 285 if (!vhd) 286 break; 287 /* 288 * When the "spam" threads add a message to the ringbuffer, 289 * they create this event in the lws service thread context 290 * using lws_cancel_service(). 291 * 292 * We respond by scheduling a writable callback for all 293 * connected clients. 294 */ 295 296 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 297 298 lws_start_foreach_llp(struct per_session_data__minimal **, 299 ppss, vhd->pss_list) { 300 if (lws_wsi_tsi((*ppss)->wsi) == lws_wsi_tsi(wsi)) 301 lws_callback_on_writable((*ppss)->wsi); 302 } lws_end_foreach_llp(ppss, pss_list); 303 304 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 305 break; 306 307 default: 308 break; 309 } 310 311 return r; 312} 313 314#define LWS_PLUGIN_PROTOCOL_MINIMAL \ 315 { \ 316 "lws-minimal", \ 317 callback_minimal, \ 318 sizeof(struct per_session_data__minimal), \ 319 128, \ 320 0, NULL, 0 \ 321 } 322