// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * that any capabilities or leases we have been issued remain valid.
 * If the session times out and goes stale, our leases and
 * capabilities are no longer valid.
 */
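/*
 * A rough sketch of the session lifecycle, pieced together from the
 * states handled by ceph_session_state_name() below:
 *
 *   NEW -> OPENING -> OPEN -> CLOSING
 *
 * with OPEN sessions moving to HUNG when renewal times out, to
 * RESTARTING/RECONNECTING across an MDS failover, or to REJECTED
 * when the MDS refuses the session.
 */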

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

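/*
 * Most reply sections below share a common versioned-struct header,
 * consumed in the same order by each parser:
 *
 *   u8  struct_v       encoding version (expected to be >= 1)
 *   u8  struct_compat  oldest version this encoding is compatible with
 *   u32 struct_len     length of the payload that follows
 *
 * A features value of (u64)-1 selects this new, self-describing
 * format; otherwise the legacy layout is chosen field by field from
 * the session's feature bits.
 */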
static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }

                /* btime */
                ceph_decode_need(p, end, sizeof(info->btime), bad);
                ceph_decode_copy(p, &info->btime, sizeof(info->btime));

                /* change attribute */
                ceph_decode_64_safe(p, end, info->change_attr, bad);

                /* dir pin */
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
                } else {
                        info->dir_pin = -ENODATA;
                }

                /* snapshot birth time, remains zero for v<=2 */
                if (struct_v >= 3) {
                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
                        ceph_decode_copy(p, &info->snap_btime,
                                         sizeof(info->snap_btime));
                } else {
                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
                }

                *p = end;
        } else {
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }

                if (features & CEPH_FEATURE_FS_BTIME) {
                        ceph_decode_need(p, end, sizeof(info->btime), bad);
                        ceph_decode_copy(p, &info->btime, sizeof(info->btime));
                        ceph_decode_64_safe(p, end, info->change_attr, bad);
                }

                info->dir_pin = -ENODATA;
                /* info->snap_btime remains zero */
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

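        /* the dirfrag is followed inline by ndist u32 replica entries */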
        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**lease), bad);
        *lease = *p;
        *p += sizeof(**lease);
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

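        /* u32 entry count followed by a u16 flags word, hence the "+ 2" */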
        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                /* dentry */
                ceph_decode_32_safe(p, end, rde->name_len, bad);
                ceph_decode_need(p, end, rde->name_len, bad);
                rde->name = *p;
                *p += rde->name_len;
                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features);
                if (err)
                        goto out_bad;
                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        /* Skip over any unrecognized fields */
        *p = end;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                /* Malformed reply? */
                if (*p == end) {
                        info->has_create_ino = false;
                } else {
                        info->has_create_ino = true;
                        ceph_decode_64_safe(p, end, info->ino, bad);
                }
        } else {
                if (*p != end)
                        goto bad;
        }

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
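/*
 * The reply front is a ceph_mds_reply_head followed by three
 * length-prefixed (u32) blobs: the trace, the op-specific "extra"
 * section, and the snap blob.
 */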
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_entries)
                return;
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        return get_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->m_num_mds)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

        if (mds >= mdsc->max_sessions) {
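                /* grow to the next power of two that covers this rank,
                 * e.g. mds 5 -> get_count_order(6) == 3 -> newmax == 8 */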
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 1;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        s->s_state = 0;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_inode);
        }
        if (req->r_parent)
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
        ceph_async_iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                ceph_async_iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        WARN_ON_ONCE(!list_empty(&req->r_wait));
        kfree(req);
}

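/*
 * DEFINE_RB_FUNCS generates the lookup_request(), insert_request() and
 * erase_request() helpers used below, keyed on r_tid and linked through
 * r_node.
 */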
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        int ret = 0;

        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps) {
                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                        req->r_num_caps);
                if (ret < 0) {
                        pr_err("__register_request %p "
                               "failed to reserve caps: %d\n", req, ret);
                        /* set req->r_err to fail early from __do_request */
                        req->r_err = ret;
                        return;
                }
        }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                ihold(dir);
                req->r_unsafe_dir = dir;
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        /* Never leave an unregistered request on an unsafe list! */
        list_del_init(&req->r_unsafe_item);

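        /* if this was the oldest pending tid, advance oldest_tid to the
         * next non-filelock request in the tree */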
        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir  &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
        struct inode *inode = NULL;

        while (dentry && !IS_ROOT(dentry)) {
                inode = d_inode_rcu(dentry);
                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                dentry = dentry->d_parent;
        }
        if (inode)
                inode = igrab(inode);
        return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
                        inode = req->r_inode;
                        ihold(inode);
                } else {
                        /* req->r_dentry is non-null for LSSNAP request */
                        rcu_read_lock();
                        inode = get_nonsnap_parent(req->r_dentry);
                        rcu_read_unlock();
                        dout("__choose_mds using snapdir's parent %p\n", inode);
                }
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent;
                struct inode *dir;

                rcu_read_lock();
                parent = READ_ONCE(req->r_dentry->d_parent);
                dir = req->r_parent ? : d_inode_rcu(parent);

                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /*  not this fs or parent went negative */
                        inode = d_inode(req->r_dentry);
                        if (inode)
                                ihold(inode);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        inode = get_nonsnap_parent(parent);
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = igrab(dir);
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        } else {
                                ihold(inode);
                        }
                }
                rcu_read_unlock();
        }

        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        goto out;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        goto out;
                        }
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                ceph_async_iput(inode);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
out:
        /* avoid calling iput_final() while holding mdsc->mutex or
         * in mds dispatch threads */
        ceph_async_iput(inode);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}

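/*
 * Encode the client's supported feature bits as a length-prefixed
 * bitmap.  The size is rounded up to whole 64-bit words covering the
 * highest bit, e.g. a highest feature bit of 5 gives
 * (5 + 64) / 64 == 1 word == 8 bytes.
 */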
static void encode_supported_features(void **p, void *end)
{
        static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
        static const size_t count = ARRAY_SIZE(bits);

        if (count > 0) {
                size_t i;
                size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;

                BUG_ON(*p + 4 + size > end);
                ceph_encode_32(p, size);
                memset(*p, 0, size);
                for (i = 0; i < count; i++)
                        ((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
                *p += size;
        } else {
                BUG_ON(*p + 4 > end);
                ceph_encode_32(p, 0);
        }
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int extra_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
        void *p, *end;

        const char* metadata[][2] = {
                {"hostname", mdsc->nodename},
                {"kernel_version", init_utsname()->release},
                {"entity_id", opt->name ? : ""},
                {"root", fsopt->server_path ? : "/"},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        extra_bytes = 4;  /* map length */
        for (i = 0; metadata[i][0]; ++i) {
                extra_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }
        /* supported feature */
        extra_bytes += 4 + 8;

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("create_session_open_msg ENOMEM creating msg\n");
                return NULL;
        }
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        h = p;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v2; encoding the
         * supported feature bits below makes this message v3
         */
        msg->hdr.version = cpu_to_le16(3);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p += sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0]; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        encode_supported_features(&p, end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING)
                __open_session(mdsc, session);

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->m_num_mds)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                if (!IS_ERR(ts))
                        ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
                                struct list_head *target)
{
        lockdep_assert_held(&session->s_cap_lock);

        list_splice_init(&session->s_cap_releases, target);
        session->s_num_cap_releases = 0;
        dout("dispose_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
                                 struct list_head *dispose)
{
        while (!list_empty(dispose)) {
                struct ceph_cap *cap;
                /* detach each cap and drop its reference */
                cap = list_first_entry(dispose, struct ceph_cap, session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
        }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                                     struct ceph_mds_session *session)
{
        struct ceph_mds_request *req;
        struct rb_node *p;

        dout("cleanup_session_requests mds%d\n", session->s_mds);
        mutex_lock(&mdsc->mutex);
        while (!list_empty(&session->s_unsafe)) {
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                pr_warn_ratelimited(" dropping unsafe request %llu\n",
                                    req->r_tid);
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
        p = rb_first(&mdsc->request_tree);
        while (p) {
                req = rb_entry(p, struct ceph_mds_request, r_node);
                p = rb_next(p);
                if (req->r_session &&
                    req->r_session->s_mds == session->s_mds)
                        req->r_attempts = 0;
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
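/*
 * While the callback runs, s_cap_lock is dropped; setting
 * s_cap_iterator tells a racing __ceph_remove_cap() to leave the cap
 * on the list (with cap->ci cleared) so its removal can be finished
 * below, back under the lock, once the callback returns.
 */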
1302 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1303                               int (*cb)(struct inode *, struct ceph_cap *,
1304                                         void *), void *arg)
1305 {
1306         struct list_head *p;
1307         struct ceph_cap *cap;
1308         struct inode *inode, *last_inode = NULL;
1309         struct ceph_cap *old_cap = NULL;
1310         int ret;
1311
1312         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1313         spin_lock(&session->s_cap_lock);
1314         p = session->s_caps.next;
1315         while (p != &session->s_caps) {
1316                 cap = list_entry(p, struct ceph_cap, session_caps);
1317                 inode = igrab(&cap->ci->vfs_inode);
1318                 if (!inode) {
1319                         p = p->next;
1320                         continue;
1321                 }
1322                 session->s_cap_iterator = cap;
1323                 spin_unlock(&session->s_cap_lock);
1324
1325                 if (last_inode) {
1326                         /* avoid calling iput_final() while holding
1327                          * s_mutex or in mds dispatch threads */
1328                         ceph_async_iput(last_inode);
1329                         last_inode = NULL;
1330                 }
1331                 if (old_cap) {
1332                         ceph_put_cap(session->s_mdsc, old_cap);
1333                         old_cap = NULL;
1334                 }
1335
1336                 ret = cb(inode, cap, arg);
1337                 last_inode = inode;
1338
1339                 spin_lock(&session->s_cap_lock);
1340                 p = p->next;
1341                 if (!cap->ci) {
1342                         dout("iterate_session_caps  finishing cap %p removal\n",
1343                              cap);
1344                         BUG_ON(cap->session != session);
1345                         cap->session = NULL;
1346                         list_del_init(&cap->session_caps);
1347                         session->s_nr_caps--;
1348                         if (cap->queue_release)
1349                                 __ceph_queue_cap_release(session, cap);
1350                         else
1351                                 old_cap = cap;  /* put_cap it w/o locks held */
1352                 }
1353                 if (ret < 0)
1354                         goto out;
1355         }
1356         ret = 0;
1357 out:
1358         session->s_cap_iterator = NULL;
1359         spin_unlock(&session->s_cap_lock);
1360
1361         ceph_async_iput(last_inode);
1362         if (old_cap)
1363                 ceph_put_cap(session->s_mdsc, old_cap);
1364
1365         return ret;
1366 }
1367
1368 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1369                                   void *arg)
1370 {
1371         struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1372         struct ceph_inode_info *ci = ceph_inode(inode);
1373         LIST_HEAD(to_remove);
1374         bool drop = false;
1375         bool invalidate = false;
1376
1377         dout("removing cap %p, ci is %p, inode is %p\n",
1378              cap, ci, &ci->vfs_inode);
1379         spin_lock(&ci->i_ceph_lock);
1380         if (cap->mds_wanted | cap->issued)
1381                 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1382         __ceph_remove_cap(cap, false);
1383         if (!ci->i_auth_cap) {
1384                 struct ceph_cap_flush *cf;
1385                 struct ceph_mds_client *mdsc = fsc->mdsc;
1386
1387                 if (ci->i_wrbuffer_ref > 0 &&
1388                     READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1389                         invalidate = true;
1390
1391                 while (!list_empty(&ci->i_cap_flush_list)) {
1392                         cf = list_first_entry(&ci->i_cap_flush_list,
1393                                               struct ceph_cap_flush, i_list);
1394                         list_move(&cf->i_list, &to_remove);
1395                 }
1396
1397                 spin_lock(&mdsc->cap_dirty_lock);
1398
1399                 list_for_each_entry(cf, &to_remove, i_list)
1400                         list_del(&cf->g_list);
1401
1402                 if (!list_empty(&ci->i_dirty_item)) {
1403                         pr_warn_ratelimited(
1404                                 " dropping dirty %s state for %p %lld\n",
1405                                 ceph_cap_string(ci->i_dirty_caps),
1406                                 inode, ceph_ino(inode));
1407                         ci->i_dirty_caps = 0;
1408                         list_del_init(&ci->i_dirty_item);
1409                         drop = true;
1410                 }
1411                 if (!list_empty(&ci->i_flushing_item)) {
1412                         pr_warn_ratelimited(
1413                                 " dropping dirty+flushing %s state for %p %lld\n",
1414                                 ceph_cap_string(ci->i_flushing_caps),
1415                                 inode, ceph_ino(inode));
1416                         ci->i_flushing_caps = 0;
1417                         list_del_init(&ci->i_flushing_item);
1418                         mdsc->num_cap_flushing--;
1419                         drop = true;
1420                 }
1421                 spin_unlock(&mdsc->cap_dirty_lock);
1422
1423                 if (atomic_read(&ci->i_filelock_ref) > 0) {
1424                         /* make further file lock syscall return -EIO */
1425                         ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1426                         pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1427                                             inode, ceph_ino(inode));
1428                 }
1429
1430                 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1431                         list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1432                         ci->i_prealloc_cap_flush = NULL;
1433                 }
1434
1435                if (drop &&
1436                   ci->i_wrbuffer_ref_head == 0 &&
1437                   ci->i_wr_ref == 0 &&
1438                   ci->i_dirty_caps == 0 &&
1439                   ci->i_flushing_caps == 0) {
1440                       ceph_put_snap_context(ci->i_head_snapc);
1441                       ci->i_head_snapc = NULL;
1442                }
1443         }
1444         spin_unlock(&ci->i_ceph_lock);
1445         while (!list_empty(&to_remove)) {
1446                 struct ceph_cap_flush *cf;
1447                 cf = list_first_entry(&to_remove,
1448                                       struct ceph_cap_flush, i_list);
1449                 list_del(&cf->i_list);
1450                 ceph_free_cap_flush(cf);
1451         }
1452
1453         wake_up_all(&ci->i_cap_wq);
1454         if (invalidate)
1455                 ceph_queue_invalidate(inode);
1456         if (drop)
1457                 iput(inode);
1458         return 0;
1459 }
1460
1461 /*
1462  * caller must hold session s_mutex
1463  */
1464 static void remove_session_caps(struct ceph_mds_session *session)
1465 {
1466         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1467         struct super_block *sb = fsc->sb;
1468         LIST_HEAD(dispose);
1469
1470         dout("remove_session_caps on %p\n", session);
1471         ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1472
1473         wake_up_all(&fsc->mdsc->cap_flushing_wq);
1474
1475         spin_lock(&session->s_cap_lock);
1476         if (session->s_nr_caps > 0) {
1477                 struct inode *inode;
1478                 struct ceph_cap *cap, *prev = NULL;
1479                 struct ceph_vino vino;
1480                 /*
1481                  * iterate_session_caps() skips inodes that are being
1482                  * deleted, so we need to wait until deletions are complete.
1483                  * __wait_on_freeing_inode() is designed for the job, but
1484                  * it is not exported, so use the inode lookup function to
1485                  * access it.
1486                  */
1487                 while (!list_empty(&session->s_caps)) {
1488                         cap = list_entry(session->s_caps.next,
1489                                          struct ceph_cap, session_caps);
1490                         if (cap == prev)
1491                                 break;
1492                         prev = cap;
1493                         vino = cap->ci->i_vino;
1494                         spin_unlock(&session->s_cap_lock);
1495
1496                         inode = ceph_find_inode(sb, vino);
1497                         /* avoid calling iput_final() while holding s_mutex */
1498                         ceph_async_iput(inode);
1499
1500                         spin_lock(&session->s_cap_lock);
1501                 }
1502         }
1503
1504         // detach queued cap releases; they are disposed of below, after unlocking
1505         detach_cap_releases(session, &dispose);
1506
1507         BUG_ON(session->s_nr_caps > 0);
1508         BUG_ON(!list_empty(&session->s_cap_flushing));
1509         spin_unlock(&session->s_cap_lock);
1510         dispose_cap_releases(session->s_mdsc, &dispose);
1511 }
1512
1513 enum {
1514         RECONNECT,
1515         RENEWCAPS,
1516         FORCE_RO,
1517 };
1518
1519 /*
1520  * wake up any threads waiting on this session's caps.  if the cap is
1521  * old (didn't get renewed on the client reconnect), invalidate it now.
1522  *
1523  * caller must hold s_mutex.
1524  */
1525 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1526                               void *arg)
1527 {
1528         struct ceph_inode_info *ci = ceph_inode(inode);
1529         unsigned long ev = (unsigned long)arg;
1530
1531         if (ev == RECONNECT) {
1532                 spin_lock(&ci->i_ceph_lock);
1533                 ci->i_wanted_max_size = 0;
1534                 ci->i_requested_max_size = 0;
1535                 spin_unlock(&ci->i_ceph_lock);
1536         } else if (ev == RENEWCAPS) {
1537                 if (cap->cap_gen < cap->session->s_cap_gen) {
1538                         /* mds did not re-issue stale cap */
1539                         spin_lock(&ci->i_ceph_lock);
1540                         cap->issued = cap->implemented = CEPH_CAP_PIN;
1541                         /* make sure mds knows what we want */
1542                         if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
1543                                 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1544                         spin_unlock(&ci->i_ceph_lock);
1545                 }
1546         } else if (ev == FORCE_RO) {
1547         }
1548         wake_up_all(&ci->i_cap_wq);
1549         return 0;
1550 }
1551
1552 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1553 {
1554         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1555         ceph_iterate_session_caps(session, wake_up_session_cb,
1556                                   (void *)(unsigned long)ev);
1557 }
1558
1559 /*
1560  * Send periodic message to MDS renewing all currently held caps.  The
1561  * ack will reset the expiration for all caps from this session.
1562  *
1563  * caller holds s_mutex
1564  */
1565 static int send_renew_caps(struct ceph_mds_client *mdsc,
1566                            struct ceph_mds_session *session)
1567 {
1568         struct ceph_msg *msg;
1569         int state;
1570
1571         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1572             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1573                 pr_info("mds%d caps stale\n", session->s_mds);
1574         session->s_renew_requested = jiffies;
1575
1576         /* do not try to renew caps until a recovering mds has reconnected
1577          * with its clients. */
1578         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1579         if (state < CEPH_MDS_STATE_RECONNECT) {
1580                 dout("send_renew_caps ignoring mds%d (%s)\n",
1581                      session->s_mds, ceph_mds_state_name(state));
1582                 return 0;
1583         }
1584
1585         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1586                 ceph_mds_state_name(state));
1587         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1588                                  ++session->s_renew_seq);
1589         if (!msg)
1590                 return -ENOMEM;
1591         ceph_con_send(&session->s_con, msg);
1592         return 0;
1593 }
1594
1595 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1596                              struct ceph_mds_session *session, u64 seq)
1597 {
1598         struct ceph_msg *msg;
1599
1600         dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1601              session->s_mds, ceph_session_state_name(session->s_state), seq);
1602         msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1603         if (!msg)
1604                 return -ENOMEM;
1605         ceph_con_send(&session->s_con, msg);
1606         return 0;
1607 }
1608
1609
1610 /*
1611  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1612  *
1613  * Called under session->s_mutex
1614  */
1615 static void renewed_caps(struct ceph_mds_client *mdsc,
1616                          struct ceph_mds_session *session, int is_renew)
1617 {
1618         int was_stale;
1619         int wake = 0;
1620
1621         spin_lock(&session->s_cap_lock);
1622         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1623
1624         session->s_cap_ttl = session->s_renew_requested +
1625                 mdsc->mdsmap->m_session_timeout*HZ;
1626
1627         if (was_stale) {
1628                 if (time_before(jiffies, session->s_cap_ttl)) {
1629                         pr_info("mds%d caps renewed\n", session->s_mds);
1630                         wake = 1;
1631                 } else {
1632                         pr_info("mds%d caps still stale\n", session->s_mds);
1633                 }
1634         }
1635         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1636              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1637              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1638         spin_unlock(&session->s_cap_lock);
1639
1640         if (wake)
1641                 wake_up_session_caps(session, RENEWCAPS);
1642 }
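
/*
 * Worked example (illustrative numbers): with m_session_timeout = 60
 * and a renew request stamped at jiffies J, the ack handled above
 * moves s_cap_ttl to J + 60*HZ.  The session is considered stale again
 * once jiffies catches up with that ttl, i.e. when
 *
 *	time_after_eq(jiffies, session->s_cap_ttl)
 *
 * becomes true (cf. the staleness check in send_renew_caps()).
 */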
1643
1644 /*
1645  * send a session close request
1646  */
1647 static int request_close_session(struct ceph_mds_client *mdsc,
1648                                  struct ceph_mds_session *session)
1649 {
1650         struct ceph_msg *msg;
1651
1652         dout("request_close_session mds%d state %s seq %lld\n",
1653              session->s_mds, ceph_session_state_name(session->s_state),
1654              session->s_seq);
1655         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1656         if (!msg)
1657                 return -ENOMEM;
1658         ceph_con_send(&session->s_con, msg);
1659         return 1;
1660 }
1661
1662 /*
1663  * Called with s_mutex held.
1664  */
1665 static int __close_session(struct ceph_mds_client *mdsc,
1666                          struct ceph_mds_session *session)
1667 {
1668         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1669                 return 0;
1670         session->s_state = CEPH_MDS_SESSION_CLOSING;
1671         return request_close_session(mdsc, session);
1672 }
1673
1674 static bool drop_negative_children(struct dentry *dentry)
1675 {
1676         struct dentry *child;
1677         bool all_negative = true;
1678
1679         if (!d_is_dir(dentry))
1680                 goto out;
1681
1682         spin_lock(&dentry->d_lock);
1683         list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1684                 if (d_really_is_positive(child)) {
1685                         all_negative = false;
1686                         break;
1687                 }
1688         }
1689         spin_unlock(&dentry->d_lock);
1690
1691         if (all_negative)
1692                 shrink_dcache_parent(dentry);
1693 out:
1694         return all_negative;
1695 }
1696
1697 /*
1698  * Trim old(er) caps.
1699  *
1700  * Because we can't cache an inode without one or more caps, we do
1701  * this indirectly: if a cap is unused, we prune its aliases, at which
1702  * point the inode will hopefully get dropped too.
1703  *
1704  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1705  * memory pressure from the MDS, though, so it needn't be perfect.
1706  */
1707 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1708 {
1709         struct ceph_mds_session *session = arg;
1710         struct ceph_inode_info *ci = ceph_inode(inode);
1711         int used, wanted, oissued, mine;
1712
1713         if (session->s_trim_caps <= 0)
1714                 return -1;
1715
1716         spin_lock(&ci->i_ceph_lock);
1717         mine = cap->issued | cap->implemented;
1718         used = __ceph_caps_used(ci);
1719         wanted = __ceph_caps_file_wanted(ci);
1720         oissued = __ceph_caps_issued_other(ci, cap);
1721
1722         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1723              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1724              ceph_cap_string(used), ceph_cap_string(wanted));
1725         if (cap == ci->i_auth_cap) {
1726                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
1727                     !list_empty(&ci->i_cap_snaps))
1728                         goto out;
1729                 if ((used | wanted) & CEPH_CAP_ANY_WR)
1730                         goto out;
1731                 /* Note: it's possible that i_filelock_ref becomes non-zero
1732                  * after dropping auth caps. It doesn't hurt because reply
1733                  * of lock mds request will re-add auth caps. */
1734                 if (atomic_read(&ci->i_filelock_ref) > 0)
1735                         goto out;
1736         }
1737         /* The inode has cached pages, but it's no longer used.
1738          * We can safely drop it. */
1739         if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1740             !(oissued & CEPH_CAP_FILE_CACHE)) {
1741                 used = 0;
1742                 oissued = 0;
1743         }
1744         if ((used | wanted) & ~oissued & mine)
1745                 goto out;   /* we need these caps */
1746
1747         if (oissued) {
1748                 /* we aren't the only cap.. just remove us */
1749                 __ceph_remove_cap(cap, true);
1750                 session->s_trim_caps--;
1751         } else {
1752                 struct dentry *dentry;
1753                 /* try dropping referring dentries */
1754                 spin_unlock(&ci->i_ceph_lock);
1755                 dentry = d_find_any_alias(inode);
1756                 if (dentry && drop_negative_children(dentry)) {
1757                         int count;
1758                         dput(dentry);
1759                         d_prune_aliases(inode);
1760                         count = atomic_read(&inode->i_count);
1761                         if (count == 1)
1762                                 session->s_trim_caps--;
1763                         dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1764                              inode, cap, count);
1765                 } else {
1766                         dput(dentry);
1767                 }
1768                 return 0;
1769         }
1770
1771 out:
1772         spin_unlock(&ci->i_ceph_lock);
1773         return 0;
1774 }
1775
1776 /*
1777  * Trim session cap count down to some max number.
1778  */
1779 int ceph_trim_caps(struct ceph_mds_client *mdsc,
1780                    struct ceph_mds_session *session,
1781                    int max_caps)
1782 {
1783         int trim_caps = session->s_nr_caps - max_caps;
1784
1785         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1786              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1787         if (trim_caps > 0) {
1788                 session->s_trim_caps = trim_caps;
1789                 ceph_iterate_session_caps(session, trim_caps_cb, session);
1790                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1791                      session->s_mds, session->s_nr_caps, max_caps,
1792                         trim_caps - session->s_trim_caps);
1793                 session->s_trim_caps = 0;
1794         }
1795
1796         ceph_flush_cap_releases(mdsc, session);
1797         return 0;
1798 }
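
/*
 * Trimming is normally driven by the MDS: a CEPH_SESSION_RECALL_STATE
 * session message carries the cap count the server wants us to shrink
 * to, and the session message handler ends up doing, roughly (sketch;
 * see the RECALL_STATE case in handle_session()):
 *
 *	ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
 */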
1799
1800 static int check_caps_flush(struct ceph_mds_client *mdsc,
1801                             u64 want_flush_tid)
1802 {
1803         int ret = 1;
1804
1805         spin_lock(&mdsc->cap_dirty_lock);
1806         if (!list_empty(&mdsc->cap_flush_list)) {
1807                 struct ceph_cap_flush *cf =
1808                         list_first_entry(&mdsc->cap_flush_list,
1809                                          struct ceph_cap_flush, g_list);
1810                 if (cf->tid <= want_flush_tid) {
1811                         dout("check_caps_flush still flushing tid "
1812                              "%llu <= %llu\n", cf->tid, want_flush_tid);
1813                         ret = 0;
1814                 }
1815         }
1816         spin_unlock(&mdsc->cap_dirty_lock);
1817         return ret;
1818 }
1819
1820 /*
1821  * flush all dirty inode data to disk.
1822  *
1823  * waits until we've flushed through want_flush_tid
1824  */
1825 static void wait_caps_flush(struct ceph_mds_client *mdsc,
1826                             u64 want_flush_tid)
1827 {
1828         dout("check_caps_flush want %llu\n", want_flush_tid);
1829
1830         wait_event(mdsc->cap_flushing_wq,
1831                    check_caps_flush(mdsc, want_flush_tid));
1832
1833         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1834 }
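
/*
 * Usage sketch (cf. the sync path in ceph_mdsc_sync()): snapshot the
 * newest cap flush tid under cap_dirty_lock, then block until the
 * global flush list has drained past it:
 *
 *	spin_lock(&mdsc->cap_dirty_lock);
 *	want_flush = mdsc->last_cap_flush_tid;
 *	spin_unlock(&mdsc->cap_dirty_lock);
 *
 *	wait_caps_flush(mdsc, want_flush);
 */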
1835
1836 /*
1837  * called under s_mutex
1838  */
1839 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1840                                    struct ceph_mds_session *session)
1841 {
1842         struct ceph_msg *msg = NULL;
1843         struct ceph_mds_cap_release *head;
1844         struct ceph_mds_cap_item *item;
1845         struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1846         struct ceph_cap *cap;
1847         LIST_HEAD(tmp_list);
1848         int num_cap_releases;
1849         __le32  barrier, *cap_barrier;
1850
1851         down_read(&osdc->lock);
1852         barrier = cpu_to_le32(osdc->epoch_barrier);
1853         up_read(&osdc->lock);
1854
1855         spin_lock(&session->s_cap_lock);
1856 again:
1857         list_splice_init(&session->s_cap_releases, &tmp_list);
1858         num_cap_releases = session->s_num_cap_releases;
1859         session->s_num_cap_releases = 0;
1860         spin_unlock(&session->s_cap_lock);
1861
1862         while (!list_empty(&tmp_list)) {
1863                 if (!msg) {
1864                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1865                                         PAGE_SIZE, GFP_NOFS, false);
1866                         if (!msg)
1867                                 goto out_err;
1868                         head = msg->front.iov_base;
1869                         head->num = cpu_to_le32(0);
1870                         msg->front.iov_len = sizeof(*head);
1871
1872                         msg->hdr.version = cpu_to_le16(2);
1873                         msg->hdr.compat_version = cpu_to_le16(1);
1874                 }
1875
1876                 cap = list_first_entry(&tmp_list, struct ceph_cap,
1877                                         session_caps);
1878                 list_del(&cap->session_caps);
1879                 num_cap_releases--;
1880
1881                 head = msg->front.iov_base;
1882                 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
1883                                    &head->num);
1884                 item = msg->front.iov_base + msg->front.iov_len;
1885                 item->ino = cpu_to_le64(cap->cap_ino);
1886                 item->cap_id = cpu_to_le64(cap->cap_id);
1887                 item->migrate_seq = cpu_to_le32(cap->mseq);
1888                 item->seq = cpu_to_le32(cap->issue_seq);
1889                 msg->front.iov_len += sizeof(*item);
1890
1891                 ceph_put_cap(mdsc, cap);
1892
1893                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1894                         // Append cap_barrier field
1895                         cap_barrier = msg->front.iov_base + msg->front.iov_len;
1896                         *cap_barrier = barrier;
1897                         msg->front.iov_len += sizeof(*cap_barrier);
1898
1899                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1900                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1901                         ceph_con_send(&session->s_con, msg);
1902                         msg = NULL;
1903                 }
1904         }
1905
1906         BUG_ON(num_cap_releases != 0);
1907
1908         spin_lock(&session->s_cap_lock);
1909         if (!list_empty(&session->s_cap_releases))
1910                 goto again;
1911         spin_unlock(&session->s_cap_lock);
1912
1913         if (msg) {
1914                 // Append cap_barrier field
1915                 cap_barrier = msg->front.iov_base + msg->front.iov_len;
1916                 *cap_barrier = barrier;
1917                 msg->front.iov_len += sizeof(*cap_barrier);
1918
1919                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1920                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1921                 ceph_con_send(&session->s_con, msg);
1922         }
1923         return;
1924 out_err:
1925         pr_err("send_cap_releases mds%d, failed to allocate message\n",
1926                 session->s_mds);
1927         spin_lock(&session->s_cap_lock);
1928         list_splice(&tmp_list, &session->s_cap_releases);
1929         session->s_num_cap_releases += num_cap_releases;
1930         spin_unlock(&session->s_cap_lock);
1931 }
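
/*
 * Resulting front payload of one CAPRELEASE message (v2), as packed
 * above:
 *
 *	struct ceph_mds_cap_release head;     // head.num = N
 *	struct ceph_mds_cap_item    item[N];  // N <= CEPH_CAPS_PER_RELEASE
 *	__le32                      cap_barrier;  // osdc epoch_barrier
 */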
1932
1933 static void ceph_cap_release_work(struct work_struct *work)
1934 {
1935         struct ceph_mds_session *session =
1936                 container_of(work, struct ceph_mds_session, s_cap_release_work);
1937
1938         mutex_lock(&session->s_mutex);
1939         if (session->s_state == CEPH_MDS_SESSION_OPEN ||
1940             session->s_state == CEPH_MDS_SESSION_HUNG)
1941                 ceph_send_cap_releases(session->s_mdsc, session);
1942         mutex_unlock(&session->s_mutex);
1943         ceph_put_mds_session(session);
1944 }
1945
1946 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
1947                              struct ceph_mds_session *session)
1948 {
1949         if (mdsc->stopping)
1950                 return;
1951
1952         get_session(session);
1953         if (queue_work(mdsc->fsc->cap_wq,
1954                        &session->s_cap_release_work)) {
1955                 dout("cap release work queued\n");
1956         } else {
1957                 ceph_put_mds_session(session);
1958                 dout("failed to queue cap release work\n");
1959         }
1960 }
1961
1962 /*
1963  * caller holds session->s_cap_lock
1964  */
1965 void __ceph_queue_cap_release(struct ceph_mds_session *session,
1966                               struct ceph_cap *cap)
1967 {
1968         list_add_tail(&cap->session_caps, &session->s_cap_releases);
1969         session->s_num_cap_releases++;
1970
1971         if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
1972                 ceph_flush_cap_releases(session->s_mdsc, session);
1973 }
1974
1975 static void ceph_cap_reclaim_work(struct work_struct *work)
1976 {
1977         struct ceph_mds_client *mdsc =
1978                 container_of(work, struct ceph_mds_client, cap_reclaim_work);
1979         int ret = ceph_trim_dentries(mdsc);
1980         if (ret == -EAGAIN)
1981                 ceph_queue_cap_reclaim_work(mdsc);
1982 }
1983
1984 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
1985 {
1986         if (mdsc->stopping)
1987                 return;
1988
1989         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
1990                 dout("caps reclaim work queued\n");
1991         } else {
1992                 dout("failed to queue caps reclaim work\n");
1993         }
1994 }
1995
1996 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
1997 {
1998         int val;
1999         if (!nr)
2000                 return;
2001         val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2002         if (!(val % CEPH_CAPS_PER_RELEASE)) {
2003                 atomic_set(&mdsc->cap_reclaim_pending, 0);
2004                 ceph_queue_cap_reclaim_work(mdsc);
2005         }
2006 }
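
/*
 * Callers feed this counter as dentries become reclaimable; the work
 * item is only kicked once CEPH_CAPS_PER_RELEASE releases accumulate,
 * so a stream of single releases batches into one trim pass.
 * Illustrative caller:
 *
 *	ceph_reclaim_caps_nr(mdsc, 1);	// one dentry went unused
 */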
2007
2008 /*
2009  * requests
2010  */
2011
2012 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2013                                     struct inode *dir)
2014 {
2015         struct ceph_inode_info *ci = ceph_inode(dir);
2016         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2017         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2018         size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2019         int order, num_entries;
2020
2021         spin_lock(&ci->i_ceph_lock);
2022         num_entries = ci->i_files + ci->i_subdirs;
2023         spin_unlock(&ci->i_ceph_lock);
2024         num_entries = max(num_entries, 1);
2025         num_entries = min(num_entries, opt->max_readdir);
2026
2027         order = get_order(size * num_entries);
2028         while (order >= 0) {
2029                 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2030                                                              __GFP_NOWARN,
2031                                                              order);
2032                 if (rinfo->dir_entries)
2033                         break;
2034                 order--;
2035         }
2036         if (!rinfo->dir_entries)
2037                 return -ENOMEM;
2038
2039         num_entries = (PAGE_SIZE << order) / size;
2040         num_entries = min(num_entries, opt->max_readdir);
2041
2042         rinfo->dir_buf_size = PAGE_SIZE << order;
2043         req->r_num_caps = num_entries + 1;
2044         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2045         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2046         return 0;
2047 }
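
/*
 * Allocation sketch (illustrative numbers, 4K pages): for an estimated
 * 20000-byte buffer, get_order() yields 3 (eight pages); if that
 * order-3 allocation fails, the loop above retries with orders 2, 1
 * and 0.  Whatever order succeeds then caps how many entries one reply
 * can actually carry:
 *
 *	num_entries = (PAGE_SIZE << order) / size;
 */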
2048
2049 /*
2050  * Create an mds request.
2051  */
2052 struct ceph_mds_request *
2053 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2054 {
2055         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
2056         struct timespec64 ts;
2057
2058         if (!req)
2059                 return ERR_PTR(-ENOMEM);
2060
2061         mutex_init(&req->r_fill_mutex);
2062         req->r_mdsc = mdsc;
2063         req->r_started = jiffies;
2064         req->r_resend_mds = -1;
2065         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2066         INIT_LIST_HEAD(&req->r_unsafe_target_item);
2067         req->r_fmode = -1;
2068         kref_init(&req->r_kref);
2069         RB_CLEAR_NODE(&req->r_node);
2070         INIT_LIST_HEAD(&req->r_wait);
2071         init_completion(&req->r_completion);
2072         init_completion(&req->r_safe_completion);
2073         INIT_LIST_HEAD(&req->r_unsafe_item);
2074
2075         ktime_get_coarse_real_ts64(&ts);
2076         req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
2077
2078         req->r_op = op;
2079         req->r_direct_mode = mode;
2080         return req;
2081 }
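
/*
 * Typical request lifecycle from a caller's point of view (sketch;
 * cf. ceph_lookup() in dir.c):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return ERR_CAST(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);	// drops the ref from kref_init()
 */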
2082
2083 /*
2084  * return the oldest (lowest tid) request in the request tree, or NULL if none.
2085  *
2086  * called under mdsc->mutex.
2087  */
2088 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2089 {
2090         if (RB_EMPTY_ROOT(&mdsc->request_tree))
2091                 return NULL;
2092         return rb_entry(rb_first(&mdsc->request_tree),
2093                         struct ceph_mds_request, r_node);
2094 }
2095
2096 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2097 {
2098         return mdsc->oldest_tid;
2099 }
2100
2101 /*
2102  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2103  * on build_path_from_dentry in fs/cifs/dir.c.
2104  *
2105  * If @stop_on_nosnap, generate path relative to the first non-snapped
2106  * inode.
2107  *
2108  * Encode hidden .snap dirs as a double /, i.e.
2109  *   foo/.snap/bar -> foo//bar
2110  */
2111 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2112                            int stop_on_nosnap)
2113 {
2114         struct dentry *temp;
2115         char *path;
2116         int pos;
2117         unsigned seq;
2118         u64 base;
2119
2120         if (!dentry)
2121                 return ERR_PTR(-EINVAL);
2122
2123         path = __getname();
2124         if (!path)
2125                 return ERR_PTR(-ENOMEM);
2126 retry:
2127         pos = PATH_MAX - 1;
2128         path[pos] = '\0';
2129
2130         seq = read_seqbegin(&rename_lock);
2131         rcu_read_lock();
2132         temp = dentry;
2133         for (;;) {
2134                 struct inode *inode;
2135
2136                 spin_lock(&temp->d_lock);
2137                 inode = d_inode(temp);
2138                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2139                         dout("build_path path+%d: %p SNAPDIR\n",
2140                              pos, temp);
2141                 } else if (stop_on_nosnap && inode && dentry != temp &&
2142                            ceph_snap(inode) == CEPH_NOSNAP) {
2143                         spin_unlock(&temp->d_lock);
2144                         pos++; /* get rid of any prepended '/' */
2145                         break;
2146                 } else {
2147                         pos -= temp->d_name.len;
2148                         if (pos < 0) {
2149                                 spin_unlock(&temp->d_lock);
2150                                 break;
2151                         }
2152                         memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2153                 }
2154                 spin_unlock(&temp->d_lock);
2155                 temp = READ_ONCE(temp->d_parent);
2156
2157                 /* Are we at the root? */
2158                 if (IS_ROOT(temp))
2159                         break;
2160
2161                 /* Are we out of buffer? */
2162                 if (--pos < 0)
2163                         break;
2164
2165                 path[pos] = '/';
2166         }
2167         base = ceph_ino(d_inode(temp));
2168         rcu_read_unlock();
2169         if (pos < 0 || read_seqretry(&rename_lock, seq)) {
2170                 pr_err("build_path did not end path lookup where "
2171                        "expected, pos is %d\n", pos);
2172                 /* presumably this is only possible if racing with a
2173                    rename of one of the parent directories (we cannot
2174                    lock the dentries above us to prevent this, but
2175                    retrying should be harmless) */
2176                 goto retry;
2177         }
2178
2179         *pbase = base;
2180         *plen = PATH_MAX - 1 - pos;
2181         dout("build_path on %p %d built %llx '%.*s'\n",
2182              dentry, d_count(dentry), base, *plen, path + pos);
2183         return path + pos;
2184 }
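
/*
 * The returned pointer lands inside a PATH_MAX buffer obtained from
 * __getname(); callers must release it with ceph_mdsc_free_path(),
 * which recovers the buffer start from the length.  Caller sketch:
 *
 *	path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
 *	if (IS_ERR(path))
 *		return PTR_ERR(path);
 *	...
 *	ceph_mdsc_free_path(path, pathlen);
 */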
2185
2186 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2187                              const char **ppath, int *ppathlen, u64 *pino,
2188                              bool *pfreepath, bool parent_locked)
2189 {
2190         char *path;
2191
2192         rcu_read_lock();
2193         if (!dir)
2194                 dir = d_inode_rcu(dentry->d_parent);
2195         if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2196                 *pino = ceph_ino(dir);
2197                 rcu_read_unlock();
2198                 *ppath = dentry->d_name.name;
2199                 *ppathlen = dentry->d_name.len;
2200                 return 0;
2201         }
2202         rcu_read_unlock();
2203         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2204         if (IS_ERR(path))
2205                 return PTR_ERR(path);
2206         *ppath = path;
2207         *pfreepath = true;
2208         return 0;
2209 }
2210
2211 static int build_inode_path(struct inode *inode,
2212                             const char **ppath, int *ppathlen, u64 *pino,
2213                             bool *pfreepath)
2214 {
2215         struct dentry *dentry;
2216         char *path;
2217
2218         if (ceph_snap(inode) == CEPH_NOSNAP) {
2219                 *pino = ceph_ino(inode);
2220                 *ppathlen = 0;
2221                 return 0;
2222         }
2223         dentry = d_find_alias(inode);
2224         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2225         dput(dentry);
2226         if (IS_ERR(path))
2227                 return PTR_ERR(path);
2228         *ppath = path;
2229         *pfreepath = true;
2230         return 0;
2231 }
2232
2233 /*
2234  * request arguments may be specified via an inode *, a dentry *, or
2235  * an explicit ino+path.
2236  */
2237 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2238                                   struct inode *rdiri, const char *rpath,
2239                                   u64 rino, const char **ppath, int *pathlen,
2240                                   u64 *ino, bool *freepath, bool parent_locked)
2241 {
2242         int r = 0;
2243
2244         if (rinode) {
2245                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2246                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2247                      ceph_snap(rinode));
2248         } else if (rdentry) {
2249                 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2250                                         freepath, parent_locked);
2251                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2252                      *ppath);
2253         } else if (rpath || rino) {
2254                 *ino = rino;
2255                 *ppath = rpath;
2256                 *pathlen = rpath ? strlen(rpath) : 0;
2257                 dout(" path %.*s\n", *pathlen, rpath);
2258         }
2259
2260         return r;
2261 }
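
/*
 * For the explicit ino+path form, a hypothetical caller would leave
 * r_inode/r_dentry NULL and fill in (base_ino and relative_path are
 * placeholder names):
 *
 *	req->r_ino1.ino = base_ino;
 *	req->r_path1 = kstrdup(relative_path, GFP_NOFS);
 *
 * and the request path is then encoded relative to that ino.
 */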
2262
2263 /*
2264  * called under mdsc->mutex
2265  */
2266 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2267                                                struct ceph_mds_request *req,
2268                                                int mds, bool drop_cap_releases)
2269 {
2270         struct ceph_msg *msg;
2271         struct ceph_mds_request_head *head;
2272         const char *path1 = NULL;
2273         const char *path2 = NULL;
2274         u64 ino1 = 0, ino2 = 0;
2275         int pathlen1 = 0, pathlen2 = 0;
2276         bool freepath1 = false, freepath2 = false;
2277         int len;
2278         u16 releases;
2279         void *p, *end;
2280         int ret;
2281
2282         ret = set_request_path_attr(req->r_inode, req->r_dentry,
2283                               req->r_parent, req->r_path1, req->r_ino1.ino,
2284                               &path1, &pathlen1, &ino1, &freepath1,
2285                               test_bit(CEPH_MDS_R_PARENT_LOCKED,
2286                                         &req->r_req_flags));
2287         if (ret < 0) {
2288                 msg = ERR_PTR(ret);
2289                 goto out;
2290         }
2291
2292         /* If r_old_dentry is set, then assume that its parent is locked */
2293         ret = set_request_path_attr(NULL, req->r_old_dentry,
2294                               req->r_old_dentry_dir,
2295                               req->r_path2, req->r_ino2.ino,
2296                               &path2, &pathlen2, &ino2, &freepath2, true);
2297         if (ret < 0) {
2298                 msg = ERR_PTR(ret);
2299                 goto out_free1;
2300         }
2301
2302         len = sizeof(*head) +
2303                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2304                 sizeof(struct ceph_timespec);
2305
2306         /* calculate (max) length for cap releases */
2307         len += sizeof(struct ceph_mds_request_release) *
2308                 (!!req->r_inode_drop + !!req->r_dentry_drop +
2309                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2310         if (req->r_dentry_drop)
2311                 len += pathlen1;
2312         if (req->r_old_dentry_drop)
2313                 len += pathlen2;
2314
2315         msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2316         if (!msg) {
2317                 msg = ERR_PTR(-ENOMEM);
2318                 goto out_free2;
2319         }
2320
2321         msg->hdr.version = cpu_to_le16(2);
2322         msg->hdr.tid = cpu_to_le64(req->r_tid);
2323
2324         head = msg->front.iov_base;
2325         p = msg->front.iov_base + sizeof(*head);
2326         end = msg->front.iov_base + msg->front.iov_len;
2327
2328         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2329         head->op = cpu_to_le32(req->r_op);
2330         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2331         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2332         head->args = req->r_args;
2333
2334         ceph_encode_filepath(&p, end, ino1, path1);
2335         ceph_encode_filepath(&p, end, ino2, path2);
2336
2337         /* make note of release offset, in case we need to replay */
2338         req->r_request_release_offset = p - msg->front.iov_base;
2339
2340         /* cap releases */
2341         releases = 0;
2342         if (req->r_inode_drop)
2343                 releases += ceph_encode_inode_release(&p,
2344                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2345                       mds, req->r_inode_drop, req->r_inode_unless, 0);
2346         if (req->r_dentry_drop)
2347                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2348                                 req->r_parent, mds, req->r_dentry_drop,
2349                                 req->r_dentry_unless);
2350         if (req->r_old_dentry_drop)
2351                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2352                                 req->r_old_dentry_dir, mds,
2353                                 req->r_old_dentry_drop,
2354                                 req->r_old_dentry_unless);
2355         if (req->r_old_inode_drop)
2356                 releases += ceph_encode_inode_release(&p,
2357                       d_inode(req->r_old_dentry),
2358                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2359
2360         if (drop_cap_releases) {
2361                 releases = 0;
2362                 p = msg->front.iov_base + req->r_request_release_offset;
2363         }
2364
2365         head->num_releases = cpu_to_le16(releases);
2366
2367         /* time stamp */
2368         {
2369                 struct ceph_timespec ts;
2370                 ceph_encode_timespec64(&ts, &req->r_stamp);
2371                 ceph_encode_copy(&p, &ts, sizeof(ts));
2372         }
2373
2374         BUG_ON(p > end);
2375         msg->front.iov_len = p - msg->front.iov_base;
2376         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2377
2378         if (req->r_pagelist) {
2379                 struct ceph_pagelist *pagelist = req->r_pagelist;
2380                 ceph_msg_data_add_pagelist(msg, pagelist);
2381                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2382         } else {
2383                 msg->hdr.data_len = 0;
2384         }
2385
2386         msg->hdr.data_off = cpu_to_le16(0);
2387
2388 out_free2:
2389         if (freepath2)
2390                 ceph_mdsc_free_path((char *)path2, pathlen2);
2391 out_free1:
2392         if (freepath1)
2393                 ceph_mdsc_free_path((char *)path1, pathlen1);
2394 out:
2395         return msg;
2396 }
2397
2398 /*
2399  * called under mdsc->mutex if error, under no mutex if
2400  * success.
2401  */
2402 static void complete_request(struct ceph_mds_client *mdsc,
2403                              struct ceph_mds_request *req)
2404 {
2405         if (req->r_callback)
2406                 req->r_callback(mdsc, req);
2407         complete_all(&req->r_completion);
2408 }
2409
2410 /*
2411  * called under mdsc->mutex
2412  */
2413 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2414                                   struct ceph_mds_request *req,
2415                                   int mds, bool drop_cap_releases)
2416 {
2417         struct ceph_mds_request_head *rhead;
2418         struct ceph_msg *msg;
2419         int flags = 0;
2420
2421         req->r_attempts++;
2422         if (req->r_inode) {
2423                 struct ceph_cap *cap =
2424                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2425
2426                 if (cap)
2427                         req->r_sent_on_mseq = cap->mseq;
2428                 else
2429                         req->r_sent_on_mseq = -1;
2430         }
2431         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2432              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2433
2434         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2435                 void *p;
2436                 /*
2437                  * Replay.  Do not regenerate message (and rebuild
2438                  * paths, etc.); just use the original message.
2439                  * Rebuilding paths will break for renames because
2440                  * d_move mangles the src name.
2441                  */
2442                 msg = req->r_request;
2443                 rhead = msg->front.iov_base;
2444
2445                 flags = le32_to_cpu(rhead->flags);
2446                 flags |= CEPH_MDS_FLAG_REPLAY;
2447                 rhead->flags = cpu_to_le32(flags);
2448
2449                 if (req->r_target_inode)
2450                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2451
2452                 rhead->num_retry = req->r_attempts - 1;
2453
2454                 /* remove cap/dentry releases from message */
2455                 rhead->num_releases = 0;
2456
2457                 /* time stamp */
2458                 p = msg->front.iov_base + req->r_request_release_offset;
2459                 {
2460                         struct ceph_timespec ts;
2461                         ceph_encode_timespec64(&ts, &req->r_stamp);
2462                         ceph_encode_copy(&p, &ts, sizeof(ts));
2463                 }
2464
2465                 msg->front.iov_len = p - msg->front.iov_base;
2466                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2467                 return 0;
2468         }
2469
2470         if (req->r_request) {
2471                 ceph_msg_put(req->r_request);
2472                 req->r_request = NULL;
2473         }
2474         msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2475         if (IS_ERR(msg)) {
2476                 req->r_err = PTR_ERR(msg);
2477                 return PTR_ERR(msg);
2478         }
2479         req->r_request = msg;
2480
2481         rhead = msg->front.iov_base;
2482         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2483         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2484                 flags |= CEPH_MDS_FLAG_REPLAY;
2485         if (req->r_parent)
2486                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2487         rhead->flags = cpu_to_le32(flags);
2488         rhead->num_fwd = req->r_num_fwd;
2489         rhead->num_retry = req->r_attempts - 1;
2490         rhead->ino = 0;
2491
2492         dout(" r_parent = %p\n", req->r_parent);
2493         return 0;
2494 }
2495
2496 /*
2497  * send request, or put it on the appropriate wait list.
2498  */
2499 static void __do_request(struct ceph_mds_client *mdsc,
2500                         struct ceph_mds_request *req)
2501 {
2502         struct ceph_mds_session *session = NULL;
2503         int mds = -1;
2504         int err = 0;
2505
2506         if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2507                 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2508                         __unregister_request(mdsc, req);
2509                 return;
2510         }
2511
2512         if (req->r_timeout &&
2513             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2514                 dout("do_request timed out\n");
2515                 err = -EIO;
2516                 goto finish;
2517         }
2518         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2519                 dout("do_request forced umount\n");
2520                 err = -EIO;
2521                 goto finish;
2522         }
2523         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2524                 if (mdsc->mdsmap_err) {
2525                         err = mdsc->mdsmap_err;
2526                         dout("do_request mdsmap err %d\n", err);
2527                         goto finish;
2528                 }
2529                 if (mdsc->mdsmap->m_epoch == 0) {
2530                         dout("do_request no mdsmap, waiting for map\n");
2531                         list_add(&req->r_wait, &mdsc->waiting_for_map);
2532                         return;
2533                 }
2534                 if (!(mdsc->fsc->mount_options->flags &
2535                       CEPH_MOUNT_OPT_MOUNTWAIT) &&
2536                     !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2537                         err = -ENOENT;
2538                         pr_info("probably no mds server is up\n");
2539                         goto finish;
2540                 }
2541         }
2542
2543         put_request_session(req);
2544
2545         mds = __choose_mds(mdsc, req);
2546         if (mds < 0 ||
2547             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2548                 dout("do_request no mds or not active, waiting for map\n");
2549                 list_add(&req->r_wait, &mdsc->waiting_for_map);
2550                 return;
2551         }
2552
2553         /* get, open session */
2554         session = __ceph_lookup_mds_session(mdsc, mds);
2555         if (!session) {
2556                 session = register_session(mdsc, mds);
2557                 if (IS_ERR(session)) {
2558                         err = PTR_ERR(session);
2559                         goto finish;
2560                 }
2561         }
2562         req->r_session = get_session(session);
2563
2564         dout("do_request mds%d session %p state %s\n", mds, session,
2565              ceph_session_state_name(session->s_state));
2566         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2567             session->s_state != CEPH_MDS_SESSION_HUNG) {
2568                 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2569                         err = -EACCES;
2570                         goto out_session;
2571                 }
2572                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2573                     session->s_state == CEPH_MDS_SESSION_CLOSING)
2574                         __open_session(mdsc, session);
2575                 list_add(&req->r_wait, &session->s_waiting);
2576                 goto out_session;
2577         }
2578
2579         /* send request */
2580         req->r_resend_mds = -1;   /* forget any previous mds hint */
2581
2582         if (req->r_request_started == 0)   /* note request start time */
2583                 req->r_request_started = jiffies;
2584
2585         err = __prepare_send_request(mdsc, req, mds, false);
2586         if (!err) {
2587                 ceph_msg_get(req->r_request);
2588                 ceph_con_send(&session->s_con, req->r_request);
2589         }
2590
2591 out_session:
2592         ceph_put_mds_session(session);
2593 finish:
2594         if (err) {
2595                 dout("__do_request early error %d\n", err);
2596                 req->r_err = err;
2597                 complete_request(mdsc, req);
2598                 __unregister_request(mdsc, req);
2599         }
2600         return;
2601 }
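
/*
 * Summary of where a request can park above instead of being sent:
 *
 *	list_add(&req->r_wait, &mdsc->waiting_for_map);	// no usable mdsmap
 *	list_add(&req->r_wait, &session->s_waiting);	// session not open
 *
 * Both lists are drained through __wake_requests() once a new mdsmap
 * arrives or the session reaches the OPEN state.
 */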
2602
2603 /*
2604  * called under mdsc->mutex
2605  */
2606 static void __wake_requests(struct ceph_mds_client *mdsc,
2607                             struct list_head *head)
2608 {
2609         struct ceph_mds_request *req;
2610         LIST_HEAD(tmp_list);
2611
2612         list_splice_init(head, &tmp_list);
2613
2614         while (!list_empty(&tmp_list)) {
2615                 req = list_entry(tmp_list.next,
2616                                  struct ceph_mds_request, r_wait);
2617                 list_del_init(&req->r_wait);
2618                 dout(" wake request %p tid %llu\n", req, req->r_tid);
2619                 __do_request(mdsc, req);
2620         }
2621 }
2622
2623 /*
2624  * Wake up threads with requests pending for @mds, so that they can
2625  * resubmit their requests to a possibly different mds.
2626  */
2627 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2628 {
2629         struct ceph_mds_request *req;
2630         struct rb_node *p = rb_first(&mdsc->request_tree);
2631
2632         dout("kick_requests mds%d\n", mds);
2633         while (p) {
2634                 req = rb_entry(p, struct ceph_mds_request, r_node);
2635                 p = rb_next(p);
2636                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2637                         continue;
2638                 if (req->r_attempts > 0)
2639                         continue; /* only new requests */
2640                 if (req->r_session &&
2641                     req->r_session->s_mds == mds) {
2642                         dout(" kicking tid %llu\n", req->r_tid);
2643                         list_del_init(&req->r_wait);
2644                         __do_request(mdsc, req);
2645                 }
2646         }
2647 }
2648
2649 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2650                               struct ceph_mds_request *req)
2651 {
2652         int err;
2653
2654         /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2655         if (req->r_inode)
2656                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2657         if (req->r_parent)
2658                 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2659         if (req->r_old_dentry_dir)
2660                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2661                                   CEPH_CAP_PIN);
2662
2663         dout("submit_request on %p for inode %p\n", req, dir);
2664         mutex_lock(&mdsc->mutex);
2665         __register_request(mdsc, req, dir);
2666         __do_request(mdsc, req);
2667         err = req->r_err;
2668         mutex_unlock(&mdsc->mutex);
2669         return err;
2670 }
2671
2672 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2673                                   struct ceph_mds_request *req)
2674 {
2675         int err;
2676
2677         /* wait */
2678         dout("do_request waiting\n");
2679         if (!req->r_timeout && req->r_wait_for_completion) {
2680                 err = req->r_wait_for_completion(mdsc, req);
2681         } else {
2682                 long timeleft = wait_for_completion_killable_timeout(
2683                                         &req->r_completion,
2684                                         ceph_timeout_jiffies(req->r_timeout));
2685                 if (timeleft > 0)
2686                         err = 0;
2687                 else if (!timeleft)
2688                         err = -EIO;  /* timed out */
2689                 else
2690                         err = timeleft;  /* killed */
2691         }
2692         dout("do_request waited, got %d\n", err);
2693         mutex_lock(&mdsc->mutex);
2694
2695         /* only abort if we didn't race with a real reply */
2696         if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2697                 err = le32_to_cpu(req->r_reply_info.head->result);
2698         } else if (err < 0) {
2699                 dout("aborted request %lld with %d\n", req->r_tid, err);
2700
2701                 /*
2702                  * ensure we aren't running concurrently with
2703                  * ceph_fill_trace or ceph_readdir_prepopulate, which
2704                  * rely on locks (dir mutex) held by our caller.
2705                  */
2706                 mutex_lock(&req->r_fill_mutex);
2707                 req->r_err = err;
2708                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2709                 mutex_unlock(&req->r_fill_mutex);
2710
2711                 if (req->r_parent &&
2712                     (req->r_op & CEPH_MDS_OP_WRITE))
2713                         ceph_invalidate_dir_request(req);
2714         } else {
2715                 err = req->r_err;
2716         }
2717
2718         mutex_unlock(&mdsc->mutex);
2719         return err;
2720 }
2721
2722 /*
2723  * Synchronously perform an mds request.  Take care of all of the
2724  * session setup, forwarding, retry details.
2725  */
2726 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2727                          struct inode *dir,
2728                          struct ceph_mds_request *req)
2729 {
2730         int err;
2731
2732         dout("do_request on %p\n", req);
2733
2734         /* issue */
2735         err = ceph_mdsc_submit_request(mdsc, dir, req);
2736         if (!err)
2737                 err = ceph_mdsc_wait_request(mdsc, req);
2738         dout("do_request %p done, result %d\n", req, err);
2739         return err;
2740 }
2741
2742 /*
2743  * Invalidate dir's completeness, dentry lease state on an aborted MDS
2744  * namespace request.
2745  */
2746 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2747 {
2748         struct inode *dir = req->r_parent;
2749         struct inode *old_dir = req->r_old_dentry_dir;
2750
2751         dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2752
2753         ceph_dir_clear_complete(dir);
2754         if (old_dir)
2755                 ceph_dir_clear_complete(old_dir);
2756         if (req->r_dentry)
2757                 ceph_invalidate_dentry_lease(req->r_dentry);
2758         if (req->r_old_dentry)
2759                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2760 }
2761
2762 /*
2763  * Handle mds reply.
2764  *
2765  * We take the session mutex and parse and process the reply immediately.
2766  * This preserves the logical ordering of replies, capabilities, etc., sent
2767  * by the MDS as they are applied to our local cache.
2768  */
2769 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2770 {
2771         struct ceph_mds_client *mdsc = session->s_mdsc;
2772         struct ceph_mds_request *req;
2773         struct ceph_mds_reply_head *head = msg->front.iov_base;
2774         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2775         struct ceph_snap_realm *realm;
2776         u64 tid;
2777         int err, result;
2778         int mds = session->s_mds;
2779
2780         if (msg->front.iov_len < sizeof(*head)) {
2781                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2782                 ceph_msg_dump(msg);
2783                 return;
2784         }
2785
2786         /* get request, session */
2787         tid = le64_to_cpu(msg->hdr.tid);
2788         mutex_lock(&mdsc->mutex);
2789         req = lookup_get_request(mdsc, tid);
2790         if (!req) {
2791                 dout("handle_reply on unknown tid %llu\n", tid);
2792                 mutex_unlock(&mdsc->mutex);
2793                 return;
2794         }
2795         dout("handle_reply %p\n", req);
2796
2797         /* correct session? */
2798         if (req->r_session != session) {
2799                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2800                        " not mds%d\n", tid, session->s_mds,
2801                        req->r_session ? req->r_session->s_mds : -1);
2802                 mutex_unlock(&mdsc->mutex);
2803                 goto out;
2804         }
2805
2806         /* dup? */
2807         if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2808             (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2809                 pr_warn("got a dup %s reply on %llu from mds%d\n",
2810                            head->safe ? "safe" : "unsafe", tid, mds);
2811                 mutex_unlock(&mdsc->mutex);
2812                 goto out;
2813         }
2814         if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2815                 pr_warn("got unsafe after safe on %llu from mds%d\n",
2816                            tid, mds);
2817                 mutex_unlock(&mdsc->mutex);
2818                 goto out;
2819         }
2820
2821         result = le32_to_cpu(head->result);
2822
2823         /*
2824          * Handle an ESTALE:
2825          * - if we're not talking to the authority, send to them
2826          * - if the authority has changed while we weren't looking,
2827          *   send to the new authority
2828          * - otherwise we just have to return an ESTALE
2829          */
2830         if (result == -ESTALE) {
2831                 dout("got ESTALE on request %llu\n", req->r_tid);
2832                 req->r_resend_mds = -1;
2833                 if (req->r_direct_mode != USE_AUTH_MDS) {
2834                         dout("not using auth, setting for that now\n");
2835                         req->r_direct_mode = USE_AUTH_MDS;
2836                         __do_request(mdsc, req);
2837                         mutex_unlock(&mdsc->mutex);
2838                         goto out;
2839                 } else  {
2840                         int mds = __choose_mds(mdsc, req);
2841                         if (mds >= 0 && mds != req->r_session->s_mds) {
2842                                 dout("but auth changed, so resending\n");
2843                                 __do_request(mdsc, req);
2844                                 mutex_unlock(&mdsc->mutex);
2845                                 goto out;
2846                         }
2847                 }
2848                 dout("have to return ESTALE on request %llu\n", req->r_tid);
2849         }
2850
2851
2852         if (head->safe) {
2853                 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2854                 __unregister_request(mdsc, req);
2855
2856                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2857                         /*
2858                          * We already handled the unsafe response, now do the
2859                          * cleanup.  No need to examine the response; the MDS
2860                          * doesn't include any result info in the safe
2861                          * response.  And even if it did, there is nothing
2862                          * useful we could do with a revised return value.
2863                          */
2864                         dout("got safe reply %llu, mds%d\n", tid, mds);
2865
2866                         /* last unsafe request during umount? */
2867                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2868                                 complete_all(&mdsc->safe_umount_waiters);
2869                         mutex_unlock(&mdsc->mutex);
2870                         goto out;
2871                 }
2872         } else {
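                     /*
                      * Track the unsafe reply on the session's s_unsafe
                      * list so it can be replayed if the MDS restarts
                      * (see replay_unsafe_requests()), and, for directory
                      * ops, on the parent dir's i_unsafe_dirops so fsync
                      * on the directory can wait for it to commit.
                      */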
2873                 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2874                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2875                 if (req->r_unsafe_dir) {
2876                         struct ceph_inode_info *ci =
2877                                         ceph_inode(req->r_unsafe_dir);
2878                         spin_lock(&ci->i_unsafe_lock);
2879                         list_add_tail(&req->r_unsafe_dir_item,
2880                                       &ci->i_unsafe_dirops);
2881                         spin_unlock(&ci->i_unsafe_lock);
2882                 }
2883         }
2884
2885         dout("handle_reply tid %lld result %d\n", tid, result);
2886         rinfo = &req->r_reply_info;
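             /*
              * A session advertising REPLY_ENCODING uses the new
              * self-describing reply format, so we decode assuming all
              * feature bits ((u64)-1); otherwise fall back to the
              * features negotiated on the connection.
              */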
2887         if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
2888                 err = parse_reply_info(msg, rinfo, (u64)-1);
2889         else
2890                 err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2891         mutex_unlock(&mdsc->mutex);
2892
2893         mutex_lock(&session->s_mutex);
2894         if (err < 0) {
2895                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2896                 ceph_msg_dump(msg);
2897                 goto out_err;
2898         }
2899
2900         /* snap trace */
2901         realm = NULL;
2902         if (rinfo->snapblob_len) {
2903                 down_write(&mdsc->snap_rwsem);
2904                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2905                                 rinfo->snapblob + rinfo->snapblob_len,
2906                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2907                                 &realm);
2908                 downgrade_write(&mdsc->snap_rwsem);
2909         } else {
2910                 down_read(&mdsc->snap_rwsem);
2911         }
2912
2913         /* insert trace into our cache */
2914         mutex_lock(&req->r_fill_mutex);
2915         current->journal_info = req;
2916         err = ceph_fill_trace(mdsc->fsc->sb, req);
2917         if (err == 0) {
2918                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2919                                     req->r_op == CEPH_MDS_OP_LSSNAP))
2920                         ceph_readdir_prepopulate(req, req->r_session);
2921         }
2922         current->journal_info = NULL;
2923         mutex_unlock(&req->r_fill_mutex);
2924
2925         up_read(&mdsc->snap_rwsem);
2926         if (realm)
2927                 ceph_put_snap_realm(mdsc, realm);
2928
2929         if (err == 0) {
2930                 if (req->r_target_inode &&
2931                     test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2932                         struct ceph_inode_info *ci =
2933                                 ceph_inode(req->r_target_inode);
2934                         spin_lock(&ci->i_unsafe_lock);
2935                         list_add_tail(&req->r_unsafe_target_item,
2936                                       &ci->i_unsafe_iops);
2937                         spin_unlock(&ci->i_unsafe_lock);
2938                 }
2939
2940                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2941         }
2942 out_err:
2943         mutex_lock(&mdsc->mutex);
2944         if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2945                 if (err) {
2946                         req->r_err = err;
2947                 } else {
2948                         req->r_reply =  ceph_msg_get(msg);
2949                         set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2950                 }
2951         } else {
2952                 dout("reply arrived after request %lld was aborted\n", tid);
2953         }
2954         mutex_unlock(&mdsc->mutex);
2955
2956         mutex_unlock(&session->s_mutex);
2957
2958         /* kick calling process */
2959         complete_request(mdsc, req);
2960 out:
2961         ceph_mdsc_put_request(req);
2962         return;
2963 }
2964
2965
2966
2967 /*
2968  * handle mds notification that our request has been forwarded.
2969  */
2970 static void handle_forward(struct ceph_mds_client *mdsc,
2971                            struct ceph_mds_session *session,
2972                            struct ceph_msg *msg)
2973 {
2974         struct ceph_mds_request *req;
2975         u64 tid = le64_to_cpu(msg->hdr.tid);
2976         u32 next_mds;
2977         u32 fwd_seq;
2978         int err = -EINVAL;
2979         void *p = msg->front.iov_base;
2980         void *end = p + msg->front.iov_len;
2981
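             /*
              * Payload layout, per the decode below:
              *   __le32 next_mds   rank the request was forwarded to
              *   __le32 fwd_seq    times this tid has been forwarded
              */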
2982         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2983         next_mds = ceph_decode_32(&p);
2984         fwd_seq = ceph_decode_32(&p);
2985
2986         mutex_lock(&mdsc->mutex);
2987         req = lookup_get_request(mdsc, tid);
2988         if (!req) {
2989                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2990                 goto out;  /* dup reply? */
2991         }
2992
2993         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2994                 dout("forward tid %llu aborted, unregistering\n", tid);
2995                 __unregister_request(mdsc, req);
2996         } else if (fwd_seq <= req->r_num_fwd) {
2997                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2998                      tid, next_mds, req->r_num_fwd, fwd_seq);
2999         } else {
3000                 /* resend. forward race not possible; mds would drop */
3001                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3002                 BUG_ON(req->r_err);
3003                 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3004                 req->r_attempts = 0;
3005                 req->r_num_fwd = fwd_seq;
3006                 req->r_resend_mds = next_mds;
3007                 put_request_session(req);
3008                 __do_request(mdsc, req);
3009         }
3010         ceph_mdsc_put_request(req);
3011 out:
3012         mutex_unlock(&mdsc->mutex);
3013         return;
3014
3015 bad:
3016         pr_err("mdsc_handle_forward decode error err=%d\n", err);
3017 }
3018
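     /*
      * Skip over a map<string,string> of session metadata without
      * keeping it: a u32 entry count followed by length-prefixed key
      * and value byte strings, each bounds-checked and stepped over.
      */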
3019 static int __decode_and_drop_session_metadata(void **p, void *end)
3020 {
3021         /* map<string,string> */
3022         u32 n;
3023         ceph_decode_32_safe(p, end, n, bad);
3024         while (n-- > 0) {
3025                 u32 len;
3026                 ceph_decode_32_safe(p, end, len, bad);
3027                 ceph_decode_need(p, end, len, bad);
3028                 *p += len;
3029                 ceph_decode_32_safe(p, end, len, bad);
3030                 ceph_decode_need(p, end, len, bad);
3031                 *p += len;
3032         }
3033         return 0;
3034 bad:
3035         return -1;
3036 }
3037
3038 /*
3039  * handle a mds session control message
3040  */
3041 static void handle_session(struct ceph_mds_session *session,
3042                            struct ceph_msg *msg)
3043 {
3044         struct ceph_mds_client *mdsc = session->s_mdsc;
3045         int mds = session->s_mds;
3046         int msg_version = le16_to_cpu(msg->hdr.version);
3047         void *p = msg->front.iov_base;
3048         void *end = p + msg->front.iov_len;
3049         struct ceph_mds_session_head *h;
3050         u32 op;
3051         u64 seq;
3052         unsigned long features = 0;
3053         int wake = 0;
3054
3055         /* decode */
3056         ceph_decode_need(&p, end, sizeof(*h), bad);
3057         h = p;
3058         p += sizeof(*h);
3059
3060         op = le32_to_cpu(h->op);
3061         seq = le64_to_cpu(h->seq);
3062
3063         if (msg_version >= 3) {
3064                 u32 len;
3065                 /* version >= 2, metadata */
3066                 if (__decode_and_drop_session_metadata(&p, end) < 0)
3067                         goto bad;
3068                 /* version >= 3, feature bits */
3069                 ceph_decode_32_safe(&p, end, len, bad);
3070                 ceph_decode_need(&p, end, len, bad);
3071                 memcpy(&features, p, min_t(size_t, len, sizeof(features)));
3072                 p += len;
3073         }
3074
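             /*
              * A CLOSE must be unregistered under mdsc->mutex; take a
              * reference first so the session survives the rest of this
              * handler (the matching put is at the end of the function).
              */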
3075         mutex_lock(&mdsc->mutex);
3076         if (op == CEPH_SESSION_CLOSE) {
3077                 get_session(session);
3078                 __unregister_session(mdsc, session);
3079         }
3080         /* FIXME: this ttl calculation is generous */
3081         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3082         mutex_unlock(&mdsc->mutex);
3083
3084         mutex_lock(&session->s_mutex);
3085
3086         dout("handle_session mds%d %s %p state %s seq %llu\n",
3087              mds, ceph_session_op_name(op), session,
3088              ceph_session_state_name(session->s_state), seq);
3089
3090         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3091                 session->s_state = CEPH_MDS_SESSION_OPEN;
3092                 pr_info("mds%d came back\n", session->s_mds);
3093         }
3094
3095         switch (op) {
3096         case CEPH_SESSION_OPEN:
3097                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3098                         pr_info("mds%d reconnect success\n", session->s_mds);
3099                 session->s_state = CEPH_MDS_SESSION_OPEN;
3100                 session->s_features = features;
3101                 renewed_caps(mdsc, session, 0);
3102                 wake = 1;
3103                 if (mdsc->stopping)
3104                         __close_session(mdsc, session);
3105                 break;
3106
3107         case CEPH_SESSION_RENEWCAPS:
3108                 if (session->s_renew_seq == seq)
3109                         renewed_caps(mdsc, session, 1);
3110                 break;
3111
3112         case CEPH_SESSION_CLOSE:
3113                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3114                         pr_info("mds%d reconnect denied\n", session->s_mds);
3115                 cleanup_session_requests(mdsc, session);
3116                 remove_session_caps(session);
3117                 wake = 2; /* for good measure */
3118                 wake_up_all(&mdsc->session_close_wq);
3119                 break;
3120
3121         case CEPH_SESSION_STALE:
3122                 pr_info("mds%d caps went stale, renewing\n",
3123                         session->s_mds);
3124                 spin_lock(&session->s_gen_ttl_lock);
3125                 session->s_cap_gen++;
3126                 session->s_cap_ttl = jiffies - 1;
3127                 spin_unlock(&session->s_gen_ttl_lock);
3128                 send_renew_caps(mdsc, session);
3129                 break;
3130
3131         case CEPH_SESSION_RECALL_STATE:
3132                 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3133                 break;
3134
3135         case CEPH_SESSION_FLUSHMSG:
3136                 send_flushmsg_ack(mdsc, session, seq);
3137                 break;
3138
3139         case CEPH_SESSION_FORCE_RO:
3140                 dout("force_session_readonly %p\n", session);
3141                 spin_lock(&session->s_cap_lock);
3142                 session->s_readonly = true;
3143                 spin_unlock(&session->s_cap_lock);
3144                 wake_up_session_caps(session, FORCE_RO);
3145                 break;
3146
3147         case CEPH_SESSION_REJECT:
3148                 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3149                 pr_info("mds%d rejected session\n", session->s_mds);
3150                 session->s_state = CEPH_MDS_SESSION_REJECTED;
3151                 cleanup_session_requests(mdsc, session);
3152                 remove_session_caps(session);
3153                 wake = 2; /* for good measure */
3154                 break;
3155
3156         default:
3157                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3158                 WARN_ON(1);
3159         }
3160
3161         mutex_unlock(&session->s_mutex);
3162         if (wake) {
3163                 mutex_lock(&mdsc->mutex);
3164                 __wake_requests(mdsc, &session->s_waiting);
3165                 if (wake == 2)
3166                         kick_requests(mdsc, mds);
3167                 mutex_unlock(&mdsc->mutex);
3168         }
3169         if (op == CEPH_SESSION_CLOSE)
3170                 ceph_put_mds_session(session);
3171         return;
3172
3173 bad:
3174         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3175                (int)msg->front.iov_len);
3176         ceph_msg_dump(msg);
3177         return;
3178 }
3179
3180
3181 /*
3182  * called under session->s_mutex.
3183  */
3184 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3185                                    struct ceph_mds_session *session)
3186 {
3187         struct ceph_mds_request *req, *nreq;
3188         struct rb_node *p;
3189         int err;
3190
3191         dout("replay_unsafe_requests mds%d\n", session->s_mds);
3192
3193         mutex_lock(&mdsc->mutex);
3194         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
3195                 err = __prepare_send_request(mdsc, req, session->s_mds, true);
3196                 if (!err) {
3197                         ceph_msg_get(req->r_request);
3198                         ceph_con_send(&session->s_con, req->r_request);
3199                 }
3200         }
3201
3202         /*
3203          * Also re-send old requests when the MDS enters the reconnect stage,
3204          * so that it can process completed requests in its clientreplay stage.
3205          */
3206         p = rb_first(&mdsc->request_tree);
3207         while (p) {
3208                 req = rb_entry(p, struct ceph_mds_request, r_node);
3209                 p = rb_next(p);
3210                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3211                         continue;
3212                 if (req->r_attempts == 0)
3213                         continue; /* only old requests */
3214                 if (req->r_session &&
3215                     req->r_session->s_mds == session->s_mds) {
3216                         err = __prepare_send_request(mdsc, req,
3217                                                      session->s_mds, true);
3218                         if (!err) {
3219                                 ceph_msg_get(req->r_request);
3220                                 ceph_con_send(&session->s_con, req->r_request);
3221                         }
3222                 }
3223         }
3224         mutex_unlock(&mdsc->mutex);
3225 }
3226
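     /*
      * The encoded reconnect state may not fit in a single message
      * (RECONNECT_MAX_SIZE).  When the MDS supports multiple reconnect
      * messages, send what has been encoded so far as a partial
      * (version 5) message and continue into a fresh pagelist; the
      * leading nr_caps/nr_realms placeholders are patched in place via
      * kmap before the partial message goes out.
      */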
3227 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3228 {
3229         struct ceph_msg *reply;
3230         struct ceph_pagelist *_pagelist;
3231         struct page *page;
3232         __le32 *addr;
3233         int err = -ENOMEM;
3234
3235         if (!recon_state->allow_multi)
3236                 return -ENOSPC;
3237
3238         /* can't handle a message that contains both caps and realms */
3239         BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3240
3241         /* pre-allocate new pagelist */
3242         _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3243         if (!_pagelist)
3244                 return -ENOMEM;
3245
3246         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3247         if (!reply)
3248                 goto fail_msg;
3249
3250         /* placeholder for nr_caps */
3251         err = ceph_pagelist_encode_32(_pagelist, 0);
3252         if (err < 0)
3253                 goto fail;
3254
3255         if (recon_state->nr_caps) {
3256                 /* currently encoding caps */
3257                 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3258                 if (err)
3259                         goto fail;
3260         } else {
3261                 /* placeholder for nr_realms (currently encoding realms) */
3262                 err = ceph_pagelist_encode_32(_pagelist, 0);
3263                 if (err < 0)
3264                         goto fail;
3265         }
3266
3267         err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3268         if (err)
3269                 goto fail;
3270
3271         page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3272         addr = kmap_atomic(page);
3273         if (recon_state->nr_caps) {
3274                 /* currently encoding caps */
3275                 *addr = cpu_to_le32(recon_state->nr_caps);
3276         } else {
3277                 /* currently encoding realms */
3278                 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3279         }
3280         kunmap_atomic(addr);
3281
3282         reply->hdr.version = cpu_to_le16(5);
3283         reply->hdr.compat_version = cpu_to_le16(4);
3284
3285         reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3286         ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3287
3288         ceph_con_send(&recon_state->session->s_con, reply);
3289         ceph_pagelist_release(recon_state->pagelist);
3290
3291         recon_state->pagelist = _pagelist;
3292         recon_state->nr_caps = 0;
3293         recon_state->nr_realms = 0;
3294         recon_state->msg_version = 5;
3295         return 0;
3296 fail:
3297         ceph_msg_put(reply);
3298 fail_msg:
3299         ceph_pagelist_release(_pagelist);
3300         return err;
3301 }
3302
3303 /*
3304  * Encode information about a cap for a reconnect with the MDS.
3305  */
3306 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
3307                           void *arg)
3308 {
3309         union {
3310                 struct ceph_mds_cap_reconnect v2;
3311                 struct ceph_mds_cap_reconnect_v1 v1;
3312         } rec;
3313         struct ceph_inode_info *ci = cap->ci;
3314         struct ceph_reconnect_state *recon_state = arg;
3315         struct ceph_pagelist *pagelist = recon_state->pagelist;
3316         int err;
3317         u64 snap_follows;
3318
3319         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3320              inode, ceph_vinop(inode), cap, cap->cap_id,
3321              ceph_cap_string(cap->issued));
3322
3323         spin_lock(&ci->i_ceph_lock);
3324         cap->seq = 0;        /* reset cap seq */
3325         cap->issue_seq = 0;  /* and issue_seq */
3326         cap->mseq = 0;       /* and migrate_seq */
3327         cap->cap_gen = cap->session->s_cap_gen;
3328
3329         if (recon_state->msg_version >= 2) {
3330                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3331                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3332                 rec.v2.issued = cpu_to_le32(cap->issued);
3333                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3334                 rec.v2.pathbase = 0;
3335                 rec.v2.flock_len = (__force __le32)
3336                         ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3337         } else {
3338                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3339                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3340                 rec.v1.issued = cpu_to_le32(cap->issued);
3341                 rec.v1.size = cpu_to_le64(inode->i_size);
3342                 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3343                 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3344                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3345                 rec.v1.pathbase = 0;
3346         }
3347
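             /*
              * snap_follows tells the MDS which snap this cap's dirty
              * data follows: the oldest pending cap snap if one exists,
              * otherwise the seq of the current head snap context.
              */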
3348         if (list_empty(&ci->i_cap_snaps)) {
3349                 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3350         } else {
3351                 struct ceph_cap_snap *capsnap =
3352                         list_first_entry(&ci->i_cap_snaps,
3353                                          struct ceph_cap_snap, ci_item);
3354                 snap_follows = capsnap->follows;
3355         }
3356         spin_unlock(&ci->i_ceph_lock);
3357
3358         if (recon_state->msg_version >= 2) {
3359                 int num_fcntl_locks, num_flock_locks;
3360                 struct ceph_filelock *flocks = NULL;
3361                 size_t struct_len, total_len = sizeof(u64);
3362                 u8 struct_v = 0;
3363
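                     /*
                      * Locks can come and go while we are not holding the
                      * lock lists; if the buffer we sized turns out too
                      * small, ceph_encode_locks_to_buffer() returns
                      * -ENOSPC and we recount and retry from here.
                      */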
3364 encode_again:
3365                 if (rec.v2.flock_len) {
3366                         ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3367                 } else {
3368                         num_fcntl_locks = 0;
3369                         num_flock_locks = 0;
3370                 }
3371                 if (num_fcntl_locks + num_flock_locks > 0) {
3372                         flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3373                                                sizeof(struct ceph_filelock),
3374                                                GFP_NOFS);
3375                         if (!flocks) {
3376                                 err = -ENOMEM;
3377                                 goto out_err;
3378                         }
3379                         err = ceph_encode_locks_to_buffer(inode, flocks,
3380                                                           num_fcntl_locks,
3381                                                           num_flock_locks);
3382                         if (err) {
3383                                 kfree(flocks);
3384                                 flocks = NULL;
3385                                 if (err == -ENOSPC)
3386                                         goto encode_again;
3387                                 goto out_err;
3388                         }
3389                 } else {
3390                         kfree(flocks);
3391                         flocks = NULL;
3392                 }
3393
3394                 if (recon_state->msg_version >= 3) {
3395                         /* version, compat_version and struct_len */
3396                         total_len += 2 * sizeof(u8) + sizeof(u32);
3397                         struct_v = 2;
3398                 }
3399                 /*
3400                  * number of encoded locks is stable, so copy to pagelist
3401                  */
3402                 struct_len = 2 * sizeof(u32) +
3403                             (num_fcntl_locks + num_flock_locks) *
3404                             sizeof(struct ceph_filelock);
3405                 rec.v2.flock_len = cpu_to_le32(struct_len);
3406
3407                 struct_len += sizeof(u32) + sizeof(rec.v2);
3408
3409                 if (struct_v >= 2)
3410                         struct_len += sizeof(u64); /* snap_follows */
3411
3412                 total_len += struct_len;
3413
3414                 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3415                         err = send_reconnect_partial(recon_state);
3416                         if (err)
3417                                 goto out_freeflocks;
3418                         pagelist = recon_state->pagelist;
3419                 }
3420
3421                 err = ceph_pagelist_reserve(pagelist, total_len);
3422                 if (err)
3423                         goto out_freeflocks;
3424
3425                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3426                 if (recon_state->msg_version >= 3) {
3427                         ceph_pagelist_encode_8(pagelist, struct_v);
3428                         ceph_pagelist_encode_8(pagelist, 1);
3429                         ceph_pagelist_encode_32(pagelist, struct_len);
3430                 }
3431                 ceph_pagelist_encode_string(pagelist, NULL, 0);
3432                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3433                 ceph_locks_to_pagelist(flocks, pagelist,
3434                                        num_fcntl_locks, num_flock_locks);
3435                 if (struct_v >= 2)
3436                         ceph_pagelist_encode_64(pagelist, snap_follows);
3437 out_freeflocks:
3438                 kfree(flocks);
3439         } else {
3440                 u64 pathbase = 0;
3441                 int pathlen = 0;
3442                 char *path = NULL;
3443                 struct dentry *dentry;
3444
3445                 dentry = d_find_alias(inode);
3446                 if (dentry) {
3447                         path = ceph_mdsc_build_path(dentry,
3448                                                 &pathlen, &pathbase, 0);
3449                         dput(dentry);
3450                         if (IS_ERR(path)) {
3451                                 err = PTR_ERR(path);
3452                                 goto out_err;
3453                         }
3454                         rec.v1.pathbase = cpu_to_le64(pathbase);
3455                 }
3456
3457                 err = ceph_pagelist_reserve(pagelist,
3458                                             sizeof(u64) + sizeof(u32) +
3459                                             pathlen + sizeof(rec.v1));
3460                 if (err) {
3461                         goto out_freepath;
3462                 }
3463
3464                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3465                 ceph_pagelist_encode_string(pagelist, path, pathlen);
3466                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3467 out_freepath:
3468                 ceph_mdsc_free_path(path, pathlen);
3469         }
3470
3471 out_err:
3472         if (err >= 0)
3473                 recon_state->nr_caps++;
3474         return err;
3475 }
3476
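     /*
      * Emit one ceph_mds_snaprealm_reconnect record per realm we know
      * of.  For v4+ reconnect messages each record carries a (version,
      * compat_version, struct_len) envelope, and the realm count is
      * written up front.
      */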
3477 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3478                               struct ceph_reconnect_state *recon_state)
3479 {
3480         struct rb_node *p;
3481         struct ceph_pagelist *pagelist = recon_state->pagelist;
3482         int err = 0;
3483
3484         if (recon_state->msg_version >= 4) {
3485                 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3486                 if (err < 0)
3487                         goto fail;
3488         }
3489
3490         /*
3491          * snaprealms.  we provide mds with the ino, seq (version), and
3492          * parent for all of our realms.  If the mds has any newer info,
3493          * it will tell us.
3494          */
3495         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3496                 struct ceph_snap_realm *realm =
3497                        rb_entry(p, struct ceph_snap_realm, node);
3498                 struct ceph_mds_snaprealm_reconnect sr_rec;
3499
3500                 if (recon_state->msg_version >= 4) {
3501                         size_t need = sizeof(u8) * 2 + sizeof(u32) +
3502                                       sizeof(sr_rec);
3503
3504                         if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3505                                 err = send_reconnect_partial(recon_state);
3506                                 if (err)
3507                                         goto fail;
3508                                 pagelist = recon_state->pagelist;
3509                         }
3510
3511                         err = ceph_pagelist_reserve(pagelist, need);
3512                         if (err)
3513                                 goto fail;
3514
3515                         ceph_pagelist_encode_8(pagelist, 1);
3516                         ceph_pagelist_encode_8(pagelist, 1);
3517                         ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3518                 }
3519
3520                 dout(" adding snap realm %llx seq %lld parent %llx\n",
3521                      realm->ino, realm->seq, realm->parent_ino);
3522                 sr_rec.ino = cpu_to_le64(realm->ino);
3523                 sr_rec.seq = cpu_to_le64(realm->seq);
3524                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3525
3526                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3527                 if (err)
3528                         goto fail;
3529
3530                 recon_state->nr_realms++;
3531         }
3532 fail:
3533         return err;
3534 }
3535
3536
3537 /*
3538  * If an MDS fails and recovers, clients need to reconnect in order to
3539  * reestablish shared state.  This includes all caps issued through
3540  * this session _and_ the snap_realm hierarchy.  Because it's not
3541  * clear which snap realms the mds cares about, we send everything we
3542  * know about; that ensures we'll then get any new info the
3543  * recovering MDS might have.
3544  *
3545  * This is a relatively heavyweight operation, but it's rare.
3546  *
3547  * called with mdsc->mutex held.
3548  */
3549 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3550                                struct ceph_mds_session *session)
3551 {
3552         struct ceph_msg *reply;
3553         int mds = session->s_mds;
3554         int err = -ENOMEM;
3555         struct ceph_reconnect_state recon_state = {
3556                 .session = session,
3557         };
3558         LIST_HEAD(dispose);
3559
3560         pr_info("mds%d reconnect start\n", mds);
3561
3562         recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3563         if (!recon_state.pagelist)
3564                 goto fail_nopagelist;
3565
3566         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3567         if (!reply)
3568                 goto fail_nomsg;
3569
3570         mutex_lock(&session->s_mutex);
3571         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3572         session->s_seq = 0;
3573
3574         dout("session %p state %s\n", session,
3575              ceph_session_state_name(session->s_state));
3576
3577         spin_lock(&session->s_gen_ttl_lock);
3578         session->s_cap_gen++;
3579         spin_unlock(&session->s_gen_ttl_lock);
3580
3581         spin_lock(&session->s_cap_lock);
3582         /* don't know if session is readonly */
3583         session->s_readonly = 0;
3584         /*
3585          * notify __ceph_remove_cap() that we are composing cap reconnect.
3586          * If a cap gets released before being added to the cap reconnect,
3587          * __ceph_remove_cap() should skip queuing cap release.
3588          */
3589         session->s_cap_reconnect = 1;
3590         /* drop old cap expires; we're about to reestablish that state */
3591         detach_cap_releases(session, &dispose);
3592         spin_unlock(&session->s_cap_lock);
3593         dispose_cap_releases(mdsc, &dispose);
3594
3595         /* trim unused caps to reduce MDS's cache rejoin time */
3596         if (mdsc->fsc->sb->s_root)
3597                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3598
3599         ceph_con_close(&session->s_con);
3600         ceph_con_open(&session->s_con,
3601                       CEPH_ENTITY_TYPE_MDS, mds,
3602                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3603
3604         /* replay unsafe requests */
3605         replay_unsafe_requests(mdsc, session);
3606
3607         ceph_early_kick_flushing_caps(mdsc, session);
3608
3609         down_read(&mdsc->snap_rwsem);
3610
3611         /* placeholder for nr_caps */
3612         err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3613         if (err)
3614                 goto fail;
3615
3616         if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3617                 recon_state.msg_version = 3;
3618                 recon_state.allow_multi = true;
3619         } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3620                 recon_state.msg_version = 3;
3621         } else {
3622                 recon_state.msg_version = 2;
3623         }
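             /*
              * msg_version 3 wraps each cap record in a versioned
              * envelope; MULTI_RECONNECT additionally allows the payload
              * to be split across several messages, which bumps the
              * version to 5 (see send_reconnect_partial()).
              */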
3624         /* traverse this session's caps */
3625         err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
3626
3627         spin_lock(&session->s_cap_lock);
3628         session->s_cap_reconnect = 0;
3629         spin_unlock(&session->s_cap_lock);
3630
3631         if (err < 0)
3632                 goto fail;
3633
3634         /* check if all realms can be encoded into current message */
3635         if (mdsc->num_snap_realms) {
3636                 size_t total_len =
3637                         recon_state.pagelist->length +
3638                         mdsc->num_snap_realms *
3639                         sizeof(struct ceph_mds_snaprealm_reconnect);
3640                 if (recon_state.msg_version >= 4) {
3641                         /* number of realms */
3642                         total_len += sizeof(u32);
3643                         /* version, compat_version and struct_len */
3644                         total_len += mdsc->num_snap_realms *
3645                                      (2 * sizeof(u8) + sizeof(u32));
3646                 }
3647                 if (total_len > RECONNECT_MAX_SIZE) {
3648                         if (!recon_state.allow_multi) {
3649                                 err = -ENOSPC;
3650                                 goto fail;
3651                         }
3652                         if (recon_state.nr_caps) {
3653                                 err = send_reconnect_partial(&recon_state);
3654                                 if (err)
3655                                         goto fail;
3656                         }
3657                         recon_state.msg_version = 5;
3658                 }
3659         }
3660
3661         err = encode_snap_realms(mdsc, &recon_state);
3662         if (err < 0)
3663                 goto fail;
3664
3665         if (recon_state.msg_version >= 5) {
3666                 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3667                 if (err < 0)
3668                         goto fail;
3669         }
3670
3671         if (recon_state.nr_caps || recon_state.nr_realms) {
3672                 struct page *page =
3673                         list_first_entry(&recon_state.pagelist->head,
3674                                         struct page, lru);
3675                 __le32 *addr = kmap_atomic(page);
3676                 if (recon_state.nr_caps) {
3677                         WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3678                         *addr = cpu_to_le32(recon_state.nr_caps);
3679                 } else if (recon_state.msg_version >= 4) {
3680                         *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3681                 }
3682                 kunmap_atomic(addr);
3683         }
3684
3685         reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3686         if (recon_state.msg_version >= 4)
3687                 reply->hdr.compat_version = cpu_to_le16(4);
3688
3689         reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3690         ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3691
3692         ceph_con_send(&session->s_con, reply);
3693
3694         mutex_unlock(&session->s_mutex);
3695
3696         mutex_lock(&mdsc->mutex);
3697         __wake_requests(mdsc, &session->s_waiting);
3698         mutex_unlock(&mdsc->mutex);
3699
3700         up_read(&mdsc->snap_rwsem);
3701         ceph_pagelist_release(recon_state.pagelist);
3702         return;
3703
3704 fail:
3705         ceph_msg_put(reply);
3706         up_read(&mdsc->snap_rwsem);
3707         mutex_unlock(&session->s_mutex);
3708 fail_nomsg:
3709         ceph_pagelist_release(recon_state.pagelist);
3710 fail_nopagelist:
3711         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3712         return;
3713 }
3714
3715
3716 /*
3717  * compare old and new mdsmaps, kicking requests
3718  * and closing out old connections as necessary
3719  *
3720  * called under mdsc->mutex.
3721  */
3722 static void check_new_map(struct ceph_mds_client *mdsc,
3723                           struct ceph_mdsmap *newmap,
3724                           struct ceph_mdsmap *oldmap)
3725 {
3726         int i;
3727         int oldstate, newstate;
3728         struct ceph_mds_session *s;
3729
3730         dout("check_new_map new %u old %u\n",
3731              newmap->m_epoch, oldmap->m_epoch);
3732
3733         for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3734                 if (!mdsc->sessions[i])
3735                         continue;
3736                 s = mdsc->sessions[i];
3737                 oldstate = ceph_mdsmap_get_state(oldmap, i);
3738                 newstate = ceph_mdsmap_get_state(newmap, i);
3739
3740                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3741                      i, ceph_mds_state_name(oldstate),
3742                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3743                      ceph_mds_state_name(newstate),
3744                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3745                      ceph_session_state_name(s->s_state));
3746
3747                 if (i >= newmap->m_num_mds) {
3748                         /* force close session for stopped mds */
3749                         get_session(s);
3750                         __unregister_session(mdsc, s);
3751                         __wake_requests(mdsc, &s->s_waiting);
3752                         mutex_unlock(&mdsc->mutex);
3753
3754                         mutex_lock(&s->s_mutex);
3755                         cleanup_session_requests(mdsc, s);
3756                         remove_session_caps(s);
3757                         mutex_unlock(&s->s_mutex);
3758
3759                         ceph_put_mds_session(s);
3760
3761                         mutex_lock(&mdsc->mutex);
3762                         kick_requests(mdsc, i);
3763                         continue;
3764                 }
3765
3766                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
3767                            ceph_mdsmap_get_addr(newmap, i),
3768                            sizeof(struct ceph_entity_addr))) {
3769                         /* just close it */
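                             /*
                              * mdsc->mutex must be dropped before taking
                              * s_mutex (s_mutex ranks ahead of mdsc->mutex
                              * elsewhere in this file); retake it for the
                              * rest of the scan.
                              */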
3770                         mutex_unlock(&mdsc->mutex);
3771                         mutex_lock(&s->s_mutex);
3772                         mutex_lock(&mdsc->mutex);
3773                         ceph_con_close(&s->s_con);
3774                         mutex_unlock(&s->s_mutex);
3775                         s->s_state = CEPH_MDS_SESSION_RESTARTING;
3776                 } else if (oldstate == newstate) {
3777                         continue;  /* nothing new with this mds */
3778                 }
3779
3780                 /*
3781                  * send reconnect?
3782                  */
3783                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3784                     newstate >= CEPH_MDS_STATE_RECONNECT) {
3785                         mutex_unlock(&mdsc->mutex);
3786                         send_mds_reconnect(mdsc, s);
3787                         mutex_lock(&mdsc->mutex);
3788                 }
3789
3790                 /*
3791                  * kick request on any mds that has gone active.
3792                  */
3793                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3794                     newstate >= CEPH_MDS_STATE_ACTIVE) {
3795                         if (oldstate != CEPH_MDS_STATE_CREATING &&
3796                             oldstate != CEPH_MDS_STATE_STARTING)
3797                                 pr_info("mds%d recovery completed\n", s->s_mds);
3798                         kick_requests(mdsc, i);
3799                         ceph_kick_flushing_caps(mdsc, s);
3800                         wake_up_session_caps(s, RECONNECT);
3801                 }
3802         }
3803
3804         for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3805                 s = mdsc->sessions[i];
3806                 if (!s)
3807                         continue;
3808                 if (!ceph_mdsmap_is_laggy(newmap, i))
3809                         continue;
3810                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3811                     s->s_state == CEPH_MDS_SESSION_HUNG ||
3812                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
3813                         dout(" connecting to export targets of laggy mds%d\n",
3814                              i);
3815                         __open_export_target_sessions(mdsc, s);
3816                 }
3817         }
3818 }
3819
3820
3821
3822 /*
3823  * leases
3824  */
3825
3826 /*
3827  * caller must hold session s_mutex, dentry->d_lock
3828  */
3829 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3830 {
3831         struct ceph_dentry_info *di = ceph_dentry(dentry);
3832
3833         ceph_put_mds_session(di->lease_session);
3834         di->lease_session = NULL;
3835 }
3836
3837 static void handle_lease(struct ceph_mds_client *mdsc,
3838                          struct ceph_mds_session *session,
3839                          struct ceph_msg *msg)
3840 {
3841         struct super_block *sb = mdsc->fsc->sb;
3842         struct inode *inode;
3843         struct dentry *parent, *dentry;
3844         struct ceph_dentry_info *di;
3845         int mds = session->s_mds;
3846         struct ceph_mds_lease *h = msg->front.iov_base;
3847         u32 seq;
3848         struct ceph_vino vino;
3849         struct qstr dname;
3850         int release = 0;
3851
3852         dout("handle_lease from mds%d\n", mds);
3853
3854         /* decode */
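             /*
              * Message layout: struct ceph_mds_lease, then a u32 dname
              * length, then the dname bytes themselves.
              */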
3855         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3856                 goto bad;
3857         vino.ino = le64_to_cpu(h->ino);
3858         vino.snap = CEPH_NOSNAP;
3859         seq = le32_to_cpu(h->seq);
3860         dname.len = get_unaligned_le32(h + 1);
3861         if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
3862                 goto bad;
3863         dname.name = (void *)(h + 1) + sizeof(u32);
3864
3865         /* lookup inode */
3866         inode = ceph_find_inode(sb, vino);
3867         dout("handle_lease %s, ino %llx %p %.*s\n",
3868              ceph_lease_op_name(h->action), vino.ino, inode,
3869              dname.len, dname.name);
3870
3871         mutex_lock(&session->s_mutex);
3872         session->s_seq++;
3873
3874         if (!inode) {
3875                 dout("handle_lease no inode %llx\n", vino.ino);
3876                 goto release;
3877         }
3878
3879         /* dentry */
3880         parent = d_find_alias(inode);
3881         if (!parent) {
3882                 dout("no parent dentry on inode %p\n", inode);
3883                 WARN_ON(1);
3884                 goto release;  /* hrm... */
3885         }
3886         dname.hash = full_name_hash(parent, dname.name, dname.len);
3887         dentry = d_lookup(parent, &dname);
3888         dput(parent);
3889         if (!dentry)
3890                 goto release;
3891
3892         spin_lock(&dentry->d_lock);
3893         di = ceph_dentry(dentry);
3894         switch (h->action) {
3895         case CEPH_MDS_LEASE_REVOKE:
3896                 if (di->lease_session == session) {
3897                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3898                                 h->seq = cpu_to_le32(di->lease_seq);
3899                         __ceph_mdsc_drop_dentry_lease(dentry);
3900                 }
3901                 release = 1;
3902                 break;
3903
3904         case CEPH_MDS_LEASE_RENEW:
3905                 if (di->lease_session == session &&
3906                     di->lease_gen == session->s_cap_gen &&
3907                     di->lease_renew_from &&
3908                     di->lease_renew_after == 0) {
3909                         unsigned long duration =
3910                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3911
3912                         di->lease_seq = seq;
3913                         di->time = di->lease_renew_from + duration;
3914                         di->lease_renew_after = di->lease_renew_from +
3915                                 (duration >> 1);
3916                         di->lease_renew_from = 0;
3917                 }
3918                 break;
3919         }
3920         spin_unlock(&dentry->d_lock);
3921         dput(dentry);
3922
3923         if (!release)
3924                 goto out;
3925
3926 release:
3927         /* let's just reuse the same message */
3928         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3929         ceph_msg_get(msg);
3930         ceph_con_send(&session->s_con, msg);
3931
3932 out:
3933         mutex_unlock(&session->s_mutex);
3934         /* avoid calling iput_final() in mds dispatch threads */
3935         ceph_async_iput(inode);
3936         return;
3937
3938 bad:
3939         pr_err("corrupt lease message\n");
3940         ceph_msg_dump(msg);
3941 }
3942
3943 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3944                               struct dentry *dentry, char action,
3945                               u32 seq)
3946 {
3947         struct ceph_msg *msg;
3948         struct ceph_mds_lease *lease;
3949         struct inode *dir;
3950         int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
3951
3952         dout("lease_send_msg dentry %p %s to mds%d\n",
3953              dentry, ceph_lease_op_name(action), session->s_mds);
3954
3955         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3956         if (!msg)
3957                 return;
3958         lease = msg->front.iov_base;
3959         lease->action = action;
3960         lease->seq = cpu_to_le32(seq);
3961
3962         spin_lock(&dentry->d_lock);
3963         dir = d_inode(dentry->d_parent);
3964         lease->ino = cpu_to_le64(ceph_ino(dir));
3965         lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
3966
3967         put_unaligned_le32(dentry->d_name.len, lease + 1);
3968         memcpy((void *)(lease + 1) + 4,
3969                dentry->d_name.name, dentry->d_name.len);
3970         spin_unlock(&dentry->d_lock);
3971         /*
3972          * if this is a preemptive lease RELEASE, no need to
3973          * flush request stream, since the actual request will
3974          * soon follow.
3975          */
3976         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3977
3978         ceph_con_send(&session->s_con, msg);
3979 }
3980
3981 /*
3982  * lock and unlock each session, to wait for ongoing session activity to finish
3983  */
3984 static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
3985 {
3986         int i;
3987
3988         mutex_lock(&mdsc->mutex);
3989         for (i = 0; i < mdsc->max_sessions; i++) {
3990                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3991                 if (!s)
3992                         continue;
3993                 mutex_unlock(&mdsc->mutex);
3994                 mutex_lock(&s->s_mutex);
3995                 mutex_unlock(&s->s_mutex);
3996                 ceph_put_mds_session(s);
3997                 mutex_lock(&mdsc->mutex);
3998         }
3999         mutex_unlock(&mdsc->mutex);
4000 }
4001
4002
4003
4004 /*
4005  * delayed work -- periodically trim expired leases, renew caps with mds
4006  */
4007 static void schedule_delayed(struct ceph_mds_client *mdsc)
4008 {
4009         int delay = 5;
4010         unsigned hz = round_jiffies_relative(HZ * delay);
4011         schedule_delayed_work(&mdsc->delayed_work, hz);
4012 }
4013
4014 static void delayed_work(struct work_struct *work)
4015 {
4016         int i;
4017         struct ceph_mds_client *mdsc =
4018                 container_of(work, struct ceph_mds_client, delayed_work.work);
4019         int renew_interval;
4020         int renew_caps;
4021
4022         dout("mdsc delayed_work\n");
4023
4024         mutex_lock(&mdsc->mutex);
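             /*
              * Renew at a quarter of the session timeout, presumably so
              * that a lost renewal or two still beats autoclose.
              */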
4025         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4026         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4027                                    mdsc->last_renew_caps);
4028         if (renew_caps)
4029                 mdsc->last_renew_caps = jiffies;
4030
4031         for (i = 0; i < mdsc->max_sessions; i++) {
4032                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4033                 if (!s)
4034                         continue;
4035                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4036                         dout("resending session close request for mds%d\n",
4037                              s->s_mds);
4038                         request_close_session(mdsc, s);
4039                         ceph_put_mds_session(s);
4040                         continue;
4041                 }
4042                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4043                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
4044                                 s->s_state = CEPH_MDS_SESSION_HUNG;
4045                                 pr_info("mds%d hung\n", s->s_mds);
4046                         }
4047                 }
4048                 if (s->s_state == CEPH_MDS_SESSION_NEW ||
4049                     s->s_state == CEPH_MDS_SESSION_RESTARTING ||
4050                     s->s_state == CEPH_MDS_SESSION_REJECTED) {
4051                         /* this mds failed or is recovering, just wait */
4052                         ceph_put_mds_session(s);
4053                         continue;
4054                 }
4055                 mutex_unlock(&mdsc->mutex);
4056
4057                 mutex_lock(&s->s_mutex);
4058                 if (renew_caps)
4059                         send_renew_caps(mdsc, s);
4060                 else
4061                         ceph_con_keepalive(&s->s_con);
4062                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4063                     s->s_state == CEPH_MDS_SESSION_HUNG)
4064                         ceph_send_cap_releases(mdsc, s);
4065                 mutex_unlock(&s->s_mutex);
4066                 ceph_put_mds_session(s);
4067
4068                 mutex_lock(&mdsc->mutex);
4069         }
4070         mutex_unlock(&mdsc->mutex);
4071
4072         ceph_check_delayed_caps(mdsc);
4073
4074         ceph_queue_cap_reclaim_work(mdsc);
4075
4076         ceph_trim_snapid_map(mdsc);
4077
4078         schedule_delayed(mdsc);
4079 }
4080
4081 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4083 {
4084         struct ceph_mds_client *mdsc;
4085
4086         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4087         if (!mdsc)
4088                 return -ENOMEM;
4089         mdsc->fsc = fsc;
4090         mutex_init(&mdsc->mutex);
4091         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4092         if (!mdsc->mdsmap) {
4093                 kfree(mdsc);
4094                 return -ENOMEM;
4095         }
4096
4097         fsc->mdsc = mdsc;
4098         init_completion(&mdsc->safe_umount_waiters);
4099         init_waitqueue_head(&mdsc->session_close_wq);
4100         INIT_LIST_HEAD(&mdsc->waiting_for_map);
4101         mdsc->sessions = NULL;
4102         atomic_set(&mdsc->num_sessions, 0);
4103         mdsc->max_sessions = 0;
4104         mdsc->stopping = 0;
4105         atomic64_set(&mdsc->quotarealms_count, 0);
4106         mdsc->quotarealms_inodes = RB_ROOT;
4107         mutex_init(&mdsc->quotarealms_inodes_mutex);
4108         mdsc->last_snap_seq = 0;
4109         init_rwsem(&mdsc->snap_rwsem);
4110         mdsc->snap_realms = RB_ROOT;
4111         INIT_LIST_HEAD(&mdsc->snap_empty);
4112         mdsc->num_snap_realms = 0;
4113         spin_lock_init(&mdsc->snap_empty_lock);
4114         mdsc->last_tid = 0;
4115         mdsc->oldest_tid = 0;
4116         mdsc->request_tree = RB_ROOT;
4117         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4118         mdsc->last_renew_caps = jiffies;
4119         INIT_LIST_HEAD(&mdsc->cap_delay_list);
4120         spin_lock_init(&mdsc->cap_delay_lock);
4121         INIT_LIST_HEAD(&mdsc->snap_flush_list);
4122         spin_lock_init(&mdsc->snap_flush_lock);
4123         mdsc->last_cap_flush_tid = 1;
4124         INIT_LIST_HEAD(&mdsc->cap_flush_list);
4125         INIT_LIST_HEAD(&mdsc->cap_dirty);
4126         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4127         mdsc->num_cap_flushing = 0;
4128         spin_lock_init(&mdsc->cap_dirty_lock);
4129         init_waitqueue_head(&mdsc->cap_flushing_wq);
4130         INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4131         atomic_set(&mdsc->cap_reclaim_pending, 0);
4132
4133         spin_lock_init(&mdsc->dentry_list_lock);
4134         INIT_LIST_HEAD(&mdsc->dentry_leases);
4135         INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4136
4137         ceph_caps_init(mdsc);
4138         ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4139
4140         spin_lock_init(&mdsc->snapid_map_lock);
4141         mdsc->snapid_map_tree = RB_ROOT;
4142         INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4143
4144         init_rwsem(&mdsc->pool_perm_rwsem);
4145         mdsc->pool_perm_tree = RB_ROOT;
4146
4147         strscpy(mdsc->nodename, utsname()->nodename,
4148                 sizeof(mdsc->nodename));
4149         return 0;
4150 }
4151
4152 /*
4153  * Wait for safe replies on open mds requests.  If we time out, drop
4154  * all requests from the tree to avoid dangling dentry refs.
4155  */
4156 static void wait_requests(struct ceph_mds_client *mdsc)
4157 {
4158         struct ceph_options *opts = mdsc->fsc->client->options;
4159         struct ceph_mds_request *req;
4160
4161         mutex_lock(&mdsc->mutex);
4162         if (__get_oldest_req(mdsc)) {
4163                 mutex_unlock(&mdsc->mutex);
4164
4165                 dout("wait_requests waiting for requests\n");
4166                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4167                                     ceph_timeout_jiffies(opts->mount_timeout));
4168
4169                 /* tear down remaining requests */
4170                 mutex_lock(&mdsc->mutex);
4171                 while ((req = __get_oldest_req(mdsc))) {
4172                         dout("wait_requests timed out on tid %llu\n",
4173                              req->r_tid);
4174                         list_del_init(&req->r_wait);
4175                         __unregister_request(mdsc, req);
4176                 }
4177         }
4178         mutex_unlock(&mdsc->mutex);
4179         dout("wait_requests done\n");
4180 }
4181
4182 /*
4183  * called before mount is ro, and before dentries are torn down.
4184  * (hmm, does this still race with new lookups?)
4185  */
4186 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4187 {
4188         dout("pre_umount\n");
4189         mdsc->stopping = 1;
4190
4191         lock_unlock_sessions(mdsc);
4192         ceph_flush_dirty_caps(mdsc);
4193         wait_requests(mdsc);
4194
4195         /*
4196          * wait for reply handlers to drop their request refs and
4197          * their inode/dcache refs
4198          */
4199         ceph_msgr_flush();
4200
4201         ceph_cleanup_quotarealms_inodes(mdsc);
4202 }
4203
4204 /*
4205  * wait for all write mds requests to flush.
4206  */
4207 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4208 {
4209         struct ceph_mds_request *req = NULL, *nextreq;
4210         struct rb_node *n;
4211
4212         mutex_lock(&mdsc->mutex);
4213         dout("wait_unsafe_requests want %lld\n", want_tid);
4214 restart:
4215         req = __get_oldest_req(mdsc);
4216         while (req && req->r_tid <= want_tid) {
4217                 /* find next request */
4218                 n = rb_next(&req->r_node);
4219                 if (n)
4220                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4221                 else
4222                         nextreq = NULL;
4223                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4224                     (req->r_op & CEPH_MDS_OP_WRITE)) {
4225                         /* write op */
4226                         ceph_mdsc_get_request(req);
4227                         if (nextreq)
4228                                 ceph_mdsc_get_request(nextreq);
4229                         mutex_unlock(&mdsc->mutex);
4230                         dout("wait_unsafe_requests wait on %llu (want %llu)\n",
4231                              req->r_tid, want_tid);
4232                         wait_for_completion(&req->r_safe_completion);
4233                         mutex_lock(&mdsc->mutex);
4234                         ceph_mdsc_put_request(req);
4235                         if (!nextreq)
4236                                 break;  /* next dne before, so we're done! */
4237                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
4238                                 /* next request was removed from tree */
4239                                 ceph_mdsc_put_request(nextreq);
4240                                 goto restart;
4241                         }
4242                         ceph_mdsc_put_request(nextreq);  /* won't go away */
4243                 }
4244                 req = nextreq;
4245         }
4246         mutex_unlock(&mdsc->mutex);
4247         dout("wait_unsafe_requests done\n");
4248 }
4249
4250 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4251 {
4252         u64 want_tid, want_flush;
4253
4254         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4255                 return;
4256
4257         dout("sync\n");
4258         mutex_lock(&mdsc->mutex);
4259         want_tid = mdsc->last_tid;
4260         mutex_unlock(&mdsc->mutex);
4261
4262         ceph_flush_dirty_caps(mdsc);
4263         spin_lock(&mdsc->cap_dirty_lock);
4264         want_flush = mdsc->last_cap_flush_tid;
4265         if (!list_empty(&mdsc->cap_flush_list)) {
4266                 struct ceph_cap_flush *cf =
4267                         list_last_entry(&mdsc->cap_flush_list,
4268                                         struct ceph_cap_flush, g_list);
4269                 cf->wake = true;
4270         }
4271         spin_unlock(&mdsc->cap_dirty_lock);
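        /*
         * flag the last cap flush in flight so that its completion wakes
         * us; wait_caps_flush() below then waits for everything up to
         * want_flush to have been acked by the mds.
         */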
4272
4273         dout("sync want tid %llu flush_tid %llu\n",
4274              want_tid, want_flush);
4275
4276         wait_unsafe_requests(mdsc, want_tid);
4277         wait_caps_flush(mdsc, want_flush);
4278 }
4279
4280 /*
4281  * true if all sessions are closed (ignoring ones we skipped), or we force unmount
4282  */
4283 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4284 {
4285         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4286                 return true;
4287         return atomic_read(&mdsc->num_sessions) <= skipped;
4288 }
4289
4290 /*
4291  * called after the superblock is made read-only.
4292  */
4293 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4294 {
4295         struct ceph_options *opts = mdsc->fsc->client->options;
4296         struct ceph_mds_session *session;
4297         int i;
4298         int skipped = 0;
4299
4300         dout("close_sessions\n");
4301
4302         /* close sessions */
4303         mutex_lock(&mdsc->mutex);
4304         for (i = 0; i < mdsc->max_sessions; i++) {
4305                 session = __ceph_lookup_mds_session(mdsc, i);
4306                 if (!session)
4307                         continue;
4308                 mutex_unlock(&mdsc->mutex);
4309                 mutex_lock(&session->s_mutex);
4310                 if (__close_session(mdsc, session) <= 0)
4311                         skipped++;
4312                 mutex_unlock(&session->s_mutex);
4313                 ceph_put_mds_session(session);
4314                 mutex_lock(&mdsc->mutex);
4315         }
4316         mutex_unlock(&mdsc->mutex);
4317
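        /*
         * __close_session() returns <= 0 for a session that was already
         * being closed (or for which no close message could be sent);
         * count those as "skipped" so done_closing_sessions() can still
         * succeed when only such sessions remain.
         */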
4318         dout("waiting for sessions to close\n");
4319         wait_event_timeout(mdsc->session_close_wq,
4320                            done_closing_sessions(mdsc, skipped),
4321                            ceph_timeout_jiffies(opts->mount_timeout));
4322
4323         /* tear down remaining sessions */
4324         mutex_lock(&mdsc->mutex);
4325         for (i = 0; i < mdsc->max_sessions; i++) {
4326                 if (mdsc->sessions[i]) {
4327                         session = get_session(mdsc->sessions[i]);
4328                         __unregister_session(mdsc, session);
4329                         mutex_unlock(&mdsc->mutex);
4330                         mutex_lock(&session->s_mutex);
4331                         remove_session_caps(session);
4332                         mutex_unlock(&session->s_mutex);
4333                         ceph_put_mds_session(session);
4334                         mutex_lock(&mdsc->mutex);
4335                 }
4336         }
4337         WARN_ON(!list_empty(&mdsc->cap_delay_list));
4338         mutex_unlock(&mdsc->mutex);
4339
4340         ceph_cleanup_snapid_map(mdsc);
4341         ceph_cleanup_empty_realms(mdsc);
4342
4343         cancel_work_sync(&mdsc->cap_reclaim_work);
4344         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4345
4346         dout("stopped\n");
4347 }
4348
4349 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4350 {
4351         struct ceph_mds_session *session;
4352         int mds;
4353
4354         dout("force umount\n");
4355
4356         mutex_lock(&mdsc->mutex);
4357         for (mds = 0; mds < mdsc->max_sessions; mds++) {
4358                 session = __ceph_lookup_mds_session(mdsc, mds);
4359                 if (!session)
4360                         continue;
4361                 mutex_unlock(&mdsc->mutex);
4362                 mutex_lock(&session->s_mutex);
4363                 __close_session(mdsc, session);
4364                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4365                         cleanup_session_requests(mdsc, session);
4366                         remove_session_caps(session);
4367                 }
4368                 mutex_unlock(&session->s_mutex);
4369                 ceph_put_mds_session(session);
4370                 mutex_lock(&mdsc->mutex);
4371                 kick_requests(mdsc, mds);
4372         }
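        /*
         * kick_requests() flags requests bound for each torn-down mds so
         * they are re-processed; with the mount being forced down they
         * should be aborted rather than resent.  wake mdsmap waiters so
         * they can notice and bail out too.
         */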
4373         __wake_requests(mdsc, &mdsc->waiting_for_map);
4374         mutex_unlock(&mdsc->mutex);
4375 }
4376
4377 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4378 {
4379         dout("stop\n");
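        /*
         * cancel the timer first: the delayed work walks the session
         * array and mdsmap, both of which are freed below.
         */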
4380         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4381         if (mdsc->mdsmap)
4382                 ceph_mdsmap_destroy(mdsc->mdsmap);
4383         kfree(mdsc->sessions);
4384         ceph_caps_finalize(mdsc);
4385         ceph_pool_perm_destroy(mdsc);
4386 }
4387
4388 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4389 {
4390         struct ceph_mds_client *mdsc = fsc->mdsc;
4391         dout("mdsc_destroy %p\n", mdsc);
4392
4393         if (!mdsc)
4394                 return;
4395
4396         /* flush out any connection work with references to us */
4397         ceph_msgr_flush();
4398
4399         ceph_mdsc_stop(mdsc);
4400
4401         fsc->mdsc = NULL;
4402         dout("mdsc_destroy %p done\n", mdsc);
4403         kfree(mdsc);
4404 }
4405
4406 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4407 {
4408         struct ceph_fs_client *fsc = mdsc->fsc;
4409         const char *mds_namespace = fsc->mount_options->mds_namespace;
4410         void *p = msg->front.iov_base;
4411         void *end = p + msg->front.iov_len;
4412         u32 epoch;
4413         u32 map_len;
4414         u32 num_fs;
4415         u32 mount_fscid = (u32)-1;
4416         u8 struct_v, struct_cv;
4417         int err = -EINVAL;
4418
4419         ceph_decode_need(&p, end, sizeof(u32), bad);
4420         epoch = ceph_decode_32(&p);
4421
4422         dout("handle_fsmap epoch %u\n", epoch);
4423
4424         ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4425         struct_v = ceph_decode_8(&p);
4426         struct_cv = ceph_decode_8(&p);
4427         map_len = ceph_decode_32(&p);
4428
4429         ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4430         p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4431
4432         num_fs = ceph_decode_32(&p);
4433         while (num_fs-- > 0) {
4434                 void *info_p, *info_end;
4435                 u32 info_len;
4436                 u8 info_v, info_cv;
4437                 u32 fscid, namelen;
4438
4439                 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4440                 info_v = ceph_decode_8(&p);
4441                 info_cv = ceph_decode_8(&p);
4442                 info_len = ceph_decode_32(&p);
4443                 ceph_decode_need(&p, end, info_len, bad);
4444                 info_p = p;
4445                 info_end = p + info_len;
4446                 p = info_end;
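                /*
                 * each filesystem entry is length-prefixed; step the outer
                 * cursor past the whole blob now, then decode the fields
                 * we need from the bounded [info_p, info_end) window so
                 * unknown trailing fields are skipped safely.
                 */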
4447
4448                 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4449                 fscid = ceph_decode_32(&info_p);
4450                 namelen = ceph_decode_32(&info_p);
4451                 ceph_decode_need(&info_p, info_end, namelen, bad);
4452
4453                 if (mds_namespace &&
4454                     strlen(mds_namespace) == namelen &&
4455                     !strncmp(mds_namespace, (char *)info_p, namelen)) {
4456                         mount_fscid = fscid;
4457                         break;
4458                 }
4459         }
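        /*
         * the fsmap is consulted only to translate the mds_namespace
         * mount option into an fscid; everything else comes from the
         * mdsmap subscription requested below.
         */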
4460
4461         ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4462         if (mount_fscid != (u32)-1) {
4463                 fsc->client->monc.fs_cluster_id = mount_fscid;
4464                 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4465                                    0, true);
4466                 ceph_monc_renew_subs(&fsc->client->monc);
4467         } else {
4468                 err = -ENOENT;
4469                 goto err_out;
4470         }
4471         return;
4472
4473 bad:
4474         pr_err("error decoding fsmap\n");
4475 err_out:
4476         mutex_lock(&mdsc->mutex);
4477         mdsc->mdsmap_err = err;
4478         __wake_requests(mdsc, &mdsc->waiting_for_map);
4479         mutex_unlock(&mdsc->mutex);
4480 }
4481
4482 /*
4483  * handle mds map update.
4484  */
4485 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4486 {
4487         u32 epoch;
4488         u32 maplen;
4489         void *p = msg->front.iov_base;
4490         void *end = p + msg->front.iov_len;
4491         struct ceph_mdsmap *newmap, *oldmap;
4492         struct ceph_fsid fsid;
4493         int err = -EINVAL;
4494
4495         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4496         ceph_decode_copy(&p, &fsid, sizeof(fsid));
4497         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
4498                 return;
4499         epoch = ceph_decode_32(&p);
4500         maplen = ceph_decode_32(&p);
4501         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
4502
4503         /* do we need it? */
4504         mutex_lock(&mdsc->mutex);
4505         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
4506                 dout("handle_map epoch %u <= our %u\n",
4507                      epoch, mdsc->mdsmap->m_epoch);
4508                 mutex_unlock(&mdsc->mutex);
4509                 return;
4510         }
4511
4512         newmap = ceph_mdsmap_decode(&p, end);
4513         if (IS_ERR(newmap)) {
4514                 err = PTR_ERR(newmap);
4515                 goto bad_unlock;
4516         }
4517
4518         /* swap into place */
4519         if (mdsc->mdsmap) {
4520                 oldmap = mdsc->mdsmap;
4521                 mdsc->mdsmap = newmap;
4522                 check_new_map(mdsc, newmap, oldmap);
4523                 ceph_mdsmap_destroy(oldmap);
4524         } else {
4525                 mdsc->mdsmap = newmap;  /* first mds map */
4526         }
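        /*
         * when an old map is replaced, check_new_map() (above) compares
         * per-mds states and reacts: kicking requests at restarted ranks
         * and cleaning up sessions to ranks that went away.
         */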
4527         mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
4528                                         MAX_LFS_FILESIZE);
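        /* cap at what loff_t and the page cache can address on this arch */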
4529
4530         __wake_requests(mdsc, &mdsc->waiting_for_map);
4531         ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4532                           mdsc->mdsmap->m_epoch);
4533
4534         mutex_unlock(&mdsc->mutex);
4535         schedule_delayed(mdsc);
4536         return;
4537
4538 bad_unlock:
4539         mutex_unlock(&mdsc->mutex);
4540 bad:
4541         pr_err("error decoding mdsmap %d\n", err);
4542         return;
4543 }
4544
4545 static struct ceph_connection *con_get(struct ceph_connection *con)
4546 {
4547         struct ceph_mds_session *s = con->private;
4548
4549         if (get_session(s)) {
4550                 dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
4551                 return con;
4552         }
4553         dout("mdsc con_get %p FAIL\n", s);
4554         return NULL;
4555 }
4556
4557 static void con_put(struct ceph_connection *con)
4558 {
4559         struct ceph_mds_session *s = con->private;
4560
4561         dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
4562         ceph_put_mds_session(s);
4563 }
4564
4565 /*
4566  * if the client is unresponsive for long enough, the mds will kill the
4567  * session entirely; on a peer reset, try to reconnect and recover it.
4568  */
4569 static void peer_reset(struct ceph_connection *con)
4570 {
4571         struct ceph_mds_session *s = con->private;
4572         struct ceph_mds_client *mdsc = s->s_mdsc;
4573
4574         pr_warn("mds%d closed our session\n", s->s_mds);
4575         send_mds_reconnect(mdsc, s);
4576 }
4577
4578 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4579 {
4580         struct ceph_mds_session *s = con->private;
4581         struct ceph_mds_client *mdsc = s->s_mdsc;
4582         int type = le16_to_cpu(msg->hdr.type);
4583
4584         mutex_lock(&mdsc->mutex);
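        /* ignore messages for a session that is no longer registered */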
4585         if (__verify_registered_session(mdsc, s) < 0) {
4586                 mutex_unlock(&mdsc->mutex);
4587                 goto out;
4588         }
4589         mutex_unlock(&mdsc->mutex);
4590
4591         switch (type) {
4592         case CEPH_MSG_MDS_MAP:
4593                 ceph_mdsc_handle_mdsmap(mdsc, msg);
4594                 break;
4595         case CEPH_MSG_FS_MAP_USER:
4596                 ceph_mdsc_handle_fsmap(mdsc, msg);
4597                 break;
4598         case CEPH_MSG_CLIENT_SESSION:
4599                 handle_session(s, msg);
4600                 break;
4601         case CEPH_MSG_CLIENT_REPLY:
4602                 handle_reply(s, msg);
4603                 break;
4604         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4605                 handle_forward(mdsc, s, msg);
4606                 break;
4607         case CEPH_MSG_CLIENT_CAPS:
4608                 ceph_handle_caps(s, msg);
4609                 break;
4610         case CEPH_MSG_CLIENT_SNAP:
4611                 ceph_handle_snap(mdsc, s, msg);
4612                 break;
4613         case CEPH_MSG_CLIENT_LEASE:
4614                 handle_lease(mdsc, s, msg);
4615                 break;
4616         case CEPH_MSG_CLIENT_QUOTA:
4617                 ceph_handle_quota(mdsc, s, msg);
4618                 break;
4619
4620         default:
4621                 pr_err("received unknown message type %d %s\n", type,
4622                        ceph_msg_type_name(type));
4623         }
4624 out:
4625         ceph_msg_put(msg);
4626 }
4627
4628 /*
4629  * authentication
4630  */
4631
4632 /*
4633  * Note: returned pointer is the address of a structure that's
4634  * managed separately.  Caller must *not* attempt to free it.
4635  */
4636 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4637                                         int *proto, int force_new)
4638 {
4639         struct ceph_mds_session *s = con->private;
4640         struct ceph_mds_client *mdsc = s->s_mdsc;
4641         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4642         struct ceph_auth_handshake *auth = &s->s_auth;
4643
4644         if (force_new && auth->authorizer) {
4645                 ceph_auth_destroy_authorizer(auth->authorizer);
4646                 auth->authorizer = NULL;
4647         }
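        /*
         * build an authorizer if we have none (or just threw the old one
         * away); otherwise refresh the existing one so its tickets stay
         * current.
         */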
4648         if (!auth->authorizer) {
4649                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4650                                                       auth);
4651                 if (ret)
4652                         return ERR_PTR(ret);
4653         } else {
4654                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4655                                                       auth);
4656                 if (ret)
4657                         return ERR_PTR(ret);
4658         }
4659         *proto = ac->protocol;
4660
4661         return auth;
4662 }
4663
4664 static int add_authorizer_challenge(struct ceph_connection *con,
4665                                     void *challenge_buf, int challenge_buf_len)
4666 {
4667         struct ceph_mds_session *s = con->private;
4668         struct ceph_mds_client *mdsc = s->s_mdsc;
4669         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4670
4671         return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
4672                                             challenge_buf, challenge_buf_len);
4673 }
4674
4675 static int verify_authorizer_reply(struct ceph_connection *con)
4676 {
4677         struct ceph_mds_session *s = con->private;
4678         struct ceph_mds_client *mdsc = s->s_mdsc;
4679         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4680
4681         return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4682 }
4683
4684 static int invalidate_authorizer(struct ceph_connection *con)
4685 {
4686         struct ceph_mds_session *s = con->private;
4687         struct ceph_mds_client *mdsc = s->s_mdsc;
4688         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4689
4690         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4691
4692         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4693 }
4694
4695 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4696                                 struct ceph_msg_header *hdr, int *skip)
4697 {
4698         struct ceph_msg *msg;
4699         int type = (int) le16_to_cpu(hdr->type);
4700         int front_len = (int) le32_to_cpu(hdr->front_len);
4701
4702         if (con->in_msg)
4703                 return con->in_msg;
4704
4705         *skip = 0;
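        /* a nonzero *skip would tell the messenger to drop this message */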
4706         msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4707         if (!msg) {
4708                 pr_err("unable to allocate msg type %d len %d\n",
4709                        type, front_len);
4710                 return NULL;
4711         }
4712
4713         return msg;
4714 }
4715
4716 static int mds_sign_message(struct ceph_msg *msg)
4717 {
4718         struct ceph_mds_session *s = msg->con->private;
4719         struct ceph_auth_handshake *auth = &s->s_auth;
4720
4721         return ceph_auth_sign_message(auth, msg);
4722 }
4723
4724 static int mds_check_message_signature(struct ceph_msg *msg)
4725 {
4726         struct ceph_mds_session *s = msg->con->private;
4727         struct ceph_auth_handshake *auth = &s->s_auth;
4728
4729         return ceph_auth_check_message_signature(auth, msg);
4730 }
4731
4732 static const struct ceph_connection_operations mds_con_ops = {
4733         .get = con_get,
4734         .put = con_put,
4735         .dispatch = dispatch,
4736         .get_authorizer = get_authorizer,
4737         .add_authorizer_challenge = add_authorizer_challenge,
4738         .verify_authorizer_reply = verify_authorizer_reply,
4739         .invalidate_authorizer = invalidate_authorizer,
4740         .peer_reset = peer_reset,
4741         .alloc_msg = mds_alloc_msg,
4742         .sign_message = mds_sign_message,
4743         .check_message_signature = mds_check_message_signature,
4744 };
4745
4746 /* eof */