Linux-libre 3.5.4-gnu1
[librecmc/linux-libre.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78 {
79         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80 }
81
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83 {
84         int val = simple_strtoul(buf, NULL, 0);
85         if (val == 1)
86                 set_bit(LSFL_NODIR, &ls->ls_flags);
87         return len;
88 }
89
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
91 {
92         uint32_t status = dlm_recover_status(ls);
93         return snprintf(buf, PAGE_SIZE, "%x\n", status);
94 }
95
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
97 {
98         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
99 }
100
101 struct dlm_attr {
102         struct attribute attr;
103         ssize_t (*show)(struct dlm_ls *, char *);
104         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105 };
106
107 static struct dlm_attr dlm_attr_control = {
108         .attr  = {.name = "control", .mode = S_IWUSR},
109         .store = dlm_control_store
110 };
111
112 static struct dlm_attr dlm_attr_event = {
113         .attr  = {.name = "event_done", .mode = S_IWUSR},
114         .store = dlm_event_store
115 };
116
117 static struct dlm_attr dlm_attr_id = {
118         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119         .show  = dlm_id_show,
120         .store = dlm_id_store
121 };
122
123 static struct dlm_attr dlm_attr_nodir = {
124         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125         .show  = dlm_nodir_show,
126         .store = dlm_nodir_store
127 };
128
129 static struct dlm_attr dlm_attr_recover_status = {
130         .attr  = {.name = "recover_status", .mode = S_IRUGO},
131         .show  = dlm_recover_status_show
132 };
133
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
136         .show  = dlm_recover_nodeid_show
137 };
138
139 static struct attribute *dlm_attrs[] = {
140         &dlm_attr_control.attr,
141         &dlm_attr_event.attr,
142         &dlm_attr_id.attr,
143         &dlm_attr_nodir.attr,
144         &dlm_attr_recover_status.attr,
145         &dlm_attr_recover_nodeid.attr,
146         NULL,
147 };
148
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150                              char *buf)
151 {
152         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
153         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154         return a->show ? a->show(ls, buf) : 0;
155 }
156
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158                               const char *buf, size_t len)
159 {
160         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
161         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162         return a->store ? a->store(ls, buf, len) : len;
163 }
164
165 static void lockspace_kobj_release(struct kobject *k)
166 {
167         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
168         kfree(ls);
169 }
170
171 static const struct sysfs_ops dlm_attr_ops = {
172         .show  = dlm_attr_show,
173         .store = dlm_attr_store,
174 };
175
176 static struct kobj_type dlm_ktype = {
177         .default_attrs = dlm_attrs,
178         .sysfs_ops     = &dlm_attr_ops,
179         .release       = lockspace_kobj_release,
180 };
181
182 static struct kset *dlm_kset;
183
184 static int do_uevent(struct dlm_ls *ls, int in)
185 {
186         int error;
187
188         if (in)
189                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190         else
191                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192
193         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194
195         /* dlm_controld will see the uevent, do the necessary group management
196            and then write to sysfs to wake us */
197
198         error = wait_event_interruptible(ls->ls_uevent_wait,
199                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200
201         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
202
203         if (error)
204                 goto out;
205
206         error = ls->ls_uevent_result;
207  out:
208         if (error)
209                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210                           error, ls->ls_uevent_result);
211         return error;
212 }
213
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215                       struct kobj_uevent_env *env)
216 {
217         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218
219         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220         return 0;
221 }
222
223 static struct kset_uevent_ops dlm_uevent_ops = {
224         .uevent = dlm_uevent,
225 };
226
227 int __init dlm_lockspace_init(void)
228 {
229         ls_count = 0;
230         mutex_init(&ls_lock);
231         INIT_LIST_HEAD(&lslist);
232         spin_lock_init(&lslist_lock);
233
234         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235         if (!dlm_kset) {
236                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
237                 return -ENOMEM;
238         }
239         return 0;
240 }
241
242 void dlm_lockspace_exit(void)
243 {
244         kset_unregister(dlm_kset);
245 }
246
247 static struct dlm_ls *find_ls_to_scan(void)
248 {
249         struct dlm_ls *ls;
250
251         spin_lock(&lslist_lock);
252         list_for_each_entry(ls, &lslist, ls_list) {
253                 if (time_after_eq(jiffies, ls->ls_scan_time +
254                                             dlm_config.ci_scan_secs * HZ)) {
255                         spin_unlock(&lslist_lock);
256                         return ls;
257                 }
258         }
259         spin_unlock(&lslist_lock);
260         return NULL;
261 }
262
263 static int dlm_scand(void *data)
264 {
265         struct dlm_ls *ls;
266
267         while (!kthread_should_stop()) {
268                 ls = find_ls_to_scan();
269                 if (ls) {
270                         if (dlm_lock_recovery_try(ls)) {
271                                 ls->ls_scan_time = jiffies;
272                                 dlm_scan_rsbs(ls);
273                                 dlm_scan_timeout(ls);
274                                 dlm_scan_waiters(ls);
275                                 dlm_unlock_recovery(ls);
276                         } else {
277                                 ls->ls_scan_time += HZ;
278                         }
279                         continue;
280                 }
281                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282         }
283         return 0;
284 }
285
286 static int dlm_scand_start(void)
287 {
288         struct task_struct *p;
289         int error = 0;
290
291         p = kthread_run(dlm_scand, NULL, "dlm_scand");
292         if (IS_ERR(p))
293                 error = PTR_ERR(p);
294         else
295                 scand_task = p;
296         return error;
297 }
298
299 static void dlm_scand_stop(void)
300 {
301         kthread_stop(scand_task);
302 }
303
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305 {
306         struct dlm_ls *ls;
307
308         spin_lock(&lslist_lock);
309
310         list_for_each_entry(ls, &lslist, ls_list) {
311                 if (ls->ls_global_id == id) {
312                         ls->ls_count++;
313                         goto out;
314                 }
315         }
316         ls = NULL;
317  out:
318         spin_unlock(&lslist_lock);
319         return ls;
320 }
321
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323 {
324         struct dlm_ls *ls;
325
326         spin_lock(&lslist_lock);
327         list_for_each_entry(ls, &lslist, ls_list) {
328                 if (ls->ls_local_handle == lockspace) {
329                         ls->ls_count++;
330                         goto out;
331                 }
332         }
333         ls = NULL;
334  out:
335         spin_unlock(&lslist_lock);
336         return ls;
337 }
338
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
340 {
341         struct dlm_ls *ls;
342
343         spin_lock(&lslist_lock);
344         list_for_each_entry(ls, &lslist, ls_list) {
345                 if (ls->ls_device.minor == minor) {
346                         ls->ls_count++;
347                         goto out;
348                 }
349         }
350         ls = NULL;
351  out:
352         spin_unlock(&lslist_lock);
353         return ls;
354 }
355
356 void dlm_put_lockspace(struct dlm_ls *ls)
357 {
358         spin_lock(&lslist_lock);
359         ls->ls_count--;
360         spin_unlock(&lslist_lock);
361 }
362
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365         for (;;) {
366                 spin_lock(&lslist_lock);
367                 if (ls->ls_count == 0) {
368                         WARN_ON(ls->ls_create_count != 0);
369                         list_del(&ls->ls_list);
370                         spin_unlock(&lslist_lock);
371                         return;
372                 }
373                 spin_unlock(&lslist_lock);
374                 ssleep(1);
375         }
376 }
377
378 static int threads_start(void)
379 {
380         int error;
381
382         error = dlm_scand_start();
383         if (error) {
384                 log_print("cannot start dlm_scand thread %d", error);
385                 goto fail;
386         }
387
388         /* Thread for sending/receiving messages for all lockspace's */
389         error = dlm_lowcomms_start();
390         if (error) {
391                 log_print("cannot start dlm lowcomms %d", error);
392                 goto scand_fail;
393         }
394
395         return 0;
396
397  scand_fail:
398         dlm_scand_stop();
399  fail:
400         return error;
401 }
402
403 static void threads_stop(void)
404 {
405         dlm_scand_stop();
406         dlm_lowcomms_stop();
407 }
408
409 static int new_lockspace(const char *name, const char *cluster,
410                          uint32_t flags, int lvblen,
411                          const struct dlm_lockspace_ops *ops, void *ops_arg,
412                          int *ops_result, dlm_lockspace_t **lockspace)
413 {
414         struct dlm_ls *ls;
415         int i, size, error;
416         int do_unreg = 0;
417         int namelen = strlen(name);
418
419         if (namelen > DLM_LOCKSPACE_LEN)
420                 return -EINVAL;
421
422         if (!lvblen || (lvblen % 8))
423                 return -EINVAL;
424
425         if (!try_module_get(THIS_MODULE))
426                 return -EINVAL;
427
428         if (!dlm_user_daemon_available()) {
429                 log_print("dlm user daemon not available");
430                 error = -EUNATCH;
431                 goto out;
432         }
433
434         if (ops && ops_result) {
435                 if (!dlm_config.ci_recover_callbacks)
436                         *ops_result = -EOPNOTSUPP;
437                 else
438                         *ops_result = 0;
439         }
440
441         if (dlm_config.ci_recover_callbacks && cluster &&
442             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443                 log_print("dlm cluster name %s mismatch %s",
444                           dlm_config.ci_cluster_name, cluster);
445                 error = -EBADR;
446                 goto out;
447         }
448
449         error = 0;
450
451         spin_lock(&lslist_lock);
452         list_for_each_entry(ls, &lslist, ls_list) {
453                 WARN_ON(ls->ls_create_count <= 0);
454                 if (ls->ls_namelen != namelen)
455                         continue;
456                 if (memcmp(ls->ls_name, name, namelen))
457                         continue;
458                 if (flags & DLM_LSFL_NEWEXCL) {
459                         error = -EEXIST;
460                         break;
461                 }
462                 ls->ls_create_count++;
463                 *lockspace = ls;
464                 error = 1;
465                 break;
466         }
467         spin_unlock(&lslist_lock);
468
469         if (error)
470                 goto out;
471
472         error = -ENOMEM;
473
474         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475         if (!ls)
476                 goto out;
477         memcpy(ls->ls_name, name, namelen);
478         ls->ls_namelen = namelen;
479         ls->ls_lvblen = lvblen;
480         ls->ls_count = 0;
481         ls->ls_flags = 0;
482         ls->ls_scan_time = jiffies;
483
484         if (ops && dlm_config.ci_recover_callbacks) {
485                 ls->ls_ops = ops;
486                 ls->ls_ops_arg = ops_arg;
487         }
488
489         if (flags & DLM_LSFL_TIMEWARN)
490                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491
492         /* ls_exflags are forced to match among nodes, and we don't
493            need to require all nodes to have some flags set */
494         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495                                     DLM_LSFL_NEWEXCL));
496
497         size = dlm_config.ci_rsbtbl_size;
498         ls->ls_rsbtbl_size = size;
499
500         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501         if (!ls->ls_rsbtbl)
502                 goto out_lsfree;
503         for (i = 0; i < size; i++) {
504                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507         }
508
509         idr_init(&ls->ls_lkbidr);
510         spin_lock_init(&ls->ls_lkbidr_spin);
511
512         size = dlm_config.ci_dirtbl_size;
513         ls->ls_dirtbl_size = size;
514
515         ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
516         if (!ls->ls_dirtbl)
517                 goto out_lkbfree;
518         for (i = 0; i < size; i++) {
519                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
520                 spin_lock_init(&ls->ls_dirtbl[i].lock);
521         }
522
523         INIT_LIST_HEAD(&ls->ls_waiters);
524         mutex_init(&ls->ls_waiters_mutex);
525         INIT_LIST_HEAD(&ls->ls_orphans);
526         mutex_init(&ls->ls_orphans_mutex);
527         INIT_LIST_HEAD(&ls->ls_timeout);
528         mutex_init(&ls->ls_timeout_mutex);
529
530         INIT_LIST_HEAD(&ls->ls_new_rsb);
531         spin_lock_init(&ls->ls_new_rsb_spin);
532
533         INIT_LIST_HEAD(&ls->ls_nodes);
534         INIT_LIST_HEAD(&ls->ls_nodes_gone);
535         ls->ls_num_nodes = 0;
536         ls->ls_low_nodeid = 0;
537         ls->ls_total_weight = 0;
538         ls->ls_node_array = NULL;
539
540         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
541         ls->ls_stub_rsb.res_ls = ls;
542
543         ls->ls_debug_rsb_dentry = NULL;
544         ls->ls_debug_waiters_dentry = NULL;
545
546         init_waitqueue_head(&ls->ls_uevent_wait);
547         ls->ls_uevent_result = 0;
548         init_completion(&ls->ls_members_done);
549         ls->ls_members_result = -1;
550
551         mutex_init(&ls->ls_cb_mutex);
552         INIT_LIST_HEAD(&ls->ls_cb_delay);
553
554         ls->ls_recoverd_task = NULL;
555         mutex_init(&ls->ls_recoverd_active);
556         spin_lock_init(&ls->ls_recover_lock);
557         spin_lock_init(&ls->ls_rcom_spin);
558         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
559         ls->ls_recover_status = 0;
560         ls->ls_recover_seq = 0;
561         ls->ls_recover_args = NULL;
562         init_rwsem(&ls->ls_in_recovery);
563         init_rwsem(&ls->ls_recv_active);
564         INIT_LIST_HEAD(&ls->ls_requestqueue);
565         mutex_init(&ls->ls_requestqueue_mutex);
566         mutex_init(&ls->ls_clear_proc_locks);
567
568         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
569         if (!ls->ls_recover_buf)
570                 goto out_dirfree;
571
572         ls->ls_slot = 0;
573         ls->ls_num_slots = 0;
574         ls->ls_slots_size = 0;
575         ls->ls_slots = NULL;
576
577         INIT_LIST_HEAD(&ls->ls_recover_list);
578         spin_lock_init(&ls->ls_recover_list_lock);
579         ls->ls_recover_list_count = 0;
580         ls->ls_local_handle = ls;
581         init_waitqueue_head(&ls->ls_wait_general);
582         INIT_LIST_HEAD(&ls->ls_root_list);
583         init_rwsem(&ls->ls_root_sem);
584
585         down_write(&ls->ls_in_recovery);
586
587         spin_lock(&lslist_lock);
588         ls->ls_create_count = 1;
589         list_add(&ls->ls_list, &lslist);
590         spin_unlock(&lslist_lock);
591
592         if (flags & DLM_LSFL_FS) {
593                 error = dlm_callback_start(ls);
594                 if (error) {
595                         log_error(ls, "can't start dlm_callback %d", error);
596                         goto out_delist;
597                 }
598         }
599
600         /* needs to find ls in lslist */
601         error = dlm_recoverd_start(ls);
602         if (error) {
603                 log_error(ls, "can't start dlm_recoverd %d", error);
604                 goto out_callback;
605         }
606
607         ls->ls_kobj.kset = dlm_kset;
608         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
609                                      "%s", ls->ls_name);
610         if (error)
611                 goto out_recoverd;
612         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
613
614         /* let kobject handle freeing of ls if there's an error */
615         do_unreg = 1;
616
617         /* This uevent triggers dlm_controld in userspace to add us to the
618            group of nodes that are members of this lockspace (managed by the
619            cluster infrastructure.)  Once it's done that, it tells us who the
620            current lockspace members are (via configfs) and then tells the
621            lockspace to start running (via sysfs) in dlm_ls_start(). */
622
623         error = do_uevent(ls, 1);
624         if (error)
625                 goto out_recoverd;
626
627         wait_for_completion(&ls->ls_members_done);
628         error = ls->ls_members_result;
629         if (error)
630                 goto out_members;
631
632         dlm_create_debug_file(ls);
633
634         log_debug(ls, "join complete");
635         *lockspace = ls;
636         return 0;
637
638  out_members:
639         do_uevent(ls, 0);
640         dlm_clear_members(ls);
641         kfree(ls->ls_node_array);
642  out_recoverd:
643         dlm_recoverd_stop(ls);
644  out_callback:
645         dlm_callback_stop(ls);
646  out_delist:
647         spin_lock(&lslist_lock);
648         list_del(&ls->ls_list);
649         spin_unlock(&lslist_lock);
650         kfree(ls->ls_recover_buf);
651  out_dirfree:
652         vfree(ls->ls_dirtbl);
653  out_lkbfree:
654         idr_destroy(&ls->ls_lkbidr);
655         vfree(ls->ls_rsbtbl);
656  out_lsfree:
657         if (do_unreg)
658                 kobject_put(&ls->ls_kobj);
659         else
660                 kfree(ls);
661  out:
662         module_put(THIS_MODULE);
663         return error;
664 }
665
666 int dlm_new_lockspace(const char *name, const char *cluster,
667                       uint32_t flags, int lvblen,
668                       const struct dlm_lockspace_ops *ops, void *ops_arg,
669                       int *ops_result, dlm_lockspace_t **lockspace)
670 {
671         int error = 0;
672
673         mutex_lock(&ls_lock);
674         if (!ls_count)
675                 error = threads_start();
676         if (error)
677                 goto out;
678
679         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
680                               ops_result, lockspace);
681         if (!error)
682                 ls_count++;
683         if (error > 0)
684                 error = 0;
685         if (!ls_count)
686                 threads_stop();
687  out:
688         mutex_unlock(&ls_lock);
689         return error;
690 }
691
692 static int lkb_idr_is_local(int id, void *p, void *data)
693 {
694         struct dlm_lkb *lkb = p;
695
696         if (!lkb->lkb_nodeid)
697                 return 1;
698         return 0;
699 }
700
701 static int lkb_idr_is_any(int id, void *p, void *data)
702 {
703         return 1;
704 }
705
706 static int lkb_idr_free(int id, void *p, void *data)
707 {
708         struct dlm_lkb *lkb = p;
709
710         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
711                 dlm_free_lvb(lkb->lkb_lvbptr);
712
713         dlm_free_lkb(lkb);
714         return 0;
715 }
716
717 /* NOTE: We check the lkbidr here rather than the resource table.
718    This is because there may be LKBs queued as ASTs that have been unlinked
719    from their RSBs and are pending deletion once the AST has been delivered */
720
721 static int lockspace_busy(struct dlm_ls *ls, int force)
722 {
723         int rv;
724
725         spin_lock(&ls->ls_lkbidr_spin);
726         if (force == 0) {
727                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
728         } else if (force == 1) {
729                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
730         } else {
731                 rv = 0;
732         }
733         spin_unlock(&ls->ls_lkbidr_spin);
734         return rv;
735 }
736
737 static int release_lockspace(struct dlm_ls *ls, int force)
738 {
739         struct dlm_rsb *rsb;
740         struct rb_node *n;
741         int i, busy, rv;
742
743         busy = lockspace_busy(ls, force);
744
745         spin_lock(&lslist_lock);
746         if (ls->ls_create_count == 1) {
747                 if (busy) {
748                         rv = -EBUSY;
749                 } else {
750                         /* remove_lockspace takes ls off lslist */
751                         ls->ls_create_count = 0;
752                         rv = 0;
753                 }
754         } else if (ls->ls_create_count > 1) {
755                 rv = --ls->ls_create_count;
756         } else {
757                 rv = -EINVAL;
758         }
759         spin_unlock(&lslist_lock);
760
761         if (rv) {
762                 log_debug(ls, "release_lockspace no remove %d", rv);
763                 return rv;
764         }
765
766         dlm_device_deregister(ls);
767
768         if (force < 3 && dlm_user_daemon_available())
769                 do_uevent(ls, 0);
770
771         dlm_recoverd_stop(ls);
772
773         dlm_callback_stop(ls);
774
775         remove_lockspace(ls);
776
777         dlm_delete_debug_file(ls);
778
779         kfree(ls->ls_recover_buf);
780
781         /*
782          * Free direntry structs.
783          */
784
785         dlm_dir_clear(ls);
786         vfree(ls->ls_dirtbl);
787
788         /*
789          * Free all lkb's in idr
790          */
791
792         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
793         idr_remove_all(&ls->ls_lkbidr);
794         idr_destroy(&ls->ls_lkbidr);
795
796         /*
797          * Free all rsb's on rsbtbl[] lists
798          */
799
800         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
801                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
802                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
803                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
804                         dlm_free_rsb(rsb);
805                 }
806
807                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
808                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
809                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
810                         dlm_free_rsb(rsb);
811                 }
812         }
813
814         vfree(ls->ls_rsbtbl);
815
816         while (!list_empty(&ls->ls_new_rsb)) {
817                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
818                                        res_hashchain);
819                 list_del(&rsb->res_hashchain);
820                 dlm_free_rsb(rsb);
821         }
822
823         /*
824          * Free structures on any other lists
825          */
826
827         dlm_purge_requestqueue(ls);
828         kfree(ls->ls_recover_args);
829         dlm_clear_free_entries(ls);
830         dlm_clear_members(ls);
831         dlm_clear_members_gone(ls);
832         kfree(ls->ls_node_array);
833         log_debug(ls, "release_lockspace final free");
834         kobject_put(&ls->ls_kobj);
835         /* The ls structure will be freed when the kobject is done with */
836
837         module_put(THIS_MODULE);
838         return 0;
839 }
840
841 /*
842  * Called when a system has released all its locks and is not going to use the
843  * lockspace any longer.  We free everything we're managing for this lockspace.
844  * Remaining nodes will go through the recovery process as if we'd died.  The
845  * lockspace must continue to function as usual, participating in recoveries,
846  * until this returns.
847  *
848  * Force has 4 possible values:
849  * 0 - don't destroy locksapce if it has any LKBs
850  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
851  * 2 - destroy lockspace regardless of LKBs
852  * 3 - destroy lockspace as part of a forced shutdown
853  */
854
855 int dlm_release_lockspace(void *lockspace, int force)
856 {
857         struct dlm_ls *ls;
858         int error;
859
860         ls = dlm_find_lockspace_local(lockspace);
861         if (!ls)
862                 return -EINVAL;
863         dlm_put_lockspace(ls);
864
865         mutex_lock(&ls_lock);
866         error = release_lockspace(ls, force);
867         if (!error)
868                 ls_count--;
869         if (!ls_count)
870                 threads_stop();
871         mutex_unlock(&ls_lock);
872
873         return error;
874 }
875
876 void dlm_stop_lockspaces(void)
877 {
878         struct dlm_ls *ls;
879
880  restart:
881         spin_lock(&lslist_lock);
882         list_for_each_entry(ls, &lslist, ls_list) {
883                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
884                         continue;
885                 spin_unlock(&lslist_lock);
886                 log_error(ls, "no userland control daemon, stopping lockspace");
887                 dlm_ls_stop(ls);
888                 goto restart;
889         }
890         spin_unlock(&lslist_lock);
891 }
892