162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0 262306a36Sopenharmony_ci/* 362306a36Sopenharmony_ci * Basic worker thread pool for io_uring 462306a36Sopenharmony_ci * 562306a36Sopenharmony_ci * Copyright (C) 2019 Jens Axboe 662306a36Sopenharmony_ci * 762306a36Sopenharmony_ci */ 862306a36Sopenharmony_ci#include <linux/kernel.h> 962306a36Sopenharmony_ci#include <linux/init.h> 1062306a36Sopenharmony_ci#include <linux/errno.h> 1162306a36Sopenharmony_ci#include <linux/sched/signal.h> 1262306a36Sopenharmony_ci#include <linux/percpu.h> 1362306a36Sopenharmony_ci#include <linux/slab.h> 1462306a36Sopenharmony_ci#include <linux/rculist_nulls.h> 1562306a36Sopenharmony_ci#include <linux/cpu.h> 1662306a36Sopenharmony_ci#include <linux/task_work.h> 1762306a36Sopenharmony_ci#include <linux/audit.h> 1862306a36Sopenharmony_ci#include <linux/mmu_context.h> 1962306a36Sopenharmony_ci#include <uapi/linux/io_uring.h> 2062306a36Sopenharmony_ci 2162306a36Sopenharmony_ci#include "io-wq.h" 2262306a36Sopenharmony_ci#include "slist.h" 2362306a36Sopenharmony_ci#include "io_uring.h" 2462306a36Sopenharmony_ci 2562306a36Sopenharmony_ci#define WORKER_IDLE_TIMEOUT (5 * HZ) 2662306a36Sopenharmony_ci 2762306a36Sopenharmony_cienum { 2862306a36Sopenharmony_ci IO_WORKER_F_UP = 1, /* up and active */ 2962306a36Sopenharmony_ci IO_WORKER_F_RUNNING = 2, /* account as running */ 3062306a36Sopenharmony_ci IO_WORKER_F_FREE = 4, /* worker on free list */ 3162306a36Sopenharmony_ci IO_WORKER_F_BOUND = 8, /* is doing bounded work */ 3262306a36Sopenharmony_ci}; 3362306a36Sopenharmony_ci 3462306a36Sopenharmony_cienum { 3562306a36Sopenharmony_ci IO_WQ_BIT_EXIT = 0, /* wq exiting */ 3662306a36Sopenharmony_ci}; 3762306a36Sopenharmony_ci 3862306a36Sopenharmony_cienum { 3962306a36Sopenharmony_ci IO_ACCT_STALLED_BIT = 0, /* stalled on hash */ 4062306a36Sopenharmony_ci}; 4162306a36Sopenharmony_ci 4262306a36Sopenharmony_ci/* 4362306a36Sopenharmony_ci * One for each thread in a wq pool 4462306a36Sopenharmony_ci */ 4562306a36Sopenharmony_cistruct io_worker { 4662306a36Sopenharmony_ci refcount_t ref; 4762306a36Sopenharmony_ci unsigned flags; 4862306a36Sopenharmony_ci struct hlist_nulls_node nulls_node; 4962306a36Sopenharmony_ci struct list_head all_list; 5062306a36Sopenharmony_ci struct task_struct *task; 5162306a36Sopenharmony_ci struct io_wq *wq; 5262306a36Sopenharmony_ci 5362306a36Sopenharmony_ci struct io_wq_work *cur_work; 5462306a36Sopenharmony_ci struct io_wq_work *next_work; 5562306a36Sopenharmony_ci raw_spinlock_t lock; 5662306a36Sopenharmony_ci 5762306a36Sopenharmony_ci struct completion ref_done; 5862306a36Sopenharmony_ci 5962306a36Sopenharmony_ci unsigned long create_state; 6062306a36Sopenharmony_ci struct callback_head create_work; 6162306a36Sopenharmony_ci int create_index; 6262306a36Sopenharmony_ci 6362306a36Sopenharmony_ci union { 6462306a36Sopenharmony_ci struct rcu_head rcu; 6562306a36Sopenharmony_ci struct work_struct work; 6662306a36Sopenharmony_ci }; 6762306a36Sopenharmony_ci}; 6862306a36Sopenharmony_ci 6962306a36Sopenharmony_ci#if BITS_PER_LONG == 64 7062306a36Sopenharmony_ci#define IO_WQ_HASH_ORDER 6 7162306a36Sopenharmony_ci#else 7262306a36Sopenharmony_ci#define IO_WQ_HASH_ORDER 5 7362306a36Sopenharmony_ci#endif 7462306a36Sopenharmony_ci 7562306a36Sopenharmony_ci#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) 7662306a36Sopenharmony_ci 7762306a36Sopenharmony_cistruct io_wq_acct { 7862306a36Sopenharmony_ci unsigned nr_workers; 7962306a36Sopenharmony_ci unsigned max_workers; 8062306a36Sopenharmony_ci int index; 8162306a36Sopenharmony_ci atomic_t nr_running; 8262306a36Sopenharmony_ci raw_spinlock_t lock; 8362306a36Sopenharmony_ci struct io_wq_work_list work_list; 8462306a36Sopenharmony_ci unsigned long flags; 8562306a36Sopenharmony_ci}; 8662306a36Sopenharmony_ci 8762306a36Sopenharmony_cienum { 8862306a36Sopenharmony_ci IO_WQ_ACCT_BOUND, 8962306a36Sopenharmony_ci IO_WQ_ACCT_UNBOUND, 9062306a36Sopenharmony_ci IO_WQ_ACCT_NR, 9162306a36Sopenharmony_ci}; 9262306a36Sopenharmony_ci 9362306a36Sopenharmony_ci/* 9462306a36Sopenharmony_ci * Per io_wq state 9562306a36Sopenharmony_ci */ 9662306a36Sopenharmony_cistruct io_wq { 9762306a36Sopenharmony_ci unsigned long state; 9862306a36Sopenharmony_ci 9962306a36Sopenharmony_ci free_work_fn *free_work; 10062306a36Sopenharmony_ci io_wq_work_fn *do_work; 10162306a36Sopenharmony_ci 10262306a36Sopenharmony_ci struct io_wq_hash *hash; 10362306a36Sopenharmony_ci 10462306a36Sopenharmony_ci atomic_t worker_refs; 10562306a36Sopenharmony_ci struct completion worker_done; 10662306a36Sopenharmony_ci 10762306a36Sopenharmony_ci struct hlist_node cpuhp_node; 10862306a36Sopenharmony_ci 10962306a36Sopenharmony_ci struct task_struct *task; 11062306a36Sopenharmony_ci 11162306a36Sopenharmony_ci struct io_wq_acct acct[IO_WQ_ACCT_NR]; 11262306a36Sopenharmony_ci 11362306a36Sopenharmony_ci /* lock protects access to elements below */ 11462306a36Sopenharmony_ci raw_spinlock_t lock; 11562306a36Sopenharmony_ci 11662306a36Sopenharmony_ci struct hlist_nulls_head free_list; 11762306a36Sopenharmony_ci struct list_head all_list; 11862306a36Sopenharmony_ci 11962306a36Sopenharmony_ci struct wait_queue_entry wait; 12062306a36Sopenharmony_ci 12162306a36Sopenharmony_ci struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; 12262306a36Sopenharmony_ci 12362306a36Sopenharmony_ci cpumask_var_t cpu_mask; 12462306a36Sopenharmony_ci}; 12562306a36Sopenharmony_ci 12662306a36Sopenharmony_cistatic enum cpuhp_state io_wq_online; 12762306a36Sopenharmony_ci 12862306a36Sopenharmony_cistruct io_cb_cancel_data { 12962306a36Sopenharmony_ci work_cancel_fn *fn; 13062306a36Sopenharmony_ci void *data; 13162306a36Sopenharmony_ci int nr_running; 13262306a36Sopenharmony_ci int nr_pending; 13362306a36Sopenharmony_ci bool cancel_all; 13462306a36Sopenharmony_ci}; 13562306a36Sopenharmony_ci 13662306a36Sopenharmony_cistatic bool create_io_worker(struct io_wq *wq, int index); 13762306a36Sopenharmony_cistatic void io_wq_dec_running(struct io_worker *worker); 13862306a36Sopenharmony_cistatic bool io_acct_cancel_pending_work(struct io_wq *wq, 13962306a36Sopenharmony_ci struct io_wq_acct *acct, 14062306a36Sopenharmony_ci struct io_cb_cancel_data *match); 14162306a36Sopenharmony_cistatic void create_worker_cb(struct callback_head *cb); 14262306a36Sopenharmony_cistatic void io_wq_cancel_tw_create(struct io_wq *wq); 14362306a36Sopenharmony_ci 14462306a36Sopenharmony_cistatic bool io_worker_get(struct io_worker *worker) 14562306a36Sopenharmony_ci{ 14662306a36Sopenharmony_ci return refcount_inc_not_zero(&worker->ref); 14762306a36Sopenharmony_ci} 14862306a36Sopenharmony_ci 14962306a36Sopenharmony_cistatic void io_worker_release(struct io_worker *worker) 15062306a36Sopenharmony_ci{ 15162306a36Sopenharmony_ci if (refcount_dec_and_test(&worker->ref)) 15262306a36Sopenharmony_ci complete(&worker->ref_done); 15362306a36Sopenharmony_ci} 15462306a36Sopenharmony_ci 15562306a36Sopenharmony_cistatic inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound) 15662306a36Sopenharmony_ci{ 15762306a36Sopenharmony_ci return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND]; 15862306a36Sopenharmony_ci} 15962306a36Sopenharmony_ci 16062306a36Sopenharmony_cistatic inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq, 16162306a36Sopenharmony_ci struct io_wq_work *work) 16262306a36Sopenharmony_ci{ 16362306a36Sopenharmony_ci return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND)); 16462306a36Sopenharmony_ci} 16562306a36Sopenharmony_ci 16662306a36Sopenharmony_cistatic inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker) 16762306a36Sopenharmony_ci{ 16862306a36Sopenharmony_ci return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND); 16962306a36Sopenharmony_ci} 17062306a36Sopenharmony_ci 17162306a36Sopenharmony_cistatic void io_worker_ref_put(struct io_wq *wq) 17262306a36Sopenharmony_ci{ 17362306a36Sopenharmony_ci if (atomic_dec_and_test(&wq->worker_refs)) 17462306a36Sopenharmony_ci complete(&wq->worker_done); 17562306a36Sopenharmony_ci} 17662306a36Sopenharmony_ci 17762306a36Sopenharmony_cibool io_wq_worker_stopped(void) 17862306a36Sopenharmony_ci{ 17962306a36Sopenharmony_ci struct io_worker *worker = current->worker_private; 18062306a36Sopenharmony_ci 18162306a36Sopenharmony_ci if (WARN_ON_ONCE(!io_wq_current_is_worker())) 18262306a36Sopenharmony_ci return true; 18362306a36Sopenharmony_ci 18462306a36Sopenharmony_ci return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state); 18562306a36Sopenharmony_ci} 18662306a36Sopenharmony_ci 18762306a36Sopenharmony_cistatic void io_worker_cancel_cb(struct io_worker *worker) 18862306a36Sopenharmony_ci{ 18962306a36Sopenharmony_ci struct io_wq_acct *acct = io_wq_get_acct(worker); 19062306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 19162306a36Sopenharmony_ci 19262306a36Sopenharmony_ci atomic_dec(&acct->nr_running); 19362306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 19462306a36Sopenharmony_ci acct->nr_workers--; 19562306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 19662306a36Sopenharmony_ci io_worker_ref_put(wq); 19762306a36Sopenharmony_ci clear_bit_unlock(0, &worker->create_state); 19862306a36Sopenharmony_ci io_worker_release(worker); 19962306a36Sopenharmony_ci} 20062306a36Sopenharmony_ci 20162306a36Sopenharmony_cistatic bool io_task_worker_match(struct callback_head *cb, void *data) 20262306a36Sopenharmony_ci{ 20362306a36Sopenharmony_ci struct io_worker *worker; 20462306a36Sopenharmony_ci 20562306a36Sopenharmony_ci if (cb->func != create_worker_cb) 20662306a36Sopenharmony_ci return false; 20762306a36Sopenharmony_ci worker = container_of(cb, struct io_worker, create_work); 20862306a36Sopenharmony_ci return worker == data; 20962306a36Sopenharmony_ci} 21062306a36Sopenharmony_ci 21162306a36Sopenharmony_cistatic void io_worker_exit(struct io_worker *worker) 21262306a36Sopenharmony_ci{ 21362306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 21462306a36Sopenharmony_ci 21562306a36Sopenharmony_ci while (1) { 21662306a36Sopenharmony_ci struct callback_head *cb = task_work_cancel_match(wq->task, 21762306a36Sopenharmony_ci io_task_worker_match, worker); 21862306a36Sopenharmony_ci 21962306a36Sopenharmony_ci if (!cb) 22062306a36Sopenharmony_ci break; 22162306a36Sopenharmony_ci io_worker_cancel_cb(worker); 22262306a36Sopenharmony_ci } 22362306a36Sopenharmony_ci 22462306a36Sopenharmony_ci io_worker_release(worker); 22562306a36Sopenharmony_ci wait_for_completion(&worker->ref_done); 22662306a36Sopenharmony_ci 22762306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 22862306a36Sopenharmony_ci if (worker->flags & IO_WORKER_F_FREE) 22962306a36Sopenharmony_ci hlist_nulls_del_rcu(&worker->nulls_node); 23062306a36Sopenharmony_ci list_del_rcu(&worker->all_list); 23162306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 23262306a36Sopenharmony_ci io_wq_dec_running(worker); 23362306a36Sopenharmony_ci /* 23462306a36Sopenharmony_ci * this worker is a goner, clear ->worker_private to avoid any 23562306a36Sopenharmony_ci * inc/dec running calls that could happen as part of exit from 23662306a36Sopenharmony_ci * touching 'worker'. 23762306a36Sopenharmony_ci */ 23862306a36Sopenharmony_ci current->worker_private = NULL; 23962306a36Sopenharmony_ci 24062306a36Sopenharmony_ci kfree_rcu(worker, rcu); 24162306a36Sopenharmony_ci io_worker_ref_put(wq); 24262306a36Sopenharmony_ci do_exit(0); 24362306a36Sopenharmony_ci} 24462306a36Sopenharmony_ci 24562306a36Sopenharmony_cistatic inline bool __io_acct_run_queue(struct io_wq_acct *acct) 24662306a36Sopenharmony_ci{ 24762306a36Sopenharmony_ci return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) && 24862306a36Sopenharmony_ci !wq_list_empty(&acct->work_list); 24962306a36Sopenharmony_ci} 25062306a36Sopenharmony_ci 25162306a36Sopenharmony_ci/* 25262306a36Sopenharmony_ci * If there's work to do, returns true with acct->lock acquired. If not, 25362306a36Sopenharmony_ci * returns false with no lock held. 25462306a36Sopenharmony_ci */ 25562306a36Sopenharmony_cistatic inline bool io_acct_run_queue(struct io_wq_acct *acct) 25662306a36Sopenharmony_ci __acquires(&acct->lock) 25762306a36Sopenharmony_ci{ 25862306a36Sopenharmony_ci raw_spin_lock(&acct->lock); 25962306a36Sopenharmony_ci if (__io_acct_run_queue(acct)) 26062306a36Sopenharmony_ci return true; 26162306a36Sopenharmony_ci 26262306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 26362306a36Sopenharmony_ci return false; 26462306a36Sopenharmony_ci} 26562306a36Sopenharmony_ci 26662306a36Sopenharmony_ci/* 26762306a36Sopenharmony_ci * Check head of free list for an available worker. If one isn't available, 26862306a36Sopenharmony_ci * caller must create one. 26962306a36Sopenharmony_ci */ 27062306a36Sopenharmony_cistatic bool io_wq_activate_free_worker(struct io_wq *wq, 27162306a36Sopenharmony_ci struct io_wq_acct *acct) 27262306a36Sopenharmony_ci __must_hold(RCU) 27362306a36Sopenharmony_ci{ 27462306a36Sopenharmony_ci struct hlist_nulls_node *n; 27562306a36Sopenharmony_ci struct io_worker *worker; 27662306a36Sopenharmony_ci 27762306a36Sopenharmony_ci /* 27862306a36Sopenharmony_ci * Iterate free_list and see if we can find an idle worker to 27962306a36Sopenharmony_ci * activate. If a given worker is on the free_list but in the process 28062306a36Sopenharmony_ci * of exiting, keep trying. 28162306a36Sopenharmony_ci */ 28262306a36Sopenharmony_ci hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) { 28362306a36Sopenharmony_ci if (!io_worker_get(worker)) 28462306a36Sopenharmony_ci continue; 28562306a36Sopenharmony_ci if (io_wq_get_acct(worker) != acct) { 28662306a36Sopenharmony_ci io_worker_release(worker); 28762306a36Sopenharmony_ci continue; 28862306a36Sopenharmony_ci } 28962306a36Sopenharmony_ci /* 29062306a36Sopenharmony_ci * If the worker is already running, it's either already 29162306a36Sopenharmony_ci * starting work or finishing work. In either case, if it does 29262306a36Sopenharmony_ci * to go sleep, we'll kick off a new task for this work anyway. 29362306a36Sopenharmony_ci */ 29462306a36Sopenharmony_ci wake_up_process(worker->task); 29562306a36Sopenharmony_ci io_worker_release(worker); 29662306a36Sopenharmony_ci return true; 29762306a36Sopenharmony_ci } 29862306a36Sopenharmony_ci 29962306a36Sopenharmony_ci return false; 30062306a36Sopenharmony_ci} 30162306a36Sopenharmony_ci 30262306a36Sopenharmony_ci/* 30362306a36Sopenharmony_ci * We need a worker. If we find a free one, we're good. If not, and we're 30462306a36Sopenharmony_ci * below the max number of workers, create one. 30562306a36Sopenharmony_ci */ 30662306a36Sopenharmony_cistatic bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct) 30762306a36Sopenharmony_ci{ 30862306a36Sopenharmony_ci /* 30962306a36Sopenharmony_ci * Most likely an attempt to queue unbounded work on an io_wq that 31062306a36Sopenharmony_ci * wasn't setup with any unbounded workers. 31162306a36Sopenharmony_ci */ 31262306a36Sopenharmony_ci if (unlikely(!acct->max_workers)) 31362306a36Sopenharmony_ci pr_warn_once("io-wq is not configured for unbound workers"); 31462306a36Sopenharmony_ci 31562306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 31662306a36Sopenharmony_ci if (acct->nr_workers >= acct->max_workers) { 31762306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 31862306a36Sopenharmony_ci return true; 31962306a36Sopenharmony_ci } 32062306a36Sopenharmony_ci acct->nr_workers++; 32162306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 32262306a36Sopenharmony_ci atomic_inc(&acct->nr_running); 32362306a36Sopenharmony_ci atomic_inc(&wq->worker_refs); 32462306a36Sopenharmony_ci return create_io_worker(wq, acct->index); 32562306a36Sopenharmony_ci} 32662306a36Sopenharmony_ci 32762306a36Sopenharmony_cistatic void io_wq_inc_running(struct io_worker *worker) 32862306a36Sopenharmony_ci{ 32962306a36Sopenharmony_ci struct io_wq_acct *acct = io_wq_get_acct(worker); 33062306a36Sopenharmony_ci 33162306a36Sopenharmony_ci atomic_inc(&acct->nr_running); 33262306a36Sopenharmony_ci} 33362306a36Sopenharmony_ci 33462306a36Sopenharmony_cistatic void create_worker_cb(struct callback_head *cb) 33562306a36Sopenharmony_ci{ 33662306a36Sopenharmony_ci struct io_worker *worker; 33762306a36Sopenharmony_ci struct io_wq *wq; 33862306a36Sopenharmony_ci 33962306a36Sopenharmony_ci struct io_wq_acct *acct; 34062306a36Sopenharmony_ci bool do_create = false; 34162306a36Sopenharmony_ci 34262306a36Sopenharmony_ci worker = container_of(cb, struct io_worker, create_work); 34362306a36Sopenharmony_ci wq = worker->wq; 34462306a36Sopenharmony_ci acct = &wq->acct[worker->create_index]; 34562306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 34662306a36Sopenharmony_ci 34762306a36Sopenharmony_ci if (acct->nr_workers < acct->max_workers) { 34862306a36Sopenharmony_ci acct->nr_workers++; 34962306a36Sopenharmony_ci do_create = true; 35062306a36Sopenharmony_ci } 35162306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 35262306a36Sopenharmony_ci if (do_create) { 35362306a36Sopenharmony_ci create_io_worker(wq, worker->create_index); 35462306a36Sopenharmony_ci } else { 35562306a36Sopenharmony_ci atomic_dec(&acct->nr_running); 35662306a36Sopenharmony_ci io_worker_ref_put(wq); 35762306a36Sopenharmony_ci } 35862306a36Sopenharmony_ci clear_bit_unlock(0, &worker->create_state); 35962306a36Sopenharmony_ci io_worker_release(worker); 36062306a36Sopenharmony_ci} 36162306a36Sopenharmony_ci 36262306a36Sopenharmony_cistatic bool io_queue_worker_create(struct io_worker *worker, 36362306a36Sopenharmony_ci struct io_wq_acct *acct, 36462306a36Sopenharmony_ci task_work_func_t func) 36562306a36Sopenharmony_ci{ 36662306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 36762306a36Sopenharmony_ci 36862306a36Sopenharmony_ci /* raced with exit, just ignore create call */ 36962306a36Sopenharmony_ci if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) 37062306a36Sopenharmony_ci goto fail; 37162306a36Sopenharmony_ci if (!io_worker_get(worker)) 37262306a36Sopenharmony_ci goto fail; 37362306a36Sopenharmony_ci /* 37462306a36Sopenharmony_ci * create_state manages ownership of create_work/index. We should 37562306a36Sopenharmony_ci * only need one entry per worker, as the worker going to sleep 37662306a36Sopenharmony_ci * will trigger the condition, and waking will clear it once it 37762306a36Sopenharmony_ci * runs the task_work. 37862306a36Sopenharmony_ci */ 37962306a36Sopenharmony_ci if (test_bit(0, &worker->create_state) || 38062306a36Sopenharmony_ci test_and_set_bit_lock(0, &worker->create_state)) 38162306a36Sopenharmony_ci goto fail_release; 38262306a36Sopenharmony_ci 38362306a36Sopenharmony_ci atomic_inc(&wq->worker_refs); 38462306a36Sopenharmony_ci init_task_work(&worker->create_work, func); 38562306a36Sopenharmony_ci worker->create_index = acct->index; 38662306a36Sopenharmony_ci if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) { 38762306a36Sopenharmony_ci /* 38862306a36Sopenharmony_ci * EXIT may have been set after checking it above, check after 38962306a36Sopenharmony_ci * adding the task_work and remove any creation item if it is 39062306a36Sopenharmony_ci * now set. wq exit does that too, but we can have added this 39162306a36Sopenharmony_ci * work item after we canceled in io_wq_exit_workers(). 39262306a36Sopenharmony_ci */ 39362306a36Sopenharmony_ci if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) 39462306a36Sopenharmony_ci io_wq_cancel_tw_create(wq); 39562306a36Sopenharmony_ci io_worker_ref_put(wq); 39662306a36Sopenharmony_ci return true; 39762306a36Sopenharmony_ci } 39862306a36Sopenharmony_ci io_worker_ref_put(wq); 39962306a36Sopenharmony_ci clear_bit_unlock(0, &worker->create_state); 40062306a36Sopenharmony_cifail_release: 40162306a36Sopenharmony_ci io_worker_release(worker); 40262306a36Sopenharmony_cifail: 40362306a36Sopenharmony_ci atomic_dec(&acct->nr_running); 40462306a36Sopenharmony_ci io_worker_ref_put(wq); 40562306a36Sopenharmony_ci return false; 40662306a36Sopenharmony_ci} 40762306a36Sopenharmony_ci 40862306a36Sopenharmony_cistatic void io_wq_dec_running(struct io_worker *worker) 40962306a36Sopenharmony_ci{ 41062306a36Sopenharmony_ci struct io_wq_acct *acct = io_wq_get_acct(worker); 41162306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 41262306a36Sopenharmony_ci 41362306a36Sopenharmony_ci if (!(worker->flags & IO_WORKER_F_UP)) 41462306a36Sopenharmony_ci return; 41562306a36Sopenharmony_ci 41662306a36Sopenharmony_ci if (!atomic_dec_and_test(&acct->nr_running)) 41762306a36Sopenharmony_ci return; 41862306a36Sopenharmony_ci if (!io_acct_run_queue(acct)) 41962306a36Sopenharmony_ci return; 42062306a36Sopenharmony_ci 42162306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 42262306a36Sopenharmony_ci atomic_inc(&acct->nr_running); 42362306a36Sopenharmony_ci atomic_inc(&wq->worker_refs); 42462306a36Sopenharmony_ci io_queue_worker_create(worker, acct, create_worker_cb); 42562306a36Sopenharmony_ci} 42662306a36Sopenharmony_ci 42762306a36Sopenharmony_ci/* 42862306a36Sopenharmony_ci * Worker will start processing some work. Move it to the busy list, if 42962306a36Sopenharmony_ci * it's currently on the freelist 43062306a36Sopenharmony_ci */ 43162306a36Sopenharmony_cistatic void __io_worker_busy(struct io_wq *wq, struct io_worker *worker) 43262306a36Sopenharmony_ci{ 43362306a36Sopenharmony_ci if (worker->flags & IO_WORKER_F_FREE) { 43462306a36Sopenharmony_ci worker->flags &= ~IO_WORKER_F_FREE; 43562306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 43662306a36Sopenharmony_ci hlist_nulls_del_init_rcu(&worker->nulls_node); 43762306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 43862306a36Sopenharmony_ci } 43962306a36Sopenharmony_ci} 44062306a36Sopenharmony_ci 44162306a36Sopenharmony_ci/* 44262306a36Sopenharmony_ci * No work, worker going to sleep. Move to freelist. 44362306a36Sopenharmony_ci */ 44462306a36Sopenharmony_cistatic void __io_worker_idle(struct io_wq *wq, struct io_worker *worker) 44562306a36Sopenharmony_ci __must_hold(wq->lock) 44662306a36Sopenharmony_ci{ 44762306a36Sopenharmony_ci if (!(worker->flags & IO_WORKER_F_FREE)) { 44862306a36Sopenharmony_ci worker->flags |= IO_WORKER_F_FREE; 44962306a36Sopenharmony_ci hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); 45062306a36Sopenharmony_ci } 45162306a36Sopenharmony_ci} 45262306a36Sopenharmony_ci 45362306a36Sopenharmony_cistatic inline unsigned int io_get_work_hash(struct io_wq_work *work) 45462306a36Sopenharmony_ci{ 45562306a36Sopenharmony_ci return work->flags >> IO_WQ_HASH_SHIFT; 45662306a36Sopenharmony_ci} 45762306a36Sopenharmony_ci 45862306a36Sopenharmony_cistatic bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) 45962306a36Sopenharmony_ci{ 46062306a36Sopenharmony_ci bool ret = false; 46162306a36Sopenharmony_ci 46262306a36Sopenharmony_ci spin_lock_irq(&wq->hash->wait.lock); 46362306a36Sopenharmony_ci if (list_empty(&wq->wait.entry)) { 46462306a36Sopenharmony_ci __add_wait_queue(&wq->hash->wait, &wq->wait); 46562306a36Sopenharmony_ci if (!test_bit(hash, &wq->hash->map)) { 46662306a36Sopenharmony_ci __set_current_state(TASK_RUNNING); 46762306a36Sopenharmony_ci list_del_init(&wq->wait.entry); 46862306a36Sopenharmony_ci ret = true; 46962306a36Sopenharmony_ci } 47062306a36Sopenharmony_ci } 47162306a36Sopenharmony_ci spin_unlock_irq(&wq->hash->wait.lock); 47262306a36Sopenharmony_ci return ret; 47362306a36Sopenharmony_ci} 47462306a36Sopenharmony_ci 47562306a36Sopenharmony_cistatic struct io_wq_work *io_get_next_work(struct io_wq_acct *acct, 47662306a36Sopenharmony_ci struct io_worker *worker) 47762306a36Sopenharmony_ci __must_hold(acct->lock) 47862306a36Sopenharmony_ci{ 47962306a36Sopenharmony_ci struct io_wq_work_node *node, *prev; 48062306a36Sopenharmony_ci struct io_wq_work *work, *tail; 48162306a36Sopenharmony_ci unsigned int stall_hash = -1U; 48262306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 48362306a36Sopenharmony_ci 48462306a36Sopenharmony_ci wq_list_for_each(node, prev, &acct->work_list) { 48562306a36Sopenharmony_ci unsigned int hash; 48662306a36Sopenharmony_ci 48762306a36Sopenharmony_ci work = container_of(node, struct io_wq_work, list); 48862306a36Sopenharmony_ci 48962306a36Sopenharmony_ci /* not hashed, can run anytime */ 49062306a36Sopenharmony_ci if (!io_wq_is_hashed(work)) { 49162306a36Sopenharmony_ci wq_list_del(&acct->work_list, node, prev); 49262306a36Sopenharmony_ci return work; 49362306a36Sopenharmony_ci } 49462306a36Sopenharmony_ci 49562306a36Sopenharmony_ci hash = io_get_work_hash(work); 49662306a36Sopenharmony_ci /* all items with this hash lie in [work, tail] */ 49762306a36Sopenharmony_ci tail = wq->hash_tail[hash]; 49862306a36Sopenharmony_ci 49962306a36Sopenharmony_ci /* hashed, can run if not already running */ 50062306a36Sopenharmony_ci if (!test_and_set_bit(hash, &wq->hash->map)) { 50162306a36Sopenharmony_ci wq->hash_tail[hash] = NULL; 50262306a36Sopenharmony_ci wq_list_cut(&acct->work_list, &tail->list, prev); 50362306a36Sopenharmony_ci return work; 50462306a36Sopenharmony_ci } 50562306a36Sopenharmony_ci if (stall_hash == -1U) 50662306a36Sopenharmony_ci stall_hash = hash; 50762306a36Sopenharmony_ci /* fast forward to a next hash, for-each will fix up @prev */ 50862306a36Sopenharmony_ci node = &tail->list; 50962306a36Sopenharmony_ci } 51062306a36Sopenharmony_ci 51162306a36Sopenharmony_ci if (stall_hash != -1U) { 51262306a36Sopenharmony_ci bool unstalled; 51362306a36Sopenharmony_ci 51462306a36Sopenharmony_ci /* 51562306a36Sopenharmony_ci * Set this before dropping the lock to avoid racing with new 51662306a36Sopenharmony_ci * work being added and clearing the stalled bit. 51762306a36Sopenharmony_ci */ 51862306a36Sopenharmony_ci set_bit(IO_ACCT_STALLED_BIT, &acct->flags); 51962306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 52062306a36Sopenharmony_ci unstalled = io_wait_on_hash(wq, stall_hash); 52162306a36Sopenharmony_ci raw_spin_lock(&acct->lock); 52262306a36Sopenharmony_ci if (unstalled) { 52362306a36Sopenharmony_ci clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); 52462306a36Sopenharmony_ci if (wq_has_sleeper(&wq->hash->wait)) 52562306a36Sopenharmony_ci wake_up(&wq->hash->wait); 52662306a36Sopenharmony_ci } 52762306a36Sopenharmony_ci } 52862306a36Sopenharmony_ci 52962306a36Sopenharmony_ci return NULL; 53062306a36Sopenharmony_ci} 53162306a36Sopenharmony_ci 53262306a36Sopenharmony_cistatic void io_assign_current_work(struct io_worker *worker, 53362306a36Sopenharmony_ci struct io_wq_work *work) 53462306a36Sopenharmony_ci{ 53562306a36Sopenharmony_ci if (work) { 53662306a36Sopenharmony_ci io_run_task_work(); 53762306a36Sopenharmony_ci cond_resched(); 53862306a36Sopenharmony_ci } 53962306a36Sopenharmony_ci 54062306a36Sopenharmony_ci raw_spin_lock(&worker->lock); 54162306a36Sopenharmony_ci worker->cur_work = work; 54262306a36Sopenharmony_ci worker->next_work = NULL; 54362306a36Sopenharmony_ci raw_spin_unlock(&worker->lock); 54462306a36Sopenharmony_ci} 54562306a36Sopenharmony_ci 54662306a36Sopenharmony_ci/* 54762306a36Sopenharmony_ci * Called with acct->lock held, drops it before returning 54862306a36Sopenharmony_ci */ 54962306a36Sopenharmony_cistatic void io_worker_handle_work(struct io_wq_acct *acct, 55062306a36Sopenharmony_ci struct io_worker *worker) 55162306a36Sopenharmony_ci __releases(&acct->lock) 55262306a36Sopenharmony_ci{ 55362306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 55462306a36Sopenharmony_ci bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); 55562306a36Sopenharmony_ci 55662306a36Sopenharmony_ci do { 55762306a36Sopenharmony_ci struct io_wq_work *work; 55862306a36Sopenharmony_ci 55962306a36Sopenharmony_ci /* 56062306a36Sopenharmony_ci * If we got some work, mark us as busy. If we didn't, but 56162306a36Sopenharmony_ci * the list isn't empty, it means we stalled on hashed work. 56262306a36Sopenharmony_ci * Mark us stalled so we don't keep looking for work when we 56362306a36Sopenharmony_ci * can't make progress, any work completion or insertion will 56462306a36Sopenharmony_ci * clear the stalled flag. 56562306a36Sopenharmony_ci */ 56662306a36Sopenharmony_ci work = io_get_next_work(acct, worker); 56762306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 56862306a36Sopenharmony_ci if (work) { 56962306a36Sopenharmony_ci __io_worker_busy(wq, worker); 57062306a36Sopenharmony_ci 57162306a36Sopenharmony_ci /* 57262306a36Sopenharmony_ci * Make sure cancelation can find this, even before 57362306a36Sopenharmony_ci * it becomes the active work. That avoids a window 57462306a36Sopenharmony_ci * where the work has been removed from our general 57562306a36Sopenharmony_ci * work list, but isn't yet discoverable as the 57662306a36Sopenharmony_ci * current work item for this worker. 57762306a36Sopenharmony_ci */ 57862306a36Sopenharmony_ci raw_spin_lock(&worker->lock); 57962306a36Sopenharmony_ci worker->next_work = work; 58062306a36Sopenharmony_ci raw_spin_unlock(&worker->lock); 58162306a36Sopenharmony_ci } else { 58262306a36Sopenharmony_ci break; 58362306a36Sopenharmony_ci } 58462306a36Sopenharmony_ci io_assign_current_work(worker, work); 58562306a36Sopenharmony_ci __set_current_state(TASK_RUNNING); 58662306a36Sopenharmony_ci 58762306a36Sopenharmony_ci /* handle a whole dependent link */ 58862306a36Sopenharmony_ci do { 58962306a36Sopenharmony_ci struct io_wq_work *next_hashed, *linked; 59062306a36Sopenharmony_ci unsigned int hash = io_get_work_hash(work); 59162306a36Sopenharmony_ci 59262306a36Sopenharmony_ci next_hashed = wq_next_work(work); 59362306a36Sopenharmony_ci 59462306a36Sopenharmony_ci if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND)) 59562306a36Sopenharmony_ci work->flags |= IO_WQ_WORK_CANCEL; 59662306a36Sopenharmony_ci wq->do_work(work); 59762306a36Sopenharmony_ci io_assign_current_work(worker, NULL); 59862306a36Sopenharmony_ci 59962306a36Sopenharmony_ci linked = wq->free_work(work); 60062306a36Sopenharmony_ci work = next_hashed; 60162306a36Sopenharmony_ci if (!work && linked && !io_wq_is_hashed(linked)) { 60262306a36Sopenharmony_ci work = linked; 60362306a36Sopenharmony_ci linked = NULL; 60462306a36Sopenharmony_ci } 60562306a36Sopenharmony_ci io_assign_current_work(worker, work); 60662306a36Sopenharmony_ci if (linked) 60762306a36Sopenharmony_ci io_wq_enqueue(wq, linked); 60862306a36Sopenharmony_ci 60962306a36Sopenharmony_ci if (hash != -1U && !next_hashed) { 61062306a36Sopenharmony_ci /* serialize hash clear with wake_up() */ 61162306a36Sopenharmony_ci spin_lock_irq(&wq->hash->wait.lock); 61262306a36Sopenharmony_ci clear_bit(hash, &wq->hash->map); 61362306a36Sopenharmony_ci clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); 61462306a36Sopenharmony_ci spin_unlock_irq(&wq->hash->wait.lock); 61562306a36Sopenharmony_ci if (wq_has_sleeper(&wq->hash->wait)) 61662306a36Sopenharmony_ci wake_up(&wq->hash->wait); 61762306a36Sopenharmony_ci } 61862306a36Sopenharmony_ci } while (work); 61962306a36Sopenharmony_ci 62062306a36Sopenharmony_ci if (!__io_acct_run_queue(acct)) 62162306a36Sopenharmony_ci break; 62262306a36Sopenharmony_ci raw_spin_lock(&acct->lock); 62362306a36Sopenharmony_ci } while (1); 62462306a36Sopenharmony_ci} 62562306a36Sopenharmony_ci 62662306a36Sopenharmony_cistatic int io_wq_worker(void *data) 62762306a36Sopenharmony_ci{ 62862306a36Sopenharmony_ci struct io_worker *worker = data; 62962306a36Sopenharmony_ci struct io_wq_acct *acct = io_wq_get_acct(worker); 63062306a36Sopenharmony_ci struct io_wq *wq = worker->wq; 63162306a36Sopenharmony_ci bool exit_mask = false, last_timeout = false; 63262306a36Sopenharmony_ci char buf[TASK_COMM_LEN]; 63362306a36Sopenharmony_ci 63462306a36Sopenharmony_ci worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); 63562306a36Sopenharmony_ci 63662306a36Sopenharmony_ci snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid); 63762306a36Sopenharmony_ci set_task_comm(current, buf); 63862306a36Sopenharmony_ci 63962306a36Sopenharmony_ci while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { 64062306a36Sopenharmony_ci long ret; 64162306a36Sopenharmony_ci 64262306a36Sopenharmony_ci set_current_state(TASK_INTERRUPTIBLE); 64362306a36Sopenharmony_ci 64462306a36Sopenharmony_ci /* 64562306a36Sopenharmony_ci * If we have work to do, io_acct_run_queue() returns with 64662306a36Sopenharmony_ci * the acct->lock held. If not, it will drop it. 64762306a36Sopenharmony_ci */ 64862306a36Sopenharmony_ci while (io_acct_run_queue(acct)) 64962306a36Sopenharmony_ci io_worker_handle_work(acct, worker); 65062306a36Sopenharmony_ci 65162306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 65262306a36Sopenharmony_ci /* 65362306a36Sopenharmony_ci * Last sleep timed out. Exit if we're not the last worker, 65462306a36Sopenharmony_ci * or if someone modified our affinity. 65562306a36Sopenharmony_ci */ 65662306a36Sopenharmony_ci if (last_timeout && (exit_mask || acct->nr_workers > 1)) { 65762306a36Sopenharmony_ci acct->nr_workers--; 65862306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 65962306a36Sopenharmony_ci __set_current_state(TASK_RUNNING); 66062306a36Sopenharmony_ci break; 66162306a36Sopenharmony_ci } 66262306a36Sopenharmony_ci last_timeout = false; 66362306a36Sopenharmony_ci __io_worker_idle(wq, worker); 66462306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 66562306a36Sopenharmony_ci if (io_run_task_work()) 66662306a36Sopenharmony_ci continue; 66762306a36Sopenharmony_ci ret = schedule_timeout(WORKER_IDLE_TIMEOUT); 66862306a36Sopenharmony_ci if (signal_pending(current)) { 66962306a36Sopenharmony_ci struct ksignal ksig; 67062306a36Sopenharmony_ci 67162306a36Sopenharmony_ci if (!get_signal(&ksig)) 67262306a36Sopenharmony_ci continue; 67362306a36Sopenharmony_ci break; 67462306a36Sopenharmony_ci } 67562306a36Sopenharmony_ci if (!ret) { 67662306a36Sopenharmony_ci last_timeout = true; 67762306a36Sopenharmony_ci exit_mask = !cpumask_test_cpu(raw_smp_processor_id(), 67862306a36Sopenharmony_ci wq->cpu_mask); 67962306a36Sopenharmony_ci } 68062306a36Sopenharmony_ci } 68162306a36Sopenharmony_ci 68262306a36Sopenharmony_ci if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct)) 68362306a36Sopenharmony_ci io_worker_handle_work(acct, worker); 68462306a36Sopenharmony_ci 68562306a36Sopenharmony_ci io_worker_exit(worker); 68662306a36Sopenharmony_ci return 0; 68762306a36Sopenharmony_ci} 68862306a36Sopenharmony_ci 68962306a36Sopenharmony_ci/* 69062306a36Sopenharmony_ci * Called when a worker is scheduled in. Mark us as currently running. 69162306a36Sopenharmony_ci */ 69262306a36Sopenharmony_civoid io_wq_worker_running(struct task_struct *tsk) 69362306a36Sopenharmony_ci{ 69462306a36Sopenharmony_ci struct io_worker *worker = tsk->worker_private; 69562306a36Sopenharmony_ci 69662306a36Sopenharmony_ci if (!worker) 69762306a36Sopenharmony_ci return; 69862306a36Sopenharmony_ci if (!(worker->flags & IO_WORKER_F_UP)) 69962306a36Sopenharmony_ci return; 70062306a36Sopenharmony_ci if (worker->flags & IO_WORKER_F_RUNNING) 70162306a36Sopenharmony_ci return; 70262306a36Sopenharmony_ci worker->flags |= IO_WORKER_F_RUNNING; 70362306a36Sopenharmony_ci io_wq_inc_running(worker); 70462306a36Sopenharmony_ci} 70562306a36Sopenharmony_ci 70662306a36Sopenharmony_ci/* 70762306a36Sopenharmony_ci * Called when worker is going to sleep. If there are no workers currently 70862306a36Sopenharmony_ci * running and we have work pending, wake up a free one or create a new one. 70962306a36Sopenharmony_ci */ 71062306a36Sopenharmony_civoid io_wq_worker_sleeping(struct task_struct *tsk) 71162306a36Sopenharmony_ci{ 71262306a36Sopenharmony_ci struct io_worker *worker = tsk->worker_private; 71362306a36Sopenharmony_ci 71462306a36Sopenharmony_ci if (!worker) 71562306a36Sopenharmony_ci return; 71662306a36Sopenharmony_ci if (!(worker->flags & IO_WORKER_F_UP)) 71762306a36Sopenharmony_ci return; 71862306a36Sopenharmony_ci if (!(worker->flags & IO_WORKER_F_RUNNING)) 71962306a36Sopenharmony_ci return; 72062306a36Sopenharmony_ci 72162306a36Sopenharmony_ci worker->flags &= ~IO_WORKER_F_RUNNING; 72262306a36Sopenharmony_ci io_wq_dec_running(worker); 72362306a36Sopenharmony_ci} 72462306a36Sopenharmony_ci 72562306a36Sopenharmony_cistatic void io_init_new_worker(struct io_wq *wq, struct io_worker *worker, 72662306a36Sopenharmony_ci struct task_struct *tsk) 72762306a36Sopenharmony_ci{ 72862306a36Sopenharmony_ci tsk->worker_private = worker; 72962306a36Sopenharmony_ci worker->task = tsk; 73062306a36Sopenharmony_ci set_cpus_allowed_ptr(tsk, wq->cpu_mask); 73162306a36Sopenharmony_ci 73262306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 73362306a36Sopenharmony_ci hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); 73462306a36Sopenharmony_ci list_add_tail_rcu(&worker->all_list, &wq->all_list); 73562306a36Sopenharmony_ci worker->flags |= IO_WORKER_F_FREE; 73662306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 73762306a36Sopenharmony_ci wake_up_new_task(tsk); 73862306a36Sopenharmony_ci} 73962306a36Sopenharmony_ci 74062306a36Sopenharmony_cistatic bool io_wq_work_match_all(struct io_wq_work *work, void *data) 74162306a36Sopenharmony_ci{ 74262306a36Sopenharmony_ci return true; 74362306a36Sopenharmony_ci} 74462306a36Sopenharmony_ci 74562306a36Sopenharmony_cistatic inline bool io_should_retry_thread(long err) 74662306a36Sopenharmony_ci{ 74762306a36Sopenharmony_ci /* 74862306a36Sopenharmony_ci * Prevent perpetual task_work retry, if the task (or its group) is 74962306a36Sopenharmony_ci * exiting. 75062306a36Sopenharmony_ci */ 75162306a36Sopenharmony_ci if (fatal_signal_pending(current)) 75262306a36Sopenharmony_ci return false; 75362306a36Sopenharmony_ci 75462306a36Sopenharmony_ci switch (err) { 75562306a36Sopenharmony_ci case -EAGAIN: 75662306a36Sopenharmony_ci case -ERESTARTSYS: 75762306a36Sopenharmony_ci case -ERESTARTNOINTR: 75862306a36Sopenharmony_ci case -ERESTARTNOHAND: 75962306a36Sopenharmony_ci return true; 76062306a36Sopenharmony_ci default: 76162306a36Sopenharmony_ci return false; 76262306a36Sopenharmony_ci } 76362306a36Sopenharmony_ci} 76462306a36Sopenharmony_ci 76562306a36Sopenharmony_cistatic void create_worker_cont(struct callback_head *cb) 76662306a36Sopenharmony_ci{ 76762306a36Sopenharmony_ci struct io_worker *worker; 76862306a36Sopenharmony_ci struct task_struct *tsk; 76962306a36Sopenharmony_ci struct io_wq *wq; 77062306a36Sopenharmony_ci 77162306a36Sopenharmony_ci worker = container_of(cb, struct io_worker, create_work); 77262306a36Sopenharmony_ci clear_bit_unlock(0, &worker->create_state); 77362306a36Sopenharmony_ci wq = worker->wq; 77462306a36Sopenharmony_ci tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); 77562306a36Sopenharmony_ci if (!IS_ERR(tsk)) { 77662306a36Sopenharmony_ci io_init_new_worker(wq, worker, tsk); 77762306a36Sopenharmony_ci io_worker_release(worker); 77862306a36Sopenharmony_ci return; 77962306a36Sopenharmony_ci } else if (!io_should_retry_thread(PTR_ERR(tsk))) { 78062306a36Sopenharmony_ci struct io_wq_acct *acct = io_wq_get_acct(worker); 78162306a36Sopenharmony_ci 78262306a36Sopenharmony_ci atomic_dec(&acct->nr_running); 78362306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 78462306a36Sopenharmony_ci acct->nr_workers--; 78562306a36Sopenharmony_ci if (!acct->nr_workers) { 78662306a36Sopenharmony_ci struct io_cb_cancel_data match = { 78762306a36Sopenharmony_ci .fn = io_wq_work_match_all, 78862306a36Sopenharmony_ci .cancel_all = true, 78962306a36Sopenharmony_ci }; 79062306a36Sopenharmony_ci 79162306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 79262306a36Sopenharmony_ci while (io_acct_cancel_pending_work(wq, acct, &match)) 79362306a36Sopenharmony_ci ; 79462306a36Sopenharmony_ci } else { 79562306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 79662306a36Sopenharmony_ci } 79762306a36Sopenharmony_ci io_worker_ref_put(wq); 79862306a36Sopenharmony_ci kfree(worker); 79962306a36Sopenharmony_ci return; 80062306a36Sopenharmony_ci } 80162306a36Sopenharmony_ci 80262306a36Sopenharmony_ci /* re-create attempts grab a new worker ref, drop the existing one */ 80362306a36Sopenharmony_ci io_worker_release(worker); 80462306a36Sopenharmony_ci schedule_work(&worker->work); 80562306a36Sopenharmony_ci} 80662306a36Sopenharmony_ci 80762306a36Sopenharmony_cistatic void io_workqueue_create(struct work_struct *work) 80862306a36Sopenharmony_ci{ 80962306a36Sopenharmony_ci struct io_worker *worker = container_of(work, struct io_worker, work); 81062306a36Sopenharmony_ci struct io_wq_acct *acct = io_wq_get_acct(worker); 81162306a36Sopenharmony_ci 81262306a36Sopenharmony_ci if (!io_queue_worker_create(worker, acct, create_worker_cont)) 81362306a36Sopenharmony_ci kfree(worker); 81462306a36Sopenharmony_ci} 81562306a36Sopenharmony_ci 81662306a36Sopenharmony_cistatic bool create_io_worker(struct io_wq *wq, int index) 81762306a36Sopenharmony_ci{ 81862306a36Sopenharmony_ci struct io_wq_acct *acct = &wq->acct[index]; 81962306a36Sopenharmony_ci struct io_worker *worker; 82062306a36Sopenharmony_ci struct task_struct *tsk; 82162306a36Sopenharmony_ci 82262306a36Sopenharmony_ci __set_current_state(TASK_RUNNING); 82362306a36Sopenharmony_ci 82462306a36Sopenharmony_ci worker = kzalloc(sizeof(*worker), GFP_KERNEL); 82562306a36Sopenharmony_ci if (!worker) { 82662306a36Sopenharmony_cifail: 82762306a36Sopenharmony_ci atomic_dec(&acct->nr_running); 82862306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 82962306a36Sopenharmony_ci acct->nr_workers--; 83062306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 83162306a36Sopenharmony_ci io_worker_ref_put(wq); 83262306a36Sopenharmony_ci return false; 83362306a36Sopenharmony_ci } 83462306a36Sopenharmony_ci 83562306a36Sopenharmony_ci refcount_set(&worker->ref, 1); 83662306a36Sopenharmony_ci worker->wq = wq; 83762306a36Sopenharmony_ci raw_spin_lock_init(&worker->lock); 83862306a36Sopenharmony_ci init_completion(&worker->ref_done); 83962306a36Sopenharmony_ci 84062306a36Sopenharmony_ci if (index == IO_WQ_ACCT_BOUND) 84162306a36Sopenharmony_ci worker->flags |= IO_WORKER_F_BOUND; 84262306a36Sopenharmony_ci 84362306a36Sopenharmony_ci tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); 84462306a36Sopenharmony_ci if (!IS_ERR(tsk)) { 84562306a36Sopenharmony_ci io_init_new_worker(wq, worker, tsk); 84662306a36Sopenharmony_ci } else if (!io_should_retry_thread(PTR_ERR(tsk))) { 84762306a36Sopenharmony_ci kfree(worker); 84862306a36Sopenharmony_ci goto fail; 84962306a36Sopenharmony_ci } else { 85062306a36Sopenharmony_ci INIT_WORK(&worker->work, io_workqueue_create); 85162306a36Sopenharmony_ci schedule_work(&worker->work); 85262306a36Sopenharmony_ci } 85362306a36Sopenharmony_ci 85462306a36Sopenharmony_ci return true; 85562306a36Sopenharmony_ci} 85662306a36Sopenharmony_ci 85762306a36Sopenharmony_ci/* 85862306a36Sopenharmony_ci * Iterate the passed in list and call the specific function for each 85962306a36Sopenharmony_ci * worker that isn't exiting 86062306a36Sopenharmony_ci */ 86162306a36Sopenharmony_cistatic bool io_wq_for_each_worker(struct io_wq *wq, 86262306a36Sopenharmony_ci bool (*func)(struct io_worker *, void *), 86362306a36Sopenharmony_ci void *data) 86462306a36Sopenharmony_ci{ 86562306a36Sopenharmony_ci struct io_worker *worker; 86662306a36Sopenharmony_ci bool ret = false; 86762306a36Sopenharmony_ci 86862306a36Sopenharmony_ci list_for_each_entry_rcu(worker, &wq->all_list, all_list) { 86962306a36Sopenharmony_ci if (io_worker_get(worker)) { 87062306a36Sopenharmony_ci /* no task if node is/was offline */ 87162306a36Sopenharmony_ci if (worker->task) 87262306a36Sopenharmony_ci ret = func(worker, data); 87362306a36Sopenharmony_ci io_worker_release(worker); 87462306a36Sopenharmony_ci if (ret) 87562306a36Sopenharmony_ci break; 87662306a36Sopenharmony_ci } 87762306a36Sopenharmony_ci } 87862306a36Sopenharmony_ci 87962306a36Sopenharmony_ci return ret; 88062306a36Sopenharmony_ci} 88162306a36Sopenharmony_ci 88262306a36Sopenharmony_cistatic bool io_wq_worker_wake(struct io_worker *worker, void *data) 88362306a36Sopenharmony_ci{ 88462306a36Sopenharmony_ci __set_notify_signal(worker->task); 88562306a36Sopenharmony_ci wake_up_process(worker->task); 88662306a36Sopenharmony_ci return false; 88762306a36Sopenharmony_ci} 88862306a36Sopenharmony_ci 88962306a36Sopenharmony_cistatic void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) 89062306a36Sopenharmony_ci{ 89162306a36Sopenharmony_ci do { 89262306a36Sopenharmony_ci work->flags |= IO_WQ_WORK_CANCEL; 89362306a36Sopenharmony_ci wq->do_work(work); 89462306a36Sopenharmony_ci work = wq->free_work(work); 89562306a36Sopenharmony_ci } while (work); 89662306a36Sopenharmony_ci} 89762306a36Sopenharmony_ci 89862306a36Sopenharmony_cistatic void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work) 89962306a36Sopenharmony_ci{ 90062306a36Sopenharmony_ci struct io_wq_acct *acct = io_work_get_acct(wq, work); 90162306a36Sopenharmony_ci unsigned int hash; 90262306a36Sopenharmony_ci struct io_wq_work *tail; 90362306a36Sopenharmony_ci 90462306a36Sopenharmony_ci if (!io_wq_is_hashed(work)) { 90562306a36Sopenharmony_ciappend: 90662306a36Sopenharmony_ci wq_list_add_tail(&work->list, &acct->work_list); 90762306a36Sopenharmony_ci return; 90862306a36Sopenharmony_ci } 90962306a36Sopenharmony_ci 91062306a36Sopenharmony_ci hash = io_get_work_hash(work); 91162306a36Sopenharmony_ci tail = wq->hash_tail[hash]; 91262306a36Sopenharmony_ci wq->hash_tail[hash] = work; 91362306a36Sopenharmony_ci if (!tail) 91462306a36Sopenharmony_ci goto append; 91562306a36Sopenharmony_ci 91662306a36Sopenharmony_ci wq_list_add_after(&work->list, &tail->list, &acct->work_list); 91762306a36Sopenharmony_ci} 91862306a36Sopenharmony_ci 91962306a36Sopenharmony_cistatic bool io_wq_work_match_item(struct io_wq_work *work, void *data) 92062306a36Sopenharmony_ci{ 92162306a36Sopenharmony_ci return work == data; 92262306a36Sopenharmony_ci} 92362306a36Sopenharmony_ci 92462306a36Sopenharmony_civoid io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) 92562306a36Sopenharmony_ci{ 92662306a36Sopenharmony_ci struct io_wq_acct *acct = io_work_get_acct(wq, work); 92762306a36Sopenharmony_ci struct io_cb_cancel_data match; 92862306a36Sopenharmony_ci unsigned work_flags = work->flags; 92962306a36Sopenharmony_ci bool do_create; 93062306a36Sopenharmony_ci 93162306a36Sopenharmony_ci /* 93262306a36Sopenharmony_ci * If io-wq is exiting for this task, or if the request has explicitly 93362306a36Sopenharmony_ci * been marked as one that should not get executed, cancel it here. 93462306a36Sopenharmony_ci */ 93562306a36Sopenharmony_ci if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || 93662306a36Sopenharmony_ci (work->flags & IO_WQ_WORK_CANCEL)) { 93762306a36Sopenharmony_ci io_run_cancel(work, wq); 93862306a36Sopenharmony_ci return; 93962306a36Sopenharmony_ci } 94062306a36Sopenharmony_ci 94162306a36Sopenharmony_ci raw_spin_lock(&acct->lock); 94262306a36Sopenharmony_ci io_wq_insert_work(wq, work); 94362306a36Sopenharmony_ci clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); 94462306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 94562306a36Sopenharmony_ci 94662306a36Sopenharmony_ci rcu_read_lock(); 94762306a36Sopenharmony_ci do_create = !io_wq_activate_free_worker(wq, acct); 94862306a36Sopenharmony_ci rcu_read_unlock(); 94962306a36Sopenharmony_ci 95062306a36Sopenharmony_ci if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || 95162306a36Sopenharmony_ci !atomic_read(&acct->nr_running))) { 95262306a36Sopenharmony_ci bool did_create; 95362306a36Sopenharmony_ci 95462306a36Sopenharmony_ci did_create = io_wq_create_worker(wq, acct); 95562306a36Sopenharmony_ci if (likely(did_create)) 95662306a36Sopenharmony_ci return; 95762306a36Sopenharmony_ci 95862306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 95962306a36Sopenharmony_ci if (acct->nr_workers) { 96062306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 96162306a36Sopenharmony_ci return; 96262306a36Sopenharmony_ci } 96362306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 96462306a36Sopenharmony_ci 96562306a36Sopenharmony_ci /* fatal condition, failed to create the first worker */ 96662306a36Sopenharmony_ci match.fn = io_wq_work_match_item, 96762306a36Sopenharmony_ci match.data = work, 96862306a36Sopenharmony_ci match.cancel_all = false, 96962306a36Sopenharmony_ci 97062306a36Sopenharmony_ci io_acct_cancel_pending_work(wq, acct, &match); 97162306a36Sopenharmony_ci } 97262306a36Sopenharmony_ci} 97362306a36Sopenharmony_ci 97462306a36Sopenharmony_ci/* 97562306a36Sopenharmony_ci * Work items that hash to the same value will not be done in parallel. 97662306a36Sopenharmony_ci * Used to limit concurrent writes, generally hashed by inode. 97762306a36Sopenharmony_ci */ 97862306a36Sopenharmony_civoid io_wq_hash_work(struct io_wq_work *work, void *val) 97962306a36Sopenharmony_ci{ 98062306a36Sopenharmony_ci unsigned int bit; 98162306a36Sopenharmony_ci 98262306a36Sopenharmony_ci bit = hash_ptr(val, IO_WQ_HASH_ORDER); 98362306a36Sopenharmony_ci work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); 98462306a36Sopenharmony_ci} 98562306a36Sopenharmony_ci 98662306a36Sopenharmony_cistatic bool __io_wq_worker_cancel(struct io_worker *worker, 98762306a36Sopenharmony_ci struct io_cb_cancel_data *match, 98862306a36Sopenharmony_ci struct io_wq_work *work) 98962306a36Sopenharmony_ci{ 99062306a36Sopenharmony_ci if (work && match->fn(work, match->data)) { 99162306a36Sopenharmony_ci work->flags |= IO_WQ_WORK_CANCEL; 99262306a36Sopenharmony_ci __set_notify_signal(worker->task); 99362306a36Sopenharmony_ci return true; 99462306a36Sopenharmony_ci } 99562306a36Sopenharmony_ci 99662306a36Sopenharmony_ci return false; 99762306a36Sopenharmony_ci} 99862306a36Sopenharmony_ci 99962306a36Sopenharmony_cistatic bool io_wq_worker_cancel(struct io_worker *worker, void *data) 100062306a36Sopenharmony_ci{ 100162306a36Sopenharmony_ci struct io_cb_cancel_data *match = data; 100262306a36Sopenharmony_ci 100362306a36Sopenharmony_ci /* 100462306a36Sopenharmony_ci * Hold the lock to avoid ->cur_work going out of scope, caller 100562306a36Sopenharmony_ci * may dereference the passed in work. 100662306a36Sopenharmony_ci */ 100762306a36Sopenharmony_ci raw_spin_lock(&worker->lock); 100862306a36Sopenharmony_ci if (__io_wq_worker_cancel(worker, match, worker->cur_work) || 100962306a36Sopenharmony_ci __io_wq_worker_cancel(worker, match, worker->next_work)) 101062306a36Sopenharmony_ci match->nr_running++; 101162306a36Sopenharmony_ci raw_spin_unlock(&worker->lock); 101262306a36Sopenharmony_ci 101362306a36Sopenharmony_ci return match->nr_running && !match->cancel_all; 101462306a36Sopenharmony_ci} 101562306a36Sopenharmony_ci 101662306a36Sopenharmony_cistatic inline void io_wq_remove_pending(struct io_wq *wq, 101762306a36Sopenharmony_ci struct io_wq_work *work, 101862306a36Sopenharmony_ci struct io_wq_work_node *prev) 101962306a36Sopenharmony_ci{ 102062306a36Sopenharmony_ci struct io_wq_acct *acct = io_work_get_acct(wq, work); 102162306a36Sopenharmony_ci unsigned int hash = io_get_work_hash(work); 102262306a36Sopenharmony_ci struct io_wq_work *prev_work = NULL; 102362306a36Sopenharmony_ci 102462306a36Sopenharmony_ci if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) { 102562306a36Sopenharmony_ci if (prev) 102662306a36Sopenharmony_ci prev_work = container_of(prev, struct io_wq_work, list); 102762306a36Sopenharmony_ci if (prev_work && io_get_work_hash(prev_work) == hash) 102862306a36Sopenharmony_ci wq->hash_tail[hash] = prev_work; 102962306a36Sopenharmony_ci else 103062306a36Sopenharmony_ci wq->hash_tail[hash] = NULL; 103162306a36Sopenharmony_ci } 103262306a36Sopenharmony_ci wq_list_del(&acct->work_list, &work->list, prev); 103362306a36Sopenharmony_ci} 103462306a36Sopenharmony_ci 103562306a36Sopenharmony_cistatic bool io_acct_cancel_pending_work(struct io_wq *wq, 103662306a36Sopenharmony_ci struct io_wq_acct *acct, 103762306a36Sopenharmony_ci struct io_cb_cancel_data *match) 103862306a36Sopenharmony_ci{ 103962306a36Sopenharmony_ci struct io_wq_work_node *node, *prev; 104062306a36Sopenharmony_ci struct io_wq_work *work; 104162306a36Sopenharmony_ci 104262306a36Sopenharmony_ci raw_spin_lock(&acct->lock); 104362306a36Sopenharmony_ci wq_list_for_each(node, prev, &acct->work_list) { 104462306a36Sopenharmony_ci work = container_of(node, struct io_wq_work, list); 104562306a36Sopenharmony_ci if (!match->fn(work, match->data)) 104662306a36Sopenharmony_ci continue; 104762306a36Sopenharmony_ci io_wq_remove_pending(wq, work, prev); 104862306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 104962306a36Sopenharmony_ci io_run_cancel(work, wq); 105062306a36Sopenharmony_ci match->nr_pending++; 105162306a36Sopenharmony_ci /* not safe to continue after unlock */ 105262306a36Sopenharmony_ci return true; 105362306a36Sopenharmony_ci } 105462306a36Sopenharmony_ci raw_spin_unlock(&acct->lock); 105562306a36Sopenharmony_ci 105662306a36Sopenharmony_ci return false; 105762306a36Sopenharmony_ci} 105862306a36Sopenharmony_ci 105962306a36Sopenharmony_cistatic void io_wq_cancel_pending_work(struct io_wq *wq, 106062306a36Sopenharmony_ci struct io_cb_cancel_data *match) 106162306a36Sopenharmony_ci{ 106262306a36Sopenharmony_ci int i; 106362306a36Sopenharmony_ciretry: 106462306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) { 106562306a36Sopenharmony_ci struct io_wq_acct *acct = io_get_acct(wq, i == 0); 106662306a36Sopenharmony_ci 106762306a36Sopenharmony_ci if (io_acct_cancel_pending_work(wq, acct, match)) { 106862306a36Sopenharmony_ci if (match->cancel_all) 106962306a36Sopenharmony_ci goto retry; 107062306a36Sopenharmony_ci break; 107162306a36Sopenharmony_ci } 107262306a36Sopenharmony_ci } 107362306a36Sopenharmony_ci} 107462306a36Sopenharmony_ci 107562306a36Sopenharmony_cistatic void io_wq_cancel_running_work(struct io_wq *wq, 107662306a36Sopenharmony_ci struct io_cb_cancel_data *match) 107762306a36Sopenharmony_ci{ 107862306a36Sopenharmony_ci rcu_read_lock(); 107962306a36Sopenharmony_ci io_wq_for_each_worker(wq, io_wq_worker_cancel, match); 108062306a36Sopenharmony_ci rcu_read_unlock(); 108162306a36Sopenharmony_ci} 108262306a36Sopenharmony_ci 108362306a36Sopenharmony_cienum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, 108462306a36Sopenharmony_ci void *data, bool cancel_all) 108562306a36Sopenharmony_ci{ 108662306a36Sopenharmony_ci struct io_cb_cancel_data match = { 108762306a36Sopenharmony_ci .fn = cancel, 108862306a36Sopenharmony_ci .data = data, 108962306a36Sopenharmony_ci .cancel_all = cancel_all, 109062306a36Sopenharmony_ci }; 109162306a36Sopenharmony_ci 109262306a36Sopenharmony_ci /* 109362306a36Sopenharmony_ci * First check pending list, if we're lucky we can just remove it 109462306a36Sopenharmony_ci * from there. CANCEL_OK means that the work is returned as-new, 109562306a36Sopenharmony_ci * no completion will be posted for it. 109662306a36Sopenharmony_ci * 109762306a36Sopenharmony_ci * Then check if a free (going busy) or busy worker has the work 109862306a36Sopenharmony_ci * currently running. If we find it there, we'll return CANCEL_RUNNING 109962306a36Sopenharmony_ci * as an indication that we attempt to signal cancellation. The 110062306a36Sopenharmony_ci * completion will run normally in this case. 110162306a36Sopenharmony_ci * 110262306a36Sopenharmony_ci * Do both of these while holding the wq->lock, to ensure that 110362306a36Sopenharmony_ci * we'll find a work item regardless of state. 110462306a36Sopenharmony_ci */ 110562306a36Sopenharmony_ci io_wq_cancel_pending_work(wq, &match); 110662306a36Sopenharmony_ci if (match.nr_pending && !match.cancel_all) 110762306a36Sopenharmony_ci return IO_WQ_CANCEL_OK; 110862306a36Sopenharmony_ci 110962306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 111062306a36Sopenharmony_ci io_wq_cancel_running_work(wq, &match); 111162306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 111262306a36Sopenharmony_ci if (match.nr_running && !match.cancel_all) 111362306a36Sopenharmony_ci return IO_WQ_CANCEL_RUNNING; 111462306a36Sopenharmony_ci 111562306a36Sopenharmony_ci if (match.nr_running) 111662306a36Sopenharmony_ci return IO_WQ_CANCEL_RUNNING; 111762306a36Sopenharmony_ci if (match.nr_pending) 111862306a36Sopenharmony_ci return IO_WQ_CANCEL_OK; 111962306a36Sopenharmony_ci return IO_WQ_CANCEL_NOTFOUND; 112062306a36Sopenharmony_ci} 112162306a36Sopenharmony_ci 112262306a36Sopenharmony_cistatic int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode, 112362306a36Sopenharmony_ci int sync, void *key) 112462306a36Sopenharmony_ci{ 112562306a36Sopenharmony_ci struct io_wq *wq = container_of(wait, struct io_wq, wait); 112662306a36Sopenharmony_ci int i; 112762306a36Sopenharmony_ci 112862306a36Sopenharmony_ci list_del_init(&wait->entry); 112962306a36Sopenharmony_ci 113062306a36Sopenharmony_ci rcu_read_lock(); 113162306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) { 113262306a36Sopenharmony_ci struct io_wq_acct *acct = &wq->acct[i]; 113362306a36Sopenharmony_ci 113462306a36Sopenharmony_ci if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags)) 113562306a36Sopenharmony_ci io_wq_activate_free_worker(wq, acct); 113662306a36Sopenharmony_ci } 113762306a36Sopenharmony_ci rcu_read_unlock(); 113862306a36Sopenharmony_ci return 1; 113962306a36Sopenharmony_ci} 114062306a36Sopenharmony_ci 114162306a36Sopenharmony_cistruct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) 114262306a36Sopenharmony_ci{ 114362306a36Sopenharmony_ci int ret, i; 114462306a36Sopenharmony_ci struct io_wq *wq; 114562306a36Sopenharmony_ci 114662306a36Sopenharmony_ci if (WARN_ON_ONCE(!data->free_work || !data->do_work)) 114762306a36Sopenharmony_ci return ERR_PTR(-EINVAL); 114862306a36Sopenharmony_ci if (WARN_ON_ONCE(!bounded)) 114962306a36Sopenharmony_ci return ERR_PTR(-EINVAL); 115062306a36Sopenharmony_ci 115162306a36Sopenharmony_ci wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL); 115262306a36Sopenharmony_ci if (!wq) 115362306a36Sopenharmony_ci return ERR_PTR(-ENOMEM); 115462306a36Sopenharmony_ci 115562306a36Sopenharmony_ci refcount_inc(&data->hash->refs); 115662306a36Sopenharmony_ci wq->hash = data->hash; 115762306a36Sopenharmony_ci wq->free_work = data->free_work; 115862306a36Sopenharmony_ci wq->do_work = data->do_work; 115962306a36Sopenharmony_ci 116062306a36Sopenharmony_ci ret = -ENOMEM; 116162306a36Sopenharmony_ci 116262306a36Sopenharmony_ci if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL)) 116362306a36Sopenharmony_ci goto err; 116462306a36Sopenharmony_ci cpumask_copy(wq->cpu_mask, cpu_possible_mask); 116562306a36Sopenharmony_ci wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; 116662306a36Sopenharmony_ci wq->acct[IO_WQ_ACCT_UNBOUND].max_workers = 116762306a36Sopenharmony_ci task_rlimit(current, RLIMIT_NPROC); 116862306a36Sopenharmony_ci INIT_LIST_HEAD(&wq->wait.entry); 116962306a36Sopenharmony_ci wq->wait.func = io_wq_hash_wake; 117062306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) { 117162306a36Sopenharmony_ci struct io_wq_acct *acct = &wq->acct[i]; 117262306a36Sopenharmony_ci 117362306a36Sopenharmony_ci acct->index = i; 117462306a36Sopenharmony_ci atomic_set(&acct->nr_running, 0); 117562306a36Sopenharmony_ci INIT_WQ_LIST(&acct->work_list); 117662306a36Sopenharmony_ci raw_spin_lock_init(&acct->lock); 117762306a36Sopenharmony_ci } 117862306a36Sopenharmony_ci 117962306a36Sopenharmony_ci raw_spin_lock_init(&wq->lock); 118062306a36Sopenharmony_ci INIT_HLIST_NULLS_HEAD(&wq->free_list, 0); 118162306a36Sopenharmony_ci INIT_LIST_HEAD(&wq->all_list); 118262306a36Sopenharmony_ci 118362306a36Sopenharmony_ci wq->task = get_task_struct(data->task); 118462306a36Sopenharmony_ci atomic_set(&wq->worker_refs, 1); 118562306a36Sopenharmony_ci init_completion(&wq->worker_done); 118662306a36Sopenharmony_ci ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); 118762306a36Sopenharmony_ci if (ret) 118862306a36Sopenharmony_ci goto err; 118962306a36Sopenharmony_ci 119062306a36Sopenharmony_ci return wq; 119162306a36Sopenharmony_cierr: 119262306a36Sopenharmony_ci io_wq_put_hash(data->hash); 119362306a36Sopenharmony_ci free_cpumask_var(wq->cpu_mask); 119462306a36Sopenharmony_ci kfree(wq); 119562306a36Sopenharmony_ci return ERR_PTR(ret); 119662306a36Sopenharmony_ci} 119762306a36Sopenharmony_ci 119862306a36Sopenharmony_cistatic bool io_task_work_match(struct callback_head *cb, void *data) 119962306a36Sopenharmony_ci{ 120062306a36Sopenharmony_ci struct io_worker *worker; 120162306a36Sopenharmony_ci 120262306a36Sopenharmony_ci if (cb->func != create_worker_cb && cb->func != create_worker_cont) 120362306a36Sopenharmony_ci return false; 120462306a36Sopenharmony_ci worker = container_of(cb, struct io_worker, create_work); 120562306a36Sopenharmony_ci return worker->wq == data; 120662306a36Sopenharmony_ci} 120762306a36Sopenharmony_ci 120862306a36Sopenharmony_civoid io_wq_exit_start(struct io_wq *wq) 120962306a36Sopenharmony_ci{ 121062306a36Sopenharmony_ci set_bit(IO_WQ_BIT_EXIT, &wq->state); 121162306a36Sopenharmony_ci} 121262306a36Sopenharmony_ci 121362306a36Sopenharmony_cistatic void io_wq_cancel_tw_create(struct io_wq *wq) 121462306a36Sopenharmony_ci{ 121562306a36Sopenharmony_ci struct callback_head *cb; 121662306a36Sopenharmony_ci 121762306a36Sopenharmony_ci while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) { 121862306a36Sopenharmony_ci struct io_worker *worker; 121962306a36Sopenharmony_ci 122062306a36Sopenharmony_ci worker = container_of(cb, struct io_worker, create_work); 122162306a36Sopenharmony_ci io_worker_cancel_cb(worker); 122262306a36Sopenharmony_ci /* 122362306a36Sopenharmony_ci * Only the worker continuation helper has worker allocated and 122462306a36Sopenharmony_ci * hence needs freeing. 122562306a36Sopenharmony_ci */ 122662306a36Sopenharmony_ci if (cb->func == create_worker_cont) 122762306a36Sopenharmony_ci kfree(worker); 122862306a36Sopenharmony_ci } 122962306a36Sopenharmony_ci} 123062306a36Sopenharmony_ci 123162306a36Sopenharmony_cistatic void io_wq_exit_workers(struct io_wq *wq) 123262306a36Sopenharmony_ci{ 123362306a36Sopenharmony_ci if (!wq->task) 123462306a36Sopenharmony_ci return; 123562306a36Sopenharmony_ci 123662306a36Sopenharmony_ci io_wq_cancel_tw_create(wq); 123762306a36Sopenharmony_ci 123862306a36Sopenharmony_ci rcu_read_lock(); 123962306a36Sopenharmony_ci io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); 124062306a36Sopenharmony_ci rcu_read_unlock(); 124162306a36Sopenharmony_ci io_worker_ref_put(wq); 124262306a36Sopenharmony_ci wait_for_completion(&wq->worker_done); 124362306a36Sopenharmony_ci 124462306a36Sopenharmony_ci spin_lock_irq(&wq->hash->wait.lock); 124562306a36Sopenharmony_ci list_del_init(&wq->wait.entry); 124662306a36Sopenharmony_ci spin_unlock_irq(&wq->hash->wait.lock); 124762306a36Sopenharmony_ci 124862306a36Sopenharmony_ci put_task_struct(wq->task); 124962306a36Sopenharmony_ci wq->task = NULL; 125062306a36Sopenharmony_ci} 125162306a36Sopenharmony_ci 125262306a36Sopenharmony_cistatic void io_wq_destroy(struct io_wq *wq) 125362306a36Sopenharmony_ci{ 125462306a36Sopenharmony_ci struct io_cb_cancel_data match = { 125562306a36Sopenharmony_ci .fn = io_wq_work_match_all, 125662306a36Sopenharmony_ci .cancel_all = true, 125762306a36Sopenharmony_ci }; 125862306a36Sopenharmony_ci 125962306a36Sopenharmony_ci cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); 126062306a36Sopenharmony_ci io_wq_cancel_pending_work(wq, &match); 126162306a36Sopenharmony_ci free_cpumask_var(wq->cpu_mask); 126262306a36Sopenharmony_ci io_wq_put_hash(wq->hash); 126362306a36Sopenharmony_ci kfree(wq); 126462306a36Sopenharmony_ci} 126562306a36Sopenharmony_ci 126662306a36Sopenharmony_civoid io_wq_put_and_exit(struct io_wq *wq) 126762306a36Sopenharmony_ci{ 126862306a36Sopenharmony_ci WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state)); 126962306a36Sopenharmony_ci 127062306a36Sopenharmony_ci io_wq_exit_workers(wq); 127162306a36Sopenharmony_ci io_wq_destroy(wq); 127262306a36Sopenharmony_ci} 127362306a36Sopenharmony_ci 127462306a36Sopenharmony_cistruct online_data { 127562306a36Sopenharmony_ci unsigned int cpu; 127662306a36Sopenharmony_ci bool online; 127762306a36Sopenharmony_ci}; 127862306a36Sopenharmony_ci 127962306a36Sopenharmony_cistatic bool io_wq_worker_affinity(struct io_worker *worker, void *data) 128062306a36Sopenharmony_ci{ 128162306a36Sopenharmony_ci struct online_data *od = data; 128262306a36Sopenharmony_ci 128362306a36Sopenharmony_ci if (od->online) 128462306a36Sopenharmony_ci cpumask_set_cpu(od->cpu, worker->wq->cpu_mask); 128562306a36Sopenharmony_ci else 128662306a36Sopenharmony_ci cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask); 128762306a36Sopenharmony_ci return false; 128862306a36Sopenharmony_ci} 128962306a36Sopenharmony_ci 129062306a36Sopenharmony_cistatic int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online) 129162306a36Sopenharmony_ci{ 129262306a36Sopenharmony_ci struct online_data od = { 129362306a36Sopenharmony_ci .cpu = cpu, 129462306a36Sopenharmony_ci .online = online 129562306a36Sopenharmony_ci }; 129662306a36Sopenharmony_ci 129762306a36Sopenharmony_ci rcu_read_lock(); 129862306a36Sopenharmony_ci io_wq_for_each_worker(wq, io_wq_worker_affinity, &od); 129962306a36Sopenharmony_ci rcu_read_unlock(); 130062306a36Sopenharmony_ci return 0; 130162306a36Sopenharmony_ci} 130262306a36Sopenharmony_ci 130362306a36Sopenharmony_cistatic int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node) 130462306a36Sopenharmony_ci{ 130562306a36Sopenharmony_ci struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); 130662306a36Sopenharmony_ci 130762306a36Sopenharmony_ci return __io_wq_cpu_online(wq, cpu, true); 130862306a36Sopenharmony_ci} 130962306a36Sopenharmony_ci 131062306a36Sopenharmony_cistatic int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node) 131162306a36Sopenharmony_ci{ 131262306a36Sopenharmony_ci struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); 131362306a36Sopenharmony_ci 131462306a36Sopenharmony_ci return __io_wq_cpu_online(wq, cpu, false); 131562306a36Sopenharmony_ci} 131662306a36Sopenharmony_ci 131762306a36Sopenharmony_ciint io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask) 131862306a36Sopenharmony_ci{ 131962306a36Sopenharmony_ci if (!tctx || !tctx->io_wq) 132062306a36Sopenharmony_ci return -EINVAL; 132162306a36Sopenharmony_ci 132262306a36Sopenharmony_ci rcu_read_lock(); 132362306a36Sopenharmony_ci if (mask) 132462306a36Sopenharmony_ci cpumask_copy(tctx->io_wq->cpu_mask, mask); 132562306a36Sopenharmony_ci else 132662306a36Sopenharmony_ci cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask); 132762306a36Sopenharmony_ci rcu_read_unlock(); 132862306a36Sopenharmony_ci 132962306a36Sopenharmony_ci return 0; 133062306a36Sopenharmony_ci} 133162306a36Sopenharmony_ci 133262306a36Sopenharmony_ci/* 133362306a36Sopenharmony_ci * Set max number of unbounded workers, returns old value. If new_count is 0, 133462306a36Sopenharmony_ci * then just return the old value. 133562306a36Sopenharmony_ci */ 133662306a36Sopenharmony_ciint io_wq_max_workers(struct io_wq *wq, int *new_count) 133762306a36Sopenharmony_ci{ 133862306a36Sopenharmony_ci struct io_wq_acct *acct; 133962306a36Sopenharmony_ci int prev[IO_WQ_ACCT_NR]; 134062306a36Sopenharmony_ci int i; 134162306a36Sopenharmony_ci 134262306a36Sopenharmony_ci BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND); 134362306a36Sopenharmony_ci BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND); 134462306a36Sopenharmony_ci BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2); 134562306a36Sopenharmony_ci 134662306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) { 134762306a36Sopenharmony_ci if (new_count[i] > task_rlimit(current, RLIMIT_NPROC)) 134862306a36Sopenharmony_ci new_count[i] = task_rlimit(current, RLIMIT_NPROC); 134962306a36Sopenharmony_ci } 135062306a36Sopenharmony_ci 135162306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) 135262306a36Sopenharmony_ci prev[i] = 0; 135362306a36Sopenharmony_ci 135462306a36Sopenharmony_ci rcu_read_lock(); 135562306a36Sopenharmony_ci 135662306a36Sopenharmony_ci raw_spin_lock(&wq->lock); 135762306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) { 135862306a36Sopenharmony_ci acct = &wq->acct[i]; 135962306a36Sopenharmony_ci prev[i] = max_t(int, acct->max_workers, prev[i]); 136062306a36Sopenharmony_ci if (new_count[i]) 136162306a36Sopenharmony_ci acct->max_workers = new_count[i]; 136262306a36Sopenharmony_ci } 136362306a36Sopenharmony_ci raw_spin_unlock(&wq->lock); 136462306a36Sopenharmony_ci rcu_read_unlock(); 136562306a36Sopenharmony_ci 136662306a36Sopenharmony_ci for (i = 0; i < IO_WQ_ACCT_NR; i++) 136762306a36Sopenharmony_ci new_count[i] = prev[i]; 136862306a36Sopenharmony_ci 136962306a36Sopenharmony_ci return 0; 137062306a36Sopenharmony_ci} 137162306a36Sopenharmony_ci 137262306a36Sopenharmony_cistatic __init int io_wq_init(void) 137362306a36Sopenharmony_ci{ 137462306a36Sopenharmony_ci int ret; 137562306a36Sopenharmony_ci 137662306a36Sopenharmony_ci ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online", 137762306a36Sopenharmony_ci io_wq_cpu_online, io_wq_cpu_offline); 137862306a36Sopenharmony_ci if (ret < 0) 137962306a36Sopenharmony_ci return ret; 138062306a36Sopenharmony_ci io_wq_online = ret; 138162306a36Sopenharmony_ci return 0; 138262306a36Sopenharmony_ci} 138362306a36Sopenharmony_cisubsys_initcall(io_wq_init); 1384