xref: /kernel/linux/linux-6.6/io_uring/io_uring.c (revision 62306a36)
1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Shared application/kernel submission and completion ring pairs, for
4 * supporting fast/efficient IO.
5 *
6 * A note on the read/write ordering memory barriers that are matched between
7 * the application and kernel side.
8 *
9 * After the application reads the CQ ring tail, it must use an
10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11 * before writing the tail (using smp_load_acquire to read the tail will
12 * do). It also needs an smp_mb() before updating CQ head (ordering the
13 * entry load(s) with the head store), pairing with an implicit barrier
14 * through a control-dependency in io_get_cqe (smp_store_release to
15 * store head will do). Failure to do so could lead to reading invalid
16 * CQ entries.
17 *
18 * Likewise, the application must use an appropriate smp_wmb() before
19 * writing the SQ tail (ordering SQ entry stores with the tail store),
20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21 * to store the tail will do). And it needs a barrier ordering the SQ
22 * head load before writing new SQ entries (smp_load_acquire to read
23 * head will do).
24 *
25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27 * updating the SQ tail; a full memory barrier smp_mb() is needed
28 * between.
29 *
30 * Also see the examples in the liburing library:
31 *
32 *	git://git.kernel.dk/liburing
33 *
34 * io_uring also uses READ/WRITE_ONCE() for _any_ store to or load from data
35 * shared between the kernel and application. This is done both for ordering
36 * purposes and to ensure that once a value is loaded from data that the
37 * application could potentially modify, it remains stable.
38 *
39 * Copyright (C) 2018-2019 Jens Axboe
40 * Copyright (c) 2018-2019 Christoph Hellwig
41 */
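/*
 * Illustrative userspace sketch of the pairing described above (not part of
 * this file; names such as cq_khead/cq_ktail/sq_ktail/sq_kflags follow
 * liburing's naming for the mmap()ed ring pointers, and the __atomic_*
 * GCC/Clang builtins stand in for the acquire/release barriers; real
 * applications should simply use liburing):
 *
 *	unsigned head = *cq_khead;
 *	unsigned tail = __atomic_load_n(cq_ktail, __ATOMIC_ACQUIRE);
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & *cq_ring_mask];
 *		handle_cqe(cqe);			// application-defined
 *		head++;
 *	}
 *	// release-store of the head pairs with the kernel's control dependency
 *	__atomic_store_n(cq_khead, head, __ATOMIC_RELEASE);
 *
 *	// submission side: fill the SQEs first, then publish the new tail
 *	__atomic_store_n(sq_ktail, new_tail, __ATOMIC_RELEASE);
 *
 *	// with IORING_SETUP_SQPOLL, a full barrier is needed between the
 *	// tail update and checking the wakeup flag
 *	__atomic_thread_fence(__ATOMIC_SEQ_CST);
 *	if (*sq_kflags & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP, NULL);
 */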
42#include <linux/kernel.h>
43#include <linux/init.h>
44#include <linux/errno.h>
45#include <linux/syscalls.h>
46#include <net/compat.h>
47#include <linux/refcount.h>
48#include <linux/uio.h>
49#include <linux/bits.h>
50
51#include <linux/sched/signal.h>
52#include <linux/fs.h>
53#include <linux/file.h>
54#include <linux/fdtable.h>
55#include <linux/mm.h>
56#include <linux/mman.h>
57#include <linux/percpu.h>
58#include <linux/slab.h>
59#include <linux/bvec.h>
60#include <linux/net.h>
61#include <net/sock.h>
62#include <net/af_unix.h>
63#include <linux/anon_inodes.h>
64#include <linux/sched/mm.h>
65#include <linux/uaccess.h>
66#include <linux/nospec.h>
67#include <linux/highmem.h>
68#include <linux/fsnotify.h>
69#include <linux/fadvise.h>
70#include <linux/task_work.h>
71#include <linux/io_uring.h>
72#include <linux/audit.h>
73#include <linux/security.h>
74#include <asm/shmparam.h>
75
76#define CREATE_TRACE_POINTS
77#include <trace/events/io_uring.h>
78
79#include <uapi/linux/io_uring.h>
80
81#include "io-wq.h"
82
83#include "io_uring.h"
84#include "opdef.h"
85#include "refs.h"
86#include "tctx.h"
87#include "sqpoll.h"
88#include "fdinfo.h"
89#include "kbuf.h"
90#include "rsrc.h"
91#include "cancel.h"
92#include "net.h"
93#include "notif.h"
94
95#include "timeout.h"
96#include "poll.h"
97#include "rw.h"
98#include "alloc_cache.h"
99
100#define IORING_MAX_ENTRIES	32768
101#define IORING_MAX_CQ_ENTRIES	(2 * IORING_MAX_ENTRIES)
102
103#define IORING_MAX_RESTRICTIONS	(IORING_RESTRICTION_LAST + \
104				 IORING_REGISTER_LAST + IORING_OP_LAST)
105
106#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
107			  IOSQE_IO_HARDLINK | IOSQE_ASYNC)
108
109#define SQE_VALID_FLAGS	(SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
110			IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)
111
112#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
113				REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
114				REQ_F_ASYNC_DATA)
115
116#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
117				 IO_REQ_CLEAN_FLAGS)
118
119#define IO_TCTX_REFS_CACHE_NR	(1U << 10)
120
121#define IO_COMPL_BATCH			32
122#define IO_REQ_ALLOC_BATCH		8
123
124enum {
125	IO_CHECK_CQ_OVERFLOW_BIT,
126	IO_CHECK_CQ_DROPPED_BIT,
127};
128
129enum {
130	IO_EVENTFD_OP_SIGNAL_BIT,
131	IO_EVENTFD_OP_FREE_BIT,
132};
133
134struct io_defer_entry {
135	struct list_head	list;
136	struct io_kiocb		*req;
137	u32			seq;
138};
139
140/* requests with any of those set should undergo io_disarm_next() */
141#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
142#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
143
144static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
145					 struct task_struct *task,
146					 bool cancel_all);
147
148static void io_queue_sqe(struct io_kiocb *req);
149
150struct kmem_cache *req_cachep;
151
152static int __read_mostly sysctl_io_uring_disabled;
153static int __read_mostly sysctl_io_uring_group = -1;
154
155#ifdef CONFIG_SYSCTL
156static struct ctl_table kernel_io_uring_disabled_table[] = {
157	{
158		.procname	= "io_uring_disabled",
159		.data		= &sysctl_io_uring_disabled,
160		.maxlen		= sizeof(sysctl_io_uring_disabled),
161		.mode		= 0644,
162		.proc_handler	= proc_dointvec_minmax,
163		.extra1		= SYSCTL_ZERO,
164		.extra2		= SYSCTL_TWO,
165	},
166	{
167		.procname	= "io_uring_group",
168		.data		= &sysctl_io_uring_group,
169		.maxlen		= sizeof(gid_t),
170		.mode		= 0644,
171		.proc_handler	= proc_dointvec,
172	},
173	{},
174};
175#endif
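/*
 * These knobs surface as kernel.io_uring_disabled and kernel.io_uring_group.
 * Per the admin-guide documentation: 0 leaves io_uring enabled for everyone,
 * 1 restricts creation to members of io_uring_group (or CAP_SYS_ADMIN), and
 * 2 disables it entirely. Illustrative sketch of flipping them from a C
 * program, assuming the usual /proc/sys mount (the sysctl(8) utility works
 * just as well):
 *
 *	FILE *f = fopen("/proc/sys/kernel/io_uring_disabled", "w");
 *	if (f) {
 *		fprintf(f, "1\n");	// restrict to io_uring_group members
 *		fclose(f);
 *	}
 */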
176
177static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
178{
179	if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
180	    ctx->submit_state.cqes_count)
181		__io_submit_flush_completions(ctx);
182}
183
184static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
185{
186	return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
187}
188
189static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
190{
191	return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
192}
193
194static bool io_match_linked(struct io_kiocb *head)
195{
196	struct io_kiocb *req;
197
198	io_for_each_link(req, head) {
199		if (req->flags & REQ_F_INFLIGHT)
200			return true;
201	}
202	return false;
203}
204
205/*
206 * As io_match_task() but protected against racing with linked timeouts.
207 * The caller must not hold timeout_lock.
208 */
209bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
210			bool cancel_all)
211{
212	bool matched;
213
214	if (task && head->task != task)
215		return false;
216	if (cancel_all)
217		return true;
218
219	if (head->flags & REQ_F_LINK_TIMEOUT) {
220		struct io_ring_ctx *ctx = head->ctx;
221
222		/* protect against races with linked timeouts */
223		spin_lock_irq(&ctx->timeout_lock);
224		matched = io_match_linked(head);
225		spin_unlock_irq(&ctx->timeout_lock);
226	} else {
227		matched = io_match_linked(head);
228	}
229	return matched;
230}
231
232static inline void req_fail_link_node(struct io_kiocb *req, int res)
233{
234	req_set_fail(req);
235	io_req_set_res(req, res, 0);
236}
237
238static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
239{
240	wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
241}
242
243static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
244{
245	struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
246
247	complete(&ctx->ref_comp);
248}
249
250static __cold void io_fallback_req_func(struct work_struct *work)
251{
252	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
253						fallback_work.work);
254	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
255	struct io_kiocb *req, *tmp;
256	struct io_tw_state ts = { .locked = true, };
257
258	percpu_ref_get(&ctx->refs);
259	mutex_lock(&ctx->uring_lock);
260	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
261		req->io_task_work.func(req, &ts);
262	if (WARN_ON_ONCE(!ts.locked))
263		return;
264	io_submit_flush_completions(ctx);
265	mutex_unlock(&ctx->uring_lock);
266	percpu_ref_put(&ctx->refs);
267}
268
269static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
270{
271	unsigned hash_buckets = 1U << bits;
272	size_t hash_size = hash_buckets * sizeof(table->hbs[0]);
273
274	table->hbs = kmalloc(hash_size, GFP_KERNEL);
275	if (!table->hbs)
276		return -ENOMEM;
277
278	table->hash_bits = bits;
279	init_hash_table(table, hash_buckets);
280	return 0;
281}
282
283static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
284{
285	struct io_ring_ctx *ctx;
286	int hash_bits;
287
288	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
289	if (!ctx)
290		return NULL;
291
292	xa_init(&ctx->io_bl_xa);
293
294	/*
295	 * Use 5 bits less than the max cq entries; that should give us around
296	 * 32 entries per hash list if totally full and uniformly spread, but
297	 * don't keep too many buckets, so as not to overconsume memory.
298	 */
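	/*
	 * For example: with p->cq_entries == 4096, ilog2() gives 12, so
	 * hash_bits ends up as 7 after the clamp below, i.e. 128 buckets
	 * and roughly 32 entries per bucket when completely full.
	 */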
299	hash_bits = ilog2(p->cq_entries) - 5;
300	hash_bits = clamp(hash_bits, 1, 8);
301	if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
302		goto err;
303	if (io_alloc_hash_table(&ctx->cancel_table_locked, hash_bits))
304		goto err;
305	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
306			    0, GFP_KERNEL))
307		goto err;
308
309	ctx->flags = p->flags;
310	init_waitqueue_head(&ctx->sqo_sq_wait);
311	INIT_LIST_HEAD(&ctx->sqd_list);
312	INIT_LIST_HEAD(&ctx->cq_overflow_list);
313	INIT_LIST_HEAD(&ctx->io_buffers_cache);
314	INIT_HLIST_HEAD(&ctx->io_buf_list);
315	io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
316			    sizeof(struct io_rsrc_node));
317	io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX,
318			    sizeof(struct async_poll));
319	io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
320			    sizeof(struct io_async_msghdr));
321	init_completion(&ctx->ref_comp);
322	xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
323	mutex_init(&ctx->uring_lock);
324	init_waitqueue_head(&ctx->cq_wait);
325	init_waitqueue_head(&ctx->poll_wq);
326	init_waitqueue_head(&ctx->rsrc_quiesce_wq);
327	spin_lock_init(&ctx->completion_lock);
328	spin_lock_init(&ctx->timeout_lock);
329	INIT_WQ_LIST(&ctx->iopoll_list);
330	INIT_LIST_HEAD(&ctx->io_buffers_pages);
331	INIT_LIST_HEAD(&ctx->io_buffers_comp);
332	INIT_LIST_HEAD(&ctx->defer_list);
333	INIT_LIST_HEAD(&ctx->timeout_list);
334	INIT_LIST_HEAD(&ctx->ltimeout_list);
335	INIT_LIST_HEAD(&ctx->rsrc_ref_list);
336	init_llist_head(&ctx->work_llist);
337	INIT_LIST_HEAD(&ctx->tctx_list);
338	ctx->submit_state.free_list.next = NULL;
339	INIT_WQ_LIST(&ctx->locked_free_list);
340	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
341	INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
342	return ctx;
343err:
344	kfree(ctx->cancel_table.hbs);
345	kfree(ctx->cancel_table_locked.hbs);
346	kfree(ctx->io_bl);
347	xa_destroy(&ctx->io_bl_xa);
348	kfree(ctx);
349	return NULL;
350}
351
352static void io_account_cq_overflow(struct io_ring_ctx *ctx)
353{
354	struct io_rings *r = ctx->rings;
355
356	WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
357	ctx->cq_extra--;
358}
359
360static bool req_need_defer(struct io_kiocb *req, u32 seq)
361{
362	if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
363		struct io_ring_ctx *ctx = req->ctx;
364
365		return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
366	}
367
368	return false;
369}
370
371static void io_clean_op(struct io_kiocb *req)
372{
373	if (req->flags & REQ_F_BUFFER_SELECTED) {
374		spin_lock(&req->ctx->completion_lock);
375		io_put_kbuf_comp(req);
376		spin_unlock(&req->ctx->completion_lock);
377	}
378
379	if (req->flags & REQ_F_NEED_CLEANUP) {
380		const struct io_cold_def *def = &io_cold_defs[req->opcode];
381
382		if (def->cleanup)
383			def->cleanup(req);
384	}
385	if ((req->flags & REQ_F_POLLED) && req->apoll) {
386		kfree(req->apoll->double_poll);
387		kfree(req->apoll);
388		req->apoll = NULL;
389	}
390	if (req->flags & REQ_F_INFLIGHT) {
391		struct io_uring_task *tctx = req->task->io_uring;
392
393		atomic_dec(&tctx->inflight_tracked);
394	}
395	if (req->flags & REQ_F_CREDS)
396		put_cred(req->creds);
397	if (req->flags & REQ_F_ASYNC_DATA) {
398		kfree(req->async_data);
399		req->async_data = NULL;
400	}
401	req->flags &= ~IO_REQ_CLEAN_FLAGS;
402}
403
404static inline void io_req_track_inflight(struct io_kiocb *req)
405{
406	if (!(req->flags & REQ_F_INFLIGHT)) {
407		req->flags |= REQ_F_INFLIGHT;
408		atomic_inc(&req->task->io_uring->inflight_tracked);
409	}
410}
411
412static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
413{
414	if (WARN_ON_ONCE(!req->link))
415		return NULL;
416
417	req->flags &= ~REQ_F_ARM_LTIMEOUT;
418	req->flags |= REQ_F_LINK_TIMEOUT;
419
420	/* linked timeouts should have two refs once prep'ed */
421	io_req_set_refcount(req);
422	__io_req_set_refcount(req->link, 2);
423	return req->link;
424}
425
426static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
427{
428	if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
429		return NULL;
430	return __io_prep_linked_timeout(req);
431}
432
433static noinline void __io_arm_ltimeout(struct io_kiocb *req)
434{
435	io_queue_linked_timeout(__io_prep_linked_timeout(req));
436}
437
438static inline void io_arm_ltimeout(struct io_kiocb *req)
439{
440	if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
441		__io_arm_ltimeout(req);
442}
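/*
 * Illustrative userspace sketch of what arms this path (not part of this
 * file; the io_uring_get_sqe()/io_uring_prep_*() helpers are liburing's).
 * A linked timeout is an IORING_OP_LINK_TIMEOUT SQE queued right after an
 * SQE that has IOSQE_IO_LINK set:
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1 };
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *
 *	io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
 *	sqe->flags |= IOSQE_IO_LINK;		// tie the timeout to this read
 *
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_link_timeout(sqe, &ts, 0);
 *	io_uring_submit(&ring);		// read is cancelled if it exceeds 1s
 */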
443
444static void io_prep_async_work(struct io_kiocb *req)
445{
446	const struct io_issue_def *def = &io_issue_defs[req->opcode];
447	struct io_ring_ctx *ctx = req->ctx;
448
449	if (!(req->flags & REQ_F_CREDS)) {
450		req->flags |= REQ_F_CREDS;
451		req->creds = get_current_cred();
452	}
453
454	req->work.list.next = NULL;
455	req->work.flags = 0;
456	req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
457	if (req->flags & REQ_F_FORCE_ASYNC)
458		req->work.flags |= IO_WQ_WORK_CONCURRENT;
459
460	if (req->file && !(req->flags & REQ_F_FIXED_FILE))
461		req->flags |= io_file_get_flags(req->file);
462
463	if (req->file && (req->flags & REQ_F_ISREG)) {
464		bool should_hash = def->hash_reg_file;
465
466		/* don't serialize this request if the fs doesn't need it */
467		if (should_hash && (req->file->f_flags & O_DIRECT) &&
468		    (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
469			should_hash = false;
470		if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
471			io_wq_hash_work(&req->work, file_inode(req->file));
472	} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
473		if (def->unbound_nonreg_file)
474			req->work.flags |= IO_WQ_WORK_UNBOUND;
475	}
476}
477
478static void io_prep_async_link(struct io_kiocb *req)
479{
480	struct io_kiocb *cur;
481
482	if (req->flags & REQ_F_LINK_TIMEOUT) {
483		struct io_ring_ctx *ctx = req->ctx;
484
485		spin_lock_irq(&ctx->timeout_lock);
486		io_for_each_link(cur, req)
487			io_prep_async_work(cur);
488		spin_unlock_irq(&ctx->timeout_lock);
489	} else {
490		io_for_each_link(cur, req)
491			io_prep_async_work(cur);
492	}
493}
494
495void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
496{
497	struct io_kiocb *link = io_prep_linked_timeout(req);
498	struct io_uring_task *tctx = req->task->io_uring;
499
500	BUG_ON(!tctx);
501	BUG_ON(!tctx->io_wq);
502
503	/* init ->work of the whole link before punting */
504	io_prep_async_link(req);
505
506	/*
507	 * Not expected to happen, but if we do have a bug where this _can_
508	 * happen, catch it here and ensure the request is marked as
509	 * canceled. That will make io-wq go through the usual work cancel
510	 * procedure rather than attempt to run this request (or create a new
511	 * worker for it).
512	 */
513	if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
514		req->work.flags |= IO_WQ_WORK_CANCEL;
515
516	trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
517	io_wq_enqueue(tctx->io_wq, &req->work);
518	if (link)
519		io_queue_linked_timeout(link);
520}
521
522static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
523{
524	while (!list_empty(&ctx->defer_list)) {
525		struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
526						struct io_defer_entry, list);
527
528		if (req_need_defer(de->req, de->seq))
529			break;
530		list_del_init(&de->list);
531		io_req_task_queue(de->req);
532		kfree(de);
533	}
534}
535
536
537static void io_eventfd_ops(struct rcu_head *rcu)
538{
539	struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
540	int ops = atomic_xchg(&ev_fd->ops, 0);
541
542	if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
543		eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
544
545	/* IO_EVENTFD_OP_FREE_BIT may not be set here, depending on callback
546	 * ordering in a race, but if the reference count is 0 we know we have
547	 * to free it regardless.
548	 */
549	if (atomic_dec_and_test(&ev_fd->refs)) {
550		eventfd_ctx_put(ev_fd->cq_ev_fd);
551		kfree(ev_fd);
552	}
553}
554
555static void io_eventfd_signal(struct io_ring_ctx *ctx)
556{
557	struct io_ev_fd *ev_fd = NULL;
558
559	rcu_read_lock();
560	/*
561	 * rcu_dereference ctx->io_ev_fd once and use it both for checking
562	 * and for calling eventfd_signal
563	 */
564	ev_fd = rcu_dereference(ctx->io_ev_fd);
565
566	/*
567	 * Check again if ev_fd exists in case an io_eventfd_unregister call
568	 * completed between the NULL check of ctx->io_ev_fd at the start of
569	 * the function and rcu_read_lock.
570	 */
571	if (unlikely(!ev_fd))
572		goto out;
573	if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
574		goto out;
575	if (ev_fd->eventfd_async && !io_wq_current_is_worker())
576		goto out;
577
578	if (likely(eventfd_signal_allowed())) {
579		eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
580	} else {
581		atomic_inc(&ev_fd->refs);
582		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
583			call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops);
584		else
585			atomic_dec(&ev_fd->refs);
586	}
587
588out:
589	rcu_read_unlock();
590}
591
592static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
593{
594	bool skip;
595
596	spin_lock(&ctx->completion_lock);
597
598	/*
599	 * Eventfd should only get triggered when at least one event has been
600	 * posted. Some applications rely on the eventfd notification count
601	 * only changing IFF a new CQE has been added to the CQ ring. There's
602	 * no dependency on a 1:1 relationship between how many times this
603	 * function is called (and hence the eventfd count) and the number of CQEs
604	 * posted to the CQ ring.
605	 */
606	skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
607	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
608	spin_unlock(&ctx->completion_lock);
609	if (skip)
610		return;
611
612	io_eventfd_signal(ctx);
613}
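/*
 * Illustrative userspace sketch of the eventfd side this function signals
 * (not part of this file; the registration helper is liburing's):
 *
 *	int efd = eventfd(0, 0);
 *	io_uring_register_eventfd(&ring, efd);	// or io_uring_register_eventfd_async()
 *
 *	uint64_t n;
 *	read(efd, &n, sizeof(n));	// returns once new CQEs have been posted
 *	// reap CQEs now; the count above only moves when the CQ tail has moved
 */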
614
615void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
616{
617	if (ctx->poll_activated)
618		io_poll_wq_wake(ctx);
619	if (ctx->off_timeout_used)
620		io_flush_timeouts(ctx);
621	if (ctx->drain_active) {
622		spin_lock(&ctx->completion_lock);
623		io_queue_deferred(ctx);
624		spin_unlock(&ctx->completion_lock);
625	}
626	if (ctx->has_evfd)
627		io_eventfd_flush_signal(ctx);
628}
629
630static inline void __io_cq_lock(struct io_ring_ctx *ctx)
631{
632	if (!ctx->lockless_cq)
633		spin_lock(&ctx->completion_lock);
634}
635
636static inline void io_cq_lock(struct io_ring_ctx *ctx)
637	__acquires(ctx->completion_lock)
638{
639	spin_lock(&ctx->completion_lock);
640}
641
642static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
643{
644	io_commit_cqring(ctx);
645	if (!ctx->task_complete) {
646		if (!ctx->lockless_cq)
647			spin_unlock(&ctx->completion_lock);
648		/* IOPOLL rings only need to wake up if it's also SQPOLL */
649		if (!ctx->syscall_iopoll)
650			io_cqring_wake(ctx);
651	}
652	io_commit_cqring_flush(ctx);
653}
654
655static void io_cq_unlock_post(struct io_ring_ctx *ctx)
656	__releases(ctx->completion_lock)
657{
658	io_commit_cqring(ctx);
659	spin_unlock(&ctx->completion_lock);
660	io_cqring_wake(ctx);
661	io_commit_cqring_flush(ctx);
662}
663
664/* Drop all backlogged overflow CQEs without posting them to the CQ ring */
665static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
666{
667	struct io_overflow_cqe *ocqe;
668	LIST_HEAD(list);
669
670	spin_lock(&ctx->completion_lock);
671	list_splice_init(&ctx->cq_overflow_list, &list);
672	clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
673	spin_unlock(&ctx->completion_lock);
674
675	while (!list_empty(&list)) {
676		ocqe = list_first_entry(&list, struct io_overflow_cqe, list);
677		list_del(&ocqe->list);
678		kfree(ocqe);
679	}
680}
681
682static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
683{
684	size_t cqe_size = sizeof(struct io_uring_cqe);
685
686	if (__io_cqring_events(ctx) == ctx->cq_entries)
687		return;
688
689	if (ctx->flags & IORING_SETUP_CQE32)
690		cqe_size <<= 1;
691
692	io_cq_lock(ctx);
693	while (!list_empty(&ctx->cq_overflow_list)) {
694		struct io_uring_cqe *cqe;
695		struct io_overflow_cqe *ocqe;
696
697		if (!io_get_cqe_overflow(ctx, &cqe, true))
698			break;
699		ocqe = list_first_entry(&ctx->cq_overflow_list,
700					struct io_overflow_cqe, list);
701		memcpy(cqe, &ocqe->cqe, cqe_size);
702		list_del(&ocqe->list);
703		kfree(ocqe);
704	}
705
706	if (list_empty(&ctx->cq_overflow_list)) {
707		clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
708		atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
709	}
710	io_cq_unlock_post(ctx);
711}
712
713static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
714{
715	/* iopoll syncs against uring_lock, not completion_lock */
716	if (ctx->flags & IORING_SETUP_IOPOLL)
717		mutex_lock(&ctx->uring_lock);
718	__io_cqring_overflow_flush(ctx);
719	if (ctx->flags & IORING_SETUP_IOPOLL)
720		mutex_unlock(&ctx->uring_lock);
721}
722
723static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
724{
725	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
726		io_cqring_do_overflow_flush(ctx);
727}
728
729/* can be called by any task */
730static void io_put_task_remote(struct task_struct *task)
731{
732	struct io_uring_task *tctx = task->io_uring;
733
734	percpu_counter_sub(&tctx->inflight, 1);
735	if (unlikely(atomic_read(&tctx->in_cancel)))
736		wake_up(&tctx->wait);
737	put_task_struct(task);
738}
739
740/* used by a task to put its own references */
741static void io_put_task_local(struct task_struct *task)
742{
743	task->io_uring->cached_refs++;
744}
745
746/* must be called shortly after putting a request */
747static inline void io_put_task(struct task_struct *task)
748{
749	if (likely(task == current))
750		io_put_task_local(task);
751	else
752		io_put_task_remote(task);
753}
754
755void io_task_refs_refill(struct io_uring_task *tctx)
756{
757	unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
758
759	percpu_counter_add(&tctx->inflight, refill);
760	refcount_add(refill, &current->usage);
761	tctx->cached_refs += refill;
762}
763
764static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
765{
766	struct io_uring_task *tctx = task->io_uring;
767	unsigned int refs = tctx->cached_refs;
768
769	if (refs) {
770		tctx->cached_refs = 0;
771		percpu_counter_sub(&tctx->inflight, refs);
772		put_task_struct_many(task, refs);
773	}
774}
775
776static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
777				     s32 res, u32 cflags, u64 extra1, u64 extra2)
778{
779	struct io_overflow_cqe *ocqe;
780	size_t ocq_size = sizeof(struct io_overflow_cqe);
781	bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
782
783	lockdep_assert_held(&ctx->completion_lock);
784
785	if (is_cqe32)
786		ocq_size += sizeof(struct io_uring_cqe);
787
788	ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
789	trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
790	if (!ocqe) {
791		/*
792		 * If we're in ring overflow flush mode, or in task cancel mode,
793		 * or cannot allocate an overflow entry, then we need to drop it
794		 * on the floor.
795		 */
796		io_account_cq_overflow(ctx);
797		set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
798		return false;
799	}
800	if (list_empty(&ctx->cq_overflow_list)) {
801		set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
802		atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
803
804	}
805	ocqe->cqe.user_data = user_data;
806	ocqe->cqe.res = res;
807	ocqe->cqe.flags = cflags;
808	if (is_cqe32) {
809		ocqe->cqe.big_cqe[0] = extra1;
810		ocqe->cqe.big_cqe[1] = extra2;
811	}
812	list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
813	return true;
814}
815
816void io_req_cqe_overflow(struct io_kiocb *req)
817{
818	io_cqring_event_overflow(req->ctx, req->cqe.user_data,
819				req->cqe.res, req->cqe.flags,
820				req->big_cqe.extra1, req->big_cqe.extra2);
821	memset(&req->big_cqe, 0, sizeof(req->big_cqe));
822}
823
824/*
825 * writes to the cq entry need to come after reading head; the
826 * control dependency is enough as we're using WRITE_ONCE to
827 * fill the cq entry
828 */
829bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow)
830{
831	struct io_rings *rings = ctx->rings;
832	unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
833	unsigned int free, queued, len;
834
835	/*
836	 * Posting into the CQ when there are pending overflowed CQEs may break
837	 * ordering guarantees, which will affect links, F_MORE users and more.
838	 * Force-overflow the completion instead.
839	 */
840	if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
841		return false;
842
843	/* userspace may cheat by modifying the tail, so be safe and take the min */
844	queued = min(__io_cqring_events(ctx), ctx->cq_entries);
845	free = ctx->cq_entries - queued;
846	/* we need a contiguous range, limit based on the current array offset */
847	len = min(free, ctx->cq_entries - off);
848	if (!len)
849		return false;
850
851	if (ctx->flags & IORING_SETUP_CQE32) {
852		off <<= 1;
853		len <<= 1;
854	}
855
856	ctx->cqe_cached = &rings->cqes[off];
857	ctx->cqe_sentinel = ctx->cqe_cached + len;
858	return true;
859}
860
861static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
862			      u32 cflags)
863{
864	struct io_uring_cqe *cqe;
865
866	ctx->cq_extra++;
867
868	/*
869	 * If we can't get a cq entry, userspace overflowed the
870	 * submission (by quite a lot). Increment the overflow count in
871	 * the ring.
872	 */
873	if (likely(io_get_cqe(ctx, &cqe))) {
874		trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);
875
876		WRITE_ONCE(cqe->user_data, user_data);
877		WRITE_ONCE(cqe->res, res);
878		WRITE_ONCE(cqe->flags, cflags);
879
880		if (ctx->flags & IORING_SETUP_CQE32) {
881			WRITE_ONCE(cqe->big_cqe[0], 0);
882			WRITE_ONCE(cqe->big_cqe[1], 0);
883		}
884		return true;
885	}
886	return false;
887}
888
889static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
890	__must_hold(&ctx->uring_lock)
891{
892	struct io_submit_state *state = &ctx->submit_state;
893	unsigned int i;
894
895	lockdep_assert_held(&ctx->uring_lock);
896	for (i = 0; i < state->cqes_count; i++) {
897		struct io_uring_cqe *cqe = &ctx->completion_cqes[i];
898
899		if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
900			if (ctx->lockless_cq) {
901				spin_lock(&ctx->completion_lock);
902				io_cqring_event_overflow(ctx, cqe->user_data,
903							cqe->res, cqe->flags, 0, 0);
904				spin_unlock(&ctx->completion_lock);
905			} else {
906				io_cqring_event_overflow(ctx, cqe->user_data,
907							cqe->res, cqe->flags, 0, 0);
908			}
909		}
910	}
911	state->cqes_count = 0;
912}
913
914static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
915			      bool allow_overflow)
916{
917	bool filled;
918
919	io_cq_lock(ctx);
920	filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
921	if (!filled && allow_overflow)
922		filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
923
924	io_cq_unlock_post(ctx);
925	return filled;
926}
927
928bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
929{
930	return __io_post_aux_cqe(ctx, user_data, res, cflags, true);
931}
932
933/*
934 * A helper for multishot requests posting additional CQEs.
935 * Should only be used from a task_work including IO_URING_F_MULTISHOT.
936 */
937bool io_fill_cqe_req_aux(struct io_kiocb *req, bool defer, s32 res, u32 cflags)
938{
939	struct io_ring_ctx *ctx = req->ctx;
940	u64 user_data = req->cqe.user_data;
941	struct io_uring_cqe *cqe;
942
943	if (!defer)
944		return __io_post_aux_cqe(ctx, user_data, res, cflags, false);
945
946	lockdep_assert_held(&ctx->uring_lock);
947
948	if (ctx->submit_state.cqes_count == ARRAY_SIZE(ctx->completion_cqes)) {
949		__io_cq_lock(ctx);
950		__io_flush_post_cqes(ctx);
951		/* no need to flush - flush is deferred */
952		__io_cq_unlock_post(ctx);
953	}
954
955	/* For deferred completions this is not as strict as it is otherwise;
956	 * however, its main job is to prevent unbounded posted completions,
957	 * and in that it works just as well.
958	 */
959	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
960		return false;
961
962	cqe = &ctx->completion_cqes[ctx->submit_state.cqes_count++];
963	cqe->user_data = user_data;
964	cqe->res = res;
965	cqe->flags = cflags;
966	return true;
967}
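/*
 * Illustrative userspace sketch of the multishot pattern the aux-CQE path
 * above services (not part of this file; liburing helpers assumed):
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_multishot_accept(sqe, listen_fd, NULL, NULL, 0);
 *	io_uring_submit(&ring);
 *
 *	struct io_uring_cqe *cqe;
 *	while (io_uring_wait_cqe(&ring, &cqe) == 0) {
 *		bool more = cqe->flags & IORING_CQE_F_MORE;
 *
 *		handle_conn(cqe->res);		// app-defined; res is the new fd
 *		io_uring_cqe_seen(&ring, cqe);
 *		if (!more)			// request terminated, must re-arm
 *			break;
 *	}
 */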
968
969static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
970{
971	struct io_ring_ctx *ctx = req->ctx;
972	struct io_rsrc_node *rsrc_node = NULL;
973
974	io_cq_lock(ctx);
975	if (!(req->flags & REQ_F_CQE_SKIP)) {
976		if (!io_fill_cqe_req(ctx, req))
977			io_req_cqe_overflow(req);
978	}
979
980	/*
981	 * If we're the last reference to this request, add to our locked
982	 * free_list cache.
983	 */
984	if (req_ref_put_and_test(req)) {
985		if (req->flags & IO_REQ_LINK_FLAGS) {
986			if (req->flags & IO_DISARM_MASK)
987				io_disarm_next(req);
988			if (req->link) {
989				io_req_task_queue(req->link);
990				req->link = NULL;
991			}
992		}
993		io_put_kbuf_comp(req);
994		if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
995			io_clean_op(req);
996		io_put_file(req);
997
998		rsrc_node = req->rsrc_node;
999		/*
1000		 * Selected buffer deallocation in io_clean_op() assumes that
1001		 * we don't hold ->completion_lock. Clean them here to avoid
1002		 * deadlocks.
1003		 */
1004		io_put_task_remote(req->task);
1005		wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
1006		ctx->locked_free_nr++;
1007	}
1008	io_cq_unlock_post(ctx);
1009
1010	if (rsrc_node) {
1011		io_ring_submit_lock(ctx, issue_flags);
1012		io_put_rsrc_node(ctx, rsrc_node);
1013		io_ring_submit_unlock(ctx, issue_flags);
1014	}
1015}
1016
1017void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
1018{
1019	if (req->ctx->task_complete && req->ctx->submitter_task != current) {
1020		req->io_task_work.func = io_req_task_complete;
1021		io_req_task_work_add(req);
1022	} else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
1023		   !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
1024		__io_req_complete_post(req, issue_flags);
1025	} else {
1026		struct io_ring_ctx *ctx = req->ctx;
1027
1028		mutex_lock(&ctx->uring_lock);
1029		__io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
1030		mutex_unlock(&ctx->uring_lock);
1031	}
1032}
1033
1034void io_req_defer_failed(struct io_kiocb *req, s32 res)
1035	__must_hold(&ctx->uring_lock)
1036{
1037	const struct io_cold_def *def = &io_cold_defs[req->opcode];
1038
1039	lockdep_assert_held(&req->ctx->uring_lock);
1040
1041	req_set_fail(req);
1042	io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
1043	if (def->fail)
1044		def->fail(req);
1045	io_req_complete_defer(req);
1046}
1047
1048/*
1049 * Don't initialise the fields below on every allocation, but do that in
1050 * advance and keep them valid across allocations.
1051 */
1052static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
1053{
1054	req->ctx = ctx;
1055	req->link = NULL;
1056	req->async_data = NULL;
1057	/* not necessary, but safer to zero */
1058	memset(&req->cqe, 0, sizeof(req->cqe));
1059	memset(&req->big_cqe, 0, sizeof(req->big_cqe));
1060}
1061
1062static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
1063					struct io_submit_state *state)
1064{
1065	spin_lock(&ctx->completion_lock);
1066	wq_list_splice(&ctx->locked_free_list, &state->free_list);
1067	ctx->locked_free_nr = 0;
1068	spin_unlock(&ctx->completion_lock);
1069}
1070
1071/*
1072 * A request might get retired back into the request caches even before opcode
1073 * handlers and io_issue_sqe() are done with it, e.g. the inline completion path.
1074 * Because of that, io_alloc_req() should be called only under ->uring_lock
1075 * and with extra caution not to get a request that is still being worked on.
1076 */
1077__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
1078	__must_hold(&ctx->uring_lock)
1079{
1080	gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1081	void *reqs[IO_REQ_ALLOC_BATCH];
1082	int ret, i;
1083
1084	/*
1085	 * If we have more than a batch's worth of requests in our IRQ side
1086	 * locked cache, grab the lock and move them over to our submission
1087	 * side cache.
1088	 */
1089	if (data_race(ctx->locked_free_nr) > IO_COMPL_BATCH) {
1090		io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
1091		if (!io_req_cache_empty(ctx))
1092			return true;
1093	}
1094
1095	ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
1096
1097	/*
1098	 * Bulk alloc is all-or-nothing. If we fail to get a batch,
1099	 * retry single alloc to be on the safe side.
1100	 */
1101	if (unlikely(ret <= 0)) {
1102		reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1103		if (!reqs[0])
1104			return false;
1105		ret = 1;
1106	}
1107
1108	percpu_ref_get_many(&ctx->refs, ret);
1109	for (i = 0; i < ret; i++) {
1110		struct io_kiocb *req = reqs[i];
1111
1112		io_preinit_req(req, ctx);
1113		io_req_add_to_cache(req, ctx);
1114	}
1115	return true;
1116}
1117
1118__cold void io_free_req(struct io_kiocb *req)
1119{
1120	/* refs were already put, restore them for io_req_task_complete() */
1121	req->flags &= ~REQ_F_REFCOUNT;
1122	/* we only want to free it, don't post CQEs */
1123	req->flags |= REQ_F_CQE_SKIP;
1124	req->io_task_work.func = io_req_task_complete;
1125	io_req_task_work_add(req);
1126}
1127
1128static void __io_req_find_next_prep(struct io_kiocb *req)
1129{
1130	struct io_ring_ctx *ctx = req->ctx;
1131
1132	spin_lock(&ctx->completion_lock);
1133	io_disarm_next(req);
1134	spin_unlock(&ctx->completion_lock);
1135}
1136
1137static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
1138{
1139	struct io_kiocb *nxt;
1140
1141	/*
1142	 * If LINK is set, we have dependent requests in this chain. If we
1143	 * didn't fail this request, queue the first one up, moving any other
1144	 * dependencies to the next request. In case of failure, fail the rest
1145	 * of the chain.
1146	 */
1147	if (unlikely(req->flags & IO_DISARM_MASK))
1148		__io_req_find_next_prep(req);
1149	nxt = req->link;
1150	req->link = NULL;
1151	return nxt;
1152}
1153
1154static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
1155{
1156	if (!ctx)
1157		return;
1158	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1159		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1160	if (ts->locked) {
1161		io_submit_flush_completions(ctx);
1162		mutex_unlock(&ctx->uring_lock);
1163		ts->locked = false;
1164	}
1165	percpu_ref_put(&ctx->refs);
1166}
1167
1168static unsigned int handle_tw_list(struct llist_node *node,
1169				   struct io_ring_ctx **ctx,
1170				   struct io_tw_state *ts)
1171{
1172	unsigned int count = 0;
1173
1174	do {
1175		struct llist_node *next = node->next;
1176		struct io_kiocb *req = container_of(node, struct io_kiocb,
1177						    io_task_work.node);
1178
1179		prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1180
1181		if (req->ctx != *ctx) {
1182			ctx_flush_and_put(*ctx, ts);
1183			*ctx = req->ctx;
1184			/* if not contended, grab and improve batching */
1185			ts->locked = mutex_trylock(&(*ctx)->uring_lock);
1186			percpu_ref_get(&(*ctx)->refs);
1187		}
1188		INDIRECT_CALL_2(req->io_task_work.func,
1189				io_poll_task_func, io_req_rw_complete,
1190				req, ts);
1191		node = next;
1192		count++;
1193		if (unlikely(need_resched())) {
1194			ctx_flush_and_put(*ctx, ts);
1195			*ctx = NULL;
1196			cond_resched();
1197		}
1198	} while (node);
1199
1200	return count;
1201}
1202
1203/**
1204 * io_llist_xchg - swap all entries in a lock-less list
1205 * @head:	the head of the lock-less list to take all entries from
1206 * @new:	new entry as the head of the list
1207 *
1208 * If the list is empty, return NULL; otherwise, return a pointer to the first entry.
1209 * The order of entries returned is from the newest to the oldest added one.
1210 */
1211static inline struct llist_node *io_llist_xchg(struct llist_head *head,
1212					       struct llist_node *new)
1213{
1214	return xchg(&head->first, new);
1215}
1216
1217static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
1218{
1219	struct llist_node *node = llist_del_all(&tctx->task_list);
1220	struct io_ring_ctx *last_ctx = NULL;
1221	struct io_kiocb *req;
1222
1223	while (node) {
1224		req = container_of(node, struct io_kiocb, io_task_work.node);
1225		node = node->next;
1226		if (sync && last_ctx != req->ctx) {
1227			if (last_ctx) {
1228				flush_delayed_work(&last_ctx->fallback_work);
1229				percpu_ref_put(&last_ctx->refs);
1230			}
1231			last_ctx = req->ctx;
1232			percpu_ref_get(&last_ctx->refs);
1233		}
1234		if (llist_add(&req->io_task_work.node,
1235			      &req->ctx->fallback_llist))
1236			schedule_delayed_work(&req->ctx->fallback_work, 1);
1237	}
1238
1239	if (last_ctx) {
1240		flush_delayed_work(&last_ctx->fallback_work);
1241		percpu_ref_put(&last_ctx->refs);
1242	}
1243}
1244
1245void tctx_task_work(struct callback_head *cb)
1246{
1247	struct io_tw_state ts = {};
1248	struct io_ring_ctx *ctx = NULL;
1249	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
1250						  task_work);
1251	struct llist_node *node;
1252	unsigned int count = 0;
1253
1254	if (unlikely(current->flags & PF_EXITING)) {
1255		io_fallback_tw(tctx, true);
1256		return;
1257	}
1258
1259	node = llist_del_all(&tctx->task_list);
1260	if (node)
1261		count = handle_tw_list(node, &ctx, &ts);
1262
1263	ctx_flush_and_put(ctx, &ts);
1264
1265	/* relaxed read is enough as only the task itself sets ->in_cancel */
1266	if (unlikely(atomic_read(&tctx->in_cancel)))
1267		io_uring_drop_tctx_refs(current);
1268
1269	trace_io_uring_task_work_run(tctx, count, 1);
1270}
1271
1272static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
1273{
1274	struct io_ring_ctx *ctx = req->ctx;
1275	unsigned nr_wait, nr_tw, nr_tw_prev;
1276	struct llist_node *first;
1277
1278	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
1279		flags &= ~IOU_F_TWQ_LAZY_WAKE;
1280
1281	first = READ_ONCE(ctx->work_llist.first);
1282	do {
1283		nr_tw_prev = 0;
1284		if (first) {
1285			struct io_kiocb *first_req = container_of(first,
1286							struct io_kiocb,
1287							io_task_work.node);
1288			/*
1289			 * Might be executed at any moment, rely on
1290			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
1291			 */
1292			nr_tw_prev = READ_ONCE(first_req->nr_tw);
1293		}
1294		nr_tw = nr_tw_prev + 1;
1295		/* Large enough to fail the nr_wait comparison below */
1296		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
1297			nr_tw = INT_MAX;
1298
1299		req->nr_tw = nr_tw;
1300		req->io_task_work.node.next = first;
1301	} while (!try_cmpxchg(&ctx->work_llist.first, &first,
1302			      &req->io_task_work.node));
1303
1304	if (!first) {
1305		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1306			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1307		if (ctx->has_evfd)
1308			io_eventfd_signal(ctx);
1309	}
1310
1311	nr_wait = atomic_read(&ctx->cq_wait_nr);
1312	/* no one is waiting */
1313	if (!nr_wait)
1314		return;
1315	/* either not enough or the previous add has already woken it up */
1316	if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
1317		return;
1318	/* pairs with set_current_state() in io_cqring_wait() */
1319	smp_mb__after_atomic();
1320	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
1321}
1322
1323static void io_req_normal_work_add(struct io_kiocb *req)
1324{
1325	struct io_uring_task *tctx = req->task->io_uring;
1326	struct io_ring_ctx *ctx = req->ctx;
1327
1328	/* task_work already pending, we're done */
1329	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
1330		return;
1331
1332	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1333		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1334
1335	if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1336		return;
1337
1338	io_fallback_tw(tctx, false);
1339}
1340
1341void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
1342{
1343	if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
1344		rcu_read_lock();
1345		io_req_local_work_add(req, flags);
1346		rcu_read_unlock();
1347	} else {
1348		io_req_normal_work_add(req);
1349	}
1350}
1351
1352static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
1353{
1354	struct llist_node *node;
1355
1356	node = llist_del_all(&ctx->work_llist);
1357	while (node) {
1358		struct io_kiocb *req = container_of(node, struct io_kiocb,
1359						    io_task_work.node);
1360
1361		node = node->next;
1362		io_req_normal_work_add(req);
1363	}
1364}
1365
1366static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
1367				       int min_events)
1368{
1369	if (llist_empty(&ctx->work_llist))
1370		return false;
1371	if (events < min_events)
1372		return true;
1373	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1374		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1375	return false;
1376}
1377
1378static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
1379			       int min_events)
1380{
1381	struct llist_node *node;
1382	unsigned int loops = 0;
1383	int ret = 0;
1384
1385	if (WARN_ON_ONCE(ctx->submitter_task != current))
1386		return -EEXIST;
1387	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1388		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1389again:
1390	/*
1391	 * llists are in reverse order, flip it back the right way before
1392	 * running the pending items.
1393	 */
1394	node = llist_reverse_order(io_llist_xchg(&ctx->work_llist, NULL));
1395	while (node) {
1396		struct llist_node *next = node->next;
1397		struct io_kiocb *req = container_of(node, struct io_kiocb,
1398						    io_task_work.node);
1399		prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1400		INDIRECT_CALL_2(req->io_task_work.func,
1401				io_poll_task_func, io_req_rw_complete,
1402				req, ts);
1403		ret++;
1404		node = next;
1405	}
1406	loops++;
1407
1408	if (io_run_local_work_continue(ctx, ret, min_events))
1409		goto again;
1410	if (ts->locked) {
1411		io_submit_flush_completions(ctx);
1412		if (io_run_local_work_continue(ctx, ret, min_events))
1413			goto again;
1414	}
1415
1416	trace_io_uring_local_work_run(ctx, ret, loops);
1417	return ret;
1418}
1419
1420static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
1421					   int min_events)
1422{
1423	struct io_tw_state ts = { .locked = true, };
1424	int ret;
1425
1426	if (llist_empty(&ctx->work_llist))
1427		return 0;
1428
1429	ret = __io_run_local_work(ctx, &ts, min_events);
1430	/* shouldn't happen! */
1431	if (WARN_ON_ONCE(!ts.locked))
1432		mutex_lock(&ctx->uring_lock);
1433	return ret;
1434}
1435
1436static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
1437{
1438	struct io_tw_state ts = {};
1439	int ret;
1440
1441	ts.locked = mutex_trylock(&ctx->uring_lock);
1442	ret = __io_run_local_work(ctx, &ts, min_events);
1443	if (ts.locked)
1444		mutex_unlock(&ctx->uring_lock);
1445
1446	return ret;
1447}
1448
1449static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
1450{
1451	io_tw_lock(req->ctx, ts);
1452	io_req_defer_failed(req, req->cqe.res);
1453}
1454
1455void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
1456{
1457	io_tw_lock(req->ctx, ts);
1458	/* req->task == current here, checking PF_EXITING is safe */
1459	if (unlikely(req->task->flags & PF_EXITING))
1460		io_req_defer_failed(req, -EFAULT);
1461	else if (req->flags & REQ_F_FORCE_ASYNC)
1462		io_queue_iowq(req, ts);
1463	else
1464		io_queue_sqe(req);
1465}
1466
1467void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1468{
1469	io_req_set_res(req, ret, 0);
1470	req->io_task_work.func = io_req_task_cancel;
1471	io_req_task_work_add(req);
1472}
1473
1474void io_req_task_queue(struct io_kiocb *req)
1475{
1476	req->io_task_work.func = io_req_task_submit;
1477	io_req_task_work_add(req);
1478}
1479
1480void io_queue_next(struct io_kiocb *req)
1481{
1482	struct io_kiocb *nxt = io_req_find_next(req);
1483
1484	if (nxt)
1485		io_req_task_queue(nxt);
1486}
1487
1488static void io_free_batch_list(struct io_ring_ctx *ctx,
1489			       struct io_wq_work_node *node)
1490	__must_hold(&ctx->uring_lock)
1491{
1492	do {
1493		struct io_kiocb *req = container_of(node, struct io_kiocb,
1494						    comp_list);
1495
1496		if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
1497			if (req->flags & REQ_F_REFCOUNT) {
1498				node = req->comp_list.next;
1499				if (!req_ref_put_and_test(req))
1500					continue;
1501			}
1502			if ((req->flags & REQ_F_POLLED) && req->apoll) {
1503				struct async_poll *apoll = req->apoll;
1504
1505				if (apoll->double_poll)
1506					kfree(apoll->double_poll);
1507				if (!io_alloc_cache_put(&ctx->apoll_cache, &apoll->cache))
1508					kfree(apoll);
1509				req->flags &= ~REQ_F_POLLED;
1510			}
1511			if (req->flags & IO_REQ_LINK_FLAGS)
1512				io_queue_next(req);
1513			if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1514				io_clean_op(req);
1515		}
1516		io_put_file(req);
1517
1518		io_req_put_rsrc_locked(req, ctx);
1519
1520		io_put_task(req->task);
1521		node = req->comp_list.next;
1522		io_req_add_to_cache(req, ctx);
1523	} while (node);
1524}
1525
1526void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1527	__must_hold(&ctx->uring_lock)
1528{
1529	struct io_submit_state *state = &ctx->submit_state;
1530	struct io_wq_work_node *node;
1531
1532	__io_cq_lock(ctx);
1533	/* must come first to preserve CQE ordering in failure cases */
1534	if (state->cqes_count)
1535		__io_flush_post_cqes(ctx);
1536	__wq_list_for_each(node, &state->compl_reqs) {
1537		struct io_kiocb *req = container_of(node, struct io_kiocb,
1538					    comp_list);
1539
1540		if (!(req->flags & REQ_F_CQE_SKIP) &&
1541		    unlikely(!io_fill_cqe_req(ctx, req))) {
1542			if (ctx->lockless_cq) {
1543				spin_lock(&ctx->completion_lock);
1544				io_req_cqe_overflow(req);
1545				spin_unlock(&ctx->completion_lock);
1546			} else {
1547				io_req_cqe_overflow(req);
1548			}
1549		}
1550	}
1551	__io_cq_unlock_post(ctx);
1552
1553	if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
1554		io_free_batch_list(ctx, state->compl_reqs.first);
1555		INIT_WQ_LIST(&state->compl_reqs);
1556	}
1557}
1558
1559static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1560{
1561	/* See comment at the top of this file */
1562	smp_rmb();
1563	return __io_cqring_events(ctx);
1564}
1565
1566/*
1567 * We can't just wait for polled events to come to us, we have to actively
1568 * find and complete them.
1569 */
1570static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1571{
1572	if (!(ctx->flags & IORING_SETUP_IOPOLL))
1573		return;
1574
1575	mutex_lock(&ctx->uring_lock);
1576	while (!wq_list_empty(&ctx->iopoll_list)) {
1577		/* let it sleep and repeat later if we can't complete a request */
1578		if (io_do_iopoll(ctx, true) == 0)
1579			break;
1580		/*
1581		 * Ensure we allow local-to-the-cpu processing to take place;
1582		 * in this case we need to ensure that we reap all events.
1583		 * Also let task_work, etc. progress by releasing the mutex.
1584		 */
1585		if (need_resched()) {
1586			mutex_unlock(&ctx->uring_lock);
1587			cond_resched();
1588			mutex_lock(&ctx->uring_lock);
1589		}
1590	}
1591	mutex_unlock(&ctx->uring_lock);
1592}
1593
1594static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
1595{
1596	unsigned int nr_events = 0;
1597	unsigned long check_cq;
1598
1599	if (!io_allowed_run_tw(ctx))
1600		return -EEXIST;
1601
1602	check_cq = READ_ONCE(ctx->check_cq);
1603	if (unlikely(check_cq)) {
1604		if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
1605			__io_cqring_overflow_flush(ctx);
1606		/*
1607		 * Similarly do not spin if we have not informed the user of any
1608		 * dropped CQE.
1609		 */
1610		if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
1611			return -EBADR;
1612	}
1613	/*
1614	 * Don't enter poll loop if we already have events pending.
1615	 * If we do, we can potentially be spinning for commands that
1616	 * already triggered a CQE (eg in error).
1617	 */
1618	if (io_cqring_events(ctx))
1619		return 0;
1620
1621	do {
1622		int ret = 0;
1623
1624		/*
1625		 * If a submit got punted to a workqueue, we can have the
1626		 * application entering polling for a command before it gets
1627		 * issued. That app will hold the uring_lock for the duration
1628		 * of the poll right here, so we need to take a breather every
1629		 * now and then to ensure that the issue has a chance to add
1630		 * the poll to the issued list. Otherwise we can spin here
1631		 * forever, while the workqueue is stuck trying to acquire the
1632		 * very same mutex.
1633		 */
1634		if (wq_list_empty(&ctx->iopoll_list) ||
1635		    io_task_work_pending(ctx)) {
1636			u32 tail = ctx->cached_cq_tail;
1637
1638			(void) io_run_local_work_locked(ctx, min);
1639
1640			if (task_work_pending(current) ||
1641			    wq_list_empty(&ctx->iopoll_list)) {
1642				mutex_unlock(&ctx->uring_lock);
1643				io_run_task_work();
1644				mutex_lock(&ctx->uring_lock);
1645			}
1646			/* some requests don't go through iopoll_list */
1647			if (tail != ctx->cached_cq_tail ||
1648			    wq_list_empty(&ctx->iopoll_list))
1649				break;
1650		}
1651		ret = io_do_iopoll(ctx, !min);
1652		if (unlikely(ret < 0))
1653			return ret;
1654
1655		if (task_sigpending(current))
1656			return -EINTR;
1657		if (need_resched())
1658			break;
1659
1660		nr_events += ret;
1661	} while (nr_events < min);
1662
1663	return 0;
1664}
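/*
 * Illustrative userspace sketch of the setup this poll loop serves (not part
 * of this file; liburing helpers assumed). IORING_SETUP_IOPOLL expects files
 * opened for polled I/O, typically O_DIRECT on a capable block device:
 *
 *	struct io_uring ring;
 *	io_uring_queue_init(64, &ring, IORING_SETUP_IOPOLL);
 *
 *	int fd = open("/dev/nvme0n1", O_RDONLY | O_DIRECT);
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_read(sqe, fd, buf, 4096, 0);	// buf must be aligned
 *	io_uring_submit(&ring);
 *
 *	struct io_uring_cqe *cqe;
 *	io_uring_wait_cqe(&ring, &cqe);	// enters the kernel, which busy-polls
 *					// for the completion in io_iopoll_check()
 */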
1665
1666void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
1667{
1668	if (ts->locked)
1669		io_req_complete_defer(req);
1670	else
1671		io_req_complete_post(req, IO_URING_F_UNLOCKED);
1672}
1673
1674/*
1675 * After the iocb has been issued, it's safe to be found on the poll list.
1676 * Adding the kiocb to the list AFTER submission ensures that we don't
1677 * find it from an io_do_iopoll() thread before the issuer is done
1678 * accessing the kiocb cookie.
1679 */
1680static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1681{
1682	struct io_ring_ctx *ctx = req->ctx;
1683	const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1684
1685	/* workqueue context doesn't hold uring_lock, grab it now */
1686	if (unlikely(needs_lock))
1687		mutex_lock(&ctx->uring_lock);
1688
1689	/*
1690	 * Track whether we have multiple files in our lists. This will impact
1691	 * how we do polling eventually, not spinning if we're on potentially
1692	 * different devices.
1693	 */
1694	if (wq_list_empty(&ctx->iopoll_list)) {
1695		ctx->poll_multi_queue = false;
1696	} else if (!ctx->poll_multi_queue) {
1697		struct io_kiocb *list_req;
1698
1699		list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
1700					comp_list);
1701		if (list_req->file != req->file)
1702			ctx->poll_multi_queue = true;
1703	}
1704
1705	/*
1706	 * For fast devices, IO may have already completed. If it has, add
1707	 * it to the front so we find it first.
1708	 */
1709	if (READ_ONCE(req->iopoll_completed))
1710		wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1711	else
1712		wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1713
1714	if (unlikely(needs_lock)) {
1715		/*
1716		 * If IORING_SETUP_SQPOLL is enabled, sqes are either handled
1717		 * in sq thread task context or in io worker task context. If
1718		 * the current task context is the sq thread, we don't need to
1719		 * check whether we should wake up the sq thread.
1720		 */
1721		if ((ctx->flags & IORING_SETUP_SQPOLL) &&
1722		    wq_has_sleeper(&ctx->sq_data->wait))
1723			wake_up(&ctx->sq_data->wait);
1724
1725		mutex_unlock(&ctx->uring_lock);
1726	}
1727}
1728
1729unsigned int io_file_get_flags(struct file *file)
1730{
1731	unsigned int res = 0;
1732
1733	if (S_ISREG(file_inode(file)->i_mode))
1734		res |= REQ_F_ISREG;
1735	if ((file->f_flags & O_NONBLOCK) || (file->f_mode & FMODE_NOWAIT))
1736		res |= REQ_F_SUPPORT_NOWAIT;
1737	return res;
1738}
1739
1740bool io_alloc_async_data(struct io_kiocb *req)
1741{
1742	WARN_ON_ONCE(!io_cold_defs[req->opcode].async_size);
1743	req->async_data = kmalloc(io_cold_defs[req->opcode].async_size, GFP_KERNEL);
1744	if (req->async_data) {
1745		req->flags |= REQ_F_ASYNC_DATA;
1746		return false;
1747	}
1748	return true;
1749}
1750
1751int io_req_prep_async(struct io_kiocb *req)
1752{
1753	const struct io_cold_def *cdef = &io_cold_defs[req->opcode];
1754	const struct io_issue_def *def = &io_issue_defs[req->opcode];
1755
1756	/* assign early for deferred execution for non-fixed file */
1757	if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE) && !req->file)
1758		req->file = io_file_get_normal(req, req->cqe.fd);
1759	if (!cdef->prep_async)
1760		return 0;
1761	if (WARN_ON_ONCE(req_has_async_data(req)))
1762		return -EFAULT;
1763	if (!def->manual_alloc) {
1764		if (io_alloc_async_data(req))
1765			return -EAGAIN;
1766	}
1767	return cdef->prep_async(req);
1768}
1769
1770static u32 io_get_sequence(struct io_kiocb *req)
1771{
1772	u32 seq = req->ctx->cached_sq_head;
1773	struct io_kiocb *cur;
1774
1775	/* need original cached_sq_head, but it was increased for each req */
1776	io_for_each_link(cur, req)
1777		seq--;
1778	return seq;
1779}
1780
1781static __cold void io_drain_req(struct io_kiocb *req)
1782	__must_hold(&ctx->uring_lock)
1783{
1784	struct io_ring_ctx *ctx = req->ctx;
1785	struct io_defer_entry *de;
1786	int ret;
1787	u32 seq = io_get_sequence(req);
1788
1789	/* Still need defer if there is pending req in defer list. */
1790	spin_lock(&ctx->completion_lock);
1791	if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
1792		spin_unlock(&ctx->completion_lock);
1793queue:
1794		ctx->drain_active = false;
1795		io_req_task_queue(req);
1796		return;
1797	}
1798	spin_unlock(&ctx->completion_lock);
1799
1800	io_prep_async_link(req);
1801	de = kmalloc(sizeof(*de), GFP_KERNEL);
1802	if (!de) {
1803		ret = -ENOMEM;
1804		io_req_defer_failed(req, ret);
1805		return;
1806	}
1807
1808	spin_lock(&ctx->completion_lock);
1809	if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
1810		spin_unlock(&ctx->completion_lock);
1811		kfree(de);
1812		goto queue;
1813	}
1814
1815	trace_io_uring_defer(req);
1816	de->req = req;
1817	de->seq = seq;
1818	list_add_tail(&de->list, &ctx->defer_list);
1819	spin_unlock(&ctx->completion_lock);
1820}
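/*
 * Illustrative userspace sketch of what triggers this path (not part of this
 * file; liburing helpers assumed). IOSQE_IO_DRAIN makes an SQE wait until all
 * previously submitted requests have completed:
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_fsync(sqe, fd, 0);
 *	io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);	// fsync only runs once
 *							// all prior SQEs finish
 *	io_uring_submit(&ring);
 */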
1821
1822static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
1823			   unsigned int issue_flags)
1824{
1825	if (req->file || !def->needs_file)
1826		return true;
1827
1828	if (req->flags & REQ_F_FIXED_FILE)
1829		req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
1830	else
1831		req->file = io_file_get_normal(req, req->cqe.fd);
1832
1833	return !!req->file;
1834}
1835
1836static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
1837{
1838	const struct io_issue_def *def = &io_issue_defs[req->opcode];
1839	const struct cred *creds = NULL;
1840	int ret;
1841
1842	if (unlikely(!io_assign_file(req, def, issue_flags)))
1843		return -EBADF;
1844
1845	if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
1846		creds = override_creds(req->creds);
1847
1848	if (!def->audit_skip)
1849		audit_uring_entry(req->opcode);
1850
1851	ret = def->issue(req, issue_flags);
1852
1853	if (!def->audit_skip)
1854		audit_uring_exit(!ret, ret);
1855
1856	if (creds)
1857		revert_creds(creds);
1858
1859	if (ret == IOU_OK) {
1860		if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1861			io_req_complete_defer(req);
1862		else
1863			io_req_complete_post(req, issue_flags);
1864
1865		return 0;
1866	}
1867
1868	if (ret != IOU_ISSUE_SKIP_COMPLETE)
1869		return ret;
1870
1871	/* If the op doesn't have a file, we're not polling for it */
1872	if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
1873		io_iopoll_req_issued(req, issue_flags);
1874
1875	return 0;
1876}
1877
1878int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
1879{
1880	io_tw_lock(req->ctx, ts);
1881	return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
1882				 IO_URING_F_COMPLETE_DEFER);
1883}
1884
1885struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
1886{
1887	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1888	struct io_kiocb *nxt = NULL;
1889
1890	if (req_ref_put_and_test(req)) {
1891		if (req->flags & IO_REQ_LINK_FLAGS)
1892			nxt = io_req_find_next(req);
1893		io_free_req(req);
1894	}
1895	return nxt ? &nxt->work : NULL;
1896}
1897
1898void io_wq_submit_work(struct io_wq_work *work)
1899{
1900	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1901	const struct io_issue_def *def = &io_issue_defs[req->opcode];
1902	unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
1903	bool needs_poll = false;
1904	int ret = 0, err = -ECANCELED;
1905
1906	/* one will be dropped by ->io_wq_free_work() after returning to io-wq */
1907	if (!(req->flags & REQ_F_REFCOUNT))
1908		__io_req_set_refcount(req, 2);
1909	else
1910		req_ref_get(req);
1911
1912	io_arm_ltimeout(req);
1913
1914	/* either cancelled or io-wq is dying, so don't touch tctx->iowq */
1915	if (work->flags & IO_WQ_WORK_CANCEL) {
1916fail:
1917		io_req_task_queue_fail(req, err);
1918		return;
1919	}
1920	if (!io_assign_file(req, def, issue_flags)) {
1921		err = -EBADF;
1922		work->flags |= IO_WQ_WORK_CANCEL;
1923		goto fail;
1924	}
1925
1926	if (req->flags & REQ_F_FORCE_ASYNC) {
1927		bool opcode_poll = def->pollin || def->pollout;
1928
1929		if (opcode_poll && file_can_poll(req->file)) {
1930			needs_poll = true;
1931			issue_flags |= IO_URING_F_NONBLOCK;
1932		}
1933	}
1934
1935	do {
1936		ret = io_issue_sqe(req, issue_flags);
1937		if (ret != -EAGAIN)
1938			break;
1939
1940		/*
1941		 * If REQ_F_NOWAIT is set, then don't wait or retry with
1942		 * poll. -EAGAIN is final for that case.
1943		 */
1944		if (req->flags & REQ_F_NOWAIT)
1945			break;
1946
1947		/*
1948		 * We can get EAGAIN for iopolled IO even though we're
1949		 * forcing a sync submission from here, since we can't
1950		 * wait for request slots on the block side.
1951		 */
1952		if (!needs_poll) {
1953			if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
1954				break;
1955			if (io_wq_worker_stopped())
1956				break;
1957			cond_resched();
1958			continue;
1959		}
1960
1961		if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
1962			return;
1963		/* aborted or ready, in either case retry blocking */
1964		needs_poll = false;
1965		issue_flags &= ~IO_URING_F_NONBLOCK;
1966	} while (1);
1967
1968	/* avoid locking problems by failing it from a clean context */
1969	if (ret < 0)
1970		io_req_task_queue_fail(req, ret);
1971}
1972
1973inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
1974				      unsigned int issue_flags)
1975{
1976	struct io_ring_ctx *ctx = req->ctx;
1977	struct io_fixed_file *slot;
1978	struct file *file = NULL;
1979
1980	io_ring_submit_lock(ctx, issue_flags);
1981
1982	if (unlikely((unsigned int)fd >= ctx->nr_user_files))
1983		goto out;
1984	fd = array_index_nospec(fd, ctx->nr_user_files);
1985	slot = io_fixed_file_slot(&ctx->file_table, fd);
1986	file = io_slot_file(slot);
1987	req->flags |= io_slot_flags(slot);
1988	io_req_set_rsrc_node(req, ctx, 0);
1989out:
1990	io_ring_submit_unlock(ctx, issue_flags);
1991	return file;
1992}
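
/*
 * io_file_get_fixed() resolves an index into the registered file table
 * rather than a regular fd. A hedged userspace sketch, assuming liburing;
 * 'ring', 'fds' and 'buf' are illustrative:
 *
 *	int fds[2] = { open("a", O_RDONLY), open("b", O_RDONLY) };
 *
 *	io_uring_register_files(&ring, fds, 2);
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *
 *	io_uring_prep_read(sqe, 1, buf, sizeof(buf), 0);
 *	sqe->flags |= IOSQE_FIXED_FILE;
 *	io_uring_submit(&ring);
 *
 * With IOSQE_FIXED_FILE set, the '1' above is the slot index looked up here,
 * not a file descriptor.
 */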
1993
1994struct file *io_file_get_normal(struct io_kiocb *req, int fd)
1995{
1996	struct file *file = fget(fd);
1997
1998	trace_io_uring_file_get(req, fd);
1999
2000	/* io_uring fds can't be fixed files, so track them as inflight */
2001	if (file && io_is_uring_fops(file))
2002		io_req_track_inflight(req);
2003	return file;
2004}
2005
2006static void io_queue_async(struct io_kiocb *req, int ret)
2007	__must_hold(&req->ctx->uring_lock)
2008{
2009	struct io_kiocb *linked_timeout;
2010
2011	if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
2012		io_req_defer_failed(req, ret);
2013		return;
2014	}
2015
2016	linked_timeout = io_prep_linked_timeout(req);
2017
2018	switch (io_arm_poll_handler(req, 0)) {
2019	case IO_APOLL_READY:
2020		io_kbuf_recycle(req, 0);
2021		io_req_task_queue(req);
2022		break;
2023	case IO_APOLL_ABORTED:
2024		io_kbuf_recycle(req, 0);
2025		io_queue_iowq(req, NULL);
2026		break;
2027	case IO_APOLL_OK:
2028		break;
2029	}
2030
2031	if (linked_timeout)
2032		io_queue_linked_timeout(linked_timeout);
2033}
2034
2035static inline void io_queue_sqe(struct io_kiocb *req)
2036	__must_hold(&req->ctx->uring_lock)
2037{
2038	int ret;
2039
2040	ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
2041
2042	/*
2043	 * We async punt it if the file wasn't marked NOWAIT, or if the file
2044	 * doesn't support non-blocking read/write attempts
2045	 */
2046	if (likely(!ret))
2047		io_arm_ltimeout(req);
2048	else
2049		io_queue_async(req, ret);
2050}
2051
2052static void io_queue_sqe_fallback(struct io_kiocb *req)
2053	__must_hold(&req->ctx->uring_lock)
2054{
2055	if (unlikely(req->flags & REQ_F_FAIL)) {
2056		/*
2057		 * We don't submit; fail them all. For that, replace hardlinks
2058		 * with normal links. An extra REQ_F_LINK is tolerated.
2059		 */
2060		req->flags &= ~REQ_F_HARDLINK;
2061		req->flags |= REQ_F_LINK;
2062		io_req_defer_failed(req, req->cqe.res);
2063	} else {
2064		int ret = io_req_prep_async(req);
2065
2066		if (unlikely(ret)) {
2067			io_req_defer_failed(req, ret);
2068			return;
2069		}
2070
2071		if (unlikely(req->ctx->drain_active))
2072			io_drain_req(req);
2073		else
2074			io_queue_iowq(req, NULL);
2075	}
2076}
2077
2078/*
2079 * Check SQE restrictions (opcode and flags).
2080 *
2081 * Returns 'true' if SQE is allowed, 'false' otherwise.
2082 */
2083static inline bool io_check_restriction(struct io_ring_ctx *ctx,
2084					struct io_kiocb *req,
2085					unsigned int sqe_flags)
2086{
2087	if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
2088		return false;
2089
2090	if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
2091	    ctx->restrictions.sqe_flags_required)
2092		return false;
2093
2094	if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
2095			  ctx->restrictions.sqe_flags_required))
2096		return false;
2097
2098	return true;
2099}
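
/*
 * Restrictions are installed by userspace before the ring is enabled. A
 * sketch, assuming liburing and a ring created with IORING_SETUP_R_DISABLED
 * (the particular opcode/flag choices are only illustrative):
 *
 *	struct io_uring_restriction res[2] = { };
 *
 *	res[0].opcode = IORING_RESTRICTION_SQE_OP;
 *	res[0].sqe_op = IORING_OP_READ;
 *	res[1].opcode = IORING_RESTRICTION_SQE_FLAGS_ALLOWED;
 *	res[1].sqe_flags = IOSQE_FIXED_FILE;
 *
 *	io_uring_register_restrictions(&ring, res, 2);
 *	io_uring_enable_rings(&ring);
 *
 * Once enabled, io_init_req() uses the check above to reject any SQE whose
 * opcode or flags fall outside the registered set with -EACCES.
 */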
2100
2101static void io_init_req_drain(struct io_kiocb *req)
2102{
2103	struct io_ring_ctx *ctx = req->ctx;
2104	struct io_kiocb *head = ctx->submit_state.link.head;
2105
2106	ctx->drain_active = true;
2107	if (head) {
2108		/*
2109		 * If we need to drain a request in the middle of a link, drain
2110		 * the head request and the next request/link after the current
2111		 * link. Considering sequential execution of links,
2112		 * REQ_F_IO_DRAIN will be maintained for every request of our
2113		 * link.
2114		 */
2115		head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
2116		ctx->drain_next = true;
2117	}
2118}
2119
2120static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
2121		       const struct io_uring_sqe *sqe)
2122	__must_hold(&ctx->uring_lock)
2123{
2124	const struct io_issue_def *def;
2125	unsigned int sqe_flags;
2126	int personality;
2127	u8 opcode;
2128
2129	/* req is partially pre-initialised, see io_preinit_req() */
2130	req->opcode = opcode = READ_ONCE(sqe->opcode);
2131	/* same numerical values as the corresponding REQ_F_* flags, safe to copy */
2132	req->flags = sqe_flags = READ_ONCE(sqe->flags);
2133	req->cqe.user_data = READ_ONCE(sqe->user_data);
2134	req->file = NULL;
2135	req->rsrc_node = NULL;
2136	req->task = current;
2137
2138	if (unlikely(opcode >= IORING_OP_LAST)) {
2139		req->opcode = 0;
2140		return -EINVAL;
2141	}
2142	def = &io_issue_defs[opcode];
2143	if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
2144		/* enforce forwards compatibility on users */
2145		if (sqe_flags & ~SQE_VALID_FLAGS)
2146			return -EINVAL;
2147		if (sqe_flags & IOSQE_BUFFER_SELECT) {
2148			if (!def->buffer_select)
2149				return -EOPNOTSUPP;
2150			req->buf_index = READ_ONCE(sqe->buf_group);
2151		}
2152		if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
2153			ctx->drain_disabled = true;
2154		if (sqe_flags & IOSQE_IO_DRAIN) {
2155			if (ctx->drain_disabled)
2156				return -EOPNOTSUPP;
2157			io_init_req_drain(req);
2158		}
2159	}
2160	if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
2161		if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
2162			return -EACCES;
2163		/* knock it to the slow queue path, will be drained there */
2164		if (ctx->drain_active)
2165			req->flags |= REQ_F_FORCE_ASYNC;
2166		/* if there is no link, we're at "next" request and need to drain */
2167		if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
2168			ctx->drain_next = false;
2169			ctx->drain_active = true;
2170			req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
2171		}
2172	}
2173
2174	if (!def->ioprio && sqe->ioprio)
2175		return -EINVAL;
2176	if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
2177		return -EINVAL;
2178
2179	if (def->needs_file) {
2180		struct io_submit_state *state = &ctx->submit_state;
2181
2182		req->cqe.fd = READ_ONCE(sqe->fd);
2183
2184		/*
2185		 * Plug now if we have more than 2 IO left after this, and the
2186		 * target is potentially a read/write to block-based storage.
2187		 */
2188		if (state->need_plug && def->plug) {
2189			state->plug_started = true;
2190			state->need_plug = false;
2191			blk_start_plug_nr_ios(&state->plug, state->submit_nr);
2192		}
2193	}
2194
2195	personality = READ_ONCE(sqe->personality);
2196	if (personality) {
2197		int ret;
2198
2199		req->creds = xa_load(&ctx->personalities, personality);
2200		if (!req->creds)
2201			return -EINVAL;
2202		get_cred(req->creds);
2203		ret = security_uring_override_creds(req->creds);
2204		if (ret) {
2205			put_cred(req->creds);
2206			return ret;
2207		}
2208		req->flags |= REQ_F_CREDS;
2209	}
2210
2211	return def->prep(req, sqe);
2212}
2213
2214static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
2215				      struct io_kiocb *req, int ret)
2216{
2217	struct io_ring_ctx *ctx = req->ctx;
2218	struct io_submit_link *link = &ctx->submit_state.link;
2219	struct io_kiocb *head = link->head;
2220
2221	trace_io_uring_req_failed(sqe, req, ret);
2222
2223	/*
2224	 * Avoid breaking links in the middle as it renders links with SQPOLL
2225	 * unusable. Instead of failing eagerly, continue assembling the link if
2226	 * applicable and mark the head with REQ_F_FAIL. The link flushing code
2227	 * should find the flag and handle the rest.
2228	 */
2229	req_fail_link_node(req, ret);
2230	if (head && !(head->flags & REQ_F_FAIL))
2231		req_fail_link_node(head, -ECANCELED);
2232
2233	if (!(req->flags & IO_REQ_LINK_FLAGS)) {
2234		if (head) {
2235			link->last->link = req;
2236			link->head = NULL;
2237			req = head;
2238		}
2239		io_queue_sqe_fallback(req);
2240		return ret;
2241	}
2242
2243	if (head)
2244		link->last->link = req;
2245	else
2246		link->head = req;
2247	link->last = req;
2248	return 0;
2249}
2250
2251static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2252			 const struct io_uring_sqe *sqe)
2253	__must_hold(&ctx->uring_lock)
2254{
2255	struct io_submit_link *link = &ctx->submit_state.link;
2256	int ret;
2257
2258	ret = io_init_req(ctx, req, sqe);
2259	if (unlikely(ret))
2260		return io_submit_fail_init(sqe, req, ret);
2261
2262	trace_io_uring_submit_req(req);
2263
2264	/*
2265	 * If we already have a head request, queue this one for async
2266	 * submittal once the head completes. If we don't have a head but
2267	 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
2268	 * submitted sync once the chain is complete. If none of those
2269	 * conditions are true (normal request), then just queue it.
2270	 */
2271	if (unlikely(link->head)) {
2272		ret = io_req_prep_async(req);
2273		if (unlikely(ret))
2274			return io_submit_fail_init(sqe, req, ret);
2275
2276		trace_io_uring_link(req, link->head);
2277		link->last->link = req;
2278		link->last = req;
2279
2280		if (req->flags & IO_REQ_LINK_FLAGS)
2281			return 0;
2282		/* last request of the link, flush it */
2283		req = link->head;
2284		link->head = NULL;
2285		if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
2286			goto fallback;
2287
2288	} else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
2289					  REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
2290		if (req->flags & IO_REQ_LINK_FLAGS) {
2291			link->head = req;
2292			link->last = req;
2293		} else {
2294fallback:
2295			io_queue_sqe_fallback(req);
2296		}
2297		return 0;
2298	}
2299
2300	io_queue_sqe(req);
2301	return 0;
2302}
2303
2304/*
2305 * Batched submission is done, ensure local IO is flushed out.
2306 */
2307static void io_submit_state_end(struct io_ring_ctx *ctx)
2308{
2309	struct io_submit_state *state = &ctx->submit_state;
2310
2311	if (unlikely(state->link.head))
2312		io_queue_sqe_fallback(state->link.head);
2313	/* flush only after queuing links as they can generate completions */
2314	io_submit_flush_completions(ctx);
2315	if (state->plug_started)
2316		blk_finish_plug(&state->plug);
2317}
2318
2319/*
2320 * Start submission side cache.
2321 */
2322static void io_submit_state_start(struct io_submit_state *state,
2323				  unsigned int max_ios)
2324{
2325	state->plug_started = false;
2326	state->need_plug = max_ios > 2;
2327	state->submit_nr = max_ios;
2328	/* set only head, no need to init link_last in advance */
2329	state->link.head = NULL;
2330}
2331
2332static void io_commit_sqring(struct io_ring_ctx *ctx)
2333{
2334	struct io_rings *rings = ctx->rings;
2335
2336	/*
2337	 * Ensure any loads from the SQEs are done at this point,
2338	 * since once we write the new head, the application could
2339	 * write new data to them.
2340	 */
2341	smp_store_release(&rings->sq.head, ctx->cached_sq_head);
2342}
2343
2344/*
2345 * Fetch an sqe, if one is available. Note this returns a pointer to memory
2346 * that is mapped by userspace. This means that care needs to be taken to
2347 * ensure that reads are stable, as we cannot rely on userspace always
2348 * being a good citizen. If members of the sqe are validated and then later
2349 * used, it's important that those reads are done through READ_ONCE() to
2350 * prevent a re-load down the line.
2351 */
2352static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
2353{
2354	unsigned mask = ctx->sq_entries - 1;
2355	unsigned head = ctx->cached_sq_head++ & mask;
2356
2357	if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) {
2358		head = READ_ONCE(ctx->sq_array[head]);
2359		if (unlikely(head >= ctx->sq_entries)) {
2360			/* drop invalid entries */
2361			spin_lock(&ctx->completion_lock);
2362			ctx->cq_extra--;
2363			spin_unlock(&ctx->completion_lock);
2364			WRITE_ONCE(ctx->rings->sq_dropped,
2365				   READ_ONCE(ctx->rings->sq_dropped) + 1);
2366			return false;
2367		}
2368	}
2369
2370	/*
2371	 * The cached sq head (or cq tail) serves two purposes:
2372	 *
2373	 * 1) allows us to batch the cost of updating the user visible
2374	 *    head.
2375	 * 2) allows the kernel side to track the head on its own, even
2376	 *    though the application is the one updating it.
2377	 */
2378
2379	/* double index for 128-byte SQEs, twice as long */
2380	if (ctx->flags & IORING_SETUP_SQE128)
2381		head <<= 1;
2382	*sqe = &ctx->sq_sqes[head];
2383	return true;
2384}
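
/*
 * io_get_sqe() consumes what the application publishes into the shared SQ
 * ring. Roughly, the userspace counterpart looks like the sketch below,
 * written against liburing's mapped struct io_uring_sq fields ('sq' and
 * 'sqe_to_submit' are illustrative, error handling omitted):
 *
 *	unsigned tail = *sq->ktail;
 *	unsigned idx = tail & *sq->kring_mask;
 *
 *	sq->sqes[idx] = *sqe_to_submit;
 *	sq->array[idx] = idx;
 *	__atomic_store_n(sq->ktail, tail + 1, __ATOMIC_RELEASE);
 *
 * The release store on the tail is what the acquire load in
 * io_sqring_entries() pairs with; with IORING_SETUP_NO_SQARRAY the array[]
 * step is skipped and the masked index selects the SQE directly, as above.
 */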
2385
2386int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
2387	__must_hold(&ctx->uring_lock)
2388{
2389	unsigned int entries = io_sqring_entries(ctx);
2390	unsigned int left;
2391	int ret;
2392
2393	if (unlikely(!entries))
2394		return 0;
2395	/* make sure SQ entry isn't read before tail */
2396	ret = left = min(nr, entries);
2397	io_get_task_refs(left);
2398	io_submit_state_start(&ctx->submit_state, left);
2399
2400	do {
2401		const struct io_uring_sqe *sqe;
2402		struct io_kiocb *req;
2403
2404		if (unlikely(!io_alloc_req(ctx, &req)))
2405			break;
2406		if (unlikely(!io_get_sqe(ctx, &sqe))) {
2407			io_req_add_to_cache(req, ctx);
2408			break;
2409		}
2410
2411		/*
2412		 * Continue submitting even for sqe failure if the
2413		 * ring was set up with IORING_SETUP_SUBMIT_ALL
2414		 */
2415		if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
2416		    !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
2417			left--;
2418			break;
2419		}
2420	} while (--left);
2421
2422	if (unlikely(left)) {
2423		ret -= left;
2424		/* try again if it submitted nothing and can't allocate a req */
2425		if (!ret && io_req_cache_empty(ctx))
2426			ret = -EAGAIN;
2427		current->io_uring->cached_refs += left;
2428	}
2429
2430	io_submit_state_end(ctx);
2431	 /* Commit SQ ring head once we've consumed and submitted all SQEs */
2432	io_commit_sqring(ctx);
2433	return ret;
2434}
2435
2436struct io_wait_queue {
2437	struct wait_queue_entry wq;
2438	struct io_ring_ctx *ctx;
2439	unsigned cq_tail;
2440	unsigned nr_timeouts;
2441	ktime_t timeout;
2442};
2443
2444static inline bool io_has_work(struct io_ring_ctx *ctx)
2445{
2446	return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
2447	       !llist_empty(&ctx->work_llist);
2448}
2449
2450static inline bool io_should_wake(struct io_wait_queue *iowq)
2451{
2452	struct io_ring_ctx *ctx = iowq->ctx;
2453	int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
2454
2455	/*
2456	 * Wake up if we have enough events, or if a timeout occurred since we
2457	 * started waiting. For timeouts, we always want to return to userspace,
2458	 * regardless of event count.
2459	 */
2460	return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
2461}
2462
2463static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
2464			    int wake_flags, void *key)
2465{
2466	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
2467
2468	/*
2469	 * Cannot safely flush overflowed CQEs from here, so ensure we wake up
2470	 * the task; the next invocation will do it.
2471	 */
2472	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
2473		return autoremove_wake_function(curr, mode, wake_flags, key);
2474	return -1;
2475}
2476
2477int io_run_task_work_sig(struct io_ring_ctx *ctx)
2478{
2479	if (!llist_empty(&ctx->work_llist)) {
2480		__set_current_state(TASK_RUNNING);
2481		if (io_run_local_work(ctx, INT_MAX) > 0)
2482			return 0;
2483	}
2484	if (io_run_task_work() > 0)
2485		return 0;
2486	if (task_sigpending(current))
2487		return -EINTR;
2488	return 0;
2489}
2490
2491static bool current_pending_io(void)
2492{
2493	struct io_uring_task *tctx = current->io_uring;
2494
2495	if (!tctx)
2496		return false;
2497	return percpu_counter_read_positive(&tctx->inflight);
2498}
2499
2500/* when returns >0, the caller should retry */
2501static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
2502					  struct io_wait_queue *iowq)
2503{
2504	int ret;
2505
2506	if (unlikely(READ_ONCE(ctx->check_cq)))
2507		return 1;
2508	if (unlikely(!llist_empty(&ctx->work_llist)))
2509		return 1;
2510	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL)))
2511		return 1;
2512	if (unlikely(task_sigpending(current)))
2513		return -EINTR;
2514	if (unlikely(io_should_wake(iowq)))
2515		return 0;
2516
2517	/*
2518	 * Mark us as being in io_wait if we have pending requests, so cpufreq
2519	 * can take into account that the task is waiting for IO - turns out
2520	 * to be important for low QD IO.
2521	 */
2522	if (current_pending_io())
2523		current->in_iowait = 1;
2524	ret = 0;
2525	if (iowq->timeout == KTIME_MAX)
2526		schedule();
2527	else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS))
2528		ret = -ETIME;
2529	current->in_iowait = 0;
2530	return ret;
2531}
2532
2533/*
2534 * Wait until events become available, if we don't already have some. The
2535 * application must reap them itself, as they reside on the shared cq ring.
2536 */
2537static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2538			  const sigset_t __user *sig, size_t sigsz,
2539			  struct __kernel_timespec __user *uts)
2540{
2541	struct io_wait_queue iowq;
2542	struct io_rings *rings = ctx->rings;
2543	int ret;
2544
2545	if (!io_allowed_run_tw(ctx))
2546		return -EEXIST;
2547	if (!llist_empty(&ctx->work_llist))
2548		io_run_local_work(ctx, min_events);
2549	io_run_task_work();
2550	io_cqring_overflow_flush(ctx);
2551	/* if user messes with these they will just get an early return */
2552	if (__io_cqring_events_user(ctx) >= min_events)
2553		return 0;
2554
2555	if (sig) {
2556#ifdef CONFIG_COMPAT
2557		if (in_compat_syscall())
2558			ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2559						      sigsz);
2560		else
2561#endif
2562			ret = set_user_sigmask(sig, sigsz);
2563
2564		if (ret)
2565			return ret;
2566	}
2567
2568	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
2569	iowq.wq.private = current;
2570	INIT_LIST_HEAD(&iowq.wq.entry);
2571	iowq.ctx = ctx;
2572	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
2573	iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
2574	iowq.timeout = KTIME_MAX;
2575
2576	if (uts) {
2577		struct timespec64 ts;
2578
2579		if (get_timespec64(&ts, uts))
2580			return -EFAULT;
2581		iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
2582	}
2583
2584	trace_io_uring_cqring_wait(ctx, min_events);
2585	do {
2586		int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
2587		unsigned long check_cq;
2588
2589		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
2590			atomic_set(&ctx->cq_wait_nr, nr_wait);
2591			set_current_state(TASK_INTERRUPTIBLE);
2592		} else {
2593			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
2594							TASK_INTERRUPTIBLE);
2595		}
2596
2597		ret = io_cqring_wait_schedule(ctx, &iowq);
2598		__set_current_state(TASK_RUNNING);
2599		atomic_set(&ctx->cq_wait_nr, 0);
2600
2601		/*
2602		 * Run task_work after scheduling and before io_should_wake().
2603		 * If we got woken because of task_work being processed, run it
2604		 * now rather than let the caller do another wait loop.
2605		 */
2606		io_run_task_work();
2607		if (!llist_empty(&ctx->work_llist))
2608			io_run_local_work(ctx, nr_wait);
2609
2610		/*
2611		 * Non-local task_work will be run on exit to userspace, but
2612		 * if we're using DEFER_TASKRUN, then we could have waited
2613		 * with a timeout for a number of requests. If the timeout
2614		 * hits, we could have some requests ready to process. Ensure
2615		 * this break is _after_ we have run task_work, to avoid
2616		 * deferring running potentially pending requests until the
2617		 * next time we wait for events.
2618		 */
2619		if (ret < 0)
2620			break;
2621
2622		check_cq = READ_ONCE(ctx->check_cq);
2623		if (unlikely(check_cq)) {
2624			/* let the caller flush overflows, retry */
2625			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
2626				io_cqring_do_overflow_flush(ctx);
2627			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
2628				ret = -EBADR;
2629				break;
2630			}
2631		}
2632
2633		if (io_should_wake(&iowq)) {
2634			ret = 0;
2635			break;
2636		}
2637		cond_resched();
2638	} while (1);
2639
2640	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
2641		finish_wait(&ctx->cq_wait, &iowq.wq);
2642	restore_saved_sigmask_unless(ret == -EINTR);
2643
2644	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
2645}
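
/*
 * The sig/uts arguments handled above correspond to the EXT_ARG form of
 * io_uring_enter(). Through liburing the same wait is typically expressed
 * as in this sketch (an initialised 'ring' is assumed):
 *
 *	struct io_uring_cqe *cqe;
 *	struct __kernel_timespec ts = { .tv_sec = 1 };
 *
 *	int ret = io_uring_wait_cqes(&ring, &cqe, 8, &ts, NULL);
 *
 * which returns 0 once at least 8 completions are available, or -ETIME if
 * the timeout elapses with nothing to reap, mirroring the
 * schedule_hrtimeout() handling in io_cqring_wait_schedule().
 */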
2646
2647void io_mem_free(void *ptr)
2648{
2649	if (!ptr)
2650		return;
2651
2652	folio_put(virt_to_folio(ptr));
2653}
2654
2655static void io_pages_free(struct page ***pages, int npages)
2656{
2657	struct page **page_array;
2658	int i;
2659
2660	if (!pages)
2661		return;
2662
2663	page_array = *pages;
2664	if (!page_array)
2665		return;
2666
2667	for (i = 0; i < npages; i++)
2668		unpin_user_page(page_array[i]);
2669	kvfree(page_array);
2670	*pages = NULL;
2671}
2672
2673static void *__io_uaddr_map(struct page ***pages, unsigned short *npages,
2674			    unsigned long uaddr, size_t size)
2675{
2676	struct page **page_array;
2677	unsigned int nr_pages;
2678	void *page_addr;
2679	int ret, i, pinned;
2680
2681	*npages = 0;
2682
2683	if (uaddr & (PAGE_SIZE - 1) || !size)
2684		return ERR_PTR(-EINVAL);
2685
2686	nr_pages = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
2687	if (nr_pages > USHRT_MAX)
2688		return ERR_PTR(-EINVAL);
2689	page_array = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL);
2690	if (!page_array)
2691		return ERR_PTR(-ENOMEM);
2692
2693
2694	pinned = pin_user_pages_fast(uaddr, nr_pages, FOLL_WRITE | FOLL_LONGTERM,
2695				     page_array);
2696	if (pinned != nr_pages) {
2697		ret = (pinned < 0) ? pinned : -EFAULT;
2698		goto free_pages;
2699	}
2700
2701	page_addr = page_address(page_array[0]);
2702	for (i = 0; i < nr_pages; i++) {
2703		ret = -EINVAL;
2704
2705		/*
2706		 * Can't support mapping user allocated ring memory on 32-bit
2707		 * archs where it could potentially reside in highmem. Just
2708		 * fail those with -EINVAL, just like we did on kernels that
2709		 * didn't support this feature.
2710		 */
2711		if (PageHighMem(page_array[i]))
2712			goto free_pages;
2713
2714		/*
2715		 * No support for discontiguous pages for now; the mapping should
2716		 * either be a single normal page or a huge page. Later on we can
2717		 * add support for remapping discontiguous pages; for now we just
2718		 * fail them with EINVAL.
2719		 */
2720		if (page_address(page_array[i]) != page_addr)
2721			goto free_pages;
2722		page_addr += PAGE_SIZE;
2723	}
2724
2725	*pages = page_array;
2726	*npages = nr_pages;
2727	return page_to_virt(page_array[0]);
2728
2729free_pages:
2730	io_pages_free(&page_array, pinned > 0 ? pinned : 0);
2731	return ERR_PTR(ret);
2732}
2733
2734static void *io_rings_map(struct io_ring_ctx *ctx, unsigned long uaddr,
2735			  size_t size)
2736{
2737	return __io_uaddr_map(&ctx->ring_pages, &ctx->n_ring_pages, uaddr,
2738				size);
2739}
2740
2741static void *io_sqes_map(struct io_ring_ctx *ctx, unsigned long uaddr,
2742			 size_t size)
2743{
2744	return __io_uaddr_map(&ctx->sqe_pages, &ctx->n_sqe_pages, uaddr,
2745				size);
2746}
2747
2748static void io_rings_free(struct io_ring_ctx *ctx)
2749{
2750	if (!(ctx->flags & IORING_SETUP_NO_MMAP)) {
2751		io_mem_free(ctx->rings);
2752		io_mem_free(ctx->sq_sqes);
2753		ctx->rings = NULL;
2754		ctx->sq_sqes = NULL;
2755	} else {
2756		io_pages_free(&ctx->ring_pages, ctx->n_ring_pages);
2757		ctx->n_ring_pages = 0;
2758		io_pages_free(&ctx->sqe_pages, ctx->n_sqe_pages);
2759		ctx->n_sqe_pages = 0;
2760	}
2761}
2762
2763void *io_mem_alloc(size_t size)
2764{
2765	gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
2766	void *ret;
2767
2768	ret = (void *) __get_free_pages(gfp, get_order(size));
2769	if (ret)
2770		return ret;
2771	return ERR_PTR(-ENOMEM);
2772}
2773
2774static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
2775				unsigned int cq_entries, size_t *sq_offset)
2776{
2777	struct io_rings *rings;
2778	size_t off, sq_array_size;
2779
2780	off = struct_size(rings, cqes, cq_entries);
2781	if (off == SIZE_MAX)
2782		return SIZE_MAX;
2783	if (ctx->flags & IORING_SETUP_CQE32) {
2784		if (check_shl_overflow(off, 1, &off))
2785			return SIZE_MAX;
2786	}
2787
2788#ifdef CONFIG_SMP
2789	off = ALIGN(off, SMP_CACHE_BYTES);
2790	if (off == 0)
2791		return SIZE_MAX;
2792#endif
2793
2794	if (ctx->flags & IORING_SETUP_NO_SQARRAY) {
2795		if (sq_offset)
2796			*sq_offset = SIZE_MAX;
2797		return off;
2798	}
2799
2800	if (sq_offset)
2801		*sq_offset = off;
2802
2803	sq_array_size = array_size(sizeof(u32), sq_entries);
2804	if (sq_array_size == SIZE_MAX)
2805		return SIZE_MAX;
2806
2807	if (check_add_overflow(off, sq_array_size, &off))
2808		return SIZE_MAX;
2809
2810	return off;
2811}
2812
2813static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
2814			       unsigned int eventfd_async)
2815{
2816	struct io_ev_fd *ev_fd;
2817	__s32 __user *fds = arg;
2818	int fd;
2819
2820	ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
2821					lockdep_is_held(&ctx->uring_lock));
2822	if (ev_fd)
2823		return -EBUSY;
2824
2825	if (copy_from_user(&fd, fds, sizeof(*fds)))
2826		return -EFAULT;
2827
2828	ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
2829	if (!ev_fd)
2830		return -ENOMEM;
2831
2832	ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
2833	if (IS_ERR(ev_fd->cq_ev_fd)) {
2834		int ret = PTR_ERR(ev_fd->cq_ev_fd);
2835		kfree(ev_fd);
2836		return ret;
2837	}
2838
2839	spin_lock(&ctx->completion_lock);
2840	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
2841	spin_unlock(&ctx->completion_lock);
2842
2843	ev_fd->eventfd_async = eventfd_async;
2844	ctx->has_evfd = true;
2845	rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
2846	atomic_set(&ev_fd->refs, 1);
2847	atomic_set(&ev_fd->ops, 0);
2848	return 0;
2849}
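
/*
 * Userspace reaches this path through IORING_REGISTER_EVENTFD (or the
 * _ASYNC variant). A minimal liburing sketch; 'efd' is illustrative:
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *
 *	io_uring_register_eventfd(&ring, efd);
 *
 * Once registered, CQE postings signal efd, so the ring can be driven from
 * poll/epoll based event loops. io_uring_register_eventfd_async() sets
 * eventfd_async, which roughly limits notifications to completions posted
 * from async (io-wq) context.
 */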
2850
2851static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2852{
2853	struct io_ev_fd *ev_fd;
2854
2855	ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
2856					lockdep_is_held(&ctx->uring_lock));
2857	if (ev_fd) {
2858		ctx->has_evfd = false;
2859		rcu_assign_pointer(ctx->io_ev_fd, NULL);
2860		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_FREE_BIT), &ev_fd->ops))
2861			call_rcu(&ev_fd->rcu, io_eventfd_ops);
2862		return 0;
2863	}
2864
2865	return -ENXIO;
2866}
2867
2868static void io_req_caches_free(struct io_ring_ctx *ctx)
2869{
2870	struct io_kiocb *req;
2871	int nr = 0;
2872
2873	mutex_lock(&ctx->uring_lock);
2874	io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
2875
2876	while (!io_req_cache_empty(ctx)) {
2877		req = io_extract_req(ctx);
2878		kmem_cache_free(req_cachep, req);
2879		nr++;
2880	}
2881	if (nr)
2882		percpu_ref_put_many(&ctx->refs, nr);
2883	mutex_unlock(&ctx->uring_lock);
2884}
2885
2886static void io_rsrc_node_cache_free(struct io_cache_entry *entry)
2887{
2888	kfree(container_of(entry, struct io_rsrc_node, cache));
2889}
2890
2891static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
2892{
2893	io_sq_thread_finish(ctx);
2894	/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
2895	if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
2896		return;
2897
2898	mutex_lock(&ctx->uring_lock);
2899	if (ctx->buf_data)
2900		__io_sqe_buffers_unregister(ctx);
2901	if (ctx->file_data)
2902		__io_sqe_files_unregister(ctx);
2903	io_cqring_overflow_kill(ctx);
2904	io_eventfd_unregister(ctx);
2905	io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
2906	io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
2907	io_destroy_buffers(ctx);
2908	mutex_unlock(&ctx->uring_lock);
2909	if (ctx->sq_creds)
2910		put_cred(ctx->sq_creds);
2911	if (ctx->submitter_task)
2912		put_task_struct(ctx->submitter_task);
2913
2914	/* there are no registered resources left, nobody uses it */
2915	if (ctx->rsrc_node)
2916		io_rsrc_node_destroy(ctx, ctx->rsrc_node);
2917
2918	WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
2919	WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
2920
2921	io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free);
2922	if (ctx->mm_account) {
2923		mmdrop(ctx->mm_account);
2924		ctx->mm_account = NULL;
2925	}
2926	io_rings_free(ctx);
2927	io_kbuf_mmap_list_free(ctx);
2928
2929	percpu_ref_exit(&ctx->refs);
2930	free_uid(ctx->user);
2931	io_req_caches_free(ctx);
2932	if (ctx->hash_map)
2933		io_wq_put_hash(ctx->hash_map);
2934	kfree(ctx->cancel_table.hbs);
2935	kfree(ctx->cancel_table_locked.hbs);
2936	kfree(ctx->io_bl);
2937	xa_destroy(&ctx->io_bl_xa);
2938	kfree(ctx);
2939}
2940
2941static __cold void io_activate_pollwq_cb(struct callback_head *cb)
2942{
2943	struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
2944					       poll_wq_task_work);
2945
2946	mutex_lock(&ctx->uring_lock);
2947	ctx->poll_activated = true;
2948	mutex_unlock(&ctx->uring_lock);
2949
2950	/*
2951	 * Wake ups for some events between start of polling and activation
2952	 * might've been lost due to loose synchronisation.
2953	 */
2954	wake_up_all(&ctx->poll_wq);
2955	percpu_ref_put(&ctx->refs);
2956}
2957
2958static __cold void io_activate_pollwq(struct io_ring_ctx *ctx)
2959{
2960	spin_lock(&ctx->completion_lock);
2961	/* already activated or in progress */
2962	if (ctx->poll_activated || ctx->poll_wq_task_work.func)
2963		goto out;
2964	if (WARN_ON_ONCE(!ctx->task_complete))
2965		goto out;
2966	if (!ctx->submitter_task)
2967		goto out;
2968	/*
2969	 * With ->submitter_task, only the submitter task completes requests, so
2970	 * we only need to sync with it, which is done by injecting a task_work item.
2971	 */
2972	init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
2973	percpu_ref_get(&ctx->refs);
2974	if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
2975		percpu_ref_put(&ctx->refs);
2976out:
2977	spin_unlock(&ctx->completion_lock);
2978}
2979
2980static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2981{
2982	struct io_ring_ctx *ctx = file->private_data;
2983	__poll_t mask = 0;
2984
2985	if (unlikely(!ctx->poll_activated))
2986		io_activate_pollwq(ctx);
2987
2988	poll_wait(file, &ctx->poll_wq, wait);
2989	/*
2990	 * synchronizes with barrier from wq_has_sleeper call in
2991	 * io_commit_cqring
2992	 */
2993	smp_rmb();
2994	if (!io_sqring_full(ctx))
2995		mask |= EPOLLOUT | EPOLLWRNORM;
2996
2997	/*
2998	 * Don't flush cqring overflow list here, just do a simple check.
2999	 * Otherwise there could possible be ABBA deadlock:
3000	 *      CPU0                    CPU1
3001	 *      ----                    ----
3002	 * lock(&ctx->uring_lock);
3003	 *                              lock(&ep->mtx);
3004	 *                              lock(&ctx->uring_lock);
3005	 * lock(&ep->mtx);
3006	 *
3007	 * Users may get EPOLLIN meanwhile seeing nothing in cqring, this
3008	 * pushes them to do the flush.
3009	 */
3010
3011	if (__io_cqring_events_user(ctx) || io_has_work(ctx))
3012		mask |= EPOLLIN | EPOLLRDNORM;
3013
3014	return mask;
3015}
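
/*
 * Because the ring fd implements ->poll, it can be added to an epoll set
 * like any other fd. Sketch, with 'epfd' and a liburing 'ring' assumed:
 *
 *	struct epoll_event ev = { .events = EPOLLIN, .data.fd = ring.ring_fd };
 *
 *	epoll_ctl(epfd, EPOLL_CTL_ADD, ring.ring_fd, &ev);
 *
 * EPOLLIN then means CQEs may be available; as noted above, the waiter may
 * still have to flush overflows itself before it sees them on the CQ ring.
 */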
3016
3017static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
3018{
3019	const struct cred *creds;
3020
3021	creds = xa_erase(&ctx->personalities, id);
3022	if (creds) {
3023		put_cred(creds);
3024		return 0;
3025	}
3026
3027	return -EINVAL;
3028}
3029
3030struct io_tctx_exit {
3031	struct callback_head		task_work;
3032	struct completion		completion;
3033	struct io_ring_ctx		*ctx;
3034};
3035
3036static __cold void io_tctx_exit_cb(struct callback_head *cb)
3037{
3038	struct io_uring_task *tctx = current->io_uring;
3039	struct io_tctx_exit *work;
3040
3041	work = container_of(cb, struct io_tctx_exit, task_work);
3042	/*
3043	 * When @in_cancel, we're in cancellation and it's racy to remove the
3044	 * node. It'll be removed by the end of cancellation, just ignore it.
3045	 * tctx can be NULL if the queueing of this task_work raced with
3046	 * work cancelation off the exec path.
3047	 */
3048	if (tctx && !atomic_read(&tctx->in_cancel))
3049		io_uring_del_tctx_node((unsigned long)work->ctx);
3050	complete(&work->completion);
3051}
3052
3053static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
3054{
3055	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3056
3057	return req->ctx == data;
3058}
3059
3060static __cold void io_ring_exit_work(struct work_struct *work)
3061{
3062	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
3063	unsigned long timeout = jiffies + HZ * 60 * 5;
3064	unsigned long interval = HZ / 20;
3065	struct io_tctx_exit exit;
3066	struct io_tctx_node *node;
3067	int ret;
3068
3069	/*
3070	 * If we're doing polled IO and end up having requests being
3071	 * submitted async (out-of-line), then completions can come in while
3072	 * we're waiting for refs to drop. We need to reap these manually,
3073	 * as nobody else will be looking for them.
3074	 */
3075	do {
3076		if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
3077			mutex_lock(&ctx->uring_lock);
3078			io_cqring_overflow_kill(ctx);
3079			mutex_unlock(&ctx->uring_lock);
3080		}
3081
3082		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3083			io_move_task_work_from_local(ctx);
3084
3085		while (io_uring_try_cancel_requests(ctx, NULL, true))
3086			cond_resched();
3087
3088		if (ctx->sq_data) {
3089			struct io_sq_data *sqd = ctx->sq_data;
3090			struct task_struct *tsk;
3091
3092			io_sq_thread_park(sqd);
3093			tsk = sqd->thread;
3094			if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
3095				io_wq_cancel_cb(tsk->io_uring->io_wq,
3096						io_cancel_ctx_cb, ctx, true);
3097			io_sq_thread_unpark(sqd);
3098		}
3099
3100		io_req_caches_free(ctx);
3101
3102		if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
3103			/* there is little hope left, don't run it too often */
3104			interval = HZ * 60;
3105		}
3106		/*
3107		 * This is really an uninterruptible wait, as it has to be
3108		 * complete. But it's also run from a kworker, which doesn't
3109		 * take signals, so it's fine to make it interruptible. This
3110		 * avoids scenarios where we knowingly can wait much longer
3111		 * on completions, for example if someone does a SIGSTOP on
3112		 * a task that needs to finish task_work to make this loop
3113		 * complete. That's a synthetic situation that should not
3114		 * cause a stuck task backtrace, and hence a potential panic
3115		 * on stuck tasks if that is enabled.
3116		 */
3117	} while (!wait_for_completion_interruptible_timeout(&ctx->ref_comp, interval));
3118
3119	init_completion(&exit.completion);
3120	init_task_work(&exit.task_work, io_tctx_exit_cb);
3121	exit.ctx = ctx;
3122
3123	mutex_lock(&ctx->uring_lock);
3124	while (!list_empty(&ctx->tctx_list)) {
3125		WARN_ON_ONCE(time_after(jiffies, timeout));
3126
3127		node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
3128					ctx_node);
3129		/* don't spin on a single task if cancellation failed */
3130		list_rotate_left(&ctx->tctx_list);
3131		ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
3132		if (WARN_ON_ONCE(ret))
3133			continue;
3134
3135		mutex_unlock(&ctx->uring_lock);
3136		/*
3137		 * See comment above for
3138		 * wait_for_completion_interruptible_timeout() on why this
3139		 * wait is marked as interruptible.
3140		 */
3141		wait_for_completion_interruptible(&exit.completion);
3142		mutex_lock(&ctx->uring_lock);
3143	}
3144	mutex_unlock(&ctx->uring_lock);
3145	spin_lock(&ctx->completion_lock);
3146	spin_unlock(&ctx->completion_lock);
3147
3148	/* pairs with RCU read section in io_req_local_work_add() */
3149	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3150		synchronize_rcu();
3151
3152	io_ring_ctx_free(ctx);
3153}
3154
3155static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3156{
3157	unsigned long index;
3158	struct creds *creds;
3159
3160	mutex_lock(&ctx->uring_lock);
3161	percpu_ref_kill(&ctx->refs);
3162	xa_for_each(&ctx->personalities, index, creds)
3163		io_unregister_personality(ctx, index);
3164	if (ctx->rings)
3165		io_poll_remove_all(ctx, NULL, true);
3166	mutex_unlock(&ctx->uring_lock);
3167
3168	/*
3169	 * If we failed setting up the ctx, we might not have any rings
3170	 * and therefore did not submit any requests
3171	 */
3172	if (ctx->rings)
3173		io_kill_timeouts(ctx, NULL, true);
3174
3175	flush_delayed_work(&ctx->fallback_work);
3176
3177	INIT_WORK(&ctx->exit_work, io_ring_exit_work);
3178	/*
3179	 * Use system_unbound_wq to avoid spawning tons of event kworkers
3180	 * if we're exiting a ton of rings at the same time. It just adds
3181	 * noise and overhead, and there's no discernible change in runtime
3182	 * over using system_wq.
3183	 */
3184	queue_work(system_unbound_wq, &ctx->exit_work);
3185}
3186
3187static int io_uring_release(struct inode *inode, struct file *file)
3188{
3189	struct io_ring_ctx *ctx = file->private_data;
3190
3191	file->private_data = NULL;
3192	io_ring_ctx_wait_and_kill(ctx);
3193	return 0;
3194}
3195
3196struct io_task_cancel {
3197	struct task_struct *task;
3198	bool all;
3199};
3200
3201static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
3202{
3203	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3204	struct io_task_cancel *cancel = data;
3205
3206	return io_match_task_safe(req, cancel->task, cancel->all);
3207}
3208
3209static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
3210					 struct task_struct *task,
3211					 bool cancel_all)
3212{
3213	struct io_defer_entry *de;
3214	LIST_HEAD(list);
3215
3216	spin_lock(&ctx->completion_lock);
3217	list_for_each_entry_reverse(de, &ctx->defer_list, list) {
3218		if (io_match_task_safe(de->req, task, cancel_all)) {
3219			list_cut_position(&list, &ctx->defer_list, &de->list);
3220			break;
3221		}
3222	}
3223	spin_unlock(&ctx->completion_lock);
3224	if (list_empty(&list))
3225		return false;
3226
3227	while (!list_empty(&list)) {
3228		de = list_first_entry(&list, struct io_defer_entry, list);
3229		list_del_init(&de->list);
3230		io_req_task_queue_fail(de->req, -ECANCELED);
3231		kfree(de);
3232	}
3233	return true;
3234}
3235
3236static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
3237{
3238	struct io_tctx_node *node;
3239	enum io_wq_cancel cret;
3240	bool ret = false;
3241
3242	mutex_lock(&ctx->uring_lock);
3243	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
3244		struct io_uring_task *tctx = node->task->io_uring;
3245
3246		/*
3247		 * io_wq will stay alive while we hold uring_lock, because it's
3248		 * killed after ctx nodes, which requires taking the lock.
3249		 */
3250		if (!tctx || !tctx->io_wq)
3251			continue;
3252		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
3253		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
3254	}
3255	mutex_unlock(&ctx->uring_lock);
3256
3257	return ret;
3258}
3259
3260static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
3261						struct task_struct *task,
3262						bool cancel_all)
3263{
3264	struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
3265	struct io_uring_task *tctx = task ? task->io_uring : NULL;
3266	enum io_wq_cancel cret;
3267	bool ret = false;
3268
3269	/* set it so io_req_local_work_add() would wake us up */
3270	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
3271		atomic_set(&ctx->cq_wait_nr, 1);
3272		smp_mb();
3273	}
3274
3275	/* failed during ring init, it couldn't have issued any requests */
3276	if (!ctx->rings)
3277		return false;
3278
3279	if (!task) {
3280		ret |= io_uring_try_cancel_iowq(ctx);
3281	} else if (tctx && tctx->io_wq) {
3282		/*
3283		 * Cancels requests of all rings, not only @ctx, but
3284		 * it's fine as the task is in exit/exec.
3285		 */
3286		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
3287				       &cancel, true);
3288		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
3289	}
3290
3291	/* SQPOLL thread does its own polling */
3292	if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
3293	    (ctx->sq_data && ctx->sq_data->thread == current)) {
3294		while (!wq_list_empty(&ctx->iopoll_list)) {
3295			io_iopoll_try_reap_events(ctx);
3296			ret = true;
3297			cond_resched();
3298		}
3299	}
3300
3301	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
3302	    io_allowed_defer_tw_run(ctx))
3303		ret |= io_run_local_work(ctx, INT_MAX) > 0;
3304	ret |= io_cancel_defer_files(ctx, task, cancel_all);
3305	mutex_lock(&ctx->uring_lock);
3306	ret |= io_poll_remove_all(ctx, task, cancel_all);
3307	mutex_unlock(&ctx->uring_lock);
3308	ret |= io_kill_timeouts(ctx, task, cancel_all);
3309	if (task)
3310		ret |= io_run_task_work() > 0;
3311	return ret;
3312}
3313
3314static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
3315{
3316	if (tracked)
3317		return atomic_read(&tctx->inflight_tracked);
3318	return percpu_counter_sum(&tctx->inflight);
3319}
3320
3321/*
3322 * Find any io_uring ctx that this task has registered or done IO on, and cancel
3323 * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
3324 */
3325__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
3326{
3327	struct io_uring_task *tctx = current->io_uring;
3328	struct io_ring_ctx *ctx;
3329	struct io_tctx_node *node;
3330	unsigned long index;
3331	s64 inflight;
3332	DEFINE_WAIT(wait);
3333
3334	WARN_ON_ONCE(sqd && sqd->thread != current);
3335
3336	if (!current->io_uring)
3337		return;
3338	if (tctx->io_wq)
3339		io_wq_exit_start(tctx->io_wq);
3340
3341	atomic_inc(&tctx->in_cancel);
3342	do {
3343		bool loop = false;
3344
3345		io_uring_drop_tctx_refs(current);
3346		/* read completions before cancelations */
3347		inflight = tctx_inflight(tctx, !cancel_all);
3348		if (!inflight)
3349			break;
3350
3351		if (!sqd) {
3352			xa_for_each(&tctx->xa, index, node) {
3353				/* sqpoll task will cancel all its requests */
3354				if (node->ctx->sq_data)
3355					continue;
3356				loop |= io_uring_try_cancel_requests(node->ctx,
3357							current, cancel_all);
3358			}
3359		} else {
3360			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
3361				loop |= io_uring_try_cancel_requests(ctx,
3362								     current,
3363								     cancel_all);
3364		}
3365
3366		if (loop) {
3367			cond_resched();
3368			continue;
3369		}
3370
3371		prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
3372		io_run_task_work();
3373		io_uring_drop_tctx_refs(current);
3374		xa_for_each(&tctx->xa, index, node) {
3375			if (!llist_empty(&node->ctx->work_llist)) {
3376				WARN_ON_ONCE(node->ctx->submitter_task &&
3377					     node->ctx->submitter_task != current);
3378				goto end_wait;
3379			}
3380		}
3381		/*
3382		 * If we've seen completions, retry without waiting. This
3383		 * avoids a race where a completion comes in before we did
3384		 * prepare_to_wait().
3385		 */
3386		if (inflight == tctx_inflight(tctx, !cancel_all))
3387			schedule();
3388end_wait:
3389		finish_wait(&tctx->wait, &wait);
3390	} while (1);
3391
3392	io_uring_clean_tctx(tctx);
3393	if (cancel_all) {
3394		/*
3395		 * We shouldn't run task_works after cancel, so just leave
3396		 * ->in_cancel set for normal exit.
3397		 */
3398		atomic_dec(&tctx->in_cancel);
3399		/* for exec all current's requests should be gone, kill tctx */
3400		__io_uring_free(current);
3401	}
3402}
3403
3404void __io_uring_cancel(bool cancel_all)
3405{
3406	io_uring_cancel_generic(cancel_all, NULL);
3407}
3408
3409static void *io_uring_validate_mmap_request(struct file *file,
3410					    loff_t pgoff, size_t sz)
3411{
3412	struct io_ring_ctx *ctx = file->private_data;
3413	loff_t offset = pgoff << PAGE_SHIFT;
3414	struct page *page;
3415	void *ptr;
3416
3417	switch (offset & IORING_OFF_MMAP_MASK) {
3418	case IORING_OFF_SQ_RING:
3419	case IORING_OFF_CQ_RING:
3420		/* Don't allow mmap if the ring was setup without it */
3421		if (ctx->flags & IORING_SETUP_NO_MMAP)
3422			return ERR_PTR(-EINVAL);
3423		ptr = ctx->rings;
3424		break;
3425	case IORING_OFF_SQES:
3426		/* Don't allow mmap if the ring was setup without it */
3427		if (ctx->flags & IORING_SETUP_NO_MMAP)
3428			return ERR_PTR(-EINVAL);
3429		ptr = ctx->sq_sqes;
3430		break;
3431	case IORING_OFF_PBUF_RING: {
3432		unsigned int bgid;
3433
3434		bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
3435		rcu_read_lock();
3436		ptr = io_pbuf_get_address(ctx, bgid);
3437		rcu_read_unlock();
3438		if (!ptr)
3439			return ERR_PTR(-EINVAL);
3440		break;
3441		}
3442	default:
3443		return ERR_PTR(-EINVAL);
3444	}
3445
3446	page = virt_to_head_page(ptr);
3447	if (sz > page_size(page))
3448		return ERR_PTR(-EINVAL);
3449
3450	return ptr;
3451}
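
/*
 * The offsets validated above are the mmap "keys" a raw (non-liburing)
 * setup uses after io_uring_setup(2). A sketch, where 'p' is the
 * io_uring_params filled in by the setup call and 'ring_fd' its return:
 *
 *	size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(__u32);
 *	size_t sqes_sz = p.sq_entries * sizeof(struct io_uring_sqe);
 *
 *	void *sq_ring = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
 *			     MAP_SHARED | MAP_POPULATE, ring_fd,
 *			     IORING_OFF_SQ_RING);
 *	void *sqes = mmap(NULL, sqes_sz, PROT_READ | PROT_WRITE,
 *			  MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQES);
 *
 * The CQ ring uses IORING_OFF_CQ_RING the same way, or shares the SQ ring
 * mapping when IORING_FEAT_SINGLE_MMAP is advertised.
 */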
3452
3453#ifdef CONFIG_MMU
3454
3455static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3456{
3457	size_t sz = vma->vm_end - vma->vm_start;
3458	unsigned long pfn;
3459	void *ptr;
3460
3461	ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
3462	if (IS_ERR(ptr))
3463		return PTR_ERR(ptr);
3464
3465	pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
3466	return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
3467}
3468
3469static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp,
3470			unsigned long addr, unsigned long len,
3471			unsigned long pgoff, unsigned long flags)
3472{
3473	void *ptr;
3474
3475	/*
3476	 * Do not allow mapping to a user-provided address, to avoid breaking
3477	 * the aliasing rules. Userspace is not able to guess the offset address
3478	 * of a kernel kmalloc()ed memory area.
3479	 */
3480	if (addr)
3481		return -EINVAL;
3482
3483	ptr = io_uring_validate_mmap_request(filp, pgoff, len);
3484	if (IS_ERR(ptr))
3485		return -ENOMEM;
3486
3487	/*
3488	 * Some architectures have strong cache aliasing requirements.
3489	 * For such architectures we need a coherent mapping which aliases
3490	 * kernel memory *and* userspace memory. To achieve that:
3491	 * - use a NULL file pointer to reference physical memory, and
3492	 * - use the kernel virtual address of the shared io_uring context
3493	 *   (instead of the userspace-provided address, which has to be 0UL
3494	 *   anyway).
3495	 * - use the same pgoff which the get_unmapped_area() uses to
3496	 *   calculate the page colouring.
3497	 * For architectures without such aliasing requirements, the
3498	 * architecture will return any suitable mapping because addr is 0.
3499	 */
3500	filp = NULL;
3501	flags |= MAP_SHARED;
3502	pgoff = 0;	/* has been translated to ptr above */
3503#ifdef SHM_COLOUR
3504	addr = (uintptr_t) ptr;
3505	pgoff = addr >> PAGE_SHIFT;
3506#else
3507	addr = 0UL;
3508#endif
3509	return current->mm->get_unmapped_area(filp, addr, len, pgoff, flags);
3510}
3511
3512#else /* !CONFIG_MMU */
3513
3514static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3515{
3516	return is_nommu_shared_mapping(vma->vm_flags) ? 0 : -EINVAL;
3517}
3518
3519static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
3520{
3521	return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
3522}
3523
3524static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
3525	unsigned long addr, unsigned long len,
3526	unsigned long pgoff, unsigned long flags)
3527{
3528	void *ptr;
3529
3530	ptr = io_uring_validate_mmap_request(file, pgoff, len);
3531	if (IS_ERR(ptr))
3532		return PTR_ERR(ptr);
3533
3534	return (unsigned long) ptr;
3535}
3536
3537#endif /* !CONFIG_MMU */
3538
3539static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
3540{
3541	if (flags & IORING_ENTER_EXT_ARG) {
3542		struct io_uring_getevents_arg arg;
3543
3544		if (argsz != sizeof(arg))
3545			return -EINVAL;
3546		if (copy_from_user(&arg, argp, sizeof(arg)))
3547			return -EFAULT;
3548	}
3549	return 0;
3550}
3551
3552static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
3553			  struct __kernel_timespec __user **ts,
3554			  const sigset_t __user **sig)
3555{
3556	struct io_uring_getevents_arg arg;
3557
3558	/*
3559	 * If EXT_ARG isn't set, then we have no timespec and the argp pointer
3560	 * is just a pointer to the sigset_t.
3561	 */
3562	if (!(flags & IORING_ENTER_EXT_ARG)) {
3563		*sig = (const sigset_t __user *) argp;
3564		*ts = NULL;
3565		return 0;
3566	}
3567
3568	/*
3569	 * EXT_ARG is set - ensure we agree on its size, and copy in the
3570	 * timespec and sigset_t pointers if everything checks out.
3571	 */
3572	if (*argsz != sizeof(arg))
3573		return -EINVAL;
3574	if (copy_from_user(&arg, argp, sizeof(arg)))
3575		return -EFAULT;
3576	if (arg.pad)
3577		return -EINVAL;
3578	*sig = u64_to_user_ptr(arg.sigmask);
3579	*argsz = arg.sigmask_sz;
3580	*ts = u64_to_user_ptr(arg.ts);
3581	return 0;
3582}
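
/*
 * The matching userspace layout is struct io_uring_getevents_arg from the
 * uapi header. A raw-syscall sketch; 'ring_fd', 'sigmask' and 'ts' are
 * illustrative:
 *
 *	struct io_uring_getevents_arg arg = {
 *		.sigmask	= (__u64)(uintptr_t)&sigmask,
 *		.sigmask_sz	= _NSIG / 8,
 *		.ts		= (__u64)(uintptr_t)&ts,
 *	};
 *
 *	syscall(__NR_io_uring_enter, ring_fd, 0, 1,
 *		IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
 *		&arg, sizeof(arg));
 *
 * argsz must be sizeof(arg) and arg.pad must stay zero, or the checks above
 * fail with -EINVAL.
 */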
3583
3584SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
3585		u32, min_complete, u32, flags, const void __user *, argp,
3586		size_t, argsz)
3587{
3588	struct io_ring_ctx *ctx;
3589	struct file *file;
3590	long ret;
3591
3592	if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
3593			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
3594			       IORING_ENTER_REGISTERED_RING)))
3595		return -EINVAL;
3596
3597	/*
3598	 * Ring fd has been registered via IORING_REGISTER_RING_FDS, so we
3599	 * need only dereference our task private array to find it.
3600	 */
3601	if (flags & IORING_ENTER_REGISTERED_RING) {
3602		struct io_uring_task *tctx = current->io_uring;
3603
3604		if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
3605			return -EINVAL;
3606		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
3607		file = tctx->registered_rings[fd];
3608		if (unlikely(!file))
3609			return -EBADF;
3610	} else {
3611		file = fget(fd);
3612		if (unlikely(!file))
3613			return -EBADF;
3614		ret = -EOPNOTSUPP;
3615		if (unlikely(!io_is_uring_fops(file)))
3616			goto out;
3617	}
3618
3619	ctx = file->private_data;
3620	ret = -EBADFD;
3621	if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
3622		goto out;
3623
3624	/*
3625	 * For SQ polling, the thread will do all submissions and completions.
3626	 * Just return the requested submit count, and wake the thread if
3627	 * we were asked to.
3628	 */
3629	ret = 0;
3630	if (ctx->flags & IORING_SETUP_SQPOLL) {
3631		io_cqring_overflow_flush(ctx);
3632
3633		if (unlikely(ctx->sq_data->thread == NULL)) {
3634			ret = -EOWNERDEAD;
3635			goto out;
3636		}
3637		if (flags & IORING_ENTER_SQ_WAKEUP)
3638			wake_up(&ctx->sq_data->wait);
3639		if (flags & IORING_ENTER_SQ_WAIT)
3640			io_sqpoll_wait_sq(ctx);
3641
3642		ret = to_submit;
3643	} else if (to_submit) {
3644		ret = io_uring_add_tctx_node(ctx);
3645		if (unlikely(ret))
3646			goto out;
3647
3648		mutex_lock(&ctx->uring_lock);
3649		ret = io_submit_sqes(ctx, to_submit);
3650		if (ret != to_submit) {
3651			mutex_unlock(&ctx->uring_lock);
3652			goto out;
3653		}
3654		if (flags & IORING_ENTER_GETEVENTS) {
3655			if (ctx->syscall_iopoll)
3656				goto iopoll_locked;
3657			/*
3658			 * Ignore errors, we'll soon call io_cqring_wait() and
3659			 * it should handle ownership problems if any.
3660			 */
3661			if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3662				(void)io_run_local_work_locked(ctx, min_complete);
3663		}
3664		mutex_unlock(&ctx->uring_lock);
3665	}
3666
3667	if (flags & IORING_ENTER_GETEVENTS) {
3668		int ret2;
3669
3670		if (ctx->syscall_iopoll) {
3671			/*
3672			 * We disallow the app entering submit/complete with
3673			 * polling, but we still need to lock the ring to
3674			 * prevent racing with polled issue that got punted to
3675			 * a workqueue.
3676			 */
3677			mutex_lock(&ctx->uring_lock);
3678iopoll_locked:
3679			ret2 = io_validate_ext_arg(flags, argp, argsz);
3680			if (likely(!ret2)) {
3681				min_complete = min(min_complete,
3682						   ctx->cq_entries);
3683				ret2 = io_iopoll_check(ctx, min_complete);
3684			}
3685			mutex_unlock(&ctx->uring_lock);
3686		} else {
3687			const sigset_t __user *sig;
3688			struct __kernel_timespec __user *ts;
3689
3690			ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
3691			if (likely(!ret2)) {
3692				min_complete = min(min_complete,
3693						   ctx->cq_entries);
3694				ret2 = io_cqring_wait(ctx, min_complete, sig,
3695						      argsz, ts);
3696			}
3697		}
3698
3699		if (!ret) {
3700			ret = ret2;
3701
3702			/*
3703			 * EBADR indicates that one or more CQEs were dropped.
3704			 * Once the user has been informed we can clear the bit
3705			 * as they are obviously ok with those drops.
3706			 */
3707			if (unlikely(ret2 == -EBADR))
3708				clear_bit(IO_CHECK_CQ_DROPPED_BIT,
3709					  &ctx->check_cq);
3710		}
3711	}
3712out:
3713	if (!(flags & IORING_ENTER_REGISTERED_RING))
3714		fput(file);
3715	return ret;
3716}
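
/*
 * The IORING_ENTER_REGISTERED_RING branch above avoids the fget()/fput()
 * pair on every enter. With liburing this is opted into once per ring
 * (sketch, assuming a set-up 'ring'):
 *
 *	io_uring_register_ring_fd(&ring);
 *
 * after which subsequent submit/wait calls pass the registered index and
 * set IORING_ENTER_REGISTERED_RING internally.
 */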
3717
3718static const struct file_operations io_uring_fops = {
3719	.release	= io_uring_release,
3720	.mmap		= io_uring_mmap,
3721#ifndef CONFIG_MMU
3722	.get_unmapped_area = io_uring_nommu_get_unmapped_area,
3723	.mmap_capabilities = io_uring_nommu_mmap_capabilities,
3724#else
3725	.get_unmapped_area = io_uring_mmu_get_unmapped_area,
3726#endif
3727	.poll		= io_uring_poll,
3728#ifdef CONFIG_PROC_FS
3729	.show_fdinfo	= io_uring_show_fdinfo,
3730#endif
3731};
3732
3733bool io_is_uring_fops(struct file *file)
3734{
3735	return file->f_op == &io_uring_fops;
3736}
3737
3738static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3739					 struct io_uring_params *p)
3740{
3741	struct io_rings *rings;
3742	size_t size, sq_array_offset;
3743	void *ptr;
3744
3745	/* make sure these are sane, as we already accounted them */
3746	ctx->sq_entries = p->sq_entries;
3747	ctx->cq_entries = p->cq_entries;
3748
3749	size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
3750	if (size == SIZE_MAX)
3751		return -EOVERFLOW;
3752
3753	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3754		rings = io_mem_alloc(size);
3755	else
3756		rings = io_rings_map(ctx, p->cq_off.user_addr, size);
3757
3758	if (IS_ERR(rings))
3759		return PTR_ERR(rings);
3760
3761	ctx->rings = rings;
3762	if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
3763		ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
3764	rings->sq_ring_mask = p->sq_entries - 1;
3765	rings->cq_ring_mask = p->cq_entries - 1;
3766	rings->sq_ring_entries = p->sq_entries;
3767	rings->cq_ring_entries = p->cq_entries;
3768
3769	if (p->flags & IORING_SETUP_SQE128)
3770		size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
3771	else
3772		size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
3773	if (size == SIZE_MAX) {
3774		io_rings_free(ctx);
3775		return -EOVERFLOW;
3776	}
3777
3778	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3779		ptr = io_mem_alloc(size);
3780	else
3781		ptr = io_sqes_map(ctx, p->sq_off.user_addr, size);
3782
3783	if (IS_ERR(ptr)) {
3784		io_rings_free(ctx);
3785		return PTR_ERR(ptr);
3786	}
3787
3788	ctx->sq_sqes = ptr;
3789	return 0;
3790}
3791
3792static int io_uring_install_fd(struct file *file)
3793{
3794	int fd;
3795
3796	fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3797	if (fd < 0)
3798		return fd;
3799	fd_install(fd, file);
3800	return fd;
3801}
3802
3803/*
3804 * Allocate an anonymous fd; this is what constitutes the application-
3805 * visible backing of an io_uring instance. The application mmaps this
3806 * fd to gain access to the SQ/CQ ring details.
3807 */
3808static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
3809{
3810	return anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
3811					 O_RDWR | O_CLOEXEC, NULL);
3812}
3813
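/*
 * The guts of io_uring_setup(): validate and clamp the requested ring
 * sizes, allocate the context and rings, set up SQPOLL/io-wq offload as
 * requested, fill in the ring offsets and feature flags reported back to
 * the application, and finally install (or register) the ring fd.
 */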
3814static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
3815				  struct io_uring_params __user *params)
3816{
3817	struct io_ring_ctx *ctx;
3818	struct io_uring_task *tctx;
3819	struct file *file;
3820	int ret;
3821
3822	if (!entries)
3823		return -EINVAL;
3824	if (entries > IORING_MAX_ENTRIES) {
3825		if (!(p->flags & IORING_SETUP_CLAMP))
3826			return -EINVAL;
3827		entries = IORING_MAX_ENTRIES;
3828	}
3829
3830	if ((p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
3831	    && !(p->flags & IORING_SETUP_NO_MMAP))
3832		return -EINVAL;
3833
3834	/*
3835	 * Use twice as many entries for the CQ ring. It's possible for the
3836	 * application to drive a higher depth than the size of the SQ ring,
3837	 * since the sqes are only used at submission time. This allows for
3838	 * some flexibility in overcommitting a bit. If the application has
3839	 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
3840	 * of CQ ring entries manually.
3841	 */
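	/*
	 * Example: entries == 100 rounds sq_entries up to 128 and, without
	 * IORING_SETUP_CQSIZE, results in cq_entries == 256.
	 */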
3842	p->sq_entries = roundup_pow_of_two(entries);
3843	if (p->flags & IORING_SETUP_CQSIZE) {
3844		/*
3845		 * If IORING_SETUP_CQSIZE is set, we do the same roundup
3846		 * to a power-of-two, if it isn't one already. Beyond requiring
3847		 * cq_entries >= sq_entries, we don't impose any cq vs sq sizing.
3848		 */
3849		if (!p->cq_entries)
3850			return -EINVAL;
3851		if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
3852			if (!(p->flags & IORING_SETUP_CLAMP))
3853				return -EINVAL;
3854			p->cq_entries = IORING_MAX_CQ_ENTRIES;
3855		}
3856		p->cq_entries = roundup_pow_of_two(p->cq_entries);
3857		if (p->cq_entries < p->sq_entries)
3858			return -EINVAL;
3859	} else {
3860		p->cq_entries = 2 * p->sq_entries;
3861	}
3862
3863	ctx = io_ring_ctx_alloc(p);
3864	if (!ctx)
3865		return -ENOMEM;
3866
3867	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
3868	    !(ctx->flags & IORING_SETUP_IOPOLL) &&
3869	    !(ctx->flags & IORING_SETUP_SQPOLL))
3870		ctx->task_complete = true;
3871
3872	if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL))
3873		ctx->lockless_cq = true;
3874
3875	/*
3876	 * lazy poll_wq activation relies on ->task_complete for synchronisation
3877	 * purposes, see io_activate_pollwq()
3878	 */
3879	if (!ctx->task_complete)
3880		ctx->poll_activated = true;
3881
3882	/*
3883	 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, userspace
3884	 * doesn't need to poll for IO completion events itself; it can rely
3885	 * on io_sq_thread to do the polling work, which reduces CPU usage
3886	 * and uring_lock contention.
3887	 */
3888	if (ctx->flags & IORING_SETUP_IOPOLL &&
3889	    !(ctx->flags & IORING_SETUP_SQPOLL))
3890		ctx->syscall_iopoll = 1;
3891
3892	ctx->compat = in_compat_syscall();
3893	if (!ns_capable_noaudit(&init_user_ns, CAP_IPC_LOCK))
3894		ctx->user = get_uid(current_user());
3895
3896	/*
3897	 * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
3898	 * COOP_TASKRUN is set, then IPIs are never needed by the app.
3899	 */
3900	ret = -EINVAL;
3901	if (ctx->flags & IORING_SETUP_SQPOLL) {
3902		/* IPI related flags don't make sense with SQPOLL */
3903		if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
3904				  IORING_SETUP_TASKRUN_FLAG |
3905				  IORING_SETUP_DEFER_TASKRUN))
3906			goto err;
3907		ctx->notify_method = TWA_SIGNAL_NO_IPI;
3908	} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
3909		ctx->notify_method = TWA_SIGNAL_NO_IPI;
3910	} else {
3911		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
3912		    !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
3913			goto err;
3914		ctx->notify_method = TWA_SIGNAL;
3915	}
3916
3917	/*
3918	 * For DEFER_TASKRUN we require the completion task to be the same as the
3919	 * submission task. This implies that there is only one submitter, so enforce
3920	 * that.
3921	 */
3922	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
3923	    !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
3924		goto err;
3925	}
3926
3927	/*
3928	 * This is just grabbed for accounting purposes. When a process exits,
3929	 * the mm is exited and dropped before the files, hence we need to hang
3930	 * on to this mm purely for the purposes of being able to unaccount
3931	 * memory (locked/pinned vm). It's not used for anything else.
3932	 */
3933	mmgrab(current->mm);
3934	ctx->mm_account = current->mm;
3935
3936	ret = io_allocate_scq_urings(ctx, p);
3937	if (ret)
3938		goto err;
3939
3940	ret = io_sq_offload_create(ctx, p);
3941	if (ret)
3942		goto err;
3943
3944	ret = io_rsrc_init(ctx);
3945	if (ret)
3946		goto err;
3947
3948	p->sq_off.head = offsetof(struct io_rings, sq.head);
3949	p->sq_off.tail = offsetof(struct io_rings, sq.tail);
3950	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
3951	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
3952	p->sq_off.flags = offsetof(struct io_rings, sq_flags);
3953	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
3954	if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
3955		p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
3956	p->sq_off.resv1 = 0;
3957	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3958		p->sq_off.user_addr = 0;
3959
3960	p->cq_off.head = offsetof(struct io_rings, cq.head);
3961	p->cq_off.tail = offsetof(struct io_rings, cq.tail);
3962	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
3963	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
3964	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
3965	p->cq_off.cqes = offsetof(struct io_rings, cqes);
3966	p->cq_off.flags = offsetof(struct io_rings, cq_flags);
3967	p->cq_off.resv1 = 0;
3968	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3969		p->cq_off.user_addr = 0;
3970
3971	p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
3972			IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
3973			IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
3974			IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
3975			IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
3976			IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
3977			IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;
3978
3979	if (copy_to_user(params, p, sizeof(*p))) {
3980		ret = -EFAULT;
3981		goto err;
3982	}
3983
3984	if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
3985	    && !(ctx->flags & IORING_SETUP_R_DISABLED))
3986		WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
3987
3988	file = io_uring_get_file(ctx);
3989	if (IS_ERR(file)) {
3990		ret = PTR_ERR(file);
3991		goto err;
3992	}
3993
3994	ret = __io_uring_add_tctx_node(ctx);
3995	if (ret)
3996		goto err_fput;
3997	tctx = current->io_uring;
3998
3999	/*
4000	 * Install the ring fd as the very last thing, so we don't risk someone
4001	 * having closed it before we finish setup.
4002	 */
4003	if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
4004		ret = io_ring_add_registered_file(tctx, file, 0, IO_RINGFD_REG_MAX);
4005	else
4006		ret = io_uring_install_fd(file);
4007	if (ret < 0)
4008		goto err_fput;
4009
4010	trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
4011	return ret;
4012err:
4013	io_ring_ctx_wait_and_kill(ctx);
4014	return ret;
4015err_fput:
4016	fput(file);
4017	return ret;
4018}
4019
4020/*
4021 * Sets up an io_uring context and returns the fd. The application asks for a
4022 * ring size; we return the actual sq/cq ring sizes (among other things) in the
4023 * params structure passed in.
4024 */
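/*
 * Illustrative userspace call (a minimal sketch; see the liburing examples
 * for real usage):
 *
 *	struct io_uring_params p = { };
 *	int ring_fd = syscall(__NR_io_uring_setup, 8, &p);
 *
 * On success, p.sq_off/p.cq_off describe the ring layout to mmap().
 */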
4025static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
4026{
4027	struct io_uring_params p;
4028	int i;
4029
4030	if (copy_from_user(&p, params, sizeof(p)))
4031		return -EFAULT;
4032	for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
4033		if (p.resv[i])
4034			return -EINVAL;
4035	}
4036
4037	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
4038			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
4039			IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
4040			IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
4041			IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
4042			IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
4043			IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
4044			IORING_SETUP_NO_MMAP | IORING_SETUP_REGISTERED_FD_ONLY |
4045			IORING_SETUP_NO_SQARRAY))
4046		return -EINVAL;
4047
4048	return io_uring_create(entries, &p, params);
4049}
4050
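/*
 * Check the io_uring_disabled sysctl: 2 disables io_uring for everyone,
 * 0 allows everyone, and 1 restricts ring creation to CAP_SYS_ADMIN and
 * members of the group configured via sysctl_io_uring_group.
 */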
4051static inline bool io_uring_allowed(void)
4052{
4053	int disabled = READ_ONCE(sysctl_io_uring_disabled);
4054	kgid_t io_uring_group;
4055
4056	if (disabled == 2)
4057		return false;
4058
4059	if (disabled == 0 || capable(CAP_SYS_ADMIN))
4060		return true;
4061
4062	io_uring_group = make_kgid(&init_user_ns, sysctl_io_uring_group);
4063	if (!gid_valid(io_uring_group))
4064		return false;
4065
4066	return in_group_p(io_uring_group);
4067}
4068
4069SYSCALL_DEFINE2(io_uring_setup, u32, entries,
4070		struct io_uring_params __user *, params)
4071{
4072	if (!io_uring_allowed())
4073		return -EPERM;
4074
4075	return io_uring_setup(entries, params);
4076}
4077
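/*
 * IORING_REGISTER_PROBE: report the opcodes this kernel supports by
 * filling in the caller's io_uring_probe array from the opcode table.
 */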
4078static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
4079			   unsigned nr_args)
4080{
4081	struct io_uring_probe *p;
4082	size_t size;
4083	int i, ret;
4084
4085	size = struct_size(p, ops, nr_args);
4086	if (size == SIZE_MAX)
4087		return -EOVERFLOW;
4088	p = kzalloc(size, GFP_KERNEL);
4089	if (!p)
4090		return -ENOMEM;
4091
4092	ret = -EFAULT;
4093	if (copy_from_user(p, arg, size))
4094		goto out;
4095	ret = -EINVAL;
4096	if (memchr_inv(p, 0, size))
4097		goto out;
4098
4099	p->last_op = IORING_OP_LAST - 1;
4100	if (nr_args > IORING_OP_LAST)
4101		nr_args = IORING_OP_LAST;
4102
4103	for (i = 0; i < nr_args; i++) {
4104		p->ops[i].op = i;
4105		if (!io_issue_defs[i].not_supported)
4106			p->ops[i].flags = IO_URING_OP_SUPPORTED;
4107	}
4108	p->ops_len = i;
4109
4110	ret = 0;
4111	if (copy_to_user(arg, p, size))
4112		ret = -EFAULT;
4113out:
4114	kfree(p);
4115	return ret;
4116}
4117
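/*
 * IORING_REGISTER_PERSONALITY: stash the current credentials in the ctx
 * personality xarray and return the allocated id to the caller.
 */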
4118static int io_register_personality(struct io_ring_ctx *ctx)
4119{
4120	const struct cred *creds;
4121	u32 id;
4122	int ret;
4123
4124	creds = get_current_cred();
4125
4126	ret = xa_alloc_cyclic(&ctx->personalities, &id, (void *)creds,
4127			XA_LIMIT(0, USHRT_MAX), &ctx->pers_next, GFP_KERNEL);
4128	if (ret < 0) {
4129		put_cred(creds);
4130		return ret;
4131	}
4132	return id;
4133}
4134
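/*
 * IORING_REGISTER_RESTRICTIONS: only allowed while the ring is still
 * R_DISABLED. Records which register opcodes, SQE opcodes and SQE flags
 * the ring will accept once it has been enabled.
 */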
4135static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
4136					   void __user *arg, unsigned int nr_args)
4137{
4138	struct io_uring_restriction *res;
4139	size_t size;
4140	int i, ret;
4141
4142	/* Restrictions allowed only if rings started disabled */
4143	if (!(ctx->flags & IORING_SETUP_R_DISABLED))
4144		return -EBADFD;
4145
4146	/* We allow only a single restrictions registration */
4147	if (ctx->restrictions.registered)
4148		return -EBUSY;
4149
4150	if (!arg || nr_args > IORING_MAX_RESTRICTIONS)
4151		return -EINVAL;
4152
4153	size = array_size(nr_args, sizeof(*res));
4154	if (size == SIZE_MAX)
4155		return -EOVERFLOW;
4156
4157	res = memdup_user(arg, size);
4158	if (IS_ERR(res))
4159		return PTR_ERR(res);
4160
4161	ret = 0;
4162
4163	for (i = 0; i < nr_args; i++) {
4164		switch (res[i].opcode) {
4165		case IORING_RESTRICTION_REGISTER_OP:
4166			if (res[i].register_op >= IORING_REGISTER_LAST) {
4167				ret = -EINVAL;
4168				goto out;
4169			}
4170
4171			__set_bit(res[i].register_op,
4172				  ctx->restrictions.register_op);
4173			break;
4174		case IORING_RESTRICTION_SQE_OP:
4175			if (res[i].sqe_op >= IORING_OP_LAST) {
4176				ret = -EINVAL;
4177				goto out;
4178			}
4179
4180			__set_bit(res[i].sqe_op, ctx->restrictions.sqe_op);
4181			break;
4182		case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
4183			ctx->restrictions.sqe_flags_allowed = res[i].sqe_flags;
4184			break;
4185		case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
4186			ctx->restrictions.sqe_flags_required = res[i].sqe_flags;
4187			break;
4188		default:
4189			ret = -EINVAL;
4190			goto out;
4191		}
4192	}
4193
4194out:
4195	/* Reset all restrictions if an error happened */
4196	if (ret != 0)
4197		memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
4198	else
4199		ctx->restrictions.registered = true;
4200
4201	kfree(res);
4202	return ret;
4203}
4204
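/*
 * IORING_REGISTER_ENABLE_RINGS: bring a ring created with
 * IORING_SETUP_R_DISABLED live, applying any registered restrictions and
 * waking the SQPOLL thread if it is waiting.
 */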
4205static int io_register_enable_rings(struct io_ring_ctx *ctx)
4206{
4207	if (!(ctx->flags & IORING_SETUP_R_DISABLED))
4208		return -EBADFD;
4209
4210	if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) {
4211		WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
4212		/*
4213		 * Lazy poll_wq activation attempts would fail if the ring was
4214		 * polled before submitter_task was set.
4215		 */
4216		if (wq_has_sleeper(&ctx->poll_wq))
4217			io_activate_pollwq(ctx);
4218	}
4219
4220	if (ctx->restrictions.registered)
4221		ctx->restricted = 1;
4222
4223	ctx->flags &= ~IORING_SETUP_R_DISABLED;
4224	if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
4225		wake_up(&ctx->sq_data->wait);
4226	return 0;
4227}
4228
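/*
 * Apply an io-wq CPU affinity mask. For SQPOLL rings this targets the
 * SQPOLL thread's io-wq and temporarily drops uring_lock; otherwise it
 * applies to the current task's io-wq.
 */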
4229static __cold int __io_register_iowq_aff(struct io_ring_ctx *ctx,
4230					 cpumask_var_t new_mask)
4231{
4232	int ret;
4233
4234	if (!(ctx->flags & IORING_SETUP_SQPOLL)) {
4235		ret = io_wq_cpu_affinity(current->io_uring, new_mask);
4236	} else {
4237		mutex_unlock(&ctx->uring_lock);
4238		ret = io_sqpoll_wq_cpu_affinity(ctx, new_mask);
4239		mutex_lock(&ctx->uring_lock);
4240	}
4241
4242	return ret;
4243}
4244
4245static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
4246				       void __user *arg, unsigned len)
4247{
4248	cpumask_var_t new_mask;
4249	int ret;
4250
4251	if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
4252		return -ENOMEM;
4253
4254	cpumask_clear(new_mask);
4255	if (len > cpumask_size())
4256		len = cpumask_size();
4257
4258	if (in_compat_syscall()) {
4259		ret = compat_get_bitmap(cpumask_bits(new_mask),
4260					(const compat_ulong_t __user *)arg,
4261					len * 8 /* CHAR_BIT */);
4262	} else {
4263		ret = copy_from_user(new_mask, arg, len);
4264	}
4265
4266	if (ret) {
4267		free_cpumask_var(new_mask);
4268		return -EFAULT;
4269	}
4270
4271	ret = __io_register_iowq_aff(ctx, new_mask);
4272	free_cpumask_var(new_mask);
4273	return ret;
4274}
4275
4276static __cold int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
4277{
4278	return __io_register_iowq_aff(ctx, NULL);
4279}
4280
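/*
 * IORING_REGISTER_IOWQ_MAX_WORKERS: update the io-wq worker count limits
 * for this ring, copy the resulting counts back to userspace, and
 * propagate the new limits to every registered task (for SQPOLL, only the
 * SQPOLL task's io-wq matters).
 */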
4281static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
4282					       void __user *arg)
4283	__must_hold(&ctx->uring_lock)
4284{
4285	struct io_tctx_node *node;
4286	struct io_uring_task *tctx = NULL;
4287	struct io_sq_data *sqd = NULL;
4288	__u32 new_count[2];
4289	int i, ret;
4290
4291	if (copy_from_user(new_count, arg, sizeof(new_count)))
4292		return -EFAULT;
4293	for (i = 0; i < ARRAY_SIZE(new_count); i++)
4294		if (new_count[i] > INT_MAX)
4295			return -EINVAL;
4296
4297	if (ctx->flags & IORING_SETUP_SQPOLL) {
4298		sqd = ctx->sq_data;
4299		if (sqd) {
4300			/*
4301			 * Observe the correct sqd->lock -> ctx->uring_lock
4302			 * ordering. It's fine to drop uring_lock here; we hold
4303			 * a ref to the ctx.
4304			 */
4305			refcount_inc(&sqd->refs);
4306			mutex_unlock(&ctx->uring_lock);
4307			mutex_lock(&sqd->lock);
4308			mutex_lock(&ctx->uring_lock);
4309			if (sqd->thread)
4310				tctx = sqd->thread->io_uring;
4311		}
4312	} else {
4313		tctx = current->io_uring;
4314	}
4315
4316	BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));
4317
4318	for (i = 0; i < ARRAY_SIZE(new_count); i++)
4319		if (new_count[i])
4320			ctx->iowq_limits[i] = new_count[i];
4321	ctx->iowq_limits_set = true;
4322
4323	if (tctx && tctx->io_wq) {
4324		ret = io_wq_max_workers(tctx->io_wq, new_count);
4325		if (ret)
4326			goto err;
4327	} else {
4328		memset(new_count, 0, sizeof(new_count));
4329	}
4330
4331	if (sqd) {
4332		mutex_unlock(&sqd->lock);
4333		io_put_sq_data(sqd);
4334	}
4335
4336	if (copy_to_user(arg, new_count, sizeof(new_count)))
4337		return -EFAULT;
4338
4339	/* that's it for SQPOLL; only the SQPOLL task creates requests */
4340	if (sqd)
4341		return 0;
4342
4343	/* now propagate the restriction to all registered users */
4344	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
4345		struct io_uring_task *tctx = node->task->io_uring;
4346
4347		if (WARN_ON_ONCE(!tctx->io_wq))
4348			continue;
4349
4350		for (i = 0; i < ARRAY_SIZE(new_count); i++)
4351			new_count[i] = ctx->iowq_limits[i];
4352		/* ignore errors, it always returns zero anyway */
4353		(void)io_wq_max_workers(tctx->io_wq, new_count);
4354	}
4355	return 0;
4356err:
4357	if (sqd) {
4358		mutex_unlock(&sqd->lock);
4359		io_put_sq_data(sqd);
4360	}
4361	return ret;
4362}
4363
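/*
 * Dispatch a single io_uring_register(2) opcode with ctx->uring_lock held,
 * rejecting callers other than the submitter task (if one is registered)
 * and honouring any registered restrictions.
 */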
4364static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
4365			       void __user *arg, unsigned nr_args)
4366	__releases(ctx->uring_lock)
4367	__acquires(ctx->uring_lock)
4368{
4369	int ret;
4370
4371	/*
4372	 * We no longer quiesce the ctx refs for register operations, so the
4373	 * ctx can't be dying while we hold a file reference here.
4374	 */
4375	if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs)))
4376		return -ENXIO;
4377
4378	if (ctx->submitter_task && ctx->submitter_task != current)
4379		return -EEXIST;
4380
4381	if (ctx->restricted) {
4382		opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
4383		if (!test_bit(opcode, ctx->restrictions.register_op))
4384			return -EACCES;
4385	}
4386
4387	switch (opcode) {
4388	case IORING_REGISTER_BUFFERS:
4389		ret = -EFAULT;
4390		if (!arg)
4391			break;
4392		ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
4393		break;
4394	case IORING_UNREGISTER_BUFFERS:
4395		ret = -EINVAL;
4396		if (arg || nr_args)
4397			break;
4398		ret = io_sqe_buffers_unregister(ctx);
4399		break;
4400	case IORING_REGISTER_FILES:
4401		ret = -EFAULT;
4402		if (!arg)
4403			break;
4404		ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
4405		break;
4406	case IORING_UNREGISTER_FILES:
4407		ret = -EINVAL;
4408		if (arg || nr_args)
4409			break;
4410		ret = io_sqe_files_unregister(ctx);
4411		break;
4412	case IORING_REGISTER_FILES_UPDATE:
4413		ret = io_register_files_update(ctx, arg, nr_args);
4414		break;
4415	case IORING_REGISTER_EVENTFD:
4416		ret = -EINVAL;
4417		if (nr_args != 1)
4418			break;
4419		ret = io_eventfd_register(ctx, arg, 0);
4420		break;
4421	case IORING_REGISTER_EVENTFD_ASYNC:
4422		ret = -EINVAL;
4423		if (nr_args != 1)
4424			break;
4425		ret = io_eventfd_register(ctx, arg, 1);
4426		break;
4427	case IORING_UNREGISTER_EVENTFD:
4428		ret = -EINVAL;
4429		if (arg || nr_args)
4430			break;
4431		ret = io_eventfd_unregister(ctx);
4432		break;
4433	case IORING_REGISTER_PROBE:
4434		ret = -EINVAL;
4435		if (!arg || nr_args > 256)
4436			break;
4437		ret = io_probe(ctx, arg, nr_args);
4438		break;
4439	case IORING_REGISTER_PERSONALITY:
4440		ret = -EINVAL;
4441		if (arg || nr_args)
4442			break;
4443		ret = io_register_personality(ctx);
4444		break;
4445	case IORING_UNREGISTER_PERSONALITY:
4446		ret = -EINVAL;
4447		if (arg)
4448			break;
4449		ret = io_unregister_personality(ctx, nr_args);
4450		break;
4451	case IORING_REGISTER_ENABLE_RINGS:
4452		ret = -EINVAL;
4453		if (arg || nr_args)
4454			break;
4455		ret = io_register_enable_rings(ctx);
4456		break;
4457	case IORING_REGISTER_RESTRICTIONS:
4458		ret = io_register_restrictions(ctx, arg, nr_args);
4459		break;
4460	case IORING_REGISTER_FILES2:
4461		ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_FILE);
4462		break;
4463	case IORING_REGISTER_FILES_UPDATE2:
4464		ret = io_register_rsrc_update(ctx, arg, nr_args,
4465					      IORING_RSRC_FILE);
4466		break;
4467	case IORING_REGISTER_BUFFERS2:
4468		ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_BUFFER);
4469		break;
4470	case IORING_REGISTER_BUFFERS_UPDATE:
4471		ret = io_register_rsrc_update(ctx, arg, nr_args,
4472					      IORING_RSRC_BUFFER);
4473		break;
4474	case IORING_REGISTER_IOWQ_AFF:
4475		ret = -EINVAL;
4476		if (!arg || !nr_args)
4477			break;
4478		ret = io_register_iowq_aff(ctx, arg, nr_args);
4479		break;
4480	case IORING_UNREGISTER_IOWQ_AFF:
4481		ret = -EINVAL;
4482		if (arg || nr_args)
4483			break;
4484		ret = io_unregister_iowq_aff(ctx);
4485		break;
4486	case IORING_REGISTER_IOWQ_MAX_WORKERS:
4487		ret = -EINVAL;
4488		if (!arg || nr_args != 2)
4489			break;
4490		ret = io_register_iowq_max_workers(ctx, arg);
4491		break;
4492	case IORING_REGISTER_RING_FDS:
4493		ret = io_ringfd_register(ctx, arg, nr_args);
4494		break;
4495	case IORING_UNREGISTER_RING_FDS:
4496		ret = io_ringfd_unregister(ctx, arg, nr_args);
4497		break;
4498	case IORING_REGISTER_PBUF_RING:
4499		ret = -EINVAL;
4500		if (!arg || nr_args != 1)
4501			break;
4502		ret = io_register_pbuf_ring(ctx, arg);
4503		break;
4504	case IORING_UNREGISTER_PBUF_RING:
4505		ret = -EINVAL;
4506		if (!arg || nr_args != 1)
4507			break;
4508		ret = io_unregister_pbuf_ring(ctx, arg);
4509		break;
4510	case IORING_REGISTER_SYNC_CANCEL:
4511		ret = -EINVAL;
4512		if (!arg || nr_args != 1)
4513			break;
4514		ret = io_sync_cancel(ctx, arg);
4515		break;
4516	case IORING_REGISTER_FILE_ALLOC_RANGE:
4517		ret = -EINVAL;
4518		if (!arg || nr_args)
4519			break;
4520		ret = io_register_file_alloc_range(ctx, arg);
4521		break;
4522	default:
4523		ret = -EINVAL;
4524		break;
4525	}
4526
4527	return ret;
4528}
4529
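/*
 * io_uring_register(2): look up the ring, either via a normal fd or via the
 * task's registered-ring array when IORING_REGISTER_USE_REGISTERED_RING is
 * set, then dispatch the opcode under uring_lock.
 */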
4530SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
4531		void __user *, arg, unsigned int, nr_args)
4532{
4533	struct io_ring_ctx *ctx;
4534	long ret = -EBADF;
4535	struct file *file;
4536	bool use_registered_ring;
4537
4538	use_registered_ring = !!(opcode & IORING_REGISTER_USE_REGISTERED_RING);
4539	opcode &= ~IORING_REGISTER_USE_REGISTERED_RING;
4540
4541	if (opcode >= IORING_REGISTER_LAST)
4542		return -EINVAL;
4543
4544	if (use_registered_ring) {
4545		/*
4546		 * The ring fd has been registered via IORING_REGISTER_RING_FDS;
4547		 * we need only dereference our task-private array to find it.
4548		 */
4549		struct io_uring_task *tctx = current->io_uring;
4550
4551		if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
4552			return -EINVAL;
4553		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
4554		file = tctx->registered_rings[fd];
4555		if (unlikely(!file))
4556			return -EBADF;
4557	} else {
4558		file = fget(fd);
4559		if (unlikely(!file))
4560			return -EBADF;
4561		ret = -EOPNOTSUPP;
4562		if (!io_is_uring_fops(file))
4563			goto out_fput;
4564	}
4565
4566	ctx = file->private_data;
4567
4568	mutex_lock(&ctx->uring_lock);
4569	ret = __io_uring_register(ctx, opcode, arg, nr_args);
4570	mutex_unlock(&ctx->uring_lock);
4571	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
4572out_fput:
4573	if (!use_registered_ring)
4574		fput(file);
4575	return ret;
4576}
4577
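/*
 * One-time init: sanity-check the uapi SQE layout at build time, set up the
 * opcode table and the io_kiocb slab cache, and register the io_uring
 * sysctls when CONFIG_SYSCTL is enabled.
 */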
4578static int __init io_uring_init(void)
4579{
4580#define __BUILD_BUG_VERIFY_OFFSET_SIZE(stype, eoffset, esize, ename) do { \
4581	BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
4582	BUILD_BUG_ON(sizeof_field(stype, ename) != esize); \
4583} while (0)
4584
4585#define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
4586	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, sizeof(etype), ename)
4587#define BUILD_BUG_SQE_ELEM_SIZE(eoffset, esize, ename) \
4588	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, esize, ename)
4589	BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
4590	BUILD_BUG_SQE_ELEM(0,  __u8,   opcode);
4591	BUILD_BUG_SQE_ELEM(1,  __u8,   flags);
4592	BUILD_BUG_SQE_ELEM(2,  __u16,  ioprio);
4593	BUILD_BUG_SQE_ELEM(4,  __s32,  fd);
4594	BUILD_BUG_SQE_ELEM(8,  __u64,  off);
4595	BUILD_BUG_SQE_ELEM(8,  __u64,  addr2);
4596	BUILD_BUG_SQE_ELEM(8,  __u32,  cmd_op);
4597	BUILD_BUG_SQE_ELEM(12, __u32, __pad1);
4598	BUILD_BUG_SQE_ELEM(16, __u64,  addr);
4599	BUILD_BUG_SQE_ELEM(16, __u64,  splice_off_in);
4600	BUILD_BUG_SQE_ELEM(24, __u32,  len);
4601	BUILD_BUG_SQE_ELEM(28,     __kernel_rwf_t, rw_flags);
4602	BUILD_BUG_SQE_ELEM(28, /* compat */   int, rw_flags);
4603	BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
4604	BUILD_BUG_SQE_ELEM(28, __u32,  fsync_flags);
4605	BUILD_BUG_SQE_ELEM(28, /* compat */ __u16,  poll_events);
4606	BUILD_BUG_SQE_ELEM(28, __u32,  poll32_events);
4607	BUILD_BUG_SQE_ELEM(28, __u32,  sync_range_flags);
4608	BUILD_BUG_SQE_ELEM(28, __u32,  msg_flags);
4609	BUILD_BUG_SQE_ELEM(28, __u32,  timeout_flags);
4610	BUILD_BUG_SQE_ELEM(28, __u32,  accept_flags);
4611	BUILD_BUG_SQE_ELEM(28, __u32,  cancel_flags);
4612	BUILD_BUG_SQE_ELEM(28, __u32,  open_flags);
4613	BUILD_BUG_SQE_ELEM(28, __u32,  statx_flags);
4614	BUILD_BUG_SQE_ELEM(28, __u32,  fadvise_advice);
4615	BUILD_BUG_SQE_ELEM(28, __u32,  splice_flags);
4616	BUILD_BUG_SQE_ELEM(28, __u32,  rename_flags);
4617	BUILD_BUG_SQE_ELEM(28, __u32,  unlink_flags);
4618	BUILD_BUG_SQE_ELEM(28, __u32,  hardlink_flags);
4619	BUILD_BUG_SQE_ELEM(28, __u32,  xattr_flags);
4620	BUILD_BUG_SQE_ELEM(28, __u32,  msg_ring_flags);
4621	BUILD_BUG_SQE_ELEM(32, __u64,  user_data);
4622	BUILD_BUG_SQE_ELEM(40, __u16,  buf_index);
4623	BUILD_BUG_SQE_ELEM(40, __u16,  buf_group);
4624	BUILD_BUG_SQE_ELEM(42, __u16,  personality);
4625	BUILD_BUG_SQE_ELEM(44, __s32,  splice_fd_in);
4626	BUILD_BUG_SQE_ELEM(44, __u32,  file_index);
4627	BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
4628	BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
4629	BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
4630	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
4631	BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);
4632
4633	BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
4634		     sizeof(struct io_uring_rsrc_update));
4635	BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
4636		     sizeof(struct io_uring_rsrc_update2));
4637
4638	/* ->buf_index is u16 */
4639	BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
4640	BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
4641		     offsetof(struct io_uring_buf_ring, tail));
4642
4643	/* should fit into one byte */
4644	BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
4645	BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
4646	BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
4647
4648	BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
4649
4650	BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
4651
4652	io_uring_optable_init();
4653
4654	/*
4655	 * Allow user copy in the per-command field, which runs from just
4656	 * after the file member of io_kiocb up to the opcode field. The
4657	 * openat2 handling requires copying user memory into the io_kiocb
4658	 * object in that range, and HARDENED_USERCOPY will complain if we
4659	 * haven't correctly annotated this range.
4660	 */
4661	req_cachep = kmem_cache_create_usercopy("io_kiocb",
4662				sizeof(struct io_kiocb), 0,
4663				SLAB_HWCACHE_ALIGN | SLAB_PANIC |
4664				SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU,
4665				offsetof(struct io_kiocb, cmd.data),
4666				sizeof_field(struct io_kiocb, cmd.data), NULL);
4667
4668#ifdef CONFIG_SYSCTL
4669	register_sysctl_init("kernel", kernel_io_uring_disabled_table);
4670#endif
4671
4672	return 0;
4673}
4674__initcall(io_uring_init);
4675