Lines matching references to "worker"; each entry is prefixed with its line number in the source file.
3 * Basic worker thread pool for io_uring
26 IO_WORKER_F_FREE = 4, /* worker on free list */
88 * Per-node worker thread pool
139 static void io_wqe_dec_running(struct io_worker *worker);
146 static bool io_worker_get(struct io_worker *worker)
148 return refcount_inc_not_zero(&worker->ref);
151 static void io_worker_release(struct io_worker *worker)
153 if (refcount_dec_and_test(&worker->ref))
154 complete(&worker->ref_done);
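
The two helpers above are the pin/unpin pair for a worker: io_worker_get() only takes a reference if the count has not already dropped to zero, and the final io_worker_release() fires ref_done so the exit path can proceed. A minimal userspace sketch of that pattern, using C11 atomics in place of refcount_t and a flag in place of struct completion (all names here are illustrative, not the kernel's):

/* Userspace analogue of the get/release pair above (illustrative names;
 * the kernel uses refcount_t and struct completion). */
#include <stdatomic.h>
#include <stdbool.h>

struct worker_ref {
	atomic_int ref;          /* pins currently held on the worker */
	atomic_bool ref_done;    /* stands in for complete(&ref_done) */
};

/* Take a reference only if the worker is still live (ref != 0),
 * mirroring refcount_inc_not_zero(). */
static bool worker_get(struct worker_ref *w)
{
	int old = atomic_load(&w->ref);

	while (old != 0) {
		if (atomic_compare_exchange_weak(&w->ref, &old, old + 1))
			return true;          /* pinned */
	}
	return false;                         /* already being torn down */
}

/* Drop a reference; the final put tells whoever is waiting in the exit
 * path that no other holders remain. */
static void worker_release(struct worker_ref *w)
{
	if (atomic_fetch_sub(&w->ref, 1) == 1)
		atomic_store(&w->ref_done, true);
}
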
168 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
170 return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
181 struct io_worker *worker = current->pf_io_worker;
186 return test_bit(IO_WQ_BIT_EXIT, &worker->wqe->wq->state);
189 static void io_worker_cancel_cb(struct io_worker *worker)
191 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
192 struct io_wqe *wqe = worker->wqe;
196 raw_spin_lock(&worker->wqe->lock);
198 raw_spin_unlock(&worker->wqe->lock);
200 clear_bit_unlock(0, &worker->create_state);
201 io_worker_release(worker);
206 struct io_worker *worker;
210 worker = container_of(cb, struct io_worker, create_work);
211 return worker == data;
214 static void io_worker_exit(struct io_worker *worker)
216 struct io_wqe *wqe = worker->wqe;
221 io_task_worker_match, worker);
225 io_worker_cancel_cb(worker);
228 if (refcount_dec_and_test(&worker->ref))
229 complete(&worker->ref_done);
230 wait_for_completion(&worker->ref_done);
233 if (worker->flags & IO_WORKER_F_FREE)
234 hlist_nulls_del_rcu(&worker->nulls_node);
235 list_del_rcu(&worker->all_list);
237 io_wqe_dec_running(worker);
238 worker->flags = 0;
243 kfree_rcu(worker, rcu);
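
io_worker_exit() above drops the reference the worker holds on itself and then waits on ref_done until every transient holder has called io_worker_release(); only then does it unlink the worker from the lists and free it via RCU. A minimal sketch of that wait-for-the-last-reference teardown, with a pthread condition variable standing in for struct completion (names are illustrative):

/* Sketch of the "drop my own ref, then wait for everyone else" teardown.
 * A condition variable stands in for struct completion. */
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

struct worker {
	int ref;                  /* protected by lock          */
	bool done;                /* set when the last ref goes */
	pthread_mutex_t lock;
	pthread_cond_t ref_done;
};

static void worker_release(struct worker *w)
{
	pthread_mutex_lock(&w->lock);
	if (--w->ref == 0) {
		w->done = true;
		pthread_cond_signal(&w->ref_done);  /* ~ complete(&ref_done) */
	}
	pthread_mutex_unlock(&w->lock);
}

/* Exit path: give up the reference the worker holds on itself, then
 * block until all transient references (wakers, cancelers) are gone,
 * and only then tear the worker down. */
static void worker_exit(struct worker *w)
{
	worker_release(w);

	pthread_mutex_lock(&w->lock);
	while (!w->done)
		pthread_cond_wait(&w->ref_done, &w->lock);
	pthread_mutex_unlock(&w->lock);

	/* here the kernel unlinks from free_list/all_list and kfree_rcu()s */
	free(w);
}
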
257 * Check head of free list for an available worker. If one isn't available,
265 struct io_worker *worker;
268 * Iterate free_list and see if we can find an idle worker to
269 * activate. If a given worker is on the free_list but in the process
272 hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
273 if (!io_worker_get(worker))
275 if (io_wqe_get_acct(worker) != acct) {
276 io_worker_release(worker);
279 if (wake_up_process(worker->task)) {
280 io_worker_release(worker);
283 io_worker_release(worker);
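
The loop above (io_wqe_activate_free_worker) walks the free list, skips workers that are exiting or belong to the other accounting class, and stops at the first idle worker it manages to wake. A hedged userspace sketch of the same walk, with a mutex-protected singly linked list standing in for the RCU-protected hlist_nulls free list and a plain int for the refcount (everything here runs under the one lock; names are illustrative):

#include <pthread.h>
#include <stdbool.h>

struct worker {
	struct worker *next;      /* free-list linkage             */
	int acct;                 /* bounded vs unbounded class    */
	int ref;                  /* 0 means the worker is exiting */
	bool awake;
};

static bool worker_get(struct worker *w)
{
	if (!w->ref)              /* exiting: skip it, keep trying */
		return false;
	w->ref++;
	return true;
}

static void worker_release(struct worker *w)
{
	w->ref--;
}

/* Like wake_up_process(): returns true only if the worker was asleep. */
static bool wake(struct worker *w)
{
	bool was_idle = !w->awake;

	w->awake = true;
	return was_idle;
}

static bool activate_free_worker(struct worker *free_list,
				 pthread_mutex_t *lock, int acct)
{
	struct worker *w;
	bool found = false;

	pthread_mutex_lock(lock);             /* kernel: rcu_read_lock() */
	for (w = free_list; w; w = w->next) {
		if (!worker_get(w))
			continue;
		if (w->acct != acct) {        /* wrong class             */
			worker_release(w);
			continue;
		}
		if (wake(w)) {                /* woke an idle worker     */
			worker_release(w);
			found = true;
			break;
		}
		worker_release(w);
	}
	pthread_mutex_unlock(lock);
	return found;
}
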
290 * We need a worker. If we find a free one, we're good. If not, and we're
314 static void io_wqe_inc_running(struct io_worker *worker)
316 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
323 struct io_worker *worker;
329 worker = container_of(cb, struct io_worker, create_work);
330 wqe = worker->wqe;
332 acct = &wqe->acct[worker->create_index];
340 create_io_worker(wq, wqe, worker->create_index);
345 clear_bit_unlock(0, &worker->create_state);
346 io_worker_release(worker);
349 static bool io_queue_worker_create(struct io_worker *worker,
353 struct io_wqe *wqe = worker->wqe;
359 if (!io_worker_get(worker))
363 * only need one entry per worker, as the worker going to sleep
367 if (test_bit(0, &worker->create_state) ||
368 test_and_set_bit_lock(0, &worker->create_state))
372 init_task_work(&worker->create_work, func);
373 worker->create_index = acct->index;
374 if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
387 clear_bit_unlock(0, &worker->create_state);
389 io_worker_release(worker);
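
io_queue_worker_create() above uses bit 0 of create_state as a one-shot lock so each worker has at most one deferred "create another worker" request in flight, then hands the callback to the wq owner task via task_work_add(). A minimal sketch of that guard, assuming a hypothetical run_task_work() in place of task_work_add() (the real code also pins the worker with io_worker_get() first):

#include <stdatomic.h>
#include <stdbool.h>

struct worker {
	atomic_bool create_pending;   /* ~ bit 0 of create_state       */
	int create_index;             /* which acct needs a new worker */
};

typedef void (*create_fn)(struct worker *w);

/* In the kernel the callback is deferred with task_work_add(); running
 * it inline keeps this sketch self-contained. */
static bool run_task_work(create_fn fn, struct worker *w)
{
	fn(w);
	return true;
}

static bool queue_worker_create(struct worker *w, int acct_index,
				create_fn fn)
{
	/* a request is already in flight for this worker: nothing to do */
	if (atomic_exchange(&w->create_pending, true))
		return false;

	w->create_index = acct_index;
	if (run_task_work(fn, w))
		return true;          /* fn clears create_pending when it runs */

	/* queueing failed: undo the guard so a later attempt can retry */
	atomic_store(&w->create_pending, false);
	return false;
}
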
396 static void io_wqe_dec_running(struct io_worker *worker)
399 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
400 struct io_wqe *wqe = worker->wqe;
402 if (!(worker->flags & IO_WORKER_F_UP))
409 io_queue_worker_create(worker, acct, create_worker_cb);
418 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
422 if (worker->flags & IO_WORKER_F_FREE) {
423 worker->flags &= ~IO_WORKER_F_FREE;
424 hlist_nulls_del_init_rcu(&worker->nulls_node);
429 * No work, worker going to sleep. Move to freelist, and unuse mm if we
435 static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
438 if (!(worker->flags & IO_WORKER_F_FREE)) {
439 worker->flags |= IO_WORKER_F_FREE;
440 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
468 struct io_worker *worker)
474 struct io_wqe *wqe = worker->wqe;
534 static void io_assign_current_work(struct io_worker *worker,
542 spin_lock(&worker->lock);
543 worker->cur_work = work;
544 spin_unlock(&worker->lock);
549 static void io_worker_handle_work(struct io_worker *worker)
552 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
553 struct io_wqe *wqe = worker->wqe;
567 work = io_get_next_work(acct, worker);
569 __io_worker_busy(wqe, worker, work);
574 io_assign_current_work(worker, work);
587 io_assign_current_work(worker, NULL);
595 io_assign_current_work(worker, work);
621 struct io_worker *worker = data;
622 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
623 struct io_wqe *wqe = worker->wqe;
628 worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
640 io_worker_handle_work(worker);
643 /* timed out, exit unless we're the last worker */
651 __io_worker_idle(wqe, worker);
668 io_worker_handle_work(worker);
671 io_worker_exit(worker);
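
The fragment above is the worker's main loop (io_wqe_worker): mark itself UP and RUNNING, handle queued work, otherwise park on the free list and sleep with a timeout, and on wq exit drain whatever is still queued before calling io_worker_exit(). A rough pthread analogue of that loop, with a condition variable in place of the kernel's scheduling primitives (the single-queue shape and the 5-second idle timeout are assumptions of the sketch):

#include <pthread.h>
#include <stdbool.h>
#include <time.h>

struct wq {
	pthread_mutex_t lock;
	pthread_cond_t more_work;
	int pending;              /* queued work items */
	bool exiting;             /* ~ IO_WQ_BIT_EXIT  */
};

static void handle_work(struct wq *wq)
{
	wq->pending--;            /* placeholder for io_worker_handle_work() */
}

static void *worker_fn(void *data)
{
	struct wq *wq = data;
	struct timespec to;

	pthread_mutex_lock(&wq->lock);
	while (!wq->exiting) {
		if (wq->pending) {
			handle_work(wq);
			continue;
		}
		/* idle: wait a while for more work before giving up */
		clock_gettime(CLOCK_REALTIME, &to);
		to.tv_sec += 5;
		if (pthread_cond_timedwait(&wq->more_work, &wq->lock, &to))
			break;    /* timed out: exit (the kernel keeps the last worker) */
	}
	/* on exit, drain anything that was queued in the meantime */
	while (wq->pending)
		handle_work(wq);
	pthread_mutex_unlock(&wq->lock);
	return NULL;              /* ~ io_worker_exit() */
}
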
676 * Called when a worker is scheduled in. Mark us as currently running.
680 struct io_worker *worker = tsk->pf_io_worker;
682 if (!worker)
684 if (!(worker->flags & IO_WORKER_F_UP))
686 if (worker->flags & IO_WORKER_F_RUNNING)
688 worker->flags |= IO_WORKER_F_RUNNING;
689 io_wqe_inc_running(worker);
693 * Called when worker is going to sleep. If there are no workers currently
698 struct io_worker *worker = tsk->pf_io_worker;
700 if (!worker)
702 if (!(worker->flags & IO_WORKER_F_UP))
704 if (!(worker->flags & IO_WORKER_F_RUNNING))
707 worker->flags &= ~IO_WORKER_F_RUNNING;
709 raw_spin_lock(&worker->wqe->lock);
710 io_wqe_dec_running(worker);
711 raw_spin_unlock(&worker->wqe->lock);
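
These two hooks are called from the scheduler: io_wq_worker_running() when a worker is scheduled in, io_wq_worker_sleeping() when it is about to block. Together they track how many workers are actually runnable, and when the last runnable worker blocks while work is still queued, the dec path asks for a new worker. A minimal sketch of that bookkeeping with atomics (create_new_worker() is a hypothetical stand-in for the deferred creation path; the kernel additionally guards the decrement with wqe->lock and the IO_WORKER_F_UP/RUNNING flags):

#include <stdatomic.h>

struct acct {
	atomic_int nr_running;    /* workers currently runnable */
	atomic_int nr_pending;    /* queued work items          */
};

static void create_new_worker(struct acct *acct)
{
	/* kernel: queue create_worker_cb via task_work */
	(void)acct;
}

/* sched-in hook, ~ io_wq_worker_running() */
static void worker_running(struct acct *acct)
{
	atomic_fetch_add(&acct->nr_running, 1);
}

/* sched-out hook, ~ io_wq_worker_sleeping() */
static void worker_sleeping(struct acct *acct)
{
	/* last runnable worker going to sleep with work still pending?
	 * It can't make progress while blocked, so ask for another. */
	if (atomic_fetch_sub(&acct->nr_running, 1) == 1 &&
	    atomic_load(&acct->nr_pending))
		create_new_worker(acct);
}
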
714 static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
717 tsk->pf_io_worker = worker;
718 worker->task = tsk;
723 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
724 list_add_tail_rcu(&worker->all_list, &wqe->all_list);
725 worker->flags |= IO_WORKER_F_FREE;
757 struct io_worker *worker;
761 worker = container_of(cb, struct io_worker, create_work);
762 clear_bit_unlock(0, &worker->create_state);
763 wqe = worker->wqe;
764 tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
766 io_init_new_worker(wqe, worker, tsk);
767 io_worker_release(worker);
770 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
786 kfree(worker);
790 /* re-create attempts grab a new worker ref, drop the existing one */
791 io_worker_release(worker);
792 schedule_work(&worker->work);
797 struct io_worker *worker = container_of(work, struct io_worker, work);
798 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
800 if (!io_queue_worker_create(worker, acct, create_worker_cont))
801 kfree(worker);
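
create_worker_cont() and io_workqueue_create() above form the retry path: if the worker thread cannot be created right away, the attempt is re-queued via schedule_work() with a fresh reference instead of dropping the queued work. A small self-contained sketch of that create-with-fallback shape (schedule_retry() is hypothetical and only stands in for schedule_work()):

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct worker {
	pthread_t task;
};

static void *worker_fn(void *data)        /* placeholder worker loop */
{
	(void)data;
	return NULL;
}

/* Stand-in for schedule_work(): in the kernel the retry runs later from
 * a workqueue; here we only record that a retry would be queued. */
static void schedule_retry(struct worker *w)
{
	(void)w;
	fprintf(stderr, "worker creation deferred for retry\n");
}

/* Try to spawn the worker thread now; on failure fall back to a deferred
 * retry rather than failing the submitted work. */
static bool create_worker(struct worker *w)
{
	if (pthread_create(&w->task, NULL, worker_fn, w) == 0)
		return true;              /* kernel: io_init_new_worker() */

	schedule_retry(w);
	return false;
}
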
807 struct io_worker *worker;
812 worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
813 if (!worker) {
823 refcount_set(&worker->ref, 1);
824 worker->wqe = wqe;
825 spin_lock_init(&worker->lock);
826 init_completion(&worker->ref_done);
829 worker->flags |= IO_WORKER_F_BOUND;
831 tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
833 io_init_new_worker(wqe, worker, tsk);
835 kfree(worker);
838 INIT_WORK(&worker->work, io_workqueue_create);
839 schedule_work(&worker->work);
847 * worker that isn't exiting
853 struct io_worker *worker;
856 list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
857 if (io_worker_get(worker)) {
859 if (worker->task)
860 ret = func(worker, data);
861 io_worker_release(worker);
870 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
872 set_notify_signal(worker->task);
873 wake_up_process(worker->task);
949 /* fatal condition, failed to create the first worker */
983 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
991 spin_lock(&worker->lock);
992 if (worker->cur_work &&
993 match->fn(worker->cur_work, match->data)) {
994 set_notify_signal(worker->task);
997 spin_unlock(&worker->lock);
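
io_wq_worker_cancel() above peeks at worker->cur_work under the worker's lock, and if the caller's match function accepts it, signals the worker so it notices the cancellation. A hedged sketch of that per-worker check (notify() stands in for set_notify_signal() plus wake_up_process(); the bookkeeping fields are simplified):

#include <pthread.h>
#include <stdbool.h>

struct work;

struct worker {
	pthread_mutex_t lock;
	struct work *cur_work;    /* what this worker is running now */
	int nr_notified;          /* stand-in for the signal/wakeup  */
};

struct cancel_match {
	bool (*fn)(struct work *work, void *data);
	void *data;
	int nr_running;           /* running items that matched      */
};

static void notify(struct worker *w)
{
	w->nr_notified++;         /* kernel: set_notify_signal(); wake_up_process() */
}

/* Called for each worker while iterating all_list. */
static bool worker_cancel(struct worker *w, struct cancel_match *match)
{
	pthread_mutex_lock(&w->lock);
	if (w->cur_work && match->fn(w->cur_work, match->data)) {
		notify(w);
		match->nr_running++;
	}
	pthread_mutex_unlock(&w->lock);

	return false;             /* keep scanning the remaining workers */
}
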
1094 * Now check if a free (going busy) or busy worker has the work
1208 struct io_worker *worker;
1212 worker = container_of(cb, struct io_worker, create_work);
1213 return worker->wqe->wq == data;
1226 struct io_worker *worker;
1228 worker = container_of(cb, struct io_worker, create_work);
1229 io_worker_cancel_cb(worker);
1231 * Only the worker continuation helper has worker allocated and
1235 kfree(worker);
1300 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1305 cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
1307 cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
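
io_wq_worker_affinity() reacts to CPU hotplug by setting or clearing the CPU in the node's allowed mask. A tiny userspace sketch of the same idea using cpu_set_t in place of the kernel's cpumask (names are illustrative):

#define _GNU_SOURCE
#include <sched.h>
#include <stdbool.h>

struct wq_node {
	cpu_set_t cpu_mask;       /* CPUs this node's workers may use */
};

struct online_data {
	int cpu;
	bool online;
};

/* Invoked once per worker while iterating all_list; every worker on the
 * node shares the same mask, so the update is idempotent. */
static bool worker_affinity(struct wq_node *node, struct online_data *od)
{
	if (od->online)
		CPU_SET(od->cpu, &node->cpu_mask);
	else
		CPU_CLR(od->cpu, &node->cpu_mask);

	return false;             /* keep iterating */
}
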