/*
 * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
 *
 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * The main reason some things are as they are is that the task lifecycle may
 * be unrelated to the lifecycle of the wsi that queued the task.
 *
 * Consider that the task may call into an external library and run for 30s
 * without "checking in" to see if it should stop.  The wsi that started the
 * task may have closed at any time before the 30s are up, because the browser
 * window was closed or the connection dropped.
 *
 * So data shared between the asynchronous task and the wsi must have its
 * lifecycle determined by the task, not the wsi.  That means a separate
 * struct that can be freed by the task.
 *
 * In the case the wsi outlives the task, completed tasks are not destroyed
 * until the service thread has called lws_threadpool_task_status() on them.
 * So there is no danger of the shared task private data being freed
 * unexpectedly.
 */
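
/*
 * Ownership summary for the per-task private data used below:
 *
 *   ESTABLISHED:        allocate struct task_data, hand it to the threadpool
 *                       via args.user together with args.cleanup
 *   task thread:        reads / writes the struct while running
 *   SERVER_WRITEABLE:   reads the struct only while the task reports
 *                       LWS_TP_STATUS_SYNCING (the task is paused then)
 *   threadpool:         calls args.cleanup to free the struct when the task
 *                       is reaped, regardless of whether the wsi still exists
 */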

#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif

#include <string.h>

struct per_vhost_data__minimal {
	struct lws_threadpool *tp;
	struct lws_context *context;
	lws_sorted_usec_list_t sul;
	const char *config;
};

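/*
 * Per-task private data, owned by the threadpool task rather than the wsi.
 * The result buffer deliberately reserves LWS_PRE bytes of headroom at the
 * start, so the service thread can pass &result[LWS_PRE] straight to
 * lws_write() without copying.
 */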
struct task_data {
	char result[64];

	uint64_t pos, end;
};

#if defined(WIN32)
static void usleep(unsigned long l) { Sleep(l / 1000); }
#endif

/*
 * Create the private data for the task
 *
 * Notice we hand over responsibility for the cleanup and freeing of the
 * allocated task_data to the threadpool, because the wsi it was originally
 * bound to may close while the thread is still running.  So we allocate
 * something discrete for the task private data that can be definitively owned
 * and freed by the threadpool, not the wsi... the pss won't do, as it only
 * exists for the lifecycle of the wsi connection.
 *
 * When the task is created, we also tell it how to destroy the private data
 * by giving it args.cleanup as cleanup_task_private_data() defined below.
 */

static struct task_data *
create_task_private_data(void)
{
	struct task_data *priv = malloc(sizeof(*priv));

	return priv;
}

/*
 * Destroy the private data for the task
 *
 * Notice the wsi the task was originally bound to may be long gone by now,
 * for example when we are destroying the lws context while the thread was
 * busy for a long time without checking in.
 */
static void
cleanup_task_private_data(struct lws *wsi, void *user)
{
	struct task_data *priv = (struct task_data *)user;

	free(priv);
}

/*
 * This runs in its own thread, from the threadpool.
 *
 * The implementation behind this in lws uses pthreads, but no pthreadisms
 * are required in the user code.
 *
 * The example counts to 10M, "checking in" after every 100K to see if it
 * should stop, and pausing every 1M to sync with the service thread so a ws
 * message can be sent.  It resumes after the service thread has written the
 * message: the LWS_CALLBACK_SERVER_WRITEABLE handler calls
 * lws_threadpool_task_sync() to let the task thread continue.
 */

static enum lws_threadpool_task_return
task_function(void *user, enum lws_threadpool_task_status s)
{
	struct task_data *priv = (struct task_data *)user;
	int budget = 100 * 1000;

	if (priv->pos == priv->end)
		return LWS_TP_RETURN_FINISHED;

	/*
	 * Preferably replace this with ~100ms of your real task, so it
	 * can "check in" at short intervals to see if it has been asked to
	 * stop.
	 *
	 * You can just run tasks atomically here with the thread dedicated
	 * to it, but it will cause odd delays while shutting down etc and
	 * the task will run to completion even if the wsi that started it
	 * has since closed.
	 */

	while (budget--)
		priv->pos++;

	usleep(100000);

	if (!(priv->pos % (1000 * 1000))) {
		lws_snprintf(priv->result + LWS_PRE,
			     sizeof(priv->result) - LWS_PRE,
			     "pos %llu", (unsigned long long)priv->pos);

		return LWS_TP_RETURN_SYNC;
	}

	return LWS_TP_RETURN_CHECKING_IN;
}
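
/*
 * A minimal sketch of the same pattern wrapped around a real workload,
 * assuming a hypothetical external API my_lib_process_chunk() that does
 * roughly 100ms of work per call; it is not part of this plugin:
 *
 *	static enum lws_threadpool_task_return
 *	real_task(void *user, enum lws_threadpool_task_status s)
 *	{
 *		struct task_data *priv = (struct task_data *)user;
 *
 *		if (my_lib_process_chunk(priv))	// ~100ms of real work
 *			return LWS_TP_RETURN_FINISHED;
 *
 *		if (priv->have_result_to_send)	// hypothetical flag
 *			return LWS_TP_RETURN_SYNC;
 *
 *		return LWS_TP_RETURN_CHECKING_IN;
 *	}
 *
 * Returning CHECKING_IN frequently lets lws stop the task promptly if the
 * wsi has closed or the context is being destroyed.
 */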

static void
sul_tp_dump(struct lws_sorted_usec_list *sul)
{
	struct per_vhost_data__minimal *vhd =
		lws_container_of(sul, struct per_vhost_data__minimal, sul);
	/*
	 * in debug mode, dump the threadpool state to the logs once
	 * a second
	 */
	lws_threadpool_dump(vhd->tp);
	lws_sul_schedule(vhd->context, 0, &vhd->sul,
			 sul_tp_dump, LWS_US_PER_SEC);
}

static int
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
			void *user, void *in, size_t len)
{
	struct per_vhost_data__minimal *vhd =
			(struct per_vhost_data__minimal *)
			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
					lws_get_protocol(wsi));
	const struct lws_protocol_vhost_options *pvo;
	struct lws_threadpool_create_args cargs;
	struct lws_threadpool_task_args args;
	struct lws_threadpool_task *task;
	struct task_data *priv;
	int n, m, r = 0;
	char name[32];
	void *_user;

	switch (reason) {
	case LWS_CALLBACK_PROTOCOL_INIT:
		/* create our per-vhost struct */
		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
				lws_get_protocol(wsi),
				sizeof(struct per_vhost_data__minimal));
		if (!vhd)
			return 1;

		vhd->context = lws_get_context(wsi);

		/* get the "config" pvo passed in at vhost creation time */
		pvo = lws_pvo_search(
			(const struct lws_protocol_vhost_options *)in,
			"config");
		if (!pvo || !pvo->value) {
			lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
			return 1;
		}
		vhd->config = pvo->value;

		memset(&cargs, 0, sizeof(cargs));

		cargs.max_queue_depth = 8;
		cargs.threads = 3;
		vhd->tp = lws_threadpool_create(lws_get_context(wsi),
				&cargs, "%s",
				lws_get_vhost_name(lws_get_vhost(wsi)));
		if (!vhd->tp)
			return 1;

		lws_sul_schedule(vhd->context, 0, &vhd->sul,
				 sul_tp_dump, LWS_US_PER_SEC);
		break;

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		lws_threadpool_finish(vhd->tp);
		lws_threadpool_destroy(vhd->tp);
		lws_sul_cancel(&vhd->sul);
		break;

	case LWS_CALLBACK_ESTABLISHED:

		memset(&args, 0, sizeof(args));
		priv = args.user = create_task_private_data();
		if (!args.user)
			return 1;

		priv->pos = 0;
		priv->end = 10 * 1000 * 1000;

		/* queue the task... the threadpool takes on responsibility
		 * for destroying args.user.  Our local priv pointer is just
		 * a copy of it */

		args.wsi = wsi;
		args.task = task_function;
		args.cleanup = cleanup_task_private_data;

		lws_get_peer_simple(wsi, name, sizeof(name));

		if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
			lwsl_user("%s: Couldn't enqueue task\n", __func__);
			cleanup_task_private_data(wsi, priv);
			return 1;
		}

		lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);

		/*
		 * The asynchronous task will let us know when it wants to
		 * sync by causing an LWS_CALLBACK_SERVER_WRITEABLE on this
		 * wsi.
		 */

		break;

	case LWS_CALLBACK_CLOSED:
		break;

	case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
		lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
		lws_threadpool_dequeue_task(lws_threadpool_get_task_wsi(wsi));
		break;

	case LWS_CALLBACK_SERVER_WRITEABLE:

		/*
		 * Even completed tasks wait in a queue until we call
		 * lws_threadpool_task_status() on them.  Then they may
		 * destroy themselves and their args.user data (by calling
		 * the cleanup callback).
		 *
		 * If you need anything from the still-valid private task
		 * data, copy it out here, since calling
		 * lws_threadpool_task_status() may free the task and the
		 * private task data.
		 */

		task = lws_threadpool_get_task_wsi(wsi);
		if (!task)
			break;
		n = (int)lws_threadpool_task_status(task, &_user);
		lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
			   __func__, n);
		switch (n) {

		case LWS_TP_STATUS_FINISHED:
		case LWS_TP_STATUS_STOPPED:
		case LWS_TP_STATUS_QUEUED:
		case LWS_TP_STATUS_RUNNING:
		case LWS_TP_STATUS_STOPPING:
			return 0;

		case LWS_TP_STATUS_SYNCING:
			/* the task has paused for us to do something */
			break;
		default:
			return -1;
		}

		priv = (struct task_data *)_user;

		lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);

		n = (int)strlen(priv->result + LWS_PRE);
		m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
			      (unsigned int)n, LWS_WRITE_TEXT);
		if (m < n) {
			lwsl_err("ERROR %d writing to ws socket\n", m);
			lws_threadpool_task_sync(task, 1);
			return -1;
		}

		/*
		 * The service thread has done whatever it wanted with the
		 * data the task produced: if the task is waiting to do more,
		 * it can continue now.
		 */
		lws_threadpool_task_sync(task, 0);
		break;

	default:
		break;
	}

	return r;
}

#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal", \
		callback_minimal, \
		0, \
		128, \
		0, NULL, 0 \
	}
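
/*
 * A minimal sketch of how a statically-built application might reference
 * this protocol; the config string and pvo wiring here are illustrative
 * assumptions, not part of this plugin:
 *
 *	static const struct lws_protocol_vhost_options pvo_config = {
 *		NULL, NULL, "config", "some config string"
 *	};
 *	static const struct lws_protocol_vhost_options pvo = {
 *		NULL, &pvo_config, "lws-minimal", ""
 *	};
 *	static const struct lws_protocols protocols[] = {
 *		LWS_PLUGIN_PROTOCOL_MINIMAL,
 *		LWS_PROTOCOL_LIST_TERM
 *	};
 *
 * and then info.protocols = protocols; info.pvo = &pvo; before calling
 * lws_create_context().  The "config" pvo is required by the
 * LWS_CALLBACK_PROTOCOL_INIT handler above.
 */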