author     Fred Isaman <iisaman@netapp.com>    2012-04-20 18:47:51 (GMT)
committer  Trond Myklebust <Trond.Myklebust@netapp.com>    2012-04-27 18:10:38 (GMT)
commit     584aa810b6240d88c28113a90c5029449814a3b5 (patch)
tree       694b0942747c9ee7b8f53f21cb81ddc32cc07bbb /fs/nfs/direct.c
parent     1825a0d08f22463e5a8f4b1636473efd057a3479 (diff)
download   linux-584aa810b6240d88c28113a90c5029449814a3b5.tar.xz
NFS: rewrite directio read to use async coalesce code
This also has the advantage that it allows directio to use pnfs.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'fs/nfs/direct.c')
-rw-r--r--  fs/nfs/direct.c  255
1 file changed, 123 insertions(+), 132 deletions(-)
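
In outline, the patch drops the old per-chunk RPC machinery (the previous nfs_direct_read_schedule_segment() built an nfs_read_data per rsize chunk and ran the RPC task itself) and instead creates one nfs_page request per user page, feeds those requests through an nfs_pageio_descriptor so contiguous ones coalesce into larger I/Os, and receives results through an nfs_pgio_completion_ops table. The sketch below is a minimal user-space model of that descriptor/completion-ops pattern; every identifier in it (struct descriptor, desc_add_request, desc_complete, completion_ops, and so on) is a hypothetical stand-in, not the kernel API, and error handling is elided.

/*
 * Illustrative sketch only -- not kernel code. Callers queue fixed-size
 * requests into a descriptor, the descriptor coalesces contiguous requests
 * into one I/O, and a table of completion callbacks is invoked when the
 * batch finishes.
 */
#include <stdio.h>
#include <stddef.h>

struct request {
	size_t offset;                  /* byte offset of this chunk */
	size_t len;                     /* length of this chunk */
};

struct header {
	size_t good_bytes;              /* bytes that completed successfully */
	int error;                      /* sticky error for the batch */
};

struct completion_ops {
	void (*init_hdr)(struct header *hdr);
	void (*completion)(struct header *hdr);
};

struct descriptor {
	struct request reqs[16];        /* queued, not yet submitted */
	int nreq;
	size_t coalesced;               /* contiguous bytes gathered so far */
	const struct completion_ops *ops;
};

static int desc_add_request(struct descriptor *d, struct request r)
{
	if (d->nreq == 16)
		return 0;               /* caller must flush and retry */
	d->reqs[d->nreq++] = r;
	d->coalesced += r.len;
	return 1;
}

static void desc_complete(struct descriptor *d)
{
	/* One "RPC" for the whole coalesced run, then the callbacks. */
	struct header hdr = { .good_bytes = d->coalesced, .error = 0 };
	d->ops->init_hdr(&hdr);
	d->ops->completion(&hdr);
	d->nreq = 0;
	d->coalesced = 0;
}

static void my_init_hdr(struct header *hdr)
{
	(void)hdr;      /* e.g. take a reference, as nfs_direct_pgio_init does */
}

static void my_completion(struct header *hdr)
{
	printf("batch done: %zu bytes, error %d\n", hdr->good_bytes, hdr->error);
}

int main(void)
{
	static const struct completion_ops ops = {
		.init_hdr = my_init_hdr,
		.completion = my_completion,
	};
	struct descriptor d = { .ops = &ops };
	size_t pos;

	/* Queue three page-sized chunks; they coalesce into one I/O. */
	for (pos = 0; pos < 3 * 4096; pos += 4096)
		desc_add_request(&d, (struct request){ .offset = pos, .len = 4096 });
	desc_complete(&d);
	return 0;
}

The key property is that submission and completion are decoupled: the scheduler only creates and queues requests, and all result accounting happens in the completion callback.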
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 22a40c4..4ba9a2c 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -124,22 +124,6 @@ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_
return -EINVAL;
}
-static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
-{
- unsigned int npages;
- unsigned int i;
-
- if (count == 0)
- return;
- pages += (pgbase >> PAGE_SHIFT);
- npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
- for (i = 0; i < npages; i++) {
- struct page *page = pages[i];
- if (!PageCompound(page))
- set_page_dirty(page);
- }
-}
-
static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
unsigned int i;
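
The deleted nfs_direct_dirty_pages() helper dirtied every user page that received data from a completed read; after this patch each page is dirtied individually in nfs_direct_read_completion() (next hunk), so the bulk helper goes away. Its round-up arithmetic is still worth noting: a transfer of count bytes starting pgbase bytes into the first page spans ceil((pgbase % PAGE_SIZE + count) / PAGE_SIZE) pages, since both the first and last pages may be partial. A standalone sketch of just that computation, with made-up values:

/* Demonstrates the page-count rounding used by the deleted helper. */
#include <stdio.h>

#define PAGE_SHIFT 12
#define PAGE_SIZE  (1UL << PAGE_SHIFT)
#define PAGE_MASK  (~(PAGE_SIZE - 1))

int main(void)
{
	unsigned long pgbase = 512, count = 8192;
	unsigned long npages =
		(count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* 512 + 8192 = 8704 bytes -> spans 3 pages */
	printf("npages = %lu\n", npages);
	return 0;
}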
@@ -226,58 +210,92 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
nfs_direct_req_release(dreq);
}
-/*
- * We must hold a reference to all the pages in this direct read request
- * until the RPCs complete. This could be long *after* we are woken up in
- * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
- */
-static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
+void nfs_direct_readpage_release(struct nfs_page *req)
{
- struct nfs_read_data *data = calldata;
-
- nfs_readpage_result(task, data);
+ dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
+ req->wb_context->dentry->d_inode->i_sb->s_id,
+ (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
+ req->wb_bytes,
+ (long long)req_offset(req));
+ nfs_release_request(req);
}
-static void nfs_direct_read_release(void *calldata)
+static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
{
+ unsigned long bytes = 0;
+ struct nfs_direct_req *dreq = hdr->dreq;
- struct nfs_read_data *data = calldata;
- struct nfs_direct_req *dreq = (struct nfs_direct_req *)data->header->req;
- int status = data->task.tk_status;
+ if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
+ goto out_put;
spin_lock(&dreq->lock);
- if (unlikely(status < 0)) {
- dreq->error = status;
- spin_unlock(&dreq->lock);
+ if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
+ dreq->error = hdr->error;
+ else
+ dreq->count += hdr->good_bytes;
+ spin_unlock(&dreq->lock);
+
+ if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
+ while (!list_empty(&hdr->pages)) {
+ struct nfs_page *req = nfs_list_entry(hdr->pages.next);
+ struct page *page = req->wb_page;
+
+ if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
+ if (bytes > hdr->good_bytes)
+ zero_user(page, 0, PAGE_SIZE);
+ else if (hdr->good_bytes - bytes < PAGE_SIZE)
+ zero_user_segment(page,
+ hdr->good_bytes & ~PAGE_MASK,
+ PAGE_SIZE);
+ }
+ bytes += req->wb_bytes;
+ nfs_list_remove_request(req);
+ nfs_direct_readpage_release(req);
+ if (!PageCompound(page))
+ set_page_dirty(page);
+ page_cache_release(page);
+ }
} else {
- dreq->count += data->res.count;
- spin_unlock(&dreq->lock);
- nfs_direct_dirty_pages(data->pages.pagevec,
- data->args.pgbase,
- data->res.count);
+ while (!list_empty(&hdr->pages)) {
+ struct nfs_page *req = nfs_list_entry(hdr->pages.next);
+
+ if (bytes < hdr->good_bytes)
+ if (!PageCompound(req->wb_page))
+ set_page_dirty(req->wb_page);
+ bytes += req->wb_bytes;
+ page_cache_release(req->wb_page);
+ nfs_list_remove_request(req);
+ nfs_direct_readpage_release(req);
+ }
}
- nfs_direct_release_pages(data->pages.pagevec, data->pages.npages);
-
+out_put:
if (put_dreq(dreq))
nfs_direct_complete(dreq);
- nfs_readdata_release(data);
+ hdr->release(hdr);
}
-static const struct rpc_call_ops nfs_read_direct_ops = {
- .rpc_call_prepare = nfs_read_prepare,
- .rpc_call_done = nfs_direct_read_result,
- .rpc_release = nfs_direct_read_release,
-};
-
-static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
+static void nfs_sync_pgio_error(struct list_head *head)
{
- struct nfs_read_data *data = &rhdr->rpc_data;
+ struct nfs_page *req;
- if (data->pages.pagevec != data->pages.page_array)
- kfree(data->pages.pagevec);
- nfs_readhdr_free(&rhdr->header);
+ while (!list_empty(head)) {
+ req = nfs_list_entry(head->next);
+ nfs_list_remove_request(req);
+ nfs_release_request(req);
+ }
}
+static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
+{
+ get_dreq(hdr->dreq);
+}
+
+static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
+ .error_cleanup = nfs_sync_pgio_error,
+ .init_hdr = nfs_direct_pgio_init,
+ .completion = nfs_direct_read_completion,
+};
+
/*
* For each rsize'd chunk of the user's buffer, dispatch an NFS READ
* operation. If nfs_readdata_alloc() or get_user_pages() fails,
@@ -285,118 +303,85 @@ static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
* handled automatically by nfs_direct_read_result(). Otherwise, if
* no requests have been sent, just return an error.
*/
-static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
+static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
const struct iovec *iov,
loff_t pos)
{
+ struct nfs_direct_req *dreq = desc->pg_dreq;
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
unsigned long user_addr = (unsigned long)iov->iov_base;
size_t count = iov->iov_len;
size_t rsize = NFS_SERVER(inode)->rsize;
- struct rpc_task *task;
- struct rpc_message msg = {
- .rpc_cred = ctx->cred,
- };
- struct rpc_task_setup task_setup_data = {
- .rpc_client = NFS_CLIENT(inode),
- .rpc_message = &msg,
- .callback_ops = &nfs_read_direct_ops,
- .workqueue = nfsiod_workqueue,
- .flags = RPC_TASK_ASYNC,
- };
unsigned int pgbase;
int result;
ssize_t started = 0;
+ struct page **pagevec = NULL;
+ unsigned int npages;
do {
- struct nfs_read_header *rhdr;
- struct nfs_read_data *data;
- struct nfs_page_array *pages;
size_t bytes;
+ int i;
pgbase = user_addr & ~PAGE_MASK;
- bytes = min(rsize,count);
+ bytes = min(max(rsize, PAGE_SIZE), count);
result = -ENOMEM;
- rhdr = nfs_readhdr_alloc();
- if (unlikely(!rhdr))
- break;
- data = nfs_readdata_alloc(&rhdr->header, nfs_page_array_len(pgbase, bytes));
- if (!data) {
- nfs_readhdr_free(&rhdr->header);
+ npages = nfs_page_array_len(pgbase, bytes);
+ if (!pagevec)
+ pagevec = kmalloc(npages * sizeof(struct page *),
+ GFP_KERNEL);
+ if (!pagevec)
break;
- }
- data->header = &rhdr->header;
- atomic_inc(&data->header->refcnt);
- pages = &data->pages;
-
down_read(&current->mm->mmap_sem);
result = get_user_pages(current, current->mm, user_addr,
- pages->npages, 1, 0, pages->pagevec, NULL);
+ npages, 1, 0, pagevec, NULL);
up_read(&current->mm->mmap_sem);
- if (result < 0) {
- nfs_direct_readhdr_release(rhdr);
+ if (result < 0)
break;
- }
- if ((unsigned)result < pages->npages) {
+ if ((unsigned)result < npages) {
bytes = result * PAGE_SIZE;
if (bytes <= pgbase) {
- nfs_direct_release_pages(pages->pagevec, result);
- nfs_direct_readhdr_release(rhdr);
+ nfs_direct_release_pages(pagevec, result);
break;
}
bytes -= pgbase;
- pages->npages = result;
+ npages = result;
}
- get_dreq(dreq);
-
- rhdr->header.req = (struct nfs_page *) dreq;
- rhdr->header.inode = inode;
- rhdr->header.cred = msg.rpc_cred;
- data->args.fh = NFS_FH(inode);
- data->args.context = get_nfs_open_context(ctx);
- data->args.lock_context = dreq->l_ctx;
- data->args.offset = pos;
- data->args.pgbase = pgbase;
- data->args.pages = pages->pagevec;
- data->args.count = bytes;
- data->res.fattr = &data->fattr;
- data->res.eof = 0;
- data->res.count = bytes;
- nfs_fattr_init(&data->fattr);
- msg.rpc_argp = &data->args;
- msg.rpc_resp = &data->res;
-
- task_setup_data.task = &data->task;
- task_setup_data.callback_data = data;
- NFS_PROTO(inode)->read_setup(data, &msg);
-
- task = rpc_run_task(&task_setup_data);
- if (IS_ERR(task))
- break;
-
- dprintk("NFS: %5u initiated direct read call "
- "(req %s/%Ld, %zu bytes @ offset %Lu)\n",
- task->tk_pid,
- inode->i_sb->s_id,
- (long long)NFS_FILEID(inode),
- bytes,
- (unsigned long long)data->args.offset);
- rpc_put_task(task);
-
- started += bytes;
- user_addr += bytes;
- pos += bytes;
- /* FIXME: Remove this unnecessary math from final patch */
- pgbase += bytes;
- pgbase &= ~PAGE_MASK;
- BUG_ON(pgbase != (user_addr & ~PAGE_MASK));
-
- count -= bytes;
+ for (i = 0; i < npages; i++) {
+ struct nfs_page *req;
+ unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
+ /* XXX do we need to do the eof zeroing found in async_filler? */
+ req = nfs_create_request(dreq->ctx, dreq->inode,
+ pagevec[i],
+ pgbase, req_len);
+ if (IS_ERR(req)) {
+ nfs_direct_release_pages(pagevec + i,
+ npages - i);
+ result = PTR_ERR(req);
+ break;
+ }
+ req->wb_index = pos >> PAGE_SHIFT;
+ req->wb_offset = pos & ~PAGE_MASK;
+ if (!nfs_pageio_add_request(desc, req)) {
+ result = desc->pg_error;
+ nfs_release_request(req);
+ nfs_direct_release_pages(pagevec + i,
+ npages - i);
+ break;
+ }
+ pgbase = 0;
+ bytes -= req_len;
+ started += req_len;
+ user_addr += req_len;
+ pos += req_len;
+ count -= req_len;
+ }
} while (count != 0);
+ kfree(pagevec);
+
if (started)
return started;
return result < 0 ? (ssize_t) result : -EFAULT;
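
The rewritten loop above pins the user buffer with get_user_pages(), then builds one nfs_page request per pinned page via nfs_create_request() and hands each to nfs_pageio_add_request() for coalescing. Only the first request of a segment can start mid-page; after it is queued, pgbase is reset to 0 so every later request is page-aligned. A user-space sketch of just that split arithmetic, with hypothetical values:

/* Splitting a pinned user buffer into per-page requests, mirroring the
 * loop added above: the first chunk may begin mid-page (pgbase != 0),
 * every later chunk starts page-aligned. */
#include <stdio.h>

#define PAGE_SIZE 4096UL

int main(void)
{
	unsigned long user_addr = 0x1000200;    /* starts 0x200 into a page */
	unsigned long bytes = 10000;            /* total transfer */
	unsigned long pgbase = user_addr & (PAGE_SIZE - 1);
	unsigned long pos = 0;
	int i = 0;

	while (bytes) {
		/* Same arithmetic as: req_len = min(bytes, PAGE_SIZE - pgbase) */
		unsigned long req_len = PAGE_SIZE - pgbase;
		if (req_len > bytes)
			req_len = bytes;

		printf("req %d: page offset %lu, len %lu, file pos %lu\n",
		       i++, pgbase, req_len, pos);

		pgbase = 0;             /* only the first request is unaligned */
		bytes -= req_len;
		pos += req_len;
	}
	return 0;
}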
@@ -407,15 +392,19 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
unsigned long nr_segs,
loff_t pos)
{
+ struct nfs_pageio_descriptor desc;
ssize_t result = -EINVAL;
size_t requested_bytes = 0;
unsigned long seg;
+ nfs_pageio_init_read(&desc, dreq->inode,
+ &nfs_direct_read_completion_ops);
get_dreq(dreq);
+ desc.pg_dreq = dreq;
for (seg = 0; seg < nr_segs; seg++) {
const struct iovec *vec = &iov[seg];
- result = nfs_direct_read_schedule_segment(dreq, vec, pos);
+ result = nfs_direct_read_schedule_segment(&desc, vec, pos);
if (result < 0)
break;
requested_bytes += result;
@@ -424,6 +413,8 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
pos += vec->iov_len;
}
+ nfs_pageio_complete(&desc);
+
/*
* If no bytes were started, return the error, and let the
* generic layer handle the completion.
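
Taken together, the final hunks make nfs_direct_read_schedule_iovec() the single driver: it initializes the pageio descriptor, schedules every iovec segment through it, and then calls nfs_pageio_complete() so requests still sitting in the descriptor get issued even when a later segment failed. A compilable user-space sketch of that shape, using hypothetical stubs in place of nfs_pageio_init_read(), nfs_direct_read_schedule_segment() and nfs_pageio_complete():

/* Sketch only: one descriptor, push every segment through it, then
 * complete the descriptor to flush whatever is still being coalesced. */
#include <stdio.h>
#include <sys/uio.h>

struct desc { long queued; };

static void desc_init(struct desc *d) { d->queued = 0; }

static long schedule_segment(struct desc *d, const struct iovec *vec, long pos)
{
	d->queued += (long)vec->iov_len;        /* pretend every byte was queued */
	printf("queued %zu bytes at pos %ld\n", vec->iov_len, pos);
	return (long)vec->iov_len;
}

static void desc_complete(struct desc *d)
{
	printf("issuing %ld coalesced bytes\n", d->queued);     /* flush */
}

int main(void)
{
	char a[100], b[200];
	struct iovec iov[2] = { { a, sizeof(a) }, { b, sizeof(b) } };
	struct desc d;
	long pos = 0, requested = 0, result = 0;
	unsigned long seg;

	desc_init(&d);
	for (seg = 0; seg < 2; seg++) {
		result = schedule_segment(&d, &iov[seg], pos);
		if (result < 0)
			break;
		requested += result;
		pos += (long)iov[seg].iov_len;
	}
	desc_complete(&d);      /* runs even after a partial failure */
	printf("requested %ld bytes\n", requested);
	return 0;
}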