Linux-libre 3.4.9-gnu1
[librecmc/linux-libre.git] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29         spin_lock(&ls->ls_recover_list_lock);
30         list_add(&de->list, &ls->ls_recover_list);
31         spin_unlock(&ls->ls_recover_list_lock);
32 }
33
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36         int found = 0;
37         struct dlm_direntry *de;
38
39         spin_lock(&ls->ls_recover_list_lock);
40         list_for_each_entry(de, &ls->ls_recover_list, list) {
41                 if (de->length == len) {
42                         list_del(&de->list);
43                         de->master_nodeid = 0;
44                         memset(de->name, 0, len);
45                         found = 1;
46                         break;
47                 }
48         }
49         spin_unlock(&ls->ls_recover_list_lock);
50
51         if (!found)
52                 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53         return de;
54 }
55
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58         struct dlm_direntry *de;
59
60         spin_lock(&ls->ls_recover_list_lock);
61         while (!list_empty(&ls->ls_recover_list)) {
62                 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63                                 list);
64                 list_del(&de->list);
65                 kfree(de);
66         }
67         spin_unlock(&ls->ls_recover_list_lock);
68 }
69
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81         struct list_head *tmp;
82         struct dlm_member *memb = NULL;
83         uint32_t node, n = 0;
84         int nodeid;
85
86         if (ls->ls_num_nodes == 1) {
87                 nodeid = dlm_our_nodeid();
88                 goto out;
89         }
90
91         if (ls->ls_node_array) {
92                 node = (hash >> 16) % ls->ls_total_weight;
93                 nodeid = ls->ls_node_array[node];
94                 goto out;
95         }
96
97         /* make_member_array() failed to kmalloc ls_node_array... */
98
99         node = (hash >> 16) % ls->ls_num_nodes;
100
101         list_for_each(tmp, &ls->ls_nodes) {
102                 if (n++ != node)
103                         continue;
104                 memb = list_entry(tmp, struct dlm_member, list);
105                 break;
106         }
107
108         DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109                                  ls->ls_num_nodes, n, node););
110         nodeid = memb->nodeid;
111  out:
112         return nodeid;
113 }
114
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117         return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122         uint32_t val;
123
124         val = jhash(name, len, 0);
125         val &= (ls->ls_dirtbl_size - 1);
126
127         return val;
128 }
129
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132         uint32_t bucket;
133
134         bucket = dir_hash(ls, de->name, de->length);
135         list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139                                           int namelen, uint32_t bucket)
140 {
141         struct dlm_direntry *de;
142
143         list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144                 if (de->length == namelen && !memcmp(name, de->name, namelen))
145                         goto out;
146         }
147         de = NULL;
148  out:
149         return de;
150 }
151
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154         struct dlm_direntry *de;
155         uint32_t bucket;
156
157         bucket = dir_hash(ls, name, namelen);
158
159         spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161         de = search_bucket(ls, name, namelen, bucket);
162
163         if (!de) {
164                 log_error(ls, "remove fr %u none", nodeid);
165                 goto out;
166         }
167
168         if (de->master_nodeid != nodeid) {
169                 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170                 goto out;
171         }
172
173         list_del(&de->list);
174         kfree(de);
175  out:
176         spin_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181         struct list_head *head;
182         struct dlm_direntry *de;
183         int i;
184
185         DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187         for (i = 0; i < ls->ls_dirtbl_size; i++) {
188                 spin_lock(&ls->ls_dirtbl[i].lock);
189                 head = &ls->ls_dirtbl[i].list;
190                 while (!list_empty(head)) {
191                         de = list_entry(head->next, struct dlm_direntry, list);
192                         list_del(&de->list);
193                         put_free_de(ls, de);
194                 }
195                 spin_unlock(&ls->ls_dirtbl[i].lock);
196         }
197 }
198
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201         struct dlm_member *memb;
202         struct dlm_direntry *de;
203         char *b, *last_name = NULL;
204         int error = -ENOMEM, last_len, count = 0;
205         uint16_t namelen;
206
207         log_debug(ls, "dlm_recover_directory");
208
209         if (dlm_no_directory(ls))
210                 goto out_status;
211
212         dlm_dir_clear(ls);
213
214         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215         if (!last_name)
216                 goto out;
217
218         list_for_each_entry(memb, &ls->ls_nodes, list) {
219                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220                 last_len = 0;
221
222                 for (;;) {
223                         int left;
224                         error = dlm_recovery_stopped(ls);
225                         if (error)
226                                 goto out_free;
227
228                         error = dlm_rcom_names(ls, memb->nodeid,
229                                                last_name, last_len);
230                         if (error)
231                                 goto out_free;
232
233                         schedule();
234
235                         /*
236                          * pick namelen/name pairs out of received buffer
237                          */
238
239                         b = ls->ls_recover_buf->rc_buf;
240                         left = ls->ls_recover_buf->rc_header.h_length;
241                         left -= sizeof(struct dlm_rcom);
242
243                         for (;;) {
244                                 __be16 v;
245
246                                 error = -EINVAL;
247                                 if (left < sizeof(__be16))
248                                         goto out_free;
249
250                                 memcpy(&v, b, sizeof(__be16));
251                                 namelen = be16_to_cpu(v);
252                                 b += sizeof(__be16);
253                                 left -= sizeof(__be16);
254
255                                 /* namelen of 0xFFFFF marks end of names for
256                                    this node; namelen of 0 marks end of the
257                                    buffer */
258
259                                 if (namelen == 0xFFFF)
260                                         goto done;
261                                 if (!namelen)
262                                         break;
263
264                                 if (namelen > left)
265                                         goto out_free;
266
267                                 if (namelen > DLM_RESNAME_MAXLEN)
268                                         goto out_free;
269
270                                 error = -ENOMEM;
271                                 de = get_free_de(ls, namelen);
272                                 if (!de)
273                                         goto out_free;
274
275                                 de->master_nodeid = memb->nodeid;
276                                 de->length = namelen;
277                                 last_len = namelen;
278                                 memcpy(de->name, b, namelen);
279                                 memcpy(last_name, b, namelen);
280                                 b += namelen;
281                                 left -= namelen;
282
283                                 add_entry_to_hash(ls, de);
284                                 count++;
285                         }
286                 }
287          done:
288                 ;
289         }
290
291  out_status:
292         error = 0;
293         log_debug(ls, "dlm_recover_directory %d entries", count);
294  out_free:
295         kfree(last_name);
296  out:
297         dlm_clear_free_entries(ls);
298         return error;
299 }
300
301 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302                      int namelen, int *r_nodeid)
303 {
304         struct dlm_direntry *de, *tmp;
305         uint32_t bucket;
306
307         bucket = dir_hash(ls, name, namelen);
308
309         spin_lock(&ls->ls_dirtbl[bucket].lock);
310         de = search_bucket(ls, name, namelen, bucket);
311         if (de) {
312                 *r_nodeid = de->master_nodeid;
313                 spin_unlock(&ls->ls_dirtbl[bucket].lock);
314                 if (*r_nodeid == nodeid)
315                         return -EEXIST;
316                 return 0;
317         }
318
319         spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321         if (namelen > DLM_RESNAME_MAXLEN)
322                 return -EINVAL;
323
324         de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325         if (!de)
326                 return -ENOMEM;
327
328         de->master_nodeid = nodeid;
329         de->length = namelen;
330         memcpy(de->name, name, namelen);
331
332         spin_lock(&ls->ls_dirtbl[bucket].lock);
333         tmp = search_bucket(ls, name, namelen, bucket);
334         if (tmp) {
335                 kfree(de);
336                 de = tmp;
337         } else {
338                 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339         }
340         *r_nodeid = de->master_nodeid;
341         spin_unlock(&ls->ls_dirtbl[bucket].lock);
342         return 0;
343 }
344
345 int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346                    int *r_nodeid)
347 {
348         return get_entry(ls, nodeid, name, namelen, r_nodeid);
349 }
350
351 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352 {
353         struct dlm_rsb *r;
354         uint32_t hash, bucket;
355         int rv;
356
357         hash = jhash(name, len, 0);
358         bucket = hash & (ls->ls_rsbtbl_size - 1);
359
360         spin_lock(&ls->ls_rsbtbl[bucket].lock);
361         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r);
362         if (rv)
363                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364                                          name, len, 0, &r);
365         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366
367         if (!rv)
368                 return r;
369
370         down_read(&ls->ls_root_sem);
371         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373                         up_read(&ls->ls_root_sem);
374                         log_error(ls, "find_rsb_root revert to root_list %s",
375                                   r->res_name);
376                         return r;
377                 }
378         }
379         up_read(&ls->ls_root_sem);
380         return NULL;
381 }
382
383 /* Find the rsb where we left off (or start again), then send rsb names
384    for rsb's we're master of and whose directory node matches the requesting
385    node.  inbuf is the rsb name last sent, inlen is the name's length */
386
387 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
388                            char *outbuf, int outlen, int nodeid)
389 {
390         struct list_head *list;
391         struct dlm_rsb *r;
392         int offset = 0, dir_nodeid;
393         __be16 be_namelen;
394
395         down_read(&ls->ls_root_sem);
396
397         if (inlen > 1) {
398                 r = find_rsb_root(ls, inbuf, inlen);
399                 if (!r) {
400                         inbuf[inlen - 1] = '\0';
401                         log_error(ls, "copy_master_names from %d start %d %s",
402                                   nodeid, inlen, inbuf);
403                         goto out;
404                 }
405                 list = r->res_root_list.next;
406         } else {
407                 list = ls->ls_root_list.next;
408         }
409
410         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
411                 r = list_entry(list, struct dlm_rsb, res_root_list);
412                 if (r->res_nodeid)
413                         continue;
414
415                 dir_nodeid = dlm_dir_nodeid(r);
416                 if (dir_nodeid != nodeid)
417                         continue;
418
419                 /*
420                  * The block ends when we can't fit the following in the
421                  * remaining buffer space:
422                  * namelen (uint16_t) +
423                  * name (r->res_length) +
424                  * end-of-block record 0x0000 (uint16_t)
425                  */
426
427                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
428                         /* Write end-of-block record */
429                         be_namelen = cpu_to_be16(0);
430                         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431                         offset += sizeof(__be16);
432                         goto out;
433                 }
434
435                 be_namelen = cpu_to_be16(r->res_length);
436                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
437                 offset += sizeof(__be16);
438                 memcpy(outbuf + offset, r->res_name, r->res_length);
439                 offset += r->res_length;
440         }
441
442         /*
443          * If we've reached the end of the list (and there's room) write a
444          * terminating record.
445          */
446
447         if ((list == &ls->ls_root_list) &&
448             (offset + sizeof(uint16_t) <= outlen)) {
449                 be_namelen = cpu_to_be16(0xFFFF);
450                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451                 offset += sizeof(__be16);
452         }
453
454  out:
455         up_read(&ls->ls_root_sem);
456 }
457