// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

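/*
 * Illustrative userspace sketch (not part of this file, details such as
 * the path and buffer size are assumptions for the example): the direct
 * I/O paths below are reached when an application opens a file on an
 * NFS mount with O_DIRECT.  Typical O_DIRECT users keep buffers and
 * offsets block-aligned, since the client sends requests to the server
 * exactly as the application issued them.
 *
 *	int fd = open("/mnt/nfs/data", O_RDONLY | O_DIRECT);
 *	void *buf;
 *	if (fd >= 0 && posix_memalign(&buf, 4096, 4096) == 0) {
 *		ssize_t n = pread(fd, buf, 4096, 0);	// bypasses the page cache
 *		// ... consume the first n bytes ...
 *	}
 */
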
#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/module.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <linux/uaccess.h>
#include <linux/atomic.h>

#include "internal.h"
#include "iostat.h"
#include "pnfs.h"
#include "fscache.h"
#include "nfstrace.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
static void nfs_direct_write_complete(struct nfs_direct_req *dreq);
static void nfs_direct_write_schedule_work(struct work_struct *work);

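/*
 * dreq->io_count tracks outstanding work: the scheduling functions take
 * an initial reference, and each generated pgio header takes another via
 * nfs_direct_pgio_init().  put_dreq() returns true when the count drops
 * to zero, at which point the direct request as a whole can complete.
 */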
static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

static void
nfs_direct_handle_truncated(struct nfs_direct_req *dreq,
			    const struct nfs_pgio_header *hdr,
			    ssize_t dreq_len)
{
	if (!(test_bit(NFS_IOHDR_ERROR, &hdr->flags) ||
	      test_bit(NFS_IOHDR_EOF, &hdr->flags)))
		return;
	if (dreq->max_count >= dreq_len) {
		dreq->max_count = dreq_len;
		if (dreq->count > dreq_len)
			dreq->count = dreq_len;
	}

	if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && !dreq->error)
		dreq->error = hdr->error;
}

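/*
 * Update dreq->count from a completed header: the header contributes
 * bytes only up to hdr->io_start + hdr->good_bytes, measured relative to
 * the start of the direct request and clamped to dreq->max_count.
 * Errors and EOF may first shrink max_count via
 * nfs_direct_handle_truncated() above.
 */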
static void
nfs_direct_count_bytes(struct nfs_direct_req *dreq,
		       const struct nfs_pgio_header *hdr)
{
	loff_t hdr_end = hdr->io_start + hdr->good_bytes;
	ssize_t dreq_len = 0;

	if (hdr_end > dreq->io_start)
		dreq_len = hdr_end - dreq->io_start;

	nfs_direct_handle_truncated(dreq, hdr, dreq_len);

	if (dreq_len > dreq->max_count)
		dreq_len = dreq->max_count;

	if (dreq->count < dreq_len)
		dreq->count = dreq_len;
}

static void nfs_direct_truncate_request(struct nfs_direct_req *dreq,
					struct nfs_page *req)
{
	loff_t offs = req_offset(req);
	size_t req_start = (size_t)(offs - dreq->io_start);

	if (req_start < dreq->max_count)
		dreq->max_count = req_start;
	if (req_start < dreq->count)
		dreq->count = req_start;
}

/**
 * nfs_swap_rw - NFS address space operation for swap I/O
 * @iocb: target I/O control block
 * @iter: I/O buffer
 *
 * Perform IO to the swap-file.  This is much like direct IO.
 */
int nfs_swap_rw(struct kiocb *iocb, struct iov_iter *iter)
{
	ssize_t ret;

	VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);

	if (iov_iter_rw(iter) == READ)
		ret = nfs_file_direct_read(iocb, iter, true);
	else
		ret = nfs_file_direct_write(iocb, iter, true);
	if (ret < 0)
		return ret;
	return 0;
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		put_page(pages[i]);
}

void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
			      struct nfs_direct_req *dreq)
{
	cinfo->inode = dreq->inode;
	cinfo->mds = &dreq->mds_cinfo;
	cinfo->ds = &dreq->ds_cinfo;
	cinfo->dreq = dreq;
	cinfo->completion_ops = &nfs_direct_commit_completion_ops;
}

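/*
 * A freshly allocated nfs_direct_req carries two kref references: one
 * for the caller (dropped via nfs_direct_req_release() when the
 * read/write entry point is finished with it) and one for the I/O path
 * (dropped by nfs_direct_complete()).
 */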
static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
	pnfs_init_ds_commit_info(&dreq->ds_cinfo);
	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
	spin_lock_init(&dreq->lock);

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	pnfs_release_ds_info(&dreq->ds_cinfo, dreq->inode);
	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq, loff_t offset)
{
	loff_t start = offset - dreq->io_start;
	return dreq->max_count - start;
}
EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result) {
		result = dreq->count;
		WARN_ON_ONCE(dreq->count < 0);
	}
	if (!result)
		result = dreq->error;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;

	inode_dio_end(inode);

	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (dreq->count != 0) {
			res = (long) dreq->count;
			WARN_ON_ONCE(dreq->count < 0);
		}
		dreq->iocb->ki_complete(dreq->iocb, res);
	}

	complete(&dreq->completion);

	nfs_direct_req_release(dreq);
}

static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
{
	unsigned long bytes = 0;
	struct nfs_direct_req *dreq = hdr->dreq;

	spin_lock(&dreq->lock);
	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
		spin_unlock(&dreq->lock);
		goto out_put;
	}

	nfs_direct_count_bytes(dreq, hdr);
	spin_unlock(&dreq->lock);

	while (!list_empty(&hdr->pages)) {
		struct nfs_page *req = nfs_list_entry(hdr->pages.next);
		struct page *page = req->wb_page;

		if (!PageCompound(page) && bytes < hdr->good_bytes &&
		    (dreq->flags == NFS_ODIRECT_SHOULD_DIRTY))
			set_page_dirty(page);
		bytes += req->wb_bytes;
		nfs_list_remove_request(req);
		nfs_release_request(req);
	}
out_put:
	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	hdr->release(hdr);
}

static void nfs_read_sync_pgio_error(struct list_head *head, int error)
{
	struct nfs_page *req;

	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_release_request(req);
	}
}

static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
{
	get_dreq(hdr->dreq);
}

static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
	.error_cleanup = nfs_read_sync_pgio_error,
	.init_hdr = nfs_direct_pgio_init,
	.completion = nfs_direct_read_completion,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If iov_iter_get_pages_alloc2() or
 * nfs_page_create_from_page() fails, bail and stop sending more
 * reads.  Read length accounting is handled by
 * nfs_direct_read_completion().  If no requests have been sent at
 * all, just return an error.
 */

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      struct iov_iter *iter,
					      loff_t pos)
{
	struct nfs_pageio_descriptor desc;
	struct inode *inode = dreq->inode;
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);

	nfs_pageio_init_read(&desc, dreq->inode, false,
			     &nfs_direct_read_completion_ops);
	get_dreq(dreq);
	desc.pg_dreq = dreq;
	inode_dio_begin(inode);

	while (iov_iter_count(iter)) {
		struct page **pagevec;
		size_t bytes;
		size_t pgbase;
		unsigned npages, i;

		result = iov_iter_get_pages_alloc2(iter, &pagevec,
						  rsize, &pgbase);
		if (result < 0)
			break;

		bytes = result;
		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
		for (i = 0; i < npages; i++) {
			struct nfs_page *req;
			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
			/* XXX do we need to do the eof zeroing found in async_filler? */
			req = nfs_page_create_from_page(dreq->ctx, pagevec[i],
							pgbase, pos, req_len);
			if (IS_ERR(req)) {
				result = PTR_ERR(req);
				break;
			}
			if (!nfs_pageio_add_request(&desc, req)) {
				result = desc.pg_error;
				nfs_release_request(req);
				break;
			}
			pgbase = 0;
			bytes -= req_len;
			requested_bytes += req_len;
			pos += req_len;
			dreq->bytes_left -= req_len;
		}
		nfs_direct_release_pages(pagevec, npages);
		kvfree(pagevec);
		if (result < 0)
			break;
	}

	nfs_pageio_complete(&desc);

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		inode_dio_end(inode);
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return requested_bytes;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iter: vector of user buffers into which to read data
 * @swap: flag indicating this is swap IO, not O_DIRECT IO
 *
 * We use this function for direct reads instead of calling
 * generic_file_read_iter() in order to avoid its check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter,
			     bool swap)
{
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	struct inode *inode = mapping->host;
	struct nfs_direct_req *dreq;
	struct nfs_lock_context *l_ctx;
	ssize_t result, requested;
	size_t count = iov_iter_count(iter);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
		file, count, (long long) iocb->ki_pos);

	result = 0;
	if (!count)
		goto out;

	task_io_account_read(count);

	result = -ENOMEM;
	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->bytes_left = dreq->max_count = count;
	dreq->io_start = iocb->ki_pos;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	l_ctx = nfs_get_lock_context(dreq->ctx);
	if (IS_ERR(l_ctx)) {
		result = PTR_ERR(l_ctx);
		nfs_direct_req_release(dreq);
		goto out_release;
	}
	dreq->l_ctx = l_ctx;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	if (user_backed_iter(iter))
		dreq->flags = NFS_ODIRECT_SHOULD_DIRTY;

	if (!swap)
		nfs_start_io_direct(inode);

	NFS_I(inode)->read_io += count;
	requested = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);

	if (!swap)
		nfs_end_io_direct(inode);

	if (requested > 0) {
		result = nfs_direct_wait(dreq);
		if (result > 0) {
			requested -= result;
			iocb->ki_pos += result;
		}
		iov_iter_revert(iter, requested);
	} else {
		result = requested;
	}

out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

static void nfs_direct_add_page_head(struct list_head *list,
				     struct nfs_page *req)
{
	struct nfs_page *head = req->wb_head;

	if (!list_empty(&head->wb_list) || !nfs_lock_request(head))
		return;
	if (!list_empty(&head->wb_list)) {
		nfs_unlock_request(head);
		return;
	}
	list_add(&head->wb_list, list);
	kref_get(&head->wb_kref);
	kref_get(&head->wb_kref);
}

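/*
 * Reassemble the page groups on the request list before the writes are
 * reissued: make sure each group head is on the list and locked (see
 * nfs_direct_add_page_head() above), then fold its subrequests back
 * into the head with nfs_join_page_group().
 */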
static void nfs_direct_join_group(struct list_head *list,
				  struct nfs_commit_info *cinfo,
				  struct inode *inode)
{
	struct nfs_page *req, *subreq;

	list_for_each_entry(req, list, wb_list) {
		if (req->wb_head != req) {
			nfs_direct_add_page_head(&req->wb_list, req);
			continue;
		}
		subreq = req->wb_this_page;
		if (subreq == req)
			continue;
		do {
			/*
			 * Remove subrequests from this list before freeing
			 * them in the call to nfs_join_page_group().
			 */
			if (!list_empty(&subreq->wb_list)) {
				nfs_list_remove_request(subreq);
				nfs_release_request(subreq);
			}
		} while ((subreq = subreq->wb_this_page) != req);
		nfs_join_page_group(req, cinfo, inode);
	}
}

static void
nfs_direct_write_scan_commit_list(struct inode *inode,
				  struct list_head *list,
				  struct nfs_commit_info *cinfo)
{
	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
	pnfs_recover_commit_reqs(list, cinfo);
	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
}

static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct nfs_pageio_descriptor desc;
	struct nfs_page *req;
	LIST_HEAD(reqs);
	struct nfs_commit_info cinfo;

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);

	nfs_direct_join_group(&reqs, &cinfo, dreq->inode);

	nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
	get_dreq(dreq);

	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
			      &nfs_direct_write_completion_ops);
	desc.pg_dreq = dreq;

	while (!list_empty(&reqs)) {
		req = nfs_list_entry(reqs.next);
		/* Bump the transmission count */
		req->wb_nio++;
		if (!nfs_pageio_add_request(&desc, req)) {
			spin_lock(&dreq->lock);
			if (dreq->error < 0) {
				desc.pg_error = dreq->error;
			} else if (desc.pg_error != -EAGAIN) {
				dreq->flags = 0;
				if (!desc.pg_error)
					desc.pg_error = -EIO;
				dreq->error = desc.pg_error;
			} else
				dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
			spin_unlock(&dreq->lock);
			break;
		}
		nfs_release_request(req);
	}
	nfs_pageio_complete(&desc);

	while (!list_empty(&reqs)) {
		req = nfs_list_entry(reqs.next);
		nfs_list_remove_request(req);
		nfs_unlock_and_release_request(req);
		if (desc.pg_error == -EAGAIN) {
			nfs_mark_request_commit(req, NULL, &cinfo, 0);
		} else {
			spin_lock(&dreq->lock);
			nfs_direct_truncate_request(dreq, req);
			spin_unlock(&dreq->lock);
			nfs_release_request(req);
		}
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq);
}

static void nfs_direct_commit_complete(struct nfs_commit_data *data)
{
	const struct nfs_writeverf *verf = data->res.verf;
	struct nfs_direct_req *dreq = data->dreq;
	struct nfs_commit_info cinfo;
	struct nfs_page *req;
	int status = data->task.tk_status;

	trace_nfs_direct_commit_complete(dreq);

	if (status < 0) {
		/* Errors in commit are fatal */
		dreq->error = status;
		dreq->flags = NFS_ODIRECT_DONE;
	} else {
		status = dreq->error;
	}

	nfs_init_cinfo_from_dreq(&cinfo, dreq);

	while (!list_empty(&data->pages)) {
		req = nfs_list_entry(data->pages.next);
		nfs_list_remove_request(req);
		if (status < 0) {
			spin_lock(&dreq->lock);
			nfs_direct_truncate_request(dreq, req);
			spin_unlock(&dreq->lock);
			nfs_release_request(req);
		} else if (!nfs_write_match_verf(verf, req)) {
			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
			/*
			 * Despite the reboot, the write was successful,
			 * so reset wb_nio.
			 */
			req->wb_nio = 0;
			nfs_mark_request_commit(req, NULL, &cinfo, 0);
		} else
			nfs_release_request(req);
		nfs_unlock_and_release_request(req);
	}

	if (nfs_commit_end(cinfo.mds))
		nfs_direct_write_complete(dreq);
}

static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
		struct nfs_page *req)
{
	struct nfs_direct_req *dreq = cinfo->dreq;

	trace_nfs_direct_resched_write(dreq);

	spin_lock(&dreq->lock);
	if (dreq->flags != NFS_ODIRECT_DONE)
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	spin_unlock(&dreq->lock);
	nfs_mark_request_commit(req, NULL, cinfo, 0);
}

static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
	.completion = nfs_direct_commit_complete,
	.resched_write = nfs_direct_resched_write,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	int res;
	struct nfs_commit_info cinfo;
	LIST_HEAD(mds_list);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	nfs_commit_begin(cinfo.mds);
	nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
	res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
	if (res < 0) { /* res == -ENOMEM */
		spin_lock(&dreq->lock);
		if (dreq->flags == 0)
			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
		spin_unlock(&dreq->lock);
	}
	if (nfs_commit_end(cinfo.mds))
		nfs_direct_write_complete(dreq);
}

static void nfs_direct_write_clear_reqs(struct nfs_direct_req *dreq)
{
	struct nfs_commit_info cinfo;
	struct nfs_page *req;
	LIST_HEAD(reqs);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);

	while (!list_empty(&reqs)) {
		req = nfs_list_entry(reqs.next);
		nfs_list_remove_request(req);
		nfs_direct_truncate_request(dreq, req);
		nfs_release_request(req);
		nfs_unlock_and_release_request(req);
	}
}

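/*
 * Deferred completion for O_DIRECT writes, run from nfsiod: either send
 * a COMMIT for unstable writes (NFS_ODIRECT_DO_COMMIT), reissue the
 * writes after a failed commit or a server reboot
 * (NFS_ODIRECT_RESCHED_WRITES), or drop any remaining requests and
 * finish the direct request.
 */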
static void nfs_direct_write_schedule_work(struct work_struct *work)
{
	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			nfs_direct_write_clear_reqs(dreq);
			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
{
	trace_nfs_direct_write_complete(dreq);
	queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
}

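/*
 * Per-header write completion: account how many bytes reached the
 * server, then either queue the requests for a later COMMIT (unstable
 * writes), re-queue them for rescheduling, or simply release them.
 */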
static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
{
	struct nfs_direct_req *dreq = hdr->dreq;
	struct nfs_commit_info cinfo;
	struct nfs_page *req = nfs_list_entry(hdr->pages.next);
	int flags = NFS_ODIRECT_DONE;

	trace_nfs_direct_write_completion(dreq);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);

	spin_lock(&dreq->lock);
	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
		spin_unlock(&dreq->lock);
		goto out_put;
	}

	nfs_direct_count_bytes(dreq, hdr);
	if (test_bit(NFS_IOHDR_UNSTABLE_WRITES, &hdr->flags) &&
	    !test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
		if (!dreq->flags)
			dreq->flags = NFS_ODIRECT_DO_COMMIT;
		flags = dreq->flags;
	}
	spin_unlock(&dreq->lock);

	while (!list_empty(&hdr->pages)) {

		req = nfs_list_entry(hdr->pages.next);
		nfs_list_remove_request(req);
		if (flags == NFS_ODIRECT_DO_COMMIT) {
			kref_get(&req->wb_kref);
			memcpy(&req->wb_verf, &hdr->verf.verifier,
			       sizeof(req->wb_verf));
			nfs_mark_request_commit(req, hdr->lseg, &cinfo,
				hdr->ds_commit_idx);
		} else if (flags == NFS_ODIRECT_RESCHED_WRITES) {
			kref_get(&req->wb_kref);
			nfs_mark_request_commit(req, NULL, &cinfo, 0);
		}
		nfs_unlock_and_release_request(req);
	}

out_put:
	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq);
	hdr->release(hdr);
}

static void nfs_write_sync_pgio_error(struct list_head *head, int error)
{
	struct nfs_page *req;

	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_unlock_and_release_request(req);
	}
}

static void nfs_direct_write_reschedule_io(struct nfs_pgio_header *hdr)
{
	struct nfs_direct_req *dreq = hdr->dreq;
	struct nfs_page *req;
	struct nfs_commit_info cinfo;

	trace_nfs_direct_write_reschedule_io(dreq);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	spin_lock(&dreq->lock);
	if (dreq->error == 0)
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	set_bit(NFS_IOHDR_REDO, &hdr->flags);
	spin_unlock(&dreq->lock);
	while (!list_empty(&hdr->pages)) {
		req = nfs_list_entry(hdr->pages.next);
		nfs_list_remove_request(req);
		nfs_unlock_request(req);
		nfs_mark_request_commit(req, NULL, &cinfo, 0);
	}
}

static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
	.error_cleanup = nfs_write_sync_pgio_error,
	.init_hdr = nfs_direct_pgio_init,
	.completion = nfs_direct_write_completion,
	.reschedule_io = nfs_direct_write_reschedule_io,
};


/*
 * NB: Return the value of the first error return code.  Subsequent
 *     errors after the first one are ignored.
 *
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If iov_iter_get_pages_alloc2() or
 * nfs_page_create_from_page() fails, bail and stop sending more
 * writes.  Write length accounting is handled by
 * nfs_direct_write_completion().  If no requests have been sent at
 * all, just return an error.
 */
static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       struct iov_iter *iter,
					       loff_t pos, int ioflags)
{
	struct nfs_pageio_descriptor desc;
	struct inode *inode = dreq->inode;
	struct nfs_commit_info cinfo;
	ssize_t result = 0;
	size_t requested_bytes = 0;
	size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);
	bool defer = false;

	trace_nfs_direct_write_schedule_iovec(dreq);

	nfs_pageio_init_write(&desc, inode, ioflags, false,
			      &nfs_direct_write_completion_ops);
	desc.pg_dreq = dreq;
	get_dreq(dreq);
	inode_dio_begin(inode);

	NFS_I(inode)->write_io += iov_iter_count(iter);
	while (iov_iter_count(iter)) {
		struct page **pagevec;
		size_t bytes;
		size_t pgbase;
		unsigned npages, i;

		result = iov_iter_get_pages_alloc2(iter, &pagevec,
						  wsize, &pgbase);
		if (result < 0)
			break;

		bytes = result;
		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
		for (i = 0; i < npages; i++) {
			struct nfs_page *req;
			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);

			req = nfs_page_create_from_page(dreq->ctx, pagevec[i],
							pgbase, pos, req_len);
			if (IS_ERR(req)) {
				result = PTR_ERR(req);
				break;
			}

			if (desc.pg_error < 0) {
				nfs_free_request(req);
				result = desc.pg_error;
				break;
			}

			pgbase = 0;
			bytes -= req_len;
			requested_bytes += req_len;
			pos += req_len;
			dreq->bytes_left -= req_len;

			if (defer) {
				nfs_mark_request_commit(req, NULL, &cinfo, 0);
				continue;
			}

			nfs_lock_request(req);
			if (nfs_pageio_add_request(&desc, req))
				continue;

			/* Exit on hard errors */
			if (desc.pg_error < 0 && desc.pg_error != -EAGAIN) {
				result = desc.pg_error;
				nfs_unlock_and_release_request(req);
				break;
			}

			/* If the error is soft, defer remaining requests */
			nfs_init_cinfo_from_dreq(&cinfo, dreq);
			spin_lock(&dreq->lock);
			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
			spin_unlock(&dreq->lock);
			nfs_unlock_request(req);
			nfs_mark_request_commit(req, NULL, &cinfo, 0);
			desc.pg_error = 0;
			defer = true;
		}
		nfs_direct_release_pages(pagevec, npages);
		kvfree(pagevec);
		if (result < 0)
			break;
	}
	nfs_pageio_complete(&desc);

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		inode_dio_end(inode);
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq);
	return requested_bytes;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iter: vector of user buffers from which to write data
 * @swap: flag indicating this is swap IO, not O_DIRECT IO
 *
 * We use this function for direct writes instead of calling
 * generic_file_write_iter() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter,
			      bool swap)
{
	ssize_t result, requested;
	size_t count;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	struct inode *inode = mapping->host;
	struct nfs_direct_req *dreq;
	struct nfs_lock_context *l_ctx;
	loff_t pos, end;

	dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
		file, iov_iter_count(iter), (long long) iocb->ki_pos);

	if (swap)
		/* bypass generic checks */
		result = iov_iter_count(iter);
	else
		result = generic_write_checks(iocb, iter);
	if (result <= 0)
		return result;
	count = result;
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	pos = iocb->ki_pos;
	end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;

	task_io_account_write(count);

	result = -ENOMEM;
	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;

	dreq->inode = inode;
	dreq->bytes_left = dreq->max_count = count;
	dreq->io_start = pos;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	l_ctx = nfs_get_lock_context(dreq->ctx);
	if (IS_ERR(l_ctx)) {
		result = PTR_ERR(l_ctx);
		nfs_direct_req_release(dreq);
		goto out_release;
	}
	dreq->l_ctx = l_ctx;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;
	pnfs_init_ds_commit_info_ops(&dreq->ds_cinfo, inode);

	if (swap) {
		requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
							    FLUSH_STABLE);
	} else {
		nfs_start_io_direct(inode);

		requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
							    FLUSH_COND_STABLE);

		if (mapping->nrpages) {
			invalidate_inode_pages2_range(mapping,
						      pos >> PAGE_SHIFT, end);
		}

		nfs_end_io_direct(inode);
	}

	if (requested > 0) {
		result = nfs_direct_wait(dreq);
		if (result > 0) {
			requested -= result;
			iocb->ki_pos = pos + result;
			/* XXX: should check the generic_write_sync retval */
			generic_write_sync(iocb, result);
		}
		iov_iter_revert(iter, requested);
	} else {
		result = requested;
	}
	nfs_fscache_invalidate(inode, FSCACHE_INVAL_DIO_WRITE);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}