1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy 7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to 8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the 9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is 11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions: 12d4afb5ceSopenharmony_ci * 13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in 14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software. 15d4afb5ceSopenharmony_ci * 16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22d4afb5ceSopenharmony_ci * IN THE SOFTWARE. 23d4afb5ceSopenharmony_ci */ 24d4afb5ceSopenharmony_ci 25d4afb5ceSopenharmony_ci#if !defined(_GNU_SOURCE) 26d4afb5ceSopenharmony_ci#define _GNU_SOURCE 27d4afb5ceSopenharmony_ci#endif 28d4afb5ceSopenharmony_ci 29d4afb5ceSopenharmony_ci#if defined(WIN32) 30d4afb5ceSopenharmony_ci#define HAVE_STRUCT_TIMESPEC 31d4afb5ceSopenharmony_ci#if defined(pid_t) 32d4afb5ceSopenharmony_ci#undef pid_t 33d4afb5ceSopenharmony_ci#endif 34d4afb5ceSopenharmony_ci#endif 35d4afb5ceSopenharmony_ci#include <pthread.h> 36d4afb5ceSopenharmony_ci 37d4afb5ceSopenharmony_ci#include "private-lib-core.h" 38d4afb5ceSopenharmony_ci 39d4afb5ceSopenharmony_ci#include <string.h> 40d4afb5ceSopenharmony_ci#include <stdio.h> 41d4afb5ceSopenharmony_ci 42d4afb5ceSopenharmony_cistruct lws_threadpool; 43d4afb5ceSopenharmony_ci 44d4afb5ceSopenharmony_cistruct lws_threadpool_task { 45d4afb5ceSopenharmony_ci struct lws_threadpool_task *task_queue_next; 46d4afb5ceSopenharmony_ci 47d4afb5ceSopenharmony_ci struct lws_threadpool *tp; 48d4afb5ceSopenharmony_ci char name[32]; 49d4afb5ceSopenharmony_ci struct lws_threadpool_task_args args; 50d4afb5ceSopenharmony_ci 51d4afb5ceSopenharmony_ci lws_dll2_t list; 52d4afb5ceSopenharmony_ci 53d4afb5ceSopenharmony_ci lws_usec_t created; 54d4afb5ceSopenharmony_ci lws_usec_t acquired; 55d4afb5ceSopenharmony_ci lws_usec_t done; 56d4afb5ceSopenharmony_ci lws_usec_t entered_state; 57d4afb5ceSopenharmony_ci 58d4afb5ceSopenharmony_ci lws_usec_t acc_running; 59d4afb5ceSopenharmony_ci lws_usec_t acc_syncing; 60d4afb5ceSopenharmony_ci 61d4afb5ceSopenharmony_ci pthread_cond_t wake_idle; 62d4afb5ceSopenharmony_ci 63d4afb5ceSopenharmony_ci enum lws_threadpool_task_status status; 64d4afb5ceSopenharmony_ci 65d4afb5ceSopenharmony_ci int late_sync_retries; 66d4afb5ceSopenharmony_ci 67d4afb5ceSopenharmony_ci char wanted_writeable_cb; 68d4afb5ceSopenharmony_ci char outlive; 69d4afb5ceSopenharmony_ci}; 70d4afb5ceSopenharmony_ci 71d4afb5ceSopenharmony_cistruct lws_pool { 72d4afb5ceSopenharmony_ci struct lws_threadpool *tp; 73d4afb5ceSopenharmony_ci pthread_t thread; 74d4afb5ceSopenharmony_ci pthread_mutex_t lock; /* part of task wake_idle */ 75d4afb5ceSopenharmony_ci struct lws_threadpool_task *task; 76d4afb5ceSopenharmony_ci lws_usec_t acquired; 77d4afb5ceSopenharmony_ci int worker_index; 78d4afb5ceSopenharmony_ci}; 79d4afb5ceSopenharmony_ci 80d4afb5ceSopenharmony_cistruct lws_threadpool { 81d4afb5ceSopenharmony_ci pthread_mutex_t lock; /* protects all pool lists */ 82d4afb5ceSopenharmony_ci pthread_cond_t wake_idle; 83d4afb5ceSopenharmony_ci struct lws_pool *pool_list; 84d4afb5ceSopenharmony_ci 85d4afb5ceSopenharmony_ci struct lws_context *context; 86d4afb5ceSopenharmony_ci struct lws_threadpool *tp_list; /* context list of threadpools */ 87d4afb5ceSopenharmony_ci 88d4afb5ceSopenharmony_ci struct lws_threadpool_task *task_queue_head; 89d4afb5ceSopenharmony_ci struct lws_threadpool_task *task_done_head; 90d4afb5ceSopenharmony_ci 91d4afb5ceSopenharmony_ci char name[32]; 92d4afb5ceSopenharmony_ci 93d4afb5ceSopenharmony_ci int threads_in_pool; 94d4afb5ceSopenharmony_ci int queue_depth; 95d4afb5ceSopenharmony_ci int done_queue_depth; 96d4afb5ceSopenharmony_ci int max_queue_depth; 97d4afb5ceSopenharmony_ci int running_tasks; 98d4afb5ceSopenharmony_ci 99d4afb5ceSopenharmony_ci unsigned int destroying:1; 100d4afb5ceSopenharmony_ci}; 101d4afb5ceSopenharmony_ci 102d4afb5ceSopenharmony_cistatic int 103d4afb5ceSopenharmony_cims_delta(lws_usec_t now, lws_usec_t then) 104d4afb5ceSopenharmony_ci{ 105d4afb5ceSopenharmony_ci return (int)((now - then) / 1000); 106d4afb5ceSopenharmony_ci} 107d4afb5ceSopenharmony_ci 108d4afb5ceSopenharmony_cistatic void 109d4afb5ceSopenharmony_cius_accrue(lws_usec_t *acc, lws_usec_t then) 110d4afb5ceSopenharmony_ci{ 111d4afb5ceSopenharmony_ci lws_usec_t now = lws_now_usecs(); 112d4afb5ceSopenharmony_ci 113d4afb5ceSopenharmony_ci *acc += now - then; 114d4afb5ceSopenharmony_ci} 115d4afb5ceSopenharmony_ci 116d4afb5ceSopenharmony_cistatic int 117d4afb5ceSopenharmony_cipc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us) 118d4afb5ceSopenharmony_ci{ 119d4afb5ceSopenharmony_ci lws_usec_t delta = (now - then) + 1; 120d4afb5ceSopenharmony_ci 121d4afb5ceSopenharmony_ci return (int)((us * 100) / delta); 122d4afb5ceSopenharmony_ci} 123d4afb5ceSopenharmony_ci 124d4afb5ceSopenharmony_cistatic void 125d4afb5ceSopenharmony_ci__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len) 126d4afb5ceSopenharmony_ci{ 127d4afb5ceSopenharmony_ci lws_usec_t now = lws_now_usecs(); 128d4afb5ceSopenharmony_ci char *end = buf + len - 1; 129d4afb5ceSopenharmony_ci int syncms = 0, runms = 0; 130d4afb5ceSopenharmony_ci 131d4afb5ceSopenharmony_ci if (!task->acquired) { 132d4afb5ceSopenharmony_ci buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf), 133d4afb5ceSopenharmony_ci "task: %s, QUEUED queued: %dms", 134d4afb5ceSopenharmony_ci task->name, ms_delta(now, task->created)); 135d4afb5ceSopenharmony_ci 136d4afb5ceSopenharmony_ci return; 137d4afb5ceSopenharmony_ci } 138d4afb5ceSopenharmony_ci 139d4afb5ceSopenharmony_ci if (task->acc_running) 140d4afb5ceSopenharmony_ci runms = (int)task->acc_running; 141d4afb5ceSopenharmony_ci 142d4afb5ceSopenharmony_ci if (task->acc_syncing) 143d4afb5ceSopenharmony_ci syncms = (int)task->acc_syncing; 144d4afb5ceSopenharmony_ci 145d4afb5ceSopenharmony_ci if (!task->done) { 146d4afb5ceSopenharmony_ci buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf), 147d4afb5ceSopenharmony_ci "task: %s, ONGOING state %d (%dms) alive: %dms " 148d4afb5ceSopenharmony_ci "(queued %dms, acquired: %dms, " 149d4afb5ceSopenharmony_ci "run: %d%%, sync: %d%%)", task->name, task->status, 150d4afb5ceSopenharmony_ci ms_delta(now, task->entered_state), 151d4afb5ceSopenharmony_ci ms_delta(now, task->created), 152d4afb5ceSopenharmony_ci ms_delta(task->acquired, task->created), 153d4afb5ceSopenharmony_ci ms_delta(now, task->acquired), 154d4afb5ceSopenharmony_ci pc_delta(now, task->acquired, runms), 155d4afb5ceSopenharmony_ci pc_delta(now, task->acquired, syncms)); 156d4afb5ceSopenharmony_ci 157d4afb5ceSopenharmony_ci return; 158d4afb5ceSopenharmony_ci } 159d4afb5ceSopenharmony_ci 160d4afb5ceSopenharmony_ci lws_snprintf(buf, lws_ptr_diff_size_t(end, buf), 161d4afb5ceSopenharmony_ci "task: %s, DONE state %d lived: %dms " 162d4afb5ceSopenharmony_ci "(queued %dms, on thread: %dms, " 163d4afb5ceSopenharmony_ci "ran: %d%%, synced: %d%%)", task->name, task->status, 164d4afb5ceSopenharmony_ci ms_delta(task->done, task->created), 165d4afb5ceSopenharmony_ci ms_delta(task->acquired, task->created), 166d4afb5ceSopenharmony_ci ms_delta(task->done, task->acquired), 167d4afb5ceSopenharmony_ci pc_delta(task->done, task->acquired, runms), 168d4afb5ceSopenharmony_ci pc_delta(task->done, task->acquired, syncms)); 169d4afb5ceSopenharmony_ci} 170d4afb5ceSopenharmony_ci 171d4afb5ceSopenharmony_civoid 172d4afb5ceSopenharmony_cilws_threadpool_dump(struct lws_threadpool *tp) 173d4afb5ceSopenharmony_ci{ 174d4afb5ceSopenharmony_ci#if 0 175d4afb5ceSopenharmony_ci //defined(_DEBUG) 176d4afb5ceSopenharmony_ci struct lws_threadpool_task **c; 177d4afb5ceSopenharmony_ci char buf[160]; 178d4afb5ceSopenharmony_ci int n, count; 179d4afb5ceSopenharmony_ci 180d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ 181d4afb5ceSopenharmony_ci 182d4afb5ceSopenharmony_ci lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__, 183d4afb5ceSopenharmony_ci tp->name, tp->queue_depth, tp->running_tasks, 184d4afb5ceSopenharmony_ci tp->done_queue_depth); 185d4afb5ceSopenharmony_ci 186d4afb5ceSopenharmony_ci count = 0; 187d4afb5ceSopenharmony_ci c = &tp->task_queue_head; 188d4afb5ceSopenharmony_ci while (*c) { 189d4afb5ceSopenharmony_ci struct lws_threadpool_task *task = *c; 190d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(task, buf, sizeof(buf)); 191d4afb5ceSopenharmony_ci lwsl_thread(" - %s\n", buf); 192d4afb5ceSopenharmony_ci count++; 193d4afb5ceSopenharmony_ci 194d4afb5ceSopenharmony_ci c = &(*c)->task_queue_next; 195d4afb5ceSopenharmony_ci } 196d4afb5ceSopenharmony_ci 197d4afb5ceSopenharmony_ci if (count != tp->queue_depth) 198d4afb5ceSopenharmony_ci lwsl_err("%s: tp says queue depth %d, but actually %d\n", 199d4afb5ceSopenharmony_ci __func__, tp->queue_depth, count); 200d4afb5ceSopenharmony_ci 201d4afb5ceSopenharmony_ci count = 0; 202d4afb5ceSopenharmony_ci for (n = 0; n < tp->threads_in_pool; n++) { 203d4afb5ceSopenharmony_ci struct lws_pool *pool = &tp->pool_list[n]; 204d4afb5ceSopenharmony_ci struct lws_threadpool_task *task = pool->task; 205d4afb5ceSopenharmony_ci 206d4afb5ceSopenharmony_ci if (task) { 207d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(task, buf, sizeof(buf)); 208d4afb5ceSopenharmony_ci lwsl_thread(" - worker %d: %s\n", n, buf); 209d4afb5ceSopenharmony_ci count++; 210d4afb5ceSopenharmony_ci } 211d4afb5ceSopenharmony_ci } 212d4afb5ceSopenharmony_ci 213d4afb5ceSopenharmony_ci if (count != tp->running_tasks) 214d4afb5ceSopenharmony_ci lwsl_err("%s: tp says %d running_tasks, but actually %d\n", 215d4afb5ceSopenharmony_ci __func__, tp->running_tasks, count); 216d4afb5ceSopenharmony_ci 217d4afb5ceSopenharmony_ci count = 0; 218d4afb5ceSopenharmony_ci c = &tp->task_done_head; 219d4afb5ceSopenharmony_ci while (*c) { 220d4afb5ceSopenharmony_ci struct lws_threadpool_task *task = *c; 221d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(task, buf, sizeof(buf)); 222d4afb5ceSopenharmony_ci lwsl_thread(" - %s\n", buf); 223d4afb5ceSopenharmony_ci count++; 224d4afb5ceSopenharmony_ci 225d4afb5ceSopenharmony_ci c = &(*c)->task_queue_next; 226d4afb5ceSopenharmony_ci } 227d4afb5ceSopenharmony_ci 228d4afb5ceSopenharmony_ci if (count != tp->done_queue_depth) 229d4afb5ceSopenharmony_ci lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n", 230d4afb5ceSopenharmony_ci __func__, tp->done_queue_depth, count); 231d4afb5ceSopenharmony_ci 232d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */ 233d4afb5ceSopenharmony_ci#endif 234d4afb5ceSopenharmony_ci} 235d4afb5ceSopenharmony_ci 236d4afb5ceSopenharmony_cistatic void 237d4afb5ceSopenharmony_cistate_transition(struct lws_threadpool_task *task, 238d4afb5ceSopenharmony_ci enum lws_threadpool_task_status status) 239d4afb5ceSopenharmony_ci{ 240d4afb5ceSopenharmony_ci task->entered_state = lws_now_usecs(); 241d4afb5ceSopenharmony_ci task->status = status; 242d4afb5ceSopenharmony_ci} 243d4afb5ceSopenharmony_ci 244d4afb5ceSopenharmony_cistatic struct lws * 245d4afb5ceSopenharmony_citask_to_wsi(struct lws_threadpool_task *task) 246d4afb5ceSopenharmony_ci{ 247d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 248d4afb5ceSopenharmony_ci if (task->args.ss) 249d4afb5ceSopenharmony_ci return task->args.ss->wsi; 250d4afb5ceSopenharmony_ci#endif 251d4afb5ceSopenharmony_ci return task->args.wsi; 252d4afb5ceSopenharmony_ci} 253d4afb5ceSopenharmony_ci 254d4afb5ceSopenharmony_cistatic void 255d4afb5ceSopenharmony_cilws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task) 256d4afb5ceSopenharmony_ci{ 257d4afb5ceSopenharmony_ci if (task->args.cleanup) 258d4afb5ceSopenharmony_ci task->args.cleanup(task_to_wsi(task), task->args.user); 259d4afb5ceSopenharmony_ci 260d4afb5ceSopenharmony_ci lws_dll2_remove(&task->list); 261d4afb5ceSopenharmony_ci 262d4afb5ceSopenharmony_ci lwsl_thread("%s: tp %p: cleaned finished task for %s\n", 263d4afb5ceSopenharmony_ci __func__, task->tp, lws_wsi_tag(task_to_wsi(task))); 264d4afb5ceSopenharmony_ci 265d4afb5ceSopenharmony_ci lws_free(task); 266d4afb5ceSopenharmony_ci} 267d4afb5ceSopenharmony_ci 268d4afb5ceSopenharmony_cistatic void 269d4afb5ceSopenharmony_ci__lws_threadpool_reap(struct lws_threadpool_task *task) 270d4afb5ceSopenharmony_ci{ 271d4afb5ceSopenharmony_ci struct lws_threadpool_task **c, *t = NULL; 272d4afb5ceSopenharmony_ci struct lws_threadpool *tp = task->tp; 273d4afb5ceSopenharmony_ci 274d4afb5ceSopenharmony_ci /* remove the task from the done queue */ 275d4afb5ceSopenharmony_ci 276d4afb5ceSopenharmony_ci if (tp) { 277d4afb5ceSopenharmony_ci c = &tp->task_done_head; 278d4afb5ceSopenharmony_ci 279d4afb5ceSopenharmony_ci while (*c) { 280d4afb5ceSopenharmony_ci if ((*c) == task) { 281d4afb5ceSopenharmony_ci t = *c; 282d4afb5ceSopenharmony_ci *c = t->task_queue_next; 283d4afb5ceSopenharmony_ci t->task_queue_next = NULL; 284d4afb5ceSopenharmony_ci tp->done_queue_depth--; 285d4afb5ceSopenharmony_ci 286d4afb5ceSopenharmony_ci lwsl_thread("%s: tp %s: reaped task %s\n", __func__, 287d4afb5ceSopenharmony_ci tp->name, lws_wsi_tag(task_to_wsi(task))); 288d4afb5ceSopenharmony_ci 289d4afb5ceSopenharmony_ci break; 290d4afb5ceSopenharmony_ci } 291d4afb5ceSopenharmony_ci c = &(*c)->task_queue_next; 292d4afb5ceSopenharmony_ci } 293d4afb5ceSopenharmony_ci 294d4afb5ceSopenharmony_ci if (!t) { 295d4afb5ceSopenharmony_ci lwsl_err("%s: task %p not in done queue\n", __func__, task); 296d4afb5ceSopenharmony_ci /* 297d4afb5ceSopenharmony_ci * This shouldn't occur, but in this case not really 298d4afb5ceSopenharmony_ci * safe to assume there's a task to destroy 299d4afb5ceSopenharmony_ci */ 300d4afb5ceSopenharmony_ci return; 301d4afb5ceSopenharmony_ci } 302d4afb5ceSopenharmony_ci } else 303d4afb5ceSopenharmony_ci lwsl_err("%s: task->tp NULL already\n", __func__); 304d4afb5ceSopenharmony_ci 305d4afb5ceSopenharmony_ci /* call the task's cleanup and delete the task itself */ 306d4afb5ceSopenharmony_ci 307d4afb5ceSopenharmony_ci lws_threadpool_task_cleanup_destroy(task); 308d4afb5ceSopenharmony_ci} 309d4afb5ceSopenharmony_ci 310d4afb5ceSopenharmony_ci/* 311d4afb5ceSopenharmony_ci * this gets called from each tsi service context after the service was 312d4afb5ceSopenharmony_ci * cancelled... we need to ask for the writable callback from the matching 313d4afb5ceSopenharmony_ci * tsi context for any wsis bound to a worked thread that need it 314d4afb5ceSopenharmony_ci */ 315d4afb5ceSopenharmony_ci 316d4afb5ceSopenharmony_ciint 317d4afb5ceSopenharmony_cilws_threadpool_tsi_context(struct lws_context *context, int tsi) 318d4afb5ceSopenharmony_ci{ 319d4afb5ceSopenharmony_ci struct lws_threadpool_task **c, *task = NULL; 320d4afb5ceSopenharmony_ci struct lws_threadpool *tp; 321d4afb5ceSopenharmony_ci struct lws *wsi; 322d4afb5ceSopenharmony_ci 323d4afb5ceSopenharmony_ci lws_context_lock(context, __func__); 324d4afb5ceSopenharmony_ci 325d4afb5ceSopenharmony_ci tp = context->tp_list_head; 326d4afb5ceSopenharmony_ci while (tp) { 327d4afb5ceSopenharmony_ci int n; 328d4afb5ceSopenharmony_ci 329d4afb5ceSopenharmony_ci /* for the running (syncing...) tasks... */ 330d4afb5ceSopenharmony_ci 331d4afb5ceSopenharmony_ci for (n = 0; n < tp->threads_in_pool; n++) { 332d4afb5ceSopenharmony_ci struct lws_pool *pool = &tp->pool_list[n]; 333d4afb5ceSopenharmony_ci 334d4afb5ceSopenharmony_ci task = pool->task; 335d4afb5ceSopenharmony_ci if (!task) 336d4afb5ceSopenharmony_ci continue; 337d4afb5ceSopenharmony_ci 338d4afb5ceSopenharmony_ci wsi = task_to_wsi(task); 339d4afb5ceSopenharmony_ci if (!wsi || wsi->tsi != tsi || 340d4afb5ceSopenharmony_ci (!task->wanted_writeable_cb && 341d4afb5ceSopenharmony_ci task->status != LWS_TP_STATUS_SYNCING)) 342d4afb5ceSopenharmony_ci continue; 343d4afb5ceSopenharmony_ci 344d4afb5ceSopenharmony_ci task->wanted_writeable_cb = 0; 345d4afb5ceSopenharmony_ci lws_memory_barrier(); 346d4afb5ceSopenharmony_ci 347d4afb5ceSopenharmony_ci /* 348d4afb5ceSopenharmony_ci * finally... we can ask for the callback on 349d4afb5ceSopenharmony_ci * writable from the correct service thread 350d4afb5ceSopenharmony_ci * context 351d4afb5ceSopenharmony_ci */ 352d4afb5ceSopenharmony_ci 353d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 354d4afb5ceSopenharmony_ci } 355d4afb5ceSopenharmony_ci 356d4afb5ceSopenharmony_ci /* for the done tasks... */ 357d4afb5ceSopenharmony_ci 358d4afb5ceSopenharmony_ci c = &tp->task_done_head; 359d4afb5ceSopenharmony_ci 360d4afb5ceSopenharmony_ci while (*c) { 361d4afb5ceSopenharmony_ci task = *c; 362d4afb5ceSopenharmony_ci wsi = task_to_wsi(task); 363d4afb5ceSopenharmony_ci 364d4afb5ceSopenharmony_ci if (wsi && wsi->tsi == tsi && 365d4afb5ceSopenharmony_ci (task->wanted_writeable_cb || 366d4afb5ceSopenharmony_ci task->status == LWS_TP_STATUS_SYNCING)) { 367d4afb5ceSopenharmony_ci 368d4afb5ceSopenharmony_ci task->wanted_writeable_cb = 0; 369d4afb5ceSopenharmony_ci lws_memory_barrier(); 370d4afb5ceSopenharmony_ci 371d4afb5ceSopenharmony_ci /* 372d4afb5ceSopenharmony_ci * finally... we can ask for the callback on 373d4afb5ceSopenharmony_ci * writable from the correct service thread 374d4afb5ceSopenharmony_ci * context 375d4afb5ceSopenharmony_ci */ 376d4afb5ceSopenharmony_ci 377d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 378d4afb5ceSopenharmony_ci } 379d4afb5ceSopenharmony_ci 380d4afb5ceSopenharmony_ci c = &task->task_queue_next; 381d4afb5ceSopenharmony_ci } 382d4afb5ceSopenharmony_ci 383d4afb5ceSopenharmony_ci tp = tp->tp_list; 384d4afb5ceSopenharmony_ci } 385d4afb5ceSopenharmony_ci 386d4afb5ceSopenharmony_ci lws_context_unlock(context); 387d4afb5ceSopenharmony_ci 388d4afb5ceSopenharmony_ci return 0; 389d4afb5ceSopenharmony_ci} 390d4afb5ceSopenharmony_ci 391d4afb5ceSopenharmony_cistatic int 392d4afb5ceSopenharmony_cilws_threadpool_worker_sync(struct lws_pool *pool, 393d4afb5ceSopenharmony_ci struct lws_threadpool_task *task) 394d4afb5ceSopenharmony_ci{ 395d4afb5ceSopenharmony_ci enum lws_threadpool_task_status temp; 396d4afb5ceSopenharmony_ci struct timespec abstime; 397d4afb5ceSopenharmony_ci struct lws *wsi; 398d4afb5ceSopenharmony_ci int tries = 15; 399d4afb5ceSopenharmony_ci 400d4afb5ceSopenharmony_ci /* block until writable acknowledges */ 401d4afb5ceSopenharmony_ci lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task); 402d4afb5ceSopenharmony_ci pthread_mutex_lock(&pool->lock); /* ======================= pool lock */ 403d4afb5ceSopenharmony_ci 404d4afb5ceSopenharmony_ci lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__, 405d4afb5ceSopenharmony_ci pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task))); 406d4afb5ceSopenharmony_ci 407d4afb5ceSopenharmony_ci temp = task->status; 408d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_SYNCING); 409d4afb5ceSopenharmony_ci while (tries--) { 410d4afb5ceSopenharmony_ci wsi = task_to_wsi(task); 411d4afb5ceSopenharmony_ci 412d4afb5ceSopenharmony_ci /* 413d4afb5ceSopenharmony_ci * if the wsi is no longer attached to this task, there is 414d4afb5ceSopenharmony_ci * nothing we can sync to usefully. Since the work wants to 415d4afb5ceSopenharmony_ci * sync, it means we should react to the situation by telling 416d4afb5ceSopenharmony_ci * the task it can't continue usefully by stopping it. 417d4afb5ceSopenharmony_ci */ 418d4afb5ceSopenharmony_ci 419d4afb5ceSopenharmony_ci if (!wsi) { 420d4afb5ceSopenharmony_ci lwsl_thread("%s: %s: task %p (%s): No longer bound to any " 421d4afb5ceSopenharmony_ci "wsi to sync to\n", __func__, pool->tp->name, 422d4afb5ceSopenharmony_ci task, task->name); 423d4afb5ceSopenharmony_ci 424d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPING); 425d4afb5ceSopenharmony_ci goto done; 426d4afb5ceSopenharmony_ci } 427d4afb5ceSopenharmony_ci 428d4afb5ceSopenharmony_ci /* 429d4afb5ceSopenharmony_ci * So "tries" times this is the maximum time between SYNC asking 430d4afb5ceSopenharmony_ci * for a callback on writable and actually getting it we are 431d4afb5ceSopenharmony_ci * willing to sit still for. 432d4afb5ceSopenharmony_ci * 433d4afb5ceSopenharmony_ci * If it is exceeded, we will stop the task. 434d4afb5ceSopenharmony_ci */ 435d4afb5ceSopenharmony_ci abstime.tv_sec = time(NULL) + 3; 436d4afb5ceSopenharmony_ci abstime.tv_nsec = 0; 437d4afb5ceSopenharmony_ci 438d4afb5ceSopenharmony_ci task->wanted_writeable_cb = 1; 439d4afb5ceSopenharmony_ci lws_memory_barrier(); 440d4afb5ceSopenharmony_ci 441d4afb5ceSopenharmony_ci /* 442d4afb5ceSopenharmony_ci * This will cause lws_threadpool_tsi_context() to get called 443d4afb5ceSopenharmony_ci * from each tsi service context, where we can safely ask for 444d4afb5ceSopenharmony_ci * a callback on writeable on the wsi we are associated with. 445d4afb5ceSopenharmony_ci */ 446d4afb5ceSopenharmony_ci lws_cancel_service(lws_get_context(wsi)); 447d4afb5ceSopenharmony_ci 448d4afb5ceSopenharmony_ci /* 449d4afb5ceSopenharmony_ci * so the danger here is that we asked for a writable callback 450d4afb5ceSopenharmony_ci * on the wsi, but for whatever reason, we are never going to 451d4afb5ceSopenharmony_ci * get one. To avoid deadlocking forever, we allow a set time 452d4afb5ceSopenharmony_ci * for the sync to happen naturally, otherwise the cond wait 453d4afb5ceSopenharmony_ci * times out and we stop the task. 454d4afb5ceSopenharmony_ci */ 455d4afb5ceSopenharmony_ci 456d4afb5ceSopenharmony_ci if (pthread_cond_timedwait(&task->wake_idle, &pool->lock, 457d4afb5ceSopenharmony_ci &abstime) == ETIMEDOUT) { 458d4afb5ceSopenharmony_ci task->late_sync_retries++; 459d4afb5ceSopenharmony_ci if (!tries) { 460d4afb5ceSopenharmony_ci lwsl_err("%s: %s: task %p (%s): SYNC timed out " 461d4afb5ceSopenharmony_ci "(associated %s)\n", 462d4afb5ceSopenharmony_ci __func__, pool->tp->name, task, 463d4afb5ceSopenharmony_ci task->name, lws_wsi_tag(task_to_wsi(task))); 464d4afb5ceSopenharmony_ci 465d4afb5ceSopenharmony_ci pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */ 466d4afb5ceSopenharmony_ci lws_threadpool_dequeue_task(task); 467d4afb5ceSopenharmony_ci return 1; /* destroyed task */ 468d4afb5ceSopenharmony_ci } 469d4afb5ceSopenharmony_ci 470d4afb5ceSopenharmony_ci continue; 471d4afb5ceSopenharmony_ci } else 472d4afb5ceSopenharmony_ci break; 473d4afb5ceSopenharmony_ci } 474d4afb5ceSopenharmony_ci 475d4afb5ceSopenharmony_ci if (task->status == LWS_TP_STATUS_SYNCING) 476d4afb5ceSopenharmony_ci state_transition(task, temp); 477d4afb5ceSopenharmony_ci 478d4afb5ceSopenharmony_ci lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task); 479d4afb5ceSopenharmony_ci 480d4afb5ceSopenharmony_cidone: 481d4afb5ceSopenharmony_ci pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */ 482d4afb5ceSopenharmony_ci 483d4afb5ceSopenharmony_ci return 0; 484d4afb5ceSopenharmony_ci} 485d4afb5ceSopenharmony_ci 486d4afb5ceSopenharmony_ci#if !defined(WIN32) 487d4afb5ceSopenharmony_cistatic int dummy; 488d4afb5ceSopenharmony_ci#endif 489d4afb5ceSopenharmony_ci 490d4afb5ceSopenharmony_cistatic void * 491d4afb5ceSopenharmony_cilws_threadpool_worker(void *d) 492d4afb5ceSopenharmony_ci{ 493d4afb5ceSopenharmony_ci struct lws_threadpool_task **c, **c2, *task; 494d4afb5ceSopenharmony_ci struct lws_pool *pool = d; 495d4afb5ceSopenharmony_ci struct lws_threadpool *tp = pool->tp; 496d4afb5ceSopenharmony_ci char buf[160]; 497d4afb5ceSopenharmony_ci 498d4afb5ceSopenharmony_ci while (!tp->destroying) { 499d4afb5ceSopenharmony_ci 500d4afb5ceSopenharmony_ci /* we have no running task... wait and get one from the queue */ 501d4afb5ceSopenharmony_ci 502d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* =================== tp lock */ 503d4afb5ceSopenharmony_ci 504d4afb5ceSopenharmony_ci /* 505d4afb5ceSopenharmony_ci * if there's no task already waiting in the queue, wait for 506d4afb5ceSopenharmony_ci * the wake_idle condition to signal us that might have changed 507d4afb5ceSopenharmony_ci */ 508d4afb5ceSopenharmony_ci while (!tp->task_queue_head && !tp->destroying) 509d4afb5ceSopenharmony_ci pthread_cond_wait(&tp->wake_idle, &tp->lock); 510d4afb5ceSopenharmony_ci 511d4afb5ceSopenharmony_ci if (tp->destroying) { 512d4afb5ceSopenharmony_ci lwsl_notice("%s: bailing\n", __func__); 513d4afb5ceSopenharmony_ci goto doneski; 514d4afb5ceSopenharmony_ci } 515d4afb5ceSopenharmony_ci 516d4afb5ceSopenharmony_ci c = &tp->task_queue_head; 517d4afb5ceSopenharmony_ci c2 = NULL; 518d4afb5ceSopenharmony_ci task = NULL; 519d4afb5ceSopenharmony_ci pool->task = NULL; 520d4afb5ceSopenharmony_ci 521d4afb5ceSopenharmony_ci /* look at the queue tail */ 522d4afb5ceSopenharmony_ci while (*c) { 523d4afb5ceSopenharmony_ci c2 = c; 524d4afb5ceSopenharmony_ci c = &(*c)->task_queue_next; 525d4afb5ceSopenharmony_ci } 526d4afb5ceSopenharmony_ci 527d4afb5ceSopenharmony_ci /* is there a task at the queue tail? */ 528d4afb5ceSopenharmony_ci if (c2 && *c2) { 529d4afb5ceSopenharmony_ci pool->task = task = *c2; 530d4afb5ceSopenharmony_ci task->acquired = pool->acquired = lws_now_usecs(); 531d4afb5ceSopenharmony_ci /* remove it from the queue */ 532d4afb5ceSopenharmony_ci *c2 = task->task_queue_next; 533d4afb5ceSopenharmony_ci task->task_queue_next = NULL; 534d4afb5ceSopenharmony_ci tp->queue_depth--; 535d4afb5ceSopenharmony_ci /* mark it as running */ 536d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_RUNNING); 537d4afb5ceSopenharmony_ci } 538d4afb5ceSopenharmony_ci 539d4afb5ceSopenharmony_ci /* someone else got it first... wait and try again */ 540d4afb5ceSopenharmony_ci if (!task) { 541d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */ 542d4afb5ceSopenharmony_ci continue; 543d4afb5ceSopenharmony_ci } 544d4afb5ceSopenharmony_ci 545d4afb5ceSopenharmony_ci task->wanted_writeable_cb = 0; 546d4afb5ceSopenharmony_ci 547d4afb5ceSopenharmony_ci /* we have acquired a new task */ 548d4afb5ceSopenharmony_ci 549d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(task, buf, sizeof(buf)); 550d4afb5ceSopenharmony_ci 551d4afb5ceSopenharmony_ci lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n", 552d4afb5ceSopenharmony_ci __func__, tp->name, pool->worker_index, buf); 553d4afb5ceSopenharmony_ci tp->running_tasks++; 554d4afb5ceSopenharmony_ci 555d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */ 556d4afb5ceSopenharmony_ci 557d4afb5ceSopenharmony_ci /* 558d4afb5ceSopenharmony_ci * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to 559d4afb5ceSopenharmony_ci * "resurface" periodically, and get called again with 560d4afb5ceSopenharmony_ci * cont = 1 immediately to indicate it is picking up where it 561d4afb5ceSopenharmony_ci * left off if the task is not being "stopped". 562d4afb5ceSopenharmony_ci * 563d4afb5ceSopenharmony_ci * This allows long tasks to respond to requests to stop in 564d4afb5ceSopenharmony_ci * a clean and opaque way. 565d4afb5ceSopenharmony_ci * 566d4afb5ceSopenharmony_ci * 2) The task can return with LWS_TP_RETURN_SYNC to register 567d4afb5ceSopenharmony_ci * a "callback on writable" request on the service thread and 568d4afb5ceSopenharmony_ci * block until it hears back from the WRITABLE handler. 569d4afb5ceSopenharmony_ci * 570d4afb5ceSopenharmony_ci * This allows the work on the thread to be synchronized to the 571d4afb5ceSopenharmony_ci * previous work being dispatched cleanly. 572d4afb5ceSopenharmony_ci * 573d4afb5ceSopenharmony_ci * 3) The task can return with LWS_TP_RETURN_FINISHED to 574d4afb5ceSopenharmony_ci * indicate its work is completed nicely. 575d4afb5ceSopenharmony_ci * 576d4afb5ceSopenharmony_ci * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate 577d4afb5ceSopenharmony_ci * it stopped and cleaned up after incomplete work. 578d4afb5ceSopenharmony_ci */ 579d4afb5ceSopenharmony_ci 580d4afb5ceSopenharmony_ci do { 581d4afb5ceSopenharmony_ci lws_usec_t then; 582d4afb5ceSopenharmony_ci int n; 583d4afb5ceSopenharmony_ci 584d4afb5ceSopenharmony_ci if (tp->destroying || !task_to_wsi(task)) { 585d4afb5ceSopenharmony_ci lwsl_info("%s: stopping on wsi gone\n", __func__); 586d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPING); 587d4afb5ceSopenharmony_ci } 588d4afb5ceSopenharmony_ci 589d4afb5ceSopenharmony_ci then = lws_now_usecs(); 590d4afb5ceSopenharmony_ci n = (int)task->args.task(task->args.user, task->status); 591d4afb5ceSopenharmony_ci lwsl_debug(" %d, status %d\n", n, task->status); 592d4afb5ceSopenharmony_ci us_accrue(&task->acc_running, then); 593d4afb5ceSopenharmony_ci if (n & LWS_TP_RETURN_FLAG_OUTLIVE) 594d4afb5ceSopenharmony_ci task->outlive = 1; 595d4afb5ceSopenharmony_ci switch (n & 7) { 596d4afb5ceSopenharmony_ci case LWS_TP_RETURN_CHECKING_IN: 597d4afb5ceSopenharmony_ci /* if not destroying the tp, continue */ 598d4afb5ceSopenharmony_ci break; 599d4afb5ceSopenharmony_ci case LWS_TP_RETURN_SYNC: 600d4afb5ceSopenharmony_ci if (!task_to_wsi(task)) { 601d4afb5ceSopenharmony_ci lwsl_debug("%s: task that wants to " 602d4afb5ceSopenharmony_ci "outlive lost wsi asked " 603d4afb5ceSopenharmony_ci "to sync: bypassed\n", 604d4afb5ceSopenharmony_ci __func__); 605d4afb5ceSopenharmony_ci break; 606d4afb5ceSopenharmony_ci } 607d4afb5ceSopenharmony_ci /* block until writable acknowledges */ 608d4afb5ceSopenharmony_ci then = lws_now_usecs(); 609d4afb5ceSopenharmony_ci if (lws_threadpool_worker_sync(pool, task)) { 610d4afb5ceSopenharmony_ci lwsl_notice("%s: Sync failed\n", __func__); 611d4afb5ceSopenharmony_ci goto doneski; 612d4afb5ceSopenharmony_ci } 613d4afb5ceSopenharmony_ci us_accrue(&task->acc_syncing, then); 614d4afb5ceSopenharmony_ci break; 615d4afb5ceSopenharmony_ci case LWS_TP_RETURN_FINISHED: 616d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_FINISHED); 617d4afb5ceSopenharmony_ci break; 618d4afb5ceSopenharmony_ci case LWS_TP_RETURN_STOPPED: 619d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPED); 620d4afb5ceSopenharmony_ci break; 621d4afb5ceSopenharmony_ci } 622d4afb5ceSopenharmony_ci } while (task->status == LWS_TP_STATUS_RUNNING); 623d4afb5ceSopenharmony_ci 624d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* =================== tp lock */ 625d4afb5ceSopenharmony_ci 626d4afb5ceSopenharmony_ci tp->running_tasks--; 627d4afb5ceSopenharmony_ci 628d4afb5ceSopenharmony_ci if (pool->task->status == LWS_TP_STATUS_STOPPING) 629d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPED); 630d4afb5ceSopenharmony_ci 631d4afb5ceSopenharmony_ci /* move the task to the done queue */ 632d4afb5ceSopenharmony_ci 633d4afb5ceSopenharmony_ci pool->task->task_queue_next = tp->task_done_head; 634d4afb5ceSopenharmony_ci tp->task_done_head = task; 635d4afb5ceSopenharmony_ci tp->done_queue_depth++; 636d4afb5ceSopenharmony_ci pool->task->done = lws_now_usecs(); 637d4afb5ceSopenharmony_ci 638d4afb5ceSopenharmony_ci if (!pool->task->args.wsi && 639d4afb5ceSopenharmony_ci (pool->task->status == LWS_TP_STATUS_STOPPED || 640d4afb5ceSopenharmony_ci pool->task->status == LWS_TP_STATUS_FINISHED)) { 641d4afb5ceSopenharmony_ci 642d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(pool->task, buf, sizeof(buf)); 643d4afb5ceSopenharmony_ci lwsl_thread("%s: %s: worker %d REAPING: %s\n", 644d4afb5ceSopenharmony_ci __func__, tp->name, pool->worker_index, 645d4afb5ceSopenharmony_ci buf); 646d4afb5ceSopenharmony_ci 647d4afb5ceSopenharmony_ci /* 648d4afb5ceSopenharmony_ci * there is no longer any wsi attached, so nothing is 649d4afb5ceSopenharmony_ci * going to take care of reaping us. So we must take 650d4afb5ceSopenharmony_ci * care of it ourselves. 651d4afb5ceSopenharmony_ci */ 652d4afb5ceSopenharmony_ci __lws_threadpool_reap(pool->task); 653d4afb5ceSopenharmony_ci } else { 654d4afb5ceSopenharmony_ci 655d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(pool->task, buf, sizeof(buf)); 656d4afb5ceSopenharmony_ci lwsl_thread("%s: %s: worker %d DONE: %s\n", 657d4afb5ceSopenharmony_ci __func__, tp->name, pool->worker_index, 658d4afb5ceSopenharmony_ci buf); 659d4afb5ceSopenharmony_ci 660d4afb5ceSopenharmony_ci /* signal the associated wsi to take a fresh look at 661d4afb5ceSopenharmony_ci * task status */ 662d4afb5ceSopenharmony_ci 663d4afb5ceSopenharmony_ci if (task_to_wsi(pool->task)) { 664d4afb5ceSopenharmony_ci task->wanted_writeable_cb = 1; 665d4afb5ceSopenharmony_ci 666d4afb5ceSopenharmony_ci lws_cancel_service( 667d4afb5ceSopenharmony_ci lws_get_context(task_to_wsi(pool->task))); 668d4afb5ceSopenharmony_ci } 669d4afb5ceSopenharmony_ci } 670d4afb5ceSopenharmony_ci 671d4afb5ceSopenharmony_cidoneski: 672d4afb5ceSopenharmony_ci pool->task = NULL; 673d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */ 674d4afb5ceSopenharmony_ci } 675d4afb5ceSopenharmony_ci 676d4afb5ceSopenharmony_ci lwsl_notice("%s: Exiting\n", __func__); 677d4afb5ceSopenharmony_ci 678d4afb5ceSopenharmony_ci /* threadpool is being destroyed */ 679d4afb5ceSopenharmony_ci#if !defined(WIN32) 680d4afb5ceSopenharmony_ci pthread_exit(&dummy); 681d4afb5ceSopenharmony_ci#endif 682d4afb5ceSopenharmony_ci 683d4afb5ceSopenharmony_ci return NULL; 684d4afb5ceSopenharmony_ci} 685d4afb5ceSopenharmony_ci 686d4afb5ceSopenharmony_cistruct lws_threadpool * 687d4afb5ceSopenharmony_cilws_threadpool_create(struct lws_context *context, 688d4afb5ceSopenharmony_ci const struct lws_threadpool_create_args *args, 689d4afb5ceSopenharmony_ci const char *format, ...) 690d4afb5ceSopenharmony_ci{ 691d4afb5ceSopenharmony_ci struct lws_threadpool *tp; 692d4afb5ceSopenharmony_ci va_list ap; 693d4afb5ceSopenharmony_ci int n; 694d4afb5ceSopenharmony_ci 695d4afb5ceSopenharmony_ci tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads), 696d4afb5ceSopenharmony_ci "threadpool alloc"); 697d4afb5ceSopenharmony_ci if (!tp) 698d4afb5ceSopenharmony_ci return NULL; 699d4afb5ceSopenharmony_ci 700d4afb5ceSopenharmony_ci memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads)); 701d4afb5ceSopenharmony_ci tp->pool_list = (struct lws_pool *)(tp + 1); 702d4afb5ceSopenharmony_ci tp->max_queue_depth = args->max_queue_depth; 703d4afb5ceSopenharmony_ci 704d4afb5ceSopenharmony_ci va_start(ap, format); 705d4afb5ceSopenharmony_ci n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap); 706d4afb5ceSopenharmony_ci va_end(ap); 707d4afb5ceSopenharmony_ci 708d4afb5ceSopenharmony_ci lws_context_lock(context, __func__); 709d4afb5ceSopenharmony_ci 710d4afb5ceSopenharmony_ci tp->context = context; 711d4afb5ceSopenharmony_ci tp->tp_list = context->tp_list_head; 712d4afb5ceSopenharmony_ci context->tp_list_head = tp; 713d4afb5ceSopenharmony_ci 714d4afb5ceSopenharmony_ci lws_context_unlock(context); 715d4afb5ceSopenharmony_ci 716d4afb5ceSopenharmony_ci pthread_mutex_init(&tp->lock, NULL); 717d4afb5ceSopenharmony_ci pthread_cond_init(&tp->wake_idle, NULL); 718d4afb5ceSopenharmony_ci 719d4afb5ceSopenharmony_ci for (n = 0; n < args->threads; n++) { 720d4afb5ceSopenharmony_ci#if defined(LWS_HAS_PTHREAD_SETNAME_NP) 721d4afb5ceSopenharmony_ci char name[16]; 722d4afb5ceSopenharmony_ci#endif 723d4afb5ceSopenharmony_ci tp->pool_list[n].tp = tp; 724d4afb5ceSopenharmony_ci tp->pool_list[n].worker_index = n; 725d4afb5ceSopenharmony_ci pthread_mutex_init(&tp->pool_list[n].lock, NULL); 726d4afb5ceSopenharmony_ci if (pthread_create(&tp->pool_list[n].thread, NULL, 727d4afb5ceSopenharmony_ci lws_threadpool_worker, &tp->pool_list[n])) { 728d4afb5ceSopenharmony_ci lwsl_err("thread creation failed\n"); 729d4afb5ceSopenharmony_ci } else { 730d4afb5ceSopenharmony_ci#if defined(LWS_HAS_PTHREAD_SETNAME_NP) 731d4afb5ceSopenharmony_ci lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n); 732d4afb5ceSopenharmony_ci pthread_setname_np(tp->pool_list[n].thread, name); 733d4afb5ceSopenharmony_ci#endif 734d4afb5ceSopenharmony_ci tp->threads_in_pool++; 735d4afb5ceSopenharmony_ci } 736d4afb5ceSopenharmony_ci } 737d4afb5ceSopenharmony_ci 738d4afb5ceSopenharmony_ci return tp; 739d4afb5ceSopenharmony_ci} 740d4afb5ceSopenharmony_ci 741d4afb5ceSopenharmony_civoid 742d4afb5ceSopenharmony_cilws_threadpool_finish(struct lws_threadpool *tp) 743d4afb5ceSopenharmony_ci{ 744d4afb5ceSopenharmony_ci struct lws_threadpool_task **c, *task; 745d4afb5ceSopenharmony_ci 746d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ 747d4afb5ceSopenharmony_ci 748d4afb5ceSopenharmony_ci /* nothing new can start, running jobs will abort as STOPPED and the 749d4afb5ceSopenharmony_ci * pool threads will exit ASAP (they are joined in destroy) */ 750d4afb5ceSopenharmony_ci tp->destroying = 1; 751d4afb5ceSopenharmony_ci 752d4afb5ceSopenharmony_ci /* stop everyone in the pending queue and move to the done queue */ 753d4afb5ceSopenharmony_ci 754d4afb5ceSopenharmony_ci c = &tp->task_queue_head; 755d4afb5ceSopenharmony_ci while (*c) { 756d4afb5ceSopenharmony_ci task = *c; 757d4afb5ceSopenharmony_ci *c = task->task_queue_next; 758d4afb5ceSopenharmony_ci task->task_queue_next = tp->task_done_head; 759d4afb5ceSopenharmony_ci tp->task_done_head = task; 760d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPED); 761d4afb5ceSopenharmony_ci tp->queue_depth--; 762d4afb5ceSopenharmony_ci tp->done_queue_depth++; 763d4afb5ceSopenharmony_ci task->done = lws_now_usecs(); 764d4afb5ceSopenharmony_ci 765d4afb5ceSopenharmony_ci c = &task->task_queue_next; 766d4afb5ceSopenharmony_ci } 767d4afb5ceSopenharmony_ci 768d4afb5ceSopenharmony_ci pthread_cond_broadcast(&tp->wake_idle); 769d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ 770d4afb5ceSopenharmony_ci} 771d4afb5ceSopenharmony_ci 772d4afb5ceSopenharmony_civoid 773d4afb5ceSopenharmony_cilws_threadpool_destroy(struct lws_threadpool *tp) 774d4afb5ceSopenharmony_ci{ 775d4afb5ceSopenharmony_ci struct lws_threadpool_task *task, *next; 776d4afb5ceSopenharmony_ci struct lws_threadpool **ptp; 777d4afb5ceSopenharmony_ci void *retval; 778d4afb5ceSopenharmony_ci int n; 779d4afb5ceSopenharmony_ci 780d4afb5ceSopenharmony_ci /* remove us from the context list of threadpools */ 781d4afb5ceSopenharmony_ci 782d4afb5ceSopenharmony_ci lws_context_lock(tp->context, __func__); 783d4afb5ceSopenharmony_ci ptp = &tp->context->tp_list_head; 784d4afb5ceSopenharmony_ci 785d4afb5ceSopenharmony_ci while (*ptp) { 786d4afb5ceSopenharmony_ci if (*ptp == tp) { 787d4afb5ceSopenharmony_ci *ptp = tp->tp_list; 788d4afb5ceSopenharmony_ci break; 789d4afb5ceSopenharmony_ci } 790d4afb5ceSopenharmony_ci ptp = &(*ptp)->tp_list; 791d4afb5ceSopenharmony_ci } 792d4afb5ceSopenharmony_ci 793d4afb5ceSopenharmony_ci lws_context_unlock(tp->context); 794d4afb5ceSopenharmony_ci 795d4afb5ceSopenharmony_ci /* 796d4afb5ceSopenharmony_ci * Wake up the threadpool guys and tell them to exit 797d4afb5ceSopenharmony_ci */ 798d4afb5ceSopenharmony_ci 799d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ 800d4afb5ceSopenharmony_ci tp->destroying = 1; 801d4afb5ceSopenharmony_ci pthread_cond_broadcast(&tp->wake_idle); 802d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ 803d4afb5ceSopenharmony_ci 804d4afb5ceSopenharmony_ci lws_threadpool_dump(tp); 805d4afb5ceSopenharmony_ci 806d4afb5ceSopenharmony_ci lwsl_info("%s: waiting for threads to rejoin\n", __func__); 807d4afb5ceSopenharmony_ci#if defined(WIN32) 808d4afb5ceSopenharmony_ci Sleep(1000); 809d4afb5ceSopenharmony_ci#endif 810d4afb5ceSopenharmony_ci 811d4afb5ceSopenharmony_ci for (n = 0; n < tp->threads_in_pool; n++) { 812d4afb5ceSopenharmony_ci task = tp->pool_list[n].task; 813d4afb5ceSopenharmony_ci 814d4afb5ceSopenharmony_ci pthread_join(tp->pool_list[n].thread, &retval); 815d4afb5ceSopenharmony_ci pthread_mutex_destroy(&tp->pool_list[n].lock); 816d4afb5ceSopenharmony_ci } 817d4afb5ceSopenharmony_ci lwsl_info("%s: all threadpools exited\n", __func__); 818d4afb5ceSopenharmony_ci#if defined(WIN32) 819d4afb5ceSopenharmony_ci Sleep(1000); 820d4afb5ceSopenharmony_ci#endif 821d4afb5ceSopenharmony_ci 822d4afb5ceSopenharmony_ci task = tp->task_done_head; 823d4afb5ceSopenharmony_ci while (task) { 824d4afb5ceSopenharmony_ci next = task->task_queue_next; 825d4afb5ceSopenharmony_ci lws_threadpool_task_cleanup_destroy(task); 826d4afb5ceSopenharmony_ci tp->done_queue_depth--; 827d4afb5ceSopenharmony_ci task = next; 828d4afb5ceSopenharmony_ci } 829d4afb5ceSopenharmony_ci 830d4afb5ceSopenharmony_ci pthread_mutex_destroy(&tp->lock); 831d4afb5ceSopenharmony_ci 832d4afb5ceSopenharmony_ci memset(tp, 0xdd, sizeof(*tp)); 833d4afb5ceSopenharmony_ci lws_free(tp); 834d4afb5ceSopenharmony_ci} 835d4afb5ceSopenharmony_ci 836d4afb5ceSopenharmony_ci/* 837d4afb5ceSopenharmony_ci * We want to stop and destroy the tasks and related priv. 838d4afb5ceSopenharmony_ci */ 839d4afb5ceSopenharmony_ci 840d4afb5ceSopenharmony_ciint 841d4afb5ceSopenharmony_cilws_threadpool_dequeue_task(struct lws_threadpool_task *task) 842d4afb5ceSopenharmony_ci{ 843d4afb5ceSopenharmony_ci struct lws_threadpool *tp; 844d4afb5ceSopenharmony_ci struct lws_threadpool_task **c; 845d4afb5ceSopenharmony_ci int n; 846d4afb5ceSopenharmony_ci 847d4afb5ceSopenharmony_ci tp = task->tp; 848d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ 849d4afb5ceSopenharmony_ci 850d4afb5ceSopenharmony_ci if (task->outlive && !tp->destroying) { 851d4afb5ceSopenharmony_ci 852d4afb5ceSopenharmony_ci /* disconnect from wsi, and wsi from task */ 853d4afb5ceSopenharmony_ci 854d4afb5ceSopenharmony_ci lws_dll2_remove(&task->list); 855d4afb5ceSopenharmony_ci task->args.wsi = NULL; 856d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 857d4afb5ceSopenharmony_ci task->args.ss = NULL; 858d4afb5ceSopenharmony_ci#endif 859d4afb5ceSopenharmony_ci 860d4afb5ceSopenharmony_ci goto bail; 861d4afb5ceSopenharmony_ci } 862d4afb5ceSopenharmony_ci 863d4afb5ceSopenharmony_ci 864d4afb5ceSopenharmony_ci c = &tp->task_queue_head; 865d4afb5ceSopenharmony_ci 866d4afb5ceSopenharmony_ci /* is he queued waiting for a chance to run? Mark him as stopped and 867d4afb5ceSopenharmony_ci * move him on to the done queue */ 868d4afb5ceSopenharmony_ci 869d4afb5ceSopenharmony_ci while (*c) { 870d4afb5ceSopenharmony_ci if ((*c) == task) { 871d4afb5ceSopenharmony_ci *c = task->task_queue_next; 872d4afb5ceSopenharmony_ci task->task_queue_next = tp->task_done_head; 873d4afb5ceSopenharmony_ci tp->task_done_head = task; 874d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPED); 875d4afb5ceSopenharmony_ci tp->queue_depth--; 876d4afb5ceSopenharmony_ci tp->done_queue_depth++; 877d4afb5ceSopenharmony_ci task->done = lws_now_usecs(); 878d4afb5ceSopenharmony_ci 879d4afb5ceSopenharmony_ci lwsl_debug("%s: tp %p: removed queued task %s\n", 880d4afb5ceSopenharmony_ci __func__, tp, lws_wsi_tag(task_to_wsi(task))); 881d4afb5ceSopenharmony_ci 882d4afb5ceSopenharmony_ci break; 883d4afb5ceSopenharmony_ci } 884d4afb5ceSopenharmony_ci c = &(*c)->task_queue_next; 885d4afb5ceSopenharmony_ci } 886d4afb5ceSopenharmony_ci 887d4afb5ceSopenharmony_ci /* is he on the done queue? */ 888d4afb5ceSopenharmony_ci 889d4afb5ceSopenharmony_ci c = &tp->task_done_head; 890d4afb5ceSopenharmony_ci while (*c) { 891d4afb5ceSopenharmony_ci if ((*c) == task) { 892d4afb5ceSopenharmony_ci *c = task->task_queue_next; 893d4afb5ceSopenharmony_ci task->task_queue_next = NULL; 894d4afb5ceSopenharmony_ci lws_threadpool_task_cleanup_destroy(task); 895d4afb5ceSopenharmony_ci tp->done_queue_depth--; 896d4afb5ceSopenharmony_ci goto bail; 897d4afb5ceSopenharmony_ci } 898d4afb5ceSopenharmony_ci c = &(*c)->task_queue_next; 899d4afb5ceSopenharmony_ci } 900d4afb5ceSopenharmony_ci 901d4afb5ceSopenharmony_ci /* he's not in the queue... is he already running on a thread? */ 902d4afb5ceSopenharmony_ci 903d4afb5ceSopenharmony_ci for (n = 0; n < tp->threads_in_pool; n++) { 904d4afb5ceSopenharmony_ci if (!tp->pool_list[n].task || tp->pool_list[n].task != task) 905d4afb5ceSopenharmony_ci continue; 906d4afb5ceSopenharmony_ci 907d4afb5ceSopenharmony_ci /* 908d4afb5ceSopenharmony_ci * ensure we don't collide with tests or changes in the 909d4afb5ceSopenharmony_ci * worker thread 910d4afb5ceSopenharmony_ci */ 911d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->pool_list[n].lock); 912d4afb5ceSopenharmony_ci 913d4afb5ceSopenharmony_ci /* 914d4afb5ceSopenharmony_ci * mark him as having been requested to stop... 915d4afb5ceSopenharmony_ci * the caller will hear about it in his service thread 916d4afb5ceSopenharmony_ci * context as a request to close 917d4afb5ceSopenharmony_ci */ 918d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPING); 919d4afb5ceSopenharmony_ci 920d4afb5ceSopenharmony_ci /* disconnect from wsi, and wsi from task */ 921d4afb5ceSopenharmony_ci 922d4afb5ceSopenharmony_ci lws_dll2_remove(&task->list); 923d4afb5ceSopenharmony_ci task->args.wsi = NULL; 924d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 925d4afb5ceSopenharmony_ci task->args.ss = NULL; 926d4afb5ceSopenharmony_ci#endif 927d4afb5ceSopenharmony_ci 928d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->pool_list[n].lock); 929d4afb5ceSopenharmony_ci 930d4afb5ceSopenharmony_ci lwsl_debug("%s: tp %p: request stop running task " 931d4afb5ceSopenharmony_ci "for %s\n", __func__, tp, 932d4afb5ceSopenharmony_ci lws_wsi_tag(task_to_wsi(task))); 933d4afb5ceSopenharmony_ci 934d4afb5ceSopenharmony_ci break; 935d4afb5ceSopenharmony_ci } 936d4afb5ceSopenharmony_ci 937d4afb5ceSopenharmony_ci if (n == tp->threads_in_pool) { 938d4afb5ceSopenharmony_ci /* can't find it */ 939d4afb5ceSopenharmony_ci lwsl_notice("%s: tp %p: no task for %s, decoupling\n", 940d4afb5ceSopenharmony_ci __func__, tp, lws_wsi_tag(task_to_wsi(task))); 941d4afb5ceSopenharmony_ci lws_dll2_remove(&task->list); 942d4afb5ceSopenharmony_ci task->args.wsi = NULL; 943d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 944d4afb5ceSopenharmony_ci task->args.ss = NULL; 945d4afb5ceSopenharmony_ci#endif 946d4afb5ceSopenharmony_ci } 947d4afb5ceSopenharmony_ci 948d4afb5ceSopenharmony_cibail: 949d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ 950d4afb5ceSopenharmony_ci 951d4afb5ceSopenharmony_ci return 0; 952d4afb5ceSopenharmony_ci} 953d4afb5ceSopenharmony_ci 954d4afb5ceSopenharmony_ciint 955d4afb5ceSopenharmony_cilws_threadpool_dequeue(struct lws *wsi) /* deprecated */ 956d4afb5ceSopenharmony_ci{ 957d4afb5ceSopenharmony_ci struct lws_threadpool_task *task; 958d4afb5ceSopenharmony_ci 959d4afb5ceSopenharmony_ci if (!wsi->tp_task_owner.count) 960d4afb5ceSopenharmony_ci return 0; 961d4afb5ceSopenharmony_ci assert(wsi->tp_task_owner.count != 1); 962d4afb5ceSopenharmony_ci 963d4afb5ceSopenharmony_ci task = lws_container_of(wsi->tp_task_owner.head, 964d4afb5ceSopenharmony_ci struct lws_threadpool_task, list); 965d4afb5ceSopenharmony_ci 966d4afb5ceSopenharmony_ci return lws_threadpool_dequeue_task(task); 967d4afb5ceSopenharmony_ci} 968d4afb5ceSopenharmony_ci 969d4afb5ceSopenharmony_cistruct lws_threadpool_task * 970d4afb5ceSopenharmony_cilws_threadpool_enqueue(struct lws_threadpool *tp, 971d4afb5ceSopenharmony_ci const struct lws_threadpool_task_args *args, 972d4afb5ceSopenharmony_ci const char *format, ...) 973d4afb5ceSopenharmony_ci{ 974d4afb5ceSopenharmony_ci struct lws_threadpool_task *task = NULL; 975d4afb5ceSopenharmony_ci va_list ap; 976d4afb5ceSopenharmony_ci 977d4afb5ceSopenharmony_ci if (tp->destroying) 978d4afb5ceSopenharmony_ci return NULL; 979d4afb5ceSopenharmony_ci 980d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 981d4afb5ceSopenharmony_ci assert(args->ss || args->wsi); 982d4afb5ceSopenharmony_ci#endif 983d4afb5ceSopenharmony_ci 984d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ 985d4afb5ceSopenharmony_ci 986d4afb5ceSopenharmony_ci /* 987d4afb5ceSopenharmony_ci * if there's room on the queue, the job always goes on the queue 988d4afb5ceSopenharmony_ci * first, then any free thread may pick it up after the wake_idle 989d4afb5ceSopenharmony_ci */ 990d4afb5ceSopenharmony_ci 991d4afb5ceSopenharmony_ci if (tp->queue_depth == tp->max_queue_depth) { 992d4afb5ceSopenharmony_ci lwsl_notice("%s: queue reached limit %d\n", __func__, 993d4afb5ceSopenharmony_ci tp->max_queue_depth); 994d4afb5ceSopenharmony_ci 995d4afb5ceSopenharmony_ci goto bail; 996d4afb5ceSopenharmony_ci } 997d4afb5ceSopenharmony_ci 998d4afb5ceSopenharmony_ci /* 999d4afb5ceSopenharmony_ci * create the task object 1000d4afb5ceSopenharmony_ci */ 1001d4afb5ceSopenharmony_ci 1002d4afb5ceSopenharmony_ci task = lws_malloc(sizeof(*task), __func__); 1003d4afb5ceSopenharmony_ci if (!task) 1004d4afb5ceSopenharmony_ci goto bail; 1005d4afb5ceSopenharmony_ci 1006d4afb5ceSopenharmony_ci memset(task, 0, sizeof(*task)); 1007d4afb5ceSopenharmony_ci pthread_cond_init(&task->wake_idle, NULL); 1008d4afb5ceSopenharmony_ci task->args = *args; 1009d4afb5ceSopenharmony_ci task->tp = tp; 1010d4afb5ceSopenharmony_ci task->created = lws_now_usecs(); 1011d4afb5ceSopenharmony_ci 1012d4afb5ceSopenharmony_ci va_start(ap, format); 1013d4afb5ceSopenharmony_ci vsnprintf(task->name, sizeof(task->name) - 1, format, ap); 1014d4afb5ceSopenharmony_ci va_end(ap); 1015d4afb5ceSopenharmony_ci 1016d4afb5ceSopenharmony_ci /* 1017d4afb5ceSopenharmony_ci * add him on the tp task queue 1018d4afb5ceSopenharmony_ci */ 1019d4afb5ceSopenharmony_ci 1020d4afb5ceSopenharmony_ci task->task_queue_next = tp->task_queue_head; 1021d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_QUEUED); 1022d4afb5ceSopenharmony_ci tp->task_queue_head = task; 1023d4afb5ceSopenharmony_ci tp->queue_depth++; 1024d4afb5ceSopenharmony_ci 1025d4afb5ceSopenharmony_ci /* 1026d4afb5ceSopenharmony_ci * mark the wsi itself as depending on this tp (so wsi close for 1027d4afb5ceSopenharmony_ci * whatever reason can clean up) 1028d4afb5ceSopenharmony_ci */ 1029d4afb5ceSopenharmony_ci 1030d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 1031d4afb5ceSopenharmony_ci if (args->ss) 1032d4afb5ceSopenharmony_ci lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner); 1033d4afb5ceSopenharmony_ci else 1034d4afb5ceSopenharmony_ci#endif 1035d4afb5ceSopenharmony_ci lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner); 1036d4afb5ceSopenharmony_ci 1037d4afb5ceSopenharmony_ci lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n", 1038d4afb5ceSopenharmony_ci __func__, tp->name, task, task->name, 1039d4afb5ceSopenharmony_ci lws_wsi_tag(task_to_wsi(task)), tp->queue_depth); 1040d4afb5ceSopenharmony_ci 1041d4afb5ceSopenharmony_ci /* alert any idle thread there's something new on the task list */ 1042d4afb5ceSopenharmony_ci 1043d4afb5ceSopenharmony_ci lws_memory_barrier(); 1044d4afb5ceSopenharmony_ci pthread_cond_signal(&tp->wake_idle); 1045d4afb5ceSopenharmony_ci 1046d4afb5ceSopenharmony_cibail: 1047d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ 1048d4afb5ceSopenharmony_ci 1049d4afb5ceSopenharmony_ci return task; 1050d4afb5ceSopenharmony_ci} 1051d4afb5ceSopenharmony_ci 1052d4afb5ceSopenharmony_ci/* this should be called from the service thread */ 1053d4afb5ceSopenharmony_ci 1054d4afb5ceSopenharmony_cienum lws_threadpool_task_status 1055d4afb5ceSopenharmony_cilws_threadpool_task_status(struct lws_threadpool_task *task, void **user) 1056d4afb5ceSopenharmony_ci{ 1057d4afb5ceSopenharmony_ci enum lws_threadpool_task_status status; 1058d4afb5ceSopenharmony_ci struct lws_threadpool *tp = task->tp; 1059d4afb5ceSopenharmony_ci 1060d4afb5ceSopenharmony_ci if (!tp) 1061d4afb5ceSopenharmony_ci return LWS_TP_STATUS_FINISHED; 1062d4afb5ceSopenharmony_ci 1063d4afb5ceSopenharmony_ci *user = task->args.user; 1064d4afb5ceSopenharmony_ci status = task->status; 1065d4afb5ceSopenharmony_ci 1066d4afb5ceSopenharmony_ci if (status == LWS_TP_STATUS_FINISHED || 1067d4afb5ceSopenharmony_ci status == LWS_TP_STATUS_STOPPED) { 1068d4afb5ceSopenharmony_ci char buf[160]; 1069d4afb5ceSopenharmony_ci 1070d4afb5ceSopenharmony_ci pthread_mutex_lock(&tp->lock); /* ================ tpool lock */ 1071d4afb5ceSopenharmony_ci __lws_threadpool_task_dump(task, buf, sizeof(buf)); 1072d4afb5ceSopenharmony_ci lwsl_thread("%s: %s: service thread REAPING: %s\n", 1073d4afb5ceSopenharmony_ci __func__, tp->name, buf); 1074d4afb5ceSopenharmony_ci __lws_threadpool_reap(task); 1075d4afb5ceSopenharmony_ci lws_memory_barrier(); 1076d4afb5ceSopenharmony_ci pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */ 1077d4afb5ceSopenharmony_ci } 1078d4afb5ceSopenharmony_ci 1079d4afb5ceSopenharmony_ci return status; 1080d4afb5ceSopenharmony_ci} 1081d4afb5ceSopenharmony_ci 1082d4afb5ceSopenharmony_cienum lws_threadpool_task_status 1083d4afb5ceSopenharmony_cilws_threadpool_task_status_noreap(struct lws_threadpool_task *task) 1084d4afb5ceSopenharmony_ci{ 1085d4afb5ceSopenharmony_ci return task->status; 1086d4afb5ceSopenharmony_ci} 1087d4afb5ceSopenharmony_ci 1088d4afb5ceSopenharmony_cienum lws_threadpool_task_status 1089d4afb5ceSopenharmony_cilws_threadpool_task_status_wsi(struct lws *wsi, 1090d4afb5ceSopenharmony_ci struct lws_threadpool_task **_task, void **user) 1091d4afb5ceSopenharmony_ci{ 1092d4afb5ceSopenharmony_ci struct lws_threadpool_task *task; 1093d4afb5ceSopenharmony_ci 1094d4afb5ceSopenharmony_ci if (!wsi->tp_task_owner.count) { 1095d4afb5ceSopenharmony_ci lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__); 1096d4afb5ceSopenharmony_ci return LWS_TP_STATUS_FINISHED; 1097d4afb5ceSopenharmony_ci } 1098d4afb5ceSopenharmony_ci 1099d4afb5ceSopenharmony_ci assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */ 1100d4afb5ceSopenharmony_ci 1101d4afb5ceSopenharmony_ci task = lws_container_of(wsi->tp_task_owner.head, 1102d4afb5ceSopenharmony_ci struct lws_threadpool_task, list); 1103d4afb5ceSopenharmony_ci 1104d4afb5ceSopenharmony_ci *_task = task; 1105d4afb5ceSopenharmony_ci 1106d4afb5ceSopenharmony_ci return lws_threadpool_task_status(task, user); 1107d4afb5ceSopenharmony_ci} 1108d4afb5ceSopenharmony_ci 1109d4afb5ceSopenharmony_civoid 1110d4afb5ceSopenharmony_cilws_threadpool_task_sync(struct lws_threadpool_task *task, int stop) 1111d4afb5ceSopenharmony_ci{ 1112d4afb5ceSopenharmony_ci lwsl_debug("%s\n", __func__); 1113d4afb5ceSopenharmony_ci if (!task) 1114d4afb5ceSopenharmony_ci return; 1115d4afb5ceSopenharmony_ci 1116d4afb5ceSopenharmony_ci if (stop) 1117d4afb5ceSopenharmony_ci state_transition(task, LWS_TP_STATUS_STOPPING); 1118d4afb5ceSopenharmony_ci 1119d4afb5ceSopenharmony_ci pthread_mutex_lock(&task->tp->lock); 1120d4afb5ceSopenharmony_ci pthread_cond_signal(&task->wake_idle); 1121d4afb5ceSopenharmony_ci pthread_mutex_unlock(&task->tp->lock); 1122d4afb5ceSopenharmony_ci} 1123d4afb5ceSopenharmony_ci 1124d4afb5ceSopenharmony_ciint 1125d4afb5ceSopenharmony_cilws_threadpool_foreach_task_wsi(struct lws *wsi, void *user, 1126d4afb5ceSopenharmony_ci int (*cb)(struct lws_threadpool_task *task, 1127d4afb5ceSopenharmony_ci void *user)) 1128d4afb5ceSopenharmony_ci{ 1129d4afb5ceSopenharmony_ci struct lws_threadpool_task *task1; 1130d4afb5ceSopenharmony_ci 1131d4afb5ceSopenharmony_ci if (wsi->tp_task_owner.head == NULL) 1132d4afb5ceSopenharmony_ci return 0; 1133d4afb5ceSopenharmony_ci 1134d4afb5ceSopenharmony_ci task1 = lws_container_of(wsi->tp_task_owner.head, 1135d4afb5ceSopenharmony_ci struct lws_threadpool_task, list); 1136d4afb5ceSopenharmony_ci 1137d4afb5ceSopenharmony_ci pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */ 1138d4afb5ceSopenharmony_ci 1139d4afb5ceSopenharmony_ci lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, 1140d4afb5ceSopenharmony_ci wsi->tp_task_owner.head) { 1141d4afb5ceSopenharmony_ci struct lws_threadpool_task *task = lws_container_of(d, 1142d4afb5ceSopenharmony_ci struct lws_threadpool_task, list); 1143d4afb5ceSopenharmony_ci 1144d4afb5ceSopenharmony_ci if (cb(task, user)) { 1145d4afb5ceSopenharmony_ci pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */ 1146d4afb5ceSopenharmony_ci return 1; 1147d4afb5ceSopenharmony_ci } 1148d4afb5ceSopenharmony_ci 1149d4afb5ceSopenharmony_ci } lws_end_foreach_dll_safe(d, d1); 1150d4afb5ceSopenharmony_ci 1151d4afb5ceSopenharmony_ci pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */ 1152d4afb5ceSopenharmony_ci 1153d4afb5ceSopenharmony_ci return 0; 1154d4afb5ceSopenharmony_ci} 1155d4afb5ceSopenharmony_ci 1156d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 1157d4afb5ceSopenharmony_ciint 1158d4afb5ceSopenharmony_cilws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user, 1159d4afb5ceSopenharmony_ci int (*cb)(struct lws_threadpool_task *task, 1160d4afb5ceSopenharmony_ci void *user)) 1161d4afb5ceSopenharmony_ci{ 1162d4afb5ceSopenharmony_ci if (!ss->wsi) 1163d4afb5ceSopenharmony_ci return 0; 1164d4afb5ceSopenharmony_ci 1165d4afb5ceSopenharmony_ci return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb); 1166d4afb5ceSopenharmony_ci} 1167d4afb5ceSopenharmony_ci#endif 1168d4afb5ceSopenharmony_ci 1169d4afb5ceSopenharmony_cistatic int 1170d4afb5ceSopenharmony_cidisassociate_wsi(struct lws_threadpool_task *task, 1171d4afb5ceSopenharmony_ci void *user) 1172d4afb5ceSopenharmony_ci{ 1173d4afb5ceSopenharmony_ci task->args.wsi = NULL; 1174d4afb5ceSopenharmony_ci lws_dll2_remove(&task->list); 1175d4afb5ceSopenharmony_ci 1176d4afb5ceSopenharmony_ci return 0; 1177d4afb5ceSopenharmony_ci} 1178d4afb5ceSopenharmony_ci 1179d4afb5ceSopenharmony_civoid 1180d4afb5ceSopenharmony_cilws_threadpool_wsi_closing(struct lws *wsi) 1181d4afb5ceSopenharmony_ci{ 1182d4afb5ceSopenharmony_ci lws_threadpool_foreach_task_wsi(wsi, NULL, disassociate_wsi); 1183d4afb5ceSopenharmony_ci} 1184d4afb5ceSopenharmony_ci 1185d4afb5ceSopenharmony_cistruct lws_threadpool_task * 1186d4afb5ceSopenharmony_cilws_threadpool_get_task_wsi(struct lws *wsi) 1187d4afb5ceSopenharmony_ci{ 1188d4afb5ceSopenharmony_ci if (wsi->tp_task_owner.head == NULL) 1189d4afb5ceSopenharmony_ci return NULL; 1190d4afb5ceSopenharmony_ci 1191d4afb5ceSopenharmony_ci return lws_container_of(wsi->tp_task_owner.head, 1192d4afb5ceSopenharmony_ci struct lws_threadpool_task, list); 1193d4afb5ceSopenharmony_ci} 1194d4afb5ceSopenharmony_ci 1195d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS) 1196d4afb5ceSopenharmony_cistruct lws_threadpool_task * 1197d4afb5ceSopenharmony_cilws_threadpool_get_task_ss(struct lws_ss_handle *ss) 1198d4afb5ceSopenharmony_ci{ 1199d4afb5ceSopenharmony_ci return lws_threadpool_get_task_wsi(ss->wsi); 1200d4afb5ceSopenharmony_ci} 1201d4afb5ceSopenharmony_ci#endif 1202