Linux-libre 5.4.47-gnu
[librecmc/linux-libre.git] / fs / ocfs2 / stack_user.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * stack_user.c
6  *
7  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
8  *
9  * Copyright (C) 2007 Oracle.  All rights reserved.
10  */
11
12 #include <linux/module.h>
13 #include <linux/fs.h>
14 #include <linux/miscdevice.h>
15 #include <linux/mutex.h>
16 #include <linux/slab.h>
17 #include <linux/reboot.h>
18 #include <linux/sched.h>
19 #include <linux/uaccess.h>
20
21 #include "stackglue.h"
22
23 #include <linux/dlm_plock.h>
24
25 /*
26  * The control protocol starts with a handshake.  Until the handshake
27  * is complete, the control device will fail all write(2)s.
28  *
29  * The handshake is simple.  First, the client reads until EOF.  Each line
30  * of output is a supported protocol tag.  All protocol tags are a single
31  * character followed by a two hex digit version number.  Currently the
32  * only thing supported is T01, for "Text-based version 0x01".  Next, the
33  * client writes the version they would like to use, including the newline.
34  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
35  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
36  * client can start sending messages.
37  *
38  * The T01 protocol has three messages.  First is the "SETN" message.
39  * It has the following syntax:
40  *
41  *  SETN<space><8-char-hex-nodenum><newline>
42  *
43  * This is 14 characters.
44  *
45  * The "SETN" message must be the first message following the protocol.
46  * It tells ocfs2_control the local node number.
47  *
48  * Next comes the "SETV" message.  It has the following syntax:
49  *
50  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
51  *
52  * This is 11 characters.
53  *
54  * The "SETV" message sets the filesystem locking protocol version as
55  * negotiated by the client.  The client negotiates based on the maximum
56  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
57  * number from the "SETV" message must match
58  * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
59  * must be less than or equal to ...sp_max_proto.pv_minor.
60  *
61  * Once this information has been set, mounts will be allowed.  From this
62  * point on, the "DOWN" message can be sent for node down notification.
63  * It has the following syntax:
64  *
65  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
66  *
67  * eg:
68  *
69  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
70  *
71  * This is 47 characters.
72  */
73
/*
 * Whether or not the client has done the handshake.
 * For now, we have just one protocol version.
 */
/* The single protocol tag offered by read(2) and accepted by write(2). */
#define OCFS2_CONTROL_PROTO                     "T01\n"
/* Byte length of OCFS2_CONTROL_PROTO, excluding the terminating NUL. */
#define OCFS2_CONTROL_PROTO_LEN                 4

/*
 * Handshake states, stored in ocfs2_control_private.op_state:
 *   INVALID  - state of a freshly opened file (the private data is
 *              zero-allocated, so this is the default)
 *   READ     - the client has read the whole protocol list
 *   PROTOCOL - the client has written an accepted protocol tag
 *   VALID    - node number and protocol version have been installed
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID         (0)
#define OCFS2_CONTROL_HANDSHAKE_READ            (1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL        (2)
#define OCFS2_CONTROL_HANDSHAKE_VALID           (3)

/*
 * Messages.  Every message starts with a 4-character opcode; each
 * *_TOTAL_LEN is the exact byte count of one complete message,
 * including separators and the trailing newline:
 *   SETN: 4 + 1 + 8 + 1           = 14
 *   SETV: 4 + 1 + 2 + 1 + 2 + 1   = 11
 *   DOWN: 4 + 1 + 32 + 1 + 8 + 1  = 47
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN            4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP        "SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP     "SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN      11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP           "DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN    47
/* Width in characters of the uuid field of a DOWN message. */
#define OCFS2_TEXT_UUID_LEN                     32
/* Width in characters of each hex version field of a SETV message. */
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN        2
/* Width in characters of the hex node number in SETN and DOWN. */
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN       8
/* Name of the DLM lock used by version_lock() and version_unlock(). */
#define VERSION_LOCK                            "version_lock"
99
/*
 * How a live connection relates to the control device.  When a
 * connection is attached, NO_CONTROLD connections are always added to
 * the live connection list; any other type is refused unless
 * ocfs2_control_opened is non-zero.
 */
enum ocfs2_connection_type {
        WITH_CONTROLD,
        NO_CONTROLD
};
104
/*
 * ocfs2_live_connection is refcounted because the filesystem and
 * miscdevice sides can detach in different order.  Let's just be safe.
 *
 * NOTE(review): no reference count field is visible in this structure,
 * and ocfs2_live_connection_drop() frees it unconditionally -- the
 * sentence above may be stale; confirm.
 */
