1d4afb5ceSopenharmony_ci/*
2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation
3d4afb5ceSopenharmony_ci *
4d4afb5ceSopenharmony_ci * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5d4afb5ceSopenharmony_ci *
6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy
7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to
8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the
9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is
11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions:
12d4afb5ceSopenharmony_ci *
13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in
14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software.
15d4afb5ceSopenharmony_ci *
16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22d4afb5ceSopenharmony_ci * IN THE SOFTWARE.
23d4afb5ceSopenharmony_ci */
24d4afb5ceSopenharmony_ci
25d4afb5ceSopenharmony_ci#if !defined(_GNU_SOURCE)
26d4afb5ceSopenharmony_ci#define _GNU_SOURCE
27d4afb5ceSopenharmony_ci#endif
28d4afb5ceSopenharmony_ci
29d4afb5ceSopenharmony_ci#if defined(WIN32)
30d4afb5ceSopenharmony_ci#define HAVE_STRUCT_TIMESPEC
31d4afb5ceSopenharmony_ci#if defined(pid_t)
32d4afb5ceSopenharmony_ci#undef pid_t
33d4afb5ceSopenharmony_ci#endif
34d4afb5ceSopenharmony_ci#endif
35d4afb5ceSopenharmony_ci#include <pthread.h>
36d4afb5ceSopenharmony_ci
37d4afb5ceSopenharmony_ci#include "private-lib-core.h"
38d4afb5ceSopenharmony_ci
39d4afb5ceSopenharmony_ci#include <string.h>
40d4afb5ceSopenharmony_ci#include <stdio.h>
41d4afb5ceSopenharmony_ci
42d4afb5ceSopenharmony_cistruct lws_threadpool;
43d4afb5ceSopenharmony_ci
44d4afb5ceSopenharmony_cistruct lws_threadpool_task {
45d4afb5ceSopenharmony_ci	struct lws_threadpool_task	*task_queue_next;
46d4afb5ceSopenharmony_ci
47d4afb5ceSopenharmony_ci	struct lws_threadpool		*tp;
48d4afb5ceSopenharmony_ci	char				name[32];
49d4afb5ceSopenharmony_ci	struct lws_threadpool_task_args args;
50d4afb5ceSopenharmony_ci
51d4afb5ceSopenharmony_ci	lws_dll2_t			list;
52d4afb5ceSopenharmony_ci
53d4afb5ceSopenharmony_ci	lws_usec_t			created;
54d4afb5ceSopenharmony_ci	lws_usec_t			acquired;
55d4afb5ceSopenharmony_ci	lws_usec_t			done;
56d4afb5ceSopenharmony_ci	lws_usec_t			entered_state;
57d4afb5ceSopenharmony_ci
58d4afb5ceSopenharmony_ci	lws_usec_t			acc_running;
59d4afb5ceSopenharmony_ci	lws_usec_t			acc_syncing;
60d4afb5ceSopenharmony_ci
61d4afb5ceSopenharmony_ci	pthread_cond_t			wake_idle;
62d4afb5ceSopenharmony_ci
63d4afb5ceSopenharmony_ci	enum lws_threadpool_task_status status;
64d4afb5ceSopenharmony_ci
65d4afb5ceSopenharmony_ci	int				late_sync_retries;
66d4afb5ceSopenharmony_ci
67d4afb5ceSopenharmony_ci	char				wanted_writeable_cb;
68d4afb5ceSopenharmony_ci	char				outlive;
69d4afb5ceSopenharmony_ci};
70d4afb5ceSopenharmony_ci
71d4afb5ceSopenharmony_cistruct lws_pool {
72d4afb5ceSopenharmony_ci	struct lws_threadpool		*tp;
73d4afb5ceSopenharmony_ci	pthread_t			thread;
74d4afb5ceSopenharmony_ci	pthread_mutex_t			lock; /* part of task wake_idle */
75d4afb5ceSopenharmony_ci	struct lws_threadpool_task	*task;
76d4afb5ceSopenharmony_ci	lws_usec_t			acquired;
77d4afb5ceSopenharmony_ci	int				worker_index;
78d4afb5ceSopenharmony_ci};
79d4afb5ceSopenharmony_ci
80d4afb5ceSopenharmony_cistruct lws_threadpool {
81d4afb5ceSopenharmony_ci	pthread_mutex_t			lock; /* protects all pool lists */
82d4afb5ceSopenharmony_ci	pthread_cond_t			wake_idle;
83d4afb5ceSopenharmony_ci	struct lws_pool			*pool_list;
84d4afb5ceSopenharmony_ci
85d4afb5ceSopenharmony_ci	struct lws_context		*context;
86d4afb5ceSopenharmony_ci	struct lws_threadpool		*tp_list; /* context list of threadpools */
87d4afb5ceSopenharmony_ci
88d4afb5ceSopenharmony_ci	struct lws_threadpool_task	*task_queue_head;
89d4afb5ceSopenharmony_ci	struct lws_threadpool_task	*task_done_head;
90d4afb5ceSopenharmony_ci
91d4afb5ceSopenharmony_ci	char				name[32];
92d4afb5ceSopenharmony_ci
93d4afb5ceSopenharmony_ci	int				threads_in_pool;
94d4afb5ceSopenharmony_ci	int				queue_depth;
95d4afb5ceSopenharmony_ci	int				done_queue_depth;
96d4afb5ceSopenharmony_ci	int				max_queue_depth;
97d4afb5ceSopenharmony_ci	int				running_tasks;
98d4afb5ceSopenharmony_ci
99d4afb5ceSopenharmony_ci	unsigned int			destroying:1;
100d4afb5ceSopenharmony_ci};
101d4afb5ceSopenharmony_ci
102d4afb5ceSopenharmony_cistatic int
103d4afb5ceSopenharmony_cims_delta(lws_usec_t now, lws_usec_t then)
104d4afb5ceSopenharmony_ci{
105d4afb5ceSopenharmony_ci	return (int)((now - then) / 1000);
106d4afb5ceSopenharmony_ci}
107d4afb5ceSopenharmony_ci
108d4afb5ceSopenharmony_cistatic void
109d4afb5ceSopenharmony_cius_accrue(lws_usec_t *acc, lws_usec_t then)
110d4afb5ceSopenharmony_ci{
111d4afb5ceSopenharmony_ci	lws_usec_t now = lws_now_usecs();
112d4afb5ceSopenharmony_ci
113d4afb5ceSopenharmony_ci	*acc += now - then;
114d4afb5ceSopenharmony_ci}
115d4afb5ceSopenharmony_ci
116d4afb5ceSopenharmony_cistatic int
117d4afb5ceSopenharmony_cipc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
118d4afb5ceSopenharmony_ci{
119d4afb5ceSopenharmony_ci	lws_usec_t delta = (now - then) + 1;
120d4afb5ceSopenharmony_ci
121d4afb5ceSopenharmony_ci	return (int)((us * 100) / delta);
122d4afb5ceSopenharmony_ci}
123d4afb5ceSopenharmony_ci
124d4afb5ceSopenharmony_cistatic void
125d4afb5ceSopenharmony_ci__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
126d4afb5ceSopenharmony_ci{
127d4afb5ceSopenharmony_ci	lws_usec_t now = lws_now_usecs();
128d4afb5ceSopenharmony_ci	char *end = buf + len - 1;
129d4afb5ceSopenharmony_ci	int syncms = 0, runms = 0;
130d4afb5ceSopenharmony_ci
131d4afb5ceSopenharmony_ci	if (!task->acquired) {
132d4afb5ceSopenharmony_ci		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
133d4afb5ceSopenharmony_ci				    "task: %s, QUEUED queued: %dms",
134d4afb5ceSopenharmony_ci				    task->name, ms_delta(now, task->created));
135d4afb5ceSopenharmony_ci
136d4afb5ceSopenharmony_ci		return;
137d4afb5ceSopenharmony_ci	}
138d4afb5ceSopenharmony_ci
139d4afb5ceSopenharmony_ci	if (task->acc_running)
140d4afb5ceSopenharmony_ci		runms = (int)task->acc_running;
141d4afb5ceSopenharmony_ci
142d4afb5ceSopenharmony_ci	if (task->acc_syncing)
143d4afb5ceSopenharmony_ci		syncms = (int)task->acc_syncing;
144d4afb5ceSopenharmony_ci
145d4afb5ceSopenharmony_ci	if (!task->done) {
146d4afb5ceSopenharmony_ci		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
147d4afb5ceSopenharmony_ci			"task: %s, ONGOING state %d (%dms) alive: %dms "
148d4afb5ceSopenharmony_ci			"(queued %dms, acquired: %dms, "
149d4afb5ceSopenharmony_ci			"run: %d%%, sync: %d%%)", task->name, task->status,
150d4afb5ceSopenharmony_ci			ms_delta(now, task->entered_state),
151d4afb5ceSopenharmony_ci			ms_delta(now, task->created),
152d4afb5ceSopenharmony_ci			ms_delta(task->acquired, task->created),
153d4afb5ceSopenharmony_ci			ms_delta(now, task->acquired),
154d4afb5ceSopenharmony_ci			pc_delta(now, task->acquired, runms),
155d4afb5ceSopenharmony_ci			pc_delta(now, task->acquired, syncms));
156d4afb5ceSopenharmony_ci
157d4afb5ceSopenharmony_ci		return;
158d4afb5ceSopenharmony_ci	}
159d4afb5ceSopenharmony_ci
160d4afb5ceSopenharmony_ci	lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
161d4afb5ceSopenharmony_ci		"task: %s, DONE state %d lived: %dms "
162d4afb5ceSopenharmony_ci		"(queued %dms, on thread: %dms, "
163d4afb5ceSopenharmony_ci		"ran: %d%%, synced: %d%%)", task->name, task->status,
164d4afb5ceSopenharmony_ci		ms_delta(task->done, task->created),
165d4afb5ceSopenharmony_ci		ms_delta(task->acquired, task->created),
166d4afb5ceSopenharmony_ci		ms_delta(task->done, task->acquired),
167d4afb5ceSopenharmony_ci		pc_delta(task->done, task->acquired, runms),
168d4afb5ceSopenharmony_ci		pc_delta(task->done, task->acquired, syncms));
169d4afb5ceSopenharmony_ci}
170d4afb5ceSopenharmony_ci
171d4afb5ceSopenharmony_civoid
172d4afb5ceSopenharmony_cilws_threadpool_dump(struct lws_threadpool *tp)
173d4afb5ceSopenharmony_ci{
174d4afb5ceSopenharmony_ci#if 0
175d4afb5ceSopenharmony_ci	//defined(_DEBUG)
176d4afb5ceSopenharmony_ci	struct lws_threadpool_task **c;
177d4afb5ceSopenharmony_ci	char buf[160];
178d4afb5ceSopenharmony_ci	int n, count;
179d4afb5ceSopenharmony_ci
180d4afb5ceSopenharmony_ci	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
181d4afb5ceSopenharmony_ci
182d4afb5ceSopenharmony_ci	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
183d4afb5ceSopenharmony_ci		    tp->name, tp->queue_depth, tp->running_tasks,
184d4afb5ceSopenharmony_ci		    tp->done_queue_depth);
185d4afb5ceSopenharmony_ci
186d4afb5ceSopenharmony_ci	count = 0;
187d4afb5ceSopenharmony_ci	c = &tp->task_queue_head;
188d4afb5ceSopenharmony_ci	while (*c) {
189d4afb5ceSopenharmony_ci		struct lws_threadpool_task *task = *c;
190d4afb5ceSopenharmony_ci		__lws_threadpool_task_dump(task, buf, sizeof(buf));
191d4afb5ceSopenharmony_ci		lwsl_thread("  - %s\n", buf);
192d4afb5ceSopenharmony_ci		count++;
193d4afb5ceSopenharmony_ci
194d4afb5ceSopenharmony_ci		c = &(*c)->task_queue_next;
195d4afb5ceSopenharmony_ci	}
196d4afb5ceSopenharmony_ci
197d4afb5ceSopenharmony_ci	if (count != tp->queue_depth)
198d4afb5ceSopenharmony_ci		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
199d4afb5ceSopenharmony_ci			 __func__, tp->queue_depth, count);
200d4afb5ceSopenharmony_ci
201d4afb5ceSopenharmony_ci	count = 0;
202d4afb5ceSopenharmony_ci	for (n = 0; n < tp->threads_in_pool; n++) {
203d4afb5ceSopenharmony_ci		struct lws_pool *pool = &tp->pool_list[n];
204d4afb5ceSopenharmony_ci		struct lws_threadpool_task *task = pool->task;
205d4afb5ceSopenharmony_ci
206d4afb5ceSopenharmony_ci		if (task) {
207d4afb5ceSopenharmony_ci			__lws_threadpool_task_dump(task, buf, sizeof(buf));
208d4afb5ceSopenharmony_ci			lwsl_thread("  - worker %d: %s\n", n, buf);
209d4afb5ceSopenharmony_ci			count++;
210d4afb5ceSopenharmony_ci		}
211d4afb5ceSopenharmony_ci	}
212d4afb5ceSopenharmony_ci
213d4afb5ceSopenharmony_ci	if (count != tp->running_tasks)
214d4afb5ceSopenharmony_ci		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
215d4afb5ceSopenharmony_ci			 __func__, tp->running_tasks, count);
216d4afb5ceSopenharmony_ci
217d4afb5ceSopenharmony_ci	count = 0;
218d4afb5ceSopenharmony_ci	c = &tp->task_done_head;
219d4afb5ceSopenharmony_ci	while (*c) {
220d4afb5ceSopenharmony_ci		struct lws_threadpool_task *task = *c;
221d4afb5ceSopenharmony_ci		__lws_threadpool_task_dump(task, buf, sizeof(buf));
222d4afb5ceSopenharmony_ci		lwsl_thread("  - %s\n", buf);
223d4afb5ceSopenharmony_ci		count++;
224d4afb5ceSopenharmony_ci
225d4afb5ceSopenharmony_ci		c = &(*c)->task_queue_next;
226d4afb5ceSopenharmony_ci	}
227d4afb5ceSopenharmony_ci
228d4afb5ceSopenharmony_ci	if (count != tp->done_queue_depth)
229d4afb5ceSopenharmony_ci		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
230d4afb5ceSopenharmony_ci			 __func__, tp->done_queue_depth, count);
231d4afb5ceSopenharmony_ci
232d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
233d4afb5ceSopenharmony_ci#endif
234d4afb5ceSopenharmony_ci}
235d4afb5ceSopenharmony_ci
236d4afb5ceSopenharmony_cistatic void
237d4afb5ceSopenharmony_cistate_transition(struct lws_threadpool_task *task,
238d4afb5ceSopenharmony_ci		 enum lws_threadpool_task_status status)
239d4afb5ceSopenharmony_ci{
240d4afb5ceSopenharmony_ci	task->entered_state = lws_now_usecs();
241d4afb5ceSopenharmony_ci	task->status = status;
242d4afb5ceSopenharmony_ci}
243d4afb5ceSopenharmony_ci
244d4afb5ceSopenharmony_cistatic struct lws *
245d4afb5ceSopenharmony_citask_to_wsi(struct lws_threadpool_task *task)
246d4afb5ceSopenharmony_ci{
247d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
248d4afb5ceSopenharmony_ci	if (task->args.ss)
249d4afb5ceSopenharmony_ci		return task->args.ss->wsi;
250d4afb5ceSopenharmony_ci#endif
251d4afb5ceSopenharmony_ci	return task->args.wsi;
252d4afb5ceSopenharmony_ci}
253d4afb5ceSopenharmony_ci
254d4afb5ceSopenharmony_cistatic void
255d4afb5ceSopenharmony_cilws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
256d4afb5ceSopenharmony_ci{
257d4afb5ceSopenharmony_ci	if (task->args.cleanup)
258d4afb5ceSopenharmony_ci		task->args.cleanup(task_to_wsi(task), task->args.user);
259d4afb5ceSopenharmony_ci
260d4afb5ceSopenharmony_ci	lws_dll2_remove(&task->list);
261d4afb5ceSopenharmony_ci
262d4afb5ceSopenharmony_ci	lwsl_thread("%s: tp %p: cleaned finished task for %s\n",
263d4afb5ceSopenharmony_ci		    __func__, task->tp, lws_wsi_tag(task_to_wsi(task)));
264d4afb5ceSopenharmony_ci
265d4afb5ceSopenharmony_ci	lws_free(task);
266d4afb5ceSopenharmony_ci}
267d4afb5ceSopenharmony_ci
268d4afb5ceSopenharmony_cistatic void
269d4afb5ceSopenharmony_ci__lws_threadpool_reap(struct lws_threadpool_task *task)
270d4afb5ceSopenharmony_ci{
271d4afb5ceSopenharmony_ci	struct lws_threadpool_task **c, *t = NULL;
272d4afb5ceSopenharmony_ci	struct lws_threadpool *tp = task->tp;
273d4afb5ceSopenharmony_ci
274d4afb5ceSopenharmony_ci	/* remove the task from the done queue */
275d4afb5ceSopenharmony_ci
276d4afb5ceSopenharmony_ci	if (tp) {
277d4afb5ceSopenharmony_ci		c = &tp->task_done_head;
278d4afb5ceSopenharmony_ci
279d4afb5ceSopenharmony_ci		while (*c) {
280d4afb5ceSopenharmony_ci			if ((*c) == task) {
281d4afb5ceSopenharmony_ci				t = *c;
282d4afb5ceSopenharmony_ci				*c = t->task_queue_next;
283d4afb5ceSopenharmony_ci				t->task_queue_next = NULL;
284d4afb5ceSopenharmony_ci				tp->done_queue_depth--;
285d4afb5ceSopenharmony_ci
286d4afb5ceSopenharmony_ci				lwsl_thread("%s: tp %s: reaped task %s\n", __func__,
287d4afb5ceSopenharmony_ci					   tp->name, lws_wsi_tag(task_to_wsi(task)));
288d4afb5ceSopenharmony_ci
289d4afb5ceSopenharmony_ci				break;
290d4afb5ceSopenharmony_ci			}
291d4afb5ceSopenharmony_ci			c = &(*c)->task_queue_next;
292d4afb5ceSopenharmony_ci		}
293d4afb5ceSopenharmony_ci
294d4afb5ceSopenharmony_ci		if (!t) {
295d4afb5ceSopenharmony_ci			lwsl_err("%s: task %p not in done queue\n", __func__, task);
296d4afb5ceSopenharmony_ci			/*
297d4afb5ceSopenharmony_ci			 * This shouldn't occur, but in this case not really
298d4afb5ceSopenharmony_ci			 * safe to assume there's a task to destroy
299d4afb5ceSopenharmony_ci			 */
300d4afb5ceSopenharmony_ci			return;
301d4afb5ceSopenharmony_ci		}
302d4afb5ceSopenharmony_ci	} else
303d4afb5ceSopenharmony_ci		lwsl_err("%s: task->tp NULL already\n", __func__);
304d4afb5ceSopenharmony_ci
305d4afb5ceSopenharmony_ci	/* call the task's cleanup and delete the task itself */
306d4afb5ceSopenharmony_ci
307d4afb5ceSopenharmony_ci	lws_threadpool_task_cleanup_destroy(task);
308d4afb5ceSopenharmony_ci}
309d4afb5ceSopenharmony_ci
310d4afb5ceSopenharmony_ci/*
311d4afb5ceSopenharmony_ci * this gets called from each tsi service context after the service was
312d4afb5ceSopenharmony_ci * cancelled... we need to ask for the writable callback from the matching
313d4afb5ceSopenharmony_ci * tsi context for any wsis bound to a worked thread that need it
314d4afb5ceSopenharmony_ci */
315d4afb5ceSopenharmony_ci
316d4afb5ceSopenharmony_ciint
317d4afb5ceSopenharmony_cilws_threadpool_tsi_context(struct lws_context *context, int tsi)
318d4afb5ceSopenharmony_ci{
319d4afb5ceSopenharmony_ci	struct lws_threadpool_task **c, *task = NULL;
320d4afb5ceSopenharmony_ci	struct lws_threadpool *tp;
321d4afb5ceSopenharmony_ci	struct lws *wsi;
322d4afb5ceSopenharmony_ci
323d4afb5ceSopenharmony_ci	lws_context_lock(context, __func__);
324d4afb5ceSopenharmony_ci
325d4afb5ceSopenharmony_ci	tp = context->tp_list_head;
326d4afb5ceSopenharmony_ci	while (tp) {
327d4afb5ceSopenharmony_ci		int n;
328d4afb5ceSopenharmony_ci
329d4afb5ceSopenharmony_ci		/* for the running (syncing...) tasks... */
330d4afb5ceSopenharmony_ci
331d4afb5ceSopenharmony_ci		for (n = 0; n < tp->threads_in_pool; n++) {
332d4afb5ceSopenharmony_ci			struct lws_pool *pool = &tp->pool_list[n];
333d4afb5ceSopenharmony_ci
334d4afb5ceSopenharmony_ci			task = pool->task;
335d4afb5ceSopenharmony_ci			if (!task)
336d4afb5ceSopenharmony_ci				continue;
337d4afb5ceSopenharmony_ci
338d4afb5ceSopenharmony_ci			wsi = task_to_wsi(task);
339d4afb5ceSopenharmony_ci			if (!wsi || wsi->tsi != tsi ||
340d4afb5ceSopenharmony_ci			    (!task->wanted_writeable_cb &&
341d4afb5ceSopenharmony_ci			     task->status != LWS_TP_STATUS_SYNCING))
342d4afb5ceSopenharmony_ci				continue;
343d4afb5ceSopenharmony_ci
344d4afb5ceSopenharmony_ci			task->wanted_writeable_cb = 0;
345d4afb5ceSopenharmony_ci			lws_memory_barrier();
346d4afb5ceSopenharmony_ci
347d4afb5ceSopenharmony_ci			/*
348d4afb5ceSopenharmony_ci			 * finally... we can ask for the callback on
349d4afb5ceSopenharmony_ci			 * writable from the correct service thread
350d4afb5ceSopenharmony_ci			 * context
351d4afb5ceSopenharmony_ci			 */
352d4afb5ceSopenharmony_ci
353d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
354d4afb5ceSopenharmony_ci		}
355d4afb5ceSopenharmony_ci
356d4afb5ceSopenharmony_ci		/* for the done tasks... */
357d4afb5ceSopenharmony_ci
358d4afb5ceSopenharmony_ci		c = &tp->task_done_head;
359d4afb5ceSopenharmony_ci
360d4afb5ceSopenharmony_ci		while (*c) {
361d4afb5ceSopenharmony_ci			task = *c;
362d4afb5ceSopenharmony_ci			wsi = task_to_wsi(task);
363d4afb5ceSopenharmony_ci
364d4afb5ceSopenharmony_ci			if (wsi && wsi->tsi == tsi &&
365d4afb5ceSopenharmony_ci			    (task->wanted_writeable_cb ||
366d4afb5ceSopenharmony_ci			     task->status == LWS_TP_STATUS_SYNCING)) {
367d4afb5ceSopenharmony_ci
368d4afb5ceSopenharmony_ci				task->wanted_writeable_cb = 0;
369d4afb5ceSopenharmony_ci				lws_memory_barrier();
370d4afb5ceSopenharmony_ci
371d4afb5ceSopenharmony_ci				/*
372d4afb5ceSopenharmony_ci				 * finally... we can ask for the callback on
373d4afb5ceSopenharmony_ci				 * writable from the correct service thread
374d4afb5ceSopenharmony_ci				 * context
375d4afb5ceSopenharmony_ci				 */
376d4afb5ceSopenharmony_ci
377d4afb5ceSopenharmony_ci				lws_callback_on_writable(wsi);
378d4afb5ceSopenharmony_ci			}
379d4afb5ceSopenharmony_ci
380d4afb5ceSopenharmony_ci			c = &task->task_queue_next;
381d4afb5ceSopenharmony_ci		}
382d4afb5ceSopenharmony_ci
383d4afb5ceSopenharmony_ci		tp = tp->tp_list;
384d4afb5ceSopenharmony_ci	}
385d4afb5ceSopenharmony_ci
386d4afb5ceSopenharmony_ci	lws_context_unlock(context);
387d4afb5ceSopenharmony_ci
388d4afb5ceSopenharmony_ci	return 0;
389d4afb5ceSopenharmony_ci}
390d4afb5ceSopenharmony_ci
391d4afb5ceSopenharmony_cistatic int
392d4afb5ceSopenharmony_cilws_threadpool_worker_sync(struct lws_pool *pool,
393d4afb5ceSopenharmony_ci			   struct lws_threadpool_task *task)
394d4afb5ceSopenharmony_ci{
395d4afb5ceSopenharmony_ci	enum lws_threadpool_task_status temp;
396d4afb5ceSopenharmony_ci	struct timespec abstime;
397d4afb5ceSopenharmony_ci	struct lws *wsi;
398d4afb5ceSopenharmony_ci	int tries = 15;
399d4afb5ceSopenharmony_ci
400d4afb5ceSopenharmony_ci	/* block until writable acknowledges */
401d4afb5ceSopenharmony_ci	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
402d4afb5ceSopenharmony_ci	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */
403d4afb5ceSopenharmony_ci
404d4afb5ceSopenharmony_ci	lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__,
405d4afb5ceSopenharmony_ci		    pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task)));
406d4afb5ceSopenharmony_ci
407d4afb5ceSopenharmony_ci	temp = task->status;
408d4afb5ceSopenharmony_ci	state_transition(task, LWS_TP_STATUS_SYNCING);
409d4afb5ceSopenharmony_ci	while (tries--) {
410d4afb5ceSopenharmony_ci		wsi = task_to_wsi(task);
411d4afb5ceSopenharmony_ci
412d4afb5ceSopenharmony_ci		/*
413d4afb5ceSopenharmony_ci		 * if the wsi is no longer attached to this task, there is
414d4afb5ceSopenharmony_ci		 * nothing we can sync to usefully.  Since the work wants to
415d4afb5ceSopenharmony_ci		 * sync, it means we should react to the situation by telling
416d4afb5ceSopenharmony_ci		 * the task it can't continue usefully by stopping it.
417d4afb5ceSopenharmony_ci		 */
418d4afb5ceSopenharmony_ci
419d4afb5ceSopenharmony_ci		if (!wsi) {
420d4afb5ceSopenharmony_ci			lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
421d4afb5ceSopenharmony_ci				 "wsi to sync to\n", __func__, pool->tp->name,
422d4afb5ceSopenharmony_ci				 task, task->name);
423d4afb5ceSopenharmony_ci
424d4afb5ceSopenharmony_ci			state_transition(task, LWS_TP_STATUS_STOPPING);
425d4afb5ceSopenharmony_ci			goto done;
426d4afb5ceSopenharmony_ci		}
427d4afb5ceSopenharmony_ci
428d4afb5ceSopenharmony_ci		/*
429d4afb5ceSopenharmony_ci		 * So "tries" times this is the maximum time between SYNC asking
430d4afb5ceSopenharmony_ci		 * for a callback on writable and actually getting it we are
431d4afb5ceSopenharmony_ci		 * willing to sit still for.
432d4afb5ceSopenharmony_ci		 *
433d4afb5ceSopenharmony_ci		 * If it is exceeded, we will stop the task.
434d4afb5ceSopenharmony_ci		 */
435d4afb5ceSopenharmony_ci		abstime.tv_sec = time(NULL) + 3;
436d4afb5ceSopenharmony_ci		abstime.tv_nsec = 0;
437d4afb5ceSopenharmony_ci
438d4afb5ceSopenharmony_ci		task->wanted_writeable_cb = 1;
439d4afb5ceSopenharmony_ci		lws_memory_barrier();
440d4afb5ceSopenharmony_ci
441d4afb5ceSopenharmony_ci		/*
442d4afb5ceSopenharmony_ci		 * This will cause lws_threadpool_tsi_context() to get called
443d4afb5ceSopenharmony_ci		 * from each tsi service context, where we can safely ask for
444d4afb5ceSopenharmony_ci		 * a callback on writeable on the wsi we are associated with.
445d4afb5ceSopenharmony_ci		 */
446d4afb5ceSopenharmony_ci		lws_cancel_service(lws_get_context(wsi));
447d4afb5ceSopenharmony_ci
448d4afb5ceSopenharmony_ci		/*
449d4afb5ceSopenharmony_ci		 * so the danger here is that we asked for a writable callback
450d4afb5ceSopenharmony_ci		 * on the wsi, but for whatever reason, we are never going to
451d4afb5ceSopenharmony_ci		 * get one.  To avoid deadlocking forever, we allow a set time
452d4afb5ceSopenharmony_ci		 * for the sync to happen naturally, otherwise the cond wait
453d4afb5ceSopenharmony_ci		 * times out and we stop the task.
454d4afb5ceSopenharmony_ci		 */
455d4afb5ceSopenharmony_ci
456d4afb5ceSopenharmony_ci		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
457d4afb5ceSopenharmony_ci					   &abstime) == ETIMEDOUT) {
458d4afb5ceSopenharmony_ci			task->late_sync_retries++;
459d4afb5ceSopenharmony_ci			if (!tries) {
460d4afb5ceSopenharmony_ci				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
461d4afb5ceSopenharmony_ci					 "(associated %s)\n",
462d4afb5ceSopenharmony_ci					 __func__, pool->tp->name, task,
463d4afb5ceSopenharmony_ci					 task->name, lws_wsi_tag(task_to_wsi(task)));
464d4afb5ceSopenharmony_ci
465d4afb5ceSopenharmony_ci				pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
466d4afb5ceSopenharmony_ci				lws_threadpool_dequeue_task(task);
467d4afb5ceSopenharmony_ci				return 1; /* destroyed task */
468d4afb5ceSopenharmony_ci			}
469d4afb5ceSopenharmony_ci
470d4afb5ceSopenharmony_ci			continue;
471d4afb5ceSopenharmony_ci		} else
472d4afb5ceSopenharmony_ci			break;
473d4afb5ceSopenharmony_ci	}
474d4afb5ceSopenharmony_ci
475d4afb5ceSopenharmony_ci	if (task->status == LWS_TP_STATUS_SYNCING)
476d4afb5ceSopenharmony_ci		state_transition(task, temp);
477d4afb5ceSopenharmony_ci
478d4afb5ceSopenharmony_ci	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);
479d4afb5ceSopenharmony_ci
480d4afb5ceSopenharmony_cidone:
481d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
482d4afb5ceSopenharmony_ci
483d4afb5ceSopenharmony_ci	return 0;
484d4afb5ceSopenharmony_ci}
485d4afb5ceSopenharmony_ci
486d4afb5ceSopenharmony_ci#if !defined(WIN32)
487d4afb5ceSopenharmony_cistatic int dummy;
488d4afb5ceSopenharmony_ci#endif
489d4afb5ceSopenharmony_ci
490d4afb5ceSopenharmony_cistatic void *
491d4afb5ceSopenharmony_cilws_threadpool_worker(void *d)
492d4afb5ceSopenharmony_ci{
493d4afb5ceSopenharmony_ci	struct lws_threadpool_task **c, **c2, *task;
494d4afb5ceSopenharmony_ci	struct lws_pool *pool = d;
495d4afb5ceSopenharmony_ci	struct lws_threadpool *tp = pool->tp;
496d4afb5ceSopenharmony_ci	char buf[160];
497d4afb5ceSopenharmony_ci
498d4afb5ceSopenharmony_ci	while (!tp->destroying) {
499d4afb5ceSopenharmony_ci
500d4afb5ceSopenharmony_ci		/* we have no running task... wait and get one from the queue */
501d4afb5ceSopenharmony_ci
502d4afb5ceSopenharmony_ci		pthread_mutex_lock(&tp->lock); /* =================== tp lock */
503d4afb5ceSopenharmony_ci
504d4afb5ceSopenharmony_ci		/*
505d4afb5ceSopenharmony_ci		 * if there's no task already waiting in the queue, wait for
506d4afb5ceSopenharmony_ci		 * the wake_idle condition to signal us that might have changed
507d4afb5ceSopenharmony_ci		 */
508d4afb5ceSopenharmony_ci		while (!tp->task_queue_head && !tp->destroying)
509d4afb5ceSopenharmony_ci			pthread_cond_wait(&tp->wake_idle, &tp->lock);
510d4afb5ceSopenharmony_ci
511d4afb5ceSopenharmony_ci		if (tp->destroying) {
512d4afb5ceSopenharmony_ci			lwsl_notice("%s: bailing\n", __func__);
513d4afb5ceSopenharmony_ci			goto doneski;
514d4afb5ceSopenharmony_ci		}
515d4afb5ceSopenharmony_ci
516d4afb5ceSopenharmony_ci		c = &tp->task_queue_head;
517d4afb5ceSopenharmony_ci		c2 = NULL;
518d4afb5ceSopenharmony_ci		task = NULL;
519d4afb5ceSopenharmony_ci		pool->task = NULL;
520d4afb5ceSopenharmony_ci
521d4afb5ceSopenharmony_ci		/* look at the queue tail */
522d4afb5ceSopenharmony_ci		while (*c) {
523d4afb5ceSopenharmony_ci			c2 = c;
524d4afb5ceSopenharmony_ci			c = &(*c)->task_queue_next;
525d4afb5ceSopenharmony_ci		}
526d4afb5ceSopenharmony_ci
527d4afb5ceSopenharmony_ci		/* is there a task at the queue tail? */
528d4afb5ceSopenharmony_ci		if (c2 && *c2) {
529d4afb5ceSopenharmony_ci			pool->task = task = *c2;
530d4afb5ceSopenharmony_ci			task->acquired = pool->acquired = lws_now_usecs();
531d4afb5ceSopenharmony_ci			/* remove it from the queue */
532d4afb5ceSopenharmony_ci			*c2 = task->task_queue_next;
533d4afb5ceSopenharmony_ci			task->task_queue_next = NULL;
534d4afb5ceSopenharmony_ci			tp->queue_depth--;
535d4afb5ceSopenharmony_ci			/* mark it as running */
536d4afb5ceSopenharmony_ci			state_transition(task, LWS_TP_STATUS_RUNNING);
537d4afb5ceSopenharmony_ci		}
538d4afb5ceSopenharmony_ci
539d4afb5ceSopenharmony_ci		/* someone else got it first... wait and try again */
540d4afb5ceSopenharmony_ci		if (!task) {
541d4afb5ceSopenharmony_ci			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
542d4afb5ceSopenharmony_ci			continue;
543d4afb5ceSopenharmony_ci		}
544d4afb5ceSopenharmony_ci
545d4afb5ceSopenharmony_ci		task->wanted_writeable_cb = 0;
546d4afb5ceSopenharmony_ci
547d4afb5ceSopenharmony_ci		/* we have acquired a new task */
548d4afb5ceSopenharmony_ci
549d4afb5ceSopenharmony_ci		__lws_threadpool_task_dump(task, buf, sizeof(buf));
550d4afb5ceSopenharmony_ci
551d4afb5ceSopenharmony_ci		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
552d4afb5ceSopenharmony_ci			    __func__, tp->name, pool->worker_index, buf);
553d4afb5ceSopenharmony_ci		tp->running_tasks++;
554d4afb5ceSopenharmony_ci
555d4afb5ceSopenharmony_ci		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
556d4afb5ceSopenharmony_ci
557d4afb5ceSopenharmony_ci		/*
558d4afb5ceSopenharmony_ci		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
559d4afb5ceSopenharmony_ci		 * "resurface" periodically, and get called again with
560d4afb5ceSopenharmony_ci		 * cont = 1 immediately to indicate it is picking up where it
561d4afb5ceSopenharmony_ci		 * left off if the task is not being "stopped".
562d4afb5ceSopenharmony_ci		 *
563d4afb5ceSopenharmony_ci		 * This allows long tasks to respond to requests to stop in
564d4afb5ceSopenharmony_ci		 * a clean and opaque way.
565d4afb5ceSopenharmony_ci		 *
566d4afb5ceSopenharmony_ci		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
567d4afb5ceSopenharmony_ci		 * a "callback on writable" request on the service thread and
568d4afb5ceSopenharmony_ci		 * block until it hears back from the WRITABLE handler.
569d4afb5ceSopenharmony_ci		 *
570d4afb5ceSopenharmony_ci		 * This allows the work on the thread to be synchronized to the
571d4afb5ceSopenharmony_ci		 * previous work being dispatched cleanly.
572d4afb5ceSopenharmony_ci		 *
573d4afb5ceSopenharmony_ci		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
574d4afb5ceSopenharmony_ci		 * indicate its work is completed nicely.
575d4afb5ceSopenharmony_ci		 *
576d4afb5ceSopenharmony_ci		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
577d4afb5ceSopenharmony_ci		 * it stopped and cleaned up after incomplete work.
578d4afb5ceSopenharmony_ci		 */
579d4afb5ceSopenharmony_ci
580d4afb5ceSopenharmony_ci		do {
581d4afb5ceSopenharmony_ci			lws_usec_t then;
582d4afb5ceSopenharmony_ci			int n;
583d4afb5ceSopenharmony_ci
584d4afb5ceSopenharmony_ci			if (tp->destroying || !task_to_wsi(task)) {
585d4afb5ceSopenharmony_ci				lwsl_info("%s: stopping on wsi gone\n", __func__);
586d4afb5ceSopenharmony_ci				state_transition(task, LWS_TP_STATUS_STOPPING);
587d4afb5ceSopenharmony_ci			}
588d4afb5ceSopenharmony_ci
589d4afb5ceSopenharmony_ci			then = lws_now_usecs();
590d4afb5ceSopenharmony_ci			n = (int)task->args.task(task->args.user, task->status);
591d4afb5ceSopenharmony_ci			lwsl_debug("   %d, status %d\n", n, task->status);
592d4afb5ceSopenharmony_ci			us_accrue(&task->acc_running, then);
593d4afb5ceSopenharmony_ci			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
594d4afb5ceSopenharmony_ci				task->outlive = 1;
595d4afb5ceSopenharmony_ci			switch (n & 7) {
596d4afb5ceSopenharmony_ci			case LWS_TP_RETURN_CHECKING_IN:
597d4afb5ceSopenharmony_ci				/* if not destroying the tp, continue */
598d4afb5ceSopenharmony_ci				break;
599d4afb5ceSopenharmony_ci			case LWS_TP_RETURN_SYNC:
600d4afb5ceSopenharmony_ci				if (!task_to_wsi(task)) {
601d4afb5ceSopenharmony_ci					lwsl_debug("%s: task that wants to "
602d4afb5ceSopenharmony_ci						    "outlive lost wsi asked "
603d4afb5ceSopenharmony_ci						    "to sync: bypassed\n",
604d4afb5ceSopenharmony_ci						    __func__);
605d4afb5ceSopenharmony_ci					break;
606d4afb5ceSopenharmony_ci				}
607d4afb5ceSopenharmony_ci				/* block until writable acknowledges */
608d4afb5ceSopenharmony_ci				then = lws_now_usecs();
609d4afb5ceSopenharmony_ci				if (lws_threadpool_worker_sync(pool, task)) {
610d4afb5ceSopenharmony_ci					lwsl_notice("%s: Sync failed\n", __func__);
611d4afb5ceSopenharmony_ci					goto doneski;
612d4afb5ceSopenharmony_ci				}
613d4afb5ceSopenharmony_ci				us_accrue(&task->acc_syncing, then);
614d4afb5ceSopenharmony_ci				break;
615d4afb5ceSopenharmony_ci			case LWS_TP_RETURN_FINISHED:
616d4afb5ceSopenharmony_ci				state_transition(task, LWS_TP_STATUS_FINISHED);
617d4afb5ceSopenharmony_ci				break;
618d4afb5ceSopenharmony_ci			case LWS_TP_RETURN_STOPPED:
619d4afb5ceSopenharmony_ci				state_transition(task, LWS_TP_STATUS_STOPPED);
620d4afb5ceSopenharmony_ci				break;
621d4afb5ceSopenharmony_ci			}
622d4afb5ceSopenharmony_ci		} while (task->status == LWS_TP_STATUS_RUNNING);
623d4afb5ceSopenharmony_ci
624d4afb5ceSopenharmony_ci		pthread_mutex_lock(&tp->lock); /* =================== tp lock */
625d4afb5ceSopenharmony_ci
626d4afb5ceSopenharmony_ci		tp->running_tasks--;
627d4afb5ceSopenharmony_ci
628d4afb5ceSopenharmony_ci		if (pool->task->status == LWS_TP_STATUS_STOPPING)
629d4afb5ceSopenharmony_ci			state_transition(task, LWS_TP_STATUS_STOPPED);
630d4afb5ceSopenharmony_ci
631d4afb5ceSopenharmony_ci		/* move the task to the done queue */
632d4afb5ceSopenharmony_ci
633d4afb5ceSopenharmony_ci		pool->task->task_queue_next = tp->task_done_head;
634d4afb5ceSopenharmony_ci		tp->task_done_head = task;
635d4afb5ceSopenharmony_ci		tp->done_queue_depth++;
636d4afb5ceSopenharmony_ci		pool->task->done = lws_now_usecs();
637d4afb5ceSopenharmony_ci
638d4afb5ceSopenharmony_ci		if (!pool->task->args.wsi &&
639d4afb5ceSopenharmony_ci		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
640d4afb5ceSopenharmony_ci		     pool->task->status == LWS_TP_STATUS_FINISHED)) {
641d4afb5ceSopenharmony_ci
642d4afb5ceSopenharmony_ci			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
643d4afb5ceSopenharmony_ci			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
644d4afb5ceSopenharmony_ci				    __func__, tp->name, pool->worker_index,
645d4afb5ceSopenharmony_ci				    buf);
646d4afb5ceSopenharmony_ci
647d4afb5ceSopenharmony_ci			/*
648d4afb5ceSopenharmony_ci			 * there is no longer any wsi attached, so nothing is
649d4afb5ceSopenharmony_ci			 * going to take care of reaping us.  So we must take
650d4afb5ceSopenharmony_ci			 * care of it ourselves.
651d4afb5ceSopenharmony_ci			 */
652d4afb5ceSopenharmony_ci			__lws_threadpool_reap(pool->task);
653d4afb5ceSopenharmony_ci		} else {
654d4afb5ceSopenharmony_ci
655d4afb5ceSopenharmony_ci			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
656d4afb5ceSopenharmony_ci			lwsl_thread("%s: %s: worker %d DONE: %s\n",
657d4afb5ceSopenharmony_ci				    __func__, tp->name, pool->worker_index,
658d4afb5ceSopenharmony_ci				    buf);
659d4afb5ceSopenharmony_ci
660d4afb5ceSopenharmony_ci			/* signal the associated wsi to take a fresh look at
661d4afb5ceSopenharmony_ci			 * task status */
662d4afb5ceSopenharmony_ci
663d4afb5ceSopenharmony_ci			if (task_to_wsi(pool->task)) {
664d4afb5ceSopenharmony_ci				task->wanted_writeable_cb = 1;
665d4afb5ceSopenharmony_ci
666d4afb5ceSopenharmony_ci				lws_cancel_service(
667d4afb5ceSopenharmony_ci					lws_get_context(task_to_wsi(pool->task)));
668d4afb5ceSopenharmony_ci			}
669d4afb5ceSopenharmony_ci		}
670d4afb5ceSopenharmony_ci
671d4afb5ceSopenharmony_cidoneski:
672d4afb5ceSopenharmony_ci		pool->task = NULL;
673d4afb5ceSopenharmony_ci		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
674d4afb5ceSopenharmony_ci	}
675d4afb5ceSopenharmony_ci
676d4afb5ceSopenharmony_ci	lwsl_notice("%s: Exiting\n", __func__);
677d4afb5ceSopenharmony_ci
678d4afb5ceSopenharmony_ci	/* threadpool is being destroyed */
679d4afb5ceSopenharmony_ci#if !defined(WIN32)
680d4afb5ceSopenharmony_ci	pthread_exit(&dummy);
681d4afb5ceSopenharmony_ci#endif
682d4afb5ceSopenharmony_ci
683d4afb5ceSopenharmony_ci	return NULL;
684d4afb5ceSopenharmony_ci}
685d4afb5ceSopenharmony_ci
686d4afb5ceSopenharmony_cistruct lws_threadpool *
687d4afb5ceSopenharmony_cilws_threadpool_create(struct lws_context *context,
688d4afb5ceSopenharmony_ci		      const struct lws_threadpool_create_args *args,
689d4afb5ceSopenharmony_ci		      const char *format, ...)
690d4afb5ceSopenharmony_ci{
691d4afb5ceSopenharmony_ci	struct lws_threadpool *tp;
692d4afb5ceSopenharmony_ci	va_list ap;
693d4afb5ceSopenharmony_ci	int n;
694d4afb5ceSopenharmony_ci
695d4afb5ceSopenharmony_ci	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads),
696d4afb5ceSopenharmony_ci			"threadpool alloc");
697d4afb5ceSopenharmony_ci	if (!tp)
698d4afb5ceSopenharmony_ci		return NULL;
699d4afb5ceSopenharmony_ci
700d4afb5ceSopenharmony_ci	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads));
701d4afb5ceSopenharmony_ci	tp->pool_list = (struct lws_pool *)(tp + 1);
702d4afb5ceSopenharmony_ci	tp->max_queue_depth = args->max_queue_depth;
703d4afb5ceSopenharmony_ci
704d4afb5ceSopenharmony_ci	va_start(ap, format);
705d4afb5ceSopenharmony_ci	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
706d4afb5ceSopenharmony_ci	va_end(ap);
707d4afb5ceSopenharmony_ci
708d4afb5ceSopenharmony_ci	lws_context_lock(context, __func__);
709d4afb5ceSopenharmony_ci
710d4afb5ceSopenharmony_ci	tp->context = context;
711d4afb5ceSopenharmony_ci	tp->tp_list = context->tp_list_head;
712d4afb5ceSopenharmony_ci	context->tp_list_head = tp;
713d4afb5ceSopenharmony_ci
714d4afb5ceSopenharmony_ci	lws_context_unlock(context);
715d4afb5ceSopenharmony_ci
716d4afb5ceSopenharmony_ci	pthread_mutex_init(&tp->lock, NULL);
717d4afb5ceSopenharmony_ci	pthread_cond_init(&tp->wake_idle, NULL);
718d4afb5ceSopenharmony_ci
719d4afb5ceSopenharmony_ci	for (n = 0; n < args->threads; n++) {
720d4afb5ceSopenharmony_ci#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
721d4afb5ceSopenharmony_ci		char name[16];
722d4afb5ceSopenharmony_ci#endif
723d4afb5ceSopenharmony_ci		tp->pool_list[n].tp = tp;
724d4afb5ceSopenharmony_ci		tp->pool_list[n].worker_index = n;
725d4afb5ceSopenharmony_ci		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
726d4afb5ceSopenharmony_ci		if (pthread_create(&tp->pool_list[n].thread, NULL,
727d4afb5ceSopenharmony_ci				   lws_threadpool_worker, &tp->pool_list[n])) {
728d4afb5ceSopenharmony_ci			lwsl_err("thread creation failed\n");
729d4afb5ceSopenharmony_ci		} else {
730d4afb5ceSopenharmony_ci#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
731d4afb5ceSopenharmony_ci			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
732d4afb5ceSopenharmony_ci			pthread_setname_np(tp->pool_list[n].thread, name);
733d4afb5ceSopenharmony_ci#endif
734d4afb5ceSopenharmony_ci			tp->threads_in_pool++;
735d4afb5ceSopenharmony_ci		}
736d4afb5ceSopenharmony_ci	}
737d4afb5ceSopenharmony_ci
738d4afb5ceSopenharmony_ci	return tp;
739d4afb5ceSopenharmony_ci}
740d4afb5ceSopenharmony_ci
741d4afb5ceSopenharmony_civoid
742d4afb5ceSopenharmony_cilws_threadpool_finish(struct lws_threadpool *tp)
743d4afb5ceSopenharmony_ci{
744d4afb5ceSopenharmony_ci	struct lws_threadpool_task **c, *task;
745d4afb5ceSopenharmony_ci
746d4afb5ceSopenharmony_ci	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
747d4afb5ceSopenharmony_ci
748d4afb5ceSopenharmony_ci	/* nothing new can start, running jobs will abort as STOPPED and the
749d4afb5ceSopenharmony_ci	 * pool threads will exit ASAP (they are joined in destroy) */
750d4afb5ceSopenharmony_ci	tp->destroying = 1;
751d4afb5ceSopenharmony_ci
752d4afb5ceSopenharmony_ci	/* stop everyone in the pending queue and move to the done queue */
753d4afb5ceSopenharmony_ci
754d4afb5ceSopenharmony_ci	c = &tp->task_queue_head;
755d4afb5ceSopenharmony_ci	while (*c) {
756d4afb5ceSopenharmony_ci		task = *c;
757d4afb5ceSopenharmony_ci		*c = task->task_queue_next;
758d4afb5ceSopenharmony_ci		task->task_queue_next = tp->task_done_head;
759d4afb5ceSopenharmony_ci		tp->task_done_head = task;
760d4afb5ceSopenharmony_ci		state_transition(task, LWS_TP_STATUS_STOPPED);
761d4afb5ceSopenharmony_ci		tp->queue_depth--;
762d4afb5ceSopenharmony_ci		tp->done_queue_depth++;
763d4afb5ceSopenharmony_ci		task->done = lws_now_usecs();
764d4afb5ceSopenharmony_ci
765d4afb5ceSopenharmony_ci		c = &task->task_queue_next;
766d4afb5ceSopenharmony_ci	}
767d4afb5ceSopenharmony_ci
768d4afb5ceSopenharmony_ci	pthread_cond_broadcast(&tp->wake_idle);
769d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
770d4afb5ceSopenharmony_ci}
771d4afb5ceSopenharmony_ci
772d4afb5ceSopenharmony_civoid
773d4afb5ceSopenharmony_cilws_threadpool_destroy(struct lws_threadpool *tp)
774d4afb5ceSopenharmony_ci{
775d4afb5ceSopenharmony_ci	struct lws_threadpool_task *task, *next;
776d4afb5ceSopenharmony_ci	struct lws_threadpool **ptp;
777d4afb5ceSopenharmony_ci	void *retval;
778d4afb5ceSopenharmony_ci	int n;
779d4afb5ceSopenharmony_ci
780d4afb5ceSopenharmony_ci	/* remove us from the context list of threadpools */
781d4afb5ceSopenharmony_ci
782d4afb5ceSopenharmony_ci	lws_context_lock(tp->context, __func__);
783d4afb5ceSopenharmony_ci	ptp = &tp->context->tp_list_head;
784d4afb5ceSopenharmony_ci
785d4afb5ceSopenharmony_ci	while (*ptp) {
786d4afb5ceSopenharmony_ci		if (*ptp == tp) {
787d4afb5ceSopenharmony_ci			*ptp = tp->tp_list;
788d4afb5ceSopenharmony_ci			break;
789d4afb5ceSopenharmony_ci		}
790d4afb5ceSopenharmony_ci		ptp = &(*ptp)->tp_list;
791d4afb5ceSopenharmony_ci	}
792d4afb5ceSopenharmony_ci
793d4afb5ceSopenharmony_ci	lws_context_unlock(tp->context);
794d4afb5ceSopenharmony_ci
795d4afb5ceSopenharmony_ci	/*
796d4afb5ceSopenharmony_ci	 * Wake up the threadpool guys and tell them to exit
797d4afb5ceSopenharmony_ci	 */
798d4afb5ceSopenharmony_ci
799d4afb5ceSopenharmony_ci	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
800d4afb5ceSopenharmony_ci	tp->destroying = 1;
801d4afb5ceSopenharmony_ci	pthread_cond_broadcast(&tp->wake_idle);
802d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
803d4afb5ceSopenharmony_ci
804d4afb5ceSopenharmony_ci	lws_threadpool_dump(tp);
805d4afb5ceSopenharmony_ci
806d4afb5ceSopenharmony_ci	lwsl_info("%s: waiting for threads to rejoin\n", __func__);
807d4afb5ceSopenharmony_ci#if defined(WIN32)
808d4afb5ceSopenharmony_ci	Sleep(1000);
809d4afb5ceSopenharmony_ci#endif
810d4afb5ceSopenharmony_ci
811d4afb5ceSopenharmony_ci	for (n = 0; n < tp->threads_in_pool; n++) {
812d4afb5ceSopenharmony_ci		task = tp->pool_list[n].task;
813d4afb5ceSopenharmony_ci
814d4afb5ceSopenharmony_ci		pthread_join(tp->pool_list[n].thread, &retval);
815d4afb5ceSopenharmony_ci		pthread_mutex_destroy(&tp->pool_list[n].lock);
816d4afb5ceSopenharmony_ci	}
817d4afb5ceSopenharmony_ci	lwsl_info("%s: all threadpools exited\n", __func__);
818d4afb5ceSopenharmony_ci#if defined(WIN32)
819d4afb5ceSopenharmony_ci	Sleep(1000);
820d4afb5ceSopenharmony_ci#endif
821d4afb5ceSopenharmony_ci
822d4afb5ceSopenharmony_ci	task = tp->task_done_head;
823d4afb5ceSopenharmony_ci	while (task) {
824d4afb5ceSopenharmony_ci		next = task->task_queue_next;
825d4afb5ceSopenharmony_ci		lws_threadpool_task_cleanup_destroy(task);
826d4afb5ceSopenharmony_ci		tp->done_queue_depth--;
827d4afb5ceSopenharmony_ci		task = next;
828d4afb5ceSopenharmony_ci	}
829d4afb5ceSopenharmony_ci
830d4afb5ceSopenharmony_ci	pthread_mutex_destroy(&tp->lock);
831d4afb5ceSopenharmony_ci
832d4afb5ceSopenharmony_ci	memset(tp, 0xdd, sizeof(*tp));
833d4afb5ceSopenharmony_ci	lws_free(tp);
834d4afb5ceSopenharmony_ci}
835d4afb5ceSopenharmony_ci
836d4afb5ceSopenharmony_ci/*
837d4afb5ceSopenharmony_ci * We want to stop and destroy the tasks and related priv.
838d4afb5ceSopenharmony_ci */
839d4afb5ceSopenharmony_ci
840d4afb5ceSopenharmony_ciint
841d4afb5ceSopenharmony_cilws_threadpool_dequeue_task(struct lws_threadpool_task *task)
842d4afb5ceSopenharmony_ci{
843d4afb5ceSopenharmony_ci	struct lws_threadpool *tp;
844d4afb5ceSopenharmony_ci	struct lws_threadpool_task **c;
845d4afb5ceSopenharmony_ci	int n;
846d4afb5ceSopenharmony_ci
847d4afb5ceSopenharmony_ci	tp = task->tp;
848d4afb5ceSopenharmony_ci	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
849d4afb5ceSopenharmony_ci
850d4afb5ceSopenharmony_ci	if (task->outlive && !tp->destroying) {
851d4afb5ceSopenharmony_ci
852d4afb5ceSopenharmony_ci		/* disconnect from wsi, and wsi from task */
853d4afb5ceSopenharmony_ci
854d4afb5ceSopenharmony_ci		lws_dll2_remove(&task->list);
855d4afb5ceSopenharmony_ci		task->args.wsi = NULL;
856d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
857d4afb5ceSopenharmony_ci		task->args.ss = NULL;
858d4afb5ceSopenharmony_ci#endif
859d4afb5ceSopenharmony_ci
860d4afb5ceSopenharmony_ci		goto bail;
861d4afb5ceSopenharmony_ci	}
862d4afb5ceSopenharmony_ci
863d4afb5ceSopenharmony_ci
864d4afb5ceSopenharmony_ci	c = &tp->task_queue_head;
865d4afb5ceSopenharmony_ci
866d4afb5ceSopenharmony_ci	/* is he queued waiting for a chance to run?  Mark him as stopped and
867d4afb5ceSopenharmony_ci	 * move him on to the done queue */
868d4afb5ceSopenharmony_ci
869d4afb5ceSopenharmony_ci	while (*c) {
870d4afb5ceSopenharmony_ci		if ((*c) == task) {
871d4afb5ceSopenharmony_ci			*c = task->task_queue_next;
872d4afb5ceSopenharmony_ci			task->task_queue_next = tp->task_done_head;
873d4afb5ceSopenharmony_ci			tp->task_done_head = task;
874d4afb5ceSopenharmony_ci			state_transition(task, LWS_TP_STATUS_STOPPED);
875d4afb5ceSopenharmony_ci			tp->queue_depth--;
876d4afb5ceSopenharmony_ci			tp->done_queue_depth++;
877d4afb5ceSopenharmony_ci			task->done = lws_now_usecs();
878d4afb5ceSopenharmony_ci
879d4afb5ceSopenharmony_ci			lwsl_debug("%s: tp %p: removed queued task %s\n",
880d4afb5ceSopenharmony_ci				    __func__, tp, lws_wsi_tag(task_to_wsi(task)));
881d4afb5ceSopenharmony_ci
882d4afb5ceSopenharmony_ci			break;
883d4afb5ceSopenharmony_ci		}
884d4afb5ceSopenharmony_ci		c = &(*c)->task_queue_next;
885d4afb5ceSopenharmony_ci	}
886d4afb5ceSopenharmony_ci
887d4afb5ceSopenharmony_ci	/* is he on the done queue? */
888d4afb5ceSopenharmony_ci
889d4afb5ceSopenharmony_ci	c = &tp->task_done_head;
890d4afb5ceSopenharmony_ci	while (*c) {
891d4afb5ceSopenharmony_ci		if ((*c) == task) {
892d4afb5ceSopenharmony_ci			*c = task->task_queue_next;
893d4afb5ceSopenharmony_ci			task->task_queue_next = NULL;
894d4afb5ceSopenharmony_ci			lws_threadpool_task_cleanup_destroy(task);
895d4afb5ceSopenharmony_ci			tp->done_queue_depth--;
896d4afb5ceSopenharmony_ci			goto bail;
897d4afb5ceSopenharmony_ci		}
898d4afb5ceSopenharmony_ci		c = &(*c)->task_queue_next;
899d4afb5ceSopenharmony_ci	}
900d4afb5ceSopenharmony_ci
901d4afb5ceSopenharmony_ci	/* he's not in the queue... is he already running on a thread? */
902d4afb5ceSopenharmony_ci
903d4afb5ceSopenharmony_ci	for (n = 0; n < tp->threads_in_pool; n++) {
904d4afb5ceSopenharmony_ci		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
905d4afb5ceSopenharmony_ci			continue;
906d4afb5ceSopenharmony_ci
907d4afb5ceSopenharmony_ci		/*
908d4afb5ceSopenharmony_ci		 * ensure we don't collide with tests or changes in the
909d4afb5ceSopenharmony_ci		 * worker thread
910d4afb5ceSopenharmony_ci		 */
911d4afb5ceSopenharmony_ci		pthread_mutex_lock(&tp->pool_list[n].lock);
912d4afb5ceSopenharmony_ci
913d4afb5ceSopenharmony_ci		/*
914d4afb5ceSopenharmony_ci		 * mark him as having been requested to stop...
915d4afb5ceSopenharmony_ci		 * the caller will hear about it in his service thread
916d4afb5ceSopenharmony_ci		 * context as a request to close
917d4afb5ceSopenharmony_ci		 */
918d4afb5ceSopenharmony_ci		state_transition(task, LWS_TP_STATUS_STOPPING);
919d4afb5ceSopenharmony_ci
920d4afb5ceSopenharmony_ci		/* disconnect from wsi, and wsi from task */
921d4afb5ceSopenharmony_ci
922d4afb5ceSopenharmony_ci		lws_dll2_remove(&task->list);
923d4afb5ceSopenharmony_ci		task->args.wsi = NULL;
924d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
925d4afb5ceSopenharmony_ci		task->args.ss = NULL;
926d4afb5ceSopenharmony_ci#endif
927d4afb5ceSopenharmony_ci
928d4afb5ceSopenharmony_ci		pthread_mutex_unlock(&tp->pool_list[n].lock);
929d4afb5ceSopenharmony_ci
930d4afb5ceSopenharmony_ci		lwsl_debug("%s: tp %p: request stop running task "
931d4afb5ceSopenharmony_ci			    "for %s\n", __func__, tp,
932d4afb5ceSopenharmony_ci			    lws_wsi_tag(task_to_wsi(task)));
933d4afb5ceSopenharmony_ci
934d4afb5ceSopenharmony_ci		break;
935d4afb5ceSopenharmony_ci	}
936d4afb5ceSopenharmony_ci
937d4afb5ceSopenharmony_ci	if (n == tp->threads_in_pool) {
938d4afb5ceSopenharmony_ci		/* can't find it */
939d4afb5ceSopenharmony_ci		lwsl_notice("%s: tp %p: no task for %s, decoupling\n",
940d4afb5ceSopenharmony_ci			    __func__, tp, lws_wsi_tag(task_to_wsi(task)));
941d4afb5ceSopenharmony_ci		lws_dll2_remove(&task->list);
942d4afb5ceSopenharmony_ci		task->args.wsi = NULL;
943d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
944d4afb5ceSopenharmony_ci		task->args.ss = NULL;
945d4afb5ceSopenharmony_ci#endif
946d4afb5ceSopenharmony_ci	}
947d4afb5ceSopenharmony_ci
948d4afb5ceSopenharmony_cibail:
949d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
950d4afb5ceSopenharmony_ci
951d4afb5ceSopenharmony_ci	return 0;
952d4afb5ceSopenharmony_ci}
953d4afb5ceSopenharmony_ci
954d4afb5ceSopenharmony_ciint
955d4afb5ceSopenharmony_cilws_threadpool_dequeue(struct lws *wsi) /* deprecated */
956d4afb5ceSopenharmony_ci{
957d4afb5ceSopenharmony_ci	struct lws_threadpool_task *task;
958d4afb5ceSopenharmony_ci
959d4afb5ceSopenharmony_ci	if (!wsi->tp_task_owner.count)
960d4afb5ceSopenharmony_ci		return 0;
961d4afb5ceSopenharmony_ci	assert(wsi->tp_task_owner.count != 1);
962d4afb5ceSopenharmony_ci
963d4afb5ceSopenharmony_ci	task = lws_container_of(wsi->tp_task_owner.head,
964d4afb5ceSopenharmony_ci				struct lws_threadpool_task, list);
965d4afb5ceSopenharmony_ci
966d4afb5ceSopenharmony_ci	return lws_threadpool_dequeue_task(task);
967d4afb5ceSopenharmony_ci}
968d4afb5ceSopenharmony_ci
969d4afb5ceSopenharmony_cistruct lws_threadpool_task *
970d4afb5ceSopenharmony_cilws_threadpool_enqueue(struct lws_threadpool *tp,
971d4afb5ceSopenharmony_ci		       const struct lws_threadpool_task_args *args,
972d4afb5ceSopenharmony_ci		       const char *format, ...)
973d4afb5ceSopenharmony_ci{
974d4afb5ceSopenharmony_ci	struct lws_threadpool_task *task = NULL;
975d4afb5ceSopenharmony_ci	va_list ap;
976d4afb5ceSopenharmony_ci
977d4afb5ceSopenharmony_ci	if (tp->destroying)
978d4afb5ceSopenharmony_ci		return NULL;
979d4afb5ceSopenharmony_ci
980d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
981d4afb5ceSopenharmony_ci	assert(args->ss || args->wsi);
982d4afb5ceSopenharmony_ci#endif
983d4afb5ceSopenharmony_ci
984d4afb5ceSopenharmony_ci	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
985d4afb5ceSopenharmony_ci
986d4afb5ceSopenharmony_ci	/*
987d4afb5ceSopenharmony_ci	 * if there's room on the queue, the job always goes on the queue
988d4afb5ceSopenharmony_ci	 * first, then any free thread may pick it up after the wake_idle
989d4afb5ceSopenharmony_ci	 */
990d4afb5ceSopenharmony_ci
991d4afb5ceSopenharmony_ci	if (tp->queue_depth == tp->max_queue_depth) {
992d4afb5ceSopenharmony_ci		lwsl_notice("%s: queue reached limit %d\n", __func__,
993d4afb5ceSopenharmony_ci			    tp->max_queue_depth);
994d4afb5ceSopenharmony_ci
995d4afb5ceSopenharmony_ci		goto bail;
996d4afb5ceSopenharmony_ci	}
997d4afb5ceSopenharmony_ci
998d4afb5ceSopenharmony_ci	/*
999d4afb5ceSopenharmony_ci	 * create the task object
1000d4afb5ceSopenharmony_ci	 */
1001d4afb5ceSopenharmony_ci
1002d4afb5ceSopenharmony_ci	task = lws_malloc(sizeof(*task), __func__);
1003d4afb5ceSopenharmony_ci	if (!task)
1004d4afb5ceSopenharmony_ci		goto bail;
1005d4afb5ceSopenharmony_ci
1006d4afb5ceSopenharmony_ci	memset(task, 0, sizeof(*task));
1007d4afb5ceSopenharmony_ci	pthread_cond_init(&task->wake_idle, NULL);
1008d4afb5ceSopenharmony_ci	task->args = *args;
1009d4afb5ceSopenharmony_ci	task->tp = tp;
1010d4afb5ceSopenharmony_ci	task->created = lws_now_usecs();
1011d4afb5ceSopenharmony_ci
1012d4afb5ceSopenharmony_ci	va_start(ap, format);
1013d4afb5ceSopenharmony_ci	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
1014d4afb5ceSopenharmony_ci	va_end(ap);
1015d4afb5ceSopenharmony_ci
1016d4afb5ceSopenharmony_ci	/*
1017d4afb5ceSopenharmony_ci	 * add him on the tp task queue
1018d4afb5ceSopenharmony_ci	 */
1019d4afb5ceSopenharmony_ci
1020d4afb5ceSopenharmony_ci	task->task_queue_next = tp->task_queue_head;
1021d4afb5ceSopenharmony_ci	state_transition(task, LWS_TP_STATUS_QUEUED);
1022d4afb5ceSopenharmony_ci	tp->task_queue_head = task;
1023d4afb5ceSopenharmony_ci	tp->queue_depth++;
1024d4afb5ceSopenharmony_ci
1025d4afb5ceSopenharmony_ci	/*
1026d4afb5ceSopenharmony_ci	 * mark the wsi itself as depending on this tp (so wsi close for
1027d4afb5ceSopenharmony_ci	 * whatever reason can clean up)
1028d4afb5ceSopenharmony_ci	 */
1029d4afb5ceSopenharmony_ci
1030d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
1031d4afb5ceSopenharmony_ci	if (args->ss)
1032d4afb5ceSopenharmony_ci		lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
1033d4afb5ceSopenharmony_ci	else
1034d4afb5ceSopenharmony_ci#endif
1035d4afb5ceSopenharmony_ci		lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);
1036d4afb5ceSopenharmony_ci
1037d4afb5ceSopenharmony_ci	lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n",
1038d4afb5ceSopenharmony_ci		    __func__, tp->name, task, task->name,
1039d4afb5ceSopenharmony_ci		    lws_wsi_tag(task_to_wsi(task)), tp->queue_depth);
1040d4afb5ceSopenharmony_ci
1041d4afb5ceSopenharmony_ci	/* alert any idle thread there's something new on the task list */
1042d4afb5ceSopenharmony_ci
1043d4afb5ceSopenharmony_ci	lws_memory_barrier();
1044d4afb5ceSopenharmony_ci	pthread_cond_signal(&tp->wake_idle);
1045d4afb5ceSopenharmony_ci
1046d4afb5ceSopenharmony_cibail:
1047d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
1048d4afb5ceSopenharmony_ci
1049d4afb5ceSopenharmony_ci	return task;
1050d4afb5ceSopenharmony_ci}
1051d4afb5ceSopenharmony_ci
1052d4afb5ceSopenharmony_ci/* this should be called from the service thread */
1053d4afb5ceSopenharmony_ci
1054d4afb5ceSopenharmony_cienum lws_threadpool_task_status
1055d4afb5ceSopenharmony_cilws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
1056d4afb5ceSopenharmony_ci{
1057d4afb5ceSopenharmony_ci	enum lws_threadpool_task_status status;
1058d4afb5ceSopenharmony_ci	struct lws_threadpool *tp = task->tp;
1059d4afb5ceSopenharmony_ci
1060d4afb5ceSopenharmony_ci	if (!tp)
1061d4afb5ceSopenharmony_ci		return LWS_TP_STATUS_FINISHED;
1062d4afb5ceSopenharmony_ci
1063d4afb5ceSopenharmony_ci	*user = task->args.user;
1064d4afb5ceSopenharmony_ci	status = task->status;
1065d4afb5ceSopenharmony_ci
1066d4afb5ceSopenharmony_ci	if (status == LWS_TP_STATUS_FINISHED ||
1067d4afb5ceSopenharmony_ci	    status == LWS_TP_STATUS_STOPPED) {
1068d4afb5ceSopenharmony_ci		char buf[160];
1069d4afb5ceSopenharmony_ci
1070d4afb5ceSopenharmony_ci		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
1071d4afb5ceSopenharmony_ci		__lws_threadpool_task_dump(task, buf, sizeof(buf));
1072d4afb5ceSopenharmony_ci		lwsl_thread("%s: %s: service thread REAPING: %s\n",
1073d4afb5ceSopenharmony_ci			    __func__, tp->name, buf);
1074d4afb5ceSopenharmony_ci		__lws_threadpool_reap(task);
1075d4afb5ceSopenharmony_ci		lws_memory_barrier();
1076d4afb5ceSopenharmony_ci		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
1077d4afb5ceSopenharmony_ci	}
1078d4afb5ceSopenharmony_ci
1079d4afb5ceSopenharmony_ci	return status;
1080d4afb5ceSopenharmony_ci}
1081d4afb5ceSopenharmony_ci
1082d4afb5ceSopenharmony_cienum lws_threadpool_task_status
1083d4afb5ceSopenharmony_cilws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
1084d4afb5ceSopenharmony_ci{
1085d4afb5ceSopenharmony_ci	return task->status;
1086d4afb5ceSopenharmony_ci}
1087d4afb5ceSopenharmony_ci
1088d4afb5ceSopenharmony_cienum lws_threadpool_task_status
1089d4afb5ceSopenharmony_cilws_threadpool_task_status_wsi(struct lws *wsi,
1090d4afb5ceSopenharmony_ci			       struct lws_threadpool_task **_task, void **user)
1091d4afb5ceSopenharmony_ci{
1092d4afb5ceSopenharmony_ci	struct lws_threadpool_task *task;
1093d4afb5ceSopenharmony_ci
1094d4afb5ceSopenharmony_ci	if (!wsi->tp_task_owner.count) {
1095d4afb5ceSopenharmony_ci		lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
1096d4afb5ceSopenharmony_ci		return LWS_TP_STATUS_FINISHED;
1097d4afb5ceSopenharmony_ci	}
1098d4afb5ceSopenharmony_ci
1099d4afb5ceSopenharmony_ci	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */
1100d4afb5ceSopenharmony_ci
1101d4afb5ceSopenharmony_ci	task = lws_container_of(wsi->tp_task_owner.head,
1102d4afb5ceSopenharmony_ci				struct lws_threadpool_task, list);
1103d4afb5ceSopenharmony_ci
1104d4afb5ceSopenharmony_ci	*_task = task;
1105d4afb5ceSopenharmony_ci
1106d4afb5ceSopenharmony_ci	return lws_threadpool_task_status(task, user);
1107d4afb5ceSopenharmony_ci}
1108d4afb5ceSopenharmony_ci
1109d4afb5ceSopenharmony_civoid
1110d4afb5ceSopenharmony_cilws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
1111d4afb5ceSopenharmony_ci{
1112d4afb5ceSopenharmony_ci	lwsl_debug("%s\n", __func__);
1113d4afb5ceSopenharmony_ci	if (!task)
1114d4afb5ceSopenharmony_ci		return;
1115d4afb5ceSopenharmony_ci
1116d4afb5ceSopenharmony_ci	if (stop)
1117d4afb5ceSopenharmony_ci		state_transition(task, LWS_TP_STATUS_STOPPING);
1118d4afb5ceSopenharmony_ci
1119d4afb5ceSopenharmony_ci	pthread_mutex_lock(&task->tp->lock);
1120d4afb5ceSopenharmony_ci	pthread_cond_signal(&task->wake_idle);
1121d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&task->tp->lock);
1122d4afb5ceSopenharmony_ci}
1123d4afb5ceSopenharmony_ci
1124d4afb5ceSopenharmony_ciint
1125d4afb5ceSopenharmony_cilws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
1126d4afb5ceSopenharmony_ci				int (*cb)(struct lws_threadpool_task *task,
1127d4afb5ceSopenharmony_ci					  void *user))
1128d4afb5ceSopenharmony_ci{
1129d4afb5ceSopenharmony_ci	struct lws_threadpool_task *task1;
1130d4afb5ceSopenharmony_ci
1131d4afb5ceSopenharmony_ci	if (wsi->tp_task_owner.head == NULL)
1132d4afb5ceSopenharmony_ci		return 0;
1133d4afb5ceSopenharmony_ci
1134d4afb5ceSopenharmony_ci	task1 = lws_container_of(wsi->tp_task_owner.head,
1135d4afb5ceSopenharmony_ci				 struct lws_threadpool_task, list);
1136d4afb5ceSopenharmony_ci
1137d4afb5ceSopenharmony_ci	pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */
1138d4afb5ceSopenharmony_ci
1139d4afb5ceSopenharmony_ci	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
1140d4afb5ceSopenharmony_ci				   wsi->tp_task_owner.head) {
1141d4afb5ceSopenharmony_ci		struct lws_threadpool_task *task = lws_container_of(d,
1142d4afb5ceSopenharmony_ci					struct lws_threadpool_task, list);
1143d4afb5ceSopenharmony_ci
1144d4afb5ceSopenharmony_ci		if (cb(task, user)) {
1145d4afb5ceSopenharmony_ci			pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
1146d4afb5ceSopenharmony_ci			return 1;
1147d4afb5ceSopenharmony_ci		}
1148d4afb5ceSopenharmony_ci
1149d4afb5ceSopenharmony_ci	} lws_end_foreach_dll_safe(d, d1);
1150d4afb5ceSopenharmony_ci
1151d4afb5ceSopenharmony_ci	pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
1152d4afb5ceSopenharmony_ci
1153d4afb5ceSopenharmony_ci	return 0;
1154d4afb5ceSopenharmony_ci}
1155d4afb5ceSopenharmony_ci
1156d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
1157d4afb5ceSopenharmony_ciint
1158d4afb5ceSopenharmony_cilws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
1159d4afb5ceSopenharmony_ci			       int (*cb)(struct lws_threadpool_task *task,
1160d4afb5ceSopenharmony_ci					 void *user))
1161d4afb5ceSopenharmony_ci{
1162d4afb5ceSopenharmony_ci	if (!ss->wsi)
1163d4afb5ceSopenharmony_ci		return 0;
1164d4afb5ceSopenharmony_ci
1165d4afb5ceSopenharmony_ci	return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb);
1166d4afb5ceSopenharmony_ci}
1167d4afb5ceSopenharmony_ci#endif
1168d4afb5ceSopenharmony_ci
1169d4afb5ceSopenharmony_cistatic int
1170d4afb5ceSopenharmony_cidisassociate_wsi(struct lws_threadpool_task *task,
1171d4afb5ceSopenharmony_ci		  void *user)
1172d4afb5ceSopenharmony_ci{
1173d4afb5ceSopenharmony_ci	task->args.wsi = NULL;
1174d4afb5ceSopenharmony_ci	lws_dll2_remove(&task->list);
1175d4afb5ceSopenharmony_ci
1176d4afb5ceSopenharmony_ci	return 0;
1177d4afb5ceSopenharmony_ci}
1178d4afb5ceSopenharmony_ci
1179d4afb5ceSopenharmony_civoid
1180d4afb5ceSopenharmony_cilws_threadpool_wsi_closing(struct lws *wsi)
1181d4afb5ceSopenharmony_ci{
1182d4afb5ceSopenharmony_ci	lws_threadpool_foreach_task_wsi(wsi, NULL, disassociate_wsi);
1183d4afb5ceSopenharmony_ci}
1184d4afb5ceSopenharmony_ci
1185d4afb5ceSopenharmony_cistruct lws_threadpool_task *
1186d4afb5ceSopenharmony_cilws_threadpool_get_task_wsi(struct lws *wsi)
1187d4afb5ceSopenharmony_ci{
1188d4afb5ceSopenharmony_ci	if (wsi->tp_task_owner.head == NULL)
1189d4afb5ceSopenharmony_ci		return NULL;
1190d4afb5ceSopenharmony_ci
1191d4afb5ceSopenharmony_ci	return lws_container_of(wsi->tp_task_owner.head,
1192d4afb5ceSopenharmony_ci				 struct lws_threadpool_task, list);
1193d4afb5ceSopenharmony_ci}
1194d4afb5ceSopenharmony_ci
1195d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SECURE_STREAMS)
1196d4afb5ceSopenharmony_cistruct lws_threadpool_task *
1197d4afb5ceSopenharmony_cilws_threadpool_get_task_ss(struct lws_ss_handle *ss)
1198d4afb5ceSopenharmony_ci{
1199d4afb5ceSopenharmony_ci	return lws_threadpool_get_task_wsi(ss->wsi);
1200d4afb5ceSopenharmony_ci}
1201d4afb5ceSopenharmony_ci#endif
1202