1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * This file is made available under the Creative Commons CC0 1.0 7d4afb5ceSopenharmony_ci * Universal Public Domain Dedication. 8d4afb5ceSopenharmony_ci * 9d4afb5ceSopenharmony_ci * The main reason some things are as they are is that the task lifecycle may 10d4afb5ceSopenharmony_ci * be unrelated to the wsi lifecycle that queued that task. 11d4afb5ceSopenharmony_ci * 12d4afb5ceSopenharmony_ci * Consider the task may call an external library and run for 30s without 13d4afb5ceSopenharmony_ci * "checking in" to see if it should stop. The wsi that started the task may 14d4afb5ceSopenharmony_ci * have closed at any time before the 30s are up, with the browser window 15d4afb5ceSopenharmony_ci * closing or whatever. 16d4afb5ceSopenharmony_ci * 17d4afb5ceSopenharmony_ci * So data shared between the asynchronous task and the wsi must have its 18d4afb5ceSopenharmony_ci * lifecycle determined by the task, not the wsi. That means a separate struct 19d4afb5ceSopenharmony_ci * that can be freed by the task. 20d4afb5ceSopenharmony_ci * 21d4afb5ceSopenharmony_ci * In the case the wsi outlives the task, the tasks do not get destroyed until 22d4afb5ceSopenharmony_ci * the service thread has called lws_threadpool_task_status() on the completed 23d4afb5ceSopenharmony_ci * task. So there is no danger of the shared task private data getting randomly 24d4afb5ceSopenharmony_ci * freed. 25d4afb5ceSopenharmony_ci */ 26d4afb5ceSopenharmony_ci 27d4afb5ceSopenharmony_ci#if !defined (LWS_PLUGIN_STATIC) 28d4afb5ceSopenharmony_ci#define LWS_DLL 29d4afb5ceSopenharmony_ci#define LWS_INTERNAL 30d4afb5ceSopenharmony_ci#include <libwebsockets.h> 31d4afb5ceSopenharmony_ci#endif 32d4afb5ceSopenharmony_ci 33d4afb5ceSopenharmony_ci#include <string.h> 34d4afb5ceSopenharmony_ci 35d4afb5ceSopenharmony_cistruct per_vhost_data__minimal { 36d4afb5ceSopenharmony_ci struct lws_threadpool *tp; 37d4afb5ceSopenharmony_ci struct lws_context *context; 38d4afb5ceSopenharmony_ci lws_sorted_usec_list_t sul; 39d4afb5ceSopenharmony_ci const char *config; 40d4afb5ceSopenharmony_ci}; 41d4afb5ceSopenharmony_ci 42d4afb5ceSopenharmony_cistruct task_data { 43d4afb5ceSopenharmony_ci char result[64]; 44d4afb5ceSopenharmony_ci 45d4afb5ceSopenharmony_ci uint64_t pos, end; 46d4afb5ceSopenharmony_ci}; 47d4afb5ceSopenharmony_ci 48d4afb5ceSopenharmony_ci#if defined(WIN32) 49d4afb5ceSopenharmony_cistatic void usleep(unsigned long l) { Sleep(l / 1000); } 50d4afb5ceSopenharmony_ci#endif 51d4afb5ceSopenharmony_ci 52d4afb5ceSopenharmony_ci/* 53d4afb5ceSopenharmony_ci * Create the private data for the task 54d4afb5ceSopenharmony_ci * 55d4afb5ceSopenharmony_ci * Notice we hand over responsibility for the cleanup and freeing of the 56d4afb5ceSopenharmony_ci * allocated task_data to the threadpool, because the wsi it was originally 57d4afb5ceSopenharmony_ci * bound to may close while the thread is still running. So we allocate 58d4afb5ceSopenharmony_ci * something discrete for the task private data that can be definitively owned 59d4afb5ceSopenharmony_ci * and freed by the threadpool, not the wsi... the pss won't do, as it only 60d4afb5ceSopenharmony_ci * exists for the lifecycle of the wsi connection. 61d4afb5ceSopenharmony_ci * 62d4afb5ceSopenharmony_ci * When the task is created, we also tell it how to destroy the private data 63d4afb5ceSopenharmony_ci * by giving it args.cleanup as cleanup_task_private_data() defined below. 64d4afb5ceSopenharmony_ci */ 65d4afb5ceSopenharmony_ci 66d4afb5ceSopenharmony_cistatic struct task_data * 67d4afb5ceSopenharmony_cicreate_task_private_data(void) 68d4afb5ceSopenharmony_ci{ 69d4afb5ceSopenharmony_ci struct task_data *priv = malloc(sizeof(*priv)); 70d4afb5ceSopenharmony_ci 71d4afb5ceSopenharmony_ci return priv; 72d4afb5ceSopenharmony_ci} 73d4afb5ceSopenharmony_ci 74d4afb5ceSopenharmony_ci/* 75d4afb5ceSopenharmony_ci * Destroy the private data for the task 76d4afb5ceSopenharmony_ci * 77d4afb5ceSopenharmony_ci * Notice the wsi the task was originally bound to may be long gone, in the 78d4afb5ceSopenharmony_ci * case we are destroying the lws context and the thread was doing something 79d4afb5ceSopenharmony_ci * for a long time without checking in. 80d4afb5ceSopenharmony_ci */ 81d4afb5ceSopenharmony_cistatic void 82d4afb5ceSopenharmony_cicleanup_task_private_data(struct lws *wsi, void *user) 83d4afb5ceSopenharmony_ci{ 84d4afb5ceSopenharmony_ci struct task_data *priv = (struct task_data *)user; 85d4afb5ceSopenharmony_ci 86d4afb5ceSopenharmony_ci free(priv); 87d4afb5ceSopenharmony_ci} 88d4afb5ceSopenharmony_ci 89d4afb5ceSopenharmony_ci/* 90d4afb5ceSopenharmony_ci * This runs in its own thread, from the threadpool. 91d4afb5ceSopenharmony_ci * 92d4afb5ceSopenharmony_ci * The implementation behind this in lws uses pthreads, but no pthreadisms are 93d4afb5ceSopenharmony_ci * required in the user code. 94d4afb5ceSopenharmony_ci * 95d4afb5ceSopenharmony_ci * The example counts to 10M, "checking in" to see if it should stop after every 96d4afb5ceSopenharmony_ci * 100K and pausing to sync with the service thread to send a ws message every 97d4afb5ceSopenharmony_ci * 1M. It resumes after the service thread determines the wsi is writable and 98d4afb5ceSopenharmony_ci * the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by 99d4afb5ceSopenharmony_ci * calling lws_threadpool_task_sync(). 100d4afb5ceSopenharmony_ci */ 101d4afb5ceSopenharmony_ci 102d4afb5ceSopenharmony_cistatic enum lws_threadpool_task_return 103d4afb5ceSopenharmony_citask_function(void *user, enum lws_threadpool_task_status s) 104d4afb5ceSopenharmony_ci{ 105d4afb5ceSopenharmony_ci struct task_data *priv = (struct task_data *)user; 106d4afb5ceSopenharmony_ci int budget = 100 * 1000; 107d4afb5ceSopenharmony_ci 108d4afb5ceSopenharmony_ci if (priv->pos == priv->end) 109d4afb5ceSopenharmony_ci return LWS_TP_RETURN_FINISHED; 110d4afb5ceSopenharmony_ci 111d4afb5ceSopenharmony_ci /* 112d4afb5ceSopenharmony_ci * Preferably replace this with ~100ms of your real task, so it 113d4afb5ceSopenharmony_ci * can "check in" at short intervals to see if it has been asked to 114d4afb5ceSopenharmony_ci * stop. 115d4afb5ceSopenharmony_ci * 116d4afb5ceSopenharmony_ci * You can just run tasks atomically here with the thread dedicated 117d4afb5ceSopenharmony_ci * to it, but it will cause odd delays while shutting down etc and 118d4afb5ceSopenharmony_ci * the task will run to completion even if the wsi that started it 119d4afb5ceSopenharmony_ci * has since closed. 120d4afb5ceSopenharmony_ci */ 121d4afb5ceSopenharmony_ci 122d4afb5ceSopenharmony_ci while (budget--) 123d4afb5ceSopenharmony_ci priv->pos++; 124d4afb5ceSopenharmony_ci 125d4afb5ceSopenharmony_ci usleep(100000); 126d4afb5ceSopenharmony_ci 127d4afb5ceSopenharmony_ci if (!(priv->pos % (1000 * 1000))) { 128d4afb5ceSopenharmony_ci lws_snprintf(priv->result + LWS_PRE, 129d4afb5ceSopenharmony_ci sizeof(priv->result) - LWS_PRE, 130d4afb5ceSopenharmony_ci "pos %llu", (unsigned long long)priv->pos); 131d4afb5ceSopenharmony_ci 132d4afb5ceSopenharmony_ci return LWS_TP_RETURN_SYNC; 133d4afb5ceSopenharmony_ci } 134d4afb5ceSopenharmony_ci 135d4afb5ceSopenharmony_ci return LWS_TP_RETURN_CHECKING_IN; 136d4afb5ceSopenharmony_ci} 137d4afb5ceSopenharmony_ci 138d4afb5ceSopenharmony_ci 139d4afb5ceSopenharmony_cistatic void 140d4afb5ceSopenharmony_cisul_tp_dump(struct lws_sorted_usec_list *sul) 141d4afb5ceSopenharmony_ci{ 142d4afb5ceSopenharmony_ci struct per_vhost_data__minimal *vhd = 143d4afb5ceSopenharmony_ci lws_container_of(sul, struct per_vhost_data__minimal, sul); 144d4afb5ceSopenharmony_ci /* 145d4afb5ceSopenharmony_ci * in debug mode, dump the threadpool stat to the logs once 146d4afb5ceSopenharmony_ci * a second 147d4afb5ceSopenharmony_ci */ 148d4afb5ceSopenharmony_ci lws_threadpool_dump(vhd->tp); 149d4afb5ceSopenharmony_ci lws_sul_schedule(vhd->context, 0, &vhd->sul, 150d4afb5ceSopenharmony_ci sul_tp_dump, LWS_US_PER_SEC); 151d4afb5ceSopenharmony_ci} 152d4afb5ceSopenharmony_ci 153d4afb5ceSopenharmony_ci 154d4afb5ceSopenharmony_cistatic int 155d4afb5ceSopenharmony_cicallback_minimal(struct lws *wsi, enum lws_callback_reasons reason, 156d4afb5ceSopenharmony_ci void *user, void *in, size_t len) 157d4afb5ceSopenharmony_ci{ 158d4afb5ceSopenharmony_ci struct per_vhost_data__minimal *vhd = 159d4afb5ceSopenharmony_ci (struct per_vhost_data__minimal *) 160d4afb5ceSopenharmony_ci lws_protocol_vh_priv_get(lws_get_vhost(wsi), 161d4afb5ceSopenharmony_ci lws_get_protocol(wsi)); 162d4afb5ceSopenharmony_ci const struct lws_protocol_vhost_options *pvo; 163d4afb5ceSopenharmony_ci struct lws_threadpool_create_args cargs; 164d4afb5ceSopenharmony_ci struct lws_threadpool_task_args args; 165d4afb5ceSopenharmony_ci struct lws_threadpool_task *task; 166d4afb5ceSopenharmony_ci struct task_data *priv; 167d4afb5ceSopenharmony_ci int n, m, r = 0; 168d4afb5ceSopenharmony_ci char name[32]; 169d4afb5ceSopenharmony_ci void *_user; 170d4afb5ceSopenharmony_ci 171d4afb5ceSopenharmony_ci switch (reason) { 172d4afb5ceSopenharmony_ci case LWS_CALLBACK_PROTOCOL_INIT: 173d4afb5ceSopenharmony_ci /* create our per-vhost struct */ 174d4afb5ceSopenharmony_ci vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 175d4afb5ceSopenharmony_ci lws_get_protocol(wsi), 176d4afb5ceSopenharmony_ci sizeof(struct per_vhost_data__minimal)); 177d4afb5ceSopenharmony_ci if (!vhd) 178d4afb5ceSopenharmony_ci return 1; 179d4afb5ceSopenharmony_ci 180d4afb5ceSopenharmony_ci vhd->context = lws_get_context(wsi); 181d4afb5ceSopenharmony_ci 182d4afb5ceSopenharmony_ci /* recover the pointer to the globals struct */ 183d4afb5ceSopenharmony_ci pvo = lws_pvo_search( 184d4afb5ceSopenharmony_ci (const struct lws_protocol_vhost_options *)in, 185d4afb5ceSopenharmony_ci "config"); 186d4afb5ceSopenharmony_ci if (!pvo || !pvo->value) { 187d4afb5ceSopenharmony_ci lwsl_err("%s: Can't find \"config\" pvo\n", __func__); 188d4afb5ceSopenharmony_ci return 1; 189d4afb5ceSopenharmony_ci } 190d4afb5ceSopenharmony_ci vhd->config = pvo->value; 191d4afb5ceSopenharmony_ci 192d4afb5ceSopenharmony_ci memset(&cargs, 0, sizeof(cargs)); 193d4afb5ceSopenharmony_ci 194d4afb5ceSopenharmony_ci cargs.max_queue_depth = 8; 195d4afb5ceSopenharmony_ci cargs.threads = 3; 196d4afb5ceSopenharmony_ci vhd->tp = lws_threadpool_create(lws_get_context(wsi), 197d4afb5ceSopenharmony_ci &cargs, "%s", 198d4afb5ceSopenharmony_ci lws_get_vhost_name(lws_get_vhost(wsi))); 199d4afb5ceSopenharmony_ci if (!vhd->tp) 200d4afb5ceSopenharmony_ci return 1; 201d4afb5ceSopenharmony_ci 202d4afb5ceSopenharmony_ci lws_sul_schedule(vhd->context, 0, &vhd->sul, 203d4afb5ceSopenharmony_ci sul_tp_dump, LWS_US_PER_SEC); 204d4afb5ceSopenharmony_ci break; 205d4afb5ceSopenharmony_ci 206d4afb5ceSopenharmony_ci case LWS_CALLBACK_PROTOCOL_DESTROY: 207d4afb5ceSopenharmony_ci lws_threadpool_finish(vhd->tp); 208d4afb5ceSopenharmony_ci lws_threadpool_destroy(vhd->tp); 209d4afb5ceSopenharmony_ci lws_sul_cancel(&vhd->sul); 210d4afb5ceSopenharmony_ci break; 211d4afb5ceSopenharmony_ci 212d4afb5ceSopenharmony_ci case LWS_CALLBACK_ESTABLISHED: 213d4afb5ceSopenharmony_ci 214d4afb5ceSopenharmony_ci memset(&args, 0, sizeof(args)); 215d4afb5ceSopenharmony_ci priv = args.user = create_task_private_data(); 216d4afb5ceSopenharmony_ci if (!args.user) 217d4afb5ceSopenharmony_ci return 1; 218d4afb5ceSopenharmony_ci 219d4afb5ceSopenharmony_ci priv->pos = 0; 220d4afb5ceSopenharmony_ci priv->end = 10 * 1000 * 1000; 221d4afb5ceSopenharmony_ci 222d4afb5ceSopenharmony_ci /* queue the task... the task takes on responsibility for 223d4afb5ceSopenharmony_ci * destroying args.user. pss->priv just has a copy of it */ 224d4afb5ceSopenharmony_ci 225d4afb5ceSopenharmony_ci args.wsi = wsi; 226d4afb5ceSopenharmony_ci args.task = task_function; 227d4afb5ceSopenharmony_ci args.cleanup = cleanup_task_private_data; 228d4afb5ceSopenharmony_ci 229d4afb5ceSopenharmony_ci lws_get_peer_simple(wsi, name, sizeof(name)); 230d4afb5ceSopenharmony_ci 231d4afb5ceSopenharmony_ci if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) { 232d4afb5ceSopenharmony_ci lwsl_user("%s: Couldn't enqueue task\n", __func__); 233d4afb5ceSopenharmony_ci cleanup_task_private_data(wsi, priv); 234d4afb5ceSopenharmony_ci return 1; 235d4afb5ceSopenharmony_ci } 236d4afb5ceSopenharmony_ci 237d4afb5ceSopenharmony_ci lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30); 238d4afb5ceSopenharmony_ci 239d4afb5ceSopenharmony_ci /* 240d4afb5ceSopenharmony_ci * so the asynchronous worker will let us know the next step 241d4afb5ceSopenharmony_ci * by causing LWS_CALLBACK_SERVER_WRITEABLE 242d4afb5ceSopenharmony_ci */ 243d4afb5ceSopenharmony_ci 244d4afb5ceSopenharmony_ci break; 245d4afb5ceSopenharmony_ci 246d4afb5ceSopenharmony_ci case LWS_CALLBACK_CLOSED: 247d4afb5ceSopenharmony_ci break; 248d4afb5ceSopenharmony_ci 249d4afb5ceSopenharmony_ci case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: 250d4afb5ceSopenharmony_ci lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi); 251d4afb5ceSopenharmony_ci lws_threadpool_dequeue_task(lws_threadpool_get_task_wsi(wsi)); 252d4afb5ceSopenharmony_ci break; 253d4afb5ceSopenharmony_ci 254d4afb5ceSopenharmony_ci case LWS_CALLBACK_SERVER_WRITEABLE: 255d4afb5ceSopenharmony_ci 256d4afb5ceSopenharmony_ci /* 257d4afb5ceSopenharmony_ci * even completed tasks wait in a queue until we call the 258d4afb5ceSopenharmony_ci * below on them. Then they may destroy themselves and their 259d4afb5ceSopenharmony_ci * args.user data (by calling the cleanup callback). 260d4afb5ceSopenharmony_ci * 261d4afb5ceSopenharmony_ci * If you need to get things from the still-valid private task 262d4afb5ceSopenharmony_ci * data, copy it here before calling 263d4afb5ceSopenharmony_ci * lws_threadpool_task_status() that may free the task and the 264d4afb5ceSopenharmony_ci * private task data. 265d4afb5ceSopenharmony_ci */ 266d4afb5ceSopenharmony_ci 267d4afb5ceSopenharmony_ci task = lws_threadpool_get_task_wsi(wsi); 268d4afb5ceSopenharmony_ci if (!task) 269d4afb5ceSopenharmony_ci break; 270d4afb5ceSopenharmony_ci n = (int)lws_threadpool_task_status(task, &_user); 271d4afb5ceSopenharmony_ci lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n", 272d4afb5ceSopenharmony_ci __func__, n); 273d4afb5ceSopenharmony_ci switch(n) { 274d4afb5ceSopenharmony_ci 275d4afb5ceSopenharmony_ci case LWS_TP_STATUS_FINISHED: 276d4afb5ceSopenharmony_ci case LWS_TP_STATUS_STOPPED: 277d4afb5ceSopenharmony_ci case LWS_TP_STATUS_QUEUED: 278d4afb5ceSopenharmony_ci case LWS_TP_STATUS_RUNNING: 279d4afb5ceSopenharmony_ci case LWS_TP_STATUS_STOPPING: 280d4afb5ceSopenharmony_ci return 0; 281d4afb5ceSopenharmony_ci 282d4afb5ceSopenharmony_ci case LWS_TP_STATUS_SYNCING: 283d4afb5ceSopenharmony_ci /* the task has paused for us to do something */ 284d4afb5ceSopenharmony_ci break; 285d4afb5ceSopenharmony_ci default: 286d4afb5ceSopenharmony_ci return -1; 287d4afb5ceSopenharmony_ci } 288d4afb5ceSopenharmony_ci 289d4afb5ceSopenharmony_ci priv = (struct task_data *)_user; 290d4afb5ceSopenharmony_ci 291d4afb5ceSopenharmony_ci lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5); 292d4afb5ceSopenharmony_ci 293d4afb5ceSopenharmony_ci n = (int)strlen(priv->result + LWS_PRE); 294d4afb5ceSopenharmony_ci m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE, 295d4afb5ceSopenharmony_ci (unsigned int)n, LWS_WRITE_TEXT); 296d4afb5ceSopenharmony_ci if (m < n) { 297d4afb5ceSopenharmony_ci lwsl_err("ERROR %d writing to ws socket\n", m); 298d4afb5ceSopenharmony_ci lws_threadpool_task_sync(task, 1); 299d4afb5ceSopenharmony_ci return -1; 300d4afb5ceSopenharmony_ci } 301d4afb5ceSopenharmony_ci 302d4afb5ceSopenharmony_ci /* 303d4afb5ceSopenharmony_ci * service thread has done whatever it wanted to do with the 304d4afb5ceSopenharmony_ci * data the task produced: if it's waiting to do more it can 305d4afb5ceSopenharmony_ci * continue now. 306d4afb5ceSopenharmony_ci */ 307d4afb5ceSopenharmony_ci lws_threadpool_task_sync(task, 0); 308d4afb5ceSopenharmony_ci break; 309d4afb5ceSopenharmony_ci 310d4afb5ceSopenharmony_ci default: 311d4afb5ceSopenharmony_ci break; 312d4afb5ceSopenharmony_ci } 313d4afb5ceSopenharmony_ci 314d4afb5ceSopenharmony_ci return r; 315d4afb5ceSopenharmony_ci} 316d4afb5ceSopenharmony_ci 317d4afb5ceSopenharmony_ci#define LWS_PLUGIN_PROTOCOL_MINIMAL \ 318d4afb5ceSopenharmony_ci { \ 319d4afb5ceSopenharmony_ci "lws-minimal", \ 320d4afb5ceSopenharmony_ci callback_minimal, \ 321d4afb5ceSopenharmony_ci 0, \ 322d4afb5ceSopenharmony_ci 128, \ 323d4afb5ceSopenharmony_ci 0, NULL, 0 \ 324d4afb5ceSopenharmony_ci } 325