struct ocfs2_live_connection {
        struct list_head                oc_list;        /* entry in ocfs2_live_connection_list */
        struct ocfs2_cluster_connection *oc_conn;       /* set on attach, cleared on drop */
        enum ocfs2_connection_type      oc_type;        /* checked when attaching */
        atomic_t                        oc_this_node;
        int                             oc_our_slot;
        struct dlm_lksb                 oc_version_lksb; /* lksb of the VERSION_LOCK dlm lock */
        char                            oc_lvb[DLM_LVB_LEN]; /* value block for oc_version_lksb */
        struct completion               oc_sync_wait;   /* completed by sync_wait_cb() */
        wait_queue_head_t               oc_wait;
};
120
/*
 * Per-open state of the control device; allocated in open() and stored
 * in file->private_data until release().
 */
struct ocfs2_control_private {
        struct list_head op_list;       /* entry in ocfs2_control_private_list */
        int op_state;                   /* OCFS2_CONTROL_HANDSHAKE_* */
        int op_this_node;               /* -1 until a SETN message supplies it */
        struct ocfs2_protocol_version op_proto; /* from SETV; pv_major == 0 means unset */
};
127
/*
 * SETN<space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte layout of a SETN message as written by the client.
 * The parser overwrites 'space' and 'newline' with NUL after checking
 * them, so that 'nodestr' is NUL-terminated for number parsing.
 */
struct ocfs2_control_message_setn {
        char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
        char    space;
        char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
        char    newline;
};
135
/*
 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * Byte-for-byte layout of a SETV message.  The separators are replaced
 * with NUL by the parser so 'major' and 'minor' each become a
 * NUL-terminated string.
 */
struct ocfs2_control_message_setv {
        char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
        char    space1;
        char    major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
        char    space2;
        char    minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
        char    newline;
};
145
/*
 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte layout of a DOWN message.  The separators are replaced
 * with NUL by the parser so 'uuid' and 'nodestr' each become a
 * NUL-terminated string.
 */
struct ocfs2_control_message_down {
        char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
        char    space1;
        char    uuid[OCFS2_TEXT_UUID_LEN];
        char    space2;
        char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
        char    newline;
};
155
/*
 * Any one control message.  'tag' overlays the opcode shared by every
 * message type, so the opcode can be inspected before the message type
 * is known.  The size of this union is the upper bound on the number
 * of bytes a single write(2) may carry.
 */
union ocfs2_control_message {
        char                                    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
        struct ocfs2_control_message_setn       u_setn;
        struct ocfs2_control_message_setv       u_setv;
        struct ocfs2_control_message_down       u_down;
};
162
/* Forward declaration; sp_max_proto is consulted when validating SETV. */
static struct ocfs2_stack_plugin ocfs2_user_plugin;

/*
 * Number of open control files whose handshake reached
 * OCFS2_CONTROL_HANDSHAKE_VALID.
 */
static atomic_t ocfs2_control_opened;
/* Local node number installed by the handshake; -1 while unset. */
static int ocfs2_control_this_node = -1;
/* Locking protocol version currently in effect. */
static struct ocfs2_protocol_version running_proto;

