Lines Matching defs:task
74 pthread_mutex_t lock; /* part of task wake_idle */
75 struct lws_threadpool_task *task;
125 __lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
131 if (!task->acquired) {
133 "task: %s, QUEUED queued: %dms",
134 task->name, ms_delta(now, task->created));
139 if (task->acc_running)
140 runms = (int)task->acc_running;
142 if (task->acc_syncing)
143 syncms = (int)task->acc_syncing;
145 if (!task->done) {
147 "task: %s, ONGOING state %d (%dms) alive: %dms "
149 "run: %d%%, sync: %d%%)", task->name, task->status,
150 ms_delta(now, task->entered_state),
151 ms_delta(now, task->created),
152 ms_delta(task->acquired, task->created),
153 ms_delta(now, task->acquired),
154 pc_delta(now, task->acquired, runms),
155 pc_delta(now, task->acquired, syncms));
161 "task: %s, DONE state %d lived: %dms "
163 "ran: %d%%, synced: %d%%)", task->name, task->status,
164 ms_delta(task->done, task->created),
165 ms_delta(task->acquired, task->created),
166 ms_delta(task->done, task->acquired),
167 pc_delta(task->done, task->acquired, runms),
168 pc_delta(task->done, task->acquired, syncms));
189 struct lws_threadpool_task *task = *c;
190 __lws_threadpool_task_dump(task, buf, sizeof(buf));
204 struct lws_threadpool_task *task = pool->task;
206 if (task) {
207 __lws_threadpool_task_dump(task, buf, sizeof(buf));
220 struct lws_threadpool_task *task = *c;
221 __lws_threadpool_task_dump(task, buf, sizeof(buf));
237 state_transition(struct lws_threadpool_task *task,
240 task->entered_state = lws_now_usecs();
241 task->status = status;
245 task_to_wsi(struct lws_threadpool_task *task)
248 if (task->args.ss)
249 return task->args.ss->wsi;
251 return task->args.wsi;
255 lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
257 if (task->args.cleanup)
258 task->args.cleanup(task_to_wsi(task), task->args.user);
260 lws_dll2_remove(&task->list);
262 lwsl_thread("%s: tp %p: cleaned finished task for %s\n",
263 __func__, task->tp, lws_wsi_tag(task_to_wsi(task)));
265 lws_free(task);
269 __lws_threadpool_reap(struct lws_threadpool_task *task)
272 struct lws_threadpool *tp = task->tp;
274 /* remove the task from the done queue */
280 if ((*c) == task) {
286 lwsl_thread("%s: tp %s: reaped task %s\n", __func__,
287 tp->name, lws_wsi_tag(task_to_wsi(task)));
295 lwsl_err("%s: task %p not in done queue\n", __func__, task);
298 * safe to assume there's a task to destroy
303 lwsl_err("%s: task->tp NULL already\n", __func__);
305 /* call the task's cleanup and delete the task itself */
307 lws_threadpool_task_cleanup_destroy(task);
319 struct lws_threadpool_task **c, *task = NULL;
334 task = pool->task;
335 if (!task)
338 wsi = task_to_wsi(task);
340 (!task->wanted_writeable_cb &&
341 task->status != LWS_TP_STATUS_SYNCING))
344 task->wanted_writeable_cb = 0;
361 task = *c;
362 wsi = task_to_wsi(task);
365 (task->wanted_writeable_cb ||
366 task->status == LWS_TP_STATUS_SYNCING)) {
368 task->wanted_writeable_cb = 0;
380 c = &task->task_queue_next;
393 struct lws_threadpool_task *task)
401 lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
404 lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__,
405 pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task)));
407 temp = task->status;
408 state_transition(task, LWS_TP_STATUS_SYNCING);
410 wsi = task_to_wsi(task);
413 * if the wsi is no longer attached to this task, there is
416 * the task it can't continue usefully by stopping it.
420 lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
422 task, task->name);
424 state_transition(task, LWS_TP_STATUS_STOPPING);
433 * If it is exceeded, we will stop the task.
438 task->wanted_writeable_cb = 1;
453 * times out and we stop the task.
456 if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
458 task->late_sync_retries++;
460 lwsl_err("%s: %s: task %p (%s): SYNC timed out "
462 __func__, pool->tp->name, task,
463 task->name, lws_wsi_tag(task_to_wsi(task)));
466 lws_threadpool_dequeue_task(task);
467 return 1; /* destroyed task */
475 if (task->status == LWS_TP_STATUS_SYNCING)
476 state_transition(task, temp);
478 lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);
493 struct lws_threadpool_task **c, **c2, *task;
500 /* we have no running task... wait and get one from the queue */
505 * if there's no task already waiting in the queue, wait for
518 task = NULL;
519 pool->task = NULL;
527 /* is there a task at the queue tail? */
529 pool->task = task = *c2;
530 task->acquired = pool->acquired = lws_now_usecs();
532 *c2 = task->task_queue_next;
533 task->task_queue_next = NULL;
536 state_transition(task, LWS_TP_STATUS_RUNNING);
540 if (!task) {
545 task->wanted_writeable_cb = 0;
547 /* we have acquired a new task */
549 __lws_threadpool_task_dump(task, buf, sizeof(buf));
558 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
561 * left off if the task is not being "stopped".
566 * 2) The task can return with LWS_TP_RETURN_SYNC to register
573 * 3) The task can return with LWS_TP_RETURN_FINISHED to
576 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
584 if (tp->destroying || !task_to_wsi(task)) {
586 state_transition(task, LWS_TP_STATUS_STOPPING);
590 n = (int)task->args.task(task->args.user, task->status);
591 lwsl_debug(" %d, status %d\n", n, task->status);
592 us_accrue(&task->acc_running, then);
594 task->outlive = 1;
600 if (!task_to_wsi(task)) {
601 lwsl_debug("%s: task that wants to "
609 if (lws_threadpool_worker_sync(pool, task)) {
613 us_accrue(&task->acc_syncing, then);
616 state_transition(task, LWS_TP_STATUS_FINISHED);
619 state_transition(task, LWS_TP_STATUS_STOPPED);
622 } while (task->status == LWS_TP_STATUS_RUNNING);
628 if (pool->task->status == LWS_TP_STATUS_STOPPING)
629 state_transition(task, LWS_TP_STATUS_STOPPED);
631 /* move the task to the done queue */
633 pool->task->task_queue_next = tp->task_done_head;
634 tp->task_done_head = task;
636 pool->task->done = lws_now_usecs();
638 if (!pool->task->args.wsi &&
639 (pool->task->status == LWS_TP_STATUS_STOPPED ||
640 pool->task->status == LWS_TP_STATUS_FINISHED)) {
642 __lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
652 __lws_threadpool_reap(pool->task);
655 __lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
661 * task status */
663 if (task_to_wsi(pool->task)) {
664 task->wanted_writeable_cb = 1;
667 lws_get_context(task_to_wsi(pool->task)));
672 pool->task = NULL;
744 struct lws_threadpool_task **c, *task;
756 task = *c;
757 *c = task->task_queue_next;
758 task->task_queue_next = tp->task_done_head;
759 tp->task_done_head = task;
760 state_transition(task, LWS_TP_STATUS_STOPPED);
763 task->done = lws_now_usecs();
765 c = &task->task_queue_next;
775 struct lws_threadpool_task *task, *next;
812 task = tp->pool_list[n].task;
822 task = tp->task_done_head;
823 while (task) {
824 next = task->task_queue_next;
825 lws_threadpool_task_cleanup_destroy(task);
827 task = next;
841 lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
847 tp = task->tp;
850 if (task->outlive && !tp->destroying) {
852 /* disconnect from wsi, and wsi from task */
854 lws_dll2_remove(&task->list);
855 task->args.wsi = NULL;
857 task->args.ss = NULL;
870 if ((*c) == task) {
871 *c = task->task_queue_next;
872 task->task_queue_next = tp->task_done_head;
873 tp->task_done_head = task;
874 state_transition(task, LWS_TP_STATUS_STOPPED);
877 task->done = lws_now_usecs();
879 lwsl_debug("%s: tp %p: removed queued task %s\n",
880 __func__, tp, lws_wsi_tag(task_to_wsi(task)));
891 if ((*c) == task) {
892 *c = task->task_queue_next;
893 task->task_queue_next = NULL;
894 lws_threadpool_task_cleanup_destroy(task);
904 if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
918 state_transition(task, LWS_TP_STATUS_STOPPING);
920 /* disconnect from wsi, and wsi from task */
922 lws_dll2_remove(&task->list);
923 task->args.wsi = NULL;
925 task->args.ss = NULL;
930 lwsl_debug("%s: tp %p: request stop running task "
932 lws_wsi_tag(task_to_wsi(task)));
939 lwsl_notice("%s: tp %p: no task for %s, decoupling\n",
940 __func__, tp, lws_wsi_tag(task_to_wsi(task)));
941 lws_dll2_remove(&task->list);
942 task->args.wsi = NULL;
944 task->args.ss = NULL;
957 struct lws_threadpool_task *task;
963 task = lws_container_of(wsi->tp_task_owner.head,
966 return lws_threadpool_dequeue_task(task);
974 struct lws_threadpool_task *task = NULL;
999 * create the task object
1002 task = lws_malloc(sizeof(*task), __func__);
1003 if (!task)
1006 memset(task, 0, sizeof(*task));
1007 pthread_cond_init(&task->wake_idle, NULL);
1008 task->args = *args;
1009 task->tp = tp;
1010 task->created = lws_now_usecs();
1013 vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
1017 * add him on the tp task queue
1020 task->task_queue_next = tp->task_queue_head;
1021 state_transition(task, LWS_TP_STATUS_QUEUED);
1022 tp->task_queue_head = task;
1032 lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
1035 lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);
1037 lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n",
1038 __func__, tp->name, task, task->name,
1039 lws_wsi_tag(task_to_wsi(task)), tp->queue_depth);
1041 /* alert any idle thread there's something new on the task list */
1049 return task;
1055 lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
1058 struct lws_threadpool *tp = task->tp;
1063 *user = task->args.user;
1064 status = task->status;
1071 __lws_threadpool_task_dump(task, buf, sizeof(buf));
1074 __lws_threadpool_reap(task);
1083 lws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
1085 return task->status;
1092 struct lws_threadpool_task *task;
1095 lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
1101 task = lws_container_of(wsi->tp_task_owner.head,
1104 *_task = task;
1106 return lws_threadpool_task_status(task, user);
1110 lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
1113 if (!task)
1117 state_transition(task, LWS_TP_STATUS_STOPPING);
1119 pthread_mutex_lock(&task->tp->lock);
1120 pthread_cond_signal(&task->wake_idle);
1121 pthread_mutex_unlock(&task->tp->lock);
1126 int (*cb)(struct lws_threadpool_task *task,
1141 struct lws_threadpool_task *task = lws_container_of(d,
1144 if (cb(task, user)) {
1159 int (*cb)(struct lws_threadpool_task *task,
1170 disassociate_wsi(struct lws_threadpool_task *task,
1173 task->args.wsi = NULL;
1174 lws_dll2_remove(&task->list);