Linux-libre 5.3.12-gnu: fs/nfs/direct.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * linux/fs/nfs/direct.c
4  *
5  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
6  *
7  * High-performance uncached I/O for the Linux NFS client
8  *
9  * There are important applications whose performance or correctness
10  * depends on uncached access to file data.  Database clusters
11  * (multiple copies of the same instance running on separate hosts)
12  * implement their own cache coherency protocol that subsumes file
13  * system cache protocols.  Applications that process datasets
14  * considerably larger than the client's memory do not always benefit
15  * from a local cache.  A streaming video server, for instance, has no
16  * need to cache the contents of a file.
17  *
18  * When an application requests uncached I/O, all read and write requests
19  * are made directly to the server; data stored or fetched via these
20  * requests is not cached in the Linux page cache.  The client does not
21  * correct unaligned requests from applications.  All requested bytes are
22  * held on permanent storage before a direct write system call returns to
23  * an application.
24  *
25  * Solaris implements an uncached I/O facility called directio() that
26  * is used for backups and sequential I/O to very large files.  Solaris
27  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
28  * an undocumented mount option.
29  *
30  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
31  * help from Andrew Morton.
32  *
33  * 18 Dec 2001  Initial implementation for 2.4  --cel
34  * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
35  * 08 Jun 2003  Port to 2.5 APIs  --cel
36  * 31 Mar 2004  Handle direct I/O without VFS support  --cel
37  * 15 Sep 2004  Parallel async reads  --cel
38  * 04 May 2005  support O_DIRECT with aio  --cel
39  *
40  */
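/*
 * For illustration, a minimal user-space sketch of how an application
 * might request the uncached I/O described above.  This is only a
 * sketch: the mount path is hypothetical, and the page-aligned,
 * page-sized buffer is a conservative habit rather than a requirement,
 * since (as noted above) the client passes unaligned requests through
 * unchanged.
 *
 *	#define _GNU_SOURCE		// for O_DIRECT
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	int main(void)
 *	{
 *		void *buf;
 *		int fd = open("/mnt/nfs/data.bin", O_RDWR | O_DIRECT);
 *
 *		if (fd < 0 || posix_memalign(&buf, 4096, 4096))
 *			return 1;
 *		// Both transfers bypass the client's page cache; the
 *		// write returns only once the data is on stable storage
 *		// on the server.
 *		if (pread(fd, buf, 4096, 0) < 0)
 *			return 1;
 *		if (pwrite(fd, buf, 4096, 0) < 0)
 *			return 1;
 *		free(buf);
 *		return close(fd);
 *	}
 */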
41
42 #include <linux/errno.h>
43 #include <linux/sched.h>
44 #include <linux/kernel.h>
45 #include <linux/file.h>
46 #include <linux/pagemap.h>
47 #include <linux/kref.h>
48 #include <linux/slab.h>
49 #include <linux/task_io_accounting_ops.h>
50 #include <linux/module.h>
51
52 #include <linux/nfs_fs.h>
53 #include <linux/nfs_page.h>
54 #include <linux/sunrpc/clnt.h>
55
56 #include <linux/uaccess.h>
57 #include <linux/atomic.h>
58
59 #include "internal.h"
60 #include "iostat.h"
61 #include "pnfs.h"
62
63 #define NFSDBG_FACILITY         NFSDBG_VFS
64
65 static struct kmem_cache *nfs_direct_cachep;
66
67 /*
68  * This represents a set of asynchronous requests that we're waiting on
69  */
70 struct nfs_direct_mirror {
71         ssize_t count;
72 };
73
74 struct nfs_direct_req {
75         struct kref             kref;           /* release manager */
76
77         /* I/O parameters */
78         struct nfs_open_context *ctx;           /* file open context info */
79         struct nfs_lock_context *l_ctx;         /* Lock context info */
80         struct kiocb *          iocb;           /* controlling i/o request */
81         struct inode *          inode;          /* target file of i/o */
82
83         /* completion state */
84         atomic_t                io_count;       /* i/os we're waiting for */
85         spinlock_t              lock;           /* protect completion state */
86
87         struct nfs_direct_mirror mirrors[NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX];
88         int                     mirror_count;
89
90         loff_t                  io_start;       /* Start offset for I/O */
91         ssize_t                 count,          /* bytes actually processed */
92                                 max_count,      /* max expected count */
93                                 bytes_left,     /* bytes left to be sent */
94                                 error;          /* any reported error */
95         struct completion       completion;     /* wait for i/o completion */
96
97         /* commit state */
98         struct nfs_mds_commit_info mds_cinfo;   /* Storage for cinfo */
99         struct pnfs_ds_commit_info ds_cinfo;    /* Storage for cinfo */
100         struct work_struct      work;
101         int                     flags;
102         /* for write */
103 #define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
104 #define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
105         /* for read */
106 #define NFS_ODIRECT_SHOULD_DIRTY        (3)     /* dirty user-space page after read */
107         struct nfs_writeverf    verf;           /* unstable write verifier */
108 };
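/*
 * Note that dreq->flags holds a single state value rather than a bitmask:
 * the NFS_ODIRECT_* constants above are assigned and compared with plain
 * '=' and '==' throughout this file, never with set_bit()/test_bit().
 */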
109
110 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
111 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
112 static void nfs_direct_write_complete(struct nfs_direct_req *dreq);
113 static void nfs_direct_write_schedule_work(struct work_struct *work);
114
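/*
 * dreq->io_count tracks the I/O still outstanding for a direct request.
 * Each pgio header takes a reference in nfs_direct_pgio_init(), the
 * scheduling functions take one of their own before dispatching, and every
 * completion drops one with put_dreq().  Whoever drops the last reference
 * finishes the request via nfs_direct_complete() or
 * nfs_direct_write_complete().
 */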
115 static inline void get_dreq(struct nfs_direct_req *dreq)
116 {
117         atomic_inc(&dreq->io_count);
118 }
119
120 static inline int put_dreq(struct nfs_direct_req *dreq)
121 {
122         return atomic_dec_and_test(&dreq->io_count);
123 }
124
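/*
 * dreq->count is the number of bytes, measured from dreq->io_start, known
 * to have been transferred so far.  When a reply reports an error or EOF,
 * the request is clamped: dreq->max_count shrinks to the length covered by
 * that reply, and dreq->count and the per-mirror counts are trimmed to
 * match.
 */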
125 static void
126 nfs_direct_handle_truncated(struct nfs_direct_req *dreq,
127                             const struct nfs_pgio_header *hdr,
128                             ssize_t dreq_len)
129 {
130         struct nfs_direct_mirror *mirror = &dreq->mirrors[hdr->pgio_mirror_idx];
131
132         if (!(test_bit(NFS_IOHDR_ERROR, &hdr->flags) ||
133               test_bit(NFS_IOHDR_EOF, &hdr->flags)))
134                 return;
135         if (dreq->max_count >= dreq_len) {
136                 dreq->max_count = dreq_len;
137                 if (dreq->count > dreq_len)
138                         dreq->count = dreq_len;
139
140                 if (test_bit(NFS_IOHDR_ERROR, &hdr->flags))
141                         dreq->error = hdr->error;
142                 else /* Clear outstanding error if this is EOF */
143                         dreq->error = 0;
144         }
145         if (mirror->count > dreq_len)
146                 mirror->count = dreq_len;
147 }
148
149 static void
150 nfs_direct_count_bytes(struct nfs_direct_req *dreq,
151                        const struct nfs_pgio_header *hdr)
152 {
153         struct nfs_direct_mirror *mirror = &dreq->mirrors[hdr->pgio_mirror_idx];
154         loff_t hdr_end = hdr->io_start + hdr->good_bytes;
155         ssize_t dreq_len = 0;
156
157         if (hdr_end > dreq->io_start)
158                 dreq_len = hdr_end - dreq->io_start;
159
160         nfs_direct_handle_truncated(dreq, hdr, dreq_len);
161
162         if (dreq_len > dreq->max_count)
163                 dreq_len = dreq->max_count;
164
165         if (mirror->count < dreq_len)
166                 mirror->count = dreq_len;
167         if (dreq->count < dreq_len)
168                 dreq->count = dreq_len;
169 }
170
171 /*
172  * nfs_direct_select_verf - select the right verifier
173  * @dreq: direct request possibly spanning multiple servers
174  * @ds_clp: nfs_client of the data server, or NULL if MDS / non-pNFS
175  * @commit_idx: commit bucket index for the DS
176  *
177  * Returns the correct verifier to use given the role of the server.
178  */
179 static struct nfs_writeverf *
180 nfs_direct_select_verf(struct nfs_direct_req *dreq,
181                        struct nfs_client *ds_clp,
182                        int commit_idx)
183 {
184         struct nfs_writeverf *verfp = &dreq->verf;
185
186 #ifdef CONFIG_NFS_V4_1
187         /*
188          * pNFS is in use: use the DS verf, except when commit_through_mds
189          * is set for the layout segment, in which case nbuckets is zero.
190          */
191         if (ds_clp && dreq->ds_cinfo.nbuckets > 0) {
192                 if (commit_idx >= 0 && commit_idx < dreq->ds_cinfo.nbuckets)
193                         verfp = &dreq->ds_cinfo.buckets[commit_idx].direct_verf;
194                 else
195                         WARN_ON_ONCE(1);
196         }
197 #endif
198         return verfp;
199 }
200
201
202 /*
203  * nfs_direct_set_hdr_verf - set the write/commit verifier
204  * @dreq: direct request possibly spanning multiple servers
205  * @hdr: pageio header to validate against previously seen verfs
206  *
207  * Set the server's (MDS or DS) "seen" verifier.
208  */
209 static void nfs_direct_set_hdr_verf(struct nfs_direct_req *dreq,
210                                     struct nfs_pgio_header *hdr)
211 {
212         struct nfs_writeverf *verfp;
213
214         verfp = nfs_direct_select_verf(dreq, hdr->ds_clp, hdr->ds_commit_idx);
215         WARN_ON_ONCE(verfp->committed >= 0);
216         memcpy(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
217         WARN_ON_ONCE(verfp->committed < 0);
218 }
219
220 static int nfs_direct_cmp_verf(const struct nfs_writeverf *v1,
221                 const struct nfs_writeverf *v2)
222 {
223         return nfs_write_verifier_cmp(&v1->verifier, &v2->verifier);
224 }
225
226 /*
227  * nfs_direct_set_or_cmp_hdr_verf - set or compare verifier for pgio header
228  * @dreq: direct request possibly spanning multiple servers
229  * @hdr: pageio header to validate against the previously seen verf
230  *
231  * Sets the server's "seen" verf if it has not been initialized yet.
232  * Returns the result of the comparison between @hdr->verf and the "seen"
233  * verf of the server used by @hdr (DS or MDS).
234  */
235 static int nfs_direct_set_or_cmp_hdr_verf(struct nfs_direct_req *dreq,
236                                           struct nfs_pgio_header *hdr)
237 {
238         struct nfs_writeverf *verfp;
239
240         verfp = nfs_direct_select_verf(dreq, hdr->ds_clp, hdr->ds_commit_idx);
241         if (verfp->committed < 0) {
242                 nfs_direct_set_hdr_verf(dreq, hdr);
243                 return 0;
244         }
245         return nfs_direct_cmp_verf(verfp, &hdr->verf);
246 }
247
248 /*
249  * nfs_direct_cmp_commit_data_verf - compare verifier for commit data
250  * @dreq: direct request possibly spanning multiple servers
251  * @data: commit data to validate against the previously seen verf
252  *
253  * Returns the result of the comparison between @data->verf and the verf
254  * of the server used by @data (DS or MDS).
255  */
256 static int nfs_direct_cmp_commit_data_verf(struct nfs_direct_req *dreq,
257                                            struct nfs_commit_data *data)
258 {
259         struct nfs_writeverf *verfp;
260
261         verfp = nfs_direct_select_verf(dreq, data->ds_clp,
262                                          data->ds_commit_index);
263
264         /* verifier not set so always fail */
265         if (verfp->committed < 0)
266                 return 1;
267
268         return nfs_direct_cmp_verf(verfp, &data->verf);
269 }
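/*
 * Write verifier handling in brief: the first unstable WRITE reply from a
 * given server has its verifier recorded via nfs_direct_set_hdr_verf().
 * Later WRITE and COMMIT replies from that server are compared against the
 * recorded value; a mismatch means the server may have rebooted and lost
 * its write cache, so the request is flagged NFS_ODIRECT_RESCHED_WRITES
 * and the data is written again.
 */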
270
271 /**
272  * nfs_direct_IO - NFS address space operation for direct I/O
273  * @iocb: target I/O control block
274  * @iter: I/O buffer
275  *
276  * The presence of this routine in the address space ops vector means
277  * the NFS client supports direct I/O. However, for most direct IO, we
278  * shunt off direct read and write requests before the VFS gets them,
279  * so this method is only ever called for swap.
280  */
281 ssize_t nfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
282 {
283         struct inode *inode = iocb->ki_filp->f_mapping->host;
284
285         /* only swap files may call nfs_direct_IO */
286         if (!IS_SWAPFILE(inode))
287                 return 0;
288
289         VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);
290
291         if (iov_iter_rw(iter) == READ)
292                 return nfs_file_direct_read(iocb, iter);
293         return nfs_file_direct_write(iocb, iter);
294 }
295
296 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
297 {
298         unsigned int i;
299         for (i = 0; i < npages; i++)
300                 put_page(pages[i]);
301 }
302
303 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
304                               struct nfs_direct_req *dreq)
305 {
306         cinfo->inode = dreq->inode;
307         cinfo->mds = &dreq->mds_cinfo;
308         cinfo->ds = &dreq->ds_cinfo;
309         cinfo->dreq = dreq;
310         cinfo->completion_ops = &nfs_direct_commit_completion_ops;
311 }
312
313 static inline void nfs_direct_setup_mirroring(struct nfs_direct_req *dreq,
314                                              struct nfs_pageio_descriptor *pgio,
315                                              struct nfs_page *req)
316 {
317         int mirror_count = 1;
318
319         if (pgio->pg_ops->pg_get_mirror_count)
320                 mirror_count = pgio->pg_ops->pg_get_mirror_count(pgio, req);
321
322         dreq->mirror_count = mirror_count;
323 }
324
325 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
326 {
327         struct nfs_direct_req *dreq;
328
329         dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
330         if (!dreq)
331                 return NULL;
332
333         kref_init(&dreq->kref);
334         kref_get(&dreq->kref);
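        /*
         * Two references are held from the start: one belongs to the
         * caller and is dropped in its out_release path, the other is
         * normally dropped by nfs_direct_complete() once the I/O finishes.
         */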
335         init_completion(&dreq->completion);
336         INIT_LIST_HEAD(&dreq->mds_cinfo.list);
337         dreq->verf.committed = NFS_INVALID_STABLE_HOW;  /* not set yet */
338         INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
339         dreq->mirror_count = 1;
340         spin_lock_init(&dreq->lock);
341
342         return dreq;
343 }
344
345 static void nfs_direct_req_free(struct kref *kref)
346 {
347         struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
348
349         nfs_free_pnfs_ds_cinfo(&dreq->ds_cinfo);
350         if (dreq->l_ctx != NULL)
351                 nfs_put_lock_context(dreq->l_ctx);
352         if (dreq->ctx != NULL)
353                 put_nfs_open_context(dreq->ctx);
354         kmem_cache_free(nfs_direct_cachep, dreq);
355 }
356
357 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
358 {
359         kref_put(&dreq->kref, nfs_direct_req_free);
360 }
361
362 ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
363 {
364         return dreq->bytes_left;
365 }
366 EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
367
368 /*
369  * Collects and returns the final error value/byte-count.
370  */
371 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
372 {
373         ssize_t result = -EIOCBQUEUED;
374
375         /* Async requests don't wait here */
376         if (dreq->iocb)
377                 goto out;
378
379         result = wait_for_completion_killable(&dreq->completion);
380
381         if (!result) {
382                 result = dreq->count;
383                 WARN_ON_ONCE(dreq->count < 0);
384         }
385         if (!result)
386                 result = dreq->error;
387
388 out:
389         return (ssize_t) result;
390 }
391
392 /*
393  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust that
394  * the iocb is still valid here if this is a synchronous request.
395  */
396 static void nfs_direct_complete(struct nfs_direct_req *dreq)
397 {
398         struct inode *inode = dreq->inode;
399
400         inode_dio_end(inode);
401
402         if (dreq->iocb) {
403                 long res = (long) dreq->error;
404                 if (dreq->count != 0) {
405                         res = (long) dreq->count;
406                         WARN_ON_ONCE(dreq->count < 0);
407                 }
408                 dreq->iocb->ki_complete(dreq->iocb, res, 0);
409         }
410
411         complete(&dreq->completion);
412
413         nfs_direct_req_release(dreq);
414 }
415
416 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
417 {
418         unsigned long bytes = 0;
419         struct nfs_direct_req *dreq = hdr->dreq;
420
421         spin_lock(&dreq->lock);
422         if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
423                 spin_unlock(&dreq->lock);
424                 goto out_put;
425         }
426
427         nfs_direct_count_bytes(dreq, hdr);
428         spin_unlock(&dreq->lock);
429
430         while (!list_empty(&hdr->pages)) {
431                 struct nfs_page *req = nfs_list_entry(hdr->pages.next);
432                 struct page *page = req->wb_page;
433
434                 if (!PageCompound(page) && bytes < hdr->good_bytes &&
435                     (dreq->flags == NFS_ODIRECT_SHOULD_DIRTY))
436                         set_page_dirty(page);
437                 bytes += req->wb_bytes;
438                 nfs_list_remove_request(req);
439                 nfs_release_request(req);
440         }
441 out_put:
442         if (put_dreq(dreq))
443                 nfs_direct_complete(dreq);
444         hdr->release(hdr);
445 }
446
447 static void nfs_read_sync_pgio_error(struct list_head *head, int error)
448 {
449         struct nfs_page *req;
450
451         while (!list_empty(head)) {
452                 req = nfs_list_entry(head->next);
453                 nfs_list_remove_request(req);
454                 nfs_release_request(req);
455         }
456 }
457
458 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
459 {
460         get_dreq(hdr->dreq);
461 }
462
463 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
464         .error_cleanup = nfs_read_sync_pgio_error,
465         .init_hdr = nfs_direct_pgio_init,
466         .completion = nfs_direct_read_completion,
467 };
468
469 /*
470  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
471  * operation.  If iov_iter_get_pages_alloc() or nfs_create_request()
472  * fails, bail and stop sending more reads.  Read length accounting is
473  * handled by the completion path via nfs_direct_count_bytes().
474  * Otherwise, if no requests have been sent, just return an error.
475  */
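/*
 * For example (assuming 4 KiB pages and an rsize of 256 KiB), a 1 MiB read
 * pins up to 64 user pages per iteration of the loop below, queues one
 * nfs_page per pinned page, and leaves it to the pageio layer to coalesce
 * them into READ RPCs of at most rsize bytes each.
 */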
476
477 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
478                                               struct iov_iter *iter,
479                                               loff_t pos)
480 {
481         struct nfs_pageio_descriptor desc;
482         struct inode *inode = dreq->inode;
483         ssize_t result = -EINVAL;
484         size_t requested_bytes = 0;
485         size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);
486
487         nfs_pageio_init_read(&desc, dreq->inode, false,
488                              &nfs_direct_read_completion_ops);
489         get_dreq(dreq);
490         desc.pg_dreq = dreq;
491         inode_dio_begin(inode);
492
493         while (iov_iter_count(iter)) {
494                 struct page **pagevec;
495                 size_t bytes;
496                 size_t pgbase;
497                 unsigned npages, i;
498
499                 result = iov_iter_get_pages_alloc(iter, &pagevec, 
500                                                   rsize, &pgbase);
501                 if (result < 0)
502                         break;
503         
504                 bytes = result;
505                 iov_iter_advance(iter, bytes);
506                 npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
507                 for (i = 0; i < npages; i++) {
508                         struct nfs_page *req;
509                         unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
510                         /* XXX do we need to do the eof zeroing found in async_filler? */
511                         req = nfs_create_request(dreq->ctx, pagevec[i],
512                                                  pgbase, req_len);
513                         if (IS_ERR(req)) {
514                                 result = PTR_ERR(req);
515                                 break;
516                         }
517                         req->wb_index = pos >> PAGE_SHIFT;
518                         req->wb_offset = pos & ~PAGE_MASK;
519                         if (!nfs_pageio_add_request(&desc, req)) {
520                                 result = desc.pg_error;
521                                 nfs_release_request(req);
522                                 break;
523                         }
524                         pgbase = 0;
525                         bytes -= req_len;
526                         requested_bytes += req_len;
527                         pos += req_len;
528                         dreq->bytes_left -= req_len;
529                 }
530                 nfs_direct_release_pages(pagevec, npages);
531                 kvfree(pagevec);
532                 if (result < 0)
533                         break;
534         }
535
536         nfs_pageio_complete(&desc);
537
538         /*
539          * If no bytes were started, return the error, and let the
540          * generic layer handle the completion.
541          */
542         if (requested_bytes == 0) {
543                 inode_dio_end(inode);
544                 nfs_direct_req_release(dreq);
545                 return result < 0 ? result : -EIO;
546         }
547
548         if (put_dreq(dreq))
549                 nfs_direct_complete(dreq);
550         return requested_bytes;
551 }
552
553 /**
554  * nfs_file_direct_read - file direct read operation for NFS files
555  * @iocb: target I/O control block
556  * @iter: vector of user buffers into which to read data
557  *
558  * We use this function for direct reads instead of calling
559  * generic_file_read_iter() in order to avoid its check to see if
560  * the request starts before the end of the file.  For that check
561  * to work, we must generate a GETATTR before each direct read, and
562  * even then there is a window between the GETATTR and the subsequent
563  * READ where the file size could change.  Our preference is simply
564  * to do all reads the application wants, and the server will take
565  * care of managing the end of file boundary.
566  *
567  * This function also eliminates unnecessarily updating the file's
568  * atime locally, as the NFS server sets the file's atime, and this
569  * client must read the updated atime from the server back into its
570  * cache.
571  */
572 ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter)
573 {
574         struct file *file = iocb->ki_filp;
575         struct address_space *mapping = file->f_mapping;
576         struct inode *inode = mapping->host;
577         struct nfs_direct_req *dreq;
578         struct nfs_lock_context *l_ctx;
579         ssize_t result = -EINVAL, requested;
580         size_t count = iov_iter_count(iter);
581         nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
582
583         dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
584                 file, count, (long long) iocb->ki_pos);
585
586         result = 0;
587         if (!count)
588                 goto out;
589
590         task_io_account_read(count);
591
592         result = -ENOMEM;
593         dreq = nfs_direct_req_alloc();
594         if (dreq == NULL)
595                 goto out;
596
597         dreq->inode = inode;
598         dreq->bytes_left = dreq->max_count = count;
599         dreq->io_start = iocb->ki_pos;
600         dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
601         l_ctx = nfs_get_lock_context(dreq->ctx);
602         if (IS_ERR(l_ctx)) {
603                 result = PTR_ERR(l_ctx);
604                 goto out_release;
605         }
606         dreq->l_ctx = l_ctx;
607         if (!is_sync_kiocb(iocb))
608                 dreq->iocb = iocb;
609
610         if (iter_is_iovec(iter))
611                 dreq->flags = NFS_ODIRECT_SHOULD_DIRTY;
612
613         nfs_start_io_direct(inode);
614
615         NFS_I(inode)->read_io += count;
616         requested = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);
617
618         nfs_end_io_direct(inode);
619
620         if (requested > 0) {
621                 result = nfs_direct_wait(dreq);
622                 if (result > 0) {
623                         requested -= result;
624                         iocb->ki_pos += result;
625                 }
626                 iov_iter_revert(iter, requested);
627         } else {
628                 result = requested;
629         }
630
631 out_release:
632         nfs_direct_req_release(dreq);
633 out:
634         return result;
635 }
636
637 static void
638 nfs_direct_write_scan_commit_list(struct inode *inode,
639                                   struct list_head *list,
640                                   struct nfs_commit_info *cinfo)
641 {
642         mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
643 #ifdef CONFIG_NFS_V4_1
644         if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
645                 NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
646 #endif
647         nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
648         mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
649 }
650
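/*
 * Resend previously written data.  This runs when a write or commit
 * verifier mismatch, a failed COMMIT, or a layout error indicates that the
 * server may not hold the data: the requests parked on the commit lists
 * are pulled back and re-driven as stable writes (FLUSH_STABLE).
 */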
651 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
652 {
653         struct nfs_pageio_descriptor desc;
654         struct nfs_page *req, *tmp;
655         LIST_HEAD(reqs);
656         struct nfs_commit_info cinfo;
657         LIST_HEAD(failed);
658         int i;
659
660         nfs_init_cinfo_from_dreq(&cinfo, dreq);
661         nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
662
663         dreq->count = 0;
664         dreq->max_count = 0;
665         list_for_each_entry(req, &reqs, wb_list)
666                 dreq->max_count += req->wb_bytes;
667         dreq->verf.committed = NFS_INVALID_STABLE_HOW;
668         nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
669         for (i = 0; i < dreq->mirror_count; i++)
670                 dreq->mirrors[i].count = 0;
671         get_dreq(dreq);
672
673         nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
674                               &nfs_direct_write_completion_ops);
675         desc.pg_dreq = dreq;
676
677         req = nfs_list_entry(reqs.next);
678         nfs_direct_setup_mirroring(dreq, &desc, req);
679         if (desc.pg_error < 0) {
680                 list_splice_init(&reqs, &failed);
681                 goto out_failed;
682         }
683
684         list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
685                 /* Bump the transmission count */
686                 req->wb_nio++;
687                 if (!nfs_pageio_add_request(&desc, req)) {
688                         nfs_list_move_request(req, &failed);
689                         spin_lock(&cinfo.inode->i_lock);
690                         dreq->flags = 0;
691                         if (desc.pg_error < 0)
692                                 dreq->error = desc.pg_error;
693                         else
694                                 dreq->error = -EIO;
695                         spin_unlock(&cinfo.inode->i_lock);
696                 }
697                 nfs_release_request(req);
698         }
699         nfs_pageio_complete(&desc);
700
701 out_failed:
702         while (!list_empty(&failed)) {
703                 req = nfs_list_entry(failed.next);
704                 nfs_list_remove_request(req);
705                 nfs_unlock_and_release_request(req);
706         }
707
708         if (put_dreq(dreq))
709                 nfs_direct_write_complete(dreq);
710 }
711
712 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
713 {
714         struct nfs_direct_req *dreq = data->dreq;
715         struct nfs_commit_info cinfo;
716         struct nfs_page *req;
717         int status = data->task.tk_status;
718
719         nfs_init_cinfo_from_dreq(&cinfo, dreq);
720         if (status < 0 || nfs_direct_cmp_commit_data_verf(dreq, data))
721                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
722
723         while (!list_empty(&data->pages)) {
724                 req = nfs_list_entry(data->pages.next);
725                 nfs_list_remove_request(req);
726                 if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
727                         /*
728                          * Despite the reboot, the write was successful,
729                          * so reset wb_nio.
730                          */
731                         req->wb_nio = 0;
732                         /* Note the rewrite will go through mds */
733                         nfs_mark_request_commit(req, NULL, &cinfo, 0);
734                 } else
735                         nfs_release_request(req);
736                 nfs_unlock_and_release_request(req);
737         }
738
739         if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
740                 nfs_direct_write_complete(dreq);
741 }
742
743 static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
744                 struct nfs_page *req)
745 {
746         struct nfs_direct_req *dreq = cinfo->dreq;
747
748         spin_lock(&dreq->lock);
749         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
750         spin_unlock(&dreq->lock);
751         nfs_mark_request_commit(req, NULL, cinfo, 0);
752 }
753
754 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
755         .completion = nfs_direct_commit_complete,
756         .resched_write = nfs_direct_resched_write,
757 };
758
759 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
760 {
761         int res;
762         struct nfs_commit_info cinfo;
763         LIST_HEAD(mds_list);
764
765         nfs_init_cinfo_from_dreq(&cinfo, dreq);
766         nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
767         res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
768         if (res < 0) /* res == -ENOMEM */
769                 nfs_direct_write_reschedule(dreq);
770 }
771
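/*
 * Deferred completion for direct writes, run from the nfsiod workqueue.
 * Depending on the state accumulated in dreq->flags this either sends a
 * COMMIT (NFS_ODIRECT_DO_COMMIT), resends the writes
 * (NFS_ODIRECT_RESCHED_WRITES), or zaps the inode's mapping and finishes
 * the request.
 */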
772 static void nfs_direct_write_schedule_work(struct work_struct *work)
773 {
774         struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
775         int flags = dreq->flags;
776
777         dreq->flags = 0;
778         switch (flags) {
779                 case NFS_ODIRECT_DO_COMMIT:
780                         nfs_direct_commit_schedule(dreq);
781                         break;
782                 case NFS_ODIRECT_RESCHED_WRITES:
783                         nfs_direct_write_reschedule(dreq);
784                         break;
785                 default:
786                         nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
787                         nfs_direct_complete(dreq);
788         }
789 }
790
791 static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
792 {
793         queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
794 }
795
796 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
797 {
798         struct nfs_direct_req *dreq = hdr->dreq;
799         struct nfs_commit_info cinfo;
800         bool request_commit = false;
801         struct nfs_page *req = nfs_list_entry(hdr->pages.next);
802
803         nfs_init_cinfo_from_dreq(&cinfo, dreq);
804
805         spin_lock(&dreq->lock);
806         if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
807                 spin_unlock(&dreq->lock);
808                 goto out_put;
809         }
810
811         nfs_direct_count_bytes(dreq, hdr);
812         if (hdr->good_bytes != 0) {
813                 if (nfs_write_need_commit(hdr)) {
814                         if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
815                                 request_commit = true;
816                         else if (dreq->flags == 0) {
817                                 nfs_direct_set_hdr_verf(dreq, hdr);
818                                 request_commit = true;
819                                 dreq->flags = NFS_ODIRECT_DO_COMMIT;
820                         } else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
821                                 request_commit = true;
822                                 if (nfs_direct_set_or_cmp_hdr_verf(dreq, hdr))
823                                         dreq->flags =
824                                                 NFS_ODIRECT_RESCHED_WRITES;
825                         }
826                 }
827         }
828         spin_unlock(&dreq->lock);
829
830         while (!list_empty(&hdr->pages)) {
831
832                 req = nfs_list_entry(hdr->pages.next);
833                 nfs_list_remove_request(req);
834                 if (request_commit) {
835                         kref_get(&req->wb_kref);
836                         nfs_mark_request_commit(req, hdr->lseg, &cinfo,
837                                 hdr->ds_commit_idx);
838                 }
839                 nfs_unlock_and_release_request(req);
840         }
841
842 out_put:
843         if (put_dreq(dreq))
844                 nfs_direct_write_complete(dreq);
845         hdr->release(hdr);
846 }
847
848 static void nfs_write_sync_pgio_error(struct list_head *head, int error)
849 {
850         struct nfs_page *req;
851
852         while (!list_empty(head)) {
853                 req = nfs_list_entry(head->next);
854                 nfs_list_remove_request(req);
855                 nfs_unlock_and_release_request(req);
856         }
857 }
858
859 static void nfs_direct_write_reschedule_io(struct nfs_pgio_header *hdr)
860 {
861         struct nfs_direct_req *dreq = hdr->dreq;
862
863         spin_lock(&dreq->lock);
864         if (dreq->error == 0) {
865                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
866                 /* fake unstable write to let common nfs resend pages */
867                 hdr->verf.committed = NFS_UNSTABLE;
868                 hdr->good_bytes = hdr->args.count;
869         }
870         spin_unlock(&dreq->lock);
871 }
872
873 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
874         .error_cleanup = nfs_write_sync_pgio_error,
875         .init_hdr = nfs_direct_pgio_init,
876         .completion = nfs_direct_write_completion,
877         .reschedule_io = nfs_direct_write_reschedule_io,
878 };
879
880
881 /*
882  * NB: Return the value of the first error return code.  Subsequent
883  *     errors after the first one are ignored.
884  *
885  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
886  * operation.  If iov_iter_get_pages_alloc() or nfs_create_request()
887  * fails, bail and stop sending more writes.  Write length accounting
888  * is handled by the completion path via nfs_direct_count_bytes().
889  * Otherwise, if no requests have been sent, just return an error
890  * to the caller.
891  */
892 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
893                                                struct iov_iter *iter,
894                                                loff_t pos)
895 {
896         struct nfs_pageio_descriptor desc;
897         struct inode *inode = dreq->inode;
898         ssize_t result = 0;
899         size_t requested_bytes = 0;
900         size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);
901
902         nfs_pageio_init_write(&desc, inode, FLUSH_COND_STABLE, false,
903                               &nfs_direct_write_completion_ops);
904         desc.pg_dreq = dreq;
905         get_dreq(dreq);
906         inode_dio_begin(inode);
907
908         NFS_I(inode)->write_io += iov_iter_count(iter);
909         while (iov_iter_count(iter)) {
910                 struct page **pagevec;
911                 size_t bytes;
912                 size_t pgbase;
913                 unsigned npages, i;
914
915                 result = iov_iter_get_pages_alloc(iter, &pagevec, 
916                                                   wsize, &pgbase);
917                 if (result < 0)
918                         break;
919
920                 bytes = result;
921                 iov_iter_advance(iter, bytes);
922                 npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
923                 for (i = 0; i < npages; i++) {
924                         struct nfs_page *req;
925                         unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
926
927                         req = nfs_create_request(dreq->ctx, pagevec[i],
928                                                  pgbase, req_len);
929                         if (IS_ERR(req)) {
930                                 result = PTR_ERR(req);
931                                 break;
932                         }
933
934                         nfs_direct_setup_mirroring(dreq, &desc, req);
935                         if (desc.pg_error < 0) {
936                                 nfs_free_request(req);
937                                 result = desc.pg_error;
938                                 break;
939                         }
940
941                         nfs_lock_request(req);
942                         req->wb_index = pos >> PAGE_SHIFT;
943                         req->wb_offset = pos & ~PAGE_MASK;
944                         if (!nfs_pageio_add_request(&desc, req)) {
945                                 result = desc.pg_error;
946                                 nfs_unlock_and_release_request(req);
947                                 break;
948                         }
949                         pgbase = 0;
950                         bytes -= req_len;
951                         requested_bytes += req_len;
952                         pos += req_len;
953                         dreq->bytes_left -= req_len;
954                 }
955                 nfs_direct_release_pages(pagevec, npages);
956                 kvfree(pagevec);
957                 if (result < 0)
958                         break;
959         }
960         nfs_pageio_complete(&desc);
961
962         /*
963          * If no bytes were started, return the error, and let the
964          * generic layer handle the completion.
965          */
966         if (requested_bytes == 0) {
967                 inode_dio_end(inode);
968                 nfs_direct_req_release(dreq);
969                 return result < 0 ? result : -EIO;
970         }
971
972         if (put_dreq(dreq))
973                 nfs_direct_write_complete(dreq);
974         return requested_bytes;
975 }
976
977 /**
978  * nfs_file_direct_write - file direct write operation for NFS files
979  * @iocb: target I/O control block
980  * @iter: vector of user buffers from which to write data
981  *
982  * We use this function for direct writes instead of calling
983  * generic_file_write_iter() in order to avoid taking the inode
984  * semaphore and updating the i_size.  The NFS server will set
985  * the new i_size and this client must read the updated size
986  * back into its cache.  We let the server do generic write
987  * parameter checking and report problems.
988  *
989  * We eliminate local atime updates, see direct read above.
990  *
991  * We avoid unnecessary page cache invalidations for normal cached
992  * readers of this file.
993  *
994  * Note that O_APPEND is not supported for NFS direct writes, as there
995  * is no atomic O_APPEND write facility in the NFS protocol.
996  */
997 ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter)
998 {
999         ssize_t result = -EINVAL, requested;
1000         size_t count;
1001         struct file *file = iocb->ki_filp;
1002         struct address_space *mapping = file->f_mapping;
1003         struct inode *inode = mapping->host;
1004         struct nfs_direct_req *dreq;
1005         struct nfs_lock_context *l_ctx;
1006         loff_t pos, end;
1007
1008         dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
1009                 file, iov_iter_count(iter), (long long) iocb->ki_pos);
1010
1011         result = generic_write_checks(iocb, iter);
1012         if (result <= 0)
1013                 return result;
1014         count = result;
1015         nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
1016
1017         pos = iocb->ki_pos;
1018         end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;
1019
1020         task_io_account_write(count);
1021
1022         result = -ENOMEM;
1023         dreq = nfs_direct_req_alloc();
1024         if (!dreq)
1025                 goto out;
1026
1027         dreq->inode = inode;
1028         dreq->bytes_left = dreq->max_count = count;
1029         dreq->io_start = pos;
1030         dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
1031         l_ctx = nfs_get_lock_context(dreq->ctx);
1032         if (IS_ERR(l_ctx)) {
1033                 result = PTR_ERR(l_ctx);
1034                 goto out_release;
1035         }
1036         dreq->l_ctx = l_ctx;
1037         if (!is_sync_kiocb(iocb))
1038                 dreq->iocb = iocb;
1039
1040         nfs_start_io_direct(inode);
1041
1042         requested = nfs_direct_write_schedule_iovec(dreq, iter, pos);
1043
1044         if (mapping->nrpages) {
1045                 invalidate_inode_pages2_range(mapping,
1046                                               pos >> PAGE_SHIFT, end);
1047         }
1048
1049         nfs_end_io_direct(inode);
1050
1051         if (requested > 0) {
1052                 result = nfs_direct_wait(dreq);
1053                 if (result > 0) {
1054                         requested -= result;
1055                         iocb->ki_pos = pos + result;
1056                         /* XXX: should check the generic_write_sync retval */
1057                         generic_write_sync(iocb, result);
1058                 }
1059                 iov_iter_revert(iter, requested);
1060         } else {
1061                 result = requested;
1062         }
1063 out_release:
1064         nfs_direct_req_release(dreq);
1065 out:
1066         return result;
1067 }
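/*
 * The !is_sync_kiocb() case above is what backs asynchronous O_DIRECT
 * writes: the iocb is stashed in dreq->iocb, nfs_direct_wait() returns
 * -EIOCBQUEUED, and ->ki_complete() is called from nfs_direct_complete()
 * once every RPC (including any COMMIT) has finished.  A minimal
 * user-space sketch using libaio (link with -laio; the path is
 * hypothetical and the buffer page-aligned by convention):
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <libaio.h>
 *	#include <stdlib.h>
 *
 *	int main(void)
 *	{
 *		io_context_t ctx = 0;
 *		struct iocb cb, *cbs[1] = { &cb };
 *		struct io_event ev;
 *		void *buf;
 *		int fd = open("/mnt/nfs/data.bin", O_WRONLY | O_DIRECT);
 *
 *		if (fd < 0 || posix_memalign(&buf, 4096, 4096) ||
 *		    io_setup(1, &ctx))
 *			return 1;
 *		io_prep_pwrite(&cb, fd, buf, 4096, 0);
 *		if (io_submit(ctx, 1, cbs) != 1)
 *			return 1;
 *		// The completion event arrives only after the data (and any
 *		// needed COMMIT) has reached stable storage on the server.
 *		return io_getevents(ctx, 1, 1, &ev, NULL) == 1 ? 0 : 1;
 *	}
 */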
1068
1069 /**
1070  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1071  *
1072  */
1073 int __init nfs_init_directcache(void)
1074 {
1075         nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1076                                                 sizeof(struct nfs_direct_req),
1077                                                 0, (SLAB_RECLAIM_ACCOUNT|
1078                                                         SLAB_MEM_SPREAD),
1079                                                 NULL);
1080         if (nfs_direct_cachep == NULL)
1081                 return -ENOMEM;
1082
1083         return 0;
1084 }
1085
1086 /**
1087  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1088  *
1089  */
1090 void nfs_destroy_directcache(void)
1091 {
1092         kmem_cache_destroy(nfs_direct_cachep);
1093 }