From 13bf283408077931ace05449de92c68c1cb55120 Mon Sep 17 00:00:00 2001 From: Julia Lawall Date: Sun, 13 Sep 2015 14:15:26 +0200 Subject: rbd: drop null test before destroy functions Remove unneeded NULL test. The semantic patch that makes this change is as follows: (http://coccinelle.lip6.fr/) // @@ expression x; @@ -if (x != NULL) { \(kmem_cache_destroy\|mempool_destroy\|dma_pool_destroy\)(x); x = NULL; -} // Signed-off-by: Julia Lawall Signed-off-by: Ilya Dryomov diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 128e7df..8f3dcb6 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -5663,10 +5663,8 @@ static int rbd_slab_init(void) if (rbd_segment_name_cache) return 0; out_err: - if (rbd_obj_request_cache) { - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; - } + kmem_cache_destroy(rbd_obj_request_cache); + rbd_obj_request_cache = NULL; kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; -- cgit v0.10.2 From 1291fb950f12005600eb410c206fffd7231dee6f Mon Sep 17 00:00:00 2001 From: Geliang Tang Date: Wed, 30 Sep 2015 11:41:05 +0800 Subject: ceph: fix a comment typo Signed-off-by: Geliang Tang Signed-off-by: Yan, Zheng diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 834f9f3..a4766de 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -88,7 +88,7 @@ static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data, const struct ceph_inode_info* ci = cookie_netfs_data; uint16_t klen; - /* use ceph virtual inode (id + snaphot) */ + /* use ceph virtual inode (id + snapshot) */ klen = sizeof(ci->i_vino); if (klen > maxbuf) return 0; -- cgit v0.10.2 From 777d738a5e58ba3b6f3932ab1543ce93703f4873 Mon Sep 17 00:00:00 2001 From: Arnd Bergmann Date: Wed, 30 Sep 2015 15:04:42 +0200 Subject: ceph: fix message length computation create_request_message() computes the maximum length of a message, but uses the wrong type for the time stamp: sizeof(struct timespec) may be 8 or 16 depending on the architecture, while sizeof(struct ceph_timespec) is always 8, and that is what gets put into the message. Found while auditing the uses of timespec for y2038 problems. Fixes: b8e69066d8af ("ceph: include time stamp in every MDS request") Signed-off-by: Arnd Bergmann Signed-off-by: Yan, Zheng diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 51cb02d..fe2c982 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1935,7 +1935,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, len = sizeof(*head) + pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + - sizeof(struct timespec); + sizeof(struct ceph_timespec); /* calculate (max) length for cap releases */ len += sizeof(struct ceph_mds_request_release) * -- cgit v0.10.2 From b5b98989dc7ed2093aeb76f2d0db79888582b0a2 Mon Sep 17 00:00:00 2001 From: "Zhu, Caifeng" Date: Thu, 8 Oct 2015 15:26:15 +0800 Subject: ceph: combine as many iovec as possile into one OSD request Both ceph_sync_direct_write and ceph_sync_read iterate iovec elements one by one, send one OSD request for each iovec. This is sub-optimal, We can combine serveral iovec into one page vector, and send an OSD request for the whole page vector. Signed-off-by: Zhu, Caifeng Signed-off-by: Yan, Zheng diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0c62868..3c68e6a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -34,6 +34,74 @@ * need to wait for MDS acknowledgement. */ +/* + * Calculate the length sum of direct io vectors that can + * be combined into one page vector. + */ +static size_t dio_get_pagev_size(const struct iov_iter *it) +{ + const struct iovec *iov = it->iov; + const struct iovec *iovend = iov + it->nr_segs; + size_t size; + + size = iov->iov_len - it->iov_offset; + /* + * An iov can be page vectored when both the current tail + * and the next base are page aligned. + */ + while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && + (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { + size += iov->iov_len; + } + dout("dio_get_pagevlen len = %zu\n", size); + return size; +} + +/* + * Allocate a page vector based on (@it, @nbytes). + * The return value is the tuple describing a page vector, + * that is (@pages, @page_align, @num_pages). + */ +static struct page ** +dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, + size_t *page_align, int *num_pages) +{ + struct iov_iter tmp_it = *it; + size_t align; + struct page **pages; + int ret = 0, idx, npages; + + align = (unsigned long)(it->iov->iov_base + it->iov_offset) & + (PAGE_SIZE - 1); + npages = calc_pages_for(align, nbytes); + pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL); + if (!pages) { + pages = vmalloc(sizeof(*pages) * npages); + if (!pages) + return ERR_PTR(-ENOMEM); + } + + for (idx = 0; idx < npages; ) { + size_t start; + ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, + npages - idx, &start); + if (ret < 0) + goto fail; + + iov_iter_advance(&tmp_it, ret); + nbytes -= ret; + idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; + } + + BUG_ON(nbytes != 0); + *num_pages = npages; + *page_align = align; + dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); + return pages; +fail: + ceph_put_page_vector(pages, idx, false); + return ERR_PTR(ret); +} /* * Prepare an open request. Preallocate ceph_cap to avoid an @@ -458,11 +526,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, size_t start; ssize_t n; - n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start); - if (n < 0) - return n; - - num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE; + n = dio_get_pagev_size(i); + pages = dio_get_pages_alloc(i, n, &start, &num_pages); + if (IS_ERR(pages)) + return PTR_ERR(pages); ret = striped_read(inode, off, n, pages, num_pages, checkeof, @@ -592,7 +659,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, CEPH_OSD_FLAG_WRITE; while (iov_iter_count(from) > 0) { - u64 len = iov_iter_single_seg_count(from); + u64 len = dio_get_pagev_size(from); size_t start; ssize_t n; @@ -611,14 +678,14 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); - n = iov_iter_get_pages_alloc(from, &pages, len, &start); - if (unlikely(n < 0)) { - ret = n; + n = len; + pages = dio_get_pages_alloc(from, len, &start, &num_pages); + if (IS_ERR(pages)) { ceph_osdc_put_request(req); + ret = PTR_ERR(pages); break; } - num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE; /* * throw out any page cache pages in this range. this * may block. -- cgit v0.10.2 From 70cf052d0c4b60b6fbb981380660893306b9f172 Mon Sep 17 00:00:00 2001 From: Shraddha Barke Date: Sun, 18 Oct 2015 13:55:38 +0530 Subject: libceph: remove con argument in handle_reply() Since handle_reply() does not use its con argument, remove it. Signed-off-by: Shraddha Barke Signed-off-by: Ilya Dryomov diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f79ccac..a362d7e 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1750,8 +1750,7 @@ static void complete_request(struct ceph_osd_request *req) * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. */ -static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, - struct ceph_connection *con) +static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) { void *p, *end; struct ceph_osd_request *req; @@ -2807,7 +2806,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_osdc_handle_map(osdc, msg); break; case CEPH_MSG_OSD_OPREPLY: - handle_reply(osdc, msg, con); + handle_reply(osdc, msg); break; case CEPH_MSG_WATCH_NOTIFY: handle_watch_notify(osdc, msg); -- cgit v0.10.2 From 343128ce91836d4131ead74b53d83b72e93d55b2 Mon Sep 17 00:00:00 2001 From: Shraddha Barke Date: Mon, 19 Oct 2015 21:59:00 +0530 Subject: libceph: use local variable cursor instead of &msg->cursor Use local variable cursor in place of &msg->cursor in read_partial_msg_data() and write_partial_msg_data(). Signed-off-by: Shraddha Barke Signed-off-by: Ilya Dryomov diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index b9b0e3b..fce6ad63 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1552,8 +1552,8 @@ static int write_partial_message_data(struct ceph_connection *con) bool need_crc; int ret; - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - &last_piece); + page = ceph_msg_data_next(cursor, &page_offset, &length, + &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, !last_piece); if (ret <= 0) { @@ -1564,7 +1564,7 @@ static int write_partial_message_data(struct ceph_connection *con) } if (do_datacrc && cursor->need_crc) crc = ceph_crc32c_page(crc, page, page_offset, length); - need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret); + need_crc = ceph_msg_data_advance(cursor, (size_t)ret); } dout("%s %p msg %p done\n", __func__, con, msg); @@ -2246,8 +2246,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = con->in_data_crc; while (cursor->resid) { - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - NULL); + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); if (ret <= 0) { if (do_datacrc) @@ -2258,7 +2257,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = ceph_crc32c_page(crc, page, page_offset, ret); - (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret); + (void) ceph_msg_data_advance(cursor, (size_t)ret); } if (do_datacrc) con->in_data_crc = crc; -- cgit v0.10.2 From b51c83c241910f66b0c9a2ab17cd57db8109a98f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 15 Oct 2015 15:38:57 +0200 Subject: rbd: return -ENOMEM instead of pool id if rbd_dev_create() fails Returning pool id (i.e. >= 0) from a sysfs ->store() callback makes userspace think it needs to retry the write. Fix it - it's a leftover from the times when the equivalent of rbd_dev_create() was the first action in rbd_add(). Signed-off-by: Ilya Dryomov diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8f3dcb6..5e7234d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -5397,7 +5397,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, struct rbd_spec *spec = NULL; struct rbd_client *rbdc; bool read_only; - int rc = -ENOMEM; + int rc; if (!try_module_get(THIS_MODULE)) return -ENODEV; @@ -5432,8 +5432,10 @@ static ssize_t do_rbd_add(struct bus_type *bus, } rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); - if (!rbd_dev) + if (!rbd_dev) { + rc = -ENOMEM; goto err_out_client; + } rbdc = NULL; /* rbd_dev now owns this */ spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ -- cgit v0.10.2 From dd5ac32d425f881624bfe59c8e00dd1c3ccc6bb1 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 16 Oct 2015 17:09:24 +0200 Subject: rbd: don't free rbd_dev outside of the release callback struct rbd_device has struct device embedded in it, which means it's part of kobject universe and has an unpredictable life cycle. Freeing its memory outside of the release callback is flawed, yet commits 200a6a8be5db ("rbd: don't destroy rbd_dev in device release function") and 8ad42cd0c002 ("rbd: don't have device release destroy rbd_dev") moved rbd_dev_destroy() out to rbd_dev_image_release(). This commit reverts most of that, the key points are: - rbd_dev->dev is initialized in rbd_dev_create(), making it possible to use rbd_dev_destroy() - which is just a put_device() - both before we register with device core and after. - rbd_dev_release() (the release callback) is the only place we kfree(rbd_dev). It's also where we do module_put(), keeping the module unload race window as small as possible. - We pin the module in rbd_dev_create(), but only for mapping rbd_dev-s. Moving image related stuff out of struct rbd_device into another struct which isn't tied with sysfs and device core is long overdue, but until that happens, this will keep rbd module refcount (which users can observe with lsmod) sane. Fixes: http://tracker.ceph.com/issues/12697 Cc: Alex Elder Signed-off-by: Ilya Dryomov diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 5e7234d..fd7bd87 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -418,8 +418,6 @@ MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (d static int rbd_img_request_submit(struct rbd_img_request *img_request); -static void rbd_dev_device_release(struct device *dev); - static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, @@ -4041,6 +4039,25 @@ static void rbd_spec_free(struct kref *kref) kfree(spec); } +static void rbd_dev_release(struct device *dev) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + bool need_put = !!rbd_dev->opts; + + rbd_put_client(rbd_dev->rbd_client); + rbd_spec_put(rbd_dev->spec); + kfree(rbd_dev->opts); + kfree(rbd_dev); + + /* + * This is racy, but way better than putting module outside of + * the release callback. The race window is pretty small, so + * doing something similar to dm (dm-builtin.c) is overkill. + */ + if (need_put) + module_put(THIS_MODULE); +} + static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, struct rbd_spec *spec, struct rbd_options *opts) @@ -4057,6 +4074,12 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); + rbd_dev->dev.bus = &rbd_bus_type; + rbd_dev->dev.type = &rbd_device_type; + rbd_dev->dev.parent = &rbd_root_dev; + rbd_dev->dev.release = rbd_dev_release; + device_initialize(&rbd_dev->dev); + rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; rbd_dev->opts = opts; @@ -4068,15 +4091,21 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); + /* + * If this is a mapping rbd_dev (as opposed to a parent one), + * pin our module. We have a ref from do_rbd_add(), so use + * __module_get(). + */ + if (rbd_dev->opts) + __module_get(THIS_MODULE); + return rbd_dev; } static void rbd_dev_destroy(struct rbd_device *rbd_dev) { - rbd_put_client(rbd_dev->rbd_client); - rbd_spec_put(rbd_dev->spec); - kfree(rbd_dev->opts); - kfree(rbd_dev); + if (rbd_dev) + put_device(&rbd_dev->dev); } /* @@ -4702,27 +4731,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev) return rbd_dev_v2_header_info(rbd_dev); } -static int rbd_bus_add_dev(struct rbd_device *rbd_dev) -{ - struct device *dev; - int ret; - - dev = &rbd_dev->dev; - dev->bus = &rbd_bus_type; - dev->type = &rbd_device_type; - dev->parent = &rbd_root_dev; - dev->release = rbd_dev_device_release; - dev_set_name(dev, "%d", rbd_dev->dev_id); - ret = device_register(dev); - - return ret; -} - -static void rbd_bus_del_dev(struct rbd_device *rbd_dev) -{ - device_unregister(&rbd_dev->dev); -} - /* * Get a unique rbd identifier for the given new rbd_dev, and add * the rbd_dev to the global list. @@ -5225,7 +5233,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); - ret = rbd_bus_add_dev(rbd_dev); + dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); + ret = device_add(&rbd_dev->dev); if (ret) goto err_out_mapping; @@ -5405,7 +5414,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, /* parse add command */ rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); if (rc < 0) - goto err_out_module; + goto out; rbdc = rbd_get_client(ceph_opts); if (IS_ERR(rbdc)) { @@ -5460,10 +5469,13 @@ static ssize_t do_rbd_add(struct bus_type *bus, */ rbd_dev_header_unwatch_sync(rbd_dev); rbd_dev_image_release(rbd_dev); - goto err_out_module; + goto out; } - return count; + rc = count; +out: + module_put(THIS_MODULE); + return rc; err_out_rbd_dev: rbd_dev_destroy(rbd_dev); @@ -5472,12 +5484,7 @@ err_out_client: err_out_args: rbd_spec_put(spec); kfree(rbd_opts); -err_out_module: - module_put(THIS_MODULE); - - dout("Error adding device %s\n", buf); - - return (ssize_t)rc; + goto out; } static ssize_t rbd_add(struct bus_type *bus, @@ -5497,12 +5504,11 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, return do_rbd_add(bus, buf, count); } -static void rbd_dev_device_release(struct device *dev) +static void rbd_dev_device_release(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); + device_del(&rbd_dev->dev); rbd_dev_mapping_clear(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); @@ -5592,9 +5598,8 @@ static ssize_t do_rbd_remove(struct bus_type *bus, * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting * in a potential use after free of rbd_dev->disk or rbd_dev. */ - rbd_bus_del_dev(rbd_dev); + rbd_dev_device_release(rbd_dev); rbd_dev_image_release(rbd_dev); - module_put(THIS_MODULE); return count; } -- cgit v0.10.2 From 6cac4695f2042a1d0e17aa48c5705f69907e74c3 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 16 Oct 2015 20:11:25 +0200 Subject: rbd: set device_type::release instead of device::release No point in providing an empty device_type::release callback and then setting device::release for each rbd_dev dynamically. Signed-off-by: Ilya Dryomov diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index fd7bd87..6eec200 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3989,14 +3989,12 @@ static const struct attribute_group *rbd_attr_groups[] = { NULL }; -static void rbd_sysfs_dev_release(struct device *dev) -{ -} +static void rbd_dev_release(struct device *dev); static struct device_type rbd_device_type = { .name = "rbd", .groups = rbd_attr_groups, - .release = rbd_sysfs_dev_release, + .release = rbd_dev_release, }; static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) @@ -4077,7 +4075,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; - rbd_dev->dev.release = rbd_dev_release; device_initialize(&rbd_dev->dev); rbd_dev->rbd_client = rbdc; -- cgit v0.10.2 From 4afb04c0c88e21f37e5ef4776e432907d7b12838 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 22 Oct 2015 16:44:15 +0200 Subject: rbd: remove duplicate calls to rbd_dev_mapping_clear() Commit d1cf5788450e ("rbd: set mapping info earlier") defined rbd_dev_mapping_clear(), but, just a few days after, commit f35a4dee14c3 ("rbd: set the mapping size and features later") moved rbd_dev_mapping_set() calls and added another rbd_dev_mapping_clear() call instead of moving the old one. Around the same time, another duplicate was introduced in rbd_dev_device_release() - kill both. Signed-off-by: Ilya Dryomov diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6eec200..235708c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -5254,8 +5254,6 @@ err_out_blkdev: unregister_blkdev(rbd_dev->major, rbd_dev->name); err_out_id: rbd_dev_id_put(rbd_dev); - rbd_dev_mapping_clear(rbd_dev); - return ret; } @@ -5510,7 +5508,6 @@ static void rbd_dev_device_release(struct rbd_device *rbd_dev) if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); rbd_dev_id_put(rbd_dev); - rbd_dev_mapping_clear(rbd_dev); } static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) -- cgit v0.10.2 From 5e804ac4824302efc3038e086cb21f2e93ab8900 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 26 Oct 2015 16:08:43 +0800 Subject: ceph: don't invalidate page cache when inode is no longer used ceph_check_caps() invalidate page cache when inode is not used by any open file. This behaviour is not friendly for workload that repeatly read files. Signed-off-by: Yan, Zheng diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 27b5668..3493153 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1655,9 +1655,8 @@ retry_locked: !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ inode->i_data.nrpages && /* have cached pages */ - (file_wanted == 0 || /* no open files */ - (revoking & (CEPH_CAP_FILE_CACHE| - CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ + (revoking & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */ !tried_invalidate) { dout("check_caps trying to invalidate on %p\n", inode); if (try_nonblocking_invalidate(inode) < 0) { diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index fe2c982..a7a967a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1430,6 +1430,13 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; } + /* The inode has cached pages, but it's no longer used. + * we can safely drop it */ + if (wanted == 0 && used == CEPH_CAP_FILE_CACHE && + !(oissued & CEPH_CAP_FILE_CACHE)) { + used = 0; + oissued = 0; + } if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */ @@ -1438,7 +1445,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) /* we aren't the only cap.. just remove us */ __ceph_remove_cap(cap, true); } else { - /* try to drop referring dentries */ + /* try dropping referring dentries */ spin_unlock(&ci->i_ceph_lock); d_prune_aliases(inode); dout("trim_caps_cb %p cap %p pruned, count now %d\n", -- cgit v0.10.2 From cbf99a11fb14db0835acd79ecd7469d37e398660 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 26 Oct 2015 11:03:46 +0100 Subject: libceph: introduce ceph_x_authorizer_cleanup() Commit ae385eaf24dc ("libceph: store session key in cephx authorizer") introduced ceph_x_authorizer::session_key, but didn't update all the exit/error paths. Introduce ceph_x_authorizer_cleanup() to encapsulate ceph_x_authorizer cleanup and switch to it. This fixes ceph_x_destroy(), which currently always leaks key and ceph_x_build_authorizer() error paths. Signed-off-by: Ilya Dryomov Reviewed-by: Yan, Zheng diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index ba6eb17..65054fd 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -279,6 +279,15 @@ bad: return -EINVAL; } +static void ceph_x_authorizer_cleanup(struct ceph_x_authorizer *au) +{ + ceph_crypto_key_destroy(&au->session_key); + if (au->buf) { + ceph_buffer_put(au->buf); + au->buf = NULL; + } +} + static int ceph_x_build_authorizer(struct ceph_auth_client *ac, struct ceph_x_ticket_handler *th, struct ceph_x_authorizer *au) @@ -297,7 +306,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, ceph_crypto_key_destroy(&au->session_key); ret = ceph_crypto_key_clone(&au->session_key, &th->session_key); if (ret) - return ret; + goto out_au; maxlen = sizeof(*msg_a) + sizeof(msg_b) + ceph_x_encrypt_buflen(ticket_blob_len); @@ -309,8 +318,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, if (!au->buf) { au->buf = ceph_buffer_new(maxlen, GFP_NOFS); if (!au->buf) { - ceph_crypto_key_destroy(&au->session_key); - return -ENOMEM; + ret = -ENOMEM; + goto out_au; } } au->service = th->service; @@ -340,7 +349,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b), p, end - p); if (ret < 0) - goto out_buf; + goto out_au; p += ret; au->buf->vec.iov_len = p - au->buf->vec.iov_base; dout(" built authorizer nonce %llx len %d\n", au->nonce, @@ -348,9 +357,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, BUG_ON(au->buf->vec.iov_len > maxlen); return 0; -out_buf: - ceph_buffer_put(au->buf); - au->buf = NULL; +out_au: + ceph_x_authorizer_cleanup(au); return ret; } @@ -624,8 +632,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac, { struct ceph_x_authorizer *au = (void *)a; - ceph_crypto_key_destroy(&au->session_key); - ceph_buffer_put(au->buf); + ceph_x_authorizer_cleanup(au); kfree(au); } @@ -653,8 +660,7 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) remove_ticket_handler(ac, th); } - if (xi->auth_authorizer.buf) - ceph_buffer_put(xi->auth_authorizer.buf); + ceph_x_authorizer_cleanup(&xi->auth_authorizer); kfree(ac->private); ac->private = NULL; diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h index d149822..2e9cab0 100644 --- a/net/ceph/crypto.h +++ b/net/ceph/crypto.h @@ -16,8 +16,10 @@ struct ceph_crypto_key { static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key) { - if (key) + if (key) { kfree(key->key); + key->key = NULL; + } } int ceph_crypto_key_clone(struct ceph_crypto_key *dst, -- cgit v0.10.2 From 4c06ace81a60636dec358c288ef6aaf3aa6dc599 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 27 Oct 2015 17:18:00 +0800 Subject: ceph: add request to i_unsafe_dirops when getting unsafe reply Previously we add request to i_unsafe_dirops when registering request. So ceph_fsync() also waits for imcomplete requests. This is unnecessary, ceph_fsync() only needs to wait unsafe requests. Signed-off-by: Yan, Zheng diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index a7a967a..1e47a3d 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -633,13 +633,8 @@ static void __register_request(struct ceph_mds_client *mdsc, mdsc->oldest_tid = req->r_tid; if (dir) { - struct ceph_inode_info *ci = ceph_inode(dir); - ihold(dir); - spin_lock(&ci->i_unsafe_lock); req->r_unsafe_dir = dir; - list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); - spin_unlock(&ci->i_unsafe_lock); } } @@ -665,13 +660,14 @@ static void __unregister_request(struct ceph_mds_client *mdsc, rb_erase(&req->r_node, &mdsc->request_tree); RB_CLEAR_NODE(&req->r_node); - if (req->r_unsafe_dir) { + if (req->r_unsafe_dir && req->r_got_unsafe) { struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); - spin_lock(&ci->i_unsafe_lock); list_del_init(&req->r_unsafe_dir_item); spin_unlock(&ci->i_unsafe_lock); + } + if (req->r_unsafe_dir) { iput(req->r_unsafe_dir); req->r_unsafe_dir = NULL; } @@ -2484,6 +2480,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } else { req->r_got_unsafe = true; list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); + if (req->r_unsafe_dir) { + struct ceph_inode_info *ci = + ceph_inode(req->r_unsafe_dir); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_dir_item, + &ci->i_unsafe_dirops); + spin_unlock(&ci->i_unsafe_lock); + } } dout("handle_reply tid %lld result %d\n", tid, result); -- cgit v0.10.2 From 68cd5b4b7612c2956d8553dfb39490b29f32566d Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 27 Oct 2015 18:36:06 +0800 Subject: ceph: make fsync() wait unsafe requests that created/modified inode If we get a unsafe reply for request that created/modified inode, add the unsafe request to a list in the newly created/modified inode. So we can make fsync() wait these unsafe requests. Signed-off-by: Yan, Zheng diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 3493153..c69e125 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1970,49 +1970,46 @@ out: } /* - * wait for any uncommitted directory operations to commit. + * wait for any unsafe requests to complete. */ -static int unsafe_dirop_wait(struct inode *inode) +static int unsafe_request_wait(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_dirops; - struct ceph_mds_request *req; - u64 last_tid; - int ret = 0; - - if (!S_ISDIR(inode->i_mode)) - return 0; + struct ceph_mds_request *req1 = NULL, *req2 = NULL; + int ret, err = 0; spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - req = list_last_entry(head, struct ceph_mds_request, - r_unsafe_dir_item); - last_tid = req->r_tid; - - do { - ceph_mdsc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); + if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) { + req1 = list_last_entry(&ci->i_unsafe_dirops, + struct ceph_mds_request, + r_unsafe_dir_item); + ceph_mdsc_get_request(req1); + } + if (!list_empty(&ci->i_unsafe_iops)) { + req2 = list_last_entry(&ci->i_unsafe_iops, + struct ceph_mds_request, + r_unsafe_target_item); + ceph_mdsc_get_request(req2); + } + spin_unlock(&ci->i_unsafe_lock); - dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n", - inode, req->r_tid, last_tid); - ret = !wait_for_completion_timeout(&req->r_safe_completion, - ceph_timeout_jiffies(req->r_timeout)); + dout("unsafe_requeset_wait %p wait on tid %llu %llu\n", + inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL); + if (req1) { + ret = !wait_for_completion_timeout(&req1->r_safe_completion, + ceph_timeout_jiffies(req1->r_timeout)); if (ret) - ret = -EIO; /* timed out */ - - ceph_mdsc_put_request(req); - - spin_lock(&ci->i_unsafe_lock); - if (ret || list_empty(head)) - break; - req = list_first_entry(head, struct ceph_mds_request, - r_unsafe_dir_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); - return ret; + err = -EIO; + ceph_mdsc_put_request(req1); + } + if (req2) { + ret = !wait_for_completion_timeout(&req2->r_safe_completion, + ceph_timeout_jiffies(req2->r_timeout)); + if (ret) + err = -EIO; + ceph_mdsc_put_request(req2); + } + return err; } int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) @@ -2038,7 +2035,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); - ret = unsafe_dirop_wait(inode); + ret = unsafe_request_wait(inode); /* * only wait on non-file metadata writeback (the mds diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 96d2bd8..498dcfa 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -452,6 +452,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_unsafe_writes); INIT_LIST_HEAD(&ci->i_unsafe_dirops); + INIT_LIST_HEAD(&ci->i_unsafe_iops); spin_lock_init(&ci->i_unsafe_lock); ci->i_snap_realm = NULL; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 1e47a3d..89838a2 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -666,6 +666,12 @@ static void __unregister_request(struct ceph_mds_client *mdsc, list_del_init(&req->r_unsafe_dir_item); spin_unlock(&ci->i_unsafe_lock); } + if (req->r_target_inode && req->r_got_unsafe) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_del_init(&req->r_unsafe_target_item); + spin_unlock(&ci->i_unsafe_lock); + } if (req->r_unsafe_dir) { iput(req->r_unsafe_dir); @@ -1707,6 +1713,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) req->r_started = jiffies; req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); + INIT_LIST_HEAD(&req->r_unsafe_target_item); req->r_fmode = -1; kref_init(&req->r_kref); INIT_LIST_HEAD(&req->r_wait); @@ -2529,6 +2536,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) up_read(&mdsc->snap_rwsem); if (realm) ceph_put_snap_realm(mdsc, realm); + + if (err == 0 && req->r_got_unsafe && req->r_target_inode) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); + spin_unlock(&ci->i_unsafe_lock); + } out_err: mutex_lock(&mdsc->mutex); if (!req->r_aborted) { diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index f575eaf..ccf11ef 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -236,6 +236,9 @@ struct ceph_mds_request { struct inode *r_unsafe_dir; struct list_head r_unsafe_dir_item; + /* unsafe requests that modify the target inode */ + struct list_head r_unsafe_target_item; + struct ceph_mds_session *r_session; int r_attempts; /* resend attempts */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 2f2460d..75b7d12 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -342,6 +342,7 @@ struct ceph_inode_info { struct list_head i_unsafe_writes; /* uncommitted sync writes */ struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */ + struct list_head i_unsafe_iops; /* uncommitted mds inode ops */ spinlock_t i_unsafe_lock; struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */ -- cgit v0.10.2 From 8a703a383dd3458753e0ad71860ed3a5097692b3 Mon Sep 17 00:00:00 2001 From: Ioana Ciornei Date: Thu, 22 Oct 2015 18:06:07 +0300 Subject: libceph: evaluate osd_req_op_data() arguments only once This patch changes the osd_req_op_data() macro to not evaluate arguments more than once in order to follow the kernel coding style. Signed-off-by: Ioana Ciornei Reviewed-by: Alex Elder [idryomov@gmail.com: changelog, formatting] Signed-off-by: Ilya Dryomov diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a362d7e..191bc21 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, } #endif /* CONFIG_BLOCK */ -#define osd_req_op_data(oreq, whch, typ, fld) \ - ({ \ - BUG_ON(whch >= (oreq)->r_num_ops); \ - &(oreq)->r_ops[whch].typ.fld; \ - }) +#define osd_req_op_data(oreq, whch, typ, fld) \ +({ \ + struct ceph_osd_request *__oreq = (oreq); \ + unsigned int __whch = (whch); \ + BUG_ON(__whch >= __oreq->r_num_ops); \ + &__oreq->r_ops[__whch].typ.fld; \ +}) static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) -- cgit v0.10.2 From 79dbd1baa651cece408e68a1b445f3628c4b5bdc Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 26 Oct 2015 22:23:56 +0100 Subject: libceph: msg signing callouts don't need con argument We can use msg->con instead - at the point we sign an outgoing message or check the signature on the incoming one, msg->con is always set. We wouldn't know how to sign a message without an associated session (i.e. msg->con == NULL) and being able to sign a message using an explicitly provided authorizer is of no use. Signed-off-by: Ilya Dryomov diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 89838a2..e7b130a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3942,17 +3942,19 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, return msg; } -static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +static int mds_sign_message(struct ceph_msg *msg) { - struct ceph_mds_session *s = con->private; + struct ceph_mds_session *s = msg->con->private; struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_sign_message(auth, msg); } -static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +static int mds_check_message_signature(struct ceph_msg *msg) { - struct ceph_mds_session *s = con->private; + struct ceph_mds_session *s = msg->con->private; struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_check_message_signature(auth, msg); } @@ -3965,8 +3967,8 @@ static const struct ceph_connection_operations mds_con_ops = { .invalidate_authorizer = invalidate_authorizer, .peer_reset = peer_reset, .alloc_msg = mds_alloc_msg, - .sign_message = sign_message, - .check_message_signature = check_message_signature, + .sign_message = mds_sign_message, + .check_message_signature = mds_check_message_signature, }; /* eof */ diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index b2371d9..3687ff0 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -43,10 +43,9 @@ struct ceph_connection_operations { struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); - int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg); - int (*check_message_signature) (struct ceph_connection *con, - struct ceph_msg *msg); + int (*sign_message) (struct ceph_msg *msg); + int (*check_message_signature) (struct ceph_msg *msg); }; /* use format string %s%d */ diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fce6ad63..805f6f8 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1205,7 +1205,7 @@ static void prepare_write_message_footer(struct ceph_connection *con) con->out_kvec[v].iov_base = &m->footer; if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->ops->sign_message) - con->ops->sign_message(con, m); + con->ops->sign_message(m); else m->footer.sig = 0; con->out_kvec[v].iov_len = sizeof(m->footer); @@ -2422,7 +2422,7 @@ static int read_partial_message(struct ceph_connection *con) } if (need_sign && con->ops->check_message_signature && - con->ops->check_message_signature(con, m)) { + con->ops->check_message_signature(m)) { pr_err("read_partial_message %p signature check failed\n", m); return -EBADMSG; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 191bc21..118e4ce 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2979,17 +2979,19 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } -static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_sign_message(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_sign_message(auth, msg); } -static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_check_message_signature(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_check_message_signature(auth, msg); } @@ -3001,7 +3003,7 @@ static const struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .invalidate_authorizer = invalidate_authorizer, .alloc_msg = alloc_msg, - .sign_message = sign_message, - .check_message_signature = check_message_signature, + .sign_message = osd_sign_message, + .check_message_signature = osd_check_message_signature, .fault = osd_reset, }; -- cgit v0.10.2 From 4199b8eec36405822619d4176bddfacf7b47eb44 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 27 Oct 2015 16:42:49 +0100 Subject: libceph: drop authorizer check from cephx msg signing routines I don't see a way for auth->authorizer to be NULL in ceph_x_sign_message() or ceph_x_check_message_signature(). Signed-off-by: Ilya Dryomov diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 65054fd..3a544ca 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -697,8 +697,7 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth, struct ceph_msg *msg) { int ret; - if (!auth->authorizer) - return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &msg->footer.sig); if (ret < 0) @@ -713,8 +712,6 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth, __le64 sig_check; int ret; - if (!auth->authorizer) - return 0; ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &sig_check); if (ret < 0) -- cgit v0.10.2 From 859bff51dc5e92ddfb5eb6f17b8040d9311095bb Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 28 Oct 2015 23:50:58 +0100 Subject: libceph: stop duplicating client fields in messenger supported_features and required_features serve no purpose at all, while nocrc and tcp_nodelay belong to ceph_options::flags. Signed-off-by: Ilya Dryomov diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 397c5cd..a7caafe 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -137,6 +137,7 @@ struct ceph_client { #endif }; +#define from_msgr(ms) container_of(ms, struct ceph_client, msgr) /* diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 3687ff0..71b1d6c 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -57,8 +57,6 @@ struct ceph_messenger { atomic_t stopping; possible_net_t net; - bool nocrc; - bool tcp_nodelay; /* * the global_seq counts connections i (attempt to) initiate @@ -66,9 +64,6 @@ struct ceph_messenger { */ u32 global_seq; spinlock_t global_seq_lock; - - u64 supported_features; - u64 required_features; }; enum ceph_msg_data_type { @@ -267,11 +262,7 @@ extern void ceph_msgr_exit(void); extern void ceph_msgr_flush(void); extern void ceph_messenger_init(struct ceph_messenger *msgr, - struct ceph_entity_addr *myaddr, - u64 supported_features, - u64 required_features, - bool nocrc, - bool tcp_nodelay); + struct ceph_entity_addr *myaddr); extern void ceph_messenger_fini(struct ceph_messenger *msgr); extern void ceph_con_init(struct ceph_connection *con, void *private, diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 54a00d6..d1494d1 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -596,11 +596,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, if (ceph_test_opt(client, MYIP)) myaddr = &client->options->my_addr; - ceph_messenger_init(&client->msgr, myaddr, - client->supported_features, - client->required_features, - ceph_test_opt(client, NOCRC), - ceph_test_opt(client, TCP_NODELAY)); + ceph_messenger_init(&client->msgr, myaddr); /* subsystems */ err = ceph_monc_init(&client->monc, client); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 805f6f8..1110807 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -509,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } - if (con->msgr->tcp_nodelay) { + if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { int optval = 1; ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, @@ -1432,7 +1432,8 @@ static int prepare_write_connect(struct ceph_connection *con) dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = cpu_to_le64(con->msgr->supported_features); + con->out_connect.features = + cpu_to_le64(from_msgr(con->msgr)->supported_features); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); @@ -1527,7 +1528,7 @@ static int write_partial_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); u32 crc; dout("%s %p msg %p\n", __func__, con, msg); @@ -2005,8 +2006,8 @@ static int process_banner(struct ceph_connection *con) static int process_connect(struct ceph_connection *con) { - u64 sup_feat = con->msgr->supported_features; - u64 req_feat = con->msgr->required_features; + u64 sup_feat = from_msgr(con->msgr)->supported_features; + u64 req_feat = from_msgr(con->msgr)->required_features; u64 server_feat = ceph_sanitize_features( le64_to_cpu(con->in_reply.features)); int ret; @@ -2232,7 +2233,7 @@ static int read_partial_msg_data(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - const bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); struct page *page; size_t page_offset; size_t length; @@ -2277,7 +2278,7 @@ static int read_partial_message(struct ceph_connection *con) int end; int ret; unsigned int front_len, middle_len, data_len; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); u64 seq; u32 crc; @@ -2951,15 +2952,8 @@ static void con_fault(struct ceph_connection *con) * initialize a new messenger instance */ void ceph_messenger_init(struct ceph_messenger *msgr, - struct ceph_entity_addr *myaddr, - u64 supported_features, - u64 required_features, - bool nocrc, - bool tcp_nodelay) + struct ceph_entity_addr *myaddr) { - msgr->supported_features = supported_features; - msgr->required_features = required_features; - spin_lock_init(&msgr->global_seq_lock); if (myaddr) @@ -2969,8 +2963,6 @@ void ceph_messenger_init(struct ceph_messenger *msgr, msgr->inst.addr.type = 0; get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); encode_my_addr(msgr); - msgr->nocrc = nocrc; - msgr->tcp_nodelay = tcp_nodelay; atomic_set(&msgr->stopping, 0); write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); -- cgit v0.10.2 From a51983e4dd2d4d63912aab939f657c4cd476e21a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 28 Oct 2015 23:52:06 +0100 Subject: libceph: add nocephx_sign_messages option Support for message signing was merged into 3.19, along with nocephx_require_signatures option. But, all that option does is allow the kernel client to talk to clusters that don't support MSG_AUTH feature bit. That's pretty useless, given that it's been supported since bobtail. Meanwhile, if one disables message signing on the server side with "cephx sign messages = false", it becomes impossible to use the kernel client since it expects messages to be signed if MSG_AUTH was negotiated. Add nocephx_sign_messages option to support this use case. Signed-off-by: Ilya Dryomov diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index a7caafe..3e3799c 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -29,8 +29,9 @@ #define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */ #define CEPH_OPT_MYIP (1<<2) /* specified my ip */ #define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ -#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */ +#define CEPH_OPT_NOMSGAUTH (1<<4) /* don't require msg signing feat */ #define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */ +#define CEPH_OPT_NOMSGSIGN (1<<6) /* don't sign msgs */ #define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY) diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 3a544ca..10d87753 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -8,6 +8,7 @@ #include #include +#include #include #include "crypto.h" @@ -698,6 +699,9 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth, { int ret; + if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) + return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &msg->footer.sig); if (ret < 0) @@ -712,6 +716,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth, __le64 sig_check; int ret; + if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) + return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &sig_check); if (ret < 0) diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index d1494d1..6b4d3a1 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -245,6 +245,8 @@ enum { Opt_nocrc, Opt_cephx_require_signatures, Opt_nocephx_require_signatures, + Opt_cephx_sign_messages, + Opt_nocephx_sign_messages, Opt_tcp_nodelay, Opt_notcp_nodelay, }; @@ -267,6 +269,8 @@ static match_table_t opt_tokens = { {Opt_nocrc, "nocrc"}, {Opt_cephx_require_signatures, "cephx_require_signatures"}, {Opt_nocephx_require_signatures, "nocephx_require_signatures"}, + {Opt_cephx_sign_messages, "cephx_sign_messages"}, + {Opt_nocephx_sign_messages, "nocephx_sign_messages"}, {Opt_tcp_nodelay, "tcp_nodelay"}, {Opt_notcp_nodelay, "notcp_nodelay"}, {-1, NULL} @@ -491,6 +495,12 @@ ceph_parse_options(char *options, const char *dev_name, case Opt_nocephx_require_signatures: opt->flags |= CEPH_OPT_NOMSGAUTH; break; + case Opt_cephx_sign_messages: + opt->flags &= ~CEPH_OPT_NOMSGSIGN; + break; + case Opt_nocephx_sign_messages: + opt->flags |= CEPH_OPT_NOMSGSIGN; + break; case Opt_tcp_nodelay: opt->flags |= CEPH_OPT_TCP_NODELAY; @@ -534,6 +544,8 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) seq_puts(m, "nocrc,"); if (opt->flags & CEPH_OPT_NOMSGAUTH) seq_puts(m, "nocephx_require_signatures,"); + if (opt->flags & CEPH_OPT_NOMSGSIGN) + seq_puts(m, "nocephx_sign_messages,"); if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0) seq_puts(m, "notcp_nodelay,"); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1110807..0cc5608 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2677,7 +2677,7 @@ more: if (ret <= 0) { switch (ret) { case -EBADMSG: - con->error_msg = "bad crc"; + con->error_msg = "bad crc/signature"; /* fall through */ case -EBADE: ret = -EIO; -- cgit v0.10.2 From 583d0fef756a7615e50f0f68ea0892a497d03971 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 2 Nov 2015 17:13:58 +0100 Subject: libceph: clear msg->con in ceph_msg_release() only The following bit in ceph_msg_revoke_incoming() is unsafe: struct ceph_connection *con = msg->con; if (!con) return; mutex_lock(&con->mutex); con use> There is nothing preventing con from getting destroyed right after msg->con test. One easy way to reproduce this is to disable message signing only on the server side and try to map an image. The system will go into a libceph: read_partial_message ffff880073f0ab68 signature check failed libceph: osd0 192.168.255.155:6801 bad crc/signature libceph: read_partial_message ffff880073f0ab68 signature check failed libceph: osd0 192.168.255.155:6801 bad crc/signature loop which has to be interrupted with Ctrl-C. Hit Ctrl-C and you are likely to end up with a random GP fault if the reset handler executes "within" ceph_msg_revoke_incoming(): ... rbd_obj_request_end ceph_osdc_cancel_request __unregister_request ceph_osdc_put_request ceph_msg_revoke_incoming ... osd_reset __kick_osd_requests __reset_osd remove_osd ceph_con_close reset_connection in_msg->con> put_osd con use> <-- !!! If ceph_msg_revoke_incoming() executes "before" the reset handler, osd/con will be leaked because ceph_msg_revoke_incoming() clears con->in_msg but doesn't put con ref, while reset_connection() only puts con ref if con->in_msg != NULL. The current msg->con scheme was introduced by commits 38941f8031bf ("libceph: have messages point to their connection") and 92ce034b5a74 ("libceph: have messages take a connection reference"), which defined when messages get associated with a connection and when that association goes away. Part of the problem is that this association is supposed to go away in much too many places; closing this race entirely requires either a rework of the existing or an addition of a new layer of synchronization. In lieu of that, we can make it *much* less likely to hit by disassociating messages only on their destruction and resend through a different connection. This makes the code simpler and is probably a good thing to do regardless - this patch adds a msg_con_set() helper which is is called from only three places: ceph_con_send() and ceph_con_in_msg_alloc() to set msg->con and ceph_msg_release() to clear it. Signed-off-by: Ilya Dryomov diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 0cc5608..9981039 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -637,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con) static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; ceph_msg_put(msg); } @@ -662,15 +659,14 @@ static void reset_connection(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } con->connect_seq = 0; con->out_seq = 0; if (con->out_msg) { + BUG_ON(con->out_msg->con != con); ceph_msg_put(con->out_msg); con->out_msg = NULL; } @@ -2438,13 +2434,10 @@ static int read_partial_message(struct ceph_connection *con) */ static void process_message(struct ceph_connection *con) { - struct ceph_msg *msg; + struct ceph_msg *msg = con->in_msg; BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; - msg = con->in_msg; con->in_msg = NULL; - con->ops->put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2918,10 +2911,8 @@ static void con_fault(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } /* Requeue anything that hasn't been acked */ @@ -2977,6 +2968,15 @@ void ceph_messenger_fini(struct ceph_messenger *msgr) } EXPORT_SYMBOL(ceph_messenger_fini); +static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) +{ + if (msg->con) + msg->con->ops->put(msg->con); + + msg->con = con ? con->ops->get(con) : NULL; + BUG_ON(msg->con != con); +} + static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ @@ -3008,9 +3008,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) return; } - BUG_ON(msg->con != NULL); - msg->con = con->ops->get(con); - BUG_ON(msg->con == NULL); + msg_con_set(msg, con); BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); @@ -3038,16 +3036,15 @@ void ceph_msg_revoke(struct ceph_msg *msg) { struct ceph_connection *con = msg->con; - if (!con) + if (!con) { + dout("%s msg %p null con\n", __func__, msg); return; /* Message not in our possession */ + } mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; msg->hdr.seq = 0; ceph_msg_put(msg); @@ -3071,16 +3068,13 @@ void ceph_msg_revoke(struct ceph_msg *msg) */ void ceph_msg_revoke_incoming(struct ceph_msg *msg) { - struct ceph_connection *con; + struct ceph_connection *con = msg->con; - BUG_ON(msg == NULL); - if (!msg->con) { + if (!con) { dout("%s msg %p null con\n", __func__, msg); - return; /* Message not in our possession */ } - con = msg->con; mutex_lock(&con->mutex); if (con->in_msg == msg) { unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); @@ -3326,9 +3320,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) } if (msg) { BUG_ON(*skip); + msg_con_set(msg, con); con->in_msg = msg; - con->in_msg->con = con->ops->get(con); - BUG_ON(con->in_msg->con == NULL); } else { /* * Null message pointer means either we should skip @@ -3375,6 +3368,8 @@ static void ceph_msg_release(struct kref *kref) dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); + msg_con_set(m, NULL); + /* drop middle, data, if any */ if (m->middle) { ceph_buffer_put(m->middle); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 118e4ce..f8f2359 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2850,9 +2850,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, goto out; } - if (req->r_reply->con) - dout("%s revoking msg %p from old con %p\n", __func__, - req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); if (front_len > req->r_reply->front_alloc_len) { -- cgit v0.10.2