Lines matching refs:worker (io_uring/io-wq.c, the io_uring worker thread pool)

3  * Basic worker thread pool for io_uring
30 IO_WORKER_F_FREE = 4, /* worker on free list */
137 static void io_wq_dec_running(struct io_worker *worker);
144 static bool io_worker_get(struct io_worker *worker)
146 return refcount_inc_not_zero(&worker->ref);
149 static void io_worker_release(struct io_worker *worker)
151 if (refcount_dec_and_test(&worker->ref))
152 complete(&worker->ref_done);
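
The io_worker_get()/io_worker_release() matches above, together with the wait_for_completion(&worker->ref_done) seen later in io_worker_exit(), follow a common kernel lifetime pattern: lookups only succeed while the object is still alive, and the final put signals a completion that the exiting owner waits on. A minimal sketch of that pattern, using hypothetical my_worker names rather than the io-wq types:

#include <linux/refcount.h>
#include <linux/completion.h>

struct my_worker {
        refcount_t ref;                 /* base ref held by the worker itself */
        struct completion ref_done;     /* completed when the last ref drops */
        /* initialised elsewhere with refcount_set(&w->ref, 1) and
         * init_completion(&w->ref_done) */
};

/* Lookup side: only pin the worker if it is still alive (ref != 0). */
static bool my_worker_get(struct my_worker *w)
{
        return refcount_inc_not_zero(&w->ref);
}

/* Put side: the final release signals whoever is waiting in exit. */
static void my_worker_release(struct my_worker *w)
{
        if (refcount_dec_and_test(&w->ref))
                complete(&w->ref_done);
}

/* Exit side: drop the base ref, then wait out any transient lookups
 * before unlinking and freeing the worker. */
static void my_worker_exit(struct my_worker *w)
{
        my_worker_release(w);
        wait_for_completion(&w->ref_done);
}

The completion lets the exit path sleep until every concurrent my_worker_get() user has dropped its reference, which is why io_worker_exit() can safely unlink the worker and kfree_rcu() it afterwards.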
166 static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
168 return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
179 struct io_worker *worker = current->worker_private;
184 return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
187 static void io_worker_cancel_cb(struct io_worker *worker)
189 struct io_wq_acct *acct = io_wq_get_acct(worker);
190 struct io_wq *wq = worker->wq;
197 clear_bit_unlock(0, &worker->create_state);
198 io_worker_release(worker);
203 struct io_worker *worker;
207 worker = container_of(cb, struct io_worker, create_work);
208 return worker == data;
211 static void io_worker_exit(struct io_worker *worker)
213 struct io_wq *wq = worker->wq;
217 io_task_worker_match, worker);
221 io_worker_cancel_cb(worker);
224 io_worker_release(worker);
225 wait_for_completion(&worker->ref_done);
228 if (worker->flags & IO_WORKER_F_FREE)
229 hlist_nulls_del_rcu(&worker->nulls_node);
230 list_del_rcu(&worker->all_list);
232 io_wq_dec_running(worker);
234 * this worker is a goner, clear ->worker_private to avoid any
236 * touching 'worker'.
240 kfree_rcu(worker, rcu);
267 * Check head of free list for an available worker. If one isn't available,
275 struct io_worker *worker;
278 * Iterate free_list and see if we can find an idle worker to
279 * activate. If a given worker is on the free_list but in the process
282 hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
283 if (!io_worker_get(worker))
285 if (io_wq_get_acct(worker) != acct) {
286 io_worker_release(worker);
290 * If the worker is already running, it's either already
294 wake_up_process(worker->task);
295 io_worker_release(worker);
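
The io_wq_activate_free_worker() matches show idle workers being found through an RCU-protected nulls list: pin a candidate with a reference, wake its task, and drop the reference again. A rough sketch of that walk, with hypothetical my_* names and without the per-account filtering the real code does:

#include <linux/rculist_nulls.h>
#include <linux/rcupdate.h>
#include <linux/refcount.h>
#include <linux/completion.h>
#include <linux/sched.h>

struct my_worker {
        struct hlist_nulls_node nulls_node;     /* linkage on the free list */
        refcount_t ref;
        struct completion ref_done;
        struct task_struct *task;
};

/* Wake the first idle worker we can pin; true if one was woken. */
static bool my_activate_free_worker(struct hlist_nulls_head *free_list)
{
        struct hlist_nulls_node *n;
        struct my_worker *w;
        bool ret = false;

        rcu_read_lock();
        hlist_nulls_for_each_entry_rcu(w, n, free_list, nulls_node) {
                if (!refcount_inc_not_zero(&w->ref))
                        continue;               /* worker is on its way out */
                wake_up_process(w->task);
                if (refcount_dec_and_test(&w->ref))     /* drop lookup ref */
                        complete(&w->ref_done);
                ret = true;
                break;
        }
        rcu_read_unlock();
        return ret;
}

The _nulls list flavour exists for exactly this kind of lockless lookup; refcount_inc_not_zero() is what re-validates an entry once it has been found.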
303 * We need a worker. If we find a free one, we're good. If not, and we're
327 static void io_wq_inc_running(struct io_worker *worker)
329 struct io_wq_acct *acct = io_wq_get_acct(worker);
336 struct io_worker *worker;
342 worker = container_of(cb, struct io_worker, create_work);
343 wq = worker->wq;
344 acct = &wq->acct[worker->create_index];
353 create_io_worker(wq, worker->create_index);
358 clear_bit_unlock(0, &worker->create_state);
359 io_worker_release(worker);
362 static bool io_queue_worker_create(struct io_worker *worker,
366 struct io_wq *wq = worker->wq;
371 if (!io_worker_get(worker))
375 * only need one entry per worker, as the worker going to sleep
379 if (test_bit(0, &worker->create_state) ||
380 test_and_set_bit_lock(0, &worker->create_state))
384 init_task_work(&worker->create_work, func);
385 worker->create_index = acct->index;
386 if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
399 clear_bit_unlock(0, &worker->create_state);
401 io_worker_release(worker);
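
io_queue_worker_create() combines two building blocks: bit 0 of worker->create_state acts as a one-shot bit lock so only a single "create a new worker" request can be in flight per worker, and the request itself runs later via task_work on the originating task. A simplified sketch with hypothetical my_* names (the real code additionally pins the worker with io_worker_get() across the request):

#include <linux/task_work.h>
#include <linux/bitops.h>
#include <linux/sched.h>

struct my_worker {
        unsigned long create_state;             /* bit 0: create already queued */
        struct callback_head create_work;       /* task_work payload */
};

static bool my_queue_worker_create(struct my_worker *w,
                                   struct task_struct *task,
                                   task_work_func_t func)
{
        /* cheap test first, then the atomic acquire of the bit lock */
        if (test_bit(0, &w->create_state) ||
            test_and_set_bit_lock(0, &w->create_state))
                return false;

        init_task_work(&w->create_work, func);
        if (!task_work_add(task, &w->create_work, TWA_SIGNAL))
                return true;    /* func() runs later and clears the bit */

        /* target task is exiting: undo the bit lock ourselves */
        clear_bit_unlock(0, &w->create_state);
        return false;
}

In io-wq it is the queued callback (or the failure path) that calls clear_bit_unlock(0, &worker->create_state) once the creation attempt has been handled, matching the unlocks visible in the matches above.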
408 static void io_wq_dec_running(struct io_worker *worker)
410 struct io_wq_acct *acct = io_wq_get_acct(worker);
411 struct io_wq *wq = worker->wq;
413 if (!(worker->flags & IO_WORKER_F_UP))
424 io_queue_worker_create(worker, acct, create_worker_cb);
431 static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
433 if (worker->flags & IO_WORKER_F_FREE) {
434 worker->flags &= ~IO_WORKER_F_FREE;
436 hlist_nulls_del_init_rcu(&worker->nulls_node);
442 * No work, worker going to sleep. Move to freelist.
444 static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
447 if (!(worker->flags & IO_WORKER_F_FREE)) {
448 worker->flags |= IO_WORKER_F_FREE;
449 hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
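
__io_worker_busy() and __io_worker_idle() are the two halves of the free-list bookkeeping: a worker leaves the free list (and clears its FREE flag) before it starts executing work, and parks itself back on the head of the list before going to sleep. A condensed sketch, assuming the caller already holds the pool lock that serialises these transitions (my_* names are illustrative):

#include <linux/rculist_nulls.h>

#define MY_WORKER_F_FREE        1       /* worker is sitting on the free list */

struct my_worker {
        unsigned int flags;
        struct hlist_nulls_node nulls_node;
};

/* About to run work: stop advertising ourselves as idle. */
static void my_worker_busy(struct my_worker *w)
{
        if (w->flags & MY_WORKER_F_FREE) {
                w->flags &= ~MY_WORKER_F_FREE;
                hlist_nulls_del_init_rcu(&w->nulls_node);
        }
}

/* No work left: advertise ourselves on the free list before sleeping. */
static void my_worker_idle(struct hlist_nulls_head *free_list,
                           struct my_worker *w)
{
        if (!(w->flags & MY_WORKER_F_FREE)) {
                w->flags |= MY_WORKER_F_FREE;
                hlist_nulls_add_head_rcu(&w->nulls_node, free_list);
        }
}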
476 struct io_worker *worker)
482 struct io_wq *wq = worker->wq;
532 static void io_assign_current_work(struct io_worker *worker,
540 raw_spin_lock(&worker->lock);
541 worker->cur_work = work;
542 worker->next_work = NULL;
543 raw_spin_unlock(&worker->lock);
550 struct io_worker *worker)
553 struct io_wq *wq = worker->wq;
566 work = io_get_next_work(acct, worker);
569 __io_worker_busy(wq, worker);
576 * current work item for this worker.
578 raw_spin_lock(&worker->lock);
579 worker->next_work = work;
580 raw_spin_unlock(&worker->lock);
584 io_assign_current_work(worker, work);
597 io_assign_current_work(worker, NULL);
605 io_assign_current_work(worker, work);
628 struct io_worker *worker = data;
629 struct io_wq_acct *acct = io_wq_get_acct(worker);
630 struct io_wq *wq = worker->wq;
634 worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
649 io_worker_handle_work(acct, worker);
653 * Last sleep timed out. Exit if we're not the last worker,
663 __io_worker_idle(wq, worker);
683 io_worker_handle_work(acct, worker);
685 io_worker_exit(worker);
690 * Called when a worker is scheduled in. Mark us as currently running.
694 struct io_worker *worker = tsk->worker_private;
696 if (!worker)
698 if (!(worker->flags & IO_WORKER_F_UP))
700 if (worker->flags & IO_WORKER_F_RUNNING)
702 worker->flags |= IO_WORKER_F_RUNNING;
703 io_wq_inc_running(worker);
707 * Called when worker is going to sleep. If there are no workers currently
712 struct io_worker *worker = tsk->worker_private;
714 if (!worker)
716 if (!(worker->flags & IO_WORKER_F_UP))
718 if (!(worker->flags & IO_WORKER_F_RUNNING))
721 worker->flags &= ~IO_WORKER_F_RUNNING;
722 io_wq_dec_running(worker);
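
io_wq_worker_running() and io_wq_worker_sleeping() are called from the scheduler for PF_IO_WORKER tasks and keep the per-account count of runnable workers up to date; the sleeping hook is where the pool can decide to create a replacement when the last runnable worker blocks. A stripped-down sketch of the flag handling, with a bare atomic counter standing in for the io_wq_acct bookkeeping (my_* names are illustrative):

#include <linux/sched.h>
#include <linux/atomic.h>

#define MY_WORKER_F_UP          1       /* worker finished its setup */
#define MY_WORKER_F_RUNNING     2       /* counted as runnable */

struct my_worker {
        unsigned int flags;
        atomic_t *nr_running;           /* shared per-account counter */
};

/* Scheduled in: count ourselves as running exactly once. */
void my_worker_running(struct task_struct *tsk)
{
        struct my_worker *w = tsk->worker_private;

        if (!w || !(w->flags & MY_WORKER_F_UP))
                return;
        if (w->flags & MY_WORKER_F_RUNNING)
                return;
        w->flags |= MY_WORKER_F_RUNNING;
        atomic_inc(w->nr_running);
}

/* Blocking: stop counting ourselves; the real io_wq_dec_running() may
 * queue creation of a new worker here if work is still pending. */
void my_worker_sleeping(struct task_struct *tsk)
{
        struct my_worker *w = tsk->worker_private;

        if (!w || !(w->flags & MY_WORKER_F_UP))
                return;
        if (!(w->flags & MY_WORKER_F_RUNNING))
                return;
        w->flags &= ~MY_WORKER_F_RUNNING;
        atomic_dec(w->nr_running);
}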
725 static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
728 tsk->worker_private = worker;
729 worker->task = tsk;
733 hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
734 list_add_tail_rcu(&worker->all_list, &wq->all_list);
735 worker->flags |= IO_WORKER_F_FREE;
767 struct io_worker *worker;
771 worker = container_of(cb, struct io_worker, create_work);
772 clear_bit_unlock(0, &worker->create_state);
773 wq = worker->wq;
774 tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
776 io_init_new_worker(wq, worker, tsk);
777 io_worker_release(worker);
780 struct io_wq_acct *acct = io_wq_get_acct(worker);
798 kfree(worker);
802 /* re-create attempts grab a new worker ref, drop the existing one */
803 io_worker_release(worker);
804 schedule_work(&worker->work);
809 struct io_worker *worker = container_of(work, struct io_worker, work);
810 struct io_wq_acct *acct = io_wq_get_acct(worker);
812 if (!io_queue_worker_create(worker, acct, create_worker_cont))
813 kfree(worker);
819 struct io_worker *worker;
824 worker = kzalloc(sizeof(*worker), GFP_KERNEL);
825 if (!worker) {
835 refcount_set(&worker->ref, 1);
836 worker->wq = wq;
837 raw_spin_lock_init(&worker->lock);
838 init_completion(&worker->ref_done);
841 worker->flags |= IO_WORKER_F_BOUND;
843 tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
845 io_init_new_worker(wq, worker, tsk);
847 kfree(worker);
850 INIT_WORK(&worker->work, io_workqueue_create);
851 schedule_work(&worker->work);
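
create_io_worker() shows the creation path: allocate and initialise the worker, take its base reference, and try to spawn the thread with create_io_thread(); if that fails (for example because the owning task has signals pending), the attempt is retried later from a workqueue. A shortened sketch under those assumptions, with hypothetical my_* names and the free/all list insertion done by io_init_new_worker() omitted:

#include <linux/slab.h>
#include <linux/sched/task.h>
#include <linux/workqueue.h>
#include <linux/completion.h>
#include <linux/refcount.h>
#include <linux/spinlock.h>
#include <linux/err.h>
#include <linux/numa.h>

struct my_worker {
        refcount_t ref;
        raw_spinlock_t lock;
        struct completion ref_done;
        struct work_struct work;        /* deferred retry of thread creation */
        struct task_struct *task;
};

static int my_worker_fn(void *data)
{
        /* worker main loop would live here */
        return 0;
}

static void my_workqueue_create(struct work_struct *work)
{
        /* retry thread creation for container_of(work, ...) here */
}

static bool my_create_worker(void)
{
        struct my_worker *w;
        struct task_struct *tsk;

        w = kzalloc(sizeof(*w), GFP_KERNEL);
        if (!w)
                return false;

        refcount_set(&w->ref, 1);       /* base ref, dropped at worker exit */
        raw_spin_lock_init(&w->lock);
        init_completion(&w->ref_done);

        tsk = create_io_thread(my_worker_fn, w, NUMA_NO_NODE);
        if (!IS_ERR(tsk)) {
                tsk->worker_private = w;        /* let the sched hooks find us */
                w->task = tsk;
                wake_up_new_task(tsk);          /* start running my_worker_fn() */
                return true;
        }

        /* creation can fail transiently; punt the retry to a workqueue */
        INIT_WORK(&w->work, my_workqueue_create);
        schedule_work(&w->work);
        return true;
}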
859 * worker that isn't exiting
865 struct io_worker *worker;
868 list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
869 if (io_worker_get(worker)) {
871 if (worker->task)
872 ret = func(worker, data);
873 io_worker_release(worker);
882 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
884 __set_notify_signal(worker->task);
885 wake_up_process(worker->task);
965 /* fatal condition, failed to create the first worker */
986 static bool __io_wq_worker_cancel(struct io_worker *worker,
992 __set_notify_signal(worker->task);
999 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
1007 raw_spin_lock(&worker->lock);
1008 if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
1009 __io_wq_worker_cancel(worker, match, worker->next_work))
1011 raw_spin_unlock(&worker->lock);
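
The io_wq_worker_cancel() matches show how cancellation reaches a running worker: under worker->lock the pool checks whether the worker's current or next work item matches the cancel request, and if so pokes the task with __set_notify_signal() so it breaks out of any interruptible wait. A compact sketch of that check, with a generic match callback standing in for the io_cb_cancel_data used by the real code:

#include <linux/spinlock.h>
#include <linux/sched/signal.h>

struct my_worker {
        raw_spinlock_t lock;    /* protects cur_work / next_work */
        void *cur_work;
        void *next_work;
        struct task_struct *task;
};

static bool my_worker_cancel(struct my_worker *w,
                             bool (*match)(void *work, void *data),
                             void *data)
{
        bool hit = false;

        raw_spin_lock(&w->lock);
        if ((w->cur_work && match(w->cur_work, data)) ||
            (w->next_work && match(w->next_work, data))) {
                /* flag the task so it notices the cancellation promptly */
                __set_notify_signal(w->task);
                hit = true;
        }
        raw_spin_unlock(&w->lock);
        return hit;
}

In io-wq the matched work item is also flagged as cancelled before the worker task is signalled.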
1097 * Then check if a free (going busy) or busy worker has the work
1200 struct io_worker *worker;
1204 worker = container_of(cb, struct io_worker, create_work);
1205 return worker->wq == data;
1218 struct io_worker *worker;
1220 worker = container_of(cb, struct io_worker, create_work);
1221 io_worker_cancel_cb(worker);
1223 * Only the worker continuation helper has worker allocated and
1227 kfree(worker);
1279 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1284 cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
1286 cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);