1/* 2 * ws protocol handler plugin for "lws-minimal" demonstrating multithread 3 * 4 * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 */ 9 10#if !defined (LWS_PLUGIN_STATIC) 11#define LWS_DLL 12#define LWS_INTERNAL 13#include <libwebsockets.h> 14#endif 15 16#include <string.h> 17 18/* one of these created for each message in the ringbuffer */ 19 20struct msg { 21 void *payload; /* is malloc'd */ 22 size_t len; 23}; 24 25/* 26 * One of these is created for each client connecting to us. 27 * 28 * It is ONLY read or written from the lws service thread context. 29 */ 30 31struct per_session_data__minimal { 32 struct per_session_data__minimal *pss_list; 33 struct lws *wsi; 34 uint32_t tail; 35}; 36 37/* one of these is created for each vhost our protocol is used with */ 38 39struct per_vhost_data__minimal { 40 struct lws_context *context; 41 struct lws_vhost *vhost; 42 const struct lws_protocols *protocol; 43 44 struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ 45 pthread_t pthread_spam[2]; 46 47 pthread_mutex_t lock_ring; /* serialize access to the ring buffer */ 48 struct lws_ring *ring; /* {lock_ring} ringbuffer holding unsent content */ 49 50 const char *config; 51 char finished; 52}; 53 54#if defined(WIN32) 55static void usleep(unsigned long l) { Sleep(l / 1000); } 56#endif 57 58/* 59 * This runs under both lws service and "spam threads" contexts. 60 * Access is serialized by vhd->lock_ring. 61 */ 62 63static void 64__minimal_destroy_message(void *_msg) 65{ 66 struct msg *msg = _msg; 67 68 free(msg->payload); 69 msg->payload = NULL; 70 msg->len = 0; 71} 72 73/* 74 * This runs under the "spam thread" thread context only. 75 * 76 * We spawn two threads that generate messages with this. 77 * 78 */ 79 80static void * 81thread_spam(void *d) 82{ 83 struct per_vhost_data__minimal *vhd = 84 (struct per_vhost_data__minimal *)d; 85 struct msg amsg; 86 int len = 128, index = 1, n, whoami = 0; 87 88 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 89 if (pthread_equal(pthread_self(), vhd->pthread_spam[n])) 90 whoami = n + 1; 91 92 do { 93 /* don't generate output if nobody connected */ 94 if (!vhd->pss_list) 95 goto wait; 96 97 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 98 99 /* only create if space in ringbuffer */ 100 n = (int)lws_ring_get_count_free_elements(vhd->ring); 101 if (!n) { 102 lwsl_user("dropping!\n"); 103 goto wait_unlock; 104 } 105 106 amsg.payload = malloc((unsigned int)(LWS_PRE + len)); 107 if (!amsg.payload) { 108 lwsl_user("OOM: dropping\n"); 109 goto wait_unlock; 110 } 111 n = lws_snprintf((char *)amsg.payload + LWS_PRE, (unsigned int)len, 112 "%s: tid: %d, msg: %d", vhd->config, 113 whoami, index++); 114 amsg.len = (unsigned int)n; 115 n = (int)lws_ring_insert(vhd->ring, &amsg, 1); 116 if (n != 1) { 117 __minimal_destroy_message(&amsg); 118 lwsl_user("dropping!\n"); 119 } else 120 /* 121 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED 122 * in the lws service thread context. 123 */ 124 lws_cancel_service(vhd->context); 125 126wait_unlock: 127 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 128 129wait: 130 usleep(100000); 131 132 } while (!vhd->finished); 133 134 lwsl_notice("thread_spam %d exiting\n", whoami); 135 136 pthread_exit(NULL); 137 138 return NULL; 139} 140 141/* this runs under the lws service thread context only */ 142 143static int 144callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, 145 void *user, void *in, size_t len) 146{ 147 struct per_session_data__minimal *pss = 148 (struct per_session_data__minimal *)user; 149 struct per_vhost_data__minimal *vhd = 150 (struct per_vhost_data__minimal *) 151 lws_protocol_vh_priv_get(lws_get_vhost(wsi), 152 lws_get_protocol(wsi)); 153 const struct lws_protocol_vhost_options *pvo; 154 const struct msg *pmsg; 155 void *retval; 156 int n, m, r = 0; 157 158 switch (reason) { 159 case LWS_CALLBACK_PROTOCOL_INIT: 160 /* create our per-vhost struct */ 161 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 162 lws_get_protocol(wsi), 163 sizeof(struct per_vhost_data__minimal)); 164 if (!vhd) 165 return 1; 166 167 pthread_mutex_init(&vhd->lock_ring, NULL); 168 169 /* recover the pointer to the globals struct */ 170 pvo = lws_pvo_search( 171 (const struct lws_protocol_vhost_options *)in, 172 "config"); 173 if (!pvo || !pvo->value) { 174 lwsl_err("%s: Can't find \"config\" pvo\n", __func__); 175 return 1; 176 } 177 vhd->config = pvo->value; 178 179 vhd->context = lws_get_context(wsi); 180 vhd->protocol = lws_get_protocol(wsi); 181 vhd->vhost = lws_get_vhost(wsi); 182 183 vhd->ring = lws_ring_create(sizeof(struct msg), 8, 184 __minimal_destroy_message); 185 if (!vhd->ring) { 186 lwsl_err("%s: failed to create ring\n", __func__); 187 return 1; 188 } 189 190 /* start the content-creating threads */ 191 192 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 193 if (pthread_create(&vhd->pthread_spam[n], NULL, 194 thread_spam, vhd)) { 195 lwsl_err("thread creation failed\n"); 196 r = 1; 197 goto init_fail; 198 } 199 break; 200 201 case LWS_CALLBACK_PROTOCOL_DESTROY: 202init_fail: 203 vhd->finished = 1; 204 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 205 if (vhd->pthread_spam[n]) 206 pthread_join(vhd->pthread_spam[n], &retval); 207 208 if (vhd->ring) 209 lws_ring_destroy(vhd->ring); 210 211 pthread_mutex_destroy(&vhd->lock_ring); 212 break; 213 214 case LWS_CALLBACK_ESTABLISHED: 215 /* add ourselves to the list of live pss held in the vhd */ 216 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); 217 pss->tail = lws_ring_get_oldest_tail(vhd->ring); 218 pss->wsi = wsi; 219 break; 220 221 case LWS_CALLBACK_CLOSED: 222 /* remove our closing pss from the list of live pss */ 223 lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, 224 pss, vhd->pss_list); 225 break; 226 227 case LWS_CALLBACK_SERVER_WRITEABLE: 228 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 229 230 pmsg = lws_ring_get_element(vhd->ring, &pss->tail); 231 if (!pmsg) { 232 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 233 break; 234 } 235 236 /* notice we allowed for LWS_PRE in the payload already */ 237 m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE, 238 pmsg->len, LWS_WRITE_TEXT); 239 if (m < (int)pmsg->len) { 240 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 241 lwsl_err("ERROR %d writing to ws socket\n", m); 242 return -1; 243 } 244 245 lws_ring_consume_and_update_oldest_tail( 246 vhd->ring, /* lws_ring object */ 247 struct per_session_data__minimal, /* type of objects with tails */ 248 &pss->tail, /* tail of guy doing the consuming */ 249 1, /* number of payload objects being consumed */ 250 vhd->pss_list, /* head of list of objects with tails */ 251 tail, /* member name of tail in objects with tails */ 252 pss_list /* member name of next object in objects with tails */ 253 ); 254 255 /* more to do? */ 256 if (lws_ring_get_element(vhd->ring, &pss->tail)) 257 /* come back as soon as we can write more */ 258 lws_callback_on_writable(pss->wsi); 259 260 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 261 break; 262 263 case LWS_CALLBACK_RECEIVE: 264 break; 265 266 case LWS_CALLBACK_EVENT_WAIT_CANCELLED: 267 if (!vhd) 268 break; 269 /* 270 * When the "spam" threads add a message to the ringbuffer, 271 * they create this event in the lws service thread context 272 * using lws_cancel_service(). 273 * 274 * We respond by scheduling a writable callback for all 275 * connected clients. 276 */ 277 lws_start_foreach_llp(struct per_session_data__minimal **, 278 ppss, vhd->pss_list) { 279 lws_callback_on_writable((*ppss)->wsi); 280 } lws_end_foreach_llp(ppss, pss_list); 281 break; 282 283 default: 284 break; 285 } 286 287 return r; 288} 289 290#define LWS_PLUGIN_PROTOCOL_MINIMAL \ 291 { \ 292 "lws-minimal", \ 293 callback_minimal, \ 294 sizeof(struct per_session_data__minimal), \ 295 128, \ 296 0, NULL, 0 \ 297 } 298