1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * ws protocol handler plugin for "lws-minimal" demonstrating multithread 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * This file is made available under the Creative Commons CC0 1.0 7d4afb5ceSopenharmony_ci * Universal Public Domain Dedication. 8d4afb5ceSopenharmony_ci */ 9d4afb5ceSopenharmony_ci 10d4afb5ceSopenharmony_ci#if !defined (LWS_PLUGIN_STATIC) 11d4afb5ceSopenharmony_ci#define LWS_DLL 12d4afb5ceSopenharmony_ci#define LWS_INTERNAL 13d4afb5ceSopenharmony_ci#include <libwebsockets.h> 14d4afb5ceSopenharmony_ci#endif 15d4afb5ceSopenharmony_ci 16d4afb5ceSopenharmony_ci#include <string.h> 17d4afb5ceSopenharmony_ci#include <assert.h> 18d4afb5ceSopenharmony_ci 19d4afb5ceSopenharmony_ci/* one of these created for each message in the ringbuffer */ 20d4afb5ceSopenharmony_ci 21d4afb5ceSopenharmony_cistruct msg { 22d4afb5ceSopenharmony_ci void *payload; /* is malloc'd */ 23d4afb5ceSopenharmony_ci size_t len; 24d4afb5ceSopenharmony_ci}; 25d4afb5ceSopenharmony_ci 26d4afb5ceSopenharmony_ci/* 27d4afb5ceSopenharmony_ci * One of these is created for each client connecting to us. 28d4afb5ceSopenharmony_ci * 29d4afb5ceSopenharmony_ci * It is ONLY read or written from the lws service thread context. 30d4afb5ceSopenharmony_ci */ 31d4afb5ceSopenharmony_ci 32d4afb5ceSopenharmony_cistruct per_session_data__minimal { 33d4afb5ceSopenharmony_ci struct per_session_data__minimal *pss_list; 34d4afb5ceSopenharmony_ci struct lws *wsi; 35d4afb5ceSopenharmony_ci uint32_t tail; 36d4afb5ceSopenharmony_ci}; 37d4afb5ceSopenharmony_ci 38d4afb5ceSopenharmony_ci/* 39d4afb5ceSopenharmony_ci * One of these is created for each vhost our protocol is used with, that 40d4afb5ceSopenharmony_ci * means it is a shared resource between the SMP threads and must be locked. 41d4afb5ceSopenharmony_ci */ 42d4afb5ceSopenharmony_ci 43d4afb5ceSopenharmony_cistruct per_vhost_data__minimal { 44d4afb5ceSopenharmony_ci struct lws_context *context; 45d4afb5ceSopenharmony_ci struct lws_vhost *vhost; 46d4afb5ceSopenharmony_ci const struct lws_protocols *protocol; 47d4afb5ceSopenharmony_ci 48d4afb5ceSopenharmony_ci struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ 49d4afb5ceSopenharmony_ci pthread_t pthread_spam[2]; 50d4afb5ceSopenharmony_ci 51d4afb5ceSopenharmony_ci pthread_mutex_t lock_ring; /* serialize access to the ring buffer */ 52d4afb5ceSopenharmony_ci struct lws_ring *ring; /* {lock_ring} ringbuffer holding unsent content */ 53d4afb5ceSopenharmony_ci 54d4afb5ceSopenharmony_ci const char *config; 55d4afb5ceSopenharmony_ci char finished; 56d4afb5ceSopenharmony_ci}; 57d4afb5ceSopenharmony_ci 58d4afb5ceSopenharmony_ci#if defined(WIN32) 59d4afb5ceSopenharmony_cistatic void usleep(unsigned long l) { Sleep(l / 1000); } 60d4afb5ceSopenharmony_ci#endif 61d4afb5ceSopenharmony_ci 62d4afb5ceSopenharmony_ci/* 63d4afb5ceSopenharmony_ci * This runs under both lws service and "spam threads" contexts. 64d4afb5ceSopenharmony_ci * Access is serialized by vhd->lock_ring. 65d4afb5ceSopenharmony_ci */ 66d4afb5ceSopenharmony_ci 67d4afb5ceSopenharmony_cistatic void 68d4afb5ceSopenharmony_ci__minimal_destroy_message(void *_msg) 69d4afb5ceSopenharmony_ci{ 70d4afb5ceSopenharmony_ci struct msg *msg = _msg; 71d4afb5ceSopenharmony_ci 72d4afb5ceSopenharmony_ci free(msg->payload); 73d4afb5ceSopenharmony_ci msg->payload = NULL; 74d4afb5ceSopenharmony_ci msg->len = 0; 75d4afb5ceSopenharmony_ci} 76d4afb5ceSopenharmony_ci 77d4afb5ceSopenharmony_ci/* 78d4afb5ceSopenharmony_ci * This runs under the "spam thread" thread context only. 79d4afb5ceSopenharmony_ci * 80d4afb5ceSopenharmony_ci * We spawn two threads that generate messages with this. 81d4afb5ceSopenharmony_ci * 82d4afb5ceSopenharmony_ci */ 83d4afb5ceSopenharmony_ci 84d4afb5ceSopenharmony_cistatic void * 85d4afb5ceSopenharmony_cithread_spam(void *d) 86d4afb5ceSopenharmony_ci{ 87d4afb5ceSopenharmony_ci struct per_vhost_data__minimal *vhd = 88d4afb5ceSopenharmony_ci (struct per_vhost_data__minimal *)d; 89d4afb5ceSopenharmony_ci struct msg amsg; 90d4afb5ceSopenharmony_ci int len = 128, index = 1, n, whoami = 0; 91d4afb5ceSopenharmony_ci 92d4afb5ceSopenharmony_ci for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 93d4afb5ceSopenharmony_ci if (pthread_equal(pthread_self(), vhd->pthread_spam[n])) 94d4afb5ceSopenharmony_ci whoami = n + 1; 95d4afb5ceSopenharmony_ci 96d4afb5ceSopenharmony_ci do { 97d4afb5ceSopenharmony_ci pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 98d4afb5ceSopenharmony_ci 99d4afb5ceSopenharmony_ci /* don't generate output if nobody connected */ 100d4afb5ceSopenharmony_ci if (!vhd->pss_list) 101d4afb5ceSopenharmony_ci goto wait_unlock; 102d4afb5ceSopenharmony_ci 103d4afb5ceSopenharmony_ci /* only create if space in ringbuffer */ 104d4afb5ceSopenharmony_ci n = (int)lws_ring_get_count_free_elements(vhd->ring); 105d4afb5ceSopenharmony_ci if (!n) { 106d4afb5ceSopenharmony_ci // lwsl_user("dropping!\n"); 107d4afb5ceSopenharmony_ci goto wait_unlock; 108d4afb5ceSopenharmony_ci } 109d4afb5ceSopenharmony_ci 110d4afb5ceSopenharmony_ci amsg.payload = malloc((unsigned int)(LWS_PRE + len)); 111d4afb5ceSopenharmony_ci if (!amsg.payload) { 112d4afb5ceSopenharmony_ci lwsl_user("OOM: dropping\n"); 113d4afb5ceSopenharmony_ci goto wait_unlock; 114d4afb5ceSopenharmony_ci } 115d4afb5ceSopenharmony_ci n = lws_snprintf((char *)amsg.payload + LWS_PRE, (unsigned int)len, 116d4afb5ceSopenharmony_ci "%s: spam tid: %d, msg: %d", vhd->config, 117d4afb5ceSopenharmony_ci whoami, index++); 118d4afb5ceSopenharmony_ci amsg.len = (unsigned int)n; 119d4afb5ceSopenharmony_ci n = (int)lws_ring_insert(vhd->ring, &amsg, 1); 120d4afb5ceSopenharmony_ci if (n != 1) { 121d4afb5ceSopenharmony_ci __minimal_destroy_message(&amsg); 122d4afb5ceSopenharmony_ci // lwsl_user("dropping!\n"); 123d4afb5ceSopenharmony_ci } else 124d4afb5ceSopenharmony_ci /* 125d4afb5ceSopenharmony_ci * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED 126d4afb5ceSopenharmony_ci * in the lws service thread context. 127d4afb5ceSopenharmony_ci */ 128d4afb5ceSopenharmony_ci lws_cancel_service(vhd->context); 129d4afb5ceSopenharmony_ci 130d4afb5ceSopenharmony_ciwait_unlock: 131d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 132d4afb5ceSopenharmony_ci 133d4afb5ceSopenharmony_ci usleep(100000); 134d4afb5ceSopenharmony_ci 135d4afb5ceSopenharmony_ci } while (!vhd->finished); 136d4afb5ceSopenharmony_ci 137d4afb5ceSopenharmony_ci lwsl_notice("thread_spam %d exiting\n", whoami); 138d4afb5ceSopenharmony_ci 139d4afb5ceSopenharmony_ci pthread_exit(NULL); 140d4afb5ceSopenharmony_ci 141d4afb5ceSopenharmony_ci return NULL; 142d4afb5ceSopenharmony_ci} 143d4afb5ceSopenharmony_ci 144d4afb5ceSopenharmony_ci/* this runs under the lws service thread context only */ 145d4afb5ceSopenharmony_ci 146d4afb5ceSopenharmony_cistatic int 147d4afb5ceSopenharmony_cicallback_minimal(struct lws *wsi, enum lws_callback_reasons reason, 148d4afb5ceSopenharmony_ci void *user, void *in, size_t len) 149d4afb5ceSopenharmony_ci{ 150d4afb5ceSopenharmony_ci struct per_session_data__minimal *pss = 151d4afb5ceSopenharmony_ci (struct per_session_data__minimal *)user; 152d4afb5ceSopenharmony_ci struct per_vhost_data__minimal *vhd = 153d4afb5ceSopenharmony_ci (struct per_vhost_data__minimal *) 154d4afb5ceSopenharmony_ci lws_protocol_vh_priv_get(lws_get_vhost(wsi), 155d4afb5ceSopenharmony_ci lws_get_protocol(wsi)); 156d4afb5ceSopenharmony_ci const struct lws_protocol_vhost_options *pvo; 157d4afb5ceSopenharmony_ci const struct msg *pmsg; 158d4afb5ceSopenharmony_ci char temp[LWS_PRE + 256]; 159d4afb5ceSopenharmony_ci void *retval; 160d4afb5ceSopenharmony_ci int n, m, r = 0; 161d4afb5ceSopenharmony_ci 162d4afb5ceSopenharmony_ci switch (reason) { 163d4afb5ceSopenharmony_ci case LWS_CALLBACK_PROTOCOL_INIT: 164d4afb5ceSopenharmony_ci /* create our per-vhost struct */ 165d4afb5ceSopenharmony_ci vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 166d4afb5ceSopenharmony_ci lws_get_protocol(wsi), 167d4afb5ceSopenharmony_ci sizeof(struct per_vhost_data__minimal)); 168d4afb5ceSopenharmony_ci if (!vhd) 169d4afb5ceSopenharmony_ci return 1; 170d4afb5ceSopenharmony_ci 171d4afb5ceSopenharmony_ci pthread_mutex_init(&vhd->lock_ring, NULL); 172d4afb5ceSopenharmony_ci 173d4afb5ceSopenharmony_ci /* recover the pointer to the globals struct */ 174d4afb5ceSopenharmony_ci pvo = lws_pvo_search( 175d4afb5ceSopenharmony_ci (const struct lws_protocol_vhost_options *)in, 176d4afb5ceSopenharmony_ci "config"); 177d4afb5ceSopenharmony_ci if (!pvo || !pvo->value) { 178d4afb5ceSopenharmony_ci lwsl_err("%s: Can't find \"config\" pvo\n", __func__); 179d4afb5ceSopenharmony_ci return 1; 180d4afb5ceSopenharmony_ci } 181d4afb5ceSopenharmony_ci vhd->config = pvo->value; 182d4afb5ceSopenharmony_ci 183d4afb5ceSopenharmony_ci vhd->context = lws_get_context(wsi); 184d4afb5ceSopenharmony_ci vhd->protocol = lws_get_protocol(wsi); 185d4afb5ceSopenharmony_ci vhd->vhost = lws_get_vhost(wsi); 186d4afb5ceSopenharmony_ci 187d4afb5ceSopenharmony_ci vhd->ring = lws_ring_create(sizeof(struct msg), 8, 188d4afb5ceSopenharmony_ci __minimal_destroy_message); 189d4afb5ceSopenharmony_ci if (!vhd->ring) { 190d4afb5ceSopenharmony_ci lwsl_err("%s: failed to create ring\n", __func__); 191d4afb5ceSopenharmony_ci return 1; 192d4afb5ceSopenharmony_ci } 193d4afb5ceSopenharmony_ci 194d4afb5ceSopenharmony_ci /* start the content-creating threads */ 195d4afb5ceSopenharmony_ci 196d4afb5ceSopenharmony_ci for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 197d4afb5ceSopenharmony_ci if (pthread_create(&vhd->pthread_spam[n], NULL, 198d4afb5ceSopenharmony_ci thread_spam, vhd)) { 199d4afb5ceSopenharmony_ci lwsl_err("thread creation failed\n"); 200d4afb5ceSopenharmony_ci r = 1; 201d4afb5ceSopenharmony_ci goto init_fail; 202d4afb5ceSopenharmony_ci } 203d4afb5ceSopenharmony_ci break; 204d4afb5ceSopenharmony_ci 205d4afb5ceSopenharmony_ci case LWS_CALLBACK_PROTOCOL_DESTROY: 206d4afb5ceSopenharmony_ciinit_fail: 207d4afb5ceSopenharmony_ci vhd->finished = 1; 208d4afb5ceSopenharmony_ci for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) 209d4afb5ceSopenharmony_ci pthread_join(vhd->pthread_spam[n], &retval); 210d4afb5ceSopenharmony_ci 211d4afb5ceSopenharmony_ci if (vhd->ring) 212d4afb5ceSopenharmony_ci lws_ring_destroy(vhd->ring); 213d4afb5ceSopenharmony_ci 214d4afb5ceSopenharmony_ci pthread_mutex_destroy(&vhd->lock_ring); 215d4afb5ceSopenharmony_ci break; 216d4afb5ceSopenharmony_ci 217d4afb5ceSopenharmony_ci case LWS_CALLBACK_ESTABLISHED: 218d4afb5ceSopenharmony_ci /* add ourselves to the list of live pss held in the vhd */ 219d4afb5ceSopenharmony_ci pthread_mutex_lock(&vhd->lock_ring); 220d4afb5ceSopenharmony_ci lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); 221d4afb5ceSopenharmony_ci pss->tail = lws_ring_get_oldest_tail(vhd->ring); 222d4afb5ceSopenharmony_ci pss->wsi = wsi; 223d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); 224d4afb5ceSopenharmony_ci break; 225d4afb5ceSopenharmony_ci 226d4afb5ceSopenharmony_ci case LWS_CALLBACK_CLOSED: 227d4afb5ceSopenharmony_ci /* doesn't reference ring */ 228d4afb5ceSopenharmony_ci pthread_mutex_lock(&vhd->lock_ring); 229d4afb5ceSopenharmony_ci /* remove our closing pss from the list of live pss */ 230d4afb5ceSopenharmony_ci lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, 231d4afb5ceSopenharmony_ci pss, vhd->pss_list); 232d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); 233d4afb5ceSopenharmony_ci break; 234d4afb5ceSopenharmony_ci 235d4afb5ceSopenharmony_ci case LWS_CALLBACK_SERVER_WRITEABLE: 236d4afb5ceSopenharmony_ci pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 237d4afb5ceSopenharmony_ci 238d4afb5ceSopenharmony_ci pmsg = lws_ring_get_element(vhd->ring, &pss->tail); 239d4afb5ceSopenharmony_ci if (!pmsg) { 240d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 241d4afb5ceSopenharmony_ci 242d4afb5ceSopenharmony_ci break; 243d4afb5ceSopenharmony_ci } 244d4afb5ceSopenharmony_ci 245d4afb5ceSopenharmony_ci assert(pmsg->payload); 246d4afb5ceSopenharmony_ci 247d4afb5ceSopenharmony_ci n = lws_snprintf(temp + LWS_PRE, sizeof(temp) - LWS_PRE, 248d4afb5ceSopenharmony_ci "svc, %s", 249d4afb5ceSopenharmony_ci (char *)pmsg->payload + LWS_PRE); 250d4afb5ceSopenharmony_ci 251d4afb5ceSopenharmony_ci /* notice we allowed for LWS_PRE in the payload already */ 252d4afb5ceSopenharmony_ci m = lws_write(wsi, (unsigned char *)temp + LWS_PRE, (unsigned int)n, 253d4afb5ceSopenharmony_ci LWS_WRITE_TEXT); 254d4afb5ceSopenharmony_ci if (m < n) { 255d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 256d4afb5ceSopenharmony_ci 257d4afb5ceSopenharmony_ci lwsl_err("ERROR %d writing to ws socket\n", m); 258d4afb5ceSopenharmony_ci return -1; 259d4afb5ceSopenharmony_ci } 260d4afb5ceSopenharmony_ci 261d4afb5ceSopenharmony_ci lws_ring_consume_and_update_oldest_tail( 262d4afb5ceSopenharmony_ci vhd->ring, /* lws_ring object */ 263d4afb5ceSopenharmony_ci struct per_session_data__minimal, /* type of objects with tails */ 264d4afb5ceSopenharmony_ci &pss->tail, /* tail of guy doing the consuming */ 265d4afb5ceSopenharmony_ci 1, /* number of payload objects being consumed */ 266d4afb5ceSopenharmony_ci vhd->pss_list, /* head of list of objects with tails */ 267d4afb5ceSopenharmony_ci tail, /* member name of tail in objects with tails */ 268d4afb5ceSopenharmony_ci pss_list /* member name of next object in objects with tails */ 269d4afb5ceSopenharmony_ci ); 270d4afb5ceSopenharmony_ci 271d4afb5ceSopenharmony_ci /* more to do? */ 272d4afb5ceSopenharmony_ci if (lws_ring_get_element(vhd->ring, &pss->tail)) 273d4afb5ceSopenharmony_ci /* come back as soon as we can write more */ 274d4afb5ceSopenharmony_ci lws_callback_on_writable(pss->wsi); 275d4afb5ceSopenharmony_ci 276d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 277d4afb5ceSopenharmony_ci 278d4afb5ceSopenharmony_ci break; 279d4afb5ceSopenharmony_ci 280d4afb5ceSopenharmony_ci case LWS_CALLBACK_RECEIVE: 281d4afb5ceSopenharmony_ci break; 282d4afb5ceSopenharmony_ci 283d4afb5ceSopenharmony_ci case LWS_CALLBACK_EVENT_WAIT_CANCELLED: 284d4afb5ceSopenharmony_ci // lwsl_notice("EVENT_WAIT_CANCELLED tsi %d\n", lws_wsi_tsi(wsi)); 285d4afb5ceSopenharmony_ci if (!vhd) 286d4afb5ceSopenharmony_ci break; 287d4afb5ceSopenharmony_ci /* 288d4afb5ceSopenharmony_ci * When the "spam" threads add a message to the ringbuffer, 289d4afb5ceSopenharmony_ci * they create this event in the lws service thread context 290d4afb5ceSopenharmony_ci * using lws_cancel_service(). 291d4afb5ceSopenharmony_ci * 292d4afb5ceSopenharmony_ci * We respond by scheduling a writable callback for all 293d4afb5ceSopenharmony_ci * connected clients. 294d4afb5ceSopenharmony_ci */ 295d4afb5ceSopenharmony_ci 296d4afb5ceSopenharmony_ci pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ 297d4afb5ceSopenharmony_ci 298d4afb5ceSopenharmony_ci lws_start_foreach_llp(struct per_session_data__minimal **, 299d4afb5ceSopenharmony_ci ppss, vhd->pss_list) { 300d4afb5ceSopenharmony_ci if (lws_wsi_tsi((*ppss)->wsi) == lws_wsi_tsi(wsi)) 301d4afb5ceSopenharmony_ci lws_callback_on_writable((*ppss)->wsi); 302d4afb5ceSopenharmony_ci } lws_end_foreach_llp(ppss, pss_list); 303d4afb5ceSopenharmony_ci 304d4afb5ceSopenharmony_ci pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ 305d4afb5ceSopenharmony_ci break; 306d4afb5ceSopenharmony_ci 307d4afb5ceSopenharmony_ci default: 308d4afb5ceSopenharmony_ci break; 309d4afb5ceSopenharmony_ci } 310d4afb5ceSopenharmony_ci 311d4afb5ceSopenharmony_ci return r; 312d4afb5ceSopenharmony_ci} 313d4afb5ceSopenharmony_ci 314d4afb5ceSopenharmony_ci#define LWS_PLUGIN_PROTOCOL_MINIMAL \ 315d4afb5ceSopenharmony_ci { \ 316d4afb5ceSopenharmony_ci "lws-minimal", \ 317d4afb5ceSopenharmony_ci callback_minimal, \ 318d4afb5ceSopenharmony_ci sizeof(struct per_session_data__minimal), \ 319d4afb5ceSopenharmony_ci 128, \ 320d4afb5ceSopenharmony_ci 0, NULL, 0 \ 321d4afb5ceSopenharmony_ci } 322