/* All attached ocfs2_live_connection structures. */
static LIST_HEAD(ocfs2_live_connection_list);
/* All ocfs2_control_private structures of currently open control files. */
static LIST_HEAD(ocfs2_control_private_list);
/* Serializes access to the two lists and the globals above. */
static DEFINE_MUTEX(ocfs2_control_lock);
172
173 static inline void ocfs2_control_set_handshake_state(struct file *file,
174                                                      int state)
175 {
176         struct ocfs2_control_private *p = file->private_data;
177         p->op_state = state;
178 }
179
180 static inline int ocfs2_control_get_handshake_state(struct file *file)
181 {
182         struct ocfs2_control_private *p = file->private_data;
183         return p->op_state;
184 }
185
186 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
187 {
188         size_t len = strlen(name);
189         struct ocfs2_live_connection *c;
190
191         BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
192
193         list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
194                 if ((c->oc_conn->cc_namelen == len) &&
195                     !strncmp(c->oc_conn->cc_name, name, len))
196                         return c;
197         }
198
199         return NULL;
200 }
201
202 /*
203  * ocfs2_live_connection structures are created underneath the ocfs2
204  * mount path.  Since the VFS prevents multiple calls to
205  * fill_super(), we can't get dupes here.
206  */
207 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
208                                      struct ocfs2_live_connection *c)
209 {
210         int rc = 0;
211
212         mutex_lock(&ocfs2_control_lock);
213         c->oc_conn = conn;
214
215         if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
216                 list_add(&c->oc_list, &ocfs2_live_connection_list);
217         else {
218                 printk(KERN_ERR
219                        "ocfs2: Userspace control daemon is not present\n");
220                 rc = -ESRCH;
221         }
222
223         mutex_unlock(&ocfs2_control_lock);
224         return rc;
225 }
226
227 /*
228  * This function disconnects the cluster connection from ocfs2_control.
229  * Afterwards, userspace can't affect the cluster connection.
230  */
231 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
232 {
233         mutex_lock(&ocfs2_control_lock);
234         list_del_init(&c->oc_list);
235         c->oc_conn = NULL;
236         mutex_unlock(&ocfs2_control_lock);
237
238         kfree(c);
239 }
240
241 static int ocfs2_control_cfu(void *target, size_t target_len,
242                              const char __user *buf, size_t count)
243 {
244         /* The T01 expects write(2) calls to have exactly one command */
245         if ((count != target_len) ||
246             (count > sizeof(union ocfs2_control_message)))
247                 return -EINVAL;
248
249         if (copy_from_user(target, buf, target_len))
250                 return -EFAULT;
251
252         return 0;
253 }
254
255 static ssize_t ocfs2_control_validate_protocol(struct file *file,
256                                                const char __user *buf,
257                                                size_t count)
258 {
259         ssize_t ret;
260         char kbuf[OCFS2_CONTROL_PROTO_LEN];
261
262         ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
263                                 buf, count);
264         if (ret)
265                 return ret;
266
267         if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
268                 return -EINVAL;
269
270         ocfs2_control_set_handshake_state(file,
271                                           OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
272
273         return count;
274 }
275
276 static void ocfs2_control_send_down(const char *uuid,
277                                     int nodenum)
278 {
279         struct ocfs2_live_connection *c;
280
281         mutex_lock(&ocfs2_control_lock);
282
283         c = ocfs2_connection_find(uuid);
284         if (c) {
285                 BUG_ON(c->oc_conn == NULL);
286                 c->oc_conn->cc_recovery_handler(nodenum,
287                                                 c->oc_conn->cc_recovery_data);
288         }
289
290         mutex_unlock(&ocfs2_control_lock);
291 }
292
293 /*
294  * Called whenever configuration elements are sent to /dev/ocfs2_control.
295  * If all configuration elements are present, try to set the global
296  * values.  If there is a problem, return an error.  Skip any missing
297  * elements, and only bump ocfs2_control_opened when we have all elements
298  * and are successful.
299  */
300 static int ocfs2_control_install_private(struct file *file)
301 {
302         int rc = 0;
303         int set_p = 1;
304         struct ocfs2_control_private *p = file->private_data;
305
306         BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
307
308         mutex_lock(&ocfs2_control_lock);
309
310         if (p->op_this_node < 0) {
311                 set_p = 0;
312         } else if ((ocfs2_control_this_node >= 0) &&
313                    (ocfs2_control_this_node != p->op_this_node)) {
314                 rc = -EINVAL;
315                 goto out_unlock;
316         }
317
318         if (!p->op_proto.pv_major) {
319                 set_p = 0;
320         } else if (!list_empty(&ocfs2_live_connection_list) &&
321                    ((running_proto.pv_major != p->op_proto.pv_major) ||
322                     (running_proto.pv_minor != p->op_proto.pv_minor))) {
323                 rc = -EINVAL;
324                 goto out_unlock;
325         }
326
327         if (set_p) {
328                 ocfs2_control_this_node = p->op_this_node;
329                 running_proto.pv_major = p->op_proto.pv_major;
330                 running_proto.pv_minor = p->op_proto.pv_minor;
331         }
332
333 out_unlock:
334         mutex_unlock(&ocfs2_control_lock);
335
336         if (!rc && set_p) {
337                 /* We set the global values successfully */
338                 atomic_inc(&ocfs2_control_opened);
339                 ocfs2_control_set_handshake_state(file,
340                                         OCFS2_CONTROL_HANDSHAKE_VALID);
341         }
342
343         return rc;
344 }
345
346 static int ocfs2_control_get_this_node(void)
347 {
348         int rc;
349
350         mutex_lock(&ocfs2_control_lock);
351         if (ocfs2_control_this_node < 0)
352                 rc = -EINVAL;
353         else
354                 rc = ocfs2_control_this_node;
355         mutex_unlock(&ocfs2_control_lock);
356
357         return rc;
358 }
359
360 static int ocfs2_control_do_setnode_msg(struct file *file,
361                                         struct ocfs2_control_message_setn *msg)
362 {
363         long nodenum;
364         char *ptr = NULL;
365         struct ocfs2_control_private *p = file->private_data;
366
367         if (ocfs2_control_get_handshake_state(file) !=
368             OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
369                 return -EINVAL;
370
371         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
372                     OCFS2_CONTROL_MESSAGE_OP_LEN))
373                 return -EINVAL;
374
375         if ((msg->space != ' ') || (msg->newline != '\n'))
376                 return -EINVAL;
377         msg->space = msg->newline = '\0';
378
379         nodenum = simple_strtol(msg->nodestr, &ptr, 16);
380         if (!ptr || *ptr)
381                 return -EINVAL;
382
383         if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
384             (nodenum > INT_MAX) || (nodenum < 0))
385                 return -ERANGE;
386         p->op_this_node = nodenum;
387
388         return ocfs2_control_install_private(file);
389 }
390
391 static int ocfs2_control_do_setversion_msg(struct file *file,
392                                            struct ocfs2_control_message_setv *msg)
393 {
394         long major, minor;
395         char *ptr = NULL;
396         struct ocfs2_control_private *p = file->private_data;
397         struct ocfs2_protocol_version *max =
398                 &ocfs2_user_plugin.sp_max_proto;
399
400         if (ocfs2_control_get_handshake_state(file) !=
401             OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
402                 return -EINVAL;
403
404         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
405                     OCFS2_CONTROL_MESSAGE_OP_LEN))
406                 return -EINVAL;
407
408         if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
409             (msg->newline != '\n'))
410                 return -EINVAL;
411         msg->space1 = msg->space2 = msg->newline = '\0';
412
413         major = simple_strtol(msg->major, &ptr, 16);
414         if (!ptr || *ptr)
415                 return -EINVAL;
416         minor = simple_strtol(msg->minor, &ptr, 16);
417         if (!ptr || *ptr)
418                 return -EINVAL;
419
420         /*
421          * The major must be between 1 and 255, inclusive.  The minor
422          * must be between 0 and 255, inclusive.  The version passed in
423          * must be within the maximum version supported by the filesystem.
424          */
425         if ((major == LONG_MIN) || (major == LONG_MAX) ||
426             (major > (u8)-1) || (major < 1))
427                 return -ERANGE;
428         if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
429             (minor > (u8)-1) || (minor < 0))
430                 return -ERANGE;
431         if ((major != max->pv_major) ||
432             (minor > max->pv_minor))
433                 return -EINVAL;
434
435         p->op_proto.pv_major = major;
436         p->op_proto.pv_minor = minor;
437
438         return ocfs2_control_install_private(file);
439 }
440
441 static int ocfs2_control_do_down_msg(struct file *file,
442                                      struct ocfs2_control_message_down *msg)
443 {
444         long nodenum;
445         char *p = NULL;
446
447         if (ocfs2_control_get_handshake_state(file) !=
448             OCFS2_CONTROL_HANDSHAKE_VALID)
449                 return -EINVAL;
450
451         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
452                     OCFS2_CONTROL_MESSAGE_OP_LEN))
453                 return -EINVAL;
454
455         if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
456             (msg->newline != '\n'))
457                 return -EINVAL;
458         msg->space1 = msg->space2 = msg->newline = '\0';
459
460         nodenum = simple_strtol(msg->nodestr, &p, 16);
461         if (!p || *p)
462                 return -EINVAL;
463
464         if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
465             (nodenum > INT_MAX) || (nodenum < 0))
466                 return -ERANGE;
467
468         ocfs2_control_send_down(msg->uuid, nodenum);
469
470         return 0;
471 }
472
473 static ssize_t ocfs2_control_message(struct file *file,
474                                      const char __user *buf,
475                                      size_t count)
476 {
477         ssize_t ret;
478         union ocfs2_control_message msg;
479
480         /* Try to catch padding issues */
481         WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
482                 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
483
484         memset(&msg, 0, sizeof(union ocfs2_control_message));
485         ret = ocfs2_control_cfu(&msg, count, buf, count);
486         if (ret)
487                 goto out;
488
489         if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
490             !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
491                      OCFS2_CONTROL_MESSAGE_OP_LEN))
492                 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
493         else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
494                  !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
495                           OCFS2_CONTROL_MESSAGE_OP_LEN))
496                 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
497         else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
498                  !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
499                           OCFS2_CONTROL_MESSAGE_OP_LEN))
500                 ret = ocfs2_control_do_down_msg(file, &msg.u_down);
501         else
502                 ret = -EINVAL;
503
504 out:
505         return ret ? ret : count;
506 }
507
508 static ssize_t ocfs2_control_write(struct file *file,
509                                    const char __user *buf,
510                                    size_t count,
511                                    loff_t *ppos)
512 {
513         ssize_t ret;
514
515         switch (ocfs2_control_get_handshake_state(file)) {
516                 case OCFS2_CONTROL_HANDSHAKE_INVALID:
517                         ret = -EINVAL;
518                         break;
519
520                 case OCFS2_CONTROL_HANDSHAKE_READ:
521                         ret = ocfs2_control_validate_protocol(file, buf,
522                                                               count);
523                         break;
524
525                 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
526                 case OCFS2_CONTROL_HANDSHAKE_VALID:
527                         ret = ocfs2_control_message(file, buf, count);
528                         break;
529
530                 default:
531                         BUG();
532                         ret = -EIO;
533                         break;
534         }
535
536         return ret;
537 }
538
539 /*
540  * This is a naive version.  If we ever have a new protocol, we'll expand
541  * it.  Probably using seq_file.
542  */
543 static ssize_t ocfs2_control_read(struct file *file,
544                                   char __user *buf,
545                                   size_t count,
546                                   loff_t *ppos)
547 {
548         ssize_t ret;
549
550         ret = simple_read_from_buffer(buf, count, ppos,
551                         OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
552
553         /* Have we read the whole protocol list? */
554         if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
555                 ocfs2_control_set_handshake_state(file,
556                                                   OCFS2_CONTROL_HANDSHAKE_READ);
557
558         return ret;
559 }
560
561 static int ocfs2_control_release(struct inode *inode, struct file *file)
562 {
563         struct ocfs2_control_private *p = file->private_data;
564
565         mutex_lock(&ocfs2_control_lock);
566
567         if (ocfs2_control_get_handshake_state(file) !=
568             OCFS2_CONTROL_HANDSHAKE_VALID)
569                 goto out;
570
571         if (atomic_dec_and_test(&ocfs2_control_opened)) {
572                 if (!list_empty(&ocfs2_live_connection_list)) {
573                         /* XXX: Do bad things! */
574                         printk(KERN_ERR
575                                "ocfs2: Unexpected release of ocfs2_control!\n"
576                                "       Loss of cluster connection requires "
577                                "an emergency restart!\n");
578                         emergency_restart();
579                 }
580                 /*
581                  * Last valid close clears the node number and resets
582                  * the locking protocol version
583                  */
584                 ocfs2_control_this_node = -1;
585                 running_proto.pv_major = 0;
586                 running_proto.pv_minor = 0;
587         }
588
589 out:
590         list_del_init(&p->op_list);
591         file->private_data = NULL;
592
593         mutex_unlock(&ocfs2_control_lock);
594
595         kfree(p);
596
597         return 0;
598 }
599
600 static int ocfs2_control_open(struct inode *inode, struct file *file)
601 {
602         struct ocfs2_control_private *p;
603
604         p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
605         if (!p)
606                 return -ENOMEM;
607         p->op_this_node = -1;
608
609         mutex_lock(&ocfs2_control_lock);
610         file->private_data = p;
611         list_add(&p->op_list, &ocfs2_control_private_list);
612         mutex_unlock(&ocfs2_control_lock);
613
614         return 0;
615 }
616
617 static const struct file_operations ocfs2_control_fops = {
618         .open    = ocfs2_control_open,
619         .release = ocfs2_control_release,
620         .read    = ocfs2_control_read,
621         .write   = ocfs2_control_write,
622         .owner   = THIS_MODULE,
623         .llseek  = default_llseek,
624 };
625
/*
 * The control misc device, registered in ocfs2_control_init() and
 * removed in ocfs2_control_exit().  The minor number is assigned
 * dynamically.
 */
static struct miscdevice ocfs2_control_device = {
        .minor          = MISC_DYNAMIC_MINOR,
        .name           = "ocfs2_control",
        .fops           = &ocfs2_control_fops,
};
631
632 static int ocfs2_control_init(void)
633 {
634         int rc;
635
636         atomic_set(&ocfs2_control_opened, 0);
637
638         rc = misc_register(&ocfs2_control_device);
639         if (rc)
640                 printk(KERN_ERR
641                        "ocfs2: Unable to register ocfs2_control device "
642                        "(errno %d)\n",
643                        -rc);
644
645         return rc;
646 }
647
648 static void ocfs2_control_exit(void)
649 {
650         misc_deregister(&ocfs2_control_device);
651 }
652
653 static void fsdlm_lock_ast_wrapper(void *astarg)
654 {
655         struct ocfs2_dlm_lksb *lksb = astarg;
656         int status = lksb->lksb_fsdlm.sb_status;
657
658         /*
659          * For now we're punting on the issue of other non-standard errors
660          * where we can't tell if the unlock_ast or lock_ast should be called.
661          * The main "other error" that's possible is EINVAL which means the
662          * function was called with invalid args, which shouldn't be possible
663          * since the caller here is under our control.  Other non-standard
664          * errors probably fall into the same category, or otherwise are fatal
665          * which means we can't carry on anyway.
666          */
667
668         if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
669                 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
670         else
671                 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
672 }
673
674 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
675 {
676         struct ocfs2_dlm_lksb *lksb = astarg;
677
678         lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
679 }
680
681 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
682                          int mode,
683                          struct ocfs2_dlm_lksb *lksb,
684                          u32 flags,
685                          void *name,
686                          unsigned int namelen)
687 {
688         int ret;
689
690         if (!lksb->lksb_fsdlm.sb_lvbptr)
691                 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
692                                              sizeof(struct dlm_lksb);
693
694         ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
695                        flags|DLM_LKF_NODLCKWT, name, namelen, 0,
696                        fsdlm_lock_ast_wrapper, lksb,
697                        fsdlm_blocking_ast_wrapper);
698         return ret;
699 }
700
701 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
702                            struct ocfs2_dlm_lksb *lksb,
703                            u32 flags)
704 {
705         int ret;
706
707         ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
708                          flags, &lksb->lksb_fsdlm, lksb);
709         return ret;
710 }
711
712 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
713 {
714         return lksb->lksb_fsdlm.sb_status;
715 }
716
717 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
718 {
719         int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
720
721         return !invalid;
722 }
723
724 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
725 {
726         if (!lksb->lksb_fsdlm.sb_lvbptr)
727                 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
728                                              sizeof(struct dlm_lksb);
729         return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
730 }
731
/* Intentionally empty: nothing is dumped for an lksb in this stack. */
static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
735
736 static int user_plock(struct ocfs2_cluster_connection *conn,
737                       u64 ino,
738                       struct file *file,
739                       int cmd,
740                       struct file_lock *fl)
741 {
742         /*
743          * This more or less just demuxes the plock request into any
744          * one of three dlm calls.
745          *
746          * Internally, fs/dlm will pass these to a misc device, which
747          * a userspace daemon will read and write to.
748          *
749          * For now, cancel requests (which happen internally only),
750          * are turned into unlocks. Most of this function taken from
751          * gfs2_lock.
752          */
753
754         if (cmd == F_CANCELLK) {
755                 cmd = F_SETLK;
756                 fl->fl_type = F_UNLCK;
757         }
758
759         if (IS_GETLK(cmd))
760                 return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
761         else if (fl->fl_type == F_UNLCK)
762                 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
763         else
764                 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
765 }
766
767 /*
768  * Compare a requested locking protocol version against the current one.
769  *
770  * If the major numbers are different, they are incompatible.
771  * If the current minor is greater than the request, they are incompatible.
772  * If the current minor is less than or equal to the request, they are
773  * compatible, and the requester should run at the current minor version.
774  */
775 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
776                                struct ocfs2_protocol_version *request)
777 {
778         if (existing->pv_major != request->pv_major)
779                 return 1;
780
781         if (existing->pv_minor > request->pv_minor)
782                 return 1;
783
784         if (existing->pv_minor < request->pv_minor)
785                 request->pv_minor = existing->pv_minor;
786
787         return 0;
788 }
789
790 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
791 {
792         struct ocfs2_protocol_version *pv =
793                 (struct ocfs2_protocol_version *)lvb;
794         /*
795          * ocfs2_protocol_version has two u8 variables, so we don't
796          * need any endian conversion.
797          */
798         ver->pv_major = pv->pv_major;
799         ver->pv_minor = pv->pv_minor;
800 }
801
802 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
803 {
804         struct ocfs2_protocol_version *pv =
805                 (struct ocfs2_protocol_version *)lvb;
806         /*
807          * ocfs2_protocol_version has two u8 variables, so we don't
808          * need any endian conversion.
809          */
810         pv->pv_major = ver->pv_major;
811         pv->pv_minor = ver->pv_minor;
812 }
813
814 static void sync_wait_cb(void *arg)
815 {
816         struct ocfs2_cluster_connection *conn = arg;
817         struct ocfs2_live_connection *lc = conn->cc_private;
818         complete(&lc->oc_sync_wait);
819 }
820
821 static int sync_unlock(struct ocfs2_cluster_connection *conn,
822                 struct dlm_lksb *lksb, char *name)
823 {
824         int error;
825         struct ocfs2_live_connection *lc = conn->cc_private;
826
827         error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
828         if (error) {
829                 printk(KERN_ERR "%s lkid %x error %d\n",
830                                 name, lksb->sb_lkid, error);
831                 return error;
832         }
833
834         wait_for_completion(&lc->oc_sync_wait);
835
836         if (lksb->sb_status != -DLM_EUNLOCK) {
837                 printk(KERN_ERR "%s lkid %x status %d\n",
838                                 name, lksb->sb_lkid, lksb->sb_status);
839                 return -1;
840         }
841         return 0;
842 }
843
844 static int sync_lock(struct ocfs2_cluster_connection *conn,
845                 int mode, uint32_t flags,
846                 struct dlm_lksb *lksb, char *name)
847 {
848         int error, status;
849         struct ocfs2_live_connection *lc = conn->cc_private;
850
851         error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
852                         name, strlen(name),
853                         0, sync_wait_cb, conn, NULL);
854         if (error) {
855                 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
856                                 name, lksb->sb_lkid, flags, mode, error);
857                 return error;
858         }
859
860         wait_for_completion(&lc->oc_sync_wait);
861
862         status = lksb->sb_status;
863
864         if (status && status != -EAGAIN) {
865                 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
866                                 name, lksb->sb_lkid, flags, mode, status);
867         }
868
869         return status;
870 }
871
872
873 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
874                 int flags)
875 {
876         struct ocfs2_live_connection *lc = conn->cc_private;
877         return sync_lock(conn, mode, flags,
878                         &lc->oc_version_lksb, VERSION_LOCK);
879 }
880
881 static int version_unlock(struct ocfs2_cluster_connection *conn)
882 {
883         struct ocfs2_live_connection *lc = conn->cc_private;
884         return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
885 }
886
887 /* get_protocol_version()
888  *
889  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
890  * The algorithm is:
891  * 1. Attempt to take the lock in EX mode (non-blocking).
892  * 2. If successful (which means it is the first mount), write the
893  *    version number and downconvert to PR lock.
894  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
895  *    taking the PR lock.
896  */
897
898 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
899 {
900         int ret;
901         struct ocfs2_live_connection *lc = conn->cc_private;
902         struct ocfs2_protocol_version pv;
903
904         running_proto.pv_major =
905                 ocfs2_user_plugin.sp_max_proto.pv_major;
906         running_proto.pv_minor =
907                 ocfs2_user_plugin.sp_max_proto.pv_minor;
908
909         lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
910         ret = version_lock(conn, DLM_LOCK_EX,
911                         DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
912         if (!ret) {
913                 conn->cc_version.pv_major = running_proto.pv_major;
914                 conn->cc_version.pv_minor = running_proto.pv_minor;
915                 version_to_lvb(&running_proto, lc->oc_lvb);
916                 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
917         } else if (ret == -EAGAIN) {
918                 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
919                 if (ret)
920                         goto out;
921                 lvb_to_version(lc->oc_lvb, &pv);
922
923                 if ((pv.pv_major != running_proto.pv_major) ||
924                                 (pv.pv_minor > running_proto.pv_minor)) {
925                         ret = -EINVAL;
926                         goto out;
927                 }
928
929                 conn->cc_version.pv_major = pv.pv_major;
930                 conn->cc_version.pv_minor = pv.pv_minor;
931         }
932 out:
933         return ret;
934 }
935
/*
 * .recover_prep callback of ocfs2_ls_ops.  Intentionally a no-op: @arg is
 * ignored and nothing is done here.
 */
static void user_recover_prep(void *arg)
{
}
939
940 static void user_recover_slot(void *arg, struct dlm_slot *slot)
941 {
942         struct ocfs2_cluster_connection *conn = arg;
943         printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
944                         slot->nodeid, slot->slot);
945         conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
946
947 }
948
949 static void user_recover_done(void *arg, struct dlm_slot *slots,
950                 int num_slots, int our_slot,
951                 uint32_t generation)
952 {
953         struct ocfs2_cluster_connection *conn = arg;
954         struct ocfs2_live_connection *lc = conn->cc_private;
955         int i;
956
957         for (i = 0; i < num_slots; i++)
958                 if (slots[i].slot == our_slot) {
959                         atomic_set(&lc->oc_this_node, slots[i].nodeid);
960                         break;
961                 }
962
963         lc->oc_our_slot = our_slot;
964         wake_up(&lc->oc_wait);
965 }
966
967 static const struct dlm_lockspace_ops ocfs2_ls_ops = {
968         .recover_prep = user_recover_prep,
969         .recover_slot = user_recover_slot,
970         .recover_done = user_recover_done,
971 };
972
973 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
974 {
975         version_unlock(conn);
976         dlm_release_lockspace(conn->cc_lockspace, 2);
977         conn->cc_lockspace = NULL;
978         ocfs2_live_connection_drop(conn->cc_private);
979         conn->cc_private = NULL;
980         return 0;
981 }
982
983 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
984 {
985         dlm_lockspace_t *fsdlm;
986         struct ocfs2_live_connection *lc;
987         int rc, ops_rv;
988
989         BUG_ON(conn == NULL);
990
991         lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
992         if (!lc)
993                 return -ENOMEM;
994
995         init_waitqueue_head(&lc->oc_wait);
996         init_completion(&lc->oc_sync_wait);
997         atomic_set(&lc->oc_this_node, 0);
998         conn->cc_private = lc;
999         lc->oc_type = NO_CONTROLD;
1000
1001         rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
1002                                DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
1003                                &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
1004         if (rc) {
1005                 if (rc == -EEXIST || rc == -EPROTO)
1006                         printk(KERN_ERR "ocfs2: Unable to create the "
1007                                 "lockspace %s (%d), because a ocfs2-tools "
1008                                 "program is running on this file system "
1009                                 "with the same name lockspace\n",
1010                                 conn->cc_name, rc);
1011                 goto out;
1012         }
1013
1014         if (ops_rv == -EOPNOTSUPP) {
1015                 lc->oc_type = WITH_CONTROLD;
1016                 printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1017                                 "version of dlm_controld and/or ocfs2-tools."
1018                                 " Please consider upgrading.\n");
1019         } else if (ops_rv) {
1020                 rc = ops_rv;
1021                 goto out;
1022         }
1023         conn->cc_lockspace = fsdlm;
1024
1025         rc = ocfs2_live_connection_attach(conn, lc);
1026         if (rc)
1027                 goto out;
1028
1029         if (lc->oc_type == NO_CONTROLD) {
1030                 rc = get_protocol_version(conn);
1031                 if (rc) {
1032                         printk(KERN_ERR "ocfs2: Could not determine"
1033                                         " locking version\n");
1034                         user_cluster_disconnect(conn);
1035                         goto out;
1036                 }
1037                 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1038         }
1039
1040         /*
1041          * running_proto must have been set before we allowed any mounts
1042          * to proceed.
1043          */
1044         if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1045                 printk(KERN_ERR
1046                        "Unable to mount with fs locking protocol version "
1047                        "%u.%u because negotiated protocol is %u.%u\n",
1048                        conn->cc_version.pv_major, conn->cc_version.pv_minor,
1049                        running_proto.pv_major, running_proto.pv_minor);
1050                 rc = -EPROTO;
1051                 ocfs2_live_connection_drop(lc);
1052                 lc = NULL;
1053         }
1054
1055 out:
1056         if (rc)
1057                 kfree(lc);
1058         return rc;
1059 }
1060
1061
1062 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1063                                   unsigned int *this_node)
1064 {
1065         int rc;
1066         struct ocfs2_live_connection *lc = conn->cc_private;
1067
1068         if (lc->oc_type == WITH_CONTROLD)
1069                 rc = ocfs2_control_get_this_node();
1070         else if (lc->oc_type == NO_CONTROLD)
1071                 rc = atomic_read(&lc->oc_this_node);
1072         else
1073                 rc = -EINVAL;
1074
1075         if (rc < 0)
1076                 return rc;
1077
1078         *this_node = rc;
1079         return 0;
1080 }
1081
1082 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
1083         .connect        = user_cluster_connect,
1084         .disconnect     = user_cluster_disconnect,
1085         .this_node      = user_cluster_this_node,
1086         .dlm_lock       = user_dlm_lock,
1087         .dlm_unlock     = user_dlm_unlock,
1088         .lock_status    = user_dlm_lock_status,
1089         .lvb_valid      = user_dlm_lvb_valid,
1090         .lock_lvb       = user_dlm_lvb,
1091         .plock          = user_plock,
1092         .dump_lksb      = user_dlm_dump_lksb,
1093 };
1094
1095 static struct ocfs2_stack_plugin ocfs2_user_plugin = {
1096         .sp_name        = "user",
1097         .sp_ops         = &ocfs2_user_plugin_ops,
1098         .sp_owner       = THIS_MODULE,
1099 };
1100
1101
1102 static int __init ocfs2_user_plugin_init(void)
1103 {
1104         int rc;
1105
1106         rc = ocfs2_control_init();
1107         if (!rc) {
1108                 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1109                 if (rc)
1110                         ocfs2_control_exit();
1111         }
1112
1113         return rc;
1114 }
1115
1116 static void __exit ocfs2_user_plugin_exit(void)
1117 {
1118         ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1119         ocfs2_control_exit();
1120 }
1121
1122 MODULE_AUTHOR("Oracle");
1123 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
1124 MODULE_LICENSE("GPL");
1125 module_init(ocfs2_user_plugin_init);
1126 module_exit(ocfs2_user_plugin_exit);