diff options
author | Alex Elder <elder@inktank.com> | 2013-02-14 18:16:43 (GMT) |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-02 04:16:27 (GMT) |
commit | 0fff87ec798abdb4a99f01cbb0197266bb68c5dc (patch) | |
tree | 33c853319e28ed9dd20835c1f3f066be404b50a1 /fs/ceph | |
parent | 2ac2b7a6d4976bd6b5dc0751aa77d12d48d3ac4c (diff) | |
download | linux-0fff87ec798abdb4a99f01cbb0197266bb68c5dc.tar.xz |
libceph: separate read and write data
An osd request defines information about where data to be read
should be placed as well as where data to write comes from.
Currently these are represented by common fields.
Keep information about data for writing separate from data to be
read by splitting these into data_in and data_out fields.
This is the key patch in this whole series, in that it actually
identifies which osd requests generate outgoing data and which
generate incoming data. It's less obvious (currently) that an osd
CALL op generates both outgoing and incoming data; that's the focus
of some upcoming work.
This resolves:
http://tracker.ceph.com/issues/4127
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/addr.c | 67 | ||||
-rw-r--r-- | fs/ceph/file.c | 10 |
2 files changed, 41 insertions, 36 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 276fe96..c117c51 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -243,9 +243,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES); - for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) { - struct page *page = req->r_data.pages[i]; + BUG_ON(req->r_data_in.type != CEPH_OSD_DATA_TYPE_PAGES); + for (i = 0; i < req->r_data_in.num_pages; i++) { + struct page *page = req->r_data_in.pages[i]; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ @@ -258,8 +258,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) SetPageUptodate(page); unlock_page(page); page_cache_release(page); + bytes -= PAGE_CACHE_SIZE; } - kfree(req->r_data.pages); + kfree(req->r_data_in.pages); } static void ceph_unlock_page_vector(struct page **pages, int num_pages) @@ -337,10 +338,10 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.pages = pages; - req->r_data.num_pages = nr_pages; - req->r_data.alignment = 0; + req->r_data_in.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_in.pages = pages; + req->r_data_in.num_pages = nr_pages; + req->r_data_in.alignment = 0; req->r_callback = finish_read; req->r_inode = inode; @@ -563,7 +564,7 @@ static void writepages_finish(struct ceph_osd_request *req, long writeback_stat; unsigned issued = ceph_caps_issued(ci); - BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES); + BUG_ON(req->r_data_out.type != CEPH_OSD_DATA_TYPE_PAGES); if (rc >= 0) { /* * Assume we wrote the pages we originally sent. The @@ -571,7 +572,7 @@ static void writepages_finish(struct ceph_osd_request *req, * raced with a truncation and was adjusted at the osd, * so don't believe the reply. */ - wrote = req->r_data.num_pages; + wrote = req->r_data_out.num_pages; } else { wrote = 0; mapping_set_error(mapping, rc); @@ -580,8 +581,8 @@ static void writepages_finish(struct ceph_osd_request *req, inode, rc, bytes, wrote); /* clean all pages */ - for (i = 0; i < req->r_data.num_pages; i++) { - page = req->r_data.pages[i]; + for (i = 0; i < req->r_data_out.num_pages; i++) { + page = req->r_data_out.pages[i]; BUG_ON(!page); WARN_ON(!PageUptodate(page)); @@ -610,31 +611,34 @@ static void writepages_finish(struct ceph_osd_request *req, unlock_page(page); } dout("%p wrote+cleaned %d pages\n", inode, wrote); - ceph_put_wrbuffer_cap_refs(ci, req->r_data.num_pages, snapc); + ceph_put_wrbuffer_cap_refs(ci, req->r_data_out.num_pages, snapc); - ceph_release_pages(req->r_data.pages, req->r_data.num_pages); - if (req->r_data.pages_from_pool) - mempool_free(req->r_data.pages, + ceph_release_pages(req->r_data_out.pages, req->r_data_out.num_pages); + if (req->r_data_out.pages_from_pool) + mempool_free(req->r_data_out.pages, ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); else - kfree(req->r_data.pages); + kfree(req->r_data_out.pages); ceph_osdc_put_request(req); } /* * allocate a page vec, either directly, or if necessary, via a the - * mempool. we avoid the mempool if we can because req->r_data.num_pages + * mempool. we avoid the mempool if we can because req->r_data_out.num_pages * may be less than the maximum write size. */ static void alloc_page_vec(struct ceph_fs_client *fsc, struct ceph_osd_request *req) { - req->r_data.pages = kmalloc(sizeof(struct page *) * req->r_data.num_pages, - GFP_NOFS); - if (!req->r_data.pages) { - req->r_data.pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS); - req->r_data.pages_from_pool = 1; - WARN_ON(!req->r_data.pages); + size_t size; + + size = sizeof (struct page *) * req->r_data_out.num_pages; + req->r_data_out.pages = kmalloc(size, GFP_NOFS); + if (!req->r_data_out.pages) { + req->r_data_out.pages = mempool_alloc(fsc->wb_pagevec_pool, + GFP_NOFS); + req->r_data_out.pages_from_pool = 1; + WARN_ON(!req->r_data_out.pages); } } @@ -833,10 +837,11 @@ get_more_pages: break; } - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.num_pages = calc_pages_for(0, len); - req->r_data.alignment = 0; - max_pages = req->r_data.num_pages; + req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_out.num_pages = + calc_pages_for(0, len); + req->r_data_out.alignment = 0; + max_pages = req->r_data_out.num_pages; alloc_page_vec(fsc, req); req->r_callback = writepages_finish; @@ -858,7 +863,7 @@ get_more_pages: } set_page_writeback(page); - req->r_data.pages[locked_pages] = page; + req->r_data_out.pages[locked_pages] = page; locked_pages++; next = page->index + 1; } @@ -888,14 +893,14 @@ get_more_pages: } /* submit the write */ - offset = req->r_data.pages[0]->index << PAGE_CACHE_SHIFT; + offset = req->r_data_out.pages[0]->index << PAGE_CACHE_SHIFT; len = min((snap_size ? snap_size : i_size_read(inode)) - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); /* revise final length, page count */ - req->r_data.num_pages = locked_pages; + req->r_data_out.num_pages = locked_pages; req->r_request_ops[0].extent.length = cpu_to_le64(len); req->r_request_ops[0].payload_len = cpu_to_le32(len); req->r_request->hdr.data_len = cpu_to_le32(len); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3643a38..501fb37 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -568,13 +568,13 @@ more: if ((file->f_flags & O_SYNC) == 0) { /* get a second commit callback */ req->r_safe_callback = sync_write_commit; - req->r_data.own_pages = 1; + req->r_data_out.own_pages = 1; } } - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.pages = pages; - req->r_data.num_pages = num_pages; - req->r_data.alignment = page_align; + req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_out.pages = pages; + req->r_data_out.num_pages = num_pages; + req->r_data_out.alignment = page_align; req->r_inode = inode; ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); |