From 65c491d833a06fd0d1383297590772c75d28155c Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Mon, 6 Mar 2006 15:36:17 -0800 Subject: ocfs2: move lockres qstr next to hlist_node structure Gains us a bit of performance on loads which heavily hit the lockres hash. Patch suggested by Daniel Phillips . Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 88cc43d..1c05d48 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -216,6 +216,7 @@ struct dlm_lock_resource /* WARNING: Please see the comment in dlm_init_lockres before * adding fields here. */ struct hlist_node hash_node; + struct qstr lockname; struct kref refs; /* please keep these next 3 in this order @@ -238,7 +239,6 @@ struct dlm_lock_resource wait_queue_head_t wq; u8 owner; //node which owns the lock resource, or unknown u16 state; - struct qstr lockname; char lvb[DLM_LVB_LEN]; }; -- cgit v0.10.2 From a3d3329159ea76bae0b3b8680691a1c3ecf5801f Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Thu, 9 Mar 2006 17:55:56 -0800 Subject: ocfs2: calculate lockid hash values outside of the spinlock Fixes a performance bug - pointed out by Andrew. Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 1c05d48..3b67536 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -39,6 +39,9 @@ #define DLM_HASH_BUCKETS (PAGE_SIZE / sizeof(struct hlist_head)) +/* Intended to make it easier for us to switch out hash functions */ +#define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l) + enum dlm_ast_type { DLM_AST = 0, DLM_BAST, @@ -694,7 +697,8 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, - unsigned int len); + unsigned int len, + unsigned int hash); struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int len); diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 8f3a9e3..a818fde 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -90,7 +90,6 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, assert_spin_locked(&dlm->spinlock); q = &res->lockname; - q->hash = full_name_hash(q->name, q->len); bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]); /* get a reference for our hashtable */ @@ -100,10 +99,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, } struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, - const char *name, - unsigned int len) + const char *name, + unsigned int len, + unsigned int hash) { - unsigned int hash; struct hlist_node *iter; struct dlm_lock_resource *tmpres=NULL; struct hlist_head *bucket; @@ -112,8 +111,6 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, assert_spin_locked(&dlm->spinlock); - hash = full_name_hash(name, len); - bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]); /* check for pre-existing lock */ @@ -135,9 +132,10 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, unsigned int len) { struct dlm_lock_resource *res; + unsigned int hash = dlm_lockid_hash(name, len); spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, len); + res = __dlm_lookup_lockres(dlm, name, len, hash); spin_unlock(&dlm->spinlock); return res; } diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 940be4c..953aa84 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -603,7 +603,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, memcpy(qname, name, namelen); res->lockname.len = namelen; - res->lockname.hash = full_name_hash(name, namelen); + res->lockname.hash = dlm_lockid_hash(name, namelen); init_waitqueue_head(&res->wq); spin_lock_init(&res->spinlock); @@ -677,19 +677,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, int blocked = 0; int ret, nodenum; struct dlm_node_iter iter; - unsigned int namelen; + unsigned int namelen, hash; int tries = 0; int bit, wait_on_recovery = 0; BUG_ON(!lockid); namelen = strlen(lockid); + hash = dlm_lockid_hash(lockid, namelen); mlog(0, "get lockres %s (len %d)\n", lockid, namelen); lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); + tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); if (tmpres) { spin_unlock(&dlm->spinlock); mlog(0, "found in hash!\n"); @@ -1316,7 +1317,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; - unsigned int namelen; + unsigned int namelen, hash; int found, ret; int set_maybe; int dispatch_assert = 0; @@ -1331,6 +1332,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) name = request->name; namelen = request->namelen; + hash = dlm_lockid_hash(name, namelen); if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; @@ -1339,7 +1341,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) way_up_top: spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_unlock(&dlm->spinlock); @@ -1612,7 +1614,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; - unsigned int namelen; + unsigned int namelen, hash; u32 flags; int master_request = 0; int ret = 0; @@ -1622,6 +1624,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) name = assert->name; namelen = assert->namelen; + hash = dlm_lockid_hash(name, namelen); flags = be32_to_cpu(assert->flags); if (namelen > DLM_LOCKID_NAME_MAX) { @@ -1670,7 +1673,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) /* ok everything checks out with the MLE * now check to see if there is a lockres */ - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -2462,7 +2465,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; const char *name; - unsigned int namelen; + unsigned int namelen, hash; int ret = 0; if (!dlm_grab(dlm)) @@ -2470,6 +2473,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) name = migrate->name; namelen = migrate->namelen; + hash = dlm_lockid_hash(name, namelen); /* preallocate.. if this fails, abort */ mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, @@ -2482,7 +2486,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) /* check for pre-existing lock */ spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); spin_lock(&dlm->master_lock); if (res) { @@ -2601,6 +2605,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) struct list_head *iter, *iter2; struct dlm_master_list_entry *mle; struct dlm_lock_resource *res; + unsigned int hash; mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); top: @@ -2684,8 +2689,9 @@ top: mle->master, mle->new_master); /* if there is a lockres associated with this * mle, find it and set its owner to UNKNOWN */ + hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len); res = __dlm_lookup_lockres(dlm, mle->u.name.name, - mle->u.name.len); + mle->u.name.len, hash); if (res) { /* unfortunately if we hit this rare case, our * lock ordering is messed. we need to drop diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 9962190..4f3d482 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1404,6 +1404,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_ctxt *dlm = data; struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; struct dlm_lock_resource *res = NULL; + unsigned int hash; int master = DLM_LOCK_RES_OWNER_UNKNOWN; u32 flags = DLM_ASSERT_MASTER_REQUERY; @@ -1413,8 +1414,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) return master; } + hash = dlm_lockid_hash(req->name, req->namelen); + spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, req->name, req->namelen); + res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); if (res) { spin_lock(&res->spinlock); master = res->owner; -- cgit v0.10.2 From 4198985f7ae119a23f83503a692dd822bd574080 Mon Sep 17 00:00:00 2001 From: Daniel Phillips Date: Fri, 10 Mar 2006 13:31:47 -0800 Subject: [PATCH] Clean up ocfs2 hash probe and make it faster Signed-Off-By: Daniel Phillips Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index a818fde..595b1c7 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -103,28 +103,27 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, unsigned int len, unsigned int hash) { - struct hlist_node *iter; - struct dlm_lock_resource *tmpres=NULL; struct hlist_head *bucket; + struct hlist_node *list; mlog_entry("%.*s\n", len, name); assert_spin_locked(&dlm->spinlock); - bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]); - - /* check for pre-existing lock */ - hlist_for_each(iter, bucket) { - tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node); - if (tmpres->lockname.len == len && - memcmp(tmpres->lockname.name, name, len) == 0) { - dlm_lockres_get(tmpres); - break; - } - - tmpres = NULL; + bucket = dlm->lockres_hash + full_name_hash(name, len) % DLM_HASH_BUCKETS; + hlist_for_each(list, bucket) { + struct dlm_lock_resource *res = hlist_entry(list, + struct dlm_lock_resource, hash_node); + if (res->lockname.name[0] != name[0]) + continue; + if (unlikely(res->lockname.len != len)) + continue; + if (memcmp(res->lockname.name + 1, name + 1, len - 1)) + continue; + dlm_lockres_get(res); + return res; } - return tmpres; + return NULL; } struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, -- cgit v0.10.2 From 95c4f581d6551de55cf5b8693db98b01ce07021b Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Fri, 10 Mar 2006 13:44:00 -0800 Subject: ocfs2: inline dlm_lockres_get() It's called on every lookup so this might help performance a bit. Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 3b67536..8761281 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -690,7 +690,12 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres); -void dlm_lockres_get(struct dlm_lock_resource *res); +static inline void dlm_lockres_get(struct dlm_lock_resource *res) +{ + /* This is called on every lookup, so it might be worth + * inlining. */ + kref_get(&res->refs); +} void dlm_lockres_put(struct dlm_lock_resource *res); void __dlm_unhash_lockres(struct dlm_lock_resource *res); void __dlm_insert_lockres(struct dlm_ctxt *dlm, diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 953aa84..f1fbf2f 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -579,11 +579,6 @@ static void dlm_lockres_release(struct kref *kref) kfree(res); } -void dlm_lockres_get(struct dlm_lock_resource *res) -{ - kref_get(&res->refs); -} - void dlm_lockres_put(struct dlm_lock_resource *res) { kref_put(&res->refs, dlm_lockres_release); -- cgit v0.10.2 From 03d864c02c3ea803b1718940ac6953a257182d7a Mon Sep 17 00:00:00 2001 From: Daniel Phillips Date: Fri, 10 Mar 2006 18:08:16 -0800 Subject: ocfs2: allocate lockres hash pages in an array This allows us to have a hash table greater than a single page which greatly improves dlm performance on some tests. Signed-off-by: Daniel Phillips Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 8761281..0378ddb 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -37,7 +37,10 @@ #define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes #define DLM_THREAD_MS 200 // flush at least every 200 ms -#define DLM_HASH_BUCKETS (PAGE_SIZE / sizeof(struct hlist_head)) +#define DLM_HASH_SIZE (1 << 14) +#define DLM_HASH_PAGES (DLM_HASH_SIZE / PAGE_SIZE) +#define DLM_BUCKETS_PER_PAGE (PAGE_SIZE / sizeof(struct hlist_head)) +#define DLM_HASH_BUCKETS (DLM_HASH_PAGES * DLM_BUCKETS_PER_PAGE) /* Intended to make it easier for us to switch out hash functions */ #define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l) @@ -88,7 +91,7 @@ enum dlm_ctxt_state { struct dlm_ctxt { struct list_head list; - struct hlist_head *lockres_hash; + struct hlist_head **lockres_hash; struct list_head dirty_list; struct list_head purge_list; struct list_head pending_asts; @@ -135,6 +138,11 @@ struct dlm_ctxt struct list_head dlm_eviction_callbacks; }; +static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i) +{ + return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE); +} + /* these keventd work queue items are for less-frequently * called functions that cannot be directly called from the * net message handlers for some reason, usually because diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index c7eae5d..e119e4d 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -136,7 +136,7 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm) spin_lock(&dlm->spinlock); for (i=0; ilockres_hash[i]); + bucket = dlm_lockres_hash(dlm, i); hlist_for_each_entry(res, iter, bucket, hash_node) dlm_print_one_lock_resource(res); } diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 595b1c7..80b8cce 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -49,6 +49,30 @@ #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) #include "cluster/masklog.h" +static void dlm_free_pagevec(void **vec, int pages) +{ + while (pages--) + free_page((unsigned long)vec[pages]); + kfree(vec); +} + +static void **dlm_alloc_pagevec(int pages) +{ + void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL); + int i; + + if (!vec) + return NULL; + + for (i = 0; i < pages; i++) + if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) + goto out_free; + return vec; +out_free: + dlm_free_pagevec(vec, i); + return NULL; +} + /* * * spinlock lock ordering: if multiple locks are needed, obey this ordering: @@ -90,7 +114,7 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, assert_spin_locked(&dlm->spinlock); q = &res->lockname; - bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]); + bucket = dlm_lockres_hash(dlm, q->hash); /* get a reference for our hashtable */ dlm_lockres_get(res); @@ -110,7 +134,8 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, assert_spin_locked(&dlm->spinlock); - bucket = dlm->lockres_hash + full_name_hash(name, len) % DLM_HASH_BUCKETS; + bucket = dlm_lockres_hash(dlm, hash); + hlist_for_each(list, bucket) { struct dlm_lock_resource *res = hlist_entry(list, struct dlm_lock_resource, hash_node); @@ -191,7 +216,7 @@ static int dlm_wait_on_domain_helper(const char *domain) static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) { if (dlm->lockres_hash) - free_page((unsigned long) dlm->lockres_hash); + dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); if (dlm->name) kfree(dlm->name); @@ -301,8 +326,8 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) restart: spin_lock(&dlm->spinlock); for (i = 0; i < DLM_HASH_BUCKETS; i++) { - while (!hlist_empty(&dlm->lockres_hash[i])) { - res = hlist_entry(dlm->lockres_hash[i].first, + while (!hlist_empty(dlm_lockres_hash(dlm, i))) { + res = hlist_entry(dlm_lockres_hash(dlm, i)->first, struct dlm_lock_resource, hash_node); /* need reference when manually grabbing lockres */ dlm_lockres_get(res); @@ -1188,7 +1213,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, goto leave; } - dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL); + dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES); if (!dlm->lockres_hash) { mlog_errno(-ENOMEM); kfree(dlm->name); @@ -1197,8 +1222,8 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, goto leave; } - for (i=0; ilockres_hash[i]); + for (i = 0; i < DLM_HASH_BUCKETS; i++) + INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); strcpy(dlm->name, domain); dlm->key = key; diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 4f3d482..bf2a02e 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1719,7 +1719,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, * the RECOVERING state and set the owner * if necessary */ for (i = 0; i < DLM_HASH_BUCKETS; i++) { - bucket = &(dlm->lockres_hash[i]); + bucket = dlm_lockres_hash(dlm, i); hlist_for_each_entry(res, hash_iter, bucket, hash_node) { if (res->state & DLM_LOCK_RES_RECOVERING) { if (res->owner == dead_node) { @@ -1884,7 +1884,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) * need to be fired as a result. */ for (i = 0; i < DLM_HASH_BUCKETS; i++) { - bucket = &(dlm->lockres_hash[i]); + bucket = dlm_lockres_hash(dlm, i); hlist_for_each_entry(res, iter, bucket, hash_node) { /* always prune any $RECOVERY entries for dead nodes, * otherwise hangs can occur during later recovery */ -- cgit v0.10.2 From c8f33b6e86af74ee7b800f57cac7b3c8559318fe Mon Sep 17 00:00:00 2001 From: Joel Becker Date: Thu, 16 Mar 2006 17:40:37 -0800 Subject: [PATCH] ocfs2: Alloc at least a page for the DLM hash The OCFS2 DLM allocates a number of pages for a hash to lookup locks. There was a bug where a PAGE_SIZE bigger than the hash size (eg, 64K pages) would result in zero pages allocated. Signed-off-by: Joel Becker Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 0378ddb..4fc1be3 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -37,8 +37,12 @@ #define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes #define DLM_THREAD_MS 200 // flush at least every 200 ms -#define DLM_HASH_SIZE (1 << 14) -#define DLM_HASH_PAGES (DLM_HASH_SIZE / PAGE_SIZE) +#define DLM_HASH_SIZE_DEFAULT (1 << 14) +#if DLM_HASH_SIZE_DEFAULT < PAGE_SIZE +# define DLM_HASH_PAGES 1 +#else +# define DLM_HASH_PAGES (DLM_HASH_SIZE_DEFAULT / PAGE_SIZE) +#endif #define DLM_BUCKETS_PER_PAGE (PAGE_SIZE / sizeof(struct hlist_head)) #define DLM_HASH_BUCKETS (DLM_HASH_PAGES * DLM_BUCKETS_PER_PAGE) diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 80b8cce..a074ec6 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -67,6 +67,9 @@ static void **dlm_alloc_pagevec(int pages) for (i = 0; i < pages; i++) if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) goto out_free; + + mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %Zd buckets per page\n", + pages, DLM_HASH_PAGES, DLM_BUCKETS_PER_PAGE); return vec; out_free: dlm_free_pagevec(vec, i); -- cgit v0.10.2 From 685f1adb3872d904e08e22fab699f34432d5068a Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Thu, 23 Mar 2006 11:23:29 -0800 Subject: ocfs2: silence a compile warning in dlm_alloc_pagevec() Reported by Andrew Morton. Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index a074ec6..3511930 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -68,8 +68,8 @@ static void **dlm_alloc_pagevec(int pages) if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) goto out_free; - mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %Zd buckets per page\n", - pages, DLM_HASH_PAGES, DLM_BUCKETS_PER_PAGE); + mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n", + pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE); return vec; out_free: dlm_free_pagevec(vec, i); -- cgit v0.10.2 From 8d79d088e88198d5456861ee9e6a8226dcd08799 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 17:58:23 -0700 Subject: ocfs2: add a small delay after a failed migration Otherwise we risk starving other threads. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 44d3b57..22bb58a 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -39,6 +39,7 @@ #include #include #include +#include #include "cluster/heartbeat.h" @@ -165,6 +166,7 @@ again: } else if (ret < 0) { mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", lockres->lockname.len, lockres->lockname.name); + msleep(100); goto again; } @@ -640,8 +642,9 @@ static int dlm_thread(void *data) * spinlock and do NOT have the dlm lock. * safe to reserve/queue asts and run the lists. */ - mlog(0, "calling dlm_shuffle_lists with dlm=%p, " - "res=%p\n", dlm, res); + mlog(0, "calling dlm_shuffle_lists with dlm=%s, " + "res=%.*s\n", dlm->name, + res->lockname.len, res->lockname.name); /* called while holding lockres lock */ dlm_shuffle_lists(dlm, res); -- cgit v0.10.2 From 2580a580e029f9a59a66cd230b1fd7e2d9ee339d Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 17:59:46 -0700 Subject: ocfs2: recheck lockres master before sending an unlock request. Recovery may have happened and it may now be mastered locally. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index ac89c50..9b40d60 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -317,6 +317,16 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, size_t veclen = 1; mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); + + if (owner == dlm->node_num) { + /* ended up trying to contact ourself. this means + * that the lockres had been remote but became local + * via a migration. just retry it, now as local */ + mlog(0, "%s:%.*s: this node became the master due to a " + "migration, re-evaluate now\n", dlm->name, + res->lockname.len, res->lockname.name); + return DLM_FORWARD; + } memset(&unlock, 0, sizeof(unlock)); unlock.node_idx = dlm->node_num; -- cgit v0.10.2 From aba9aac78817d88aa2b223f1aedf1e9815ae97b8 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:00:21 -0700 Subject: ocfs2: fix inverted logic in dlm_is_node_dead Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index bf2a02e..b568bb6 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -267,7 +267,7 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) { int dead; spin_lock(&dlm->spinlock); - dead = test_bit(node, dlm->domain_map); + dead = !test_bit(node, dlm->domain_map); spin_unlock(&dlm->spinlock); return dead; } -- cgit v0.10.2 From 8bc674cb4834fb25206b7f7f5e37fe571aa76b34 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:02:10 -0700 Subject: ocfs2: Fix empty lvb check The check for an empty lvb should check the entire buffer not just the first byte. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 4fc1be3..bf87391 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -315,6 +315,15 @@ enum dlm_lockres_list { DLM_BLOCKED_LIST }; +static inline int dlm_lvb_is_empty(char *lvb) +{ + int i; + for (i=0; itype == LKM_PRMODE) { /* if it is already set, this had better be a PR * and it has to match */ - if (mres->lvb[0] && (ml->type == LKM_EXMODE || - memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { + if (!dlm_lvb_is_empty(mres->lvb) && + (ml->type == LKM_EXMODE || + memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { mlog(ML_ERROR, "mismatched lvbs!\n"); __dlm_print_one_lock_resource(lock->lockres); BUG(); @@ -1554,7 +1555,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, lksb->flags |= (ml->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); - if (mres->lvb[0]) { + if (!dlm_lvb_is_empty(mres->lvb)) { if (lksb->flags & DLM_LKSB_PUT_LVB) { /* other node was trying to update * lvb when node died. recreate the @@ -1565,8 +1566,9 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, * most recent valid lvb info */ BUG_ON(ml->type != LKM_EXMODE && ml->type != LKM_PRMODE); - if (res->lvb[0] && (ml->type == LKM_EXMODE || - memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { + if (!dlm_lvb_is_empty(res->lvb) && + (ml->type == LKM_EXMODE || + memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { mlog(ML_ERROR, "received bad lvb!\n"); __dlm_print_one_lock_resource(res); BUG(); -- cgit v0.10.2 From ab27eb6f47092923a92f7c164dcf9be3b76f3944 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:03:49 -0700 Subject: ocfs2: Better tracking for recovery state changes Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 8a3f017..ee0860b 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -115,12 +115,31 @@ static u64 dlm_get_next_mig_cookie(void) return c; } +static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, + u8 dead_node) +{ + assert_spin_locked(&dlm->spinlock); + if (dlm->reco.dead_node != dead_node) + mlog(0, "%s: changing dead_node from %u to %u\n", + dlm->name, dlm->reco.dead_node, dead_node); + dlm->reco.dead_node = dead_node; +} + +static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, + u8 master) +{ + assert_spin_locked(&dlm->spinlock); + mlog(0, "%s: changing new_master from %u to %u\n", + dlm->name, dlm->reco.new_master, master); + dlm->reco.new_master = master; +} + static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) { spin_lock(&dlm->spinlock); clear_bit(dlm->reco.dead_node, dlm->recovery_map); - dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; - dlm->reco.new_master = O2NM_INVALID_NODE_NUM; + dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); + dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); spin_unlock(&dlm->spinlock); } @@ -341,7 +360,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) mlog(0, "new master %u died while recovering %u!\n", dlm->reco.new_master, dlm->reco.dead_node); /* unset the new_master, leave dead_node */ - dlm->reco.new_master = O2NM_INVALID_NODE_NUM; + dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); } /* select a target to recover */ @@ -350,14 +369,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0); if (bit >= O2NM_MAX_NODES || bit < 0) - dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; + dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); else - dlm->reco.dead_node = bit; + dlm_set_reco_dead_node(dlm, bit); } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { /* BUG? */ mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", dlm->reco.dead_node); - dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; + dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); } if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { @@ -2089,7 +2108,7 @@ again: /* set the new_master to this node */ spin_lock(&dlm->spinlock); - dlm->reco.new_master = dlm->node_num; + dlm_set_reco_master(dlm, dlm->node_num); spin_unlock(&dlm->spinlock); } @@ -2254,8 +2273,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) "node %u changing it to %u\n", dlm->name, dlm->reco.dead_node, br->node_idx, br->dead_node); } - dlm->reco.new_master = br->node_idx; - dlm->reco.dead_node = br->dead_node; + dlm_set_reco_master(dlm, br->node_idx); + dlm_set_reco_dead_node(dlm, br->dead_node); if (!test_bit(br->dead_node, dlm->recovery_map)) { mlog(0, "recovery master %u sees %u as dead, but this " "node has not yet. marking %u as dead\n", -- cgit v0.10.2 From c3187ce5e335cf8e06391236cc1ad7d1b1e193ed Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:05:41 -0700 Subject: ocfs2: only recover one dead node at a time Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index ee0860b..3948876 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -710,6 +710,14 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return -EINVAL; + if (lr->dead_node != dlm->reco.dead_node) { + mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local " + "dead_node is %u\n", dlm->name, lr->node_idx, + lr->dead_node, dlm->reco.dead_node); + /* this is a hack */ + dlm_put(dlm); + return -ENOMEM; + } BUG_ON(lr->dead_node != dlm->reco.dead_node); item = kcalloc(1, sizeof(*item), GFP_KERNEL); @@ -1504,7 +1512,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, struct dlm_lock *newlock = NULL; struct dlm_lockstatus *lksb = NULL; int ret = 0; - int i; + int i, bad; struct list_head *iter; struct dlm_lock *lock = NULL; @@ -1613,9 +1621,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, * relative to each other, but clearly *not* * preserved relative to locks from other nodes. */ + bad = 0; spin_lock(&res->spinlock); - dlm_lock_get(newlock); - list_add_tail(&newlock->list, queue); + list_for_each_entry(lock, queue, list) { + if (lock->ml.cookie == ml->cookie) { + u64 c = lock->ml.cookie; + mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " + "exists on this lockres!\n", dlm->name, + res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(c), + dlm_get_lock_cookie_seq(c)); + + mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " + "node=%u, cookie=%u:%llu, queue=%d\n", + ml->type, ml->convert_type, ml->node, + dlm_get_lock_cookie_node(ml->cookie), + dlm_get_lock_cookie_seq(ml->cookie), + ml->list); + + __dlm_print_one_lock_resource(res); + bad = 1; + break; + } + } + if (!bad) { + dlm_lock_get(newlock); + list_add_tail(&newlock->list, queue); + } spin_unlock(&res->spinlock); } mlog(0, "done running all the locks\n"); -- cgit v0.10.2 From 29c0fa0f56f20b4512f65b0f3e55bc8af50485b7 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:06:58 -0700 Subject: ocfs2: handle network errors during recovery Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 3948876..59c8976 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -757,6 +757,7 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) struct list_head *iter; int ret; u8 dead_node, reco_master; + int skip_all_done = 0; dlm = item->dlm; dead_node = item->u.ral.dead_node; @@ -793,12 +794,18 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) dlm_move_reco_locks_to_list(dlm, &resources, dead_node); /* now we can begin blasting lockreses without the dlm lock */ + + /* any errors returned will be due to the new_master dying, + * the dlm_reco_thread should detect this */ list_for_each(iter, &resources) { res = list_entry (iter, struct dlm_lock_resource, recovering); ret = dlm_send_one_lockres(dlm, res, mres, reco_master, DLM_MRES_RECOVERY); - if (ret < 0) + if (ret < 0) { mlog_errno(ret); + skip_all_done = 1; + break; + } } /* move the resources back to the list */ @@ -806,9 +813,12 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) list_splice_init(&resources, &dlm->reco.resources); spin_unlock(&dlm->spinlock); - ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); - if (ret < 0) - mlog_errno(ret); + if (!skip_all_done) { + ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); + if (ret < 0) { + mlog_errno(ret); + } + } free_page((unsigned long)data); } @@ -828,8 +838,14 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, sizeof(done_msg), send_to, &tmpret); - /* negative status is ignored by the caller */ - if (ret >= 0) + if (ret < 0) { + if (!dlm_is_host_down(ret)) { + mlog_errno(ret); + mlog(ML_ERROR, "%s: unknown error sending data-done " + "to %u\n", dlm->name, send_to); + BUG(); + } + } else ret = tmpret; return ret; } @@ -1109,22 +1125,25 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, * we must send it immediately. */ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); - if (ret < 0) { - // TODO - mlog(ML_ERROR, "dlm_send_mig_lockres_msg " - "returned %d, TODO\n", ret); - BUG(); - } + if (ret < 0) + goto error; } } /* flush any remaining locks */ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); - if (ret < 0) { - // TODO - mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, " - "TODO\n", ret); + if (ret < 0) + goto error; + return ret; + +error: + mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n", + dlm->name, ret); + if (!dlm_is_host_down(ret)) BUG(); - } + mlog(0, "%s: node %u went down while sending %s " + "lockres %.*s\n", dlm->name, send_to, + flags & DLM_MRES_RECOVERY ? "recovery" : "migration", + res->lockname.len, res->lockname.name); return ret; } -- cgit v0.10.2 From d6dea6e9732f5319b723e14f7adbc5f99cd69aec Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:08:51 -0700 Subject: ocfs2: clean up recovery related messages Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 59c8976..81bd240 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -239,6 +239,52 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) * */ +static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) +{ + struct dlm_reco_node_data *ndata; + struct dlm_lock_resource *res; + + mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n", + dlm->name, dlm->dlm_reco_thread_task->pid, + dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", + dlm->reco.dead_node, dlm->reco.new_master); + + list_for_each_entry(ndata, &dlm->reco.node_data, list) { + char *st = "unknown"; + switch (ndata->state) { + case DLM_RECO_NODE_DATA_INIT: + st = "init"; + break; + case DLM_RECO_NODE_DATA_REQUESTING: + st = "requesting"; + break; + case DLM_RECO_NODE_DATA_DEAD: + st = "dead"; + break; + case DLM_RECO_NODE_DATA_RECEIVING: + st = "receiving"; + break; + case DLM_RECO_NODE_DATA_REQUESTED: + st = "requested"; + break; + case DLM_RECO_NODE_DATA_DONE: + st = "done"; + break; + case DLM_RECO_NODE_DATA_FINALIZE_SENT: + st = "finalize-sent"; + break; + default: + st = "bad"; + break; + } + mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n", + dlm->name, ndata->node_num, st); + } + list_for_each_entry(res, &dlm->reco.resources, recovering) { + mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n", + dlm->name, res->lockname.len, res->lockname.name); + } +} #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) @@ -385,7 +431,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) /* return to main thread loop and sleep. */ return 0; } - mlog(0, "recovery thread found node %u in the recovery map!\n", + mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n", + dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.dead_node); spin_unlock(&dlm->spinlock); @@ -408,8 +455,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) } mlog(0, "another node will master this recovery session.\n"); } - mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n", - dlm->name, dlm->reco.new_master, + mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n", + dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master, dlm->node_num, dlm->reco.dead_node); /* it is safe to start everything back up here @@ -421,7 +468,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) return 0; master_here: - mlog(0, "mastering recovery of %s:%u here(this=%u)!\n", + mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n", + dlm->dlm_reco_thread_task->pid, dlm->name, dlm->reco.dead_node, dlm->node_num); status = dlm_remaster_locks(dlm, dlm->reco.dead_node); @@ -563,11 +611,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) goto leave; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: + mlog(0, "%s: node %u still in state %s\n", + dlm->name, ndata->node_num, + ndata->state==DLM_RECO_NODE_DATA_RECEIVING ? + "receiving" : "requested"); all_nodes_done = 0; break; case DLM_RECO_NODE_DATA_DONE: + mlog(0, "%s: node %u state is done\n", + dlm->name, ndata->node_num); break; case DLM_RECO_NODE_DATA_FINALIZE_SENT: + mlog(0, "%s: node %u state is finalize\n", + dlm->name, ndata->node_num); break; } } @@ -714,6 +770,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local " "dead_node is %u\n", dlm->name, lr->node_idx, lr->dead_node, dlm->reco.dead_node); + dlm_print_reco_node_status(dlm); /* this is a hack */ dlm_put(dlm); return -ENOMEM; @@ -764,6 +821,9 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) reco_master = item->u.ral.reco_master; mres = (struct dlm_migratable_lockres *)data; + mlog(0, "%s: recovery worker started, dead=%u, master=%u\n", + dlm->name, dead_node, reco_master); + if (dead_node != dlm->reco.dead_node || reco_master != dlm->reco.new_master) { /* show extra debug info if the recovery state is messed */ @@ -802,7 +862,9 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) ret = dlm_send_one_lockres(dlm, res, mres, reco_master, DLM_MRES_RECOVERY); if (ret < 0) { - mlog_errno(ret); + mlog(ML_ERROR, "%s: node %u went down while sending " + "recovery state for dead node %u, ret=%d\n", dlm->name, + reco_master, dead_node, ret); skip_all_done = 1; break; } @@ -816,7 +878,9 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) if (!skip_all_done) { ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); if (ret < 0) { - mlog_errno(ret); + mlog(ML_ERROR, "%s: node %u went down while sending " + "recovery all-done for dead node %u, ret=%d\n", + dlm->name, reco_master, dead_node, ret); } } @@ -865,7 +929,11 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " "node_idx=%u, this node=%u\n", done->dead_node, dlm->reco.dead_node, done->node_idx, dlm->node_num); - BUG_ON(done->dead_node != dlm->reco.dead_node); + + mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), + "Got DATA DONE: dead_node=%u, reco.dead_node=%u, " + "node_idx=%u, this node=%u\n", done->dead_node, + dlm->reco.dead_node, done->node_idx, dlm->node_num); spin_lock(&dlm_reco_state_lock); list_for_each(iter, &dlm->reco.node_data) { @@ -2228,7 +2296,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) mlog_entry("%u\n", dead_node); - mlog(0, "dead node is %u\n", dead_node); + mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); spin_lock(&dlm->spinlock); dlm_node_iter_init(dlm->domain_map, &iter); @@ -2301,8 +2369,9 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return 0; - mlog(0, "node %u wants to recover node %u\n", - br->node_idx, br->dead_node); + mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n", + dlm->name, br->node_idx, br->dead_node, + dlm->reco.dead_node, dlm->reco.new_master); dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); @@ -2344,6 +2413,11 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) spin_unlock(&dlm->spinlock); dlm_kick_recovery_thread(dlm); + + mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n", + dlm->name, br->node_idx, br->dead_node, + dlm->reco.dead_node, dlm->reco.new_master); + dlm_put(dlm); return 0; } @@ -2401,8 +2475,9 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return 0; - mlog(0, "node %u finalizing recovery of node %u\n", - fr->node_idx, fr->dead_node); + mlog(0, "%s: node %u finalizing recovery of node %u (%u:%u)\n", + dlm->name, fr->node_idx, fr->dead_node, + dlm->reco.dead_node, dlm->reco.new_master); spin_lock(&dlm->spinlock); @@ -2426,6 +2501,9 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) dlm_reset_recovery(dlm); dlm_kick_recovery_thread(dlm); + mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n", + dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); + dlm_put(dlm); return 0; } -- cgit v0.10.2 From 958837197e6415009cba0f31bbb5aacdb936ef09 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:47:41 -0700 Subject: ocfs2: better mle debugging Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index f1fbf2f..d1c85f1 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -130,15 +130,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, #if 0 /* Code here is included but defined out as it aids debugging */ +#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) +void _dlm_print_nodemap(unsigned long *map, const char *mapname) +{ + int i; + printk("%s=[ ", mapname); + for (i=0; imaybe_map, + *vote = mle->vote_map, + *resp = mle->response_map, + *node = mle->node_map; k = &mle->mle_refs; if (mle->type == DLM_MLE_BLOCK) @@ -159,9 +174,18 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle) name = mle->u.res->lockname.name; } - mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", - i, type, refs, master, mle->new_master, attached, - namelen, namelen, name); + mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ", + namelen, name, type, refs, master, mle->new_master, attached, + mle->inuse); + dlm_print_nodemap(maybe); + printk(", "); + dlm_print_nodemap(vote); + printk(", "); + dlm_print_nodemap(resp); + printk(", "); + dlm_print_nodemap(node); + printk(", "); + printk("\n"); } static void dlm_dump_mles(struct dlm_ctxt *dlm) @@ -170,7 +194,6 @@ static void dlm_dump_mles(struct dlm_ctxt *dlm) struct list_head *iter; mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); - mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n"); spin_lock(&dlm->master_lock); list_for_each(iter, &dlm->master_list) { mle = list_entry(iter, struct dlm_master_list_entry, list); -- cgit v0.10.2 From a2bf04774bf4aa0a75036c1e92e3d2fd1cce2aff Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:51:26 -0700 Subject: ocfs2: mle ref counting fixes Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index d1c85f1..1939944 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -74,6 +74,7 @@ struct dlm_master_list_entry wait_queue_head_t wq; atomic_t woken; struct kref mle_refs; + int inuse; unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; @@ -337,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } +static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&dlm->master_lock); + mle->inuse++; + kref_get(&mle->mle_refs); +} + +static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + mle->inuse--; + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + +} + /* remove from list and free */ static void __dlm_put_mle(struct dlm_master_list_entry *mle) { @@ -390,6 +416,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, memset(mle->response_map, 0, sizeof(mle->response_map)); mle->master = O2NM_MAX_NODES; mle->new_master = O2NM_MAX_NODES; + mle->inuse = 0; if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); @@ -809,7 +836,7 @@ lookup: * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we * need an extra one to keep from a bad ptr deref. */ - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); @@ -899,7 +926,7 @@ wait: dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); /* put the extra ref */ - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); wake_waiters: spin_lock(&res->spinlock); @@ -1753,6 +1780,7 @@ ok: if (mle) { int extra_ref = 0; int nn = -1; + int rr, err = 0; spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) @@ -1772,27 +1800,64 @@ ok: wake_up(&mle->wq); spin_unlock(&mle->spinlock); - if (mle->type == DLM_MLE_MIGRATION && res) { - mlog(0, "finishing off migration of lockres %.*s, " - "from %u to %u\n", - res->lockname.len, res->lockname.name, - dlm->node_num, mle->new_master); + if (res) { spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - dlm_change_lockres_owner(dlm, res, mle->new_master); - BUG_ON(res->state & DLM_LOCK_RES_DIRTY); + if (mle->type == DLM_MLE_MIGRATION) { + mlog(0, "finishing off migration of lockres %.*s, " + "from %u to %u\n", + res->lockname.len, res->lockname.name, + dlm->node_num, mle->new_master); + res->state &= ~DLM_LOCK_RES_MIGRATING; + dlm_change_lockres_owner(dlm, res, mle->new_master); + BUG_ON(res->state & DLM_LOCK_RES_DIRTY); + } else { + dlm_change_lockres_owner(dlm, res, mle->master); + } spin_unlock(&res->spinlock); } - /* master is known, detach if not already detached */ - dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); - + + /* master is known, detach if not already detached. + * ensures that only one assert_master call will happen + * on this mle. */ + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + + rr = atomic_read(&mle->mle_refs.refcount); + if (mle->inuse > 0) { + if (extra_ref && rr < 3) + err = 1; + else if (!extra_ref && rr < 2) + err = 1; + } else { + if (extra_ref && rr < 2) + err = 1; + else if (!extra_ref && rr < 1) + err = 1; + } + if (err) { + mlog(ML_ERROR, "%s:%.*s: got assert master from %u " + "that will mess up this node, refs=%d, extra=%d, " + "inuse=%d\n", dlm->name, namelen, name, + assert->node_idx, rr, extra_ref, mle->inuse); + dlm_print_one_mle(mle); + } + list_del_init(&mle->list); + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); if (extra_ref) { /* the assert master message now balances the extra * ref given by the master / migration request message. * if this is the last put, it will be removed * from the list. */ - dlm_put_mle(mle); + __dlm_put_mle(mle); + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + } else if (res) { + if (res->owner != assert->node_idx) { + mlog(0, "assert_master from %u, but current " + "owner is %u (%.*s), no mle\n", assert->node_idx, + res->owner, namelen, name); } } @@ -2138,7 +2203,7 @@ fail: * take both dlm->spinlock and dlm->master_lock */ spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); @@ -2155,7 +2220,10 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); goto leave; } @@ -2196,7 +2264,10 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); goto leave; } /* TODO: if node died: stop, clean up, return error */ @@ -2212,7 +2283,7 @@ fail: /* master is known, detach if not already detached */ dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); ret = 0; dlm_lockres_calc_usage(dlm, res); -- cgit v0.10.2 From da01ad05528bf6f6bcb286329b14225a71713325 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 18:53:04 -0700 Subject: ocfs2: detach mle from heartbeat events Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 1939944..d7a5169 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -2673,6 +2673,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); + __dlm_mle_detach_hb_events(dlm, mle); } spin_unlock(&tmp->spinlock); } @@ -2767,6 +2768,7 @@ top: /* remove from the list early. NOTE: unlinking * list_head while in list_for_each_safe */ + __dlm_mle_detach_hb_events(dlm, mle); spin_lock(&mle->spinlock); list_del_init(&mle->list); atomic_set(&mle->woken, 1); -- cgit v0.10.2 From 41b8c8a101ba77f59d9a4b3cac6c846cb8a34840 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 19:00:26 -0700 Subject: ocfs2: properly initialize the mle structure Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index d7a5169..376283e 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1512,15 +1512,12 @@ way_up_top: mlog_errno(-ENOMEM); goto send_response; } - spin_lock(&dlm->spinlock); - dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, - name, namelen); - spin_unlock(&dlm->spinlock); goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); + dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; -- cgit v0.10.2 From 2d1a868c563f07c07c681836d273d69efb7c5ad8 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 19:01:35 -0700 Subject: ocfs2: take mle reference during migration Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 376283e..0b7e29e 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1710,6 +1710,23 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) assert->node_idx); } } + if (mle->type == DLM_MLE_MIGRATION) { + if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { + mlog(0, "%s:%.*s: got cleanup assert" + " from %u for migration\n", + dlm->name, namelen, name, + assert->node_idx); + } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) { + mlog(0, "%s:%.*s: got unrelated assert" + " from %u for migration, ignoring\n", + dlm->name, namelen, name, + assert->node_idx); + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + goto done; + } + } } spin_unlock(&dlm->master_lock); -- cgit v0.10.2 From dc2ed195dda848c8e4b24f3f0e1952fa74f74f6b Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 19:03:18 -0700 Subject: ocfs2: allow for an assert message during lock mastery Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 0b7e29e..1b28d3e 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1741,7 +1741,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) goto kill; } if (!mle) { - if (res->owner != assert->node_idx) { + if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN && + res->owner != assert->node_idx) { mlog(ML_ERROR, "assert_master from " "%u, but current owner is " "%u! (%.*s)\n", -- cgit v0.10.2 From aa85235427992b8d3040297d9174d69dd1d8a675 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 19:04:49 -0700 Subject: ocfs2: mle ref count debugging Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 1b28d3e..9c5ef40 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -371,9 +371,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - BUG_ON(!atomic_read(&mle->mle_refs.refcount)); - - kref_put(&mle->mle_refs, dlm_mle_release); + if (!atomic_read(&mle->mle_refs.refcount)) { + /* this may or may not crash, but who cares. + * it's a BUG. */ + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + BUG(); + } else + kref_put(&mle->mle_refs, dlm_mle_release); } @@ -1008,6 +1013,12 @@ recheck: "rechecking now\n", dlm->name, res->lockname.len, res->lockname.name); goto recheck; + } else { + if (!voting_done) { + mlog(0, "map not changed and voting not done " + "for %s:%.*s\n", dlm->name, res->lockname.len, + res->lockname.name); + } } if (m != O2NM_MAX_NODES) { @@ -1691,7 +1702,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ - mlog(ML_ERROR, "no bits set in the maybe_map, but %u " + mlog(0, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { @@ -1703,7 +1714,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ - mlog(ML_ERROR, "%u is the lowest node, " + mlog(0, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, @@ -2268,8 +2279,8 @@ fail: /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { - mlog(0, "%s:%.*s: expected migration target %u " - "is no longer up. restarting.\n", + mlog(0, "%s:%.*s: expected migration " + "target %u is no longer up, restarting\n", dlm->name, res->lockname.len, res->lockname.name, target); ret = -ERESTARTSYS; @@ -2790,8 +2801,8 @@ top: spin_unlock(&mle->spinlock); wake_up(&mle->wq); - mlog(0, "node %u died during migration from " - "%u to %u!\n", dead_node, + mlog(0, "%s: node %u died during migration from " + "%u to %u!\n", dlm->name, dead_node, mle->master, mle->new_master); /* if there is a lockres associated with this * mle, find it and set its owner to UNKNOWN */ -- cgit v0.10.2 From c0a8520c7333dd62624683772f31864c7f9c46d9 Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Thu, 27 Apr 2006 19:07:45 -0700 Subject: ocfs2: do LVB puts in place Don't wait until the AST will be fired to do the LVB copy into the lock resource. Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c index 87ee29c..42775e2 100644 --- a/fs/ocfs2/dlm/dlmast.c +++ b/fs/ocfs2/dlm/dlmast.c @@ -197,12 +197,14 @@ static void dlm_update_lvb(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, lock->ml.node == dlm->node_num ? "master" : "remote"); memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN); - } else if (lksb->flags & DLM_LKSB_PUT_LVB) { - mlog(0, "setting lvb from lockres for %s node\n", - lock->ml.node == dlm->node_num ? "master" : - "remote"); - memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN); } + /* Do nothing for lvb put requests - they should be done in + * place when the lock is downconverted - otherwise we risk + * racing gets and puts which could result in old lvb data + * being propagated. We leave the put flag set and clear it + * here. In the future we might want to clear it at the time + * the put is actually done. + */ spin_unlock(&res->spinlock); } diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 70888b3..90cbaaf 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -214,6 +214,9 @@ grant: if (lock->ml.node == dlm->node_num) mlog(0, "doing in-place convert for nonlocal lock\n"); lock->ml.type = type; + if (lock->lksb->flags & DLM_LKSB_PUT_LVB) + memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN); + status = DLM_NORMAL; *call_ast = 1; goto unlock_exit; -- cgit v0.10.2 From a7f90d83ea8dc8b0999ab7c1c0539af9a6ed69d2 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 19:24:21 -0700 Subject: ocfs2: dump lockres info before we BUG() on a bad reference Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 9c5ef40..427c0af 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -619,6 +619,28 @@ static void dlm_lockres_release(struct kref *kref) mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); + if (!hlist_unhashed(&res->hash_node) || + !list_empty(&res->granted) || + !list_empty(&res->converting) || + !list_empty(&res->blocked) || + !list_empty(&res->dirty) || + !list_empty(&res->recovering) || + !list_empty(&res->purge)) { + mlog(ML_ERROR, + "Going to BUG for resource %.*s." + " We're on a list! [%c%c%c%c%c%c%c]\n", + res->lockname.len, res->lockname.name, + !hlist_unhashed(&res->hash_node) ? 'H' : ' ', + !list_empty(&res->granted) ? 'G' : ' ', + !list_empty(&res->converting) ? 'C' : ' ', + !list_empty(&res->blocked) ? 'B' : ' ', + !list_empty(&res->dirty) ? 'D' : ' ', + !list_empty(&res->recovering) ? 'R' : ' ', + !list_empty(&res->purge) ? 'P' : ' '); + + dlm_print_one_lock_resource(res); + } + /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); -- cgit v0.10.2 From a9ee4c8a67b962db0208addf0e32935aa571af6b Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Thu, 27 Apr 2006 19:26:15 -0700 Subject: ocfs2: better error handling during assert master message handle errors during lock assert master by either killing self or other node Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 427c0af..81ceee2 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1633,6 +1633,8 @@ again: dlm_node_iter_init(nodemap, &iter); while ((to = dlm_node_iter_next(&iter)) >= 0) { int r = 0; + struct dlm_master_list_entry *mle = NULL; + mlog(0, "sending assert master to %d (%.*s)\n", to, namelen, lockname); memset(&assert, 0, sizeof(assert)); @@ -1657,7 +1659,15 @@ again: /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " "got %d.\n", namelen, lockname, to, r); - dlm_dump_lock_resources(dlm); + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + if (dlm_find_mle(dlm, &mle, (char *)lockname, + namelen)) { + dlm_print_one_mle(mle); + __dlm_put_mle(mle); + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); BUG(); } else if (r == EAGAIN) { mlog(0, "%.*s: node %u create mles on other " @@ -1922,12 +1932,12 @@ done: kill: /* kill the caller! */ + mlog(ML_ERROR, "Bad message received from another node. Dumping state " + "and killing the other node now! This node is OK and can continue.\n"); + __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); dlm_lockres_put(res); - mlog(ML_ERROR, "Bad message received from another node. Dumping state " - "and killing the other node now! This node is OK and can continue.\n"); - dlm_dump_lock_resources(dlm); dlm_put(dlm); return -EINVAL; } -- cgit v0.10.2 From 69d72b066cc5971318d9e29e34289b74cf8a9d22 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 10:57:51 -0700 Subject: ocfs2: dlm recovery / lockres reference count fix Take a reference on lockres structures while they are on the recovery list. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index bf87391..9e05244 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -849,6 +849,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); +int __dlm_lockres_unused(struct dlm_lock_resource *res); static inline const char * dlm_lock_mode_name(int mode) { diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 81bd240..6ee8b32 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1758,8 +1758,14 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, struct dlm_lock *lock; res->state |= DLM_LOCK_RES_RECOVERING; - if (!list_empty(&res->recovering)) + if (!list_empty(&res->recovering)) { + mlog(0, + "Recovering res %s:%.*s, is already on recovery list!\n", + dlm->name, res->lockname.len, res->lockname.name); list_del_init(&res->recovering); + } + /* We need to hold a reference while on the recovery list */ + dlm_lockres_get(res); list_add_tail(&res->recovering, &dlm->reco.resources); /* find any pending locks and put them back on proper list */ @@ -1848,9 +1854,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - __dlm_dirty_lockres(dlm, res); + if (!__dlm_lockres_unused(res)) + __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); + dlm_lockres_put(res); } } @@ -1883,11 +1891,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, dlm->name, res->lockname.len, res->lockname.name, res->owner); list_del_init(&res->recovering); + dlm_lockres_put(res); } spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - __dlm_dirty_lockres(dlm, res); + if (!__dlm_lockres_unused(res)) + __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); } diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 22bb58a..71302b9 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -81,7 +81,7 @@ repeat: } -static int __dlm_lockres_unused(struct dlm_lock_resource *res) +int __dlm_lockres_unused(struct dlm_lock_resource *res) { if (list_empty(&res->granted) && list_empty(&res->converting) && -- cgit v0.10.2 From 466d1a4591c4e1bc3affd5c0cf3df5ad20338fb9 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:11:13 -0700 Subject: ocfs2: make dlm recovery finalization 2 stage Makes it easier for the recovery process to deal with node death. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 9e05244..78eccd0 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -71,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len) return 0; } -#define DLM_RECO_STATE_ACTIVE 0x0001 +#define DLM_RECO_STATE_ACTIVE 0x0001 +#define DLM_RECO_STATE_FINALIZE 0x0002 struct dlm_recovery_ctxt { @@ -633,7 +634,8 @@ struct dlm_finalize_reco { u8 node_idx; u8 dead_node; - __be16 pad1; + u8 flags; + u8 pad1; __be32 pad2; }; diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 6ee8b32..19123ce 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -134,12 +134,18 @@ static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, dlm->reco.new_master = master; } -static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) +static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) { - spin_lock(&dlm->spinlock); + assert_spin_locked(&dlm->spinlock); clear_bit(dlm->reco.dead_node, dlm->recovery_map); dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); +} + +static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) +{ + spin_lock(&dlm->spinlock); + __dlm_reset_recovery(dlm); spin_unlock(&dlm->spinlock); } @@ -2074,6 +2080,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) { assert_spin_locked(&dlm->spinlock); + if (dlm->reco.new_master == idx) { + mlog(0, "%s: recovery master %d just died\n", + dlm->name, idx); + if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { + /* finalize1 was reached, so it is safe to clear + * the new_master and dead_node. that recovery + * is complete. */ + mlog(0, "%s: dead master %d had reached " + "finalize1 state, clearing\n", dlm->name, idx); + dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; + __dlm_reset_recovery(dlm); + } + } + /* check to see if the node is already considered dead */ if (!test_bit(idx, dlm->live_nodes_map)) { mlog(0, "for domain %s, node %d is already dead. " @@ -2364,6 +2384,14 @@ retry: * another ENOMEM */ msleep(100); goto retry; + } else if (ret == EAGAIN) { + mlog(0, "%s: trying to start recovery of node " + "%u, but node %u is waiting for last recovery " + "to complete, backoff for a bit\n", dlm->name, + dead_node, nodenum); + /* TODO Look into replacing msleep with cond_resched() */ + msleep(100); + goto retry; } } @@ -2379,6 +2407,17 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return 0; + spin_lock(&dlm->spinlock); + if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { + mlog(0, "%s: node %u wants to recover node %u (%u:%u) " + "but this node is in finalize state, waiting on finalize2\n", + dlm->name, br->node_idx, br->dead_node, + dlm->reco.dead_node, dlm->reco.new_master); + spin_unlock(&dlm->spinlock); + return EAGAIN; + } + spin_unlock(&dlm->spinlock); + mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n", dlm->name, br->node_idx, br->dead_node, dlm->reco.dead_node, dlm->reco.new_master); @@ -2432,6 +2471,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) return 0; } +#define DLM_FINALIZE_STAGE2 0x01 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) { int ret = 0; @@ -2439,25 +2479,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) struct dlm_node_iter iter; int nodenum; int status; + int stage = 1; - mlog(0, "finishing recovery for node %s:%u\n", - dlm->name, dlm->reco.dead_node); + mlog(0, "finishing recovery for node %s:%u, " + "stage %d\n", dlm->name, dlm->reco.dead_node, stage); spin_lock(&dlm->spinlock); dlm_node_iter_init(dlm->domain_map, &iter); spin_unlock(&dlm->spinlock); +stage2: memset(&fr, 0, sizeof(fr)); fr.node_idx = dlm->node_num; fr.dead_node = dlm->reco.dead_node; + if (stage == 2) + fr.flags |= DLM_FINALIZE_STAGE2; while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { if (nodenum == dlm->node_num) continue; ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, &fr, sizeof(fr), nodenum, &status); - if (ret >= 0) { + if (ret >= 0) ret = status; + if (ret < 0) { + mlog_errno(ret); if (dlm_is_host_down(ret)) { /* this has no effect on this recovery * session, so set the status to zero to @@ -2466,12 +2512,15 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) "node finished recovery.\n", nodenum); ret = 0; } - } - if (ret < 0) { - mlog_errno(ret); break; } } + if (stage == 1) { + /* reset the node_iter back to the top and send finalize2 */ + iter.curnode = -1; + stage = 2; + goto stage2; + } return ret; } @@ -2480,15 +2529,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) { struct dlm_ctxt *dlm = data; struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; + int stage = 1; /* ok to return 0, domain has gone away */ if (!dlm_grab(dlm)) return 0; - mlog(0, "%s: node %u finalizing recovery of node %u (%u:%u)\n", - dlm->name, fr->node_idx, fr->dead_node, - dlm->reco.dead_node, dlm->reco.new_master); + if (fr->flags & DLM_FINALIZE_STAGE2) + stage = 2; + mlog(0, "%s: node %u finalizing recovery stage%d of " + "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, + fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); + spin_lock(&dlm->spinlock); if (dlm->reco.new_master != fr->node_idx) { @@ -2504,13 +2557,38 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) BUG(); } - dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); - - spin_unlock(&dlm->spinlock); - - dlm_reset_recovery(dlm); + switch (stage) { + case 1: + dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); + if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { + mlog(ML_ERROR, "%s: received finalize1 from " + "new master %u for dead node %u, but " + "this node has already received it!\n", + dlm->name, fr->node_idx, fr->dead_node); + dlm_print_reco_node_status(dlm); + BUG(); + } + dlm->reco.state |= DLM_RECO_STATE_FINALIZE; + spin_unlock(&dlm->spinlock); + break; + case 2: + if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { + mlog(ML_ERROR, "%s: received finalize2 from " + "new master %u for dead node %u, but " + "this node did not have finalize1!\n", + dlm->name, fr->node_idx, fr->dead_node); + dlm_print_reco_node_status(dlm); + BUG(); + } + dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; + spin_unlock(&dlm->spinlock); + dlm_reset_recovery(dlm); + dlm_kick_recovery_thread(dlm); + break; + default: + BUG(); + } - dlm_kick_recovery_thread(dlm); mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n", dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); -- cgit v0.10.2 From 343e26a4007d14c2154c1d13d1209797dce5c535 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:15:04 -0700 Subject: ocfs2: dump mismatching migrated lvbs before BUG() Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 19123ce..de68e49 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1689,8 +1689,19 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, if (!dlm_lvb_is_empty(res->lvb) && (ml->type == LKM_EXMODE || memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { - mlog(ML_ERROR, "received bad lvb!\n"); - __dlm_print_one_lock_resource(res); + int i; + mlog(ML_ERROR, "%s:%.*s: received bad " + "lvb! type=%d\n", dlm->name, + res->lockname.len, + res->lockname.name, ml->type); + printk("lockres lvb=["); + for (i=0; ilvb[i]); + printk("]\nmigrated lvb=["); + for (i=0; ilvb[i]); + printk("]\n"); + dlm_print_one_lock_resource(res); BUG(); } memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); -- cgit v0.10.2 From 8b2198097ae6a5b54ed92345989ec343070f916b Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:16:45 -0700 Subject: ocfs2: purge lockres' sooner Immediately purge a lockress that the local node is not the master of. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 71302b9..76526ea 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -54,6 +54,8 @@ #include "cluster/masklog.h" static int dlm_thread(void *data); +static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, + struct dlm_lock_resource *lockres); static void dlm_flush_asts(struct dlm_ctxt *dlm); @@ -111,10 +113,23 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, res->last_used = jiffies; list_add_tail(&res->purge, &dlm->purge_list); dlm->purge_count++; + + /* if this node is not the owner, there is + * no way to keep track of who the owner could be. + * unhash it to avoid serious problems. */ + if (res->owner != dlm->node_num) { + mlog(0, "%s:%.*s: doing immediate " + "purge of lockres owned by %u\n", + dlm->name, res->lockname.len, + res->lockname.name, res->owner); + + dlm_purge_lockres_now(dlm, res); + } } } else if (!list_empty(&res->purge)) { - mlog(0, "removing lockres %.*s from purge list\n", - res->lockname.len, res->lockname.name); + mlog(0, "removing lockres %.*s from purge list, " + "owner=%u\n", res->lockname.len, res->lockname.name, + res->owner); list_del_init(&res->purge); dlm->purge_count--; @@ -180,6 +195,24 @@ finish: __dlm_unhash_lockres(lockres); } +/* make an unused lockres go away immediately. + * as soon as the dlm spinlock is dropped, this lockres + * will not be found. kfree still happens on last put. */ +static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, + struct dlm_lock_resource *lockres) +{ + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&lockres->spinlock); + + BUG_ON(!__dlm_lockres_unused(lockres)); + + if (!list_empty(&lockres->purge)) { + list_del_init(&lockres->purge); + dlm->purge_count--; + } + __dlm_unhash_lockres(lockres); +} + static void dlm_run_purge_list(struct dlm_ctxt *dlm, int purge_now) { -- cgit v0.10.2 From 588e00902b06528c476eed38019dba4a087def01 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:22:06 -0700 Subject: ocfs2: do not send master requests to localhost Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 81ceee2..e5d7271 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -994,12 +994,14 @@ recheck: spin_unlock(&res->spinlock); /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ - ret = dlm_do_master_request(mle, res->owner); - if (ret < 0) { - /* give recovery a chance to run */ - mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); - msleep(500); - goto recheck; + if (res->owner != dlm->node_num) { + ret = dlm_do_master_request(mle, res->owner); + if (ret < 0) { + /* give recovery a chance to run */ + mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); + msleep(500); + goto recheck; + } } ret = 0; goto leave; -- cgit v0.10.2 From ccd8b1f916bc5e4b2156f03ccd3546be7f65f6b3 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:32:14 -0700 Subject: ocfs2: update lvb immediately during recovery Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index de68e49..86199f6 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1674,40 +1674,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, } lksb->flags |= (ml->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); - + + if (ml->type == LKM_NLMODE) + goto skip_lvb; + if (!dlm_lvb_is_empty(mres->lvb)) { if (lksb->flags & DLM_LKSB_PUT_LVB) { /* other node was trying to update * lvb when node died. recreate the * lksb with the updated lvb. */ memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); + /* the lock resource lvb update must happen + * NOW, before the spinlock is dropped. + * we no longer wait for the AST to update + * the lvb. */ + memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); } else { /* otherwise, the node is sending its * most recent valid lvb info */ BUG_ON(ml->type != LKM_EXMODE && ml->type != LKM_PRMODE); if (!dlm_lvb_is_empty(res->lvb) && - (ml->type == LKM_EXMODE || - memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { - int i; - mlog(ML_ERROR, "%s:%.*s: received bad " - "lvb! type=%d\n", dlm->name, - res->lockname.len, - res->lockname.name, ml->type); - printk("lockres lvb=["); - for (i=0; ilvb[i]); - printk("]\nmigrated lvb=["); - for (i=0; ilvb[i]); - printk("]\n"); - dlm_print_one_lock_resource(res); - BUG(); + (ml->type == LKM_EXMODE || + memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { + int i; + mlog(ML_ERROR, "%s:%.*s: received bad " + "lvb! type=%d\n", dlm->name, + res->lockname.len, + res->lockname.name, ml->type); + printk("lockres lvb=["); + for (i=0; ilvb[i]); + printk("]\nmigrated lvb=["); + for (i=0; ilvb[i]); + printk("]\n"); + dlm_print_one_lock_resource(res); + BUG(); } memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); } } - +skip_lvb: /* NOTE: * wrt lock queue ordering and recovery: -- cgit v0.10.2 From e4eb03681a8313168d99c2f93175331a898a2c16 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:46:59 -0700 Subject: ocfs2: gracefully handle stale create_lock messages. This is an error on the sending side, so gracefully error out on the receiving end. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 55cda25..5757620 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -280,6 +280,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, if (tmpret >= 0) { // successfully sent and received ret = status; // this is already a dlm_status + if (ret == DLM_RECOVERING) { + mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " + "no longer owned by %u. that node is coming back " + "up currently.\n", dlm->name, create.namelen, + create.name, res->owner); + dlm_print_one_lock_resource(res); + BUG(); + } } else { mlog_errno(tmpret); if (dlm_is_host_down(tmpret)) { @@ -428,11 +436,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return DLM_REJECTED; - mlog_bug_on_msg(!dlm_domain_fully_joined(dlm), - "Domain %s not fully joined!\n", dlm->name); - name = create->name; namelen = create->namelen; + status = DLM_RECOVERING; + if (!dlm_domain_fully_joined(dlm)) { + mlog(ML_ERROR, "Domain %s not fully joined, but node %u is " + "sending a create_lock message for lock %.*s!\n", + dlm->name, create->node_idx, namelen, name); + dlm_error(status); + goto leave; + } status = DLM_IVBUFLEN; if (namelen > DLM_LOCKID_NAME_MAX) { -- cgit v0.10.2 From e7e69eb38946ebef86e27442d01514fcf9c854ee Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:49:52 -0700 Subject: ocfs2: teach dlm_restart_lock_mastery() to wait on recovery Change behavior of dlm_restart_lock_mastery() when a node goes down. Dump all responses that have been collected and start over. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index e5d7271..915283f 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -867,6 +867,7 @@ lookup: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); +redo_request: while (wait_on_recovery) { /* any cluster changes that occurred after dropping the * dlm spinlock would be detectable be a change on the mle, @@ -904,7 +905,6 @@ lookup: if (blocked) goto wait; -redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { @@ -929,6 +929,7 @@ wait: /* keep going until the response map includes all nodes */ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { + wait_on_recovery = 1; mlog(0, "%s:%.*s: node map changed, redo the " "master request now, blocked=%d\n", dlm->name, res->lockname.len, @@ -1210,18 +1211,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, set_bit(node, mle->vote_map); } else { mlog(ML_ERROR, "node down! %d\n", node); - - /* if the node wasn't involved in mastery skip it, - * but clear it out from the maps so that it will - * not affect mastery of this lockres */ - clear_bit(node, mle->response_map); - clear_bit(node, mle->vote_map); - if (!test_bit(node, mle->maybe_map)) - goto next; - - /* if we're already blocked on lock mastery, and the - * dead node wasn't the expected master, or there is - * another node in the maybe_map, keep waiting */ if (blocked) { int lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); @@ -1229,54 +1218,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, /* act like it was never there */ clear_bit(node, mle->maybe_map); - if (node != lowest) - goto next; - - mlog(ML_ERROR, "expected master %u died while " - "this node was blocked waiting on it!\n", - node); - lowest = find_next_bit(mle->maybe_map, - O2NM_MAX_NODES, - lowest+1); - if (lowest < O2NM_MAX_NODES) { - mlog(0, "still blocked. waiting " - "on %u now\n", lowest); - goto next; + if (node == lowest) { + mlog(0, "expected master %u died" + " while this node was blocked " + "waiting on it!\n", node); + lowest = find_next_bit(mle->maybe_map, + O2NM_MAX_NODES, + lowest+1); + if (lowest < O2NM_MAX_NODES) { + mlog(0, "%s:%.*s:still " + "blocked. waiting on %u " + "now\n", dlm->name, + res->lockname.len, + res->lockname.name, + lowest); + } else { + /* mle is an MLE_BLOCK, but + * there is now nothing left to + * block on. we need to return + * all the way back out and try + * again with an MLE_MASTER. + * dlm_do_local_recovery_cleanup + * has already run, so the mle + * refcount is ok */ + mlog(0, "%s:%.*s: no " + "longer blocking. try to " + "master this here\n", + dlm->name, + res->lockname.len, + res->lockname.name); + mle->type = DLM_MLE_MASTER; + mle->u.res = res; + } } - - /* mle is an MLE_BLOCK, but there is now - * nothing left to block on. we need to return - * all the way back out and try again with - * an MLE_MASTER. dlm_do_local_recovery_cleanup - * has already run, so the mle refcount is ok */ - mlog(0, "no longer blocking. we can " - "try to master this here\n"); - mle->type = DLM_MLE_MASTER; - memset(mle->maybe_map, 0, - sizeof(mle->maybe_map)); - memset(mle->response_map, 0, - sizeof(mle->maybe_map)); - memcpy(mle->vote_map, mle->node_map, - sizeof(mle->node_map)); - mle->u.res = res; - set_bit(dlm->node_num, mle->maybe_map); - - ret = -EAGAIN; - goto next; } - clear_bit(node, mle->maybe_map); - if (node > dlm->node_num) - goto next; - - mlog(0, "dead node in map!\n"); - /* yuck. go back and re-contact all nodes - * in the vote_map, removing this node. */ - memset(mle->response_map, 0, - sizeof(mle->response_map)); + /* now blank out everything, as if we had never + * contacted anyone */ + memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); + memset(mle->response_map, 0, sizeof(mle->response_map)); + /* reset the vote_map to the current node_map */ + memcpy(mle->vote_map, mle->node_map, + sizeof(mle->node_map)); + /* put myself into the maybe map */ + if (mle->type != DLM_MLE_BLOCK) + set_bit(dlm->node_num, mle->maybe_map); } ret = -EAGAIN; -next: node = dlm_bitmap_diff_iter_next(&bdi, &sc); } return ret; -- cgit v0.10.2 From 6ff06a93916b3f95e83c346f7530cf2f5c68ae0c Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:51:45 -0700 Subject: ocfs2: give the dlm dirty list a reference on the lockres Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 78eccd0..829cc39 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -235,18 +235,26 @@ struct dlm_lock_resource struct qstr lockname; struct kref refs; - /* please keep these next 3 in this order - * some funcs want to iterate over all lists */ + /* + * Please keep granted, converting, and blocked in this order, + * as some funcs want to iterate over all lists. + * + * All four lists are protected by the hash's reference. + */ struct list_head granted; struct list_head converting; struct list_head blocked; + struct list_head purge; + /* + * These two lists require you to hold an additional reference + * while they are on the list. + */ struct list_head dirty; struct list_head recovering; // dlm_recovery_ctxt.resources list /* unused lock resources have their last_used stamped and are * put on a list for the dlm thread to run. */ - struct list_head purge; unsigned long last_used; unsigned migration_pending:1; diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 76526ea..610dc76 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -455,6 +455,8 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) /* don't shuffle secondary queues */ if ((res->owner == dlm->node_num) && !(res->state & DLM_LOCK_RES_DIRTY)) { + /* ref for dirty_list */ + dlm_lockres_get(res); list_add_tail(&res->dirty, &dlm->dirty_list); res->state |= DLM_LOCK_RES_DIRTY; } @@ -639,6 +641,8 @@ static int dlm_thread(void *data) list_del_init(&res->dirty); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); + /* Drop dirty_list ref */ + dlm_lockres_put(res); /* lockres can be re-dirtied/re-added to the * dirty_list in this gap, but that is ok */ @@ -691,6 +695,8 @@ in_progress: /* if the lock was in-progress, stick * it on the back of the list */ if (delay) { + /* ref for dirty_list */ + dlm_lockres_get(res); spin_lock(&res->spinlock); list_add_tail(&res->dirty, &dlm->dirty_list); res->state |= DLM_LOCK_RES_DIRTY; -- cgit v0.10.2 From f42a100b2272bc5cb44fb2aa03526b436b1d6833 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 11:53:33 -0700 Subject: ocfs2: have dlm_pre_master_reco_lockres() ignore dead nodes Recovery will spin in dlm_pre_master_reco_lockres if we do not ignore timed-out network responses from dead nodes. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 915283f..2e371e0 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -2050,6 +2050,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, BUG(); /* host is down, so answer for that node would be * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ + ret = 0; } if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { -- cgit v0.10.2 From aa087b84977173395c0e3a1e0c1773314958f277 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 12:02:07 -0700 Subject: ocfs2: increase backoff before waiting for recovery When mastering non-recovery lock resources, additional time was frequently needed to allow the disk heartbeat to catch up with the network timeout. the recovery lock resource is time critical and avoids this path. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 2e371e0..21081bcf 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -886,7 +886,7 @@ redo_request: } dlm_kick_recovery_thread(dlm); - msleep(100); + msleep(1000); dlm_wait_for_recovery(dlm); spin_lock(&dlm->spinlock); -- cgit v0.10.2 From 2abaf97e62e51fdd09d5a46703b3b680f24bdd8b Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:27:10 -0700 Subject: ocfs2: do not unconditionally purge the lockres in dlmlock_remote() In dlmlock_remote(), do not call purge_lockres until the lock resource actually changes. otherwise, the mastery info on the lockres will go away underneath the caller. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 5757620..675123c 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, struct dlm_lock *lock, int flags) { enum dlm_status status = DLM_DENIED; + int lockres_changed = 1; mlog_entry("type=%d\n", lock->ml.type); mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len, @@ -230,6 +231,10 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, dlm_error(status); dlm_revert_pending_lock(res, lock); dlm_lock_put(lock); + /* do NOT call calc_usage, as this would unhash the remote + * lockres before we ever get to use it. treat as if we + * never made any change to the lockres. */ + lockres_changed = 0; } else if (dlm_is_recovery_lock(res->lockname.name, res->lockname.len)) { /* special case for the $RECOVERY lock. @@ -243,7 +248,8 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, } spin_unlock(&res->spinlock); - dlm_lockres_calc_usage(dlm, res); + if (lockres_changed) + dlm_lockres_calc_usage(dlm, res); wake_up(&res->wq); return status; -- cgit v0.10.2 From c87a9ae7059f718bf1bb87a702fcbef535e32111 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:30:49 -0700 Subject: ocfs2: temporarily disable automatic lock migration Now we never change the owner of a lock resource until unmount or node death. This will be re-enabled once some issues in the algorithm used have been resolved. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 675123c..0ff9348 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -227,14 +227,18 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, res->state &= ~DLM_LOCK_RES_IN_PROGRESS; lock->lock_pending = 0; if (status != DLM_NORMAL) { - if (status != DLM_NOTQUEUED) + if (status != DLM_NOTQUEUED) { + /* + * DO NOT call calc_usage, as this would unhash + * the remote lockres before we ever get to use + * it. treat as if we never made any change to + * the lockres. + */ + lockres_changed = 0; dlm_error(status); + } dlm_revert_pending_lock(res, lock); dlm_lock_put(lock); - /* do NOT call calc_usage, as this would unhash the remote - * lockres before we ever get to use it. treat as if we - * never made any change to the lockres. */ - lockres_changed = 0; } else if (dlm_is_recovery_lock(res->lockname.name, res->lockname.len)) { /* special case for the $RECOVERY lock. diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 610dc76..c1c10fd 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -106,6 +106,20 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, assert_spin_locked(&res->spinlock); if (__dlm_lockres_unused(res)){ + /* For now, just keep any resource we master */ + if (res->owner == dlm->node_num) + { + if (!list_empty(&res->purge)) { + mlog(0, "we master %s:%.*s, but it is on " + "the purge list. Removing\n", + dlm->name, res->lockname.len, + res->lockname.name); + list_del_init(&res->purge); + dlm->purge_count--; + } + return; + } + if (list_empty(&res->purge)) { mlog(0, "putting lockres %.*s from purge list\n", res->lockname.len, res->lockname.name); -- cgit v0.10.2 From 36407488b1cbc4d84bc2bd14e03f3f9b768090d9 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:32:27 -0700 Subject: ocfs2: pending mastery asserts and migrations should block each other Use the existing structure for blocking migrations when ASTs are pending to achieve the same result. If we can catch the assert before it goes on the wire, just cancel it and let the migration continue. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 21081bcf..78ba77b 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -2000,6 +2000,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) } } + /* + * If we're migrating this lock to someone else, we are no + * longer allowed to assert out own mastery. OTOH, we need to + * prevent migration from starting while we're still asserting + * our dominance. The reserved ast delays migration. + */ + spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_MIGRATING) { + mlog(0, "Someone asked us to assert mastery, but we're " + "in the middle of migration. Skipping assert, " + "the new master will handle that.\n"); + spin_unlock(&res->spinlock); + goto put; + } else + __dlm_lockres_reserve_ast(res); + spin_unlock(&res->spinlock); + /* this call now finishes out the nodemap * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", @@ -2012,6 +2029,10 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) mlog_errno(ret); } + /* Ok, we've asserted ourselves. Let's let migration start. */ + dlm_lockres_release_ast(dlm, res); + +put: dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n"); -- cgit v0.10.2 From c8df412e1c746dd21094966d04b3a79aad0f4d08 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:47:50 -0700 Subject: ocfs2: special case recovery lock in dlmlock_remote() If the previous master of the recovery lock dies, let calc_usage take it down completely and let the caller completely redo the dlmlock() call. Otherwise, there will never be an opportunity to re-master the lockres and recovery wont be able to progress. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 0ff9348..20b38dc 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -227,7 +227,16 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, res->state &= ~DLM_LOCK_RES_IN_PROGRESS; lock->lock_pending = 0; if (status != DLM_NORMAL) { - if (status != DLM_NOTQUEUED) { + if (status == DLM_RECOVERING && + dlm_is_recovery_lock(res->lockname.name, + res->lockname.len)) { + /* recovery lock was mastered by dead node. + * we need to have calc_usage shoot down this + * lockres and completely remaster it. */ + mlog(0, "%s: recovery lock was owned by " + "dead node %u, remaster it now.\n", + dlm->name, res->owner); + } else if (status != DLM_NOTQUEUED) { /* * DO NOT call calc_usage, as this would unhash * the remote lockres before we ever get to use @@ -691,18 +700,22 @@ retry_lock: msleep(100); /* no waiting for dlm_reco_thread */ if (recovery) { - if (status == DLM_RECOVERING) { - mlog(0, "%s: got RECOVERING " - "for $REOCVERY lock, master " - "was %u\n", dlm->name, - res->owner); - dlm_wait_for_node_death(dlm, res->owner, - DLM_NODE_DEATH_WAIT_MAX); - } + if (status != DLM_RECOVERING) + goto retry_lock; + + mlog(0, "%s: got RECOVERING " + "for $RECOVERY lock, master " + "was %u\n", dlm->name, + res->owner); + /* wait to see the node go down, then + * drop down and allow the lockres to + * get cleaned up. need to remaster. */ + dlm_wait_for_node_death(dlm, res->owner, + DLM_NODE_DEATH_WAIT_MAX); } else { dlm_wait_for_recovery(dlm); + goto retry_lock; } - goto retry_lock; } if (status != DLM_NORMAL) { diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 86199f6..00209f4 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -2314,6 +2314,10 @@ again: mlog(0, "%s: reco master %u is ready to recover %u\n", dlm->name, dlm->reco.new_master, dlm->reco.dead_node); status = -EEXIST; + } else if (ret == DLM_RECOVERING) { + mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n", + dlm->name, dlm->node_num); + goto again; } else { struct dlm_lock_resource *res; -- cgit v0.10.2 From 6a41321121ee2af33b8ac55c87657603df480b25 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:49:20 -0700 Subject: ocfs2: dlm_remaster_locks() should never exit without completing We cannot restart recovery. Once we begin to recover a node, keep the state of the recovery intact and follow through, regardless of any other node deaths that may occur. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 00209f4..22a0b05 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -480,6 +480,7 @@ master_here: status = dlm_remaster_locks(dlm, dlm->reco.dead_node); if (status < 0) { + /* we should never hit this anymore */ mlog(ML_ERROR, "error %d remastering locks for node %u, " "retrying.\n", status, dlm->reco.dead_node); /* yield a bit to allow any final network messages @@ -506,9 +507,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) int destroy = 0; int pass = 0; - status = dlm_init_recovery_area(dlm, dead_node); - if (status < 0) - goto leave; + do { + /* we have become recovery master. there is no escaping + * this, so just keep trying until we get it. */ + status = dlm_init_recovery_area(dlm, dead_node); + if (status < 0) { + mlog(ML_ERROR, "%s: failed to alloc recovery area, " + "retrying\n", dlm->name); + msleep(1000); + } + } while (status != 0); /* safe to access the node data list without a lock, since this * process is the only one to change the list */ @@ -525,16 +533,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) continue; } - status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); - if (status < 0) { - mlog_errno(status); - if (dlm_is_host_down(status)) - ndata->state = DLM_RECO_NODE_DATA_DEAD; - else { - destroy = 1; - goto leave; + do { + status = dlm_request_all_locks(dlm, ndata->node_num, + dead_node); + if (status < 0) { + mlog_errno(status); + if (dlm_is_host_down(status)) { + /* node died, ignore it for recovery */ + status = 0; + ndata->state = DLM_RECO_NODE_DATA_DEAD; + /* wait for the domain map to catch up + * with the network state. */ + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, + ndata->node_num), + msecs_to_jiffies(1000)); + mlog(0, "waited 1 sec for %u, " + "dead? %s\n", ndata->node_num, + dlm_is_node_dead(dlm, ndata->node_num) ? + "yes" : "no"); + } else { + /* -ENOMEM on the other node */ + mlog(0, "%s: node %u returned " + "%d during recovery, retrying " + "after a short wait\n", + dlm->name, ndata->node_num, + status); + msleep(100); + } } - } + } while (status != 0); switch (ndata->state) { case DLM_RECO_NODE_DATA_INIT: @@ -546,10 +574,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) mlog(0, "node %u died after requesting " "recovery info for node %u\n", ndata->node_num, dead_node); - // start all over - destroy = 1; - status = -EAGAIN; - goto leave; + /* fine. don't need this node's info. + * continue without it. */ + break; case DLM_RECO_NODE_DATA_REQUESTING: ndata->state = DLM_RECO_NODE_DATA_REQUESTED; mlog(0, "now receiving recovery data from " @@ -593,28 +620,12 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) BUG(); break; case DLM_RECO_NODE_DATA_DEAD: - mlog(ML_NOTICE, "node %u died after " + mlog(0, "node %u died after " "requesting recovery info for " "node %u\n", ndata->node_num, dead_node); spin_unlock(&dlm_reco_state_lock); - // start all over - destroy = 1; - status = -EAGAIN; - /* instead of spinning like crazy here, - * wait for the domain map to catch up - * with the network state. otherwise this - * can be hit hundreds of times before - * the node is really seen as dead. */ - wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_dead(dlm, - ndata->node_num), - msecs_to_jiffies(1000)); - mlog(0, "waited 1 sec for %u, " - "dead? %s\n", ndata->node_num, - dlm_is_node_dead(dlm, ndata->node_num) ? - "yes" : "no"); - goto leave; + break; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: mlog(0, "%s: node %u still in state %s\n", @@ -659,7 +670,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) jiffies, dlm->reco.dead_node, dlm->node_num, dlm->reco.new_master); destroy = 1; - status = ret; + status = 0; /* rescan everything marked dirty along the way */ dlm_kick_thread(dlm, NULL); break; @@ -672,7 +683,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) } -leave: if (destroy) dlm_destroy_recovery_area(dlm, dead_node); @@ -832,24 +842,22 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) if (dead_node != dlm->reco.dead_node || reco_master != dlm->reco.new_master) { - /* show extra debug info if the recovery state is messed */ - mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " - "request(dead=%u, master=%u)\n", - dlm->name, dlm->reco.dead_node, dlm->reco.new_master, - dead_node, reco_master); - mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " - "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", - dlm->name, mres->lockname_len, mres->lockname, mres->master, - mres->num_locks, mres->total_locks, mres->flags, - dlm_get_lock_cookie_node(mres->ml[0].cookie), - dlm_get_lock_cookie_seq(mres->ml[0].cookie), - mres->ml[0].list, mres->ml[0].flags, - mres->ml[0].type, mres->ml[0].convert_type, - mres->ml[0].highest_blocked, mres->ml[0].node); - BUG(); + /* worker could have been created before the recovery master + * died. if so, do not continue, but do not error. */ + if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { + mlog(ML_NOTICE, "%s: will not send recovery state, " + "recovery master %u died, thread=(dead=%u,mas=%u)" + " current=(dead=%u,mas=%u)\n", dlm->name, + reco_master, dead_node, reco_master, + dlm->reco.dead_node, dlm->reco.new_master); + } else { + mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, " + "master=%u), request(dead=%u, master=%u)\n", + dlm->name, dlm->reco.dead_node, + dlm->reco.new_master, dead_node, reco_master); + } + goto leave; } - BUG_ON(dead_node != dlm->reco.dead_node); - BUG_ON(reco_master != dlm->reco.new_master); /* lock resources should have already been moved to the * dlm->reco.resources list. now move items from that list @@ -889,7 +897,7 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) dlm->name, reco_master, dead_node, ret); } } - +leave: free_page((unsigned long)data); } -- cgit v0.10.2 From 67a187412baa84dfff2d423961d86663b7fc7d3c Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:50:12 -0700 Subject: ocfs2: remove unneccesary spin_unlock() in dlm_remaster_locks() Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 22a0b05..4e0aada 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -624,7 +624,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) "requesting recovery info for " "node %u\n", ndata->node_num, dead_node); - spin_unlock(&dlm_reco_state_lock); break; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: -- cgit v0.10.2 From c27069e6cfa242a3b84eb3442934c6fe51ee9066 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:51:49 -0700 Subject: ocfs2: continue recovery when a dead node is encountered Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 4e0aada..c699a45 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -2541,6 +2541,7 @@ stage2: mlog(ML_ERROR, "node %u went down after this " "node finished recovery.\n", nodenum); ret = 0; + continue; } break; } -- cgit v0.10.2 From b7084ab538ac2bd71ce494cf1cbbea9fe9db2c07 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 13:54:07 -0700 Subject: ocfs2: wait for recovery when starting lock mastery Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 829cc39..9bea5c6 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -710,6 +710,7 @@ void dlm_wait_for_recovery(struct dlm_ctxt *dlm); void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); +int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); void dlm_put(struct dlm_ctxt *dlm); struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 78ba77b..ed1601d 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -899,6 +899,9 @@ redo_request: } else wait_on_recovery = 0; spin_unlock(&dlm->spinlock); + + if (wait_on_recovery) + dlm_wait_for_node_recovery(dlm, bit, 10000); } /* must wait for lock to be mastered elsewhere */ diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index c699a45..b03feca 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -343,6 +343,18 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) return dead; } +/* returns true if node is no longer in the domain + * could be dead or just not joined */ +int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) +{ + int recovered; + spin_lock(&dlm->spinlock); + recovered = !test_bit(node, dlm->recovery_map); + spin_unlock(&dlm->spinlock); + return recovered; +} + + int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) { if (timeout) { @@ -361,6 +373,24 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) return 0; } +int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) +{ + if (timeout) { + mlog(0, "%s: waiting %dms for notification of " + "recovery of node %u\n", dlm->name, timeout, node); + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_is_node_recovered(dlm, node), + msecs_to_jiffies(timeout)); + } else { + mlog(0, "%s: waiting indefinitely for notification " + "of recovery of node %u\n", dlm->name, node); + wait_event(dlm->dlm_reco_thread_wq, + dlm_is_node_recovered(dlm, node)); + } + /* for now, return 0 */ + return 0; +} + /* callers of the top-level api calls (dlmlock/dlmunlock) should * block on the dlm->reco.event when recovery is in progress. * the dlm recovery thread will set this state when it begins -- cgit v0.10.2 From ad8100e0d20e0123def9f83c040b68c96c8638f0 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:25:21 -0700 Subject: ocfs2: use GFP_NOFS in some dlm operations Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c index 7273d9f..033ad17 100644 --- a/fs/ocfs2/dlm/dlmfs.c +++ b/fs/ocfs2/dlm/dlmfs.c @@ -116,7 +116,7 @@ static int dlmfs_file_open(struct inode *inode, * doesn't make sense for LVB writes. */ file->f_flags &= ~O_APPEND; - fp = kmalloc(sizeof(*fp), GFP_KERNEL); + fp = kmalloc(sizeof(*fp), GFP_NOFS); if (!fp) { status = -ENOMEM; goto bail; @@ -196,7 +196,7 @@ static ssize_t dlmfs_file_read(struct file *filp, else readlen = count - *ppos; - lvb_buf = kmalloc(readlen, GFP_KERNEL); + lvb_buf = kmalloc(readlen, GFP_NOFS); if (!lvb_buf) return -ENOMEM; @@ -240,7 +240,7 @@ static ssize_t dlmfs_file_write(struct file *filp, else writelen = count - *ppos; - lvb_buf = kmalloc(writelen, GFP_KERNEL); + lvb_buf = kmalloc(writelen, GFP_NOFS); if (!lvb_buf) return -ENOMEM; diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 20b38dc..e3cab0e 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -408,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, struct dlm_lock *lock; int kernel_allocated = 0; - lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); + lock = kcalloc(1, sizeof(*lock), GFP_NOFS); if (!lock) return NULL; if (!lksb) { /* zero memory only if kernel-allocated */ - lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); + lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS); if (!lksb) { kfree(lock); return NULL; diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index ed1601d..b780e159 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -709,11 +709,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, { struct dlm_lock_resource *res; - res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); + res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS); if (!res) return NULL; - res->lockname.name = kmalloc(namelen, GFP_KERNEL); + res->lockname.name = kmalloc(namelen, GFP_NOFS); if (!res->lockname.name) { kfree(res); return NULL; @@ -777,7 +777,7 @@ lookup: mlog(0, "allocating a new resource\n"); /* nothing found and we need to allocate one. */ alloc_mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); + kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!alloc_mle) goto leave; res = dlm_new_lockres(dlm, lockid, namelen); @@ -1532,7 +1532,7 @@ way_up_top: spin_unlock(&dlm->spinlock); mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); + kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!mle) { response = DLM_MASTER_RESP_ERROR; mlog_errno(-ENOMEM); @@ -1940,7 +1940,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, int ignore_higher, u8 request_from, u32 flags) { struct dlm_work_item *item; - item = kcalloc(1, sizeof(*item), GFP_KERNEL); + item = kcalloc(1, sizeof(*item), GFP_NOFS); if (!item) return -ENOMEM; @@ -2175,14 +2175,14 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, */ ret = -ENOMEM; - mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL); + mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS); if (!mres) { mlog_errno(ret); goto leave; } mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, - GFP_KERNEL); + GFP_NOFS); if (!mle) { mlog_errno(ret); goto leave; @@ -2639,7 +2639,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) /* preallocate.. if this fails, abort */ mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, - GFP_KERNEL); + GFP_NOFS); if (!mle) { ret = -ENOMEM; diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index b03feca..f76b498 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -737,7 +737,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) } BUG_ON(num == dead_node); - ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL); + ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS); if (!ndata) { dlm_destroy_recovery_area(dlm, dead_node); return -ENOMEM; @@ -822,14 +822,14 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) } BUG_ON(lr->dead_node != dlm->reco.dead_node); - item = kcalloc(1, sizeof(*item), GFP_KERNEL); + item = kcalloc(1, sizeof(*item), GFP_NOFS); if (!item) { dlm_put(dlm); return -ENOMEM; } /* this will get freed by dlm_request_all_locks_worker */ - buf = (char *) __get_free_page(GFP_KERNEL); + buf = (char *) __get_free_page(GFP_NOFS); if (!buf) { kfree(item); dlm_put(dlm); @@ -1302,8 +1302,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) mlog(0, "all done flag. all lockres data received!\n"); ret = -ENOMEM; - buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL); - item = kcalloc(1, sizeof(*item), GFP_KERNEL); + buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS); + item = kcalloc(1, sizeof(*item), GFP_NOFS); if (!buf || !item) goto leave; diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c index 74ca4e5f..e641b08 100644 --- a/fs/ocfs2/dlm/userdlm.c +++ b/fs/ocfs2/dlm/userdlm.c @@ -672,7 +672,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name) u32 dlm_key; char *domain; - domain = kmalloc(name->len + 1, GFP_KERNEL); + domain = kmalloc(name->len + 1, GFP_NOFS); if (!domain) { mlog_errno(-ENOMEM); return ERR_PTR(-ENOMEM); -- cgit v0.10.2 From f85cd47a5825b77a146bad6870b2ddcf08415c13 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:27:41 -0700 Subject: ocfs2: use cond_resched() in dlm_thread() yield() does not yield. cond_resched() does. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index c1c10fd..0c822f3 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -731,7 +731,7 @@ in_progress: /* yield and continue right away if there is more work to do */ if (!n) { - yield(); + cond_resched(); continue; } -- cgit v0.10.2 From b220532a71adf65d45c3aa8a284bfa7ec57957bd Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:29:28 -0700 Subject: ocfs2: retry operations when a lock is marked in recovery Before checking for a nonexistent lock, make sure the lockres is not marked RECOVERING. The caller will just retry and the state should be fixed up when recovery completes. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 90cbaaf..2f7ad52 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -464,6 +464,12 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) } spin_lock(&res->spinlock); + status = __dlm_lockres_state_to_status(res); + if (status != DLM_NORMAL) { + spin_unlock(&res->spinlock); + dlm_error(status); + goto leave; + } list_for_each(iter, &res->granted) { lock = list_entry(iter, struct dlm_lock, list); if (lock->ml.cookie == cnv->cookie && @@ -473,6 +479,20 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) } lock = NULL; } + if (!lock) { + __dlm_print_one_lock_resource(res); + list_for_each(iter, &res->granted) { + lock = list_entry(iter, struct dlm_lock, list); + if (lock->ml.node == cnv->node_idx) { + mlog(0, "There is something here " + "for node %u, lock->ml.cookie=%llu, " + "cnv->cookie=%llu\n", cnv->node_idx, + lock->ml.cookie, cnv->cookie); + break; + } + } + lock = NULL; + } spin_unlock(&res->spinlock); if (!lock) { status = DLM_IVLOCKID; -- cgit v0.10.2 From 44a7f1d063bbe45773353903f36d9d88fb73d82a Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:29:59 -0700 Subject: ocfs2: mlog in dlm_convert_lock_handler() should be ML_ERROR Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 2f7ad52..b24fa53 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -484,7 +484,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) list_for_each(iter, &res->granted) { lock = list_entry(iter, struct dlm_lock, list); if (lock->ml.node == cnv->node_idx) { - mlog(0, "There is something here " + mlog(ML_ERROR, "There is something here " "for node %u, lock->ml.cookie=%llu, " "cnv->cookie=%llu\n", cnv->node_idx, lock->ml.cookie, cnv->cookie); -- cgit v0.10.2 From 56a7c104bc91b4a5f970d7372ebb04eebc633c68 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:30:39 -0700 Subject: ocfs2: display message before waiting for recovery to complete Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index f76b498..08336ae 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -409,6 +409,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm) void dlm_wait_for_recovery(struct dlm_ctxt *dlm) { + if (dlm_in_recovery(dlm)) { + mlog(ML_NOTICE, "%s: reco thread %d in recovery: " + "state=%d, master=%u, dead=%u\n", + dlm->name, dlm->dlm_reco_thread_task->pid, + dlm->reco.state, dlm->reco.new_master, + dlm->reco.dead_node); + } wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); } -- cgit v0.10.2 From 3b3b84a892d37ba336391e411eb5f8b013b9a669 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:31:37 -0700 Subject: ocfs2: tune down some noisy messages during dlm recovery Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index b780e159..f186e09 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1639,13 +1639,13 @@ again: tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, &assert, sizeof(assert), to, &r); if (tmpret < 0) { - mlog(ML_ERROR, "assert_master returned %d!\n", tmpret); + mlog(0, "assert_master returned %d!\n", tmpret); if (!dlm_is_host_down(tmpret)) { - mlog(ML_ERROR, "unhandled error!\n"); + mlog(ML_ERROR, "unhandled error=%d!\n", tmpret); BUG(); } /* a node died. finish out the rest of the nodes. */ - mlog(ML_ERROR, "link to %d went down!\n", to); + mlog(0, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; } else if (r < 0) { @@ -2029,7 +2029,8 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ - mlog_errno(ret); + if (!dlm_is_host_down(ret)) + mlog_errno(ret); } /* Ok, we've asserted ourselves. Let's let migration start. */ @@ -2808,7 +2809,7 @@ top: * may result in the mle being unlinked and * freed, but there may still be a process * waiting in the dlmlock path which is fine. */ - mlog(ML_ERROR, "node %u was expected master\n", + mlog(0, "node %u was expected master\n", dead_node); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 08336ae..f0549e9 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -410,7 +410,7 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm) void dlm_wait_for_recovery(struct dlm_ctxt *dlm) { if (dlm_in_recovery(dlm)) { - mlog(ML_NOTICE, "%s: reco thread %d in recovery: " + mlog(0, "%s: reco thread %d in recovery: " "state=%d, master=%u, dead=%u\n", dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.state, dlm->reco.new_master, -- cgit v0.10.2 From 495ac96e638cb0ad33baa7113531d742bfb328d4 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:34:08 -0700 Subject: ocfs2: fix incorrect error returns Use DLM_REJECTED instead of DLM_RECOVERING. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index e3cab0e..d6f8957 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -299,7 +299,7 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, if (tmpret >= 0) { // successfully sent and received ret = status; // this is already a dlm_status - if (ret == DLM_RECOVERING) { + if (ret == DLM_REJECTED) { mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " "no longer owned by %u. that node is coming back " "up currently.\n", dlm->name, create.namelen, @@ -457,7 +457,7 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) name = create->name; namelen = create->namelen; - status = DLM_RECOVERING; + status = DLM_REJECTED; if (!dlm_domain_fully_joined(dlm)) { mlog(ML_ERROR, "Domain %s not fully joined, but node %u is " "sending a create_lock message for lock %.*s!\n", -- cgit v0.10.2 From 3156d267016627fe427a6b0d4ed8a9678557e91e Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:39:29 -0700 Subject: ocfs2: move dlm work to a private work queue The work that is done can block for long periods of time and so is not appropriate for keventd. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 9bea5c6..9bdc9cf 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -131,6 +131,7 @@ struct dlm_ctxt struct o2hb_callback_func dlm_hb_down; struct task_struct *dlm_thread_task; struct task_struct *dlm_reco_thread_task; + struct workqueue_struct *dlm_worker; wait_queue_head_t dlm_thread_wq; wait_queue_head_t dlm_reco_thread_wq; wait_queue_head_t ast_wq; diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 3511930..c3462ea 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -303,11 +303,21 @@ int dlm_domain_fully_joined(struct dlm_ctxt *dlm) return ret; } +static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm) +{ + if (dlm->dlm_worker) { + flush_workqueue(dlm->dlm_worker); + destroy_workqueue(dlm->dlm_worker); + dlm->dlm_worker = NULL; + } +} + static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) { dlm_unregister_domain_handlers(dlm); dlm_complete_thread(dlm); dlm_complete_recovery_thread(dlm); + dlm_destroy_dlm_worker(dlm); /* We've left the domain. Now we can take ourselves out of the * list and allow the kref stuff to help us free the @@ -1151,6 +1161,13 @@ static int dlm_join_domain(struct dlm_ctxt *dlm) goto bail; } + dlm->dlm_worker = create_singlethread_workqueue("dlm_wq"); + if (!dlm->dlm_worker) { + status = -ENOMEM; + mlog_errno(status); + goto bail; + } + do { unsigned int backoff; status = dlm_try_to_join_domain(dlm); @@ -1191,6 +1208,7 @@ bail: dlm_unregister_domain_handlers(dlm); dlm_complete_thread(dlm); dlm_complete_recovery_thread(dlm); + dlm_destroy_dlm_worker(dlm); } return status; @@ -1256,6 +1274,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, dlm->dlm_thread_task = NULL; dlm->dlm_reco_thread_task = NULL; + dlm->dlm_worker = NULL; init_waitqueue_head(&dlm->dlm_thread_wq); init_waitqueue_head(&dlm->dlm_reco_thread_wq); init_waitqueue_head(&dlm->reco.event); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index f186e09..733d9a5 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1962,7 +1962,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); - schedule_work(&dlm->dispatched_work); + queue_work(dlm->dlm_worker, &dlm->dispatched_work); return 0; } diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index f0549e9..22e6a5b 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -157,12 +157,21 @@ void dlm_dispatch_work(void *data) struct list_head *iter, *iter2; struct dlm_work_item *item; dlm_workfunc_t *workfunc; + int tot=0; + + if (!dlm_joined(dlm)) + return; spin_lock(&dlm->work_lock); list_splice_init(&dlm->work_list, &tmp_list); spin_unlock(&dlm->work_lock); list_for_each_safe(iter, iter2, &tmp_list) { + tot++; + } + mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); + + list_for_each_safe(iter, iter2, &tmp_list) { item = list_entry(iter, struct dlm_work_item, list); workfunc = item->func; list_del_init(&item->list); @@ -851,7 +860,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); - schedule_work(&dlm->dispatched_work); + queue_work(dlm->dlm_worker, &dlm->dispatched_work); dlm_put(dlm); return 0; @@ -1401,7 +1410,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); - schedule_work(&dlm->dispatched_work); + queue_work(dlm->dlm_worker, &dlm->dispatched_work); leave: dlm_put(dlm); -- cgit v0.10.2 From 0032abd674a606c3cf2d9961c8119f5d72c411d0 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Mon, 1 May 2006 14:39:57 -0700 Subject: ocfs2: remove whitespace in dlmunlock.c Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index 9b40d60..b0c3134 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -317,7 +317,7 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, size_t veclen = 1; mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); - + if (owner == dlm->node_num) { /* ended up trying to contact ourself. this means * that the lockres had been remote but became local -- cgit v0.10.2 From 8a9343fa24d8d3fcb189bed2b7afcf4b8a8c1c8d Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Mon, 1 May 2006 14:55:10 -0700 Subject: ocfs2: dlm_print_one_mle() needs to be defined Fixes compile breakage. Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 733d9a5..1b38c55 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -128,11 +128,8 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, return 1; } -#if 0 -/* Code here is included but defined out as it aids debugging */ - #define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) -void _dlm_print_nodemap(unsigned long *map, const char *mapname) +static void _dlm_print_nodemap(unsigned long *map, const char *mapname) { int i; printk("%s=[ ", mapname); @@ -142,7 +139,7 @@ void _dlm_print_nodemap(unsigned long *map, const char *mapname) printk("]"); } -void dlm_print_one_mle(struct dlm_master_list_entry *mle) +static void dlm_print_one_mle(struct dlm_master_list_entry *mle) { int refs; char *type; @@ -189,6 +186,9 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle) printk("\n"); } +#if 0 +/* Code here is included but defined out as it aids debugging */ + static void dlm_dump_mles(struct dlm_ctxt *dlm) { struct dlm_master_list_entry *mle; @@ -943,7 +943,7 @@ wait: dlm->name, res->lockname.len, res->lockname.name, blocked); dlm_print_one_lock_resource(res); - /* dlm_print_one_mle(mle); */ + dlm_print_one_mle(mle); tries = 0; } goto redo_request; -- cgit v0.10.2 From 43dee336c903fae15783b90983dfdedd2c7ffefc Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Mon, 1 May 2006 14:56:57 -0700 Subject: ocfs2: fix compiler warnings in dlm_convert_lock_handler() We need to cast to unsigned long long. Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index b24fa53..c764dc8 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -487,7 +487,8 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) mlog(ML_ERROR, "There is something here " "for node %u, lock->ml.cookie=%llu, " "cnv->cookie=%llu\n", cnv->node_idx, - lock->ml.cookie, cnv->cookie); + (unsigned long long)lock->ml.cookie, + (unsigned long long)cnv->cookie); break; } } -- cgit v0.10.2 From 3fb5a9891dbb553dda96783dbc0dc4e77cbb2529 Mon Sep 17 00:00:00 2001 From: Adrian Bunk Date: Tue, 16 May 2006 17:26:41 +0200 Subject: [PATCH] fs/ocfs2/dlm/: cleanups This patch #if 0's the no longer used dlm_dump_lock_resources(). Since this makes dlmdebug.h empty, this patch also removes this header. Additionally, the needlessly global dlm_is_node_recovered() is made static. Signed-off-by: Adrian Bunk Signed-off-by: Mark Fasheh diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index e119e4d..3f6c8d8 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -37,10 +37,8 @@ #include "dlmapi.h" #include "dlmcommon.h" -#include "dlmdebug.h" #include "dlmdomain.h" -#include "dlmdebug.h" #define MLOG_MASK_PREFIX ML_DLM #include "cluster/masklog.h" @@ -120,6 +118,7 @@ void dlm_print_one_lock(struct dlm_lock *lockid) } EXPORT_SYMBOL_GPL(dlm_print_one_lock); +#if 0 void dlm_dump_lock_resources(struct dlm_ctxt *dlm) { struct dlm_lock_resource *res; @@ -142,6 +141,7 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm) } spin_unlock(&dlm->spinlock); } +#endif /* 0 */ static const char *dlm_errnames[] = { [DLM_NORMAL] = "DLM_NORMAL", diff --git a/fs/ocfs2/dlm/dlmdebug.h b/fs/ocfs2/dlm/dlmdebug.h deleted file mode 100644 index 6858510..0000000 --- a/fs/ocfs2/dlm/dlmdebug.h +++ /dev/null @@ -1,30 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * - * dlmdebug.h - * - * Copyright (C) 2004 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - * - */ - -#ifndef DLMDEBUG_H -#define DLMDEBUG_H - -void dlm_dump_lock_resources(struct dlm_ctxt *dlm); - -#endif diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index c3462ea..ba27c5c 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -41,7 +41,6 @@ #include "dlmapi.h" #include "dlmcommon.h" -#include "dlmdebug.h" #include "dlmdomain.h" #include "dlmver.h" diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 1b38c55..1b8346d 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -47,7 +47,6 @@ #include "dlmapi.h" #include "dlmcommon.h" -#include "dlmdebug.h" #include "dlmdomain.h" #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER) diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 22e6a5b..da39901 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -354,7 +354,7 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) /* returns true if node is no longer in the domain * could be dead or just not joined */ -int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) +static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) { int recovered; spin_lock(&dlm->spinlock); -- cgit v0.10.2