From 9d521470a40f16110bd31018034155c60c1a1275 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 31 Jan 2014 17:54:26 +0200 Subject: libceph: a per-osdc crush scratch buffer With the addition of erasure coding support in the future, scratch variable-length array in crush_do_rule_ary() is going to grow to at least 200 bytes on average, on top of another 128 bytes consumed by rawosd/osd arrays in the call chain. Replace it with a buffer inside struct osdmap and a mutex. This shouldn't result in any contention, because all osd requests were already serialized by request_mutex at that point; the only unlocked caller was ceph_ioctl_get_dataloc(). Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 49ff69f..8c8b3ce 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -84,6 +84,9 @@ struct ceph_osdmap { /* the CRUSH map specifies the mapping of placement groups to * the list of osds that store+replicate them. */ struct crush_map *crush; + + struct mutex crush_scratch_mutex; + int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3]; }; static inline void ceph_oid_set_name(struct ceph_object_id *oid, diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index aade4a5..9d1aaa2 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -698,7 +698,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) map = kzalloc(sizeof(*map), GFP_NOFS); if (map == NULL) return ERR_PTR(-ENOMEM); + map->pg_temp = RB_ROOT; + mutex_init(&map->crush_scratch_mutex); ceph_decode_16_safe(p, end, version, bad); if (version > 6) { @@ -1142,14 +1144,20 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, } EXPORT_SYMBOL(ceph_oloc_oid_to_pg); -static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, - int *result, int result_max, - const __u32 *weight, int weight_max) +static int do_crush(struct ceph_osdmap *map, int ruleno, int x, + int *result, int result_max, + const __u32 *weight, int weight_max) { - int scratch[result_max * 3]; + int r; + + BUG_ON(result_max > CEPH_PG_MAX_SIZE); + + mutex_lock(&map->crush_scratch_mutex); + r = crush_do_rule(map->crush, ruleno, x, result, result_max, + weight, weight_max, map->crush_scratch_ary); + mutex_unlock(&map->crush_scratch_mutex); - return crush_do_rule(map, ruleno, x, result, result_max, - weight, weight_max, scratch); + return r; } /* @@ -1205,9 +1213,8 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, pool->pgp_num_mask) + (unsigned)pgid.pool; } - r = crush_do_rule_ary(osdmap->crush, ruleno, pps, - osds, min_t(int, pool->size, *num), - osdmap->osd_weight, osdmap->max_osd); + r = do_crush(osdmap, ruleno, pps, osds, min_t(int, pool->size, *num), + osdmap->osd_weight, osdmap->max_osd); if (r < 0) { pr_err("error %d from crush rule: pool %lld ruleset %d type %d" " size %d\n", r, pgid.pool, pool->crush_ruleset, -- cgit v0.10.2 From 62054da65c626dd603190c16805f92cf2cf47d4c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 4 Mar 2014 11:57:17 +0200 Subject: rbd: remove out_partial label in rbd_img_request_fill() Commit 03507db631c94 ("rbd: fix buffer size for writes to images with snapshots") moved the call to rbd_img_obj_request_add() up, making the out_partial label bogus. Remove it. 
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 34898d5..43b9814 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2190,6 +2190,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, rbd_segment_name_free(object_name); if (!obj_request) goto out_unwind; + /* * set obj_request->img_request before creating the * osd_request so that it gets the right snapc @@ -2207,7 +2208,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, clone_size, GFP_ATOMIC); if (!obj_request->bio_list) - goto out_partial; + goto out_unwind; } else { unsigned int page_count; @@ -2222,7 +2223,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, osd_req = rbd_osd_req_create(rbd_dev, write_request, obj_request); if (!osd_req) - goto out_partial; + goto out_unwind; obj_request->osd_req = osd_req; obj_request->callback = rbd_img_obj_callback; @@ -2249,8 +2250,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, return 0; -out_partial: - rbd_obj_request_put(obj_request); out_unwind: for_each_obj_request_safe(img_request, obj_request, next_obj_request) rbd_obj_request_put(obj_request); -- cgit v0.10.2 From 42dd037c08c7cd6e3e9af7824b0c1d063f838885 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 4 Mar 2014 11:57:17 +0200 Subject: rbd: fix error paths in rbd_img_request_fill() Doing rbd_obj_request_put() in rbd_img_request_fill() error paths is not only insufficient, but also triggers an rbd_assert() in rbd_obj_request_destroy(): Assertion failure in rbd_obj_request_destroy() at line 1867: rbd_assert(obj_request->img_request == NULL); rbd_img_obj_request_add() adds obj_requests to the img_request, the opposite is rbd_img_obj_request_del(). Use it. Fixes: http://tracker.ceph.com/issues/7327 Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 43b9814..b55b781 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2252,7 +2252,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, out_unwind: for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_obj_request_put(obj_request); + rbd_img_obj_request_del(img_request, obj_request); return -ENOMEM; } -- cgit v0.10.2 From 7b25bf5f02c5c80adf96120e031dc3a1756ce54d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 25 Feb 2014 16:22:26 +0200 Subject: libceph: encode CEPH_OSD_OP_FLAG_* op flags Encode ceph_osd_op::flags field so that it gets sent over the wire. 
Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index fd47e87..e94f5da 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -76,6 +76,7 @@ struct ceph_osd_data { struct ceph_osd_req_op { u16 op; /* CEPH_OSD_OP_* */ + u32 flags; /* CEPH_OSD_OP_FLAG_* */ u32 payload_len; union { struct ceph_osd_data raw_data_in; diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 96292df..8f9bf45 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -382,7 +382,7 @@ enum { */ struct ceph_osd_op { __le16 op; /* CEPH_OSD_OP_* */ - __le32 flags; /* CEPH_OSD_FLAG_* */ + __le32 flags; /* CEPH_OSD_OP_FLAG_* */ union { struct { __le64 offset, length; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0676f2b..5d7fd0b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -688,7 +688,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, return 0; } + dst->op = cpu_to_le16(src->op); + dst->flags = cpu_to_le32(src->flags); dst->payload_len = cpu_to_le32(src->payload_len); return request_data_len; -- cgit v0.10.2 From c647b8a8c6366f849c2a237bfe525cb1d316d5f4 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 25 Feb 2014 16:22:27 +0200 Subject: libceph: add support for CEPH_OSD_OP_SETALLOCHINT osd op This is primarily for rbd's benefit and is supposed to combat fragmentation: "... knowing that rbd images have a 4m size, librbd can pass a hint that will let the osd do the xfs allocation size ioctl on new files so that they are allocated in 1m or 4m chunks. We've seen cases where users with rbd workloads have very high levels of fragmentation in xfs and this would mitigate that and probably have a pretty nice performance benefit." SETALLOCHINT is considered advisory, so our backwards compatibility mechanism here is to set FAILOK flag for all SETALLOCHINT ops. 
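To make the compatibility mechanism concrete, here is a minimal sketch of a hinted write from the caller's side, using the helpers this series adds; osd_req, obj_size, offset and length are illustrative stand-ins for the real rbd values (obj_size would be 1 << order):

    /* op 0: advisory hint; osd_req_op_alloc_hint_init() sets
     * CEPH_OSD_OP_FLAG_FAILOK internally, so an older osd that does not
     * understand SETALLOCHINT skips the op instead of failing the request */
    osd_req_op_alloc_hint_init(osd_req, 0, obj_size, obj_size);

    /* op 1: the actual data write */
    osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, offset, length, 0, 0);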
Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index e94f5da..c42d1ad 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -103,6 +103,10 @@ struct ceph_osd_req_op { u32 timeout; __u8 flag; } watch; + struct { + u64 expected_object_size; + u64 expected_write_size; + } alloc_hint; }; }; @@ -294,6 +298,10 @@ extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u64 cookie, u64 version, int flag); +extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, + unsigned int which, + u64 expected_object_size, + u64 expected_write_size); extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 8f9bf45..2caabef 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -227,6 +227,9 @@ enum { CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24, CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25, + /* hints */ + CEPH_OSD_OP_SETALLOCHINT = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 35, + /** multi **/ CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1, CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2, @@ -416,6 +419,10 @@ struct ceph_osd_op { __le64 offset, length; __le64 src_offset; } __attribute__ ((packed)) clonerange; + struct { + __le64 expected_object_size; + __le64 expected_write_size; + } __attribute__ ((packed)) alloc_hint; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 5d7fd0b..71830d7 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -436,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode) case CEPH_OSD_OP_OMAPCLEAR: case CEPH_OSD_OP_OMAPRMKEYS: case CEPH_OSD_OP_OMAP_CMP: + case CEPH_OSD_OP_SETALLOCHINT: case CEPH_OSD_OP_CLONERANGE: case CEPH_OSD_OP_ASSERT_SRC_VERSION: case CEPH_OSD_OP_SRC_CMPXATTR: @@ -591,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_watch_init); +void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, + unsigned int which, + u64 expected_object_size, + u64 expected_write_size) +{ + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + CEPH_OSD_OP_SETALLOCHINT); + + op->alloc_hint.expected_object_size = expected_object_size; + op->alloc_hint.expected_write_size = expected_write_size; + + /* + * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed + * not worth a feature bit. Set FAILOK per-op flag to make + * sure older osds don't trip over an unsupported opcode. 
+ */ + op->flags |= CEPH_OSD_OP_FLAG_FAILOK; +} +EXPORT_SYMBOL(osd_req_op_alloc_hint_init); + static void ceph_osdc_msg_data_add(struct ceph_msg *msg, struct ceph_osd_data *osd_data) { @@ -681,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->watch.ver = cpu_to_le64(src->watch.ver); dst->watch.flag = src->watch.flag; break; + case CEPH_OSD_OP_SETALLOCHINT: + dst->alloc_hint.expected_object_size = + cpu_to_le64(src->alloc_hint.expected_object_size); + dst->alloc_hint.expected_write_size = + cpu_to_le64(src->alloc_hint.expected_write_size); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); -- cgit v0.10.2 From 7cc69d42e6950404587bef9489a5ed6f9f6bab4e Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 25 Feb 2014 16:22:27 +0200 Subject: libceph: bump CEPH_OSD_MAX_OP to 3 Our longest osd request now contains 3 ops: copyup+hint+write. Also, CEPH_OSD_MAX_OP value in a BUG_ON in rbd_osd_req_callback() was hard-coded to 2. Fix it, and switch to rbd_assert while at it. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil Reviewed-by: Alex Elder diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b55b781..4c612c4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1654,7 +1654,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, if (osd_req->r_result < 0) obj_request->result = osd_req->r_result; - BUG_ON(osd_req->r_num_ops > 2); + rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP); /* * We support a 64-bit length, but ultimately it has to be diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index c42d1ad..94ec696 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -43,7 +43,7 @@ struct ceph_osd { }; -#define CEPH_OSD_MAX_OP 2 +#define CEPH_OSD_MAX_OP 3 enum ceph_osd_data_type { CEPH_OSD_DATA_TYPE_NONE = 0, -- cgit v0.10.2 From deb236b300cea3e7a114115571194b9872dbdfd1 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 25 Feb 2014 16:22:27 +0200 Subject: rbd: num_ops parameter for rbd_osd_req_create() In preparation for prefixing rbd writes with an allocation hint introduce a num_ops parameter for rbd_osd_req_create(). The rationale is that not every write request is a write op that needs to be prefixed (e.g. watch op), so the num_ops logic needs to be in the callers. 
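A rough sketch of the convention this creates at the call sites (these calls mirror the follow-up patches; the num_ops constants are the interesting part):

    /* rbd data write: will become hint + write */
    osd_req = rbd_osd_req_create(rbd_dev, true, 2, obj_request);

    /* watch: a write request on the wire, but a single op */
    osd_req = rbd_osd_req_create(rbd_dev, true, 1, obj_request);

    /* read: a single op */
    osd_req = rbd_osd_req_create(rbd_dev, false, 1, obj_request);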
Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil Reviewed-by: Alex Elder diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4c612c4..42ecae1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1718,6 +1718,7 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, bool write_request, + unsigned int num_ops, struct rbd_obj_request *obj_request) { struct ceph_snap_context *snapc = NULL; @@ -1733,10 +1734,13 @@ static struct ceph_osd_request *rbd_osd_req_create( snapc = img_request->snapc; } - /* Allocate and initialize the request, for the single op */ + rbd_assert(num_ops == 1); + + /* Allocate and initialize the request, for the num_ops ops */ osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); + osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, + GFP_ATOMIC); if (!osd_req) return NULL; /* ENOMEM */ @@ -2220,8 +2224,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, pages += page_count; } - osd_req = rbd_osd_req_create(rbd_dev, write_request, - obj_request); + osd_req = rbd_osd_req_create(rbd_dev, write_request, 1, + obj_request); if (!osd_req) goto out_unwind; obj_request->osd_req = osd_req; @@ -2602,8 +2606,8 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) rbd_assert(obj_request->img_request); rbd_dev = obj_request->img_request->rbd_dev; - stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, - stat_request); + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + stat_request); if (!stat_request->osd_req) goto out; stat_request->callback = rbd_img_obj_exists_callback; @@ -2806,7 +2810,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id) return -ENOMEM; ret = -ENOMEM; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); if (!obj_request->osd_req) goto out; @@ -2869,7 +2874,8 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) if (!obj_request) goto out_cancel; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, + obj_request); if (!obj_request->osd_req) goto out_cancel; @@ -2977,7 +2983,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); if (!obj_request->osd_req) goto out; @@ -3210,7 +3217,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); if (!obj_request->osd_req) goto out; -- cgit v0.10.2 From 0ccd59266973047770d5160318561c9189b79c93 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 25 Feb 2014 16:22:28 +0200 Subject: rbd: prefix rbd writes with CEPH_OSD_OP_SETALLOCHINT osd op In an effort to reduce fragmentation, prefix every rbd write with a CEPH_OSD_OP_SETALLOCHINT osd op with an expected_write_size value set to the object size (1 << order). Backwards compatibility is taken care of on the libceph/osd side. 
"The CEPH_OSD_OP_SETALLOCHINT hint is durable, in that it's enough to do it once. The reason every rbd write is prefixed is that rbd doesn't explicitly create objects and relies on writes creating them implicitly, so there is no place to stick a single hint op into. To get around that we decided to prefix every rbd write with a hint (just like write and setattr ops, hint op will create an object implicitly if it doesn't exist)." Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil Reviewed-by: Alex Elder diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 42ecae1..4c95b50 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1662,11 +1662,15 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, */ obj_request->xferred = osd_req->r_reply_op_len[0]; rbd_assert(obj_request->xferred < (u64)UINT_MAX); + opcode = osd_req->r_ops[0].op; switch (opcode) { case CEPH_OSD_OP_READ: rbd_osd_read_callback(obj_request); break; + case CEPH_OSD_OP_SETALLOCHINT: + rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE); + /* fall through */ case CEPH_OSD_OP_WRITE: rbd_osd_write_callback(obj_request); break; @@ -1715,6 +1719,12 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) snapc, CEPH_NOSNAP, &mtime); } +/* + * Create an osd request. A read request has one osd op (read). + * A write request has either one (watch) or two (hint+write) osd ops. + * (All rbd data writes are prefixed with an allocation hint op, but + * technically osd watch is a write request, hence this distinction.) + */ static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, bool write_request, @@ -1734,7 +1744,7 @@ static struct ceph_osd_request *rbd_osd_req_create( snapc = img_request->snapc; } - rbd_assert(num_ops == 1); + rbd_assert(num_ops == 1 || (write_request && num_ops == 2)); /* Allocate and initialize the request, for the num_ops ops */ @@ -1760,8 +1770,8 @@ static struct ceph_osd_request *rbd_osd_req_create( /* * Create a copyup osd request based on the information in the - * object request supplied. A copyup request has two osd ops, - * a copyup method call, and a "normal" write request. + * object request supplied. A copyup request has three osd ops, + * a copyup method call, a hint op, and a write op. */ static struct ceph_osd_request * rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) @@ -1777,12 +1787,12 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) rbd_assert(img_request); rbd_assert(img_request_write_test(img_request)); - /* Allocate and initialize the request, for the two ops */ + /* Allocate and initialize the request, for the three ops */ snapc = img_request->snapc; rbd_dev = img_request->rbd_dev; osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); + osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC); if (!osd_req) return NULL; /* ENOMEM */ @@ -2182,6 +2192,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, const char *object_name; u64 offset; u64 length; + unsigned int which = 0; object_name = rbd_segment_name(rbd_dev, img_offset); if (!object_name) @@ -2224,20 +2235,28 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, pages += page_count; } - osd_req = rbd_osd_req_create(rbd_dev, write_request, 1, + osd_req = rbd_osd_req_create(rbd_dev, write_request, + (write_request ? 
2 : 1), obj_request); if (!osd_req) goto out_unwind; obj_request->osd_req = osd_req; obj_request->callback = rbd_img_obj_callback; - osd_req_op_extent_init(osd_req, 0, opcode, offset, length, - 0, 0); + if (write_request) { + osd_req_op_alloc_hint_init(osd_req, which, + rbd_obj_bytes(&rbd_dev->header), + rbd_obj_bytes(&rbd_dev->header)); + which++; + } + + osd_req_op_extent_init(osd_req, which, opcode, offset, length, + 0, 0); if (type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_req, 0, + osd_req_op_extent_osd_data_bio(osd_req, which, obj_request->bio_list, length); else - osd_req_op_extent_osd_data_pages(osd_req, 0, + osd_req_op_extent_osd_data_pages(osd_req, which, obj_request->pages, length, offset & ~PAGE_MASK, false, false); @@ -2356,7 +2375,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) /* * The original osd request is of no use to use any more. - * We need a new one that can hold the two ops in a copyup + * We need a new one that can hold the three ops in a copyup * request. Allocate the new copyup osd request for the * original request, and release the old one. */ @@ -2375,17 +2394,22 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, false, false); - /* Then the original write request op */ + /* Then the hint op */ + + osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header), + rbd_obj_bytes(&rbd_dev->header)); + + /* And the original write request op */ offset = orig_request->offset; length = orig_request->length; - osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, + osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE, offset, length, 0, 0); if (orig_request->type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_req, 1, + osd_req_op_extent_osd_data_bio(osd_req, 2, orig_request->bio_list, length); else - osd_req_op_extent_osd_data_pages(osd_req, 1, + osd_req_op_extent_osd_data_pages(osd_req, 2, orig_request->pages, length, offset & ~PAGE_MASK, false, false); -- cgit v0.10.2 From f0494206076703aaa0c8005eff41c413216ae26b Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 27 Feb 2014 16:26:24 +0800 Subject: ceph: fix ceph_dir_llseek() Comparing offset with inode->i_sb->s_maxbytes doesn't make sense for a directory. For a fragmented directory, offset (frag_t, off) can be larger than inode->i_sb->s_maxbytes. At the very beginning of ceph_dir_llseek(), the local variable old_offset is initialized to the offset parameter. This doesn't make sense either. old_offset should be ceph_make_fpos(fi->frag, fi->next_offset). 
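For reference, the fpos helpers this reasoning relies on pack a fragment and an intra-frag offset into a single loff_t (as defined in fs/ceph/super.h of this era, reproduced here slightly condensed):

    static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
    {
            return ((loff_t)frag << 32) | (loff_t)off;
    }

    static inline unsigned fpos_frag(loff_t p) { return p >> 32; }
    static inline unsigned fpos_off(loff_t p)  { return p & 0xffffffff; }

With a non-zero frag in the upper 32 bits, a directory offset easily exceeds s_maxbytes even though both components are small, which is why the s_maxbytes comparison has to go.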
Signed-off-by: Yan, Zheng Reviewed-by: Alex Elder diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 45eda6d..a7eaf96 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -190,7 +190,7 @@ more: if (last) { /* remember our position */ fi->dentry = last; - fi->next_offset = di->offset; + fi->next_offset = fpos_off(di->offset); } dput(dentry); return 0; @@ -369,9 +369,9 @@ more: fi->next_offset = 0; off = fi->next_offset; } + fi->frag = frag; fi->offset = fi->next_offset; fi->last_readdir = req; - fi->frag = frag; if (req->r_reply_info.dir_end) { kfree(fi->last_name); @@ -474,7 +474,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) { struct ceph_file_info *fi = file->private_data; struct inode *inode = file->f_mapping->host; - loff_t old_offset = offset; + loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset); loff_t retval; mutex_lock(&inode->i_mutex); @@ -491,7 +491,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) goto out; } - if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) { + if (offset >= 0) { if (offset != file->f_pos) { file->f_pos = offset; file->f_version = 0; @@ -504,14 +504,14 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) * seek to new frag, or seek prior to current chunk. */ if (offset == 0 || - fpos_frag(offset) != fpos_frag(old_offset) || + fpos_frag(offset) != fi->frag || fpos_off(offset) < fi->offset) { dout("dir_llseek dropping %p content\n", file); reset_readdir(fi); } /* bump dir_release_count if we did a forward seek */ - if (offset > old_offset) + if (fpos_cmp(offset, old_offset) > 0) fi->dir_release_count--; } out: diff --git a/fs/ceph/super.h b/fs/ceph/super.h index d8801a9..70bb183 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -577,7 +577,7 @@ struct ceph_file_info { /* readdir: position within a frag */ unsigned offset; /* offset of last chunk, adjusted for . and .. */ - u64 next_offset; /* offset of next chunk (last_name's + 1) */ + unsigned next_offset; /* offset of next chunk (last_name's + 1) */ char *last_name; /* last entry in previous chunk */ struct dentry *dentry; /* next dentry (for dcache readdir) */ int dir_release_count; -- cgit v0.10.2 From dcd3cc05e5f230f8fbc0c3369a5d6ad4f1d23aed Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 28 Feb 2014 16:36:09 +0800 Subject: ceph: fix reset_readdir() When changing readdir position, fi->next_offset should be set to 0 if the new position is not in the first dirfrag. Signed-off-by: Yan, Zheng Reviewed-by: Alex Elder diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index a7eaf96..8ce8833 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -454,7 +454,7 @@ more: return 0; } -static void reset_readdir(struct ceph_file_info *fi) +static void reset_readdir(struct ceph_file_info *fi, unsigned frag) { if (fi->last_readdir) { ceph_mdsc_put_request(fi->last_readdir); @@ -462,7 +462,10 @@ static void reset_readdir(struct ceph_file_info *fi) } kfree(fi->last_name); fi->last_name = NULL; - fi->next_offset = 2; /* compensate for . and .. */ + if (ceph_frag_is_leftmost(frag)) + fi->next_offset = 2; /* compensate for . and .. 
*/ + else + fi->next_offset = 0; if (fi->dentry) { dput(fi->dentry); fi->dentry = NULL; } @@ -507,7 +510,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) fpos_frag(offset) != fi->frag || fpos_off(offset) < fi->offset) { dout("dir_llseek dropping %p content\n", file); - reset_readdir(fi); + reset_readdir(fi, fpos_frag(offset)); } /* bump dir_release_count if we did a forward seek */ -- cgit v0.10.2 From 15289dc85b2d03d42d7e479476254be2b17c65d5 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 3 Mar 2014 09:20:44 +0800 Subject: ceph: let MDS adjust readdir 'frag' If readdir 'frag' is adjusted, readdir 'offset' should be reset. Otherwise some dentries may be lost when readdir and directory fragmentation happen at the same time. Another way to fix this issue is to let the MDS adjust readdir 'frag': the code that handles the MDS reply resets the readdir 'offset' if the frag in the readdir reply is different from the requested one. Signed-off-by: Yan, Zheng diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 8ce8833..e9918a0 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -322,9 +322,6 @@ more: fi->last_readdir = NULL; } - /* requery frag tree, as the frag topology may have changed */ - frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL); - dout("readdir fetching %llx.%llx frag %x offset '%s'\n", ceph_vinop(inode), frag, fi->last_name); req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); -- cgit v0.10.2 From 180061a58c17681dd236e5059ba57fe092dbe368 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 5 Feb 2013 13:36:05 -0800 Subject: ceph: avoid useless ceph_get_dentry_parent_inode() in ceph_rename() This is just old_dir; no reason to abuse the dcache pointers. Reported-by: Al Viro Signed-off-by: Sage Weil Reviewed-by: Yan, Zheng diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index e9918a0..e079737 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -911,10 +911,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS); if (IS_ERR(req)) return PTR_ERR(req); + ihold(old_dir); req->r_dentry = dget(new_dentry); req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); - req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); + req->r_old_dentry_dir = old_dir; req->r_locked_dir = new_dir; req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; -- cgit v0.10.2 From 752c8bdcfe88f27a17c5c9264df928fd145a4b30 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 5 Feb 2013 13:52:29 -0800 Subject: ceph: do not chain inode updates to parent fsync The fsync(dirfd) only covers namespace operations, not inode updates. We do not need to cover setattr variants or O_TRUNC. 
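To illustrate the distinction (a user-space sketch of the intended semantics, not part of the patch; the path and modes are made up):

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <unistd.h>

    int main(void)
    {
            int dfd = open("/mnt/ceph/dir", O_RDONLY | O_DIRECTORY);
            int fd = openat(dfd, "newfile", O_CREAT | O_WRONLY, 0644);

            fsync(dfd);     /* must cover the create: a namespace operation */

            fchmod(fd, 0600);
            fsync(dfd);     /* need not cover the chmod: an inode update */
            return 0;
    }

Accordingly, the patch stops passing the parent inode to ceph_mdsc_do_request() for setattr, setlayout, setxattr and removexattr, and drops O_TRUNC from the open-time parent lookup.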
Reported-by: Al Viro Signed-off-by: Sage Weil Reviewed-by: Yan, Zheng diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 09c7afe..c298a7b 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -210,7 +210,7 @@ int ceph_open(struct inode *inode, struct file *file) ihold(inode); req->r_num_caps = 1; - if (flags & (O_CREAT|O_TRUNC)) + if (flags & O_CREAT) parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); iput(parent_inode); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 32d519d..8bf2384 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1627,7 +1627,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) { struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct inode *parent_inode; const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; @@ -1819,9 +1818,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) req->r_inode_drop = release; req->r_args.setattr.mask = cpu_to_le32(mask); req->r_num_caps = 1; - parent_inode = ceph_get_dentry_parent_inode(dentry); - err = ceph_mdsc_do_request(mdsc, parent_inode, req); - iput(parent_inode); + err = ceph_mdsc_do_request(mdsc, NULL, req); } dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err, ceph_cap_string(dirtied), mask); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index dc66c9e..efbe082 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -64,7 +64,6 @@ static long __validate_layout(struct ceph_mds_client *mdsc, static long ceph_ioctl_set_layout(struct file *file, void __user *arg) { struct inode *inode = file_inode(file); - struct inode *parent_inode; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_request *req; struct ceph_ioctl_layout l; @@ -121,9 +120,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) cpu_to_le32(l.object_size); req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); - parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); - err = ceph_mdsc_do_request(mdsc, parent_inode, req); - iput(parent_inode); + err = ceph_mdsc_do_request(mdsc, NULL, req); ceph_mdsc_put_request(req); return err; } diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index a55ec37..2dbd668 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -842,7 +842,6 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct inode *parent_inode; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = fsc->mdsc; int err; @@ -893,9 +892,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, req->r_data_len = size; dout("xattr.ver (before): %lld\n", ci->i_xattrs.version); - parent_inode = ceph_get_dentry_parent_inode(dentry); - err = ceph_mdsc_do_request(mdsc, parent_inode, req); - iput(parent_inode); + err = ceph_mdsc_do_request(mdsc, NULL, req); ceph_mdsc_put_request(req); dout("xattr.ver (after): %lld\n", ci->i_xattrs.version); @@ -1019,7 +1016,6 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name) struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = dentry->d_inode; - struct inode *parent_inode; struct ceph_mds_request *req; int err; @@ -1033,9 +1029,7 @@ static int 
ceph_send_removexattr(struct dentry *dentry, const char *name) req->r_num_caps = 1; req->r_path2 = kstrdup(name, GFP_NOFS); - parent_inode = ceph_get_dentry_parent_inode(dentry); - err = ceph_mdsc_do_request(mdsc, parent_inode, req); - iput(parent_inode); + err = ceph_mdsc_do_request(mdsc, NULL, req); ceph_mdsc_put_request(req); return err; } -- cgit v0.10.2 From 844d87c3329980e2b1849cf53205d7fa965d8995 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 5 Feb 2013 13:40:09 -0800 Subject: ceph: do not assume r_old_dentry[_dir] always set together Do not assume that r_old_dentry implies that r_old_dentry_dir is also true. Separate out the ref cleanup and make the debugfs dump behave when it is NULL. Signed-off-by: Sage Weil Reviewed-by: Yan, Zheng diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 6d59006..8c6f313 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -102,7 +102,8 @@ static int mdsc_show(struct seq_file *s, void *p) path = NULL; spin_lock(&req->r_old_dentry->d_lock); seq_printf(s, " #%llx/%.*s (%s)", - ceph_ino(req->r_old_dentry_dir), + req->r_old_dentry_dir ? + ceph_ino(req->r_old_dentry_dir) : 0, req->r_old_dentry->d_name.len, req->r_old_dentry->d_name.name, path ? path : ""); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index f4f050a..f260bd8 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -528,7 +528,9 @@ void ceph_mdsc_release_request(struct kref *kref) iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); - if (req->r_old_dentry) { + if (req->r_old_dentry) + dput(req->r_old_dentry); + if (req->r_old_dentry_dir) { /* * track (and drop pins for) r_old_dentry_dir * separately, since r_old_dentry's d_parent may have @@ -537,7 +539,6 @@ void ceph_mdsc_release_request(struct kref *kref) */ ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - dput(req->r_old_dentry); iput(req->r_old_dentry_dir); } kfree(req->r_path1); @@ -2053,7 +2054,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); if (req->r_locked_dir) ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); - if (req->r_old_dentry) + if (req->r_old_dentry_dir) ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); -- cgit v0.10.2 From 4b58c9b19bddb47a1961608bc62d0c2f3dc9705e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 5 Feb 2013 13:41:23 -0800 Subject: ceph: do not set r_old_dentry_dir on link() This is racy--we do not know whether d_parent has changed out from underneath us because i_mutex is not held on the source inode's directory. Also, taking this reference is useless. Reported-by: Al Viro Signed-off-by: Sage Weil Reviewed-by: Yan, Zheng diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index e079737..ff2864a 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -812,8 +812,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, } req->r_dentry = dget(dentry); req->r_num_caps = 2; - req->r_old_dentry = dget(old_dentry); /* or inode? hrm. 
*/ - req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); + req->r_old_dentry = dget(old_dentry); req->r_locked_dir = dir; req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; -- cgit v0.10.2 From 020c4bddc030815a767d86ba36ee0563e9855c23 Mon Sep 17 00:00:00 2001 From: Yunchuan Wen Date: Thu, 26 Dec 2013 06:29:26 -0800 Subject: ceph: fscache: add an interface to synchronize object store limit Add an interface to explicitly synchronize object->store_limit[_l] with inode->i_size. Tested-by: Milosz Tanski Signed-off-by: Yunchuan Wen Signed-off-by: Min Chen Signed-off-by: Li Wang diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h index da95f61..5ac591b 100644 --- a/fs/ceph/cache.h +++ b/fs/ceph/cache.h @@ -48,6 +48,12 @@ void ceph_readpage_to_fscache(struct inode *inode, struct page *page); void ceph_invalidate_fscache_page(struct inode* inode, struct page *page); void ceph_queue_revalidate(struct inode *inode); +static inline void ceph_fscache_update_objectsize(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + fscache_attr_changed(ci->fscache); +} + static inline void ceph_fscache_invalidate(struct inode *inode) { fscache_invalidate(ceph_inode(inode)->fscache); @@ -135,6 +141,10 @@ static inline void ceph_readpage_to_fscache(struct inode *inode, { } +static inline void ceph_fscache_update_objectsize(struct inode *inode) +{ +} + static inline void ceph_fscache_invalidate(struct inode *inode) { } -- cgit v0.10.2 From 32d3e148ddac0087fdd8499ce4075db20518e122 Mon Sep 17 00:00:00 2001 From: Yunchuan Wen Date: Thu, 26 Dec 2013 06:29:27 -0800 Subject: ceph: fscache: Update object store limit after file writing Synchronize object->store_limit[_l] with the new inode->i_size after file writing. Tested-by: Milosz Tanski Signed-off-by: Yunchuan Wen Signed-off-by: Min Chen Signed-off-by: Li Wang diff --git a/fs/ceph/file.c b/fs/ceph/file.c index c298a7b..2862a75 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -970,6 +970,7 @@ retry_snap: goto retry_snap; } } else { + loff_t old_size = inode->i_size; /* * No need to acquire the i_truncate_mutex. Because * the MDS revokes Fwb caps before sending truncate @@ -980,6 +981,8 @@ retry_snap: written = generic_file_buffered_write(iocb, iov, nr_segs, pos, &iocb->ki_pos, count, 0); + if (inode->i_size > old_size) + ceph_fscache_update_objectsize(inode); mutex_unlock(&inode->i_mutex); } -- cgit v0.10.2 From f1fc4fee3bb163eebd0e1d16a8c84b66af03886e Mon Sep 17 00:00:00 2001 From: Yunchuan Wen Date: Thu, 26 Dec 2013 06:29:28 -0800 Subject: ceph: fscache: Wait for completion of object initialization The object store limit needs to be updated after writing, and this can be done provided the corresponding object has already been initialized. Currently, object initialization is done asynchronously, which introduces a race: if a file is opened and then immediately written to, the initialization may not have completed yet, and the code will reach the ASSERT in fscache_submit_exclusive_op(), causing a kernel bug. 
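A condensed timeline of the race (our annotation of the description above; the fscache internals are paraphrased):

    /* opener                               writer
     * ceph_fscache_register_inode_cookie()
     *   fscache_acquire_cookie()
     *     object init is queued and the
     *     call returns without waiting
     *                                      write extends i_size
     *                                      ceph_fscache_update_objectsize()
     *                                        fscache_attr_changed() submits an
     *                                        exclusive op against the not-yet-
     *                                        initialized object -> ASSERT
     * object init completes (too late)
     */

The fix below calls fscache_check_consistency() from the open path, which has the effect of waiting until the object is set up before the file can be used.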
Tested-by: Milosz Tanski Signed-off-by: Yunchuan Wen Signed-off-by: Min Chen Signed-off-by: Li Wang diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 8c44fdd..834f9f3 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -205,6 +205,7 @@ void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc, ci->fscache = fscache_acquire_cookie(fsc->fscache, &ceph_fscache_inode_object_def, ci, true); + fscache_check_consistency(ci->fscache); done: mutex_unlock(&inode->i_mutex); -- cgit v0.10.2 From 4f32b42dca660208c7556e13ebd84c510ad91840 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 1 Mar 2014 18:05:41 +0800 Subject: ceph: simplify ceph_fh_to_dentry() MDS handles LOOKUPHASH and LOOKUPINO MDS requests in the same way. So __cfh_to_dentry() is redundant. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 16796be..976d341 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -8,23 +8,6 @@ #include "mds_client.h" /* - * NFS export support - * - * NFS re-export of a ceph mount is, at present, only semireliable. - * The basic issue is that the Ceph architectures doesn't lend itself - * well to generating filehandles that will remain valid forever. - * - * So, we do our best. If you're lucky, your inode will be in the - * client's cache. If it's not, and you have a connectable fh, then - * the MDS server may be able to find it for you. Otherwise, you get - * ESTALE. - * - * There are ways to this more reliable, but in the non-connectable fh - * case, we won't every work perfectly, and in the connectable case, - * some changes are needed on the MDS side to work better. - */ - -/* * Basic fh */ struct ceph_nfs_fh { @@ -32,22 +15,12 @@ struct ceph_nfs_fh { } __attribute__ ((packed)); /* - * Larger 'connectable' fh that includes parent ino and name hash. - * Use this whenever possible, as it works more reliably. + * Larger fh that includes parent ino. */ struct ceph_nfs_confh { u64 ino, parent_ino; - u32 parent_name_hash; } __attribute__ ((packed)); -/* - * The presence of @parent_inode here tells us whether NFS wants a - * connectable file handle. However, we want to make a connectionable - * file handle unconditionally so that the MDS gets as much of a hint - * as possible. That means we only use @parent_dentry to indicate - * whether nfsd wants a connectable fh, and whether we should indicate - * failure from a too-small @max_len. 
- */ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, struct inode *parent_inode) { @@ -56,54 +29,36 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, struct ceph_nfs_confh *cfh = (void *)rawfh; int connected_handle_length = sizeof(*cfh)/4; int handle_length = sizeof(*fh)/4; - struct dentry *dentry; - struct dentry *parent; /* don't re-export snaps */ if (ceph_snap(inode) != CEPH_NOSNAP) return -EINVAL; - dentry = d_find_alias(inode); + if (parent_inode && (*max_len < connected_handle_length)) { + *max_len = connected_handle_length; + return FILEID_INVALID; + } else if (*max_len < handle_length) { + *max_len = handle_length; + return FILEID_INVALID; + } - /* if we found an alias, generate a connectable fh */ - if (*max_len >= connected_handle_length && dentry) { - dout("encode_fh %p connectable\n", dentry); - spin_lock(&dentry->d_lock); - parent = dentry->d_parent; + if (parent_inode) { + dout("encode_fh %llx with parent %llx\n", + ceph_ino(inode), ceph_ino(parent_inode)); cfh->ino = ceph_ino(inode); - cfh->parent_ino = ceph_ino(parent->d_inode); - cfh->parent_name_hash = ceph_dentry_hash(parent->d_inode, - dentry); + cfh->parent_ino = ceph_ino(parent_inode); *max_len = connected_handle_length; - type = 2; - spin_unlock(&dentry->d_lock); - } else if (*max_len >= handle_length) { - if (parent_inode) { - /* nfsd wants connectable */ - *max_len = connected_handle_length; - type = FILEID_INVALID; - } else { - dout("encode_fh %p\n", dentry); - fh->ino = ceph_ino(inode); - *max_len = handle_length; - type = 1; - } + type = FILEID_INO32_GEN_PARENT; } else { + dout("encode_fh %llx\n", ceph_ino(inode)); + fh->ino = ceph_ino(inode); *max_len = handle_length; - type = FILEID_INVALID; + type = FILEID_INO32_GEN; } - if (dentry) - dput(dentry); return type; } -/* - * convert regular fh to dentry - * - * FIXME: we should try harder by querying the mds for the ino. 
- */ -static struct dentry *__fh_to_dentry(struct super_block *sb, - struct ceph_nfs_fh *fh, int fh_len) +static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) { struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; struct inode *inode; @@ -111,11 +66,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, struct ceph_vino vino; int err; - if (fh_len < sizeof(*fh) / 4) - return ERR_PTR(-ESTALE); - - dout("__fh_to_dentry %llx\n", fh->ino); - vino.ino = fh->ino; + vino.ino = ino; vino.snap = CEPH_NOSNAP; inode = ceph_find_inode(sb, vino); if (!inode) { @@ -139,89 +90,35 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, dentry = d_obtain_alias(inode); if (IS_ERR(dentry)) { - pr_err("fh_to_dentry %llx -- inode %p but ENOMEM\n", - fh->ino, inode); iput(inode); return dentry; } err = ceph_init_dentry(dentry); if (err < 0) { - iput(inode); + dput(dentry); return ERR_PTR(err); } - dout("__fh_to_dentry %llx %p dentry %p\n", fh->ino, inode, dentry); + dout("__fh_to_dentry %llx %p dentry %p\n", ino, inode, dentry); return dentry; } /* - * convert connectable fh to dentry + * convert regular fh to dentry */ -static struct dentry *__cfh_to_dentry(struct super_block *sb, - struct ceph_nfs_confh *cfh, int fh_len) +static struct dentry *ceph_fh_to_dentry(struct super_block *sb, + struct fid *fid, + int fh_len, int fh_type) { - struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; - struct inode *inode; - struct dentry *dentry; - struct ceph_vino vino; - int err; - - if (fh_len < sizeof(*cfh) / 4) - return ERR_PTR(-ESTALE); - - dout("__cfh_to_dentry %llx (%llx/%x)\n", - cfh->ino, cfh->parent_ino, cfh->parent_name_hash); - - vino.ino = cfh->ino; - vino.snap = CEPH_NOSNAP; - inode = ceph_find_inode(sb, vino); - if (!inode) { - struct ceph_mds_request *req; - - req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPHASH, - USE_ANY_MDS); - if (IS_ERR(req)) - return ERR_CAST(req); - - req->r_ino1 = vino; - req->r_ino2.ino = cfh->parent_ino; - req->r_ino2.snap = CEPH_NOSNAP; - req->r_path2 = kmalloc(16, GFP_NOFS); - snprintf(req->r_path2, 16, "%d", cfh->parent_name_hash); - req->r_num_caps = 1; - err = ceph_mdsc_do_request(mdsc, NULL, req); - inode = req->r_target_inode; - if (inode) - ihold(inode); - ceph_mdsc_put_request(req); - if (!inode) - return ERR_PTR(err ? err : -ESTALE); - } + struct ceph_nfs_fh *fh = (void *)fid->raw; - dentry = d_obtain_alias(inode); - if (IS_ERR(dentry)) { - pr_err("cfh_to_dentry %llx -- inode %p but ENOMEM\n", - cfh->ino, inode); - iput(inode); - return dentry; - } - err = ceph_init_dentry(dentry); - if (err < 0) { - iput(inode); - return ERR_PTR(err); - } - dout("__cfh_to_dentry %llx %p dentry %p\n", cfh->ino, inode, dentry); - return dentry; -} + if (fh_type != FILEID_INO32_GEN && + fh_type != FILEID_INO32_GEN_PARENT) + return NULL; + if (fh_len < sizeof(*fh) / 4) + return NULL; -static struct dentry *ceph_fh_to_dentry(struct super_block *sb, struct fid *fid, - int fh_len, int fh_type) -{ - if (fh_type == 1) - return __fh_to_dentry(sb, (struct ceph_nfs_fh *)fid->raw, - fh_len); - else - return __cfh_to_dentry(sb, (struct ceph_nfs_confh *)fid->raw, - fh_len); + dout("fh_to_dentry %llx\n", fh->ino); + return __fh_to_dentry(sb, fh->ino); } /* -- cgit v0.10.2 From 9017c2ec78c730fb3ecd703d44e4a9061de2ba52 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 1 Mar 2014 22:11:45 +0800 Subject: ceph: add get_parent() NFS export callback The callback uses LOOKUPPARENT MDS request to find parent. 
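For background (our gloss, not part of the patch): exportfs invokes this callback when it has to reconnect a disconnected dentry during file handle decoding, roughly

    struct dentry *parent = child->d_sb->s_export_op->get_parent(child);

so a working get_parent() lets NFS re-export walk upward without the parent name hash that the old connectable file handles carried.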
Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 976d341..9c28b6a 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -121,6 +121,65 @@ static struct dentry *ceph_fh_to_dentry(struct super_block *sb, return __fh_to_dentry(sb, fh->ino); } +static struct dentry *__get_parent(struct super_block *sb, + struct dentry *child, u64 ino) +{ + struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; + struct ceph_mds_request *req; + struct inode *inode; + struct dentry *dentry; + int err; + + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT, + USE_ANY_MDS); + if (IS_ERR(req)) + return ERR_CAST(req); + + if (child) { + req->r_inode = child->d_inode; + ihold(child->d_inode); + } else { + req->r_ino1 = (struct ceph_vino) { + .ino = ino, + .snap = CEPH_NOSNAP, + }; + } + req->r_num_caps = 1; + err = ceph_mdsc_do_request(mdsc, NULL, req); + inode = req->r_target_inode; + if (inode) + ihold(inode); + ceph_mdsc_put_request(req); + if (!inode) + return ERR_PTR(-ENOENT); + + dentry = d_obtain_alias(inode); + if (IS_ERR(dentry)) { + iput(inode); + return dentry; + } + err = ceph_init_dentry(dentry); + if (err < 0) { + dput(dentry); + return ERR_PTR(err); + } + dout("__get_parent ino %llx parent %p ino %llx.%llx\n", + child ? ceph_ino(child->d_inode) : ino, + dentry, ceph_vinop(inode)); + return dentry; +} + +struct dentry *ceph_get_parent(struct dentry *child) +{ + /* don't re-export snaps */ + if (ceph_snap(child->d_inode) != CEPH_NOSNAP) + return ERR_PTR(-EINVAL); + + dout("get_parent %p ino %llx.%llx\n", + child, ceph_vinop(child->d_inode)); + return __get_parent(child->d_sb, child, 0); +} + /* * get parent, if possible. * @@ -171,4 +230,5 @@ const struct export_operations ceph_export_ops = { .encode_fh = ceph_encode_fh, .fh_to_dentry = ceph_fh_to_dentry, .fh_to_parent = ceph_fh_to_parent, + .get_parent = ceph_get_parent, }; -- cgit v0.10.2 From 8996f4f23db735f0f3bab34352188b1ab21d7d7f Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 1 Mar 2014 23:09:05 +0800 Subject: ceph: fix ceph_fh_to_parent() ceph_fh_to_parent() returns dentry that corresponds to the 'ino' field of struct ceph_nfs_confh. This is wrong, it should return dentry that corresponds to the 'parent_ino' field. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 9c28b6a..eb66408 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -181,48 +181,24 @@ struct dentry *ceph_get_parent(struct dentry *child) } /* - * get parent, if possible. - * - * FIXME: we could do better by querying the mds to discover the - * parent. 
+ * convert regular fh to parent */ static struct dentry *ceph_fh_to_parent(struct super_block *sb, - struct fid *fid, + struct fid *fid, int fh_len, int fh_type) { struct ceph_nfs_confh *cfh = (void *)fid->raw; - struct ceph_vino vino; - struct inode *inode; struct dentry *dentry; - int err; - if (fh_type == 1) - return ERR_PTR(-ESTALE); + if (fh_type != FILEID_INO32_GEN_PARENT) + return NULL; if (fh_len < sizeof(*cfh) / 4) - return ERR_PTR(-ESTALE); - - pr_debug("fh_to_parent %llx/%d\n", cfh->parent_ino, - cfh->parent_name_hash); - - vino.ino = cfh->ino; - vino.snap = CEPH_NOSNAP; - inode = ceph_find_inode(sb, vino); - if (!inode) - return ERR_PTR(-ESTALE); + return NULL; - dentry = d_obtain_alias(inode); - if (IS_ERR(dentry)) { - pr_err("fh_to_parent %llx -- inode %p but ENOMEM\n", - cfh->ino, inode); - iput(inode); - return dentry; - } - err = ceph_init_dentry(dentry); - if (err < 0) { - iput(inode); - return ERR_PTR(err); - } - dout("fh_to_parent %llx %p dentry %p\n", cfh->ino, inode, dentry); + dout("fh_to_parent %llx\n", cfh->parent_ino); + dentry = __get_parent(sb, NULL, cfh->ino); + if (IS_ERR(dentry) && PTR_ERR(dentry) == -ENOENT) + dentry = __fh_to_dentry(sb, cfh->parent_ino); return dentry; } -- cgit v0.10.2 From 19913b4eac4a230dccb548931358398f45dabe4c Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 6 Mar 2014 16:40:32 +0800 Subject: ceph: add get_name() NFS export callback Use the newly introduced LOOKUPNAME MDS request to connect child inode to its parent directory. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/export.c b/fs/ceph/export.c index eb66408..00d6af6 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -202,9 +202,49 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb, return dentry; } +static int ceph_get_name(struct dentry *parent, char *name, + struct dentry *child) +{ + struct ceph_mds_client *mdsc; + struct ceph_mds_request *req; + int err; + + mdsc = ceph_inode_to_client(child->d_inode)->mdsc; + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPNAME, + USE_ANY_MDS); + if (IS_ERR(req)) + return PTR_ERR(req); + + mutex_lock(&parent->d_inode->i_mutex); + + req->r_inode = child->d_inode; + ihold(child->d_inode); + req->r_ino2 = ceph_vino(parent->d_inode); + req->r_locked_dir = parent->d_inode; + req->r_num_caps = 2; + err = ceph_mdsc_do_request(mdsc, NULL, req); + + mutex_unlock(&parent->d_inode->i_mutex); + + if (!err) { + struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; + memcpy(name, rinfo->dname, rinfo->dname_len); + name[rinfo->dname_len] = 0; + dout("get_name %p ino %llx.%llx name %s\n", + child, ceph_vinop(child->d_inode), name); + } else { + dout("get_name %p ino %llx.%llx err %d\n", + child, ceph_vinop(child->d_inode), err); + } + + ceph_mdsc_put_request(req); + return err; +} + const struct export_operations ceph_export_ops = { .encode_fh = ceph_encode_fh, .fh_to_dentry = ceph_fh_to_dentry, .fh_to_parent = ceph_fh_to_parent, .get_parent = ceph_get_parent, + .get_name = ceph_get_name, }; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 8bf2384..91d6c9d 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1044,10 +1044,59 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, session, req->r_request_started, -1, &req->r_caps_reservation); if (err < 0) - return err; + goto done; } else { WARN_ON_ONCE(1); } + + if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) { + struct qstr dname; + struct dentry *dn, *parent; + + BUG_ON(!rinfo->head->is_target); + 
BUG_ON(req->r_dentry); + + parent = d_find_any_alias(dir); + BUG_ON(!parent); + + dname.name = rinfo->dname; + dname.len = rinfo->dname_len; + dname.hash = full_name_hash(dname.name, dname.len); + vino.ino = le64_to_cpu(rinfo->targeti.in->ino); + vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); +retry_lookup: + dn = d_lookup(parent, &dname); + dout("d_lookup on parent=%p name=%.*s got %p\n", + parent, dname.len, dname.name, dn); + + if (!dn) { + dn = d_alloc(parent, &dname); + dout("d_alloc %p '%.*s' = %p\n", parent, + dname.len, dname.name, dn); + if (dn == NULL) { + dput(parent); + err = -ENOMEM; + goto done; + } + err = ceph_init_dentry(dn); + if (err < 0) { + dput(dn); + dput(parent); + goto done; + } + } else if (dn->d_inode && + (ceph_ino(dn->d_inode) != vino.ino || + ceph_snap(dn->d_inode) != vino.snap)) { + dout(" dn %p points to wrong inode %p\n", + dn, dn->d_inode); + d_delete(dn); + dput(dn); + goto retry_lookup; + } + + req->r_dentry = dn; + dput(parent); + } } if (rinfo->head->is_target) { diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index 4440f44..51cc23e 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -54,6 +54,7 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_LOOKUPHASH: return "lookuphash"; case CEPH_MDS_OP_LOOKUPPARENT: return "lookupparent"; case CEPH_MDS_OP_LOOKUPINO: return "lookupino"; + case CEPH_MDS_OP_LOOKUPNAME: return "lookupname"; case CEPH_MDS_OP_GETATTR: return "getattr"; case CEPH_MDS_OP_SETXATTR: return "setxattr"; case CEPH_MDS_OP_SETATTR: return "setattr"; diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 25bfb0e..35f345f 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -332,6 +332,7 @@ enum { CEPH_MDS_OP_LOOKUPHASH = 0x00102, CEPH_MDS_OP_LOOKUPPARENT = 0x00103, CEPH_MDS_OP_LOOKUPINO = 0x00104, + CEPH_MDS_OP_LOOKUPNAME = 0x00105, CEPH_MDS_OP_SETXATTR = 0x01105, CEPH_MDS_OP_RMXATTR = 0x01106, -- cgit v0.10.2 From c137a32a408af7a5635f3d0c5ddd34d270af9a3b Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 1 Mar 2014 22:22:57 +0800 Subject: ceph: print inode number for LOOKUPINO request Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 8c6f313..16b54aa 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -93,6 +93,8 @@ static int mdsc_show(struct seq_file *s, void *p) } else if (req->r_path1) { seq_printf(s, " #%llx/%s", req->r_ino1.ino, req->r_path1); + } else { + seq_printf(s, " #%llx", req->r_ino1.ino); } if (req->r_old_dentry) { -- cgit v0.10.2 From a255060451dcb416c8097218b40d86d613d84bfc Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 8 Mar 2014 09:51:45 +0800 Subject: ceph: make sure write caps are registered with auth MDS Only auth MDS can issue write caps to clients, so don't consider write caps registered with non-auth MDS as valid. 
Signed-off-by: Yan, Zheng diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 1754338..d9ef44e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -885,7 +885,10 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) cap = rb_entry(p, struct ceph_cap, ci_node); if (!__cap_is_valid(cap)) continue; - mds_wanted |= cap->mds_wanted; + if (cap == ci->i_auth_cap) + mds_wanted |= cap->mds_wanted; + else + mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR); } return mds_wanted; } -- cgit v0.10.2 From 8c93cd610c6c5a4c0dddfc6fe906814331b3af87 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 8 Mar 2014 20:12:23 +0800 Subject: ceph: update i_max_size even if inode version does not change Handle the following sequence of events: - client releases an inode with i_max_size > 0. The release message is queued (it is not sent to the auth MDS). - a 'lookup' request reply from a non-auth MDS returns the same inode. - client opens the inode in write mode. The version of the inode trace in the 'open' request reply is equal to the cached inode's version. - client requests a new max size. The MDS ignores the request because it does not affect the client's write range. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 91d6c9d..1a37ee7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -659,14 +659,6 @@ static int fill_inode(struct inode *inode, le32_to_cpu(info->time_warp_seq), &ctime, &mtime, &atime); - /* only update max_size on auth cap */ - if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) && - ci->i_max_size != le64_to_cpu(info->max_size)) { - dout("max_size %lld -> %llu\n", ci->i_max_size, - le64_to_cpu(info->max_size)); - ci->i_max_size = le64_to_cpu(info->max_size); - } - ci->i_layout = info->layout; inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1; @@ -755,6 +747,14 @@ static int fill_inode(struct inode *inode, ci->i_max_offset = 2; } no_change: + /* only update max_size on auth cap */ + if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) && + ci->i_max_size != le64_to_cpu(info->max_size)) { + dout("max_size %lld -> %llu\n", ci->i_max_size, + le64_to_cpu(info->max_size)); + ci->i_max_size = le64_to_cpu(info->max_size); + } + spin_unlock(&ci->i_ceph_lock); /* queue truncate if we saw i_size decrease */ -- cgit v0.10.2 From 0e8e95d6d74b2326e32274bb0401404cf3486038 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 4 Mar 2014 15:42:24 +0800 Subject: ceph: use fl->fl_type to decide flock operation VFS does not directly pass flock's operation code to the filesystem's flock callback. It translates the operation code into the form in which posix lock parameters are presented. 
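For reference, the VFS side (fs/locks.c of this vintage, condensed and slightly paraphrased) hands flock(2) requests to the filesystem like this:

    /* sys_flock(): LOCK_SH/LOCK_EX/LOCK_UN are folded into fl->fl_type */
    fl->fl_type = flock_translate_cmd(cmd & ~LOCK_NB); /* F_RDLCK/F_WRLCK/F_UNLCK */
    error = filp->f_op->flock(filp,
                              (cmd & LOCK_NB) ? F_SETLK : F_SETLKW,
                              fl);

so ceph_flock() must inspect fl->fl_type and IS_SETLKW(cmd) rather than the raw LOCK_* bits, which is exactly what the patch switches it to.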
Signed-off-by: Yan, Zheng diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index ae6d14e..133e006 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -91,10 +91,10 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) dout("ceph_lock, fl_pid:%d", fl->fl_pid); /* set wait bit as appropriate, then make command as Ceph expects it*/ - if (F_SETLKW == cmd) - wait = 1; - if (F_GETLK == cmd) + if (IS_GETLK(cmd)) op = CEPH_MDS_OP_GETFILELOCK; + else if (IS_SETLKW(cmd)) + wait = 1; if (F_RDLCK == fl->fl_type) lock_cmd = CEPH_LOCK_SHARED; @@ -131,20 +131,17 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) { u8 lock_cmd; int err; - u8 wait = 1; + u8 wait = 0; fl->fl_nspid = get_pid(task_tgid(current)); dout("ceph_flock, fl_pid:%d", fl->fl_pid); - /* set wait bit, then clear it out of cmd*/ - if (cmd & LOCK_NB) - wait = 0; - cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN); - /* set command sequence that Ceph wants to see: - shared lock, exclusive lock, or unlock */ - if (LOCK_SH == cmd) + if (IS_SETLKW(cmd)) + wait = 1; + + if (F_RDLCK == fl->fl_type) lock_cmd = CEPH_LOCK_SHARED; - else if (LOCK_EX == cmd) + else if (F_WRLCK == fl->fl_type) lock_cmd = CEPH_LOCK_EXCL; else lock_cmd = CEPH_LOCK_UNLOCK; -- cgit v0.10.2 From eb70c0ce4e877c6854c682c0fc009d7875994942 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 4 Mar 2014 15:50:06 +0800 Subject: ceph: forbid mandatory file lock Signed-off-by: Yan, Zheng diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index 133e006..f91a569a 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -87,6 +87,12 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) u8 wait = 0; u16 op = CEPH_MDS_OP_SETFILELOCK; + if (!(fl->fl_flags & FL_POSIX)) + return -ENOLCK; + /* No mandatory locks */ + if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) + return -ENOLCK; + fl->fl_nspid = get_pid(task_tgid(current)); dout("ceph_lock, fl_pid:%d", fl->fl_pid); @@ -133,6 +139,12 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) { u8 lock_cmd; int err; u8 wait = 0; + if (!(fl->fl_flags & FL_FLOCK)) + return -ENOLCK; + /* No mandatory locks */ + if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) + return -ENOLCK; + fl->fl_nspid = get_pid(task_tgid(current)); dout("ceph_flock, fl_pid:%d", fl->fl_pid); -- cgit v0.10.2 From eb13e832f823f6c110ea53e3067bafe22b87de63 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 9 Mar 2014 23:16:40 +0800 Subject: ceph: use fl->fl_file as owner identifier of flock and posix lock flock and posix lock should use fl->fl_file instead of the process ID as the owner identifier. (posix lock uses fl->fl_owner; fl->fl_owner is usually equal to fl->fl_file, but it can also be a customized value.) The process ID of the lock holder is only needed for the F_GETLK fcntl(2). The fix is to rename the 'pid' fields of struct ceph_mds_request_args and struct ceph_filelock to 'owner', and to rename the 'pid_namespace' fields to 'pid'. Assign fl->fl_file to the 'owner' field of lock messages. We also set the most significant bit of the 'owner' field. The MDS can use that bit to distinguish between old and new clients. The MDS counterpart of this patch modifies the flock code to not take the 'pid_namespace' into consideration when checking for conflicting locks. 
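A minimal model of the owner derivation this patch introduces (the real helper is secure_addr() in the hunk below): xor the kernel pointer with a boot-time random secret so raw addresses never hit the wire, then tag the top bit:

	static u64 make_owner(u64 secret, void *owner) /* fl_owner/fl_file */
	{
		u64 v = secret ^ (u64)(unsigned long)owner;

		return v | (1ULL << 63);	/* marks a new-style client */
	}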
Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index f91a569a..d94ba0d 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -2,11 +2,31 @@ #include #include +#include #include "super.h" #include "mds_client.h" #include +static u64 lock_secret; + +static inline u64 secure_addr(void *addr) +{ + u64 v = lock_secret ^ (u64)(unsigned long)addr; + /* + * Set the most significant bit, so that MDS knows the 'owner' + * is sufficient to identify the owner of lock. (old code uses + * both 'owner' and 'pid') + */ + v |= (1ULL << 63); + return v; +} + +void __init ceph_flock_init(void) +{ + get_random_bytes(&lock_secret, sizeof(lock_secret)); +} + /** * Implement fcntl and flock locking functions. */ @@ -14,11 +34,11 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, int cmd, u8 wait, struct file_lock *fl) { struct inode *inode = file_inode(file); - struct ceph_mds_client *mdsc = - ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_request *req; int err; u64 length = 0; + u64 owner; req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); if (IS_ERR(req)) @@ -32,25 +52,27 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, else length = fl->fl_end - fl->fl_start + 1; - dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " - "length: %llu, wait: %d, type: %d", (int)lock_type, - (int)operation, (u64)fl->fl_pid, fl->fl_start, - length, wait, fl->fl_type); + if (lock_type == CEPH_LOCK_FCNTL) + owner = secure_addr(fl->fl_owner); + else + owner = secure_addr(fl->fl_file); + + dout("ceph_lock_message: rule: %d, op: %d, owner: %llx, pid: %llu, " + "start: %llu, length: %llu, wait: %d, type: %d", (int)lock_type, + (int)operation, owner, (u64)fl->fl_pid, fl->fl_start, length, + wait, fl->fl_type); req->r_args.filelock_change.rule = lock_type; req->r_args.filelock_change.type = cmd; + req->r_args.filelock_change.owner = cpu_to_le64(owner); req->r_args.filelock_change.pid = cpu_to_le64((u64)fl->fl_pid); - /* This should be adjusted, but I'm not sure if - namespaces actually get id numbers*/ - req->r_args.filelock_change.pid_namespace = - cpu_to_le64((u64)(unsigned long)fl->fl_nspid); req->r_args.filelock_change.start = cpu_to_le64(fl->fl_start); req->r_args.filelock_change.length = cpu_to_le64(length); req->r_args.filelock_change.wait = wait; err = ceph_mdsc_do_request(mdsc, inode, req); - if ( operation == CEPH_MDS_OP_GETFILELOCK){ + if (operation == CEPH_MDS_OP_GETFILELOCK) { fl->fl_pid = le64_to_cpu(req->r_reply_info.filelock_reply->pid); if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type) fl->fl_type = F_RDLCK; @@ -93,8 +115,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) return -ENOLCK; - fl->fl_nspid = get_pid(task_tgid(current)); - dout("ceph_lock, fl_pid:%d", fl->fl_pid); + dout("ceph_lock, fl_owner: %p", fl->fl_owner); /* set wait bit as appropriate, then make command as Ceph expects it*/ if (IS_GETLK(cmd)) @@ -111,7 +132,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl); if (!err) { - if ( op != CEPH_MDS_OP_GETFILELOCK ){ + if (op != CEPH_MDS_OP_GETFILELOCK) { dout("mds locked, locking locally"); err = posix_lock_file(file, fl, NULL); if (err && (CEPH_MDS_OP_SETFILELOCK == op)) { @@ -145,8 +166,7 @@ int 
ceph_flock(struct file *file, int cmd, struct file_lock *fl) if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) return -ENOLCK; - fl->fl_nspid = get_pid(task_tgid(current)); - dout("ceph_flock, fl_pid:%d", fl->fl_pid); + dout("ceph_flock, fl_file: %p", fl->fl_file); if (IS_SETLKW(cmd)) wait = 1; @@ -289,13 +309,14 @@ int lock_to_ceph_filelock(struct file_lock *lock, struct ceph_filelock *cephlock) { int err = 0; - cephlock->start = cpu_to_le64(lock->fl_start); cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1); cephlock->client = cpu_to_le64(0); - cephlock->pid = cpu_to_le64(lock->fl_pid); - cephlock->pid_namespace = - cpu_to_le64((u64)(unsigned long)lock->fl_nspid); + cephlock->pid = cpu_to_le64((u64)lock->fl_pid); + if (lock->fl_flags & FL_POSIX) + cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner)); + else + cephlock->owner = cpu_to_le64(secure_addr(lock->fl_file)); switch (lock->fl_type) { case F_RDLCK: diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 10a4ccb..06150fd 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1026,6 +1026,7 @@ static int __init init_ceph(void) if (ret) goto out; + ceph_flock_init(); ceph_xattr_init(); ret = register_filesystem(&ceph_fs_type); if (ret) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 70bb183..7866cd0 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -871,6 +871,7 @@ extern long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg); extern const struct export_operations ceph_export_ops; /* locks.c */ +extern __init void ceph_flock_init(void); extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl); extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl); extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 35f345f..5f6db18 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -421,8 +421,8 @@ union ceph_mds_request_args { struct { __u8 rule; /* currently fcntl or flock */ __u8 type; /* shared, exclusive, remove*/ + __le64 owner; /* owner of the lock */ __le64 pid; /* process id requesting the lock */ - __le64 pid_namespace; __le64 start; /* initial location to lock */ __le64 length; /* num bytes to lock from start */ __u8 wait; /* will caller wait for lock to become available? */ @@ -533,8 +533,8 @@ struct ceph_filelock { __le64 start;/* file offset to start lock at */ __le64 length; /* num bytes to lock; 0 for all following start */ __le64 client; /* which client holds the lock */ + __le64 owner; /* owner the lock */ __le64 pid; /* process id holding the lock on the client */ - __le64 pid_namespace; __u8 type; /* shared lock, exclusive lock, or unlock */ } __attribute__ ((packed)); -- cgit v0.10.2 From d9ffc4f77073e7e1ca731f21804769de9c094b87 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 18 Mar 2014 10:15:29 +0800 Subject: ceph: set mds_wanted when MDS reply changes a cap to auth cap When adjusting the caps a client wants, the MDS does not record caps that are not allowed. For a non-auth MDS, it does not record WR caps. So when an MDS reply changes a non-auth cap into an auth cap, the client needs to set the cap's mds_wanted according to the reply. 
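A condensed sketch of the corresponding change in ceph_add_cap() (the hunk below is authoritative):

	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (ci->i_auth_cap == NULL ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			ci->i_auth_cap = cap;
			/* the non-auth MDS never recorded our WR caps,
			 * so take the wanted mask from this reply */
			cap->mds_wanted = wanted;
		}
	}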
Signed-off-by: Yan, Zheng diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index d9ef44e..2e5e648 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -622,8 +622,10 @@ retry: if (flags & CEPH_CAP_FLAG_AUTH) { if (ci->i_auth_cap == NULL || - ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) + ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) { ci->i_auth_cap = cap; + cap->mds_wanted = wanted; + } ci->i_cap_exporting_issued = 0; } else { WARN_ON(ci->i_auth_cap == cap); -- cgit v0.10.2 From 5f75ce57818e4a48bdeac0b76daeb434eea26059 Mon Sep 17 00:00:00 2001 From: Fabian Frederick Date: Fri, 21 Mar 2014 09:52:58 -0700 Subject: ceph: Remove get/set acl on symlinks Remove unsupported symlink operations. Signed-off-by: Fabian Frederick Signed-off-by: Ilya Dryomov diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 1a37ee7..c9f2567 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1665,8 +1665,6 @@ static const struct inode_operations ceph_symlink_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, - .get_acl = ceph_get_acl, - .set_acl = ceph_set_acl, }; /* -- cgit v0.10.2 From d90deda69cb82411ba7d990e97218e0f8b2d07bb Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 23 Mar 2014 06:50:39 +0800 Subject: libceph: fix oops in ceph_msg_data_{pages,pagelist}_advance() When there is no more data, ceph_msg_data_{pages,pagelist}_advance() should not move on to the next page. Signed-off-by: Yan, Zheng diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 30efc5c..4f55f9c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -919,6 +919,9 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor, if (!bytes || cursor->page_offset) return false; /* more bytes to process in the current page */ + if (!cursor->resid) + return false; /* no more data */ + /* Move on to the next page; offset is already at 0 */ BUG_ON(cursor->page_index >= cursor->page_count); @@ -1004,6 +1007,9 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, if (!bytes || cursor->offset & ~PAGE_MASK) return false; /* more bytes to process in the current page */ + if (!cursor->resid) + return false; /* no more data */ + /* Move on to the next page */ BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); -- cgit v0.10.2 From 00bd8edb861eb41d274938cfc0338999d9c593a3 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 24 Mar 2014 09:56:43 +0800 Subject: ceph: fix null pointer dereference in discard_cap_releases() send_mds_reconnect() may call discard_cap_releases() after all release messages have been dropped by cleanup_cap_releases() Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index f260bd8..77640ada4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1462,15 +1462,18 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc, dout("discard_cap_releases mds%d\n", session->s_mds); - /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - session->s_num_cap_releases += num; + if (!list_empty(&session->s_cap_releases)) { + /* zero out the in-progress message */ + msg = list_first_entry(&session->s_cap_releases, + struct ceph_msg, list_head); + head = msg->front.iov_base; + num = 
le32_to_cpu(head->num); + dout("discard_cap_releases mds%d %p %u\n", + session->s_mds, msg, num); + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + session->s_num_cap_releases += num; + } /* requeue completed messages */ while (!list_empty(&session->s_cap_releases_done)) { -- cgit v0.10.2 From 1e5c6649ff0a2049511bafa297277234011a5c58 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 24 Mar 2014 13:00:54 +0800 Subject: ceph: check buffer size in ceph_vxattrcb_layout() If buffer size is zero, return the size of layout vxattr. If buffer size is not zero, check if it is large enough for layout vxattr. Signed-off-by: Yan, Zheng diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 2dbd668..28549d5 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -64,32 +64,48 @@ static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) } static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, - size_t size) + size_t size) { int ret; struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; s64 pool = ceph_file_layout_pg_pool(ci->i_layout); const char *pool_name; + char buf[128]; dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); down_read(&osdc->map_sem); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); - if (pool_name) - ret = snprintf(val, size, - "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%s", + if (pool_name) { + size_t len = strlen(pool_name); + ret = snprintf(buf, sizeof(buf), + "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=", (unsigned long long)ceph_file_layout_su(ci->i_layout), (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), - (unsigned long long)ceph_file_layout_object_size(ci->i_layout), - pool_name); - else - ret = snprintf(val, size, + (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); + if (!size) { + ret += len; + } else if (ret + len > size) { + ret = -ERANGE; + } else { + memcpy(val, buf, ret); + memcpy(val + ret, pool_name, len); + ret += len; + } + } else { + ret = snprintf(buf, sizeof(buf), "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld", (unsigned long long)ceph_file_layout_su(ci->i_layout), (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), (unsigned long long)ceph_file_layout_object_size(ci->i_layout), (unsigned long long)pool); - + if (size) { + if (ret <= size) + memcpy(val, buf, ret); + else + ret = -ERANGE; + } + } up_read(&osdc->map_sem); return ret; } -- cgit v0.10.2 From cc48c3e85f7fc48092f2e9874f1a07dd997d9184 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 24 Mar 2014 19:15:05 +0800 Subject: ceph: don't include ceph.{file,dir}.layout vxattr in listxattr() This avoids 'cp -a' modifying layout of new files/directories. 
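Roughly, a hidden vxattr is still served by getxattr() but is skipped when listxattr() assembles its name buffer; a sketch of the filtering, with names as in fs/ceph/xattr.c:

	for (vxattr = vxattrs; vxattr->name; vxattr++) {
		if (vxattr->hidden)
			continue;	/* invisible to listxattr() */
		if (vxattr->exists_cb && !vxattr->exists_cb(ci))
			continue;	/* not set on this inode */
		/* ... append vxattr->name to the returned list ... */
	}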
Signed-off-by: Yan, Zheng diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 28549d5..c9c2b88 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -231,7 +231,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { .name_size = sizeof("ceph.dir.layout"), .getxattr_cb = ceph_vxattrcb_layout, .readonly = false, - .hidden = false, + .hidden = true, .exists_cb = ceph_vxattrcb_layout_exists, }, XATTR_LAYOUT_FIELD(dir, layout, stripe_unit), @@ -258,7 +258,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = { .name_size = sizeof("ceph.file.layout"), .getxattr_cb = ceph_vxattrcb_layout, .readonly = false, - .hidden = false, + .hidden = true, .exists_cb = ceph_vxattrcb_layout_exists, }, XATTR_LAYOUT_FIELD(file, layout, stripe_unit), -- cgit v0.10.2 From 48a163dbb517eba13643bf404a0d695c1ab0a60d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 19 Mar 2014 16:58:36 +0200 Subject: crush: fix off-by-one errors in total_tries refactor Back in 27f4d1f6bc32c2ed7b2c5080cbd58b14df622607 we refactored the CRUSH code to allow adjustment of the retry counts on a per-pool basis. That commit had an off-by-one bug: the previous "tries" counter was a *retry* count, not a *try* count, but the new code was passing in 1 meaning there should be no retries. Fix the ftotal vs tries comparison to use < instead of <= to fix the problem. Note that the original code used <= here, which means the global "choose_total_tries" tunable is actually counting retries. Compensate for that by adding 1 in crush_do_rule when we pull the tunable into the local variable. This was noticed looking at output from a user provided osdmap. Unfortunately the map doesn't illustrate the change in mapping behavior and I haven't managed to construct one yet that does. Inspection of the crush debug output now aligns with prior versions, though. Reflects ceph.git commit 795704fd615f0b008dcc81aa088a859b2d075138. 
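A toy model of the corrected accounting, as standalone C rather than kernel code: convert the legacy retry count to a try count once, then compare with '<' so a budget of N allows exactly N descents:

	#include <stdbool.h>

	static int place(unsigned int total_retries_tunable,
			 bool (*attempt)(unsigned int))
	{
		/* legacy tunable counts retries -> make it count tries */
		unsigned int tries = total_retries_tunable + 1;
		unsigned int ftotal;

		for (ftotal = 0; ftotal < tries; ftotal++) /* '<', not '<=' */
			if (attempt(ftotal))
				return 0;	/* placed */
		return -1;			/* budget exhausted */
	}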
Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index b703790..074bb2a 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -292,8 +292,8 @@ static int is_out(const struct crush_map *map, * @outpos: our position in that vector * @tries: number of attempts to make * @recurse_tries: number of attempts to have recursive chooseleaf make - * @local_tries: localized retries - * @local_fallback_tries: localized fallback retries + * @local_retries: localized retries + * @local_fallback_retries: localized fallback retries * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) * @out2: second output vector for leaf items (if @recurse_to_leaf) */ @@ -304,8 +304,8 @@ static int crush_choose_firstn(const struct crush_map *map, int *out, int outpos, unsigned int tries, unsigned int recurse_tries, - unsigned int local_tries, - unsigned int local_fallback_tries, + unsigned int local_retries, + unsigned int local_fallback_retries, int recurse_to_leaf, int *out2) { @@ -344,9 +344,9 @@ static int crush_choose_firstn(const struct crush_map *map, reject = 1; goto reject; } - if (local_fallback_tries > 0 && + if (local_fallback_retries > 0 && flocal >= (in->size>>1) && - flocal > local_fallback_tries) + flocal > local_fallback_retries) item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); @@ -393,8 +393,8 @@ static int crush_choose_firstn(const struct crush_map *map, x, outpos+1, 0, out2, outpos, recurse_tries, 0, - local_tries, - local_fallback_tries, + local_retries, + local_fallback_retries, 0, NULL) <= outpos) /* didn't get leaf */ @@ -420,14 +420,14 @@ reject: ftotal++; flocal++; - if (collide && flocal <= local_tries) + if (collide && flocal <= local_retries) /* retry locally a few times */ retry_bucket = 1; - else if (local_fallback_tries > 0 && - flocal <= in->size + local_fallback_tries) + else if (local_fallback_retries > 0 && + flocal <= in->size + local_fallback_retries) /* exhaustive bucket search */ retry_bucket = 1; - else if (ftotal <= tries) + else if (ftotal < tries) /* then retry descent */ retry_descent = 1; else @@ -640,10 +640,18 @@ int crush_do_rule(const struct crush_map *map, __u32 step; int i, j; int numrep; - int choose_tries = map->choose_total_tries; - int choose_local_tries = map->choose_local_tries; - int choose_local_fallback_tries = map->choose_local_fallback_tries; + /* + * the original choose_total_tries value was off by one (it + * counted "retries" and not "tries"). add one. 
+ */ + int choose_tries = map->choose_total_tries + 1; int choose_leaf_tries = 0; + /* + * the local tries values were counted as "retries", though, + * and need no adjustment + */ + int choose_local_retries = map->choose_local_tries; + int choose_local_fallback_retries = map->choose_local_fallback_tries; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -677,12 +685,12 @@ int crush_do_rule(const struct crush_map *map, case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: if (curstep->arg1 > 0) - choose_local_tries = curstep->arg1; + choose_local_retries = curstep->arg1; break; case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: if (curstep->arg1 > 0) - choose_local_fallback_tries = curstep->arg1; + choose_local_fallback_retries = curstep->arg1; break; case CRUSH_RULE_CHOOSELEAF_FIRSTN: @@ -734,8 +742,8 @@ int crush_do_rule(const struct crush_map *map, o+osize, j, choose_tries, recurse_tries, - choose_local_tries, - choose_local_fallback_tries, + choose_local_retries, + choose_local_fallback_retries, recurse_to_leaf, c+osize); } else { -- cgit v0.10.2 From 6ed1002f368c63ef79d7f659fcb4368a90098132 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 19 Mar 2014 16:58:37 +0200 Subject: crush: allow crush rules to set (re)tries counts to 0 These two fields are misnomers; they are *retry* counts. Reflects ceph.git commit f17caba8ae0cad7b6f8f35e53e5f73b444696835. Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 074bb2a..b3fb849 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -684,12 +684,12 @@ int crush_do_rule(const struct crush_map *map, break; case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: - if (curstep->arg1 > 0) + if (curstep->arg1 >= 0) choose_local_retries = curstep->arg1; break; case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: - if (curstep->arg1 > 0) + if (curstep->arg1 >= 0) choose_local_fallback_retries = curstep->arg1; break; -- cgit v0.10.2 From e2b149cc4ba00766aceb87950c6de72ea7fc8b2e Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 19 Mar 2014 16:58:37 +0200 Subject: crush: add chooseleaf_vary_r tunable The current crush_choose_firstn code will re-use the same 'r' value for the recursive call. That means that if we are hitting a collision or rejection for some reason (say, an OSD that is marked out) and need to retry, we will keep making the same (bad) choice in that recursive selection. Introduce a tunable that fixes that behavior by incorporating the parent 'r' value into the recursive starting point, so that a different path will be taken in subsequent placement attempts. Note that this was done from the get-go for the new crush_choose_indep algorithm. This was exposed by a user who was seeing PGs stuck in active+remapped after reweight-by-utilization because the up set mapped to a single OSD. Reflects ceph.git commit a8e6c9fbf88bad056dd05d3eb790e98a5e43451a. Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index acaa561..75f36a6 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -173,6 +173,12 @@ struct crush_map { * apply to a collision: in that case we will retry as we used * to. */ __u32 chooseleaf_descend_once; + + /* if non-zero, feed r into chooseleaf, bit-shifted right by (r-1) + * bits. a value of 1 is best for new clusters. 
for legacy clusters + * that want to limit reshuffling, a value of 3 or 4 will make the + * mappings line up a bit better with previous mappings. */ + __u8 chooseleaf_vary_r; }; diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index b3fb849..947150c 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -295,7 +295,9 @@ static int is_out(const struct crush_map *map, * @local_retries: localized retries * @local_fallback_retries: localized fallback retries * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) + * @vary_r: pass r to recursive calls * @out2: second output vector for leaf items (if @recurse_to_leaf) + * @parent_r: r value passed from the parent */ static int crush_choose_firstn(const struct crush_map *map, struct crush_bucket *bucket, @@ -307,7 +309,9 @@ static int crush_choose_firstn(const struct crush_map *map, unsigned int local_retries, unsigned int local_fallback_retries, int recurse_to_leaf, - int *out2) + unsigned int vary_r, + int *out2, + int parent_r) { int rep; unsigned int ftotal, flocal; @@ -319,8 +323,11 @@ static int crush_choose_firstn(const struct crush_map *map, int itemtype; int collide, reject; - dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", - bucket->id, x, outpos, numrep); + dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n", + recurse_to_leaf ? "_LEAF" : "", + bucket->id, x, outpos, numrep, + tries, recurse_tries, local_retries, local_fallback_retries, + parent_r); for (rep = outpos; rep < numrep; rep++) { /* keep trying until we get a non-out, non-colliding item */ @@ -335,7 +342,7 @@ static int crush_choose_firstn(const struct crush_map *map, do { collide = 0; retry_bucket = 0; - r = rep; + r = rep + parent_r; /* r' = r + f_total */ r += ftotal; @@ -387,6 +394,11 @@ static int crush_choose_firstn(const struct crush_map *map, reject = 0; if (!collide && recurse_to_leaf) { if (item < 0) { + int sub_r; + if (vary_r) + sub_r = r >> (vary_r-1); + else + sub_r = 0; if (crush_choose_firstn(map, map->buckets[-1-item], weight, weight_max, @@ -396,7 +408,9 @@ static int crush_choose_firstn(const struct crush_map *map, local_retries, local_fallback_retries, 0, - NULL) <= outpos) + vary_r, + NULL, + sub_r) <= outpos) /* didn't get leaf */ reject = 1; } else { @@ -653,6 +667,8 @@ int crush_do_rule(const struct crush_map *map, int choose_local_retries = map->choose_local_tries; int choose_local_fallback_retries = map->choose_local_fallback_tries; + int vary_r = map->chooseleaf_vary_r; + if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); return 0; @@ -745,7 +761,9 @@ int crush_do_rule(const struct crush_map *map, choose_local_retries, choose_local_fallback_retries, recurse_to_leaf, - c+osize); + vary_r, + c+osize, + 0); } else { crush_choose_indep( map, -- cgit v0.10.2 From d83ed858f144e3cfbef883d4bc499113cdddabeb Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 19 Mar 2014 16:58:37 +0200 Subject: crush: add SET_CHOOSELEAF_VARY_R step This lets you adjust the vary_r tunable on a per-rule basis. Reflects ceph.git commit f944ccc20aee60a7d8da7e405ec75ad1cd449fac. 
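The shape of the mechanism, condensed: crush_do_rule() seeds a local copy of each tunable from the map, and a SET_* step overwrites the local copy for the remainder of that rule only:

	int vary_r = map->chooseleaf_vary_r;		/* map-wide default */
	...
	switch (curstep->op) {
	case CRUSH_RULE_SET_CHOOSELEAF_VARY_R:
		if (curstep->arg1 >= 0)
			vary_r = curstep->arg1;		/* per-rule override */
		break;
	/* ... subsequent choose steps consume vary_r ... */
	}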
Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 75f36a6..4fad5f8 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -51,6 +51,7 @@ enum { CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */ CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10, CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11, + CRUSH_RULE_SET_CHOOSELEAF_VARY_R = 12 }; /* diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 947150c..a1ef53c 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -709,6 +709,11 @@ int crush_do_rule(const struct crush_map *map, choose_local_fallback_retries = curstep->arg1; break; + case CRUSH_RULE_SET_CHOOSELEAF_VARY_R: + if (curstep->arg1 >= 0) + vary_r = curstep->arg1; + break; + case CRUSH_RULE_CHOOSELEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; -- cgit v0.10.2 From 07bd7de47a65767432ceb66d4ab30cdc05ed2b35 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 19 Mar 2014 16:58:37 +0200 Subject: crush: support chooseleaf_vary_r tunable (tunables3) by default Add TUNABLES3 feature (chooseleaf_vary_r tunable) to a set of features supported by default. Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 138448f..77c097f 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -43,6 +43,13 @@ #define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */ #define CEPH_FEATURE_EXPORT_PEER (1ULL<<37) #define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38) +#define CEPH_FEATURE_OSD_TMAP2OMAP (1ULL<<38) /* overlap with EC */ +/* The process supports new-style OSDMap encoding. Monitors also use + this bit to determine if peers support NAK messages. */ +#define CEPH_FEATURE_OSDMAP_ENC (1ULL<<39) +#define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<40) +#define CEPH_FEATURE_CRUSH_TUNABLES3 (1ULL<<41) +#define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41) /* overlap w/ tunables3 */ /* * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature @@ -82,7 +89,8 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_OSDHASHPSPOOL | \ CEPH_FEATURE_OSD_CACHEPOOL | \ CEPH_FEATURE_CRUSH_V2 | \ - CEPH_FEATURE_EXPORT_PEER) + CEPH_FEATURE_EXPORT_PEER | \ + CEPH_FEATURE_CRUSH_TUNABLES3) #define CEPH_FEATURES_REQUIRED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ -- cgit v0.10.2 From 35fea3a18a1df8f981f292b492c2de3a9e4e5fc2 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:12 +0200 Subject: libceph: refer to osdmap directly in osdmap_show() To make it more readable and save screen space. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 258a382..d225842 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -53,34 +53,36 @@ static int osdmap_show(struct seq_file *s, void *p) { int i; struct ceph_client *client = s->private; + struct ceph_osdmap *map = client->osdc.osdmap; struct rb_node *n; - if (client->osdc.osdmap == NULL) + if (map == NULL) return 0; - seq_printf(s, "epoch %d\n", client->osdc.osdmap->epoch); + + seq_printf(s, "epoch %d\n", map->epoch); seq_printf(s, "flags%s%s\n", - (client->osdc.osdmap->flags & CEPH_OSDMAP_NEARFULL) ? - " NEARFULL" : "", - (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ? - " FULL" : ""); - for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { + (map->flags & CEPH_OSDMAP_NEARFULL) ? 
" NEARFULL" : "", + (map->flags & CEPH_OSDMAP_FULL) ? " FULL" : ""); + + for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) { struct ceph_pg_pool_info *pool = rb_entry(n, struct ceph_pg_pool_info, node); + seq_printf(s, "pg_pool %llu pg_num %d / %d\n", (unsigned long long)pool->id, pool->pg_num, pool->pg_num_mask); } - for (i = 0; i < client->osdc.osdmap->max_osd; i++) { - struct ceph_entity_addr *addr = - &client->osdc.osdmap->osd_addr[i]; - int state = client->osdc.osdmap->osd_state[i]; + for (i = 0; i < map->max_osd; i++) { + struct ceph_entity_addr *addr = &map->osd_addr[i]; + int state = map->osd_state[i]; char sb[64]; seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n", i, ceph_pr_addr(&addr->in_addr), - ((client->osdc.osdmap->osd_weight[i]*100) >> 16), + ((map->osd_weight[i]*100) >> 16), ceph_osdmap_state_str(sb, sizeof(sb), state)); } + return 0; } -- cgit v0.10.2 From 0a2800d7280ccdf3194bd8bd74a2eb8c315b54c6 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:12 +0200 Subject: libceph: do not prefix osd lines with \t in debugfs output To save screen space in anticipation of more fields (e.g. primary affinity). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index d225842..112d98e 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -77,7 +77,7 @@ static int osdmap_show(struct seq_file *s, void *p) int state = map->osd_state[i]; char sb[64]; - seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n", + seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\n", i, ceph_pr_addr(&addr->in_addr), ((map->osd_weight[i]*100) >> 16), ceph_osdmap_state_str(sb, sizeof(sb), state)); -- cgit v0.10.2 From 1c00240e007d14d3242fa490b50166b4f1b2770a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:13 +0200 Subject: libceph: dump pg_temp mappings to debugfs Dump pg_temp mappings to /sys/kernel/debug/ceph//osdmap, one 'pg_temp [, ..., ]' per line, e.g: pg_temp 2.6 [2,3,4] Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 112d98e..c45d235 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -82,6 +82,17 @@ static int osdmap_show(struct seq_file *s, void *p) ((map->osd_weight[i]*100) >> 16), ceph_osdmap_state_str(sb, sizeof(sb), state)); } + for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) { + struct ceph_pg_mapping *pg = + rb_entry(n, struct ceph_pg_mapping, node); + + seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool, + pg->pgid.seed); + for (i = 0; i < pg->len; i++) + seq_printf(s, "%s%d", (i == 0 ? "" : ","), + pg->osds[i]); + seq_printf(s, "]\n"); + } return 0; } -- cgit v0.10.2 From 38a8d560231b45489d5b12f5c7d0edfba94e1f30 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:13 +0200 Subject: libceph: dump osdmap and enhance output on decode errors Dump osdmap in hex on both full and incremental decode errors, to make it easier to match the contents with error offset. dout() map epoch and max_osd value on success. 
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 9d1aaa2..4dd000d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -690,6 +690,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) u16 version; u32 len, max, i; int err = -EINVAL; + u32 epoch = 0; void *start = *p; struct ceph_pg_pool_info *pi; @@ -714,7 +715,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad); ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); - map->epoch = ceph_decode_32(p); + epoch = map->epoch = ceph_decode_32(p); ceph_decode_copy(p, &map->created, sizeof(map->created)); ceph_decode_copy(p, &map->modified, sizeof(map->modified)); @@ -814,14 +815,18 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) goto bad; } - /* ignore the rest of the map */ + /* ignore the rest */ *p = end; - dout("osdmap_decode done %p %p\n", *p, end); + dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); return map; bad: - dout("osdmap_decode fail err %d\n", err); + pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", + err, epoch, (int)(*p - start), *p, start, end); + print_hex_dump(KERN_DEBUG, "osdmap: ", + DUMP_PREFIX_OFFSET, 16, 1, + start, end - start, true); ceph_osdmap_destroy(map); return ERR_PTR(err); } @@ -845,6 +850,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, int err = -EINVAL; u16 version; + dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); + ceph_decode_16_safe(p, end, version, bad); if (version != 6) { pr_warning("got unknown v %d != 6 of inc osdmap\n", version); @@ -1032,11 +1039,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, /* ignore the rest */ *p = end; + + dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); return map; bad: - pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n", - epoch, (int)(*p - start), *p, start, end); + pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n", + err, epoch, (int)(*p - start), *p, start, end); print_hex_dump(KERN_DEBUG, "osdmap: ", DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); -- cgit v0.10.2 From a2505d63ee0541d9b4685250b033192e68222e97 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:13 +0200 Subject: libceph: split osdmap allocation and decode steps Split osdmap allocation and initialization into a separate function, ceph_osdmap_decode(). 
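The resulting shape, sketched as standalone C with made-up names: a core decoder that fills a caller-provided object plus a thin allocating wrapper, so the core can later be reused wherever a map structure already exists:

	#include <stdlib.h>

	struct map { int epoch; /* ... */ };

	static int decode_into(const void *buf, size_t len, struct map *m)
	{
		if (len < sizeof(int))
			return -1;		/* short/corrupt input */
		m->epoch = *(const int *)buf;	/* parse fields ... */
		return 0;
	}

	struct map *decode_alloc(const void *buf, size_t len)
	{
		struct map *m = calloc(1, sizeof(*m));

		if (!m)
			return NULL;
		if (decode_into(buf, len, m) != 0) {	/* reusable core */
			free(m);
			return NULL;
		}
		return m;
	}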
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 8c8b3ce..46c3e30 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -156,7 +156,7 @@ static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid) return 0; } -extern struct ceph_osdmap *osdmap_decode(void **p, void *end); +extern struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end); extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, struct ceph_osdmap *map, struct ceph_messenger *msgr); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 71830d7..6f64eec 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2062,7 +2062,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) int skipped_map = 0; dout("taking full map %u len %d\n", epoch, maplen); - newmap = osdmap_decode(&p, p+maplen); + newmap = ceph_osdmap_decode(&p, p+maplen); if (IS_ERR(newmap)) { err = PTR_ERR(newmap); goto bad; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 4dd000d..a82df6e 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -684,9 +684,8 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) /* * decode a full map. */ -struct ceph_osdmap *osdmap_decode(void **p, void *end) +static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) { - struct ceph_osdmap *map; u16 version; u32 len, max, i; int err = -EINVAL; @@ -694,14 +693,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) void *start = *p; struct ceph_pg_pool_info *pi; - dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p)); - - map = kzalloc(sizeof(*map), GFP_NOFS); - if (map == NULL) - return ERR_PTR(-ENOMEM); - - map->pg_temp = RB_ROOT; - mutex_init(&map->crush_scratch_mutex); + dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); ceph_decode_16_safe(p, end, version, bad); if (version > 6) { @@ -751,7 +743,6 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) err = osdmap_set_max_osd(map, max); if (err < 0) goto bad; - dout("osdmap_decode max_osd = %d\n", map->max_osd); /* osds */ err = -EINVAL; @@ -819,7 +810,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) *p = end; dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); - return map; + return 0; bad: pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", @@ -827,8 +818,31 @@ bad: print_hex_dump(KERN_DEBUG, "osdmap: ", DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); - ceph_osdmap_destroy(map); - return ERR_PTR(err); + return err; +} + +/* + * Allocate and decode a full map. + */ +struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end) +{ + struct ceph_osdmap *map; + int ret; + + map = kzalloc(sizeof(*map), GFP_NOFS); + if (!map) + return ERR_PTR(-ENOMEM); + + map->pg_temp = RB_ROOT; + mutex_init(&map->crush_scratch_mutex); + + ret = osdmap_decode(p, end, map); + if (ret) { + ceph_osdmap_destroy(map); + return ERR_PTR(ret); + } + + return map; } /* @@ -872,7 +886,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (len > 0) { dout("apply_incremental full map len %d, %p to %p\n", len, *p, end); - return osdmap_decode(p, min(*p+len, end)); + return ceph_osdmap_decode(p, min(*p+len, end)); } /* new crush? 
*/ -- cgit v0.10.2 From 597b52f6ca247086371abd67e5083292a500e736 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:14 +0200 Subject: libceph: fixup error handling in osdmap_decode() The existing error handling scheme requires resetting err to -EINVAL prior to calling any ceph_decode_* macro. This is ugly and fragile, and there already are a few places where we would return 0 on error, due to a missing reset. Fix this by adding a special e_inval label to be used by all ceph_decode_* macros. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index a82df6e..298d076 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -688,36 +688,37 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) { u16 version; u32 len, max, i; - int err = -EINVAL; u32 epoch = 0; void *start = *p; + int err; struct ceph_pg_pool_info *pi; dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); - ceph_decode_16_safe(p, end, version, bad); + ceph_decode_16_safe(p, end, version, e_inval); if (version > 6) { pr_warning("got unknown v %d > 6 of osdmap\n", version); - goto bad; + goto e_inval; } if (version < 6) { pr_warning("got old v %d < 6 of osdmap\n", version); - goto bad; + goto e_inval; } - ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad); + ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), e_inval); ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); epoch = map->epoch = ceph_decode_32(p); ceph_decode_copy(p, &map->created, sizeof(map->created)); ceph_decode_copy(p, &map->modified, sizeof(map->modified)); - ceph_decode_32_safe(p, end, max, bad); + ceph_decode_32_safe(p, end, max, e_inval); while (max--) { - ceph_decode_need(p, end, 8 + 2, bad); - err = -ENOMEM; + ceph_decode_need(p, end, 8 + 2, e_inval); pi = kzalloc(sizeof(*pi), GFP_NOFS); - if (!pi) + if (!pi) { + err = -ENOMEM; goto bad; + } pi->id = ceph_decode_64(p); err = __decode_pool(p, end, pi); if (err < 0) { @@ -728,27 +729,25 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) } err = __decode_pool_names(p, end, map); - if (err < 0) { - dout("fail to decode pool names"); + if (err) goto bad; - } - ceph_decode_32_safe(p, end, map->pool_max, bad); + ceph_decode_32_safe(p, end, map->pool_max, e_inval); - ceph_decode_32_safe(p, end, map->flags, bad); + ceph_decode_32_safe(p, end, map->flags, e_inval); max = ceph_decode_32(p); /* (re)alloc osd arrays */ err = osdmap_set_max_osd(map, max); - if (err < 0) + if (err) goto bad; /* osds */ - err = -EINVAL; ceph_decode_need(p, end, 3*sizeof(u32) + map->max_osd*(1 + sizeof(*map->osd_weight) + - sizeof(*map->osd_addr)), bad); + sizeof(*map->osd_addr)), e_inval); + *p += 4; /* skip length field (should match max) */ ceph_decode_copy(p, map->osd_state, map->max_osd); @@ -762,7 +761,7 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) ceph_decode_addr(&map->osd_addr[i]); /* pg_temp */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); for (i = 0; i < len; i++) { int n, j; struct ceph_pg pgid; @@ -771,16 +770,16 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) err = ceph_decode_pgid(p, end, &pgid); if (err) goto bad; - ceph_decode_need(p, end, sizeof(u32), bad); + ceph_decode_need(p, end, sizeof(u32), e_inval); n = ceph_decode_32(p); - err = -EINVAL; if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) - goto bad; - ceph_decode_need(p, end, n * sizeof(u32), bad); - err = -ENOMEM; + goto e_inval; + 
ceph_decode_need(p, end, n * sizeof(u32), e_inval); pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); - if (!pg) + if (!pg) { + err = -ENOMEM; goto bad; + } pg->pgid = pgid; pg->len = n; for (j = 0; j < n; j++) @@ -794,10 +793,10 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) } /* crush */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); dout("osdmap_decode crush len %d from off 0x%x\n", len, (int)(*p - start)); - ceph_decode_need(p, end, len, bad); + ceph_decode_need(p, end, len, e_inval); map->crush = crush_decode(*p, end); *p += len; if (IS_ERR(map->crush)) { @@ -812,6 +811,8 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); return 0; +e_inval: + err = -EINVAL; bad: pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", err, epoch, (int)(*p - start), *p, start, end); -- cgit v0.10.2 From 3977058c468b872c6bc5e5273bf911d791848643 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:14 +0200 Subject: libceph: safely decode max_osd value in osdmap_decode() max_osd value is not covered by any ceph_decode_need(). Use a safe version of ceph_decode_* macro to decode it. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 298d076..ec06010 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -687,9 +687,10 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) { u16 version; - u32 len, max, i; u32 epoch = 0; void *start = *p; + u32 max; + u32 len, i; int err; struct ceph_pg_pool_info *pi; @@ -736,7 +737,8 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) ceph_decode_32_safe(p, end, map->flags, e_inval); - max = ceph_decode_32(p); + /* max_osd */ + ceph_decode_32_safe(p, end, max, e_inval); /* (re)alloc osd arrays */ err = osdmap_set_max_osd(map, max); -- cgit v0.10.2 From 2d88b2e0819e0401ebb195e9fa20fab4be1965c8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:14 +0200 Subject: libceph: check length of osdmap osd arrays Check length of osd_state, osd_weight and osd_addr arrays. They should all have exactly max_osd elements after the call to osdmap_set_max_osd(). 
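In isolation, the check being added (ceph_decode_32() reads a u32 and advances the cursor):

	/* before: *p += 4;  -- the length prefix was trusted blindly */
	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;	/* after: a mismatch marks the map corrupt */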
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index ec06010..c39ac62 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -745,19 +745,25 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) if (err) goto bad; - /* osds */ + /* osd_state, osd_weight, osd_addrs->client_addr */ ceph_decode_need(p, end, 3*sizeof(u32) + map->max_osd*(1 + sizeof(*map->osd_weight) + sizeof(*map->osd_addr)), e_inval); - *p += 4; /* skip length field (should match max) */ + if (ceph_decode_32(p) != map->max_osd) + goto e_inval; + ceph_decode_copy(p, map->osd_state, map->max_osd); - *p += 4; /* skip length field (should match max) */ + if (ceph_decode_32(p) != map->max_osd) + goto e_inval; + for (i = 0; i < map->max_osd; i++) map->osd_weight[i] = ceph_decode_32(p); - *p += 4; /* skip length field (should match max) */ + if (ceph_decode_32(p) != map->max_osd) + goto e_inval; + ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr)); for (i = 0; i < map->max_osd; i++) ceph_decode_addr(&map->osd_addr[i]); -- cgit v0.10.2 From 9902e682c7f3df9ed5f60bc6f9c7efa6fd6b2d1d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:15 +0200 Subject: libceph: fix crush_decode() call site in osdmap_decode() The size of the memory area fed to crush_decode() should be limited not only by the osdmap end, but also by the crush map length. Also, drop the unnecessary dout() (the dout() in crush_decode() conveys the same info) and step past the crush map only if it is decoded successfully. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index c39ac62..d4a6b0d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -802,16 +802,13 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) /* crush */ ceph_decode_32_safe(p, end, len, e_inval); - dout("osdmap_decode crush len %d from off 0x%x\n", len, - (int)(*p - start)); - ceph_decode_need(p, end, len, e_inval); - map->crush = crush_decode(*p, end); - *p += len; + map->crush = crush_decode(*p, min(*p + len, end)); if (IS_ERR(map->crush)) { err = PTR_ERR(map->crush); map->crush = NULL; goto bad; } + *p += len; /* ignore the rest */ *p = end; -- cgit v0.10.2 From 86f1742b94dd0b4a2eb9255205d1756ddea182f8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:15 +0200 Subject: libceph: fixup error handling in osdmap_apply_incremental() The existing error handling scheme requires resetting err to -EINVAL prior to calling any ceph_decode_* macro. This is ugly and fragile, and there already are a few places where we would return 0 on error, due to a missing reset. Follow osdmap_decode() and fix this by adding a special e_inval label to be used by all ceph_decode_* macros. 
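A minimal sketch of the convention (the ceph_decode_*_safe() macros, as in include/linux/ceph/decode.h, jump to the given label on short input): -EINVAL is set in exactly one place, and other errors keep their own values:

	static int decode_example(void **p, void *end)
	{
		u32 len;
		void *buf;
		int err;

		ceph_decode_32_safe(p, end, len, e_inval);

		buf = kmalloc(len, GFP_NOFS);
		if (!buf) {
			err = -ENOMEM;	/* non-EINVAL errors set err... */
			goto bad;	/* ...and bypass the default */
		}
		/* ... decode into buf ... */
		kfree(buf);
		return 0;

	e_inval:
		err = -EINVAL;
	bad:
		pr_err("corrupt input (%d)\n", err);
		return err;
	}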
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index d4a6b0d..b844a92 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -867,19 +867,19 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, __s64 new_pool_max; __s32 new_flags, max; void *start = *p; - int err = -EINVAL; + int err; u16 version; dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); - ceph_decode_16_safe(p, end, version, bad); + ceph_decode_16_safe(p, end, version, e_inval); if (version != 6) { pr_warning("got unknown v %d != 6 of inc osdmap\n", version); - goto bad; + goto e_inval; } ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32), - bad); + e_inval); ceph_decode_copy(p, &fsid, sizeof(fsid)); epoch = ceph_decode_32(p); BUG_ON(epoch != map->epoch+1); @@ -888,7 +888,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, new_flags = ceph_decode_32(p); /* full map? */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); if (len > 0) { dout("apply_incremental full map len %d, %p to %p\n", len, *p, end); @@ -896,13 +896,14 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new crush? */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); if (len > 0) { - dout("apply_incremental new crush map len %d, %p to %p\n", - len, *p, end); newcrush = crush_decode(*p, min(*p+len, end)); - if (IS_ERR(newcrush)) - return ERR_CAST(newcrush); + if (IS_ERR(newcrush)) { + err = PTR_ERR(newcrush); + newcrush = NULL; + goto bad; + } *p += len; } @@ -912,13 +913,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (new_pool_max >= 0) map->pool_max = new_pool_max; - ceph_decode_need(p, end, 5*sizeof(u32), bad); + ceph_decode_need(p, end, 5*sizeof(u32), e_inval); /* new max? 
*/ max = ceph_decode_32(p); if (max >= 0) { err = osdmap_set_max_osd(map, max); - if (err < 0) + if (err) goto bad; } @@ -932,11 +933,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_pool */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { struct ceph_pg_pool_info *pi; - ceph_decode_64_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, e_inval); pi = __lookup_pg_pool(&map->pg_pools, pool); if (!pi) { pi = kzalloc(sizeof(*pi), GFP_NOFS); @@ -953,29 +954,28 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } if (version >= 5) { err = __decode_pool_names(p, end, map); - if (err < 0) + if (err) goto bad; } /* old_pool */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { struct ceph_pg_pool_info *pi; - ceph_decode_64_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, e_inval); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) __remove_pg_pool(&map->pg_pools, pi); } /* new_up */ - err = -EINVAL; - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { u32 osd; struct ceph_entity_addr addr; - ceph_decode_32_safe(p, end, osd, bad); - ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad); + ceph_decode_32_safe(p, end, osd, e_inval); + ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval); ceph_decode_addr(&addr); pr_info("osd%d up\n", osd); BUG_ON(osd >= map->max_osd); @@ -984,11 +984,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_state */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { u32 osd; u8 xorstate; - ceph_decode_32_safe(p, end, osd, bad); + ceph_decode_32_safe(p, end, osd, e_inval); xorstate = **(u8 **)p; (*p)++; /* clean flag */ if (xorstate == 0) @@ -1000,10 +1000,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_weight */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { u32 osd, off; - ceph_decode_need(p, end, sizeof(u32)*2, bad); + ceph_decode_need(p, end, sizeof(u32)*2, e_inval); osd = ceph_decode_32(p); off = ceph_decode_32(p); pr_info("osd%d weight 0x%x %s\n", osd, off, @@ -1014,7 +1014,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_pg_temp */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { struct ceph_pg_mapping *pg; int j; @@ -1024,22 +1024,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, err = ceph_decode_pgid(p, end, &pgid); if (err) goto bad; - ceph_decode_need(p, end, sizeof(u32), bad); + ceph_decode_need(p, end, sizeof(u32), e_inval); pglen = ceph_decode_32(p); if (pglen) { - ceph_decode_need(p, end, pglen*sizeof(u32), bad); + ceph_decode_need(p, end, pglen*sizeof(u32), e_inval); /* removing existing (if any) */ (void) __remove_pg_mapping(&map->pg_temp, pgid); /* insert */ - err = -EINVAL; if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) - goto bad; - err = -ENOMEM; + goto e_inval; pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); - if (!pg) + if (!pg) { + err = -ENOMEM; goto bad; + } pg->pgid = pgid; pg->len = pglen; for (j = 0; j < pglen; j++) @@ -1063,6 +1063,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); return map; +e_inval: + err = -EINVAL; bad: pr_err("corrupt inc 
osdmap (%d) epoch %d off %d (%p of %p-%p)\n", err, epoch, (int)(*p - start), *p, start, end); -- cgit v0.10.2 From 9464d00862ea6a5c0edc7118c86bdfa71a95190e Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:16 +0200 Subject: libceph: nuke bogus encoding version check in osdmap_apply_incremental() Only version 6 of osdmap encoding is supported, anything other than version 6 results in an error and halts the decoding process. Checking if version is >= 5 is therefore bogus. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index b844a92..07fa369 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -952,11 +952,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (err < 0) goto bad; } - if (version >= 5) { - err = __decode_pool_names(p, end, map); - if (err) - goto bad; - } + + err = __decode_pool_names(p, end, map); + if (err) + goto bad; /* old_pool */ ceph_decode_32_safe(p, end, len, e_inval); -- cgit v0.10.2 From 53bbaba9d811f75acc20038bfd707a4cfdc4571f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Mar 2014 16:36:16 +0200 Subject: libceph: fix and clarify ceph_decode_need() sizes Sum up sizeof(...) results instead of (incorrectly) hard-coding the number of bytes, expressed in ints and longs. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 07fa369..1164910 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -706,7 +706,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) goto e_inval; } - ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), e_inval); + /* fsid, epoch, created, modified */ + ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) + + sizeof(map->created) + sizeof(map->modified), e_inval); ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); epoch = map->epoch = ceph_decode_32(p); ceph_decode_copy(p, &map->created, sizeof(map->created)); @@ -878,8 +880,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, goto e_inval; } - ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32), - e_inval); + /* fsid, epoch, modified, new_pool_max, new_flags */ + ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) + + sizeof(u64) + sizeof(u32), e_inval); ceph_decode_copy(p, &fsid, sizeof(fsid)); epoch = ceph_decode_32(p); BUG_ON(epoch != map->epoch+1); @@ -913,10 +916,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (new_pool_max >= 0) map->pool_max = new_pool_max; - ceph_decode_need(p, end, 5*sizeof(u32), e_inval); - /* new max? */ - max = ceph_decode_32(p); + ceph_decode_32_safe(p, end, max, e_inval); if (max >= 0) { err = osdmap_set_max_osd(map, max); if (err) -- cgit v0.10.2 From 0f70c7eedbbbd245398fc74b4b020f3b800f071c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:27 +0200 Subject: libceph: rename __decode_pool{,_names}() to decode_pool{,_names}() To be in line with all the other osdmap decode helpers. 
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 1164910..39938d7 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -506,7 +506,7 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) kfree(pi); } -static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) +static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) { u8 ev, cv; unsigned len, num; @@ -587,7 +587,7 @@ bad: return -EINVAL; } -static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) +static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map) { struct ceph_pg_pool_info *pi; u32 num, len; @@ -723,7 +723,7 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) goto bad; } pi->id = ceph_decode_64(p); - err = __decode_pool(p, end, pi); + err = decode_pool(p, end, pi); if (err < 0) { kfree(pi); goto bad; @@ -731,7 +731,8 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) __insert_pg_pool(&map->pg_pools, pi); } - err = __decode_pool_names(p, end, map); + /* pool_name */ + err = decode_pool_names(p, end, map); if (err) goto bad; @@ -949,12 +950,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, pi->id = pool; __insert_pg_pool(&map->pg_pools, pi); } - err = __decode_pool(p, end, pi); + err = decode_pool(p, end, pi); if (err < 0) goto bad; } - err = __decode_pool_names(p, end, map); + /* new_pool_names */ + err = decode_pool_names(p, end, map); if (err) goto bad; -- cgit v0.10.2 From 433fbdd31db267564bab20420bd8f161a7c69e4d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:27 +0200 Subject: libceph: introduce decode{,_new}_pools() and switch to them Consolidate pools (full map, map<int64_t,pg_pool_t>) and new_pools (inc map, same) decoding logic into a common helper and switch to it. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 39938d7..0ba3062 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -681,6 +681,55 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) return 0; } +static int __decode_pools(void **p, void *end, struct ceph_osdmap *map, + bool incremental) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + struct ceph_pg_pool_info *pi; + u64 pool; + int ret; + + ceph_decode_64_safe(p, end, pool, e_inval); + + pi = __lookup_pg_pool(&map->pg_pools, pool); + if (!incremental || !pi) { + pi = kzalloc(sizeof(*pi), GFP_NOFS); + if (!pi) + return -ENOMEM; + + pi->id = pool; + + ret = __insert_pg_pool(&map->pg_pools, pi); + if (ret) { + kfree(pi); + return ret; + } + } + + ret = decode_pool(p, end, pi); + if (ret) + return ret; + } + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_pools(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pools(p, end, map, false); +} + +static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pools(p, end, map, true); +} + /* * decode a full map. 
*/ @@ -692,7 +741,6 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) u32 max; u32 len, i; int err; - struct ceph_pg_pool_info *pi; dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); @@ -714,22 +762,10 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) ceph_decode_copy(p, &map->created, sizeof(map->created)); ceph_decode_copy(p, &map->modified, sizeof(map->modified)); - ceph_decode_32_safe(p, end, max, e_inval); - while (max--) { - ceph_decode_need(p, end, 8 + 2, e_inval); - pi = kzalloc(sizeof(*pi), GFP_NOFS); - if (!pi) { - err = -ENOMEM; - goto bad; - } - pi->id = ceph_decode_64(p); - err = decode_pool(p, end, pi); - if (err < 0) { - kfree(pi); - goto bad; - } - __insert_pg_pool(&map->pg_pools, pi); - } + /* pools */ + err = decode_pools(p, end, map); + if (err) + goto bad; /* pool_name */ err = decode_pool_names(p, end, map); @@ -934,26 +970,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, newcrush = NULL; } - /* new_pool */ - ceph_decode_32_safe(p, end, len, e_inval); - while (len--) { - struct ceph_pg_pool_info *pi; - - ceph_decode_64_safe(p, end, pool, e_inval); - pi = __lookup_pg_pool(&map->pg_pools, pool); - if (!pi) { - pi = kzalloc(sizeof(*pi), GFP_NOFS); - if (!pi) { - err = -ENOMEM; - goto bad; - } - pi->id = pool; - __insert_pg_pool(&map->pg_pools, pi); - } - err = decode_pool(p, end, pi); - if (err < 0) - goto bad; - } + /* new_pools */ + err = decode_new_pools(p, end, map); + if (err) + goto bad; /* new_pool_names */ err = decode_pool_names(p, end, map); -- cgit v0.10.2 From 4d60351f9089ef0f39d73c0b6a103e61fc0ed187 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:28 +0200 Subject: libceph: switch osdmap_set_max_osd() to krealloc() Use krealloc() instead of rolling our own. (krealloc() with a NULL first argument acts as a kmalloc()). Properly initialize the new array elements. This is needed to make future additions to osdmap easier. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 0ba3062..a350286 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -646,38 +646,40 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map) } /* - * adjust max osd value. reallocate arrays. + * Adjust max_osd value, (re)allocate arrays. + * + * The new elements are properly initialized. */ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) { u8 *state; - struct ceph_entity_addr *addr; u32 *weight; + struct ceph_entity_addr *addr; + int i; - state = kcalloc(max, sizeof(*state), GFP_NOFS); - addr = kcalloc(max, sizeof(*addr), GFP_NOFS); - weight = kcalloc(max, sizeof(*weight), GFP_NOFS); - if (state == NULL || addr == NULL || weight == NULL) { + state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS); + weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS); + addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS); + if (!state || !weight || !addr) { kfree(state); - kfree(addr); kfree(weight); + kfree(addr); + return -ENOMEM; } - /* copy old? 
*/ - if (map->osd_state) { - memcpy(state, map->osd_state, map->max_osd*sizeof(*state)); - memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr)); - memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight)); - kfree(map->osd_state); - kfree(map->osd_addr); - kfree(map->osd_weight); + for (i = map->max_osd; i < max; i++) { + state[i] = 0; + weight[i] = CEPH_OSD_OUT; + memset(addr + i, 0, sizeof(*addr)); } map->osd_state = state; map->osd_weight = weight; map->osd_addr = addr; + map->max_osd = max; + return 0; } -- cgit v0.10.2 From 10db634e2083a202a26123d6d4c9ede98d6a1199 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:28 +0200 Subject: libceph: introduce decode{,_new}_pg_temp() and switch to them Consolidate pg_temp (full map, map<pg_t,vector<int>>) and new_pg_temp (inc map, same) decoding logic into a common helper and switch to it. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index a350286..6497322 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -732,6 +732,67 @@ static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map) return __decode_pools(p, end, map, true); } +static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map, + bool incremental) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + struct ceph_pg pgid; + u32 len, i; + int ret; + + ret = ceph_decode_pgid(p, end, &pgid); + if (ret) + return ret; + + ceph_decode_32_safe(p, end, len, e_inval); + + ret = __remove_pg_mapping(&map->pg_temp, pgid); + BUG_ON(!incremental && ret != -ENOENT); + + if (!incremental || len > 0) { + struct ceph_pg_mapping *pg; + + ceph_decode_need(p, end, len*sizeof(u32), e_inval); + + if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) + return -EINVAL; + + pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS); + if (!pg) + return -ENOMEM; + + pg->pgid = pgid; + pg->len = len; + for (i = 0; i < len; i++) + pg->osds[i] = ceph_decode_32(p); + + ret = __insert_pg_mapping(pg, &map->pg_temp); + if (ret) { + kfree(pg); + return ret; + } + } + } + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pg_temp(p, end, map, false); +} + +static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pg_temp(p, end, map, true); +} + /* * decode a full map. 
*/ @@ -810,36 +871,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) ceph_decode_addr(&map->osd_addr[i]); /* pg_temp */ - ceph_decode_32_safe(p, end, len, e_inval); - for (i = 0; i < len; i++) { - int n, j; - struct ceph_pg pgid; - struct ceph_pg_mapping *pg; - - err = ceph_decode_pgid(p, end, &pgid); - if (err) - goto bad; - ceph_decode_need(p, end, sizeof(u32), e_inval); - n = ceph_decode_32(p); - if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) - goto e_inval; - ceph_decode_need(p, end, n * sizeof(u32), e_inval); - pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); - if (!pg) { - err = -ENOMEM; - goto bad; - } - pg->pgid = pgid; - pg->len = n; - for (j = 0; j < n; j++) - pg->osds[j] = ceph_decode_32(p); - - err = __insert_pg_mapping(pg, &map->pg_temp); - if (err) - goto bad; - dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, - len); - } + err = decode_pg_temp(p, end, map); + if (err) + goto bad; /* crush */ ceph_decode_32_safe(p, end, len, e_inval); @@ -1038,48 +1072,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_pg_temp */ - ceph_decode_32_safe(p, end, len, e_inval); - while (len--) { - struct ceph_pg_mapping *pg; - int j; - struct ceph_pg pgid; - u32 pglen; - - err = ceph_decode_pgid(p, end, &pgid); - if (err) - goto bad; - ceph_decode_need(p, end, sizeof(u32), e_inval); - pglen = ceph_decode_32(p); - if (pglen) { - ceph_decode_need(p, end, pglen*sizeof(u32), e_inval); - - /* removing existing (if any) */ - (void) __remove_pg_mapping(&map->pg_temp, pgid); - - /* insert */ - if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) - goto e_inval; - pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); - if (!pg) { - err = -ENOMEM; - goto bad; - } - pg->pgid = pgid; - pg->len = pglen; - for (j = 0; j < pglen; j++) - pg->osds[j] = ceph_decode_32(p); - err = __insert_pg_mapping(pg, &map->pg_temp); - if (err) { - kfree(pg); - goto bad; - } - dout(" added pg_temp %lld.%x len %d\n", pgid.pool, - pgid.seed, pglen); - } else { - /* remove */ - __remove_pg_mapping(&map->pg_temp, pgid); - } - } + err = decode_new_pg_temp(p, end, map); + if (err) + goto bad; /* ignore the rest */ *p = end; -- cgit v0.10.2 From ec7af97258396161e6effba7e788c3fc3cb55263 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:29 +0200 Subject: libceph: introduce get_osdmap_client_data_v() Full and incremental osdmaps are structured identically and have identical headers. Add a helper to decode both "old" (16-bit version, v6) and "new" (8-bit struct_v+struct_compat+struct_len, v7) osdmap encoding headers and switch to it. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 6497322..be2a65f 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -683,6 +683,63 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) return 0; } +#define OSDMAP_WRAPPER_COMPAT_VER 7 +#define OSDMAP_CLIENT_DATA_COMPAT_VER 1 + +/* + * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps, + * to struct_v of the client_data section for new (v7 and above) + * osdmaps. 
+ */ +static int get_osdmap_client_data_v(void **p, void *end, + const char *prefix, u8 *v) +{ + u8 struct_v; + + ceph_decode_8_safe(p, end, struct_v, e_inval); + if (struct_v >= 7) { + u8 struct_compat; + + ceph_decode_8_safe(p, end, struct_compat, e_inval); + if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) { + pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n", + struct_v, struct_compat, + OSDMAP_WRAPPER_COMPAT_VER, prefix); + return -EINVAL; + } + *p += 4; /* ignore wrapper struct_len */ + + ceph_decode_8_safe(p, end, struct_v, e_inval); + ceph_decode_8_safe(p, end, struct_compat, e_inval); + if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) { + pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n", + struct_v, struct_compat, + OSDMAP_CLIENT_DATA_COMPAT_VER, prefix); + return -EINVAL; + } + *p += 4; /* ignore client data struct_len */ + } else { + u16 version; + + *p -= 1; + ceph_decode_16_safe(p, end, version, e_inval); + if (version < 6) { + pr_warning("got v %d < 6 of %s ceph_osdmap\n", version, + prefix); + return -EINVAL; + } + + /* old osdmap enconding */ + struct_v = 0; + } + + *v = struct_v; + return 0; + +e_inval: + return -EINVAL; +} + static int __decode_pools(void **p, void *end, struct ceph_osdmap *map, bool incremental) { @@ -798,7 +855,7 @@ static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) */ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) { - u16 version; + u8 struct_v; u32 epoch = 0; void *start = *p; u32 max; @@ -807,15 +864,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); - ceph_decode_16_safe(p, end, version, e_inval); - if (version > 6) { - pr_warning("got unknown v %d > 6 of osdmap\n", version); - goto e_inval; - } - if (version < 6) { - pr_warning("got old v %d < 6 of osdmap\n", version); - goto e_inval; - } + err = get_osdmap_client_data_v(p, end, "full", &struct_v); + if (err) + goto bad; /* fsid, epoch, created, modified */ ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) + @@ -943,15 +994,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, __s32 new_flags, max; void *start = *p; int err; - u16 version; + u8 struct_v; dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); - ceph_decode_16_safe(p, end, version, e_inval); - if (version != 6) { - pr_warning("got unknown v %d != 6 of inc osdmap\n", version); - goto e_inval; - } + err = get_osdmap_client_data_v(p, end, "inc", &struct_v); + if (err) + goto bad; /* fsid, epoch, modified, new_pool_max, new_flags */ ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) + -- cgit v0.10.2 From 35a935d75d51abe58d3427a8b4ae3745a5a14e1c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:29 +0200 Subject: libceph: generalize ceph_pg_mapping In preparation for adding support for primary_temp mappings, generalize struct ceph_pg_mapping so it can hold mappings other than pg_temp. 
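A stand-alone sketch of how the overloaded node is meant to be read: the valid union member is implied by the rb-tree the node was found in. (The primary_temp member arrives in a later patch in this series; the fixed-size osds array stands in for the kernel's flexible array member so the example can be statically initialized.)

#include <stdio.h>

struct pg_mapping {
	union {
		struct { int len; int osds[4]; } pg_temp;
		struct { int osd; } primary_temp;
	};
};

int main(void)
{
	struct pg_mapping pt = { .pg_temp = { .len = 2, .osds = { 3, 7 } } };
	struct pg_mapping pr = { .primary_temp = { .osd = 4 } };
	int i;

	for (i = 0; i < pt.pg_temp.len; i++)
		printf("pg_temp osd%d\n", pt.pg_temp.osds[i]);
	printf("primary_temp osd%d\n", pr.primary_temp.osd);
	return 0;
}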
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 46c3e30..4837e58 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -60,8 +60,13 @@ struct ceph_object_id { struct ceph_pg_mapping { struct rb_node node; struct ceph_pg pgid; - int len; - int osds[]; + + union { + struct { + int len; + int osds[]; + } pg_temp; + }; }; struct ceph_osdmap { diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index c45d235..5865f2c 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -88,9 +88,9 @@ static int osdmap_show(struct seq_file *s, void *p) seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool, pg->pgid.seed); - for (i = 0; i < pg->len; i++) + for (i = 0; i < pg->pg_temp.len; i++) seq_printf(s, "%s%d", (i == 0 ? "" : ","), - pg->osds[i]); + pg->pg_temp.osds[i]); seq_printf(s, "]\n"); } diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index be2a65f..c67a309 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -822,9 +822,9 @@ static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map, return -ENOMEM; pg->pgid = pgid; - pg->len = len; + pg->pg_temp.len = len; for (i = 0; i < len; i++) - pg->osds[i] = ceph_decode_32(p); + pg->pg_temp.osds[i] = ceph_decode_32(p); ret = __insert_pg_mapping(pg, &map->pg_temp); if (ret) { @@ -1281,8 +1281,8 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, pool->pg_num_mask); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { - *num = pg->len; - return pg->osds; + *num = pg->pg_temp.len; + return pg->pg_temp.osds; } /* crush */ -- cgit v0.10.2 From 9686f94c8cfc06e8afb7b2233ab8f1f6ac01957f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:29 +0200 Subject: libceph: primary_temp infrastructure Add primary_temp mappings infrastructure. struct ceph_pg_mapping is overloaded, primary_temp mappings are stored in an rb-tree, rooted at ceph_osdmap, in a manner similar to pg_temp mappings. 
Dump primary_temp mappings to /sys/kernel/debug/ceph/<client>/osdmap, one 'primary_temp <pgid> <osd>' per line, e.g.: primary_temp 2.6 4 Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 4837e58..db4fb63 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -66,6 +66,9 @@ struct ceph_pg_mapping { int len; int osds[]; } pg_temp; + struct { + int osd; + } primary_temp; }; }; @@ -83,6 +86,8 @@ struct ceph_osdmap { struct ceph_entity_addr *osd_addr; struct rb_root pg_temp; + struct rb_root primary_temp; + struct rb_root pg_pools; u32 pool_max; diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 5865f2c..612bf55 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -93,6 +93,13 @@ static int osdmap_show(struct seq_file *s, void *p) pg->pg_temp.osds[i]); seq_printf(s, "]\n"); } + for (n = rb_first(&map->primary_temp); n; n = rb_next(n)) { + struct ceph_pg_mapping *pg = + rb_entry(n, struct ceph_pg_mapping, node); + + seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool, + pg->pgid.seed, pg->primary_temp.osd); + } return 0; } diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index c67a309..c0fc517 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -343,7 +343,7 @@ bad: /* * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid - * to a set of osds) + * to a set of osds) and primary_temp (explicit primary setting) */ static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) { @@ -633,6 +633,13 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map) rb_erase(&pg->node, &map->pg_temp); kfree(pg); } + while (!RB_EMPTY_ROOT(&map->primary_temp)) { + struct ceph_pg_mapping *pg = + rb_entry(rb_first(&map->primary_temp), + struct ceph_pg_mapping, node); + rb_erase(&pg->node, &map->primary_temp); + kfree(pg); + } while (!RB_EMPTY_ROOT(&map->pg_pools)) { struct ceph_pg_pool_info *pi = rb_entry(rb_first(&map->pg_pools), @@ -966,6 +973,7 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end) return ERR_PTR(-ENOMEM); map->pg_temp = RB_ROOT; + map->primary_temp = RB_ROOT; mutex_init(&map->crush_scratch_mutex); ret = osdmap_decode(p, end, map); -- cgit v0.10.2 From d286de796aab9e306e674c6d23c4f3c1f55e394c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:30 +0200 Subject: libceph: primary_temp decode bits Add a common helper to decode both primary_temp (full map, map<pg_t,int>) and new_primary_temp (inc map, same) and switch to it. 
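The incremental semantics mirror new_pg_temp: an entry carrying osd (u32)-1 drops the mapping instead of setting one. A stand-alone toy walk of such entries (the flat pgid layout here is a simplification for illustration, not the wire format):

#include <stdio.h>

struct entry { unsigned long long pool; unsigned seed; unsigned osd; };

int main(void)
{
	struct entry upd[] = {
		{ 2, 0x6, 4 },			/* primary_temp 2.6 4 */
		{ 2, 0x7, (unsigned)-1 },	/* removal */
	};
	unsigned i;

	for (i = 0; i < sizeof(upd) / sizeof(upd[0]); i++) {
		if (upd[i].osd != (unsigned)-1)
			printf("set primary_temp %llu.%x -> osd%u\n",
			       upd[i].pool, upd[i].seed, upd[i].osd);
		else
			printf("remove primary_temp %llu.%x\n",
			       upd[i].pool, upd[i].seed);
	}
	return 0;
}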
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index c0fc517..c2d793d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -857,6 +857,61 @@ static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) return __decode_pg_temp(p, end, map, true); } +static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map, + bool incremental) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + struct ceph_pg pgid; + u32 osd; + int ret; + + ret = ceph_decode_pgid(p, end, &pgid); + if (ret) + return ret; + + ceph_decode_32_safe(p, end, osd, e_inval); + + ret = __remove_pg_mapping(&map->primary_temp, pgid); + BUG_ON(!incremental && ret != -ENOENT); + + if (!incremental || osd != (u32)-1) { + struct ceph_pg_mapping *pg; + + pg = kzalloc(sizeof(*pg), GFP_NOFS); + if (!pg) + return -ENOMEM; + + pg->pgid = pgid; + pg->primary_temp.osd = osd; + + ret = __insert_pg_mapping(pg, &map->primary_temp); + if (ret) { + kfree(pg); + return ret; + } + } + } + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_primary_temp(p, end, map, false); +} + +static int decode_new_primary_temp(void **p, void *end, + struct ceph_osdmap *map) +{ + return __decode_primary_temp(p, end, map, true); +} + /* * decode a full map. */ @@ -933,6 +988,13 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) if (err) goto bad; + /* primary_temp */ + if (struct_v >= 1) { + err = decode_primary_temp(p, end, map); + if (err) + goto bad; + } + /* crush */ ceph_decode_32_safe(p, end, len, e_inval); map->crush = crush_decode(*p, min(*p + len, end)); @@ -1133,6 +1195,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (err) goto bad; + /* new_primary_temp */ + if (struct_v >= 1) { + err = decode_new_primary_temp(p, end, map); + if (err) + goto bad; + } + /* ignore the rest */ *p = end; -- cgit v0.10.2 From 2cfa34f2d67a36e292cbe6e4c1e60d212b7ba4d1 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:30 +0200 Subject: libceph: primary_affinity infrastructure Add primary_affinity infrastructure. primary_affinity values are stored in a max_osd-sized array, hanging off ceph_osdmap, similar to an osd_weight array. Introduce {get,set}_primary_affinity() helpers, primarily to return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY when no affinity has been set and to abstract out osd_primary_affinity array allocation and initialization. 
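Affinity values are fixed point with 0x10000 as the baseline, so they read as percentages the same way the debugfs dump scales them. A stand-alone check over a few arbitrarily chosen sample values:

#include <stdio.h>

int main(void)
{
	unsigned affs[] = { 0x10000, 0x8000, 0x4000, 0 };
	unsigned i;

	/* same scaling the debugfs dump uses: (value * 100) >> 16 */
	for (i = 0; i < sizeof(affs) / sizeof(affs[0]); i++)
		printf("aff 0x%05x -> %3u%%\n", affs[i],
		       (affs[i] * 100) >> 16);
	return 0;
}

This prints 100%, 50%, 25% and 0% respectively.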
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index db4fb63..6e030cb 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -88,6 +88,8 @@ struct ceph_osdmap { struct rb_root pg_temp; struct rb_root primary_temp; + u32 *osd_primary_affinity; + struct rb_root pg_pools; u32 pool_max; @@ -134,6 +136,7 @@ static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag) } extern char *ceph_osdmap_state_str(char *str, int len, int state); +extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd); static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map, int osd) diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 2caabef..bb6f40c 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -133,6 +133,10 @@ extern const char *ceph_osd_state_name(int s); #define CEPH_OSD_IN 0x10000 #define CEPH_OSD_OUT 0 +/* osd primary-affinity. fixed point value: 0x10000 == baseline */ +#define CEPH_OSD_MAX_PRIMARY_AFFINITY 0x10000 +#define CEPH_OSD_DEFAULT_PRIMARY_AFFINITY 0x10000 + /* * osd map flag bits diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 612bf55..34453a2 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -77,10 +77,11 @@ static int osdmap_show(struct seq_file *s, void *p) int state = map->osd_state[i]; char sb[64]; - seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\n", + seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n", i, ceph_pr_addr(&addr->in_addr), ((map->osd_weight[i]*100) >> 16), - ceph_osdmap_state_str(sb, sizeof(sb), state)); + ceph_osdmap_state_str(sb, sizeof(sb), state), + ((ceph_get_primary_affinity(map, i)*100) >> 16)); } for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) { struct ceph_pg_mapping *pg = diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index c2d793d..0ac12938 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -649,6 +649,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map) kfree(map->osd_state); kfree(map->osd_weight); kfree(map->osd_addr); + kfree(map->osd_primary_affinity); kfree(map); } @@ -685,6 +686,20 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) map->osd_weight = weight; map->osd_addr = addr; + if (map->osd_primary_affinity) { + u32 *affinity; + + affinity = krealloc(map->osd_primary_affinity, + max*sizeof(*affinity), GFP_NOFS); + if (!affinity) + return -ENOMEM; + + for (i = map->max_osd; i < max; i++) + affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; + + map->osd_primary_affinity = affinity; + } + map->max_osd = max; return 0; @@ -912,6 +927,38 @@ static int decode_new_primary_temp(void **p, void *end, return __decode_primary_temp(p, end, map, true); } +u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd) +{ + BUG_ON(osd >= map->max_osd); + + if (!map->osd_primary_affinity) + return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; + + return map->osd_primary_affinity[osd]; +} + +static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff) +{ + BUG_ON(osd >= map->max_osd); + + if (!map->osd_primary_affinity) { + int i; + + map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32), + GFP_NOFS); + if (!map->osd_primary_affinity) + return -ENOMEM; + + for (i = 0; i < map->max_osd; i++) + map->osd_primary_affinity[i] = + CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; + } + + map->osd_primary_affinity[osd] = aff; + + return 0; +} + /* * decode a full map. 
*/ -- cgit v0.10.2 From 63a6993f521b2629872e89c02a336fb3f18b092b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:30 +0200 Subject: libceph: primary_affinity decode bits Add two helpers to decode primary_affinity (full map, vector<u32>) and new_primary_affinity (inc map, map<u32,u32>) and switch to them. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 0ac12938..9568e62 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -959,6 +959,60 @@ static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff) return 0; } +static int decode_primary_affinity(void **p, void *end, + struct ceph_osdmap *map) +{ + u32 len, i; + + ceph_decode_32_safe(p, end, len, e_inval); + if (len == 0) { + kfree(map->osd_primary_affinity); + map->osd_primary_affinity = NULL; + return 0; + } + if (len != map->max_osd) + goto e_inval; + + ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval); + + for (i = 0; i < map->max_osd; i++) { + int ret; + + ret = set_primary_affinity(map, i, ceph_decode_32(p)); + if (ret) + return ret; + } + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_new_primary_affinity(void **p, void *end, + struct ceph_osdmap *map) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + u32 osd, aff; + int ret; + + ceph_decode_32_safe(p, end, osd, e_inval); + ceph_decode_32_safe(p, end, aff, e_inval); + + ret = set_primary_affinity(map, osd, aff); + if (ret) + return ret; + } + + return 0; + +e_inval: + return -EINVAL; +} + /* * decode a full map. */ @@ -1042,6 +1096,17 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) goto bad; } + /* primary_affinity */ + if (struct_v >= 2) { + err = decode_primary_affinity(p, end, map); + if (err) + goto bad; + } else { + /* XXX can this happen? */ + kfree(map->osd_primary_affinity); + map->osd_primary_affinity = NULL; + } + /* crush */ ceph_decode_32_safe(p, end, len, e_inval); map->crush = crush_decode(*p, min(*p + len, end)); @@ -1249,6 +1314,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, goto bad; } + /* new_primary_affinity */ + if (struct_v >= 2) { + err = decode_new_primary_affinity(p, end, map); + if (err) + goto bad; + } + /* ignore the rest */ *p = end; -- cgit v0.10.2 From ddf3a21a03d0f01c5ba83deaecd2d0c381d5ef42 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 21 Mar 2014 19:05:31 +0200 Subject: libceph: enable OSDMAP_ENC feature bit Announce our support for "new" (v7 - split and separately versioned client and osd sections) osdmap encoding. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 77c097f..7a4cab5 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -90,6 +90,7 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_OSD_CACHEPOOL | \ CEPH_FEATURE_CRUSH_V2 | \ CEPH_FEATURE_EXPORT_PEER | \ + CEPH_FEATURE_OSDMAP_ENC | \ CEPH_FEATURE_CRUSH_TUNABLES3) #define CEPH_FEATURES_REQUIRED_DEFAULT \ -- cgit v0.10.2 From 246138fa6787db6f4016f26604fdc05dc9f95627 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:46 +0200 Subject: libceph: ceph_osd_{exists,is_up,is_down}(osd) definitions Sync up with ceph.git definitions. Bring in ceph_osd_is_down(). 
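A stand-alone mirror of the three predicates over a toy state array (the EXISTS and UP bit values here are assumptions for illustration, not taken from this patch):

#include <stdio.h>

#define OSD_EXISTS 0x1	/* assumed bit layout */
#define OSD_UP	   0x2

static const unsigned char osd_state[] = { 0x3, 0x1, 0x0 };
#define MAX_OSD ((int)sizeof(osd_state))

static int osd_exists(int osd)
{
	return osd >= 0 && osd < MAX_OSD && (osd_state[osd] & OSD_EXISTS);
}

static int osd_is_up(int osd)
{
	return osd_exists(osd) && (osd_state[osd] & OSD_UP);
}

int main(void)
{
	int i;

	for (i = 0; i < MAX_OSD; i++)
		printf("osd%d exists=%d up=%d down=%d\n", i,
		       osd_exists(i), osd_is_up(i), !osd_is_up(i));
	return 0;
}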
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 6e030cb..0895797 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -125,9 +125,21 @@ static inline void ceph_oid_copy(struct ceph_object_id *dest, dest->name_len = src->name_len; } +static inline int ceph_osd_exists(struct ceph_osdmap *map, int osd) +{ + return osd >= 0 && osd < map->max_osd && + (map->osd_state[osd] & CEPH_OSD_EXISTS); +} + static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) { - return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); + return ceph_osd_exists(map, osd) && + (map->osd_state[osd] & CEPH_OSD_UP); +} + +static inline int ceph_osd_is_down(struct ceph_osdmap *map, int osd) +{ + return !ceph_osd_is_up(map, osd); } static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag) -- cgit v0.10.2 From 2abebdbca7997422bfab6bf8b6559384a6b95294 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:47 +0200 Subject: libceph: ceph_can_shift_osds(pool) and pool type defines Bring in pg_pool_t::can_shift_osds() counterpart along with pool type defines. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 0895797..4e28c1e 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -41,6 +41,18 @@ struct ceph_pg_pool_info { char *name; }; +static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool) +{ + switch (pool->type) { + case CEPH_POOL_TYPE_REP: + return true; + case CEPH_POOL_TYPE_EC: + return false; + default: + BUG_ON(1); + } +} + struct ceph_object_locator { s64 pool; }; diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index bb6f40c..f20e0d8 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -81,8 +81,9 @@ struct ceph_pg_v1 { */ #define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */ -#define CEPH_PG_TYPE_REP 1 -#define CEPH_PG_TYPE_RAID4 2 +#define CEPH_POOL_TYPE_REP 1 +#define CEPH_POOL_TYPE_RAID4 2 /* never implemented */ +#define CEPH_POOL_TYPE_EC 3 /* * stable_mod func is used to control number of placement groups. -- cgit v0.10.2 From 2bd93d4d7ec2dd461cfb87c6d8a9b1ef9b30de08 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:47 +0200 Subject: libceph: introduce pg_to_raw_osds() and raw_to_up_osds() helpers pg_to_raw_osds() helper for computing a raw (crush) set, which can contain non-existent and down osds. raw_to_up_osds() helper for pruning non-existent and down osds from the raw set, therefore transforming it into an up set, and determining up primary. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 9568e62..b8bbef0 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1521,6 +1521,82 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* + * Calculate raw (crush) set for given pgid. + * + * Return raw set length, or error. 
+ */ +static int pg_to_raw_osds(struct ceph_osdmap *osdmap, + struct ceph_pg_pool_info *pool, + struct ceph_pg pgid, u32 pps, int *osds) +{ + int ruleno; + int len; + + /* crush */ + ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, + pool->type, pool->size); + if (ruleno < 0) { + pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n", + pgid.pool, pool->crush_ruleset, pool->type, + pool->size); + return -ENOENT; + } + + len = do_crush(osdmap, ruleno, pps, osds, + min_t(int, pool->size, CEPH_PG_MAX_SIZE), + osdmap->osd_weight, osdmap->max_osd); + if (len < 0) { + pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", + len, ruleno, pgid.pool, pool->crush_ruleset, + pool->type, pool->size); + return len; + } + + return len; +} + +/* + * Given raw set, calculate up set and up primary. + * + * Return up set length. *primary is set to up primary osd id, or -1 + * if up set is empty. + */ +static int raw_to_up_osds(struct ceph_osdmap *osdmap, + struct ceph_pg_pool_info *pool, + int *osds, int len, int *primary) +{ + int up_primary = -1; + int i; + + if (ceph_can_shift_osds(pool)) { + int removed = 0; + + for (i = 0; i < len; i++) { + if (ceph_osd_is_down(osdmap, osds[i])) { + removed++; + continue; + } + if (removed) + osds[i - removed] = osds[i]; + } + + len -= removed; + if (len > 0) + up_primary = osds[0]; + } else { + for (i = len - 1; i >= 0; i--) { + if (ceph_osd_is_down(osdmap, osds[i])) + osds[i] = CRUSH_ITEM_NONE; + else + up_primary = osds[i]; + } + } + + *primary = up_primary; + return len; +} + +/* * Return acting set for given pgid. */ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, -- cgit v0.10.2 From 45966c3467e8291382a8970adbd78403a7818d45 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:47 +0200 Subject: libceph: introduce apply_temps() helper apply_temp() helper for applying various temporary mappings (at this point only pg_temp mappings) to the up set, therefore transforming it into an acting set. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index b8bbef0..bd40f56 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1597,6 +1597,58 @@ static int raw_to_up_osds(struct ceph_osdmap *osdmap, } /* + * Given up set, apply pg_temp mapping. + * + * Return acting set length. *primary is set to acting primary osd id, + * or -1 if acting set is empty. + */ +static int apply_temps(struct ceph_osdmap *osdmap, + struct ceph_pg_pool_info *pool, struct ceph_pg pgid, + int *osds, int len, int *primary) +{ + struct ceph_pg_mapping *pg; + int temp_len; + int temp_primary; + int i; + + /* raw_pg -> pg */ + pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, + pool->pg_num_mask); + + /* pg_temp? */ + pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); + if (pg) { + temp_len = 0; + temp_primary = -1; + + for (i = 0; i < pg->pg_temp.len; i++) { + if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { + if (ceph_can_shift_osds(pool)) + continue; + else + osds[temp_len++] = CRUSH_ITEM_NONE; + } else { + osds[temp_len++] = pg->pg_temp.osds[i]; + } + } + + /* apply pg_temp's primary */ + for (i = 0; i < temp_len; i++) { + if (osds[i] != CRUSH_ITEM_NONE) { + temp_primary = osds[i]; + break; + } + } + } else { + temp_len = len; + temp_primary = *primary; + } + + *primary = temp_primary; + return temp_len; +} + +/* * Return acting set for given pgid. 
*/ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, -- cgit v0.10.2 From ac972230e20581b044f5ce66dcaf3c5af8d57444 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:48 +0200 Subject: libceph: switch ceph_calc_pg_acting() to new helpers Switch ceph_calc_pg_acting() to new helpers: pg_to_raw_osds(), raw_to_up_osds() and apply_temps(). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 4e28c1e..b0c8f84 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -212,7 +212,7 @@ extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *acting); + int *osds); extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index bd40f56..f1cad21 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1649,24 +1649,49 @@ static int apply_temps(struct ceph_osdmap *osdmap, } /* - * Return acting set for given pgid. + * Calculate acting set for given pgid. + * + * Return acting set length, or error. */ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *acting) + int *osds) { - int rawosds[CEPH_PG_MAX_SIZE], *osds; - int i, o, num = CEPH_PG_MAX_SIZE; + struct ceph_pg_pool_info *pool; + u32 pps; + int len; + int primary; - osds = calc_pg_raw(osdmap, pgid, rawosds, &num); - if (!osds) - return -1; + pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); + if (!pool) + return 0; - /* primary is first up osd */ - o = 0; - for (i = 0; i < num; i++) - if (ceph_osd_is_up(osdmap, osds[i])) - acting[o++] = osds[i]; - return o; + if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { + /* hash pool id and seed so that pool PGs do not overlap */ + pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, + ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask), + pgid.pool); + } else { + /* + * legacy behavior: add ps and pool together. this is + * not a great approach because the PGs from each pool + * will overlap on top of each other: 0.5 == 1.4 == + * 2.3 == ... + */ + pps = ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask) + + (unsigned)pgid.pool; + } + + len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds); + if (len < 0) + return len; + + len = raw_to_up_osds(osdmap, pool, osds, len, &primary); + + len = apply_temps(osdmap, pool, pgid, osds, len, &primary); + + return len; } /* -- cgit v0.10.2 From 8008ab1080c1768b02d232dcfd9e161cd47cc9f7 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:48 +0200 Subject: libceph: return primary from ceph_calc_pg_acting() In preparation for adding support for primary_temp, stop assuming primaryness: add a primary out parameter to ceph_calc_pg_acting() and change call sites accordingly. Primary is now specified separately from the order of osds in the set. 
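A user-space toy of the changed convention: the set length comes back as the return value, the primary through the out parameter, with no assumption that it sits at osds[0] (the hard-coded values are illustrative only):

#include <stdio.h>

/* hypothetical stand-in for ceph_calc_pg_acting() */
static int calc_acting(int *osds, int *primary)
{
	osds[0] = 6;
	osds[1] = 2;
	osds[2] = 4;
	*primary = 2;		/* need not be osds[0] */
	return 3;
}

int main(void)
{
	int osds[8], primary, num, i;

	num = calc_acting(osds, &primary);
	for (i = 0; i < num; i++)
		printf("osd%d%s\n", osds[i],
		       osds[i] == primary ? " (primary)" : "");
	return 0;
}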
Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index b0c8f84..561ea89 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -212,7 +212,7 @@ extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *osds); + int *osds, int *primary); extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 6f64eec..b4157dc 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1333,7 +1333,7 @@ static int __map_request(struct ceph_osd_client *osdc, { struct ceph_pg pgid; int acting[CEPH_PG_MAX_SIZE]; - int o = -1, num = 0; + int num, o; int err; bool was_paused; @@ -1346,11 +1346,9 @@ static int __map_request(struct ceph_osd_client *osdc, } req->r_pgid = pgid; - err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); - if (err > 0) { - o = acting[0]; - num = err; - } + num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o); + if (num < 0) + num = 0; was_paused = req->r_paused; req->r_paused = __req_should_be_paused(osdc, req); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index f1cad21..df9389d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1651,19 +1651,21 @@ static int apply_temps(struct ceph_osdmap *osdmap, /* * Calculate acting set for given pgid. * - * Return acting set length, or error. + * Return acting set length, or error. *primary is set to acting + * primary osd id, or -1 if acting set is empty or on error. */ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *osds) + int *osds, int *primary) { struct ceph_pg_pool_info *pool; u32 pps; int len; - int primary; pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); - if (!pool) - return 0; + if (!pool) { + *primary = -1; + return -ENOENT; + } if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { /* hash pool id and seed so that pool PGs do not overlap */ @@ -1684,12 +1686,14 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds); - if (len < 0) + if (len < 0) { + *primary = -1; return len; + } - len = raw_to_up_osds(osdmap, pool, osds, len, &primary); + len = raw_to_up_osds(osdmap, pool, osds, len, primary); - len = apply_temps(osdmap, pool, pgid, osds, len, &primary); + len = apply_temps(osdmap, pool, pgid, osds, len, primary); return len; } -- cgit v0.10.2 From 5e8d4d36bf23bb7baf027c479d54395840219928 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:48 +0200 Subject: libceph: add support for primary_temp mappings Change apply_temp() to override primary in the same way pg_temp overrides osd set. primary_temp overrides pg_temp primary too. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index df9389d..20a38a3 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1597,7 +1597,7 @@ static int raw_to_up_osds(struct ceph_osdmap *osdmap, } /* - * Given up set, apply pg_temp mapping. + * Given up set, apply pg_temp and primary_temp mappings. * * Return acting set length. *primary is set to acting primary osd id, * or -1 if acting set is empty. @@ -1644,6 +1644,11 @@ static int apply_temps(struct ceph_osdmap *osdmap, temp_primary = *primary; } + /* primary_temp? 
*/ + pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid); + if (pg) + temp_primary = pg->primary_temp.osd; + *primary = temp_primary; return temp_len; } -- cgit v0.10.2 From 47ec1f3cc46dde00deb34922dbffdeda254ad76d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:49 +0200 Subject: libceph: add support for osd primary affinity Respond to non-default primary_affinity values accordingly. (Primary affinity allows the admin to shift 'primary responsibility' away from specific osds, effectively shifting around the read side of the workload and whatever overhead is incurred by peering and writes by virtue of being the primary). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 20a38a3..ae8f367 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1596,6 +1596,72 @@ static int raw_to_up_osds(struct ceph_osdmap *osdmap, return len; } +static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps, + struct ceph_pg_pool_info *pool, + int *osds, int len, int *primary) +{ + int i; + int pos = -1; + + /* + * Do we have any non-default primary_affinity values for these + * osds? + */ + if (!osdmap->osd_primary_affinity) + return; + + for (i = 0; i < len; i++) { + if (osds[i] != CRUSH_ITEM_NONE && + osdmap->osd_primary_affinity[i] != + CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) { + break; + } + } + if (i == len) + return; + + /* + * Pick the primary. Feed both the seed (for the pg) and the + * osd into the hash/rng so that a proportional fraction of an + * osd's pgs get rejected as primary. + */ + for (i = 0; i < len; i++) { + int osd; + u32 aff; + + osd = osds[i]; + if (osd == CRUSH_ITEM_NONE) + continue; + + aff = osdmap->osd_primary_affinity[osd]; + if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY && + (crush_hash32_2(CRUSH_HASH_RJENKINS1, + pps, osd) >> 16) >= aff) { + /* + * We chose not to use this primary. Note it + * anyway as a fallback in case we don't pick + * anyone else, but keep looking. + */ + if (pos < 0) + pos = i; + } else { + pos = i; + break; + } + } + if (pos < 0) + return; + + *primary = osds[pos]; + + if (ceph_can_shift_osds(pool) && pos > 0) { + /* move the new primary to the front */ + for (i = pos; i > 0; i--) + osds[i] = osds[i - 1]; + osds[0] = *primary; + } +} + /* * Given up set, apply pg_temp and primary_temp mappings. * @@ -1698,6 +1764,8 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, len = raw_to_up_osds(osdmap, pool, osds, len, primary); + apply_primary_affinity(osdmap, pps, pool, osds, len, primary); + len = apply_temps(osdmap, pool, pgid, osds, len, primary); return len; -- cgit v0.10.2 From c4c1228525a2cbf9e06657b01135391700c1ec14 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:49 +0200 Subject: libceph: redo ceph_calc_pg_primary() in terms of ceph_calc_pg_acting() Reimplement ceph_calc_pg_primary() in terms of ceph_calc_pg_acting() and get rid of the now unused calc_pg_raw(). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index ae8f367..408d148 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1456,71 +1456,6 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x, } /* - * Calculate raw osd vector for the given pgid. Return pointer to osd - * array, or NULL on failure. 
- */ -static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *osds, int *num) -{ - struct ceph_pg_mapping *pg; - struct ceph_pg_pool_info *pool; - int ruleno; - int r; - u32 pps; - - pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); - if (!pool) - return NULL; - - /* pg_temp? */ - pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, - pool->pg_num_mask); - pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); - if (pg) { - *num = pg->pg_temp.len; - return pg->pg_temp.osds; - } - - /* crush */ - ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, - pool->type, pool->size); - if (ruleno < 0) { - pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", - pgid.pool, pool->crush_ruleset, pool->type, - pool->size); - return NULL; - } - - if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { - /* hash pool id and seed sothat pool PGs do not overlap */ - pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, - ceph_stable_mod(pgid.seed, pool->pgp_num, - pool->pgp_num_mask), - pgid.pool); - } else { - /* - * legacy ehavior: add ps and pool together. this is - * not a great approach because the PGs from each pool - * will overlap on top of each other: 0.5 == 1.4 == - * 2.3 == ... - */ - pps = ceph_stable_mod(pgid.seed, pool->pgp_num, - pool->pgp_num_mask) + - (unsigned)pgid.pool; - } - r = do_crush(osdmap, ruleno, pps, osds, min_t(int, pool->size, *num), - osdmap->osd_weight, osdmap->max_osd); - if (r < 0) { - pr_err("error %d from crush rule: pool %lld ruleset %d type %d" - " size %d\n", r, pgid.pool, pool->crush_ruleset, - pool->type, pool->size); - return NULL; - } - *num = r; - return osds; -} - -/* * Calculate raw (crush) set for given pgid. * * Return raw set length, or error. @@ -1776,17 +1711,11 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, */ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) { - int rawosds[CEPH_PG_MAX_SIZE], *osds; - int i, num = CEPH_PG_MAX_SIZE; + int osds[CEPH_PG_MAX_SIZE]; + int primary; - osds = calc_pg_raw(osdmap, pgid, rawosds, &num); - if (!osds) - return -1; + ceph_calc_pg_acting(osdmap, pgid, osds, &primary); - /* primary is first up osd */ - for (i = 0; i < num; i++) - if (ceph_osd_is_up(osdmap, osds[i])) - return osds[i]; - return -1; + return primary; } EXPORT_SYMBOL(ceph_calc_pg_primary); -- cgit v0.10.2 From 18cb95af2d7c69aa136ab13f02dd55188c120e75 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 24 Mar 2014 17:12:50 +0200 Subject: libceph: enable PRIMARY_AFFINITY feature bit Announce our support for osdmaps with non-default primary affinity values. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 7a4cab5..d12659c 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -91,7 +91,8 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_CRUSH_V2 | \ CEPH_FEATURE_EXPORT_PEER | \ CEPH_FEATURE_OSDMAP_ENC | \ - CEPH_FEATURE_CRUSH_TUNABLES3) + CEPH_FEATURE_CRUSH_TUNABLES3 | \ + CEPH_FEATURE_OSD_PRIMARY_AFFINITY) #define CEPH_FEATURES_REQUIRED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ -- cgit v0.10.2 From 54008399dc0ce511a07b87f1af3d1f5c791982a4 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 29 Mar 2014 13:41:15 +0800 Subject: ceph: preallocate buffer for readdir reply Preallocate buffer for readdir reply. Limit number of entries in readdir reply according to the buffer size. 
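A user-space rehearsal of the sizing math the new helper does: pick the smallest power-of-two run of pages that holds the estimated entry count, then recompute how many entries actually fit (per-entry size and PAGE_SIZE are assumed here; the real code also clamps to the mount's max_readdir and falls back to smaller orders on allocation failure):

#include <stdio.h>

#define PAGE_SIZE 4096

static int get_order(unsigned long size)
{
	int order = 0;

	while ((unsigned long)(PAGE_SIZE << order) < size)
		order++;
	return order;
}

int main(void)
{
	unsigned long size = 64;	/* assumed per-entry bytes */
	int want = 300;			/* i_files + i_subdirs estimate */
	int order = get_order(size * want);
	int fit = (int)((PAGE_SIZE << order) / size);

	printf("want %d entries -> order %d -> %d entries fit\n",
	       want, order, fit);
	return 0;
}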
Signed-off-by: Yan, Zheng diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index ff2864a..46cd092 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -252,8 +252,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) int err; u32 ftype; struct ceph_mds_reply_info_parsed *rinfo; - const int max_entries = fsc->mount_options->max_readdir; - const int max_bytes = fsc->mount_options->max_readdir_bytes; dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off); if (fi->flags & CEPH_F_ATEND) @@ -327,6 +325,11 @@ more: req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) return PTR_ERR(req); + err = ceph_alloc_readdir_reply_buffer(req, inode); + if (err) { + ceph_mdsc_put_request(req); + return err; + } req->r_inode = inode; ihold(inode); req->r_dentry = dget(file->f_dentry); @@ -337,9 +340,6 @@ more: req->r_path2 = kstrdup(fi->last_name, GFP_NOFS); req->r_readdir_offset = fi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); - req->r_args.readdir.max_entries = cpu_to_le32(max_entries); - req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes); - req->r_num_caps = max_entries + 1; err = ceph_mdsc_do_request(mdsc, NULL, req); if (err < 0) { ceph_mdsc_put_request(req); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 77640ada4..19fbfc4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -165,21 +166,18 @@ static int parse_reply_info_dir(void **p, void *end, if (num == 0) goto done; - /* alloc large array */ - info->dir_nr = num; - info->dir_in = kcalloc(num, sizeof(*info->dir_in) + - sizeof(*info->dir_dname) + - sizeof(*info->dir_dname_len) + - sizeof(*info->dir_dlease), - GFP_NOFS); - if (info->dir_in == NULL) { - err = -ENOMEM; - goto out_bad; - } + BUG_ON(!info->dir_in); info->dir_dname = (void *)(info->dir_in + num); info->dir_dname_len = (void *)(info->dir_dname + num); info->dir_dlease = (void *)(info->dir_dname_len + num); + if ((unsigned long)(info->dir_dlease + num) > + (unsigned long)info->dir_in + info->dir_buf_size) { + pr_err("dir contents are larger than expected\n"); + WARN_ON(1); + goto bad; + } + info->dir_nr = num; while (num) { /* dentry */ ceph_decode_need(p, end, sizeof(u32)*2, bad); @@ -327,7 +325,9 @@ out_bad: static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) { - kfree(info->dir_in); + if (!info->dir_in) + return; + free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size)); } @@ -512,12 +512,11 @@ void ceph_mdsc_release_request(struct kref *kref) struct ceph_mds_request *req = container_of(kref, struct ceph_mds_request, r_kref); + destroy_reply_info(&req->r_reply_info); if (req->r_request) ceph_msg_put(req->r_request); - if (req->r_reply) { + if (req->r_reply) ceph_msg_put(req->r_reply); - destroy_reply_info(&req->r_reply_info); - } if (req->r_inode) { ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); iput(req->r_inode); @@ -1496,6 +1495,43 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc, * requests */ +int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, + struct inode *dir) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; + struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; + size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) + + sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease); + int order, num_entries; + + spin_lock(&ci->i_ceph_lock); + 
num_entries = ci->i_files + ci->i_subdirs; + spin_unlock(&ci->i_ceph_lock); + num_entries = max(num_entries, 1); + num_entries = min(num_entries, opt->max_readdir); + + order = get_order(size * num_entries); + while (order >= 0) { + rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN, + order); + if (rinfo->dir_in) + break; + order--; + } + if (!rinfo->dir_in) + return -ENOMEM; + + num_entries = (PAGE_SIZE << order) / size; + num_entries = min(num_entries, opt->max_readdir); + + rinfo->dir_buf_size = PAGE_SIZE << order; + req->r_num_caps = num_entries + 1; + req->r_args.readdir.max_entries = cpu_to_le32(num_entries); + req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); + return 0; +} + /* * Create an mds request. */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 6828891..e90cfcc 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -67,6 +67,7 @@ struct ceph_mds_reply_info_parsed { /* for readdir results */ struct { struct ceph_mds_reply_dirfrag *dir_dir; + size_t dir_buf_size; int dir_nr; char **dir_dname; u32 *dir_dname_len; @@ -346,7 +347,8 @@ extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct dentry *dn); extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); - +extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, + struct inode *dir); extern struct ceph_mds_request * ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, -- cgit v0.10.2 From ab866549b3da3eef88e51696bcb24e79f1cc3745 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 1 Apr 2014 20:20:17 +0800 Subject: ceph: drop extra open file reference in ceph_atomic_open() ceph_atomic_open() calls ceph_open() after receiving the MDS reply. ceph_open() grabs an extra open file reference. (The open request already holds an open file reference) Signed-off-by: Yan, Zheng diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 2862a75..66075a4 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -291,8 +291,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, } err = finish_open(file, dentry, ceph_open, opened); } - out_err: + if (!req->r_err && req->r_target_inode) + ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); ceph_mdsc_put_request(req); dout("atomic_open result=%d\n", err); return err; -- cgit v0.10.2 From 48193012873e341f08de48304e32d0499b96c60b Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 1 Apr 2014 20:34:18 +0800 Subject: ceph: don't grab open file reference for aborted request Signed-off-by: Yan, Zheng diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index c9f2567..0b0728e 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1112,7 +1112,7 @@ retry_lookup: err = fill_inode(in, &rinfo->targeti, NULL, session, req->r_request_started, - (le32_to_cpu(rinfo->head->result) == 0) ? + (!req->r_aborted && rinfo->head->result == 0) ? 
req->r_fmode : -1, &req->r_caps_reservation); if (err < 0) { -- cgit v0.10.2 From a56371d9d920799ebb88c196aa018e76fc46554f Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 1 Apr 2014 20:34:56 +0800 Subject: ceph: flush cap release queue when trimming session caps Signed-off-by: Yan, Zheng diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 19fbfc4..2b4d093 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1311,6 +1311,9 @@ static int trim_caps(struct ceph_mds_client *mdsc, trim_caps - session->s_trim_caps); session->s_trim_caps = 0; } + + ceph_add_cap_releases(mdsc, session); + ceph_send_cap_releases(mdsc, session); return 0; } -- cgit v0.10.2 From f31da0f3e12e57f21d73315e06c48fb9860fe07d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 2 Apr 2014 20:34:04 +0400 Subject: libceph: output primary affinity values on osdmap updates Similar to osd weights, output primary affinity values on incremental osdmap updates. Signed-off-by: Ilya Dryomov diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 408d148..e632b5a 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1005,6 +1005,8 @@ static int decode_new_primary_affinity(void **p, void *end, ret = set_primary_affinity(map, osd, aff); if (ret) return ret; + + pr_info("osd%d primary-affinity 0x%x\n", osd, aff); } return 0; -- cgit v0.10.2 From 8a53f23fcda355958a79774c6333a3a31c380ecf Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 4 Apr 2014 18:21:32 +0400 Subject: libceph: dump pool {read,write}_tier to debugfs Dump pool {read,write}_tier to debugfs. While at it, fixup printk type specifiers and remove the unnecessary cast to unsigned long long. Signed-off-by: Ilya Dryomov diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 34453a2..10421a4 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -68,9 +68,9 @@ static int osdmap_show(struct seq_file *s, void *p) struct ceph_pg_pool_info *pool = rb_entry(n, struct ceph_pg_pool_info, node); - seq_printf(s, "pg_pool %llu pg_num %d / %d\n", - (unsigned long long)pool->id, pool->pg_num, - pool->pg_num_mask); + seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n", + pool->id, pool->pg_num, pool->pg_num_mask, + pool->read_tier, pool->write_tier); } for (i = 0; i < map->max_osd; i++) { struct ceph_entity_addr *addr = &map->osd_addr[i]; -- cgit v0.10.2 From a30be7cb2ccb995ad5e67fd4b548f11fe37fc8b1 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 6 Apr 2014 14:10:04 +0800 Subject: ceph: skip invalid dentry during dcache readdir skip dentries that were added before MDS issued FILE_SHARED to client. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 46cd092..766410a 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -119,7 +119,8 @@ static int fpos_cmp(loff_t l, loff_t r) * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by * the MDS if/when the directory is modified). 
*/ -static int __dcache_readdir(struct file *file, struct dir_context *ctx) +static int __dcache_readdir(struct file *file, struct dir_context *ctx, + u32 shared_gen) { struct ceph_file_info *fi = file->private_data; struct dentry *parent = file->f_dentry; @@ -133,8 +134,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx) last = fi->dentry; fi->dentry = NULL; - dout("__dcache_readdir %p at %llu (last %p)\n", dir, ctx->pos, - last); + dout("__dcache_readdir %p v%u at %llu (last %p)\n", + dir, shared_gen, ctx->pos, last); spin_lock(&parent->d_lock); @@ -161,7 +162,8 @@ more: goto out_unlock; } spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); - if (!d_unhashed(dentry) && dentry->d_inode && + if (di->lease_shared_gen == shared_gen && + !d_unhashed(dentry) && dentry->d_inode && ceph_snap(dentry->d_inode) != CEPH_SNAPDIR && ceph_ino(dentry->d_inode) != CEPH_INO_CEPH && fpos_cmp(ctx->pos, di->offset) <= 0) @@ -289,8 +291,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { + u32 shared_gen = ci->i_shared_gen; spin_unlock(&ci->i_ceph_lock); - err = __dcache_readdir(file, ctx); + err = __dcache_readdir(file, ctx, shared_gen); if (err != -EAGAIN) return err; } else { -- cgit v0.10.2