Linux-libre 3.16.41-gnu
[librecmc/linux-libre.git] / fs / ocfs2 / dlm / dlmmaster.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmod.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  *
25  */
26
27
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/init.h>
34 #include <linux/sysctl.h>
35 #include <linux/random.h>
36 #include <linux/blkdev.h>
37 #include <linux/socket.h>
38 #include <linux/inet.h>
39 #include <linux/spinlock.h>
40 #include <linux/delay.h>
41
42
43 #include "cluster/heartbeat.h"
44 #include "cluster/nodemanager.h"
45 #include "cluster/tcp.h"
46
47 #include "dlmapi.h"
48 #include "dlmcommon.h"
49 #include "dlmdomain.h"
50 #include "dlmdebug.h"
51
52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53 #include "cluster/masklog.h"
54
55 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
56                               struct dlm_master_list_entry *mle,
57                               struct o2nm_node *node,
58                               int idx);
59 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
60                             struct dlm_master_list_entry *mle,
61                             struct o2nm_node *node,
62                             int idx);
63
64 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
65 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
66                                 struct dlm_lock_resource *res,
67                                 void *nodemap, u32 flags);
68 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
69
70 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
71                                 struct dlm_master_list_entry *mle,
72                                 const char *name,
73                                 unsigned int namelen)
74 {
75         if (dlm != mle->dlm)
76                 return 0;
77
78         if (namelen != mle->mnamelen ||
79             memcmp(name, mle->mname, namelen) != 0)
80                 return 0;
81
82         return 1;
83 }
84
85 static struct kmem_cache *dlm_lockres_cache;
86 static struct kmem_cache *dlm_lockname_cache;
87 static struct kmem_cache *dlm_mle_cache;
88
89 static void dlm_mle_release(struct kref *kref);
90 static void dlm_init_mle(struct dlm_master_list_entry *mle,
91                         enum dlm_mle_type type,
92                         struct dlm_ctxt *dlm,
93                         struct dlm_lock_resource *res,
94                         const char *name,
95                         unsigned int namelen);
96 static void dlm_put_mle(struct dlm_master_list_entry *mle);
97 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
98 static int dlm_find_mle(struct dlm_ctxt *dlm,
99                         struct dlm_master_list_entry **mle,
100                         char *name, unsigned int namelen);
101
102 static int dlm_do_master_request(struct dlm_lock_resource *res,
103                                  struct dlm_master_list_entry *mle, int to);
104
105
106 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
107                                      struct dlm_lock_resource *res,
108                                      struct dlm_master_list_entry *mle,
109                                      int *blocked);
110 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
111                                     struct dlm_lock_resource *res,
112                                     struct dlm_master_list_entry *mle,
113                                     int blocked);
114 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
115                                  struct dlm_lock_resource *res,
116                                  struct dlm_master_list_entry *mle,
117                                  struct dlm_master_list_entry **oldmle,
118                                  const char *name, unsigned int namelen,
119                                  u8 new_master, u8 master);
120
121 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
122                                     struct dlm_lock_resource *res);
123 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
124                                       struct dlm_lock_resource *res);
125 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
126                                        struct dlm_lock_resource *res,
127                                        u8 target);
128 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
129                                        struct dlm_lock_resource *res);
130
131
132 int dlm_is_host_down(int errno)
133 {
134         switch (errno) {
135                 case -EBADF:
136                 case -ECONNREFUSED:
137                 case -ENOTCONN:
138                 case -ECONNRESET:
139                 case -EPIPE:
140                 case -EHOSTDOWN:
141                 case -EHOSTUNREACH:
142                 case -ETIMEDOUT:
143                 case -ECONNABORTED:
144                 case -ENETDOWN:
145                 case -ENETUNREACH:
146                 case -ENETRESET:
147                 case -ESHUTDOWN:
148                 case -ENOPROTOOPT:
149                 case -EINVAL:   /* if returned from our tcp code,
150                                    this means there is no socket */
151                         return 1;
152         }
153         return 0;
154 }
155
156
157 /*
158  * MASTER LIST FUNCTIONS
159  */
160
161
162 /*
163  * regarding master list entries and heartbeat callbacks:
164  *
165  * in order to avoid sleeping and allocation that occurs in
166  * heartbeat, master list entries are simply attached to the
167  * dlm's established heartbeat callbacks.  the mle is attached
168  * when it is created, and since the dlm->spinlock is held at
169  * that time, any heartbeat event will be properly discovered
170  * by the mle.  the mle needs to be detached from the
171  * dlm->mle_hb_events list as soon as heartbeat events are no
172  * longer useful to the mle, and before the mle is freed.
173  *
174  * as a general rule, heartbeat events are no longer needed by
175  * the mle once an "answer" regarding the lock master has been
176  * received.
177  */
178 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
179                                               struct dlm_master_list_entry *mle)
180 {
181         assert_spin_locked(&dlm->spinlock);
182
183         list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
184 }
185
186
187 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
188                                               struct dlm_master_list_entry *mle)
189 {
190         if (!list_empty(&mle->hb_events))
191                 list_del_init(&mle->hb_events);
192 }
193
194
195 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
196                                             struct dlm_master_list_entry *mle)
197 {
198         spin_lock(&dlm->spinlock);
199         __dlm_mle_detach_hb_events(dlm, mle);
200         spin_unlock(&dlm->spinlock);
201 }
202
203 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
204 {
205         struct dlm_ctxt *dlm;
206         dlm = mle->dlm;
207
208         assert_spin_locked(&dlm->spinlock);
209         assert_spin_locked(&dlm->master_lock);
210         mle->inuse++;
211         kref_get(&mle->mle_refs);
212 }
213
214 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
215 {
216         struct dlm_ctxt *dlm;
217         dlm = mle->dlm;
218
219         spin_lock(&dlm->spinlock);
220         spin_lock(&dlm->master_lock);
221         mle->inuse--;
222         __dlm_put_mle(mle);
223         spin_unlock(&dlm->master_lock);
224         spin_unlock(&dlm->spinlock);
225
226 }
227
228 /* remove from list and free */
229 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
230 {
231         struct dlm_ctxt *dlm;
232         dlm = mle->dlm;
233
234         assert_spin_locked(&dlm->spinlock);
235         assert_spin_locked(&dlm->master_lock);
236         if (!atomic_read(&mle->mle_refs.refcount)) {
237                 /* this may or may not crash, but who cares.
238                  * it's a BUG. */
239                 mlog(ML_ERROR, "bad mle: %p\n", mle);
240                 dlm_print_one_mle(mle);
241                 BUG();
242         } else
243                 kref_put(&mle->mle_refs, dlm_mle_release);
244 }
245
246
247 /* must not have any spinlocks coming in */
248 static void dlm_put_mle(struct dlm_master_list_entry *mle)
249 {
250         struct dlm_ctxt *dlm;
251         dlm = mle->dlm;
252
253         spin_lock(&dlm->spinlock);
254         spin_lock(&dlm->master_lock);
255         __dlm_put_mle(mle);
256         spin_unlock(&dlm->master_lock);
257         spin_unlock(&dlm->spinlock);
258 }
259
260 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
261 {
262         kref_get(&mle->mle_refs);
263 }
264
265 static void dlm_init_mle(struct dlm_master_list_entry *mle,
266                         enum dlm_mle_type type,
267                         struct dlm_ctxt *dlm,
268                         struct dlm_lock_resource *res,
269                         const char *name,
270                         unsigned int namelen)
271 {
272         assert_spin_locked(&dlm->spinlock);
273
274         mle->dlm = dlm;
275         mle->type = type;
276         INIT_HLIST_NODE(&mle->master_hash_node);
277         INIT_LIST_HEAD(&mle->hb_events);
278         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
279         spin_lock_init(&mle->spinlock);
280         init_waitqueue_head(&mle->wq);
281         atomic_set(&mle->woken, 0);
282         kref_init(&mle->mle_refs);
283         memset(mle->response_map, 0, sizeof(mle->response_map));
284         mle->master = O2NM_MAX_NODES;
285         mle->new_master = O2NM_MAX_NODES;
286         mle->inuse = 0;
287
288         BUG_ON(mle->type != DLM_MLE_BLOCK &&
289                mle->type != DLM_MLE_MASTER &&
290                mle->type != DLM_MLE_MIGRATION);
291
292         if (mle->type == DLM_MLE_MASTER) {
293                 BUG_ON(!res);
294                 mle->mleres = res;
295                 memcpy(mle->mname, res->lockname.name, res->lockname.len);
296                 mle->mnamelen = res->lockname.len;
297                 mle->mnamehash = res->lockname.hash;
298         } else {
299                 BUG_ON(!name);
300                 mle->mleres = NULL;
301                 memcpy(mle->mname, name, namelen);
302                 mle->mnamelen = namelen;
303                 mle->mnamehash = dlm_lockid_hash(name, namelen);
304         }
305
306         atomic_inc(&dlm->mle_tot_count[mle->type]);
307         atomic_inc(&dlm->mle_cur_count[mle->type]);
308
309         /* copy off the node_map and register hb callbacks on our copy */
310         memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
311         memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
312         clear_bit(dlm->node_num, mle->vote_map);
313         clear_bit(dlm->node_num, mle->node_map);
314
315         /* attach the mle to the domain node up/down events */
316         __dlm_mle_attach_hb_events(dlm, mle);
317 }
318
319 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
320 {
321         assert_spin_locked(&dlm->spinlock);
322         assert_spin_locked(&dlm->master_lock);
323
324         if (!hlist_unhashed(&mle->master_hash_node))
325                 hlist_del_init(&mle->master_hash_node);
326 }
327
328 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
329 {
330         struct hlist_head *bucket;
331
332         assert_spin_locked(&dlm->master_lock);
333
334         bucket = dlm_master_hash(dlm, mle->mnamehash);
335         hlist_add_head(&mle->master_hash_node, bucket);
336 }
337
338 /* returns 1 if found, 0 if not */
339 static int dlm_find_mle(struct dlm_ctxt *dlm,
340                         struct dlm_master_list_entry **mle,
341                         char *name, unsigned int namelen)
342 {
343         struct dlm_master_list_entry *tmpmle;
344         struct hlist_head *bucket;
345         unsigned int hash;
346
347         assert_spin_locked(&dlm->master_lock);
348
349         hash = dlm_lockid_hash(name, namelen);
350         bucket = dlm_master_hash(dlm, hash);
351         hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
352                 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
353                         continue;
354                 dlm_get_mle(tmpmle);
355                 *mle = tmpmle;
356                 return 1;
357         }
358         return 0;
359 }
360
361 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
362 {
363         struct dlm_master_list_entry *mle;
364
365         assert_spin_locked(&dlm->spinlock);
366
367         list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
368                 if (node_up)
369                         dlm_mle_node_up(dlm, mle, NULL, idx);
370                 else
371                         dlm_mle_node_down(dlm, mle, NULL, idx);
372         }
373 }
374
375 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
376                               struct dlm_master_list_entry *mle,
377                               struct o2nm_node *node, int idx)
378 {
379         spin_lock(&mle->spinlock);
380
381         if (!test_bit(idx, mle->node_map))
382                 mlog(0, "node %u already removed from nodemap!\n", idx);
383         else
384                 clear_bit(idx, mle->node_map);
385
386         spin_unlock(&mle->spinlock);
387 }
388
389 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
390                             struct dlm_master_list_entry *mle,
391                             struct o2nm_node *node, int idx)
392 {
393         spin_lock(&mle->spinlock);
394
395         if (test_bit(idx, mle->node_map))
396                 mlog(0, "node %u already in node map!\n", idx);
397         else
398                 set_bit(idx, mle->node_map);
399
400         spin_unlock(&mle->spinlock);
401 }
402
403
404 int dlm_init_mle_cache(void)
405 {
406         dlm_mle_cache = kmem_cache_create("o2dlm_mle",
407                                           sizeof(struct dlm_master_list_entry),
408                                           0, SLAB_HWCACHE_ALIGN,
409                                           NULL);
410         if (dlm_mle_cache == NULL)
411                 return -ENOMEM;
412         return 0;
413 }
414
415 void dlm_destroy_mle_cache(void)
416 {
417         if (dlm_mle_cache)
418                 kmem_cache_destroy(dlm_mle_cache);
419 }
420
421 static void dlm_mle_release(struct kref *kref)
422 {
423         struct dlm_master_list_entry *mle;
424         struct dlm_ctxt *dlm;
425
426         mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
427         dlm = mle->dlm;
428
429         assert_spin_locked(&dlm->spinlock);
430         assert_spin_locked(&dlm->master_lock);
431
432         mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
433              mle->type);
434
435         /* remove from list if not already */
436         __dlm_unlink_mle(dlm, mle);
437
438         /* detach the mle from the domain node up/down events */
439         __dlm_mle_detach_hb_events(dlm, mle);
440
441         atomic_dec(&dlm->mle_cur_count[mle->type]);
442
443         /* NOTE: kfree under spinlock here.
444          * if this is bad, we can move this to a freelist. */
445         kmem_cache_free(dlm_mle_cache, mle);
446 }
447
448
449 /*
450  * LOCK RESOURCE FUNCTIONS
451  */
452
453 int dlm_init_master_caches(void)
454 {
455         dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
456                                               sizeof(struct dlm_lock_resource),
457                                               0, SLAB_HWCACHE_ALIGN, NULL);
458         if (!dlm_lockres_cache)
459                 goto bail;
460
461         dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
462                                                DLM_LOCKID_NAME_MAX, 0,
463                                                SLAB_HWCACHE_ALIGN, NULL);
464         if (!dlm_lockname_cache)
465                 goto bail;
466
467         return 0;
468 bail:
469         dlm_destroy_master_caches();
470         return -ENOMEM;
471 }
472
473 void dlm_destroy_master_caches(void)
474 {
475         if (dlm_lockname_cache) {
476                 kmem_cache_destroy(dlm_lockname_cache);
477                 dlm_lockname_cache = NULL;
478         }
479
480         if (dlm_lockres_cache) {
481                 kmem_cache_destroy(dlm_lockres_cache);
482                 dlm_lockres_cache = NULL;
483         }
484 }
485
486 static void dlm_lockres_release(struct kref *kref)
487 {
488         struct dlm_lock_resource *res;
489         struct dlm_ctxt *dlm;
490
491         res = container_of(kref, struct dlm_lock_resource, refs);
492         dlm = res->dlm;
493
494         /* This should not happen -- all lockres' have a name
495          * associated with them at init time. */
496         BUG_ON(!res->lockname.name);
497
498         mlog(0, "destroying lockres %.*s\n", res->lockname.len,
499              res->lockname.name);
500
501         spin_lock(&dlm->track_lock);
502         if (!list_empty(&res->tracking))
503                 list_del_init(&res->tracking);
504         else {
505                 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
506                      res->lockname.len, res->lockname.name);
507                 dlm_print_one_lock_resource(res);
508         }
509         spin_unlock(&dlm->track_lock);
510
511         atomic_dec(&dlm->res_cur_count);
512
513         if (!hlist_unhashed(&res->hash_node) ||
514             !list_empty(&res->granted) ||
515             !list_empty(&res->converting) ||
516             !list_empty(&res->blocked) ||
517             !list_empty(&res->dirty) ||
518             !list_empty(&res->recovering) ||
519             !list_empty(&res->purge)) {
520                 mlog(ML_ERROR,
521                      "Going to BUG for resource %.*s."
522                      "  We're on a list! [%c%c%c%c%c%c%c]\n",
523                      res->lockname.len, res->lockname.name,
524                      !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
525                      !list_empty(&res->granted) ? 'G' : ' ',
526                      !list_empty(&res->converting) ? 'C' : ' ',
527                      !list_empty(&res->blocked) ? 'B' : ' ',
528                      !list_empty(&res->dirty) ? 'D' : ' ',
529                      !list_empty(&res->recovering) ? 'R' : ' ',
530                      !list_empty(&res->purge) ? 'P' : ' ');
531
532                 dlm_print_one_lock_resource(res);
533         }
534
535         /* By the time we're ready to blow this guy away, we shouldn't
536          * be on any lists. */
537         BUG_ON(!hlist_unhashed(&res->hash_node));
538         BUG_ON(!list_empty(&res->granted));
539         BUG_ON(!list_empty(&res->converting));
540         BUG_ON(!list_empty(&res->blocked));
541         BUG_ON(!list_empty(&res->dirty));
542         BUG_ON(!list_empty(&res->recovering));
543         BUG_ON(!list_empty(&res->purge));
544
545         kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
546
547         kmem_cache_free(dlm_lockres_cache, res);
548 }
549
550 void dlm_lockres_put(struct dlm_lock_resource *res)
551 {
552         kref_put(&res->refs, dlm_lockres_release);
553 }
554
555 static void dlm_init_lockres(struct dlm_ctxt *dlm,
556                              struct dlm_lock_resource *res,
557                              const char *name, unsigned int namelen)
558 {
559         char *qname;
560
561         /* If we memset here, we lose our reference to the kmalloc'd
562          * res->lockname.name, so be sure to init every field
563          * correctly! */
564
565         qname = (char *) res->lockname.name;
566         memcpy(qname, name, namelen);
567
568         res->lockname.len = namelen;
569         res->lockname.hash = dlm_lockid_hash(name, namelen);
570
571         init_waitqueue_head(&res->wq);
572         spin_lock_init(&res->spinlock);
573         INIT_HLIST_NODE(&res->hash_node);
574         INIT_LIST_HEAD(&res->granted);
575         INIT_LIST_HEAD(&res->converting);
576         INIT_LIST_HEAD(&res->blocked);
577         INIT_LIST_HEAD(&res->dirty);
578         INIT_LIST_HEAD(&res->recovering);
579         INIT_LIST_HEAD(&res->purge);
580         INIT_LIST_HEAD(&res->tracking);
581         atomic_set(&res->asts_reserved, 0);
582         res->migration_pending = 0;
583         res->inflight_locks = 0;
584         res->inflight_assert_workers = 0;
585
586         res->dlm = dlm;
587
588         kref_init(&res->refs);
589
590         atomic_inc(&dlm->res_tot_count);
591         atomic_inc(&dlm->res_cur_count);
592
593         /* just for consistency */
594         spin_lock(&res->spinlock);
595         dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
596         spin_unlock(&res->spinlock);
597
598         res->state = DLM_LOCK_RES_IN_PROGRESS;
599
600         res->last_used = 0;
601
602         spin_lock(&dlm->spinlock);
603         list_add_tail(&res->tracking, &dlm->tracking_list);
604         spin_unlock(&dlm->spinlock);
605
606         memset(res->lvb, 0, DLM_LVB_LEN);
607         memset(res->refmap, 0, sizeof(res->refmap));
608 }
609
610 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
611                                    const char *name,
612                                    unsigned int namelen)
613 {
614         struct dlm_lock_resource *res = NULL;
615
616         res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
617         if (!res)
618                 goto error;
619
620         res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
621         if (!res->lockname.name)
622                 goto error;
623
624         dlm_init_lockres(dlm, res, name, namelen);
625         return res;
626
627 error:
628         if (res && res->lockname.name)
629                 kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
630
631         if (res)
632                 kmem_cache_free(dlm_lockres_cache, res);
633         return NULL;
634 }
635
636 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
637                                 struct dlm_lock_resource *res, int bit)
638 {
639         assert_spin_locked(&res->spinlock);
640
641         mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
642              res->lockname.name, bit, __builtin_return_address(0));
643
644         set_bit(bit, res->refmap);
645 }
646
647 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
648                                   struct dlm_lock_resource *res, int bit)
649 {
650         assert_spin_locked(&res->spinlock);
651
652         mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
653              res->lockname.name, bit, __builtin_return_address(0));
654
655         clear_bit(bit, res->refmap);
656 }
657
658 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
659                                    struct dlm_lock_resource *res)
660 {
661         res->inflight_locks++;
662
663         mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
664              res->lockname.len, res->lockname.name, res->inflight_locks,
665              __builtin_return_address(0));
666 }
667
668 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
669                                    struct dlm_lock_resource *res)
670 {
671         assert_spin_locked(&res->spinlock);
672         __dlm_lockres_grab_inflight_ref(dlm, res);
673 }
674
675 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
676                                    struct dlm_lock_resource *res)
677 {
678         assert_spin_locked(&res->spinlock);
679
680         BUG_ON(res->inflight_locks == 0);
681
682         res->inflight_locks--;
683
684         mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
685              res->lockname.len, res->lockname.name, res->inflight_locks,
686              __builtin_return_address(0));
687
688         wake_up(&res->wq);
689 }
690
691 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
692                 struct dlm_lock_resource *res)
693 {
694         assert_spin_locked(&res->spinlock);
695         res->inflight_assert_workers++;
696         mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
697                         dlm->name, res->lockname.len, res->lockname.name,
698                         res->inflight_assert_workers);
699 }
700
701 static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
702                 struct dlm_lock_resource *res)
703 {
704         spin_lock(&res->spinlock);
705         __dlm_lockres_grab_inflight_worker(dlm, res);
706         spin_unlock(&res->spinlock);
707 }
708
709 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
710                 struct dlm_lock_resource *res)
711 {
712         assert_spin_locked(&res->spinlock);
713         BUG_ON(res->inflight_assert_workers == 0);
714         res->inflight_assert_workers--;
715         mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
716                         dlm->name, res->lockname.len, res->lockname.name,
717                         res->inflight_assert_workers);
718 }
719
720 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
721                 struct dlm_lock_resource *res)
722 {
723         spin_lock(&res->spinlock);
724         __dlm_lockres_drop_inflight_worker(dlm, res);
725         spin_unlock(&res->spinlock);
726 }
727
728 /*
729  * lookup a lock resource by name.
730  * may already exist in the hashtable.
731  * lockid is null terminated
732  *
733  * if not, allocate enough for the lockres and for
734  * the temporary structure used in doing the mastering.
735  *
736  * also, do a lookup in the dlm->master_list to see
737  * if another node has begun mastering the same lock.
738  * if so, there should be a block entry in there
739  * for this name, and we should *not* attempt to master
740  * the lock here.   need to wait around for that node
741  * to assert_master (or die).
742  *
743  */
744 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
745                                           const char *lockid,
746                                           int namelen,
747                                           int flags)
748 {
749         struct dlm_lock_resource *tmpres=NULL, *res=NULL;
750         struct dlm_master_list_entry *mle = NULL;
751         struct dlm_master_list_entry *alloc_mle = NULL;
752         int blocked = 0;
753         int ret, nodenum;
754         struct dlm_node_iter iter;
755         unsigned int hash;
756         int tries = 0;
757         int bit, wait_on_recovery = 0;
758
759         BUG_ON(!lockid);
760
761         hash = dlm_lockid_hash(lockid, namelen);
762
763         mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
764
765 lookup:
766         spin_lock(&dlm->spinlock);
767         tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
768         if (tmpres) {
769                 spin_unlock(&dlm->spinlock);
770                 spin_lock(&tmpres->spinlock);
771
772                 /*
773                  * Right after dlm spinlock was released, dlm_thread could have
774                  * purged the lockres. Check if lockres got unhashed. If so
775                  * start over.
776                  */
777                 if (hlist_unhashed(&tmpres->hash_node)) {
778                         spin_unlock(&tmpres->spinlock);
779                         dlm_lockres_put(tmpres);
780                         tmpres = NULL;
781                         goto lookup;
782                 }
783
784                 /* Wait on the thread that is mastering the resource */
785                 if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
786                         __dlm_wait_on_lockres(tmpres);
787                         BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
788                         spin_unlock(&tmpres->spinlock);
789                         dlm_lockres_put(tmpres);
790                         tmpres = NULL;
791                         goto lookup;
792                 }
793
794                 /* Wait on the resource purge to complete before continuing */
795                 if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
796                         BUG_ON(tmpres->owner == dlm->node_num);
797                         __dlm_wait_on_lockres_flags(tmpres,
798                                                     DLM_LOCK_RES_DROPPING_REF);
799                         spin_unlock(&tmpres->spinlock);
800                         dlm_lockres_put(tmpres);
801                         tmpres = NULL;
802                         goto lookup;
803                 }
804
805                 /* Grab inflight ref to pin the resource */
806                 dlm_lockres_grab_inflight_ref(dlm, tmpres);
807
808                 spin_unlock(&tmpres->spinlock);
809                 if (res)
810                         dlm_lockres_put(res);
811                 res = tmpres;
812                 goto leave;
813         }
814
815         if (!res) {
816                 spin_unlock(&dlm->spinlock);
817                 mlog(0, "allocating a new resource\n");
818                 /* nothing found and we need to allocate one. */
819                 alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
820                 if (!alloc_mle)
821                         goto leave;
822                 res = dlm_new_lockres(dlm, lockid, namelen);
823                 if (!res)
824                         goto leave;
825                 goto lookup;
826         }
827
828         mlog(0, "no lockres found, allocated our own: %p\n", res);
829
830         if (flags & LKM_LOCAL) {
831                 /* caller knows it's safe to assume it's not mastered elsewhere
832                  * DONE!  return right away */
833                 spin_lock(&res->spinlock);
834                 dlm_change_lockres_owner(dlm, res, dlm->node_num);
835                 __dlm_insert_lockres(dlm, res);
836                 dlm_lockres_grab_inflight_ref(dlm, res);
837                 spin_unlock(&res->spinlock);
838                 spin_unlock(&dlm->spinlock);
839                 /* lockres still marked IN_PROGRESS */
840                 goto wake_waiters;
841         }
842
843         /* check master list to see if another node has started mastering it */
844         spin_lock(&dlm->master_lock);
845
846         /* if we found a block, wait for lock to be mastered by another node */
847         blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
848         if (blocked) {
849                 int mig;
850                 if (mle->type == DLM_MLE_MASTER) {
851                         mlog(ML_ERROR, "master entry for nonexistent lock!\n");
852                         BUG();
853                 }
854                 mig = (mle->type == DLM_MLE_MIGRATION);
855                 /* if there is a migration in progress, let the migration
856                  * finish before continuing.  we can wait for the absence
857                  * of the MIGRATION mle: either the migrate finished or
858                  * one of the nodes died and the mle was cleaned up.
859                  * if there is a BLOCK here, but it already has a master
860                  * set, we are too late.  the master does not have a ref
861                  * for us in the refmap.  detach the mle and drop it.
862                  * either way, go back to the top and start over. */
863                 if (mig || mle->master != O2NM_MAX_NODES) {
864                         BUG_ON(mig && mle->master == dlm->node_num);
865                         /* we arrived too late.  the master does not
866                          * have a ref for us. retry. */
867                         mlog(0, "%s:%.*s: late on %s\n",
868                              dlm->name, namelen, lockid,
869                              mig ?  "MIGRATION" : "BLOCK");
870                         spin_unlock(&dlm->master_lock);
871                         spin_unlock(&dlm->spinlock);
872
873                         /* master is known, detach */
874                         if (!mig)
875                                 dlm_mle_detach_hb_events(dlm, mle);
876                         dlm_put_mle(mle);
877                         mle = NULL;
878                         /* this is lame, but we can't wait on either
879                          * the mle or lockres waitqueue here */
880                         if (mig)
881                                 msleep(100);
882                         goto lookup;
883                 }
884         } else {
885                 /* go ahead and try to master lock on this node */
886                 mle = alloc_mle;
887                 /* make sure this does not get freed below */
888                 alloc_mle = NULL;
889                 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
890                 set_bit(dlm->node_num, mle->maybe_map);
891                 __dlm_insert_mle(dlm, mle);
892
893                 /* still holding the dlm spinlock, check the recovery map
894                  * to see if there are any nodes that still need to be
895                  * considered.  these will not appear in the mle nodemap
896                  * but they might own this lockres.  wait on them. */
897                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
898                 if (bit < O2NM_MAX_NODES) {
899                         mlog(0, "%s: res %.*s, At least one node (%d) "
900                              "to recover before lock mastery can begin\n",
901                              dlm->name, namelen, (char *)lockid, bit);
902                         wait_on_recovery = 1;
903                 }
904         }
905
906         /* at this point there is either a DLM_MLE_BLOCK or a
907          * DLM_MLE_MASTER on the master list, so it's safe to add the
908          * lockres to the hashtable.  anyone who finds the lock will
909          * still have to wait on the IN_PROGRESS. */
910
911         /* finally add the lockres to its hash bucket */
912         __dlm_insert_lockres(dlm, res);
913
914         /* since this lockres is new it doesn't not require the spinlock */
915         __dlm_lockres_grab_inflight_ref(dlm, res);
916
917         /* get an extra ref on the mle in case this is a BLOCK
918          * if so, the creator of the BLOCK may try to put the last
919          * ref at this time in the assert master handler, so we
920          * need an extra one to keep from a bad ptr deref. */
921         dlm_get_mle_inuse(mle);
922         spin_unlock(&dlm->master_lock);
923         spin_unlock(&dlm->spinlock);
924
925 redo_request:
926         while (wait_on_recovery) {
927                 /* any cluster changes that occurred after dropping the
928                  * dlm spinlock would be detectable be a change on the mle,
929                  * so we only need to clear out the recovery map once. */
930                 if (dlm_is_recovery_lock(lockid, namelen)) {
931                         mlog(0, "%s: Recovery map is not empty, but must "
932                              "master $RECOVERY lock now\n", dlm->name);
933                         if (!dlm_pre_master_reco_lockres(dlm, res))
934                                 wait_on_recovery = 0;
935                         else {
936                                 mlog(0, "%s: waiting 500ms for heartbeat state "
937                                     "change\n", dlm->name);
938                                 msleep(500);
939                         }
940                         continue;
941                 }
942
943                 dlm_kick_recovery_thread(dlm);
944                 msleep(1000);
945                 dlm_wait_for_recovery(dlm);
946
947                 spin_lock(&dlm->spinlock);
948                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
949                 if (bit < O2NM_MAX_NODES) {
950                         mlog(0, "%s: res %.*s, At least one node (%d) "
951                              "to recover before lock mastery can begin\n",
952                              dlm->name, namelen, (char *)lockid, bit);
953                         wait_on_recovery = 1;
954                 } else
955                         wait_on_recovery = 0;
956                 spin_unlock(&dlm->spinlock);
957
958                 if (wait_on_recovery)
959                         dlm_wait_for_node_recovery(dlm, bit, 10000);
960         }
961
962         /* must wait for lock to be mastered elsewhere */
963         if (blocked)
964                 goto wait;
965
966         ret = -EINVAL;
967         dlm_node_iter_init(mle->vote_map, &iter);
968         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
969                 ret = dlm_do_master_request(res, mle, nodenum);
970                 if (ret < 0)
971                         mlog_errno(ret);
972                 if (mle->master != O2NM_MAX_NODES) {
973                         /* found a master ! */
974                         if (mle->master <= nodenum)
975                                 break;
976                         /* if our master request has not reached the master
977                          * yet, keep going until it does.  this is how the
978                          * master will know that asserts are needed back to
979                          * the lower nodes. */
980                         mlog(0, "%s: res %.*s, Requests only up to %u but "
981                              "master is %u, keep going\n", dlm->name, namelen,
982                              lockid, nodenum, mle->master);
983                 }
984         }
985
986 wait:
987         /* keep going until the response map includes all nodes */
988         ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
989         if (ret < 0) {
990                 wait_on_recovery = 1;
991                 mlog(0, "%s: res %.*s, Node map changed, redo the master "
992                      "request now, blocked=%d\n", dlm->name, res->lockname.len,
993                      res->lockname.name, blocked);
994                 if (++tries > 20) {
995                         mlog(ML_ERROR, "%s: res %.*s, Spinning on "
996                              "dlm_wait_for_lock_mastery, blocked = %d\n",
997                              dlm->name, res->lockname.len,
998                              res->lockname.name, blocked);
999                         dlm_print_one_lock_resource(res);
1000                         dlm_print_one_mle(mle);
1001                         tries = 0;
1002                 }
1003                 goto redo_request;
1004         }
1005
1006         mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
1007              res->lockname.name, res->owner);
1008         /* make sure we never continue without this */
1009         BUG_ON(res->owner == O2NM_MAX_NODES);
1010
1011         /* master is known, detach if not already detached */
1012         dlm_mle_detach_hb_events(dlm, mle);
1013         dlm_put_mle(mle);
1014         /* put the extra ref */
1015         dlm_put_mle_inuse(mle);
1016
1017 wake_waiters:
1018         spin_lock(&res->spinlock);
1019         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1020         spin_unlock(&res->spinlock);
1021         wake_up(&res->wq);
1022
1023 leave:
1024         /* need to free the unused mle */
1025         if (alloc_mle)
1026                 kmem_cache_free(dlm_mle_cache, alloc_mle);
1027
1028         return res;
1029 }
1030
1031
1032 #define DLM_MASTERY_TIMEOUT_MS   5000
1033
1034 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1035                                      struct dlm_lock_resource *res,
1036                                      struct dlm_master_list_entry *mle,
1037                                      int *blocked)
1038 {
1039         u8 m;
1040         int ret, bit;
1041         int map_changed, voting_done;
1042         int assert, sleep;
1043
1044 recheck:
1045         ret = 0;
1046         assert = 0;
1047
1048         /* check if another node has already become the owner */
1049         spin_lock(&res->spinlock);
1050         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1051                 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1052                      res->lockname.len, res->lockname.name, res->owner);
1053                 spin_unlock(&res->spinlock);
1054                 /* this will cause the master to re-assert across
1055                  * the whole cluster, freeing up mles */
1056                 if (res->owner != dlm->node_num) {
1057                         ret = dlm_do_master_request(res, mle, res->owner);
1058                         if (ret < 0) {
1059                                 /* give recovery a chance to run */
1060                                 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1061                                 msleep(500);
1062                                 goto recheck;
1063                         }
1064                 }
1065                 ret = 0;
1066                 goto leave;
1067         }
1068         spin_unlock(&res->spinlock);
1069
1070         spin_lock(&mle->spinlock);
1071         m = mle->master;
1072         map_changed = (memcmp(mle->vote_map, mle->node_map,
1073                               sizeof(mle->vote_map)) != 0);
1074         voting_done = (memcmp(mle->vote_map, mle->response_map,
1075                              sizeof(mle->vote_map)) == 0);
1076
1077         /* restart if we hit any errors */
1078         if (map_changed) {
1079                 int b;
1080                 mlog(0, "%s: %.*s: node map changed, restarting\n",
1081                      dlm->name, res->lockname.len, res->lockname.name);
1082                 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1083                 b = (mle->type == DLM_MLE_BLOCK);
1084                 if ((*blocked && !b) || (!*blocked && b)) {
1085                         mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1086                              dlm->name, res->lockname.len, res->lockname.name,
1087                              *blocked, b);
1088                         *blocked = b;
1089                 }
1090                 spin_unlock(&mle->spinlock);
1091                 if (ret < 0) {
1092                         mlog_errno(ret);
1093                         goto leave;
1094                 }
1095                 mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1096                      "rechecking now\n", dlm->name, res->lockname.len,
1097                      res->lockname.name);
1098                 goto recheck;
1099         } else {
1100                 if (!voting_done) {
1101                         mlog(0, "map not changed and voting not done "
1102                              "for %s:%.*s\n", dlm->name, res->lockname.len,
1103                              res->lockname.name);
1104                 }
1105         }
1106
1107         if (m != O2NM_MAX_NODES) {
1108                 /* another node has done an assert!
1109                  * all done! */
1110                 sleep = 0;
1111         } else {
1112                 sleep = 1;
1113                 /* have all nodes responded? */
1114                 if (voting_done && !*blocked) {
1115                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1116                         if (dlm->node_num <= bit) {
1117                                 /* my node number is lowest.
1118                                  * now tell other nodes that I am
1119                                  * mastering this. */
1120                                 mle->master = dlm->node_num;
1121                                 /* ref was grabbed in get_lock_resource
1122                                  * will be dropped in dlmlock_master */
1123                                 assert = 1;
1124                                 sleep = 0;
1125                         }
1126                         /* if voting is done, but we have not received
1127                          * an assert master yet, we must sleep */
1128                 }
1129         }
1130
1131         spin_unlock(&mle->spinlock);
1132
1133         /* sleep if we haven't finished voting yet */
1134         if (sleep) {
1135                 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1136
1137                 /*
1138                 if (atomic_read(&mle->mle_refs.refcount) < 2)
1139                         mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1140                         atomic_read(&mle->mle_refs.refcount),
1141                         res->lockname.len, res->lockname.name);
1142                 */
1143                 atomic_set(&mle->woken, 0);
1144                 (void)wait_event_timeout(mle->wq,
1145                                          (atomic_read(&mle->woken) == 1),
1146                                          timeo);
1147                 if (res->owner == O2NM_MAX_NODES) {
1148                         mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1149                              res->lockname.len, res->lockname.name);
1150                         goto recheck;
1151                 }
1152                 mlog(0, "done waiting, master is %u\n", res->owner);
1153                 ret = 0;
1154                 goto leave;
1155         }
1156
1157         ret = 0;   /* done */
1158         if (assert) {
1159                 m = dlm->node_num;
1160                 mlog(0, "about to master %.*s here, this=%u\n",
1161                      res->lockname.len, res->lockname.name, m);
1162                 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1163                 if (ret) {
1164                         /* This is a failure in the network path,
1165                          * not in the response to the assert_master
1166                          * (any nonzero response is a BUG on this node).
1167                          * Most likely a socket just got disconnected
1168                          * due to node death. */
1169                         mlog_errno(ret);
1170                 }
1171                 /* no longer need to restart lock mastery.
1172                  * all living nodes have been contacted. */
1173                 ret = 0;
1174         }
1175
1176         /* set the lockres owner */
1177         spin_lock(&res->spinlock);
1178         /* mastery reference obtained either during
1179          * assert_master_handler or in get_lock_resource */
1180         dlm_change_lockres_owner(dlm, res, m);
1181         spin_unlock(&res->spinlock);
1182
1183 leave:
1184         return ret;
1185 }
1186
1187 struct dlm_bitmap_diff_iter
1188 {
1189         int curnode;
1190         unsigned long *orig_bm;
1191         unsigned long *cur_bm;
1192         unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1193 };
1194
1195 enum dlm_node_state_change
1196 {
1197         NODE_DOWN = -1,
1198         NODE_NO_CHANGE = 0,
1199         NODE_UP
1200 };
1201
1202 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1203                                       unsigned long *orig_bm,
1204                                       unsigned long *cur_bm)
1205 {
1206         unsigned long p1, p2;
1207         int i;
1208
1209         iter->curnode = -1;
1210         iter->orig_bm = orig_bm;
1211         iter->cur_bm = cur_bm;
1212
1213         for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1214                 p1 = *(iter->orig_bm + i);
1215                 p2 = *(iter->cur_bm + i);
1216                 iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1217         }
1218 }
1219
1220 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1221                                      enum dlm_node_state_change *state)
1222 {
1223         int bit;
1224
1225         if (iter->curnode >= O2NM_MAX_NODES)
1226                 return -ENOENT;
1227
1228         bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1229                             iter->curnode+1);
1230         if (bit >= O2NM_MAX_NODES) {
1231                 iter->curnode = O2NM_MAX_NODES;
1232                 return -ENOENT;
1233         }
1234
1235         /* if it was there in the original then this node died */
1236         if (test_bit(bit, iter->orig_bm))
1237                 *state = NODE_DOWN;
1238         else
1239                 *state = NODE_UP;
1240
1241         iter->curnode = bit;
1242         return bit;
1243 }
1244
1245
1246 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1247                                     struct dlm_lock_resource *res,
1248                                     struct dlm_master_list_entry *mle,
1249                                     int blocked)
1250 {
1251         struct dlm_bitmap_diff_iter bdi;
1252         enum dlm_node_state_change sc;
1253         int node;
1254         int ret = 0;
1255
1256         mlog(0, "something happened such that the "
1257              "master process may need to be restarted!\n");
1258
1259         assert_spin_locked(&mle->spinlock);
1260
1261         dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1262         node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1263         while (node >= 0) {
1264                 if (sc == NODE_UP) {
1265                         /* a node came up.  clear any old vote from
1266                          * the response map and set it in the vote map
1267                          * then restart the mastery. */
1268                         mlog(ML_NOTICE, "node %d up while restarting\n", node);
1269
1270                         /* redo the master request, but only for the new node */
1271                         mlog(0, "sending request to new node\n");
1272                         clear_bit(node, mle->response_map);
1273                         set_bit(node, mle->vote_map);
1274                 } else {
1275                         mlog(ML_ERROR, "node down! %d\n", node);
1276                         if (blocked) {
1277                                 int lowest = find_next_bit(mle->maybe_map,
1278                                                        O2NM_MAX_NODES, 0);
1279
1280                                 /* act like it was never there */
1281                                 clear_bit(node, mle->maybe_map);
1282
1283                                 if (node == lowest) {
1284                                         mlog(0, "expected master %u died"
1285                                             " while this node was blocked "
1286                                             "waiting on it!\n", node);
1287                                         lowest = find_next_bit(mle->maybe_map,
1288                                                         O2NM_MAX_NODES,
1289                                                         lowest+1);
1290                                         if (lowest < O2NM_MAX_NODES) {
1291                                                 mlog(0, "%s:%.*s:still "
1292                                                      "blocked. waiting on %u "
1293                                                      "now\n", dlm->name,
1294                                                      res->lockname.len,
1295                                                      res->lockname.name,
1296                                                      lowest);
1297                                         } else {
1298                                                 /* mle is an MLE_BLOCK, but
1299                                                  * there is now nothing left to
1300                                                  * block on.  we need to return
1301                                                  * all the way back out and try
1302                                                  * again with an MLE_MASTER.
1303                                                  * dlm_do_local_recovery_cleanup
1304                                                  * has already run, so the mle
1305                                                  * refcount is ok */
1306                                                 mlog(0, "%s:%.*s: no "
1307                                                      "longer blocking. try to "
1308                                                      "master this here\n",
1309                                                      dlm->name,
1310                                                      res->lockname.len,
1311                                                      res->lockname.name);
1312                                                 mle->type = DLM_MLE_MASTER;
1313                                                 mle->mleres = res;
1314                                         }
1315                                 }
1316                         }
1317
1318                         /* now blank out everything, as if we had never
1319                          * contacted anyone */
1320                         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1321                         memset(mle->response_map, 0, sizeof(mle->response_map));
1322                         /* reset the vote_map to the current node_map */
1323                         memcpy(mle->vote_map, mle->node_map,
1324                                sizeof(mle->node_map));
1325                         /* put myself into the maybe map */
1326                         if (mle->type != DLM_MLE_BLOCK)
1327                                 set_bit(dlm->node_num, mle->maybe_map);
1328                 }
1329                 ret = -EAGAIN;
1330                 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1331         }
1332         return ret;
1333 }
1334
1335
1336 /*
1337  * DLM_MASTER_REQUEST_MSG
1338  *
1339  * returns: 0 on success,
1340  *          -errno on a network error
1341  *
1342  * on error, the caller should assume the target node is "dead"
1343  *
1344  */
1345
1346 static int dlm_do_master_request(struct dlm_lock_resource *res,
1347                                  struct dlm_master_list_entry *mle, int to)
1348 {
1349         struct dlm_ctxt *dlm = mle->dlm;
1350         struct dlm_master_request request;
1351         int ret, response=0, resend;
1352
1353         memset(&request, 0, sizeof(request));
1354         request.node_idx = dlm->node_num;
1355
1356         BUG_ON(mle->type == DLM_MLE_MIGRATION);
1357
1358         request.namelen = (u8)mle->mnamelen;
1359         memcpy(request.name, mle->mname, request.namelen);
1360
1361 again:
1362         ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1363                                  sizeof(request), to, &response);
1364         if (ret < 0)  {
1365                 if (ret == -ESRCH) {
1366                         /* should never happen */
1367                         mlog(ML_ERROR, "TCP stack not ready!\n");
1368                         BUG();
1369                 } else if (ret == -EINVAL) {
1370                         mlog(ML_ERROR, "bad args passed to o2net!\n");
1371                         BUG();
1372                 } else if (ret == -ENOMEM) {
1373                         mlog(ML_ERROR, "out of memory while trying to send "
1374                              "network message!  retrying\n");
1375                         /* this is totally crude */
1376                         msleep(50);
1377                         goto again;
1378                 } else if (!dlm_is_host_down(ret)) {
1379                         /* not a network error. bad. */
1380                         mlog_errno(ret);
1381                         mlog(ML_ERROR, "unhandled error!");
1382                         BUG();
1383                 }
1384                 /* all other errors should be network errors,
1385                  * and likely indicate node death */
1386                 mlog(ML_ERROR, "link to %d went down!\n", to);
1387                 goto out;
1388         }
1389
1390         ret = 0;
1391         resend = 0;
1392         spin_lock(&mle->spinlock);
1393         switch (response) {
1394                 case DLM_MASTER_RESP_YES:
1395                         set_bit(to, mle->response_map);
1396                         mlog(0, "node %u is the master, response=YES\n", to);
1397                         mlog(0, "%s:%.*s: master node %u now knows I have a "
1398                              "reference\n", dlm->name, res->lockname.len,
1399                              res->lockname.name, to);
1400                         mle->master = to;
1401                         break;
1402                 case DLM_MASTER_RESP_NO:
1403                         mlog(0, "node %u not master, response=NO\n", to);
1404                         set_bit(to, mle->response_map);
1405                         break;
1406                 case DLM_MASTER_RESP_MAYBE:
1407                         mlog(0, "node %u not master, response=MAYBE\n", to);
1408                         set_bit(to, mle->response_map);
1409                         set_bit(to, mle->maybe_map);
1410                         break;
1411                 case DLM_MASTER_RESP_ERROR:
1412                         mlog(0, "node %u hit an error, resending\n", to);
1413                         resend = 1;
1414                         response = 0;
1415                         break;
1416                 default:
1417                         mlog(ML_ERROR, "bad response! %u\n", response);
1418                         BUG();
1419         }
1420         spin_unlock(&mle->spinlock);
1421         if (resend) {
1422                 /* this is also totally crude */
1423                 msleep(50);
1424                 goto again;
1425         }
1426
1427 out:
1428         return ret;
1429 }
1430
1431 /*
1432  * locks that can be taken here:
1433  * dlm->spinlock
1434  * res->spinlock
1435  * mle->spinlock
1436  * dlm->master_list
1437  *
1438  * if possible, TRIM THIS DOWN!!!
1439  */
1440 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1441                                void **ret_data)
1442 {
1443         u8 response = DLM_MASTER_RESP_MAYBE;
1444         struct dlm_ctxt *dlm = data;
1445         struct dlm_lock_resource *res = NULL;
1446         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1447         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1448         char *name;
1449         unsigned int namelen, hash;
1450         int found, ret;
1451         int set_maybe;
1452         int dispatch_assert = 0;
1453         int dispatched = 0;
1454
1455         if (!dlm_grab(dlm))
1456                 return DLM_MASTER_RESP_NO;
1457
1458         if (!dlm_domain_fully_joined(dlm)) {
1459                 response = DLM_MASTER_RESP_NO;
1460                 goto send_response;
1461         }
1462
1463         name = request->name;
1464         namelen = request->namelen;
1465         hash = dlm_lockid_hash(name, namelen);
1466
1467         if (namelen > DLM_LOCKID_NAME_MAX) {
1468                 response = DLM_IVBUFLEN;
1469                 goto send_response;
1470         }
1471
1472 way_up_top:
1473         spin_lock(&dlm->spinlock);
1474         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1475         if (res) {
1476                 spin_unlock(&dlm->spinlock);
1477
1478                 /* take care of the easy cases up front */
1479                 spin_lock(&res->spinlock);
1480                 if (res->state & (DLM_LOCK_RES_RECOVERING|
1481                                   DLM_LOCK_RES_MIGRATING)) {
1482                         spin_unlock(&res->spinlock);
1483                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1484                              "being recovered/migrated\n");
1485                         response = DLM_MASTER_RESP_ERROR;
1486                         if (mle)
1487                                 kmem_cache_free(dlm_mle_cache, mle);
1488                         goto send_response;
1489                 }
1490
1491                 if (res->owner == dlm->node_num) {
1492                         dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1493                         spin_unlock(&res->spinlock);
1494                         response = DLM_MASTER_RESP_YES;
1495                         if (mle)
1496                                 kmem_cache_free(dlm_mle_cache, mle);
1497
1498                         /* this node is the owner.
1499                          * there is some extra work that needs to
1500                          * happen now.  the requesting node has
1501                          * caused all nodes up to this one to
1502                          * create mles.  this node now needs to
1503                          * go back and clean those up. */
1504                         dispatch_assert = 1;
1505                         goto send_response;
1506                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1507                         spin_unlock(&res->spinlock);
1508                         // mlog(0, "node %u is the master\n", res->owner);
1509                         response = DLM_MASTER_RESP_NO;
1510                         if (mle)
1511                                 kmem_cache_free(dlm_mle_cache, mle);
1512                         goto send_response;
1513                 }
1514
1515                 /* ok, there is no owner.  either this node is
1516                  * being blocked, or it is actively trying to
1517                  * master this lock. */
1518                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1519                         mlog(ML_ERROR, "lock with no owner should be "
1520                              "in-progress!\n");
1521                         BUG();
1522                 }
1523
1524                 // mlog(0, "lockres is in progress...\n");
1525                 spin_lock(&dlm->master_lock);
1526                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1527                 if (!found) {
1528                         mlog(ML_ERROR, "no mle found for this lock!\n");
1529                         BUG();
1530                 }
1531                 set_maybe = 1;
1532                 spin_lock(&tmpmle->spinlock);
1533                 if (tmpmle->type == DLM_MLE_BLOCK) {
1534                         // mlog(0, "this node is waiting for "
1535                         // "lockres to be mastered\n");
1536                         response = DLM_MASTER_RESP_NO;
1537                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1538                         mlog(0, "node %u is master, but trying to migrate to "
1539                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1540                         if (tmpmle->master == dlm->node_num) {
1541                                 mlog(ML_ERROR, "no owner on lockres, but this "
1542                                      "node is trying to migrate it to %u?!\n",
1543                                      tmpmle->new_master);
1544                                 BUG();
1545                         } else {
1546                                 /* the real master can respond on its own */
1547                                 response = DLM_MASTER_RESP_NO;
1548                         }
1549                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1550                         set_maybe = 0;
1551                         if (tmpmle->master == dlm->node_num) {
1552                                 response = DLM_MASTER_RESP_YES;
1553                                 /* this node will be the owner.
1554                                  * go back and clean the mles on any
1555                                  * other nodes */
1556                                 dispatch_assert = 1;
1557                                 dlm_lockres_set_refmap_bit(dlm, res,
1558                                                            request->node_idx);
1559                         } else
1560                                 response = DLM_MASTER_RESP_NO;
1561                 } else {
1562                         // mlog(0, "this node is attempting to "
1563                         // "master lockres\n");
1564                         response = DLM_MASTER_RESP_MAYBE;
1565                 }
1566                 if (set_maybe)
1567                         set_bit(request->node_idx, tmpmle->maybe_map);
1568                 spin_unlock(&tmpmle->spinlock);
1569
1570                 spin_unlock(&dlm->master_lock);
1571                 spin_unlock(&res->spinlock);
1572
1573                 /* keep the mle attached to heartbeat events */
1574                 dlm_put_mle(tmpmle);
1575                 if (mle)
1576                         kmem_cache_free(dlm_mle_cache, mle);
1577                 goto send_response;
1578         }
1579
1580         /*
1581          * lockres doesn't exist on this node
1582          * if there is an MLE_BLOCK, return NO
1583          * if there is an MLE_MASTER, return MAYBE
1584          * otherwise, add an MLE_BLOCK, return NO
1585          */
1586         spin_lock(&dlm->master_lock);
1587         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1588         if (!found) {
1589                 /* this lockid has never been seen on this node yet */
1590                 // mlog(0, "no mle found\n");
1591                 if (!mle) {
1592                         spin_unlock(&dlm->master_lock);
1593                         spin_unlock(&dlm->spinlock);
1594
1595                         mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1596                         if (!mle) {
1597                                 response = DLM_MASTER_RESP_ERROR;
1598                                 mlog_errno(-ENOMEM);
1599                                 goto send_response;
1600                         }
1601                         goto way_up_top;
1602                 }
1603
1604                 // mlog(0, "this is second time thru, already allocated, "
1605                 // "add the block.\n");
1606                 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1607                 set_bit(request->node_idx, mle->maybe_map);
1608                 __dlm_insert_mle(dlm, mle);
1609                 response = DLM_MASTER_RESP_NO;
1610         } else {
1611                 // mlog(0, "mle was found\n");
1612                 set_maybe = 1;
1613                 spin_lock(&tmpmle->spinlock);
1614                 if (tmpmle->master == dlm->node_num) {
1615                         mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1616                         BUG();
1617                 }
1618                 if (tmpmle->type == DLM_MLE_BLOCK)
1619                         response = DLM_MASTER_RESP_NO;
1620                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1621                         mlog(0, "migration mle was found (%u->%u)\n",
1622                              tmpmle->master, tmpmle->new_master);
1623                         /* real master can respond on its own */
1624                         response = DLM_MASTER_RESP_NO;
1625                 } else
1626                         response = DLM_MASTER_RESP_MAYBE;
1627                 if (set_maybe)
1628                         set_bit(request->node_idx, tmpmle->maybe_map);
1629                 spin_unlock(&tmpmle->spinlock);
1630         }
1631         spin_unlock(&dlm->master_lock);
1632         spin_unlock(&dlm->spinlock);
1633
1634         if (found) {
1635                 /* keep the mle attached to heartbeat events */
1636                 dlm_put_mle(tmpmle);
1637         }
1638 send_response:
1639         /*
1640          * __dlm_lookup_lockres() grabbed a reference to this lockres.
1641          * The reference is released by dlm_assert_master_worker() under
1642          * the call to dlm_dispatch_assert_master().  If
1643          * dlm_assert_master_worker() isn't called, we drop it here.
1644          */
1645         if (dispatch_assert) {
1646                 if (response != DLM_MASTER_RESP_YES)
1647                         mlog(ML_ERROR, "invalid response %d\n", response);
1648                 if (!res) {
1649                         mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1650                         BUG();
1651                 }
1652                 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1653                              dlm->node_num, res->lockname.len, res->lockname.name);
1654                 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1655                                                  DLM_ASSERT_MASTER_MLE_CLEANUP);
1656                 if (ret < 0) {
1657                         mlog(ML_ERROR, "failed to dispatch assert master work\n");
1658                         response = DLM_MASTER_RESP_ERROR;
1659                         dlm_lockres_put(res);
1660                 } else {
1661                         dispatched = 1;
1662                         dlm_lockres_grab_inflight_worker(dlm, res);
1663                 }
1664         } else {
1665                 if (res)
1666                         dlm_lockres_put(res);
1667         }
1668
1669         if (!dispatched)
1670                 dlm_put(dlm);
1671         return response;
1672 }
1673
1674 /*
1675  * DLM_ASSERT_MASTER_MSG
1676  */
1677
1678
1679 /*
1680  * NOTE: this can be used for debugging
1681  * can periodically run all locks owned by this node
1682  * and re-assert across the cluster...
1683  */
1684 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1685                                 struct dlm_lock_resource *res,
1686                                 void *nodemap, u32 flags)
1687 {
1688         struct dlm_assert_master assert;
1689         int to, tmpret;
1690         struct dlm_node_iter iter;
1691         int ret = 0;
1692         int reassert;
1693         const char *lockname = res->lockname.name;
1694         unsigned int namelen = res->lockname.len;
1695
1696         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1697
1698         spin_lock(&res->spinlock);
1699         res->state |= DLM_LOCK_RES_SETREF_INPROG;
1700         spin_unlock(&res->spinlock);
1701
1702 again:
1703         reassert = 0;
1704
1705         /* note that if this nodemap is empty, it returns 0 */
1706         dlm_node_iter_init(nodemap, &iter);
1707         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1708                 int r = 0;
1709                 struct dlm_master_list_entry *mle = NULL;
1710
1711                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1712                      namelen, lockname);
1713                 memset(&assert, 0, sizeof(assert));
1714                 assert.node_idx = dlm->node_num;
1715                 assert.namelen = namelen;
1716                 memcpy(assert.name, lockname, namelen);
1717                 assert.flags = cpu_to_be32(flags);
1718
1719                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1720                                             &assert, sizeof(assert), to, &r);
1721                 if (tmpret < 0) {
1722                         mlog(ML_ERROR, "Error %d when sending message %u (key "
1723                              "0x%x) to node %u\n", tmpret,
1724                              DLM_ASSERT_MASTER_MSG, dlm->key, to);
1725                         if (!dlm_is_host_down(tmpret)) {
1726                                 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1727                                 BUG();
1728                         }
1729                         /* a node died.  finish out the rest of the nodes. */
1730                         mlog(0, "link to %d went down!\n", to);
1731                         /* any nonzero status return will do */
1732                         ret = tmpret;
1733                         r = 0;
1734                 } else if (r < 0) {
1735                         /* ok, something horribly messed.  kill thyself. */
1736                         mlog(ML_ERROR,"during assert master of %.*s to %u, "
1737                              "got %d.\n", namelen, lockname, to, r);
1738                         spin_lock(&dlm->spinlock);
1739                         spin_lock(&dlm->master_lock);
1740                         if (dlm_find_mle(dlm, &mle, (char *)lockname,
1741                                          namelen)) {
1742                                 dlm_print_one_mle(mle);
1743                                 __dlm_put_mle(mle);
1744                         }
1745                         spin_unlock(&dlm->master_lock);
1746                         spin_unlock(&dlm->spinlock);
1747                         BUG();
1748                 }
1749
1750                 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1751                     !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1752                                 mlog(ML_ERROR, "%.*s: very strange, "
1753                                      "master MLE but no lockres on %u\n",
1754                                      namelen, lockname, to);
1755                 }
1756
1757                 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1758                         mlog(0, "%.*s: node %u create mles on other "
1759                              "nodes and requests a re-assert\n",
1760                              namelen, lockname, to);
1761                         reassert = 1;
1762                 }
1763                 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1764                         mlog(0, "%.*s: node %u has a reference to this "
1765                              "lockres, set the bit in the refmap\n",
1766                              namelen, lockname, to);
1767                         spin_lock(&res->spinlock);
1768                         dlm_lockres_set_refmap_bit(dlm, res, to);
1769                         spin_unlock(&res->spinlock);
1770                 }
1771         }
1772
1773         if (reassert)
1774                 goto again;
1775
1776         spin_lock(&res->spinlock);
1777         res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1778         spin_unlock(&res->spinlock);
1779         wake_up(&res->wq);
1780
1781         return ret;
1782 }
1783
1784 /*
1785  * locks that can be taken here:
1786  * dlm->spinlock
1787  * res->spinlock
1788  * mle->spinlock
1789  * dlm->master_list
1790  *
1791  * if possible, TRIM THIS DOWN!!!
1792  */
1793 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1794                               void **ret_data)
1795 {
1796         struct dlm_ctxt *dlm = data;
1797         struct dlm_master_list_entry *mle = NULL;
1798         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1799         struct dlm_lock_resource *res = NULL;
1800         char *name;
1801         unsigned int namelen, hash;
1802         u32 flags;
1803         int master_request = 0, have_lockres_ref = 0;
1804         int ret = 0;
1805
1806         if (!dlm_grab(dlm))
1807                 return 0;
1808
1809         name = assert->name;
1810         namelen = assert->namelen;
1811         hash = dlm_lockid_hash(name, namelen);
1812         flags = be32_to_cpu(assert->flags);
1813
1814         if (namelen > DLM_LOCKID_NAME_MAX) {
1815                 mlog(ML_ERROR, "Invalid name length!");
1816                 goto done;
1817         }
1818
1819         spin_lock(&dlm->spinlock);
1820
1821         if (flags)
1822                 mlog(0, "assert_master with flags: %u\n", flags);
1823
1824         /* find the MLE */
1825         spin_lock(&dlm->master_lock);
1826         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1827                 /* not an error, could be master just re-asserting */
1828                 mlog(0, "just got an assert_master from %u, but no "
1829                      "MLE for it! (%.*s)\n", assert->node_idx,
1830                      namelen, name);
1831         } else {
1832                 int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1833                 if (bit >= O2NM_MAX_NODES) {
1834                         /* not necessarily an error, though less likely.
1835                          * could be master just re-asserting. */
1836                         mlog(0, "no bits set in the maybe_map, but %u "
1837                              "is asserting! (%.*s)\n", assert->node_idx,
1838                              namelen, name);
1839                 } else if (bit != assert->node_idx) {
1840                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1841                                 mlog(0, "master %u was found, %u should "
1842                                      "back off\n", assert->node_idx, bit);
1843                         } else {
1844                                 /* with the fix for bug 569, a higher node
1845                                  * number winning the mastery will respond
1846                                  * YES to mastery requests, but this node
1847                                  * had no way of knowing.  let it pass. */
1848                                 mlog(0, "%u is the lowest node, "
1849                                      "%u is asserting. (%.*s)  %u must "
1850                                      "have begun after %u won.\n", bit,
1851                                      assert->node_idx, namelen, name, bit,
1852                                      assert->node_idx);
1853                         }
1854                 }
1855                 if (mle->type == DLM_MLE_MIGRATION) {
1856                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1857                                 mlog(0, "%s:%.*s: got cleanup assert"
1858                                      " from %u for migration\n",
1859                                      dlm->name, namelen, name,
1860                                      assert->node_idx);
1861                         } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1862                                 mlog(0, "%s:%.*s: got unrelated assert"
1863                                      " from %u for migration, ignoring\n",
1864                                      dlm->name, namelen, name,
1865                                      assert->node_idx);
1866                                 __dlm_put_mle(mle);
1867                                 spin_unlock(&dlm->master_lock);
1868                                 spin_unlock(&dlm->spinlock);
1869                                 goto done;
1870                         }
1871                 }
1872         }
1873         spin_unlock(&dlm->master_lock);
1874
1875         /* ok everything checks out with the MLE
1876          * now check to see if there is a lockres */
1877         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1878         if (res) {
1879                 spin_lock(&res->spinlock);
1880                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1881                         mlog(ML_ERROR, "%u asserting but %.*s is "
1882                              "RECOVERING!\n", assert->node_idx, namelen, name);
1883                         goto kill;
1884                 }
1885                 if (!mle) {
1886                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1887                             res->owner != assert->node_idx) {
1888                                 mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1889                                      "but current owner is %u! (%.*s)\n",
1890                                      assert->node_idx, res->owner, namelen,
1891                                      name);
1892                                 __dlm_print_one_lock_resource(res);
1893                                 BUG();
1894                         }
1895                 } else if (mle->type != DLM_MLE_MIGRATION) {
1896                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1897                                 /* owner is just re-asserting */
1898                                 if (res->owner == assert->node_idx) {
1899                                         mlog(0, "owner %u re-asserting on "
1900                                              "lock %.*s\n", assert->node_idx,
1901                                              namelen, name);
1902                                         goto ok;
1903                                 }
1904                                 mlog(ML_ERROR, "got assert_master from "
1905                                      "node %u, but %u is the owner! "
1906                                      "(%.*s)\n", assert->node_idx,
1907                                      res->owner, namelen, name);
1908                                 goto kill;
1909                         }
1910                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1911                                 mlog(ML_ERROR, "got assert from %u, but lock "
1912                                      "with no owner should be "
1913                                      "in-progress! (%.*s)\n",
1914                                      assert->node_idx,
1915                                      namelen, name);
1916                                 goto kill;
1917                         }
1918                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1919                         /* should only be getting an assert from new master */
1920                         if (assert->node_idx != mle->new_master) {
1921                                 mlog(ML_ERROR, "got assert from %u, but "
1922                                      "new master is %u, and old master "
1923                                      "was %u (%.*s)\n",
1924                                      assert->node_idx, mle->new_master,
1925                                      mle->master, namelen, name);
1926                                 goto kill;
1927                         }
1928
1929                 }
1930 ok:
1931                 spin_unlock(&res->spinlock);
1932         }
1933
1934         // mlog(0, "woo!  got an assert_master from node %u!\n",
1935         //           assert->node_idx);
1936         if (mle) {
1937                 int extra_ref = 0;
1938                 int nn = -1;
1939                 int rr, err = 0;
1940
1941                 spin_lock(&mle->spinlock);
1942                 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1943                         extra_ref = 1;
1944                 else {
1945                         /* MASTER mle: if any bits set in the response map
1946                          * then the calling node needs to re-assert to clear
1947                          * up nodes that this node contacted */
1948                         while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1949                                                     nn+1)) < O2NM_MAX_NODES) {
1950                                 if (nn != dlm->node_num && nn != assert->node_idx) {
1951                                         master_request = 1;
1952                                         break;
1953                                 }
1954                         }
1955                 }
1956                 mle->master = assert->node_idx;
1957                 atomic_set(&mle->woken, 1);
1958                 wake_up(&mle->wq);
1959                 spin_unlock(&mle->spinlock);
1960
1961                 if (res) {
1962                         int wake = 0;
1963                         spin_lock(&res->spinlock);
1964                         if (mle->type == DLM_MLE_MIGRATION) {
1965                                 mlog(0, "finishing off migration of lockres %.*s, "
1966                                         "from %u to %u\n",
1967                                         res->lockname.len, res->lockname.name,
1968                                         dlm->node_num, mle->new_master);
1969                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1970                                 wake = 1;
1971                                 dlm_change_lockres_owner(dlm, res, mle->new_master);
1972                                 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1973                         } else {
1974                                 dlm_change_lockres_owner(dlm, res, mle->master);
1975                         }
1976                         spin_unlock(&res->spinlock);
1977                         have_lockres_ref = 1;
1978                         if (wake)
1979                                 wake_up(&res->wq);
1980                 }
1981
1982                 /* master is known, detach if not already detached.
1983                  * ensures that only one assert_master call will happen
1984                  * on this mle. */
1985                 spin_lock(&dlm->master_lock);
1986
1987                 rr = atomic_read(&mle->mle_refs.refcount);
1988                 if (mle->inuse > 0) {
1989                         if (extra_ref && rr < 3)
1990                                 err = 1;
1991                         else if (!extra_ref && rr < 2)
1992                                 err = 1;
1993                 } else {
1994                         if (extra_ref && rr < 2)
1995                                 err = 1;
1996                         else if (!extra_ref && rr < 1)
1997                                 err = 1;
1998                 }
1999                 if (err) {
2000                         mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
2001                              "that will mess up this node, refs=%d, extra=%d, "
2002                              "inuse=%d\n", dlm->name, namelen, name,
2003                              assert->node_idx, rr, extra_ref, mle->inuse);
2004                         dlm_print_one_mle(mle);
2005                 }
2006                 __dlm_unlink_mle(dlm, mle);
2007                 __dlm_mle_detach_hb_events(dlm, mle);
2008                 __dlm_put_mle(mle);
2009                 if (extra_ref) {
2010                         /* the assert master message now balances the extra
2011                          * ref given by the master / migration request message.
2012                          * if this is the last put, it will be removed
2013                          * from the list. */
2014                         __dlm_put_mle(mle);
2015                 }
2016                 spin_unlock(&dlm->master_lock);
2017         } else if (res) {
2018                 if (res->owner != assert->node_idx) {
2019                         mlog(0, "assert_master from %u, but current "
2020                              "owner is %u (%.*s), no mle\n", assert->node_idx,
2021                              res->owner, namelen, name);
2022                 }
2023         }
2024         spin_unlock(&dlm->spinlock);
2025
2026 done:
2027         ret = 0;
2028         if (res) {
2029                 spin_lock(&res->spinlock);
2030                 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2031                 spin_unlock(&res->spinlock);
2032                 *ret_data = (void *)res;
2033         }
2034         dlm_put(dlm);
2035         if (master_request) {
2036                 mlog(0, "need to tell master to reassert\n");
2037                 /* positive. negative would shoot down the node. */
2038                 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2039                 if (!have_lockres_ref) {
2040                         mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2041                              "mle present here for %s:%.*s, but no lockres!\n",
2042                              assert->node_idx, dlm->name, namelen, name);
2043                 }
2044         }
2045         if (have_lockres_ref) {
2046                 /* let the master know we have a reference to the lockres */
2047                 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2048                 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2049                      dlm->name, namelen, name, assert->node_idx);
2050         }
2051         return ret;
2052
2053 kill:
2054         /* kill the caller! */
2055         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2056              "and killing the other node now!  This node is OK and can continue.\n");
2057         __dlm_print_one_lock_resource(res);
2058         spin_unlock(&res->spinlock);
2059         spin_unlock(&dlm->spinlock);
2060         *ret_data = (void *)res;
2061         dlm_put(dlm);
2062         return -EINVAL;
2063 }
2064
2065 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2066 {
2067         struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2068
2069         if (ret_data) {
2070                 spin_lock(&res->spinlock);
2071                 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2072                 spin_unlock(&res->spinlock);
2073                 wake_up(&res->wq);
2074                 dlm_lockres_put(res);
2075         }
2076         return;
2077 }
2078
2079 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2080                                struct dlm_lock_resource *res,
2081                                int ignore_higher, u8 request_from, u32 flags)
2082 {
2083         struct dlm_work_item *item;
2084         item = kzalloc(sizeof(*item), GFP_ATOMIC);
2085         if (!item)
2086                 return -ENOMEM;
2087
2088
2089         /* queue up work for dlm_assert_master_worker */
2090         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2091         item->u.am.lockres = res; /* already have a ref */
2092         /* can optionally ignore node numbers higher than this node */
2093         item->u.am.ignore_higher = ignore_higher;
2094         item->u.am.request_from = request_from;
2095         item->u.am.flags = flags;
2096
2097         if (ignore_higher)
2098                 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2099                      res->lockname.name);
2100
2101         spin_lock(&dlm->work_lock);
2102         list_add_tail(&item->list, &dlm->work_list);
2103         spin_unlock(&dlm->work_lock);
2104
2105         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2106         return 0;
2107 }
2108
2109 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2110 {
2111         struct dlm_ctxt *dlm = data;
2112         int ret = 0;
2113         struct dlm_lock_resource *res;
2114         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2115         int ignore_higher;
2116         int bit;
2117         u8 request_from;
2118         u32 flags;
2119
2120         dlm = item->dlm;
2121         res = item->u.am.lockres;
2122         ignore_higher = item->u.am.ignore_higher;
2123         request_from = item->u.am.request_from;
2124         flags = item->u.am.flags;
2125
2126         spin_lock(&dlm->spinlock);
2127         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2128         spin_unlock(&dlm->spinlock);
2129
2130         clear_bit(dlm->node_num, nodemap);
2131         if (ignore_higher) {
2132                 /* if is this just to clear up mles for nodes below
2133                  * this node, do not send the message to the original
2134                  * caller or any node number higher than this */
2135                 clear_bit(request_from, nodemap);
2136                 bit = dlm->node_num;
2137                 while (1) {
2138                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2139                                             bit+1);
2140                         if (bit >= O2NM_MAX_NODES)
2141                                 break;
2142                         clear_bit(bit, nodemap);
2143                 }
2144         }
2145
2146         /*
2147          * If we're migrating this lock to someone else, we are no
2148          * longer allowed to assert out own mastery.  OTOH, we need to
2149          * prevent migration from starting while we're still asserting
2150          * our dominance.  The reserved ast delays migration.
2151          */
2152         spin_lock(&res->spinlock);
2153         if (res->state & DLM_LOCK_RES_MIGRATING) {
2154                 mlog(0, "Someone asked us to assert mastery, but we're "
2155                      "in the middle of migration.  Skipping assert, "
2156                      "the new master will handle that.\n");
2157                 spin_unlock(&res->spinlock);
2158                 goto put;
2159         } else
2160                 __dlm_lockres_reserve_ast(res);
2161         spin_unlock(&res->spinlock);
2162
2163         /* this call now finishes out the nodemap
2164          * even if one or more nodes die */
2165         mlog(0, "worker about to master %.*s here, this=%u\n",
2166                      res->lockname.len, res->lockname.name, dlm->node_num);
2167         ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2168         if (ret < 0) {
2169                 /* no need to restart, we are done */
2170                 if (!dlm_is_host_down(ret))
2171                         mlog_errno(ret);
2172         }
2173
2174         /* Ok, we've asserted ourselves.  Let's let migration start. */
2175         dlm_lockres_release_ast(dlm, res);
2176
2177 put:
2178         dlm_lockres_drop_inflight_worker(dlm, res);
2179
2180         dlm_lockres_put(res);
2181
2182         mlog(0, "finished with dlm_assert_master_worker\n");
2183 }
2184
2185 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2186  * We cannot wait for node recovery to complete to begin mastering this
2187  * lockres because this lockres is used to kick off recovery! ;-)
2188  * So, do a pre-check on all living nodes to see if any of those nodes
2189  * think that $RECOVERY is currently mastered by a dead node.  If so,
2190  * we wait a short time to allow that node to get notified by its own
2191  * heartbeat stack, then check again.  All $RECOVERY lock resources
2192  * mastered by dead nodes are purged when the hearbeat callback is
2193  * fired, so we can know for sure that it is safe to continue once
2194  * the node returns a live node or no node.  */
2195 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2196                                        struct dlm_lock_resource *res)
2197 {
2198         struct dlm_node_iter iter;
2199         int nodenum;
2200         int ret = 0;
2201         u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2202
2203         spin_lock(&dlm->spinlock);
2204         dlm_node_iter_init(dlm->domain_map, &iter);
2205         spin_unlock(&dlm->spinlock);
2206
2207         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2208                 /* do not send to self */
2209                 if (nodenum == dlm->node_num)
2210                         continue;
2211                 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2212                 if (ret < 0) {
2213                         mlog_errno(ret);
2214                         if (!dlm_is_host_down(ret))
2215                                 BUG();
2216                         /* host is down, so answer for that node would be
2217                          * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2218                         ret = 0;
2219                 }
2220
2221                 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2222                         /* check to see if this master is in the recovery map */
2223                         spin_lock(&dlm->spinlock);
2224                         if (test_bit(master, dlm->recovery_map)) {
2225                                 mlog(ML_NOTICE, "%s: node %u has not seen "
2226                                      "node %u go down yet, and thinks the "
2227                                      "dead node is mastering the recovery "
2228                                      "lock.  must wait.\n", dlm->name,
2229                                      nodenum, master);
2230                                 ret = -EAGAIN;
2231                         }
2232                         spin_unlock(&dlm->spinlock);
2233                         mlog(0, "%s: reco lock master is %u\n", dlm->name,
2234                              master);
2235                         break;
2236                 }
2237         }
2238         return ret;
2239 }
2240
2241 /*
2242  * DLM_DEREF_LOCKRES_MSG
2243  */
2244
2245 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2246 {
2247         struct dlm_deref_lockres deref;
2248         int ret = 0, r;
2249         const char *lockname;
2250         unsigned int namelen;
2251
2252         lockname = res->lockname.name;
2253         namelen = res->lockname.len;
2254         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2255
2256         memset(&deref, 0, sizeof(deref));
2257         deref.node_idx = dlm->node_num;
2258         deref.namelen = namelen;
2259         memcpy(deref.name, lockname, namelen);
2260
2261         ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2262                                  &deref, sizeof(deref), res->owner, &r);
2263         if (ret < 0)
2264                 mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2265                      dlm->name, namelen, lockname, ret, res->owner);
2266         else if (r < 0) {
2267                 /* BAD.  other node says I did not have a ref. */
2268                 mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2269                      dlm->name, namelen, lockname, res->owner, r);
2270                 dlm_print_one_lock_resource(res);
2271                 BUG();
2272         }
2273         return ret;
2274 }
2275
2276 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2277                               void **ret_data)
2278 {
2279         struct dlm_ctxt *dlm = data;
2280         struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2281         struct dlm_lock_resource *res = NULL;
2282         char *name;
2283         unsigned int namelen;
2284         int ret = -EINVAL;
2285         u8 node;
2286         unsigned int hash;
2287         struct dlm_work_item *item;
2288         int cleared = 0;
2289         int dispatch = 0;
2290
2291         if (!dlm_grab(dlm))
2292                 return 0;
2293
2294         name = deref->name;
2295         namelen = deref->namelen;
2296         node = deref->node_idx;
2297
2298         if (namelen > DLM_LOCKID_NAME_MAX) {
2299                 mlog(ML_ERROR, "Invalid name length!");
2300                 goto done;
2301         }
2302         if (deref->node_idx >= O2NM_MAX_NODES) {
2303                 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2304                 goto done;
2305         }
2306
2307         hash = dlm_lockid_hash(name, namelen);
2308
2309         spin_lock(&dlm->spinlock);
2310         res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2311         if (!res) {
2312                 spin_unlock(&dlm->spinlock);
2313                 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2314                      dlm->name, namelen, name);
2315                 goto done;
2316         }
2317         spin_unlock(&dlm->spinlock);
2318
2319         spin_lock(&res->spinlock);
2320         if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2321                 dispatch = 1;
2322         else {
2323                 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2324                 if (test_bit(node, res->refmap)) {
2325                         dlm_lockres_clear_refmap_bit(dlm, res, node);
2326                         cleared = 1;
2327                 }
2328         }
2329         spin_unlock(&res->spinlock);
2330
2331         if (!dispatch) {
2332                 if (cleared)
2333                         dlm_lockres_calc_usage(dlm, res);
2334                 else {
2335                         mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2336                         "but it is already dropped!\n", dlm->name,
2337                         res->lockname.len, res->lockname.name, node);
2338                         dlm_print_one_lock_resource(res);
2339                 }
2340                 ret = 0;
2341                 goto done;
2342         }
2343
2344         item = kzalloc(sizeof(*item), GFP_NOFS);
2345         if (!item) {
2346                 ret = -ENOMEM;
2347                 mlog_errno(ret);
2348                 goto done;
2349         }
2350
2351         dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2352         item->u.dl.deref_res = res;
2353         item->u.dl.deref_node = node;
2354
2355         spin_lock(&dlm->work_lock);
2356         list_add_tail(&item->list, &dlm->work_list);
2357         spin_unlock(&dlm->work_lock);
2358
2359         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2360         return 0;
2361
2362 done:
2363         if (res)
2364                 dlm_lockres_put(res);
2365         dlm_put(dlm);
2366
2367         return ret;
2368 }
2369
2370 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2371 {
2372         struct dlm_ctxt *dlm;
2373         struct dlm_lock_resource *res;
2374         u8 node;
2375         u8 cleared = 0;
2376
2377         dlm = item->dlm;
2378         res = item->u.dl.deref_res;
2379         node = item->u.dl.deref_node;
2380
2381         spin_lock(&res->spinlock);
2382         BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2383         if (test_bit(node, res->refmap)) {
2384                 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2385                 dlm_lockres_clear_refmap_bit(dlm, res, node);
2386                 cleared = 1;
2387         }
2388         spin_unlock(&res->spinlock);
2389
2390         if (cleared) {
2391                 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2392                      dlm->name, res->lockname.len, res->lockname.name, node);
2393                 dlm_lockres_calc_usage(dlm, res);
2394         } else {
2395                 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2396                      "but it is already dropped!\n", dlm->name,
2397                      res->lockname.len, res->lockname.name, node);
2398                 dlm_print_one_lock_resource(res);
2399         }
2400
2401         dlm_lockres_put(res);
2402 }
2403
2404 /*
2405  * A migrateable resource is one that is :
2406  * 1. locally mastered, and,
2407  * 2. zero local locks, and,
2408  * 3. one or more non-local locks, or, one or more references
2409  * Returns 1 if yes, 0 if not.
2410  */
2411 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2412                                       struct dlm_lock_resource *res)
2413 {
2414         enum dlm_lockres_list idx;
2415         int nonlocal = 0, node_ref;
2416         struct list_head *queue;
2417         struct dlm_lock *lock;
2418         u64 cookie;
2419
2420         assert_spin_locked(&res->spinlock);
2421
2422         /* delay migration when the lockres is in MIGRATING state */
2423         if (res->state & DLM_LOCK_RES_MIGRATING)
2424                 return 0;
2425
2426         if (res->owner != dlm->node_num)
2427                 return 0;
2428
2429         for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2430                 queue = dlm_list_idx_to_ptr(res, idx);
2431                 list_for_each_entry(lock, queue, list) {
2432                         if (lock->ml.node != dlm->node_num) {
2433                                 nonlocal++;
2434                                 continue;
2435                         }
2436                         cookie = be64_to_cpu(lock->ml.cookie);
2437                         mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
2438                              "%s list\n", dlm->name, res->lockname.len,
2439                              res->lockname.name,
2440                              dlm_get_lock_cookie_node(cookie),
2441                              dlm_get_lock_cookie_seq(cookie),
2442                              dlm_list_in_text(idx));
2443                         return 0;
2444                 }
2445         }
2446
2447         if (!nonlocal) {
2448                 node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2449                 if (node_ref >= O2NM_MAX_NODES)
2450                         return 0;
2451         }
2452
2453         mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
2454              res->lockname.name);
2455
2456         return 1;
2457 }
2458
2459 /*
2460  * DLM_MIGRATE_LOCKRES
2461  */
2462
2463
2464 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2465                                struct dlm_lock_resource *res, u8 target)
2466 {
2467         struct dlm_master_list_entry *mle = NULL;
2468         struct dlm_master_list_entry *oldmle = NULL;
2469         struct dlm_migratable_lockres *mres = NULL;
2470         int ret = 0;
2471         const char *name;
2472         unsigned int namelen;
2473         int mle_added = 0;
2474         int wake = 0;
2475
2476         if (!dlm_grab(dlm))
2477                 return -EINVAL;
2478
2479         BUG_ON(target == O2NM_MAX_NODES);
2480
2481         name = res->lockname.name;
2482         namelen = res->lockname.len;
2483
2484         mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2485              target);
2486
2487         /* preallocate up front. if this fails, abort */
2488         ret = -ENOMEM;
2489         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2490         if (!mres) {
2491                 mlog_errno(ret);
2492                 goto leave;
2493         }
2494
2495         mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2496         if (!mle) {
2497                 mlog_errno(ret);
2498                 goto leave;
2499         }
2500         ret = 0;
2501
2502         /*
2503          * clear any existing master requests and
2504          * add the migration mle to the list
2505          */
2506         spin_lock(&dlm->spinlock);
2507         spin_lock(&dlm->master_lock);
2508         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2509                                     namelen, target, dlm->node_num);
2510         /* get an extra reference on the mle.
2511          * otherwise the assert_master from the new
2512          * master will destroy this.
2513          */
2514         dlm_get_mle_inuse(mle);
2515         spin_unlock(&dlm->master_lock);
2516         spin_unlock(&dlm->spinlock);
2517
2518         if (ret == -EEXIST) {
2519                 mlog(0, "another process is already migrating it\n");
2520                 goto fail;
2521         }
2522         mle_added = 1;
2523
2524         /*
2525          * set the MIGRATING flag and flush asts
2526          * if we fail after this we need to re-dirty the lockres
2527          */
2528         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2529                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2530                      "the target went down.\n", res->lockname.len,
2531                      res->lockname.name, target);
2532                 spin_lock(&res->spinlock);
2533                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2534                 wake = 1;
2535                 spin_unlock(&res->spinlock);
2536                 ret = -EINVAL;
2537         }
2538
2539 fail:
2540         if (oldmle) {
2541                 /* master is known, detach if not already detached */
2542                 dlm_mle_detach_hb_events(dlm, oldmle);
2543                 dlm_put_mle(oldmle);
2544         }
2545
2546         if (ret < 0) {
2547                 if (mle_added) {
2548                         dlm_mle_detach_hb_events(dlm, mle);
2549                         dlm_put_mle(mle);
2550                         dlm_put_mle_inuse(mle);
2551                 } else if (mle) {
2552                         kmem_cache_free(dlm_mle_cache, mle);
2553                         mle = NULL;
2554                 }
2555                 goto leave;
2556         }
2557
2558         /*
2559          * at this point, we have a migration target, an mle
2560          * in the master list, and the MIGRATING flag set on
2561          * the lockres
2562          */
2563
2564         /* now that remote nodes are spinning on the MIGRATING flag,
2565          * ensure that all assert_master work is flushed. */
2566         flush_workqueue(dlm->dlm_worker);
2567
2568         /* notify new node and send all lock state */
2569         /* call send_one_lockres with migration flag.
2570          * this serves as notice to the target node that a
2571          * migration is starting. */
2572         ret = dlm_send_one_lockres(dlm, res, mres, target,
2573                                    DLM_MRES_MIGRATION);
2574
2575         if (ret < 0) {
2576                 mlog(0, "migration to node %u failed with %d\n",
2577                      target, ret);
2578                 /* migration failed, detach and clean up mle */
2579                 dlm_mle_detach_hb_events(dlm, mle);
2580                 dlm_put_mle(mle);
2581                 dlm_put_mle_inuse(mle);
2582                 spin_lock(&res->spinlock);
2583                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2584                 wake = 1;
2585                 spin_unlock(&res->spinlock);
2586                 if (dlm_is_host_down(ret))
2587                         dlm_wait_for_node_death(dlm, target,
2588                                                 DLM_NODE_DEATH_WAIT_MAX);
2589                 goto leave;
2590         }
2591
2592         /* at this point, the target sends a message to all nodes,
2593          * (using dlm_do_migrate_request).  this node is skipped since
2594          * we had to put an mle in the list to begin the process.  this
2595          * node now waits for target to do an assert master.  this node
2596          * will be the last one notified, ensuring that the migration
2597          * is complete everywhere.  if the target dies while this is
2598          * going on, some nodes could potentially see the target as the
2599          * master, so it is important that my recovery finds the migration
2600          * mle and sets the master to UNKNOWN. */
2601
2602
2603         /* wait for new node to assert master */
2604         while (1) {
2605                 ret = wait_event_interruptible_timeout(mle->wq,
2606                                         (atomic_read(&mle->woken) == 1),
2607                                         msecs_to_jiffies(5000));
2608
2609                 if (ret >= 0) {
2610                         if (atomic_read(&mle->woken) == 1 ||
2611                             res->owner == target)
2612                                 break;
2613
2614                         mlog(0, "%s:%.*s: timed out during migration\n",
2615                              dlm->name, res->lockname.len, res->lockname.name);
2616                         /* avoid hang during shutdown when migrating lockres
2617                          * to a node which also goes down */
2618                         if (dlm_is_node_dead(dlm, target)) {
2619                                 mlog(0, "%s:%.*s: expected migration "
2620                                      "target %u is no longer up, restarting\n",
2621                                      dlm->name, res->lockname.len,
2622                                      res->lockname.name, target);
2623                                 ret = -EINVAL;
2624                                 /* migration failed, detach and clean up mle */
2625                                 dlm_mle_detach_hb_events(dlm, mle);
2626                                 dlm_put_mle(mle);
2627                                 dlm_put_mle_inuse(mle);
2628                                 spin_lock(&res->spinlock);
2629                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2630                                 wake = 1;
2631                                 spin_unlock(&res->spinlock);
2632                                 goto leave;
2633                         }
2634                 } else
2635                         mlog(0, "%s:%.*s: caught signal during migration\n",
2636                              dlm->name, res->lockname.len, res->lockname.name);
2637         }
2638
2639         /* all done, set the owner, clear the flag */
2640         spin_lock(&res->spinlock);
2641         dlm_set_lockres_owner(dlm, res, target);
2642         res->state &= ~DLM_LOCK_RES_MIGRATING;
2643         dlm_remove_nonlocal_locks(dlm, res);
2644         spin_unlock(&res->spinlock);
2645         wake_up(&res->wq);
2646
2647         /* master is known, detach if not already detached */
2648         dlm_mle_detach_hb_events(dlm, mle);
2649         dlm_put_mle_inuse(mle);
2650         ret = 0;
2651
2652         dlm_lockres_calc_usage(dlm, res);
2653
2654 leave:
2655         /* re-dirty the lockres if we failed */
2656         if (ret < 0)
2657                 dlm_kick_thread(dlm, res);
2658
2659         /* wake up waiters if the MIGRATING flag got set
2660          * but migration failed */
2661         if (wake)
2662                 wake_up(&res->wq);
2663
2664         if (mres)
2665                 free_page((unsigned long)mres);
2666
2667         dlm_put(dlm);
2668
2669         mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2670              name, target, ret);
2671         return ret;
2672 }
2673
2674 #define DLM_MIGRATION_RETRY_MS  100
2675
2676 /*
2677  * Should be called only after beginning the domain leave process.
2678  * There should not be any remaining locks on nonlocal lock resources,
2679  * and there should be no local locks left on locally mastered resources.
2680  *
2681  * Called with the dlm spinlock held, may drop it to do migration, but
2682  * will re-acquire before exit.
2683  *
2684  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2685  */
2686 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2687 {
2688         int ret;
2689         int lock_dropped = 0;
2690         u8 target = O2NM_MAX_NODES;
2691
2692         assert_spin_locked(&dlm->spinlock);
2693
2694         spin_lock(&res->spinlock);
2695         if (dlm_is_lockres_migrateable(dlm, res))
2696                 target = dlm_pick_migration_target(dlm, res);
2697         spin_unlock(&res->spinlock);
2698
2699         if (target == O2NM_MAX_NODES)
2700                 goto leave;
2701
2702         /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2703         spin_unlock(&dlm->spinlock);
2704         lock_dropped = 1;
2705         ret = dlm_migrate_lockres(dlm, res, target);
2706         if (ret)
2707                 mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2708                      dlm->name, res->lockname.len, res->lockname.name,
2709                      target, ret);
2710         spin_lock(&dlm->spinlock);
2711 leave:
2712         return lock_dropped;
2713 }
2714
2715 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2716 {
2717         int ret;
2718         spin_lock(&dlm->ast_lock);
2719         spin_lock(&lock->spinlock);
2720         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2721         spin_unlock(&lock->spinlock);
2722         spin_unlock(&dlm->ast_lock);
2723         return ret;
2724 }
2725
2726 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2727                                      struct dlm_lock_resource *res,
2728                                      u8 mig_target)
2729 {
2730         int can_proceed;
2731         spin_lock(&res->spinlock);
2732         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2733         spin_unlock(&res->spinlock);
2734
2735         /* target has died, so make the caller break out of the
2736          * wait_event, but caller must recheck the domain_map */
2737         spin_lock(&dlm->spinlock);
2738         if (!test_bit(mig_target, dlm->domain_map))
2739                 can_proceed = 1;
2740         spin_unlock(&dlm->spinlock);
2741         return can_proceed;
2742 }
2743
2744 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2745                                 struct dlm_lock_resource *res)
2746 {
2747         int ret;
2748         spin_lock(&res->spinlock);
2749         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2750         spin_unlock(&res->spinlock);
2751         return ret;
2752 }
2753
2754
2755 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2756                                        struct dlm_lock_resource *res,
2757                                        u8 target)
2758 {
2759         int ret = 0;
2760
2761         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2762                res->lockname.len, res->lockname.name, dlm->node_num,
2763                target);
2764         /* need to set MIGRATING flag on lockres.  this is done by
2765          * ensuring that all asts have been flushed for this lockres. */
2766         spin_lock(&res->spinlock);
2767         BUG_ON(res->migration_pending);
2768         res->migration_pending = 1;
2769         /* strategy is to reserve an extra ast then release
2770          * it below, letting the release do all of the work */
2771         __dlm_lockres_reserve_ast(res);
2772         spin_unlock(&res->spinlock);
2773
2774         /* now flush all the pending asts */
2775         dlm_kick_thread(dlm, res);
2776         /* before waiting on DIRTY, block processes which may
2777          * try to dirty the lockres before MIGRATING is set */
2778         spin_lock(&res->spinlock);
2779         BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2780         res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2781         spin_unlock(&res->spinlock);
2782         /* now wait on any pending asts and the DIRTY state */
2783         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2784         dlm_lockres_release_ast(dlm, res);
2785
2786         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2787                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2788         /* if the extra ref we just put was the final one, this
2789          * will pass thru immediately.  otherwise, we need to wait
2790          * for the last ast to finish. */
2791 again:
2792         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2793                    dlm_migration_can_proceed(dlm, res, target),
2794                    msecs_to_jiffies(1000));
2795         if (ret < 0) {
2796                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2797                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2798                        test_bit(target, dlm->domain_map) ? "no":"yes");
2799         } else {
2800                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2801                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2802                        test_bit(target, dlm->domain_map) ? "no":"yes");
2803         }
2804         if (!dlm_migration_can_proceed(dlm, res, target)) {
2805                 mlog(0, "trying again...\n");
2806                 goto again;
2807         }
2808
2809         ret = 0;
2810         /* did the target go down or die? */
2811         spin_lock(&dlm->spinlock);
2812         if (!test_bit(target, dlm->domain_map)) {
2813                 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2814                      target);
2815                 ret = -EHOSTDOWN;
2816         }
2817         spin_unlock(&dlm->spinlock);
2818
2819         /*
2820          * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2821          * another try; otherwise, we are sure the MIGRATING state is there,
2822          * drop the unneded state which blocked threads trying to DIRTY
2823          */
2824         spin_lock(&res->spinlock);
2825         BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2826         res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2827         if (!ret)
2828                 BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2829         spin_unlock(&res->spinlock);
2830
2831         /*
2832          * at this point:
2833          *
2834          *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2835          *   o there are no pending asts on this lockres
2836          *   o all processes trying to reserve an ast on this
2837          *     lockres must wait for the MIGRATING flag to clear
2838          */
2839         return ret;
2840 }
2841
2842 /* last step in the migration process.
2843  * original master calls this to free all of the dlm_lock
2844  * structures that used to be for other nodes. */
2845 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2846                                       struct dlm_lock_resource *res)
2847 {
2848         struct list_head *queue = &res->granted;
2849         int i, bit;
2850         struct dlm_lock *lock, *next;
2851
2852         assert_spin_locked(&res->spinlock);
2853
2854         BUG_ON(res->owner == dlm->node_num);
2855
2856         for (i=0; i<3; i++) {
2857                 list_for_each_entry_safe(lock, next, queue, list) {
2858                         if (lock->ml.node != dlm->node_num) {
2859                                 mlog(0, "putting lock for node %u\n",
2860                                      lock->ml.node);
2861                                 /* be extra careful */
2862                                 BUG_ON(!list_empty(&lock->ast_list));
2863                                 BUG_ON(!list_empty(&lock->bast_list));
2864                                 BUG_ON(lock->ast_pending);
2865                                 BUG_ON(lock->bast_pending);
2866                                 dlm_lockres_clear_refmap_bit(dlm, res,
2867                                                              lock->ml.node);
2868                                 list_del_init(&lock->list);
2869                                 dlm_lock_put(lock);
2870                                 /* In a normal unlock, we would have added a
2871                                  * DLM_UNLOCK_FREE_LOCK action. Force it. */
2872                                 dlm_lock_put(lock);
2873                         }
2874                 }
2875                 queue++;
2876         }
2877         bit = 0;
2878         while (1) {
2879                 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2880                 if (bit >= O2NM_MAX_NODES)
2881                         break;
2882                 /* do not clear the local node reference, if there is a
2883                  * process holding this, let it drop the ref itself */
2884                 if (bit != dlm->node_num) {
2885                         mlog(0, "%s:%.*s: node %u had a ref to this "
2886                              "migrating lockres, clearing\n", dlm->name,
2887                              res->lockname.len, res->lockname.name, bit);
2888                         dlm_lockres_clear_refmap_bit(dlm, res, bit);
2889                 }
2890                 bit++;
2891         }
2892 }
2893
2894 /*
2895  * Pick a node to migrate the lock resource to. This function selects a
2896  * potential target based first on the locks and then on refmap. It skips
2897  * nodes that are in the process of exiting the domain.
2898  */
2899 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2900                                     struct dlm_lock_resource *res)
2901 {
2902         enum dlm_lockres_list idx;
2903         struct list_head *queue = &res->granted;
2904         struct dlm_lock *lock;
2905         int noderef;
2906         u8 nodenum = O2NM_MAX_NODES;
2907
2908         assert_spin_locked(&dlm->spinlock);
2909         assert_spin_locked(&res->spinlock);
2910
2911         /* Go through all the locks */
2912         for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2913                 queue = dlm_list_idx_to_ptr(res, idx);
2914                 list_for_each_entry(lock, queue, list) {
2915                         if (lock->ml.node == dlm->node_num)
2916                                 continue;
2917                         if (test_bit(lock->ml.node, dlm->exit_domain_map))
2918                                 continue;
2919                         nodenum = lock->ml.node;
2920                         goto bail;
2921                 }
2922         }
2923
2924         /* Go thru the refmap */
2925         noderef = -1;
2926         while (1) {
2927                 noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
2928                                         noderef + 1);
2929                 if (noderef >= O2NM_MAX_NODES)
2930                         break;
2931                 if (noderef == dlm->node_num)
2932                         continue;
2933                 if (test_bit(noderef, dlm->exit_domain_map))
2934                         continue;
2935                 nodenum = noderef;
2936                 goto bail;
2937         }
2938
2939 bail:
2940         return nodenum;
2941 }
2942
2943 /* this is called by the new master once all lockres
2944  * data has been received */
2945 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2946                                   struct dlm_lock_resource *res,
2947                                   u8 master, u8 new_master,
2948                                   struct dlm_node_iter *iter)
2949 {
2950         struct dlm_migrate_request migrate;
2951         int ret, skip, status = 0;
2952         int nodenum;
2953
2954         memset(&migrate, 0, sizeof(migrate));
2955         migrate.namelen = res->lockname.len;
2956         memcpy(migrate.name, res->lockname.name, migrate.namelen);
2957         migrate.new_master = new_master;
2958         migrate.master = master;
2959
2960         ret = 0;
2961
2962         /* send message to all nodes, except the master and myself */
2963         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2964                 if (nodenum == master ||
2965                     nodenum == new_master)
2966                         continue;
2967
2968                 /* We could race exit domain. If exited, skip. */
2969                 spin_lock(&dlm->spinlock);
2970                 skip = (!test_bit(nodenum, dlm->domain_map));
2971                 spin_unlock(&dlm->spinlock);
2972                 if (skip) {
2973                         clear_bit(nodenum, iter->node_map);
2974                         continue;
2975                 }
2976
2977                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2978                                          &migrate, sizeof(migrate), nodenum,
2979                                          &status);
2980                 if (ret < 0) {
2981                         mlog(ML_ERROR, "%s: res %.*s, Error %d send "
2982                              "MIGRATE_REQUEST to node %u\n", dlm->name,
2983                              migrate.namelen, migrate.name, ret, nodenum);
2984                         if (!dlm_is_host_down(ret)) {
2985                                 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2986                                 BUG();
2987                         }
2988                         clear_bit(nodenum, iter->node_map);
2989                         ret = 0;
2990                 } else if (status < 0) {
2991                         mlog(0, "migrate request (node %u) returned %d!\n",
2992                              nodenum, status);
2993                         ret = status;
2994                 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2995                         /* during the migration request we short-circuited
2996                          * the mastery of the lockres.  make sure we have
2997                          * a mastery ref for nodenum */
2998                         mlog(0, "%s:%.*s: need ref for node %u\n",
2999                              dlm->name, res->lockname.len, res->lockname.name,
3000                              nodenum);
3001                         spin_lock(&res->spinlock);
3002                         dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3003                         spin_unlock(&res->spinlock);
3004                 }
3005         }
3006
3007         if (ret < 0)
3008                 mlog_errno(ret);
3009
3010         mlog(0, "returning ret=%d\n", ret);
3011         return ret;
3012 }
3013
3014
3015 /* if there is an existing mle for this lockres, we now know who the master is.
3016  * (the one who sent us *this* message) we can clear it up right away.
3017  * since the process that put the mle on the list still has a reference to it,
3018  * we can unhash it now, set the master and wake the process.  as a result,
3019  * we will have no mle in the list to start with.  now we can add an mle for
3020  * the migration and this should be the only one found for those scanning the
3021  * list.  */
3022 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3023                                 void **ret_data)
3024 {
3025         struct dlm_ctxt *dlm = data;
3026         struct dlm_lock_resource *res = NULL;
3027         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3028         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3029         const char *name;
3030         unsigned int namelen, hash;
3031         int ret = 0;
3032
3033         if (!dlm_grab(dlm))
3034                 return -EINVAL;
3035
3036         name = migrate->name;
3037         namelen = migrate->namelen;
3038         hash = dlm_lockid_hash(name, namelen);
3039
3040         /* preallocate.. if this fails, abort */
3041         mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3042
3043         if (!mle) {
3044                 ret = -ENOMEM;
3045                 goto leave;
3046         }
3047
3048         /* check for pre-existing lock */
3049         spin_lock(&dlm->spinlock);
3050         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3051         if (res) {
3052                 spin_lock(&res->spinlock);
3053                 if (res->state & DLM_LOCK_RES_RECOVERING) {
3054                         /* if all is working ok, this can only mean that we got
3055                         * a migrate request from a node that we now see as
3056                         * dead.  what can we do here?  drop it to the floor? */
3057                         spin_unlock(&res->spinlock);
3058                         mlog(ML_ERROR, "Got a migrate request, but the "
3059                              "lockres is marked as recovering!");
3060                         kmem_cache_free(dlm_mle_cache, mle);
3061                         ret = -EINVAL; /* need a better solution */
3062                         goto unlock;
3063                 }
3064                 res->state |= DLM_LOCK_RES_MIGRATING;
3065                 spin_unlock(&res->spinlock);
3066         }
3067
3068         spin_lock(&dlm->master_lock);
3069         /* ignore status.  only nonzero status would BUG. */
3070         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3071                                     name, namelen,
3072                                     migrate->new_master,
3073                                     migrate->master);
3074
3075         spin_unlock(&dlm->master_lock);
3076 unlock:
3077         spin_unlock(&dlm->spinlock);
3078
3079         if (oldmle) {
3080                 /* master is known, detach if not already detached */
3081                 dlm_mle_detach_hb_events(dlm, oldmle);
3082                 dlm_put_mle(oldmle);
3083         }
3084
3085         if (res)
3086                 dlm_lockres_put(res);
3087 leave:
3088         dlm_put(dlm);
3089         return ret;
3090 }
3091
3092 /* must be holding dlm->spinlock and dlm->master_lock
3093  * when adding a migration mle, we can clear any other mles
3094  * in the master list because we know with certainty that
3095  * the master is "master".  so we remove any old mle from
3096  * the list after setting it's master field, and then add
3097  * the new migration mle.  this way we can hold with the rule
3098  * of having only one mle for a given lock name at all times. */
3099 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3100                                  struct dlm_lock_resource *res,
3101                                  struct dlm_master_list_entry *mle,
3102                                  struct dlm_master_list_entry **oldmle,
3103                                  const char *name, unsigned int namelen,
3104                                  u8 new_master, u8 master)
3105 {
3106         int found;
3107         int ret = 0;
3108
3109         *oldmle = NULL;
3110
3111         assert_spin_locked(&dlm->spinlock);
3112         assert_spin_locked(&dlm->master_lock);
3113
3114         /* caller is responsible for any ref taken here on oldmle */
3115         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3116         if (found) {
3117                 struct dlm_master_list_entry *tmp = *oldmle;
3118                 spin_lock(&tmp->spinlock);
3119                 if (tmp->type == DLM_MLE_MIGRATION) {
3120                         if (master == dlm->node_num) {
3121                                 /* ah another process raced me to it */
3122                                 mlog(0, "tried to migrate %.*s, but some "
3123                                      "process beat me to it\n",
3124                                      namelen, name);
3125                                 ret = -EEXIST;
3126                         } else {
3127                                 /* bad.  2 NODES are trying to migrate! */
3128                                 mlog(ML_ERROR, "migration error  mle: "
3129                                      "master=%u new_master=%u // request: "
3130                                      "master=%u new_master=%u // "
3131                                      "lockres=%.*s\n",
3132                                      tmp->master, tmp->new_master,
3133                                      master, new_master,
3134                                      namelen, name);
3135                                 BUG();
3136                         }
3137                 } else {
3138                         /* this is essentially what assert_master does */
3139                         tmp->master = master;
3140                         atomic_set(&tmp->woken, 1);
3141                         wake_up(&tmp->wq);
3142                         /* remove it so that only one mle will be found */
3143                         __dlm_unlink_mle(dlm, tmp);
3144                         __dlm_mle_detach_hb_events(dlm, tmp);
3145                         if (tmp->type == DLM_MLE_MASTER) {
3146                                 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3147                                 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3148                                                 "telling master to get ref "
3149                                                 "for cleared out mle during "
3150                                                 "migration\n", dlm->name,
3151                                                 namelen, name, master,
3152                                                 new_master);
3153                         }
3154                 }
3155                 spin_unlock(&tmp->spinlock);
3156         }
3157
3158         /* now add a migration mle to the tail of the list */
3159         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3160         mle->new_master = new_master;
3161         /* the new master will be sending an assert master for this.
3162          * at that point we will get the refmap reference */
3163         mle->master = master;
3164         /* do this for consistency with other mle types */
3165         set_bit(new_master, mle->maybe_map);
3166         __dlm_insert_mle(dlm, mle);
3167
3168         return ret;
3169 }
3170
3171 /*
3172  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3173  */
3174 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3175                                         struct dlm_master_list_entry *mle)
3176 {
3177         struct dlm_lock_resource *res;
3178
3179         /* Find the lockres associated to the mle and set its owner to UNK */
3180         res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3181                                    mle->mnamehash);
3182         if (res) {
3183                 spin_unlock(&dlm->master_lock);
3184
3185                 /* move lockres onto recovery list */
3186                 spin_lock(&res->spinlock);
3187                 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3188                 dlm_move_lockres_to_recovery_list(dlm, res);
3189                 spin_unlock(&res->spinlock);
3190                 dlm_lockres_put(res);
3191
3192                 /* about to get rid of mle, detach from heartbeat */
3193                 __dlm_mle_detach_hb_events(dlm, mle);
3194
3195                 /* dump the mle */
3196                 spin_lock(&dlm->master_lock);
3197                 __dlm_put_mle(mle);
3198                 spin_unlock(&dlm->master_lock);
3199         }
3200
3201         return res;
3202 }
3203
3204 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3205                                     struct dlm_master_list_entry *mle)
3206 {
3207         __dlm_mle_detach_hb_events(dlm, mle);
3208
3209         spin_lock(&mle->spinlock);
3210         __dlm_unlink_mle(dlm, mle);
3211         atomic_set(&mle->woken, 1);
3212         spin_unlock(&mle->spinlock);
3213
3214         wake_up(&mle->wq);
3215 }
3216
3217 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3218                                 struct dlm_master_list_entry *mle, u8 dead_node)
3219 {
3220         int bit;
3221
3222         BUG_ON(mle->type != DLM_MLE_BLOCK);
3223
3224         spin_lock(&mle->spinlock);
3225         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3226         if (bit != dead_node) {
3227                 mlog(0, "mle found, but dead node %u would not have been "
3228                      "master\n", dead_node);
3229                 spin_unlock(&mle->spinlock);
3230         } else {
3231                 /* Must drop the refcount by one since the assert_master will
3232                  * never arrive. This may result in the mle being unlinked and
3233                  * freed, but there may still be a process waiting in the
3234                  * dlmlock path which is fine. */
3235                 mlog(0, "node %u was expected master\n", dead_node);
3236                 atomic_set(&mle->woken, 1);
3237                 spin_unlock(&mle->spinlock);
3238                 wake_up(&mle->wq);
3239
3240                 /* Do not need events any longer, so detach from heartbeat */
3241                 __dlm_mle_detach_hb_events(dlm, mle);
3242                 __dlm_put_mle(mle);
3243         }
3244 }
3245
3246 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3247 {
3248         struct dlm_master_list_entry *mle;
3249         struct dlm_lock_resource *res;
3250         struct hlist_head *bucket;
3251         struct hlist_node *tmp;
3252         unsigned int i;
3253
3254         mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3255 top:
3256         assert_spin_locked(&dlm->spinlock);
3257
3258         /* clean the master list */
3259         spin_lock(&dlm->master_lock);
3260         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3261                 bucket = dlm_master_hash(dlm, i);
3262                 hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3263                         BUG_ON(mle->type != DLM_MLE_BLOCK &&
3264                                mle->type != DLM_MLE_MASTER &&
3265                                mle->type != DLM_MLE_MIGRATION);
3266
3267                         /* MASTER mles are initiated locally. The waiting
3268                          * process will notice the node map change shortly.
3269                          * Let that happen as normal. */
3270                         if (mle->type == DLM_MLE_MASTER)
3271                                 continue;
3272
3273                         /* BLOCK mles are initiated by other nodes. Need to
3274                          * clean up if the dead node would have been the
3275                          * master. */
3276                         if (mle->type == DLM_MLE_BLOCK) {
3277                                 dlm_clean_block_mle(dlm, mle, dead_node);
3278                                 continue;
3279                         }
3280
3281                         /* Everything else is a MIGRATION mle */
3282
3283                         /* The rule for MIGRATION mles is that the master
3284                          * becomes UNKNOWN if *either* the original or the new
3285                          * master dies. All UNKNOWN lockres' are sent to
3286                          * whichever node becomes the recovery master. The new
3287                          * master is responsible for determining if there is
3288                          * still a master for this lockres, or if he needs to
3289                          * take over mastery. Either way, this node should
3290                          * expect another message to resolve this. */
3291
3292                         if (mle->master != dead_node &&
3293                             mle->new_master != dead_node)
3294                                 continue;
3295
3296                         if (mle->new_master == dead_node && mle->inuse) {
3297                                 mlog(ML_NOTICE, "%s: target %u died during "
3298                                                 "migration from %u, the MLE is "
3299                                                 "still keep used, ignore it!\n",
3300                                                 dlm->name, dead_node,
3301                                                 mle->master);
3302                                 continue;
3303                         }
3304
3305                         /* If we have reached this point, this mle needs to be
3306                          * removed from the list and freed. */
3307                         dlm_clean_migration_mle(dlm, mle);
3308
3309                         mlog(0, "%s: node %u died during migration from "
3310                              "%u to %u!\n", dlm->name, dead_node, mle->master,
3311                              mle->new_master);
3312
3313                         /* If we find a lockres associated with the mle, we've
3314                          * hit this rare case that messes up our lock ordering.
3315                          * If so, we need to drop the master lock so that we can
3316                          * take the lockres lock, meaning that we will have to
3317                          * restart from the head of list. */
3318                         res = dlm_reset_mleres_owner(dlm, mle);
3319                         if (res)
3320                                 /* restart */
3321                                 goto top;
3322
3323                         /* This may be the last reference */
3324                         __dlm_put_mle(mle);
3325                 }
3326         }
3327         spin_unlock(&dlm->master_lock);
3328 }
3329
3330 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3331                          u8 old_master)
3332 {
3333         struct dlm_node_iter iter;
3334         int ret = 0;
3335
3336         spin_lock(&dlm->spinlock);
3337         dlm_node_iter_init(dlm->domain_map, &iter);
3338         clear_bit(old_master, iter.node_map);
3339         clear_bit(dlm->node_num, iter.node_map);
3340         spin_unlock(&dlm->spinlock);
3341
3342         /* ownership of the lockres is changing.  account for the
3343          * mastery reference here since old_master will briefly have
3344          * a reference after the migration completes */
3345         spin_lock(&res->spinlock);
3346         dlm_lockres_set_refmap_bit(dlm, res, old_master);
3347         spin_unlock(&res->spinlock);
3348
3349         mlog(0, "now time to do a migrate request to other nodes\n");
3350         ret = dlm_do_migrate_request(dlm, res, old_master,
3351                                      dlm->node_num, &iter);
3352         if (ret < 0) {
3353                 mlog_errno(ret);
3354                 goto leave;
3355         }
3356
3357         mlog(0, "doing assert master of %.*s to all except the original node\n",
3358              res->lockname.len, res->lockname.name);
3359         /* this call now finishes out the nodemap
3360          * even if one or more nodes die */
3361         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3362                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3363         if (ret < 0) {
3364                 /* no longer need to retry.  all living nodes contacted. */
3365                 mlog_errno(ret);
3366                 ret = 0;
3367         }
3368
3369         memset(iter.node_map, 0, sizeof(iter.node_map));
3370         set_bit(old_master, iter.node_map);
3371         mlog(0, "doing assert master of %.*s back to %u\n",
3372              res->lockname.len, res->lockname.name, old_master);
3373         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3374                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3375         if (ret < 0) {
3376                 mlog(0, "assert master to original master failed "
3377                      "with %d.\n", ret);
3378                 /* the only nonzero status here would be because of
3379                  * a dead original node.  we're done. */
3380                 ret = 0;
3381         }
3382
3383         /* all done, set the owner, clear the flag */
3384         spin_lock(&res->spinlock);
3385         dlm_set_lockres_owner(dlm, res, dlm->node_num);
3386         res->state &= ~DLM_LOCK_RES_MIGRATING;
3387         spin_unlock(&res->spinlock);
3388         /* re-dirty it on the new master */
3389         dlm_kick_thread(dlm, res);
3390         wake_up(&res->wq);
3391 leave:
3392         return ret;
3393 }
3394
3395 /*
3396  * LOCKRES AST REFCOUNT
3397  * this is integral to migration
3398  */
3399
3400 /* for future intent to call an ast, reserve one ahead of time.
3401  * this should be called only after waiting on the lockres
3402  * with dlm_wait_on_lockres, and while still holding the
3403  * spinlock after the call. */
3404 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3405 {
3406         assert_spin_locked(&res->spinlock);
3407         if (res->state & DLM_LOCK_RES_MIGRATING) {
3408                 __dlm_print_one_lock_resource(res);
3409         }
3410         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3411
3412         atomic_inc(&res->asts_reserved);
3413 }
3414
3415 /*
3416  * used to drop the reserved ast, either because it went unused,
3417  * or because the ast/bast was actually called.
3418  *
3419  * also, if there is a pending migration on this lockres,
3420  * and this was the last pending ast on the lockres,
3421  * atomically set the MIGRATING flag before we drop the lock.
3422  * this is how we ensure that migration can proceed with no
3423  * asts in progress.  note that it is ok if the state of the
3424  * queues is such that a lock should be granted in the future
3425  * or that a bast should be fired, because the new master will
3426  * shuffle the lists on this lockres as soon as it is migrated.
3427  */
3428 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3429                              struct dlm_lock_resource *res)
3430 {
3431         if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3432                 return;
3433
3434         if (!res->migration_pending) {
3435                 spin_unlock(&res->spinlock);
3436                 return;
3437         }
3438
3439         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3440         res->migration_pending = 0;
3441         res->state |= DLM_LOCK_RES_MIGRATING;
3442         spin_unlock(&res->spinlock);
3443         wake_up(&res->wq);
3444         wake_up(&dlm->migration_wq);
3445 }
3446
3447 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3448 {
3449         int i;
3450         struct hlist_head *bucket;
3451         struct dlm_master_list_entry *mle;
3452         struct hlist_node *tmp;
3453
3454         /*
3455          * We notified all other nodes that we are exiting the domain and
3456          * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3457          * around we force free them and wake any processes that are waiting
3458          * on the mles
3459          */
3460         spin_lock(&dlm->spinlock);
3461         spin_lock(&dlm->master_lock);
3462
3463         BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3464         BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3465
3466         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3467                 bucket = dlm_master_hash(dlm, i);
3468                 hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3469                         if (mle->type != DLM_MLE_BLOCK) {
3470                                 mlog(ML_ERROR, "bad mle: %p\n", mle);
3471                                 dlm_print_one_mle(mle);
3472                         }
3473                         atomic_set(&mle->woken, 1);
3474                         wake_up(&mle->wq);
3475
3476                         __dlm_unlink_mle(dlm, mle);
3477                         __dlm_mle_detach_hb_events(dlm, mle);
3478                         __dlm_put_mle(mle);
3479                 }
3480         }
3481         spin_unlock(&dlm->master_lock);
3482         spin_unlock(&dlm->spinlock);
3483 }