diff options
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r-- | fs/ceph/caps.c | 439 |
1 files changed, 251 insertions, 188 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b81be9a..98ab13e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1,4 +1,4 @@ -#include "ceph_debug.h" +#include <linux/ceph/ceph_debug.h> #include <linux/fs.h> #include <linux/kernel.h> @@ -9,8 +9,9 @@ #include <linux/writeback.h> #include "super.h" -#include "decode.h" -#include "messenger.h" +#include "mds_client.h" +#include <linux/ceph/decode.h> +#include <linux/ceph/messenger.h> /* * Capability management @@ -113,58 +114,41 @@ const char *ceph_cap_string(int caps) return cap_str[i]; } -/* - * Cap reservations - * - * Maintain a global pool of preallocated struct ceph_caps, referenced - * by struct ceph_caps_reservations. This ensures that we preallocate - * memory needed to successfully process an MDS response. (If an MDS - * sends us cap information and we fail to process it, we will have - * problems due to the client and MDS being out of sync.) - * - * Reservations are 'owned' by a ceph_cap_reservation context. - */ -static spinlock_t caps_list_lock; -static struct list_head caps_list; /* unused (reserved or unreserved) */ -static int caps_total_count; /* total caps allocated */ -static int caps_use_count; /* in use */ -static int caps_reserve_count; /* unused, reserved */ -static int caps_avail_count; /* unused, unreserved */ -static int caps_min_count; /* keep at least this many (unreserved) */ - -void __init ceph_caps_init(void) +void ceph_caps_init(struct ceph_mds_client *mdsc) { - INIT_LIST_HEAD(&caps_list); - spin_lock_init(&caps_list_lock); + INIT_LIST_HEAD(&mdsc->caps_list); + spin_lock_init(&mdsc->caps_list_lock); } -void ceph_caps_finalize(void) +void ceph_caps_finalize(struct ceph_mds_client *mdsc) { struct ceph_cap *cap; - spin_lock(&caps_list_lock); - while (!list_empty(&caps_list)) { - cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); + spin_lock(&mdsc->caps_list_lock); + while (!list_empty(&mdsc->caps_list)) { + cap = list_first_entry(&mdsc->caps_list, + struct ceph_cap, caps_item); list_del(&cap->caps_item); kmem_cache_free(ceph_cap_cachep, cap); } - caps_total_count = 0; - caps_avail_count = 0; - caps_use_count = 0; - caps_reserve_count = 0; - caps_min_count = 0; - spin_unlock(&caps_list_lock); + mdsc->caps_total_count = 0; + mdsc->caps_avail_count = 0; + mdsc->caps_use_count = 0; + mdsc->caps_reserve_count = 0; + mdsc->caps_min_count = 0; + spin_unlock(&mdsc->caps_list_lock); } -void ceph_adjust_min_caps(int delta) +void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) { - spin_lock(&caps_list_lock); - caps_min_count += delta; - BUG_ON(caps_min_count < 0); - spin_unlock(&caps_list_lock); + spin_lock(&mdsc->caps_list_lock); + mdsc->caps_min_count += delta; + BUG_ON(mdsc->caps_min_count < 0); + spin_unlock(&mdsc->caps_list_lock); } -int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) +int ceph_reserve_caps(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx, int need) { int i; struct ceph_cap *cap; @@ -176,16 +160,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) dout("reserve caps ctx=%p need=%d\n", ctx, need); /* first reserve any caps that are already allocated */ - spin_lock(&caps_list_lock); - if (caps_avail_count >= need) + spin_lock(&mdsc->caps_list_lock); + if (mdsc->caps_avail_count >= need) have = need; else - have = caps_avail_count; - caps_avail_count -= have; - caps_reserve_count += have; - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + have = mdsc->caps_avail_count; + mdsc->caps_avail_count -= have; + mdsc->caps_reserve_count += have; + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); for (i = have; i < need; i++) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); @@ -198,19 +183,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) } BUG_ON(have + alloc != need); - spin_lock(&caps_list_lock); - caps_total_count += alloc; - caps_reserve_count += alloc; - list_splice(&newcaps, &caps_list); + spin_lock(&mdsc->caps_list_lock); + mdsc->caps_total_count += alloc; + mdsc->caps_reserve_count += alloc; + list_splice(&newcaps, &mdsc->caps_list); - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); ctx->count = need; dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", - ctx, caps_total_count, caps_use_count, caps_reserve_count, - caps_avail_count); + ctx, mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); return 0; out_alloc_count: @@ -220,26 +206,29 @@ out_alloc_count: return ret; } -int ceph_unreserve_caps(struct ceph_cap_reservation *ctx) +int ceph_unreserve_caps(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx) { dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); if (ctx->count) { - spin_lock(&caps_list_lock); - BUG_ON(caps_reserve_count < ctx->count); - caps_reserve_count -= ctx->count; - caps_avail_count += ctx->count; + spin_lock(&mdsc->caps_list_lock); + BUG_ON(mdsc->caps_reserve_count < ctx->count); + mdsc->caps_reserve_count -= ctx->count; + mdsc->caps_avail_count += ctx->count; ctx->count = 0; dout("unreserve caps %d = %d used + %d resv + %d avail\n", - caps_total_count, caps_use_count, caps_reserve_count, - caps_avail_count); - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); } return 0; } -static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx) +static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx) { struct ceph_cap *cap = NULL; @@ -247,71 +236,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx) if (!ctx) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); if (cap) { - caps_use_count++; - caps_total_count++; + mdsc->caps_use_count++; + mdsc->caps_total_count++; } return cap; } - spin_lock(&caps_list_lock); + spin_lock(&mdsc->caps_list_lock); dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", - ctx, ctx->count, caps_total_count, caps_use_count, - caps_reserve_count, caps_avail_count); + ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); BUG_ON(!ctx->count); - BUG_ON(ctx->count > caps_reserve_count); - BUG_ON(list_empty(&caps_list)); + BUG_ON(ctx->count > mdsc->caps_reserve_count); + BUG_ON(list_empty(&mdsc->caps_list)); ctx->count--; - caps_reserve_count--; - caps_use_count++; + mdsc->caps_reserve_count--; + mdsc->caps_use_count++; - cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); + cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item); list_del(&cap->caps_item); - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); return cap; } -void ceph_put_cap(struct ceph_cap *cap) +void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap) { - spin_lock(&caps_list_lock); + spin_lock(&mdsc->caps_list_lock); dout("put_cap %p %d = %d used + %d resv + %d avail\n", - cap, caps_total_count, caps_use_count, - caps_reserve_count, caps_avail_count); - caps_use_count--; + cap, mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); + mdsc->caps_use_count--; /* * Keep some preallocated caps around (ceph_min_count), to * avoid lots of free/alloc churn. */ - if (caps_avail_count >= caps_reserve_count + caps_min_count) { - caps_total_count--; + if (mdsc->caps_avail_count >= mdsc->caps_reserve_count + + mdsc->caps_min_count) { + mdsc->caps_total_count--; kmem_cache_free(ceph_cap_cachep, cap); } else { - caps_avail_count++; - list_add(&cap->caps_item, &caps_list); + mdsc->caps_avail_count++; + list_add(&cap->caps_item, &mdsc->caps_list); } - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); } -void ceph_reservation_status(struct ceph_client *client, +void ceph_reservation_status(struct ceph_fs_client *fsc, int *total, int *avail, int *used, int *reserved, int *min) { + struct ceph_mds_client *mdsc = fsc->mdsc; + if (total) - *total = caps_total_count; + *total = mdsc->caps_total_count; if (avail) - *avail = caps_avail_count; + *avail = mdsc->caps_avail_count; if (used) - *used = caps_use_count; + *used = mdsc->caps_use_count; if (reserved) - *reserved = caps_reserve_count; + *reserved = mdsc->caps_reserve_count; if (min) - *min = caps_min_count; + *min = mdsc->caps_min_count; } /* @@ -336,22 +328,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds) return NULL; } +struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds) +{ + struct ceph_cap *cap; + + spin_lock(&ci->vfs_inode.i_lock); + cap = __get_cap_for_mds(ci, mds); + spin_unlock(&ci->vfs_inode.i_lock); + return cap; +} + /* - * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else - * -1. + * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1. */ -static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq) +static int __ceph_get_cap_mds(struct ceph_inode_info *ci) { struct ceph_cap *cap; int mds = -1; struct rb_node *p; - /* prefer mds with WR|WRBUFFER|EXCL caps */ + /* prefer mds with WR|BUFFER|EXCL caps */ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); mds = cap->mds; - if (mseq) - *mseq = cap->mseq; if (cap->issued & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_EXCL)) @@ -364,7 +363,7 @@ int ceph_get_cap_mds(struct inode *inode) { int mds; spin_lock(&inode->i_lock); - mds = __ceph_get_cap_mds(ceph_inode(inode), NULL); + mds = __ceph_get_cap_mds(ceph_inode(inode)); spin_unlock(&inode->i_lock); return mds; } @@ -401,7 +400,7 @@ static void __insert_cap_node(struct ceph_inode_info *ci, static void __cap_set_timeouts(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { - struct ceph_mount_args *ma = mdsc->client->mount_args; + struct ceph_mount_options *ma = mdsc->fsc->mount_options; ci->i_hold_caps_min = round_jiffies(jiffies + ma->caps_wanted_delay_min * HZ); @@ -483,8 +482,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, * Each time we receive FILE_CACHE anew, we increment * i_rdcache_gen. */ - if ((issued & CEPH_CAP_FILE_CACHE) && - (had & CEPH_CAP_FILE_CACHE) == 0) + if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) ci->i_rdcache_gen++; /* @@ -517,7 +516,7 @@ int ceph_add_cap(struct inode *inode, unsigned seq, unsigned mseq, u64 realmino, int flags, struct ceph_cap_reservation *caps_reservation) { - struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *new_cap = NULL; struct ceph_cap *cap; @@ -543,7 +542,7 @@ retry: new_cap = NULL; } else { spin_unlock(&inode->i_lock); - new_cap = get_cap(caps_reservation); + new_cap = get_cap(mdsc, caps_reservation); if (new_cap == NULL) return -ENOMEM; goto retry; @@ -588,6 +587,7 @@ retry: } else { pr_err("ceph_add_cap: couldn't find snap realm %llx\n", realmino); + WARN_ON(!realm); } } @@ -815,7 +815,7 @@ int __ceph_caps_used(struct ceph_inode_info *ci) used |= CEPH_CAP_PIN; if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; - if (ci->i_rdcache_ref || ci->i_rdcache_gen) + if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; @@ -831,7 +831,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) { int want = 0; int mode; - for (mode = 0; mode < 4; mode++) + for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++) if (ci->i_nr_by_mode[mode]) want |= ceph_caps_for_mode(mode); return want; @@ -874,7 +874,7 @@ void __ceph_remove_cap(struct ceph_cap *cap) struct ceph_mds_session *session = cap->session; struct ceph_inode_info *ci = cap->ci; struct ceph_mds_client *mdsc = - &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; + ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; int removed = 0; dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); @@ -901,7 +901,7 @@ void __ceph_remove_cap(struct ceph_cap *cap) ci->i_auth_cap = NULL; if (removed) - ceph_put_cap(cap); + ceph_put_cap(mdsc, cap); if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { struct ceph_snap_realm *realm = ci->i_snap_realm; @@ -1083,6 +1083,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, gid_t gid; struct ceph_mds_session *session; u64 xattr_version = 0; + struct ceph_buffer *xattr_blob = NULL; int delayed = 0; u64 flush_tid = 0; int i; @@ -1143,6 +1144,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, for (i = 0; i < CEPH_CAP_BITS; i++) if (flushing & (1 << i)) ci->i_cap_flush_tid[i] = flush_tid; + + follows = ci->i_head_snapc->seq; + } else { + follows = 0; } keep = cap->implemented; @@ -1156,14 +1161,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, mtime = inode->i_mtime; atime = inode->i_atime; time_warp_seq = ci->i_time_warp_seq; - follows = ci->i_snap_realm->cached_context->seq; uid = inode->i_uid; gid = inode->i_gid; mode = inode->i_mode; - if (dropping & CEPH_CAP_XATTR_EXCL) { + if (flushing & CEPH_CAP_XATTR_EXCL) { __ceph_build_xattrs_blob(ci); - xattr_version = ci->i_xattrs.version + 1; + xattr_blob = ci->i_xattrs.blob; + xattr_version = ci->i_xattrs.version; } spin_unlock(&inode->i_lock); @@ -1171,9 +1176,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, size, max_size, &mtime, &atime, time_warp_seq, - uid, gid, mode, - xattr_version, - (flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL, + uid, gid, mode, xattr_version, xattr_blob, follows); if (ret < 0) { dout("error sending cap msg, must requeue %p\n", inode); @@ -1193,16 +1196,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * + * Unless @again is true, skip cap_snaps that were already sent to + * the MDS (i.e., during this session). + * * Called under i_lock. Takes s_mutex as needed. */ void __ceph_flush_snaps(struct ceph_inode_info *ci, - struct ceph_mds_session **psession) + struct ceph_mds_session **psession, + int again) + __releases(ci->vfs_inode->i_lock) + __acquires(ci->vfs_inode->i_lock) { struct inode *inode = &ci->vfs_inode; int mds; struct ceph_cap_snap *capsnap; u32 mseq; - struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_session *session = NULL; /* if session != NULL, we hold session->s_mutex */ u64 next_follows = 0; /* keep track of how far we've gotten through the @@ -1223,7 +1232,7 @@ retry: * pages to be written out. */ if (capsnap->dirty_pages || capsnap->writing) - continue; + break; /* * if cap writeback already occurred, we should have dropped @@ -1232,7 +1241,20 @@ retry: BUG_ON(capsnap->dirty == 0); /* pick mds, take s_mutex */ - mds = __ceph_get_cap_mds(ci, &mseq); + if (ci->i_auth_cap == NULL) { + dout("no auth cap (migrating?), doing nothing\n"); + goto out; + } + + /* only flush each capsnap once */ + if (!again && !list_empty(&capsnap->flushing_item)) { + dout("already flushed %p, skipping\n", capsnap); + continue; + } + + mds = ci->i_auth_cap->session->s_mds; + mseq = ci->i_auth_cap->mseq; + if (session && session->s_mds != mds) { dout("oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); @@ -1251,8 +1273,8 @@ retry: } /* * if session == NULL, we raced against a cap - * deletion. retry, and we'll get a better - * @mds value next time. + * deletion or migration. retry, and we'll + * get a better @mds value next time. */ spin_lock(&inode->i_lock); goto retry; @@ -1266,8 +1288,8 @@ retry: &session->s_cap_snaps_flushing); spin_unlock(&inode->i_lock); - dout("flush_snaps %p cap_snap %p follows %lld size %llu\n", - inode, capsnap, next_follows, capsnap->size); + dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", + inode, capsnap, capsnap->follows, capsnap->flush_tid); send_cap_msg(session, ceph_vino(inode).ino, 0, CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, capsnap->dirty, 0, capsnap->flush_tid, 0, mseq, @@ -1275,7 +1297,7 @@ retry: &capsnap->mtime, &capsnap->atime, capsnap->time_warp_seq, capsnap->uid, capsnap->gid, capsnap->mode, - 0, NULL, + capsnap->xattr_version, capsnap->xattr_blob, capsnap->follows); next_follows = capsnap->follows + 1; @@ -1290,6 +1312,7 @@ retry: list_del_init(&ci->i_snap_flush_item); spin_unlock(&mdsc->snap_flush_lock); +out: if (psession) *psession = session; else if (session) { @@ -1303,7 +1326,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) struct inode *inode = &ci->vfs_inode; spin_lock(&inode->i_lock); - __ceph_flush_snaps(ci, NULL); + __ceph_flush_snaps(ci, NULL, 0); spin_unlock(&inode->i_lock); } @@ -1314,7 +1337,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) { struct ceph_mds_client *mdsc = - &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; + ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; struct inode *inode = &ci->vfs_inode; int was = ci->i_dirty_caps; int dirty = 0; @@ -1324,7 +1347,11 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { - dout(" inode %p now dirty\n", &ci->vfs_inode); + if (!ci->i_head_snapc) + ci->i_head_snapc = ceph_get_snap_context( + ci->i_snap_realm->cached_context); + dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode, + ci->i_head_snapc); BUG_ON(!list_empty(&ci->i_dirty_item)); spin_lock(&mdsc->cap_dirty_lock); list_add(&ci->i_dirty_item, &mdsc->cap_dirty); @@ -1352,7 +1379,7 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) static int __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session) { - struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); int flushing; @@ -1390,17 +1417,6 @@ static int __mark_caps_flushing(struct inode *inode, /* * try to invalidate mapping pages without blocking. */ -static int mapping_is_empty(struct address_space *mapping) -{ - struct page *page = find_get_page(mapping, 0); - - if (!page) - return 1; - - put_page(page); - return 0; -} - static int try_nonblocking_invalidate(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); @@ -1410,7 +1426,7 @@ static int try_nonblocking_invalidate(struct inode *inode) invalidate_mapping_pages(&inode->i_data, 0, -1); spin_lock(&inode->i_lock); - if (mapping_is_empty(&inode->i_data) && + if (inode->i_data.nrpages == 0 && invalidating_gen == ci->i_rdcache_gen) { /* success. */ dout("try_nonblocking_invalidate %p success\n", inode); @@ -1435,10 +1451,9 @@ static int try_nonblocking_invalidate(struct inode *inode) */ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session) - __releases(session->s_mutex) { - struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); - struct ceph_mds_client *mdsc = &client->mdsc; + struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); + struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; int file_wanted, used; @@ -1463,7 +1478,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, /* flush snaps first time around only */ if (!list_empty(&ci->i_cap_snaps)) - __ceph_flush_snaps(ci, &session); + __ceph_flush_snaps(ci, &session, 0); goto retry_locked; retry: spin_lock(&inode->i_lock); @@ -1508,13 +1523,15 @@ retry_locked: */ if ((!is_delayed || mdsc->stopping) && ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ - ci->i_rdcache_gen && /* may have cached pages */ + inode->i_data.nrpages && /* have cached pages */ (file_wanted == 0 || /* no open files */ - (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */ + (revoking & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ !tried_invalidate) { dout("check_caps trying to invalidate on %p\n", inode); if (try_nonblocking_invalidate(inode) < 0) { - if (revoking & CEPH_CAP_FILE_CACHE) { + if (revoking & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_LAZYIO)) { dout("check_caps queuing invalidate\n"); queue_invalidate = 1; ci->i_rdcache_revoking = ci->i_rdcache_gen; @@ -1679,7 +1696,7 @@ ack: static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, unsigned *flush_tid) { - struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); int unlock_session = session ? 0 : 1; int flushing = 0; @@ -1845,7 +1862,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) caps_are_flushed(inode, flush_tid)); } else { struct ceph_mds_client *mdsc = - &ceph_sb_to_client(inode->i_sb)->mdsc; + ceph_sb_to_client(inode->i_sb)->mdsc; spin_lock(&inode->i_lock); if (__ceph_caps_dirty(ci)) @@ -1878,7 +1895,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, if (cap && cap->session == session) { dout("kick_flushing_caps %p cap %p capsnap %p\n", inode, cap, capsnap); - __ceph_flush_snaps(ci, &session); + __ceph_flush_snaps(ci, &session, 1); } else { pr_err("%p auth cap %p not mds%d ???\n", inode, cap, session->s_mds); @@ -2181,7 +2198,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (ci->i_head_snapc == snapc) { ci->i_wrbuffer_ref_head -= nr; - if (!ci->i_wrbuffer_ref_head) { + if (ci->i_wrbuffer_ref_head == 0 && + ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) { + BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; } @@ -2250,12 +2269,12 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, struct ceph_mds_session *session, struct ceph_cap *cap, struct ceph_buffer *xattr_buf) - __releases(inode->i_lock) - __releases(session->s_mutex) + __releases(inode->i_lock) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; - int seq = le32_to_cpu(grant->seq); + unsigned seq = le32_to_cpu(grant->seq); + unsigned issue_seq = le32_to_cpu(grant->issue_seq); int newcaps = le32_to_cpu(grant->caps); int issued, implemented, used, wanted, dirty; u64 size = le64_to_cpu(grant->size); @@ -2267,8 +2286,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, int revoked_rdcache = 0; int queue_invalidate = 0; - dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", - inode, cap, mds, seq, ceph_cap_string(newcaps)); + dout("handle_cap_grant inode %p cap %p mds%d seq %u/%u %s\n", + inode, cap, mds, seq, issue_seq, ceph_cap_string(newcaps)); dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); @@ -2278,6 +2297,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, * will invalidate _after_ writeback.) */ if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { if (try_nonblocking_invalidate(inode) == 0) { revoked_rdcache = 1; @@ -2363,21 +2383,29 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } cap->seq = seq; + cap->issue_seq = issue_seq; /* file layout may have changed */ ci->i_layout = grant->layout; /* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { - dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued), - ceph_cap_string(newcaps)); - if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER) - writeback = 1; /* will delay ack */ - else if (dirty & ~newcaps) - check_caps = 1; /* initiate writeback in check_caps */ - else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 || - revoked_rdcache) - check_caps = 2; /* send revoke ack in check_caps */ + int revoking = cap->issued & ~newcaps; + + dout("revocation: %s -> %s (revoking %s)\n", + ceph_cap_string(cap->issued), + ceph_cap_string(newcaps), + ceph_cap_string(revoking)); + if (revoking & used & CEPH_CAP_FILE_BUFFER) + writeback = 1; /* initiate writeback; will delay ack */ + else if (revoking == CEPH_CAP_FILE_CACHE && + (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && + queue_invalidate) + ; /* do nothing yet, invalidation will be queued */ + else if (cap == ci->i_auth_cap) + check_caps = 1; /* check auth cap only */ + else + check_caps = 2; /* check all caps */ cap->issued = newcaps; cap->implemented |= newcaps; } else if (cap->issued == newcaps) { @@ -2427,7 +2455,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, __releases(inode->i_lock) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; @@ -2467,6 +2495,11 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, dout(" inode %p now clean\n", inode); BUG_ON(!list_empty(&ci->i_dirty_item)); drop = 1; + if (ci->i_wrbuffer_ref_head == 0) { + BUG_ON(!ci->i_head_snapc); + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; + } } else { BUG_ON(list_empty(&ci->i_dirty_item)); } @@ -2568,7 +2601,8 @@ static void handle_cap_trunc(struct inode *inode, * caller holds s_mutex */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, - struct ceph_mds_session *session) + struct ceph_mds_session *session, + int *open_target_sessions) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2600,6 +2634,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ci->i_cap_exporting_mds = mds; ci->i_cap_exporting_mseq = mseq; ci->i_cap_exporting_issued = cap->issued; + + /* + * make sure we have open sessions with all possible + * export targets, so that we get the matching IMPORT + */ + *open_target_sessions = 1; } __ceph_remove_cap(cap); } @@ -2663,7 +2703,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_msg *msg) { struct ceph_mds_client *mdsc = session->s_mdsc; - struct super_block *sb = mdsc->client->sb; + struct super_block *sb = mdsc->fsc->sb; struct inode *inode; struct ceph_cap *cap; struct ceph_mds_caps *h; @@ -2675,6 +2715,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 size, max_size; u64 tid; void *snaptrace; + size_t snaptrace_len; + void *flock; + u32 flock_len; + int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); @@ -2683,7 +2727,6 @@ void ceph_handle_caps(struct ceph_mds_session *session, if (msg->front.iov_len < sizeof(*h)) goto bad; h = msg->front.iov_base; - snaptrace = h + 1; op = le32_to_cpu(h->op); vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; @@ -2693,6 +2736,21 @@ void ceph_handle_caps(struct ceph_mds_session *session, size = le64_to_cpu(h->size); max_size = le64_to_cpu(h->max_size); + snaptrace = h + 1; + snaptrace_len = le32_to_cpu(h->snap_trace_len); + + if (le16_to_cpu(msg->hdr.version) >= 2) { + void *p, *end; + + p = snaptrace + snaptrace_len; + end = msg->front.iov_base + msg->front.iov_len; + ceph_decode_32_safe(&p, end, flock_len, bad); + flock = p; + } else { + flock = NULL; + flock_len = 0; + } + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -2708,15 +2766,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, if (op == CEPH_CAP_OP_IMPORT) __queue_cap_release(session, vino.ino, cap_id, mseq, seq); - - /* - * send any full release message to try to move things - * along for the mds (who clearly thinks we still have this - * cap). - */ - ceph_add_cap_releases(mdsc, session, -1); - ceph_send_cap_releases(mdsc, session); - goto done; + goto flush_cap_releases; } /* these will work even if we don't have a cap yet */ @@ -2726,12 +2776,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_EXPORT: - handle_cap_export(inode, h, session); + handle_cap_export(inode, h, session, &open_target_sessions); goto done; case CEPH_CAP_OP_IMPORT: handle_cap_import(mdsc, inode, h, session, - snaptrace, le32_to_cpu(h->snap_trace_len)); + snaptrace, snaptrace_len); ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, session); goto done_unlocked; @@ -2744,7 +2794,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, dout(" no cap on %p ino %llx.%llx from mds%d\n", inode, ceph_ino(inode), ceph_snap(inode), mds); spin_unlock(&inode->i_lock); - goto done; + goto flush_cap_releases; } /* note that each of these drops i_lock for us */ @@ -2768,11 +2818,24 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_cap_op_name(op)); } + goto done; + +flush_cap_releases: + /* + * send any full release message to try to move things + * along for the mds (who clearly thinks we still have this + * cap). + */ + ceph_add_cap_releases(mdsc, session); + ceph_send_cap_releases(mdsc, session); + done: mutex_unlock(&session->s_mutex); done_unlocked: if (inode) iput(inode); + if (open_target_sessions) + ceph_mdsc_open_export_target_sessions(mdsc, session); return; bad: |