Linux-libre 4.14.69-gnu
[librecmc/linux-libre.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <obd_class.h>
40 #include <lprocfs_status.h>
41 #include <lustre_kernelcomm.h>
42
43 spinlock_t obd_types_lock;
44
45 static struct kmem_cache *obd_device_cachep;
46 struct kmem_cache *obdo_cachep;
47 EXPORT_SYMBOL(obdo_cachep);
48 static struct kmem_cache *import_cachep;
49
50 static struct list_head      obd_zombie_imports;
51 static struct list_head      obd_zombie_exports;
52 static spinlock_t  obd_zombie_impexp_lock;
53 static void obd_zombie_impexp_notify(void);
54 static void obd_zombie_export_add(struct obd_export *exp);
55 static void obd_zombie_import_add(struct obd_import *imp);
56
57 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
58 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
59
60 /*
61  * support functions: we could use inter-module communication, but this
62  * is more portable to other OS's
63  */
64 static struct obd_device *obd_device_alloc(void)
65 {
66         struct obd_device *obd;
67
68         obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
69         if (obd)
70                 obd->obd_magic = OBD_DEVICE_MAGIC;
71         return obd;
72 }
73
74 static void obd_device_free(struct obd_device *obd)
75 {
76         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
77                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
78         if (obd->obd_namespace) {
79                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
80                        obd, obd->obd_namespace, obd->obd_force);
81                 LBUG();
82         }
83         lu_ref_fini(&obd->obd_reference);
84         kmem_cache_free(obd_device_cachep, obd);
85 }
86
87 static struct obd_type *class_search_type(const char *name)
88 {
89         struct list_head *tmp;
90         struct obd_type *type;
91
92         spin_lock(&obd_types_lock);
93         list_for_each(tmp, &obd_types) {
94                 type = list_entry(tmp, struct obd_type, typ_chain);
95                 if (strcmp(type->typ_name, name) == 0) {
96                         spin_unlock(&obd_types_lock);
97                         return type;
98                 }
99         }
100         spin_unlock(&obd_types_lock);
101         return NULL;
102 }
103
104 static struct obd_type *class_get_type(const char *name)
105 {
106         struct obd_type *type = class_search_type(name);
107
108         if (!type) {
109                 const char *modname = name;
110
111                 if (!request_module("%s", modname)) {
112                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
113                         type = class_search_type(name);
114                 } else {
115                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
116                                            modname);
117                 }
118         }
119         if (type) {
120                 spin_lock(&type->obd_type_lock);
121                 type->typ_refcnt++;
122                 try_module_get(type->typ_dt_ops->owner);
123                 spin_unlock(&type->obd_type_lock);
124         }
125         return type;
126 }
127
128 void class_put_type(struct obd_type *type)
129 {
130         LASSERT(type);
131         spin_lock(&type->obd_type_lock);
132         type->typ_refcnt--;
133         module_put(type->typ_dt_ops->owner);
134         spin_unlock(&type->obd_type_lock);
135 }
136
137 #define CLASS_MAX_NAME 1024
138
139 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
140                         const char *name,
141                         struct lu_device_type *ldt)
142 {
143         struct obd_type *type;
144         int rc;
145
146         /* sanity check */
147         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
148
149         if (class_search_type(name)) {
150                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
151                 return -EEXIST;
152         }
153
154         rc = -ENOMEM;
155         type = kzalloc(sizeof(*type), GFP_NOFS);
156         if (!type)
157                 return rc;
158
159         type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
160         type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
161         type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
162
163         if (!type->typ_dt_ops ||
164             !type->typ_md_ops ||
165             !type->typ_name)
166                 goto failed;
167
168         *type->typ_dt_ops = *dt_ops;
169         /* md_ops is optional */
170         if (md_ops)
171                 *type->typ_md_ops = *md_ops;
172         strcpy(type->typ_name, name);
173         spin_lock_init(&type->obd_type_lock);
174
175         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
176                                                     debugfs_lustre_root,
177                                                     NULL, type);
178         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
179                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
180                                              : -ENOMEM;
181                 type->typ_debugfs_entry = NULL;
182                 goto failed;
183         }
184
185         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
186         if (!type->typ_kobj) {
187                 rc = -ENOMEM;
188                 goto failed;
189         }
190
191         if (ldt) {
192                 type->typ_lu = ldt;
193                 rc = lu_device_type_init(ldt);
194                 if (rc != 0)
195                         goto failed;
196         }
197
198         spin_lock(&obd_types_lock);
199         list_add(&type->typ_chain, &obd_types);
200         spin_unlock(&obd_types_lock);
201
202         return 0;
203
204  failed:
205         if (type->typ_kobj)
206                 kobject_put(type->typ_kobj);
207         kfree(type->typ_name);
208         kfree(type->typ_md_ops);
209         kfree(type->typ_dt_ops);
210         kfree(type);
211         return rc;
212 }
213 EXPORT_SYMBOL(class_register_type);
214
215 int class_unregister_type(const char *name)
216 {
217         struct obd_type *type = class_search_type(name);
218
219         if (!type) {
220                 CERROR("unknown obd type\n");
221                 return -EINVAL;
222         }
223
224         if (type->typ_refcnt) {
225                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
226                 /* This is a bad situation, let's make the best of it */
227                 /* Remove ops, but leave the name for debugging */
228                 kfree(type->typ_dt_ops);
229                 kfree(type->typ_md_ops);
230                 return -EBUSY;
231         }
232
233         if (type->typ_kobj)
234                 kobject_put(type->typ_kobj);
235
236         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
237                 ldebugfs_remove(&type->typ_debugfs_entry);
238
239         if (type->typ_lu)
240                 lu_device_type_fini(type->typ_lu);
241
242         spin_lock(&obd_types_lock);
243         list_del(&type->typ_chain);
244         spin_unlock(&obd_types_lock);
245         kfree(type->typ_name);
246         kfree(type->typ_dt_ops);
247         kfree(type->typ_md_ops);
248         kfree(type);
249         return 0;
250 } /* class_unregister_type */
251 EXPORT_SYMBOL(class_unregister_type);
252
253 /**
254  * Create a new obd device.
255  *
256  * Find an empty slot in ::obd_devs[], create a new obd device in it.
257  *
258  * \param[in] type_name obd device type string.
259  * \param[in] name      obd device name.
260  *
261  * \retval NULL if create fails, otherwise return the obd device
262  *       pointer created.
263  */
264 struct obd_device *class_newdev(const char *type_name, const char *name)
265 {
266         struct obd_device *result = NULL;
267         struct obd_device *newdev;
268         struct obd_type *type = NULL;
269         int i;
270         int new_obd_minor = 0;
271
272         if (strlen(name) >= MAX_OBD_NAME) {
273                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
274                 return ERR_PTR(-EINVAL);
275         }
276
277         type = class_get_type(type_name);
278         if (!type) {
279                 CERROR("OBD: unknown type: %s\n", type_name);
280                 return ERR_PTR(-ENODEV);
281         }
282
283         newdev = obd_device_alloc();
284         if (!newdev) {
285                 result = ERR_PTR(-ENOMEM);
286                 goto out_type;
287         }
288
289         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
290
291         write_lock(&obd_dev_lock);
292         for (i = 0; i < class_devno_max(); i++) {
293                 struct obd_device *obd = class_num2obd(i);
294
295                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
296                         CERROR("Device %s already exists at %d, won't add\n",
297                                name, i);
298                         if (result) {
299                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
300                                          "%p obd_magic %08x != %08x\n", result,
301                                          result->obd_magic, OBD_DEVICE_MAGIC);
302                                 LASSERTF(result->obd_minor == new_obd_minor,
303                                          "%p obd_minor %d != %d\n", result,
304                                          result->obd_minor, new_obd_minor);
305
306                                 obd_devs[result->obd_minor] = NULL;
307                                 result->obd_name[0] = '\0';
308                          }
309                         result = ERR_PTR(-EEXIST);
310                         break;
311                 }
312                 if (!result && !obd) {
313                         result = newdev;
314                         result->obd_minor = i;
315                         new_obd_minor = i;
316                         result->obd_type = type;
317                         strncpy(result->obd_name, name,
318                                 sizeof(result->obd_name) - 1);
319                         obd_devs[i] = result;
320                 }
321         }
322         write_unlock(&obd_dev_lock);
323
324         if (!result && i >= class_devno_max()) {
325                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
326                        class_devno_max());
327                 result = ERR_PTR(-EOVERFLOW);
328                 goto out;
329         }
330
331         if (IS_ERR(result))
332                 goto out;
333
334         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
335                result->obd_name, result);
336
337         return result;
338 out:
339         obd_device_free(newdev);
340 out_type:
341         class_put_type(type);
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type);
354
355         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
356                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
357
358         write_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         write_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         read_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376
377                 if (obd && strcmp(name, obd->obd_name) == 0) {
378                         /* Make sure we finished attaching before we give
379                          * out any references
380                          */
381                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382                         if (obd->obd_attached) {
383                                 read_unlock(&obd_dev_lock);
384                                 return i;
385                         }
386                         break;
387                 }
388         }
389         read_unlock(&obd_dev_lock);
390
391         return -1;
392 }
393
394 struct obd_device *class_name2obd(const char *name)
395 {
396         int dev = class_name2dev(name);
397
398         if (dev < 0 || dev > class_devno_max())
399                 return NULL;
400         return class_num2obd(dev);
401 }
402 EXPORT_SYMBOL(class_name2obd);
403
404 int class_uuid2dev(struct obd_uuid *uuid)
405 {
406         int i;
407
408         read_lock(&obd_dev_lock);
409         for (i = 0; i < class_devno_max(); i++) {
410                 struct obd_device *obd = class_num2obd(i);
411
412                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
413                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414                         read_unlock(&obd_dev_lock);
415                         return i;
416                 }
417         }
418         read_unlock(&obd_dev_lock);
419
420         return -1;
421 }
422
423 /**
424  * Get obd device from ::obd_devs[]
425  *
426  * \param num [in] array index
427  *
428  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
429  *       otherwise return the obd device there.
430  */
431 struct obd_device *class_num2obd(int num)
432 {
433         struct obd_device *obd = NULL;
434
435         if (num < class_devno_max()) {
436                 obd = obd_devs[num];
437                 if (!obd)
438                         return NULL;
439
440                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
441                          "%p obd_magic %08x != %08x\n",
442                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
443                 LASSERTF(obd->obd_minor == num,
444                          "%p obd_minor %0d != %0d\n",
445                          obd, obd->obd_minor, num);
446         }
447
448         return obd;
449 }
450
451 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
452  * specified, then only the client with that uuid is returned,
453  * otherwise any client connected to the tgt is returned.
454  */
455 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
456                                          const char *typ_name,
457                                          struct obd_uuid *grp_uuid)
458 {
459         int i;
460
461         read_lock(&obd_dev_lock);
462         for (i = 0; i < class_devno_max(); i++) {
463                 struct obd_device *obd = class_num2obd(i);
464
465                 if (!obd)
466                         continue;
467                 if ((strncmp(obd->obd_type->typ_name, typ_name,
468                              strlen(typ_name)) == 0)) {
469                         if (obd_uuid_equals(tgt_uuid,
470                                             &obd->u.cli.cl_target_uuid) &&
471                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
472                                                          &obd->obd_uuid) : 1)) {
473                                 read_unlock(&obd_dev_lock);
474                                 return obd;
475                         }
476                 }
477         }
478         read_unlock(&obd_dev_lock);
479
480         return NULL;
481 }
482 EXPORT_SYMBOL(class_find_client_obd);
483
484 /* Iterate the obd_device list looking devices have grp_uuid. Start
485  * searching at *next, and if a device is found, the next index to look
486  * at is saved in *next. If next is NULL, then the first matching device
487  * will always be returned.
488  */
489 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
490 {
491         int i;
492
493         if (!next)
494                 i = 0;
495         else if (*next >= 0 && *next < class_devno_max())
496                 i = *next;
497         else
498                 return NULL;
499
500         read_lock(&obd_dev_lock);
501         for (; i < class_devno_max(); i++) {
502                 struct obd_device *obd = class_num2obd(i);
503
504                 if (!obd)
505                         continue;
506                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
507                         if (next)
508                                 *next = i + 1;
509                         read_unlock(&obd_dev_lock);
510                         return obd;
511                 }
512         }
513         read_unlock(&obd_dev_lock);
514
515         return NULL;
516 }
517 EXPORT_SYMBOL(class_devices_in_group);
518
519 /**
520  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
521  * adjust sptlrpc settings accordingly.
522  */
523 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
524 {
525         struct obd_device  *obd;
526         const char       *type;
527         int              i, rc = 0, rc2;
528
529         LASSERT(namelen > 0);
530
531         read_lock(&obd_dev_lock);
532         for (i = 0; i < class_devno_max(); i++) {
533                 obd = class_num2obd(i);
534
535                 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
536                         continue;
537
538                 /* only notify mdc, osc, mdt, ost */
539                 type = obd->obd_type->typ_name;
540                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
541                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
542                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
543                     strcmp(type, LUSTRE_OST_NAME) != 0)
544                         continue;
545
546                 if (strncmp(obd->obd_name, fsname, namelen))
547                         continue;
548
549                 class_incref(obd, __func__, obd);
550                 read_unlock(&obd_dev_lock);
551                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
552                                          sizeof(KEY_SPTLRPC_CONF),
553                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
554                 rc = rc ? rc : rc2;
555                 class_decref(obd, __func__, obd);
556                 read_lock(&obd_dev_lock);
557         }
558         read_unlock(&obd_dev_lock);
559         return rc;
560 }
561 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
562
563 void obd_cleanup_caches(void)
564 {
565         kmem_cache_destroy(obd_device_cachep);
566         obd_device_cachep = NULL;
567         kmem_cache_destroy(obdo_cachep);
568         obdo_cachep = NULL;
569         kmem_cache_destroy(import_cachep);
570         import_cachep = NULL;
571 }
572
573 int obd_init_caches(void)
574 {
575         LASSERT(!obd_device_cachep);
576         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
577                                               sizeof(struct obd_device),
578                                               0, 0, NULL);
579         if (!obd_device_cachep)
580                 goto out;
581
582         LASSERT(!obdo_cachep);
583         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
584                                         0, 0, NULL);
585         if (!obdo_cachep)
586                 goto out;
587
588         LASSERT(!import_cachep);
589         import_cachep = kmem_cache_create("ll_import_cache",
590                                           sizeof(struct obd_import),
591                                           0, 0, NULL);
592         if (!import_cachep)
593                 goto out;
594
595         return 0;
596  out:
597         obd_cleanup_caches();
598         return -ENOMEM;
599 }
600
601 /* map connection to client */
602 struct obd_export *class_conn2export(struct lustre_handle *conn)
603 {
604         struct obd_export *export;
605
606         if (!conn) {
607                 CDEBUG(D_CACHE, "looking for null handle\n");
608                 return NULL;
609         }
610
611         if (conn->cookie == -1) {  /* this means assign a new connection */
612                 CDEBUG(D_CACHE, "want a new connection\n");
613                 return NULL;
614         }
615
616         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
617         export = class_handle2object(conn->cookie, NULL);
618         return export;
619 }
620 EXPORT_SYMBOL(class_conn2export);
621
622 struct obd_device *class_exp2obd(struct obd_export *exp)
623 {
624         if (exp)
625                 return exp->exp_obd;
626         return NULL;
627 }
628 EXPORT_SYMBOL(class_exp2obd);
629
630 struct obd_import *class_exp2cliimp(struct obd_export *exp)
631 {
632         struct obd_device *obd = exp->exp_obd;
633
634         if (!obd)
635                 return NULL;
636         return obd->u.cli.cl_import;
637 }
638 EXPORT_SYMBOL(class_exp2cliimp);
639
640 /* Export management functions */
641 static void class_export_destroy(struct obd_export *exp)
642 {
643         struct obd_device *obd = exp->exp_obd;
644
645         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
646         LASSERT(obd);
647
648         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
649                exp->exp_client_uuid.uuid, obd->obd_name);
650
651         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
652         if (exp->exp_connection)
653                 ptlrpc_put_connection_superhack(exp->exp_connection);
654
655         LASSERT(list_empty(&exp->exp_outstanding_replies));
656         LASSERT(list_empty(&exp->exp_uncommitted_replies));
657         LASSERT(list_empty(&exp->exp_req_replay_queue));
658         LASSERT(list_empty(&exp->exp_hp_rpcs));
659         obd_destroy_export(exp);
660         class_decref(obd, "export", exp);
661
662         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
663 }
664
/* Handle callback: take an export reference for a void-typed object. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
669
670 static struct portals_handle_ops export_handle_ops = {
671         .hop_addref = export_handle_addref,
672         .hop_free   = NULL,
673 };
674
675 struct obd_export *class_export_get(struct obd_export *exp)
676 {
677         atomic_inc(&exp->exp_refcount);
678         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
679                atomic_read(&exp->exp_refcount));
680         return exp;
681 }
682 EXPORT_SYMBOL(class_export_get);
683
684 void class_export_put(struct obd_export *exp)
685 {
686         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
687         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
688                atomic_read(&exp->exp_refcount) - 1);
689
690         if (atomic_dec_and_test(&exp->exp_refcount)) {
691                 LASSERT(!list_empty(&exp->exp_obd_chain));
692                 CDEBUG(D_IOCTL, "final put %p/%s\n",
693                        exp, exp->exp_client_uuid.uuid);
694
695                 /* release nid stat refererence */
696                 lprocfs_exp_cleanup(exp);
697
698                 obd_zombie_export_add(exp);
699         }
700 }
701 EXPORT_SYMBOL(class_export_put);
702
703 /* Creates a new export, adds it to the hash table, and returns a
704  * pointer to it. The refcount is 2: one for the hash reference, and
705  * one for the pointer returned by this function.
706  */
707 struct obd_export *class_new_export(struct obd_device *obd,
708                                     struct obd_uuid *cluuid)
709 {
710         struct obd_export *export;
711         struct cfs_hash *hash = NULL;
712         int rc = 0;
713
714         export = kzalloc(sizeof(*export), GFP_NOFS);
715         if (!export)
716                 return ERR_PTR(-ENOMEM);
717
718         export->exp_conn_cnt = 0;
719         export->exp_lock_hash = NULL;
720         export->exp_flock_hash = NULL;
721         atomic_set(&export->exp_refcount, 2);
722         atomic_set(&export->exp_rpc_count, 0);
723         atomic_set(&export->exp_cb_count, 0);
724         atomic_set(&export->exp_locks_count, 0);
725 #if LUSTRE_TRACKS_LOCK_EXP_REFS
726         INIT_LIST_HEAD(&export->exp_locks_list);
727         spin_lock_init(&export->exp_locks_list_guard);
728 #endif
729         atomic_set(&export->exp_replay_count, 0);
730         export->exp_obd = obd;
731         INIT_LIST_HEAD(&export->exp_outstanding_replies);
732         spin_lock_init(&export->exp_uncommitted_replies_lock);
733         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
734         INIT_LIST_HEAD(&export->exp_req_replay_queue);
735         INIT_LIST_HEAD(&export->exp_handle.h_link);
736         INIT_LIST_HEAD(&export->exp_hp_rpcs);
737         class_handle_hash(&export->exp_handle, &export_handle_ops);
738         spin_lock_init(&export->exp_lock);
739         spin_lock_init(&export->exp_rpc_lock);
740         INIT_HLIST_NODE(&export->exp_uuid_hash);
741         spin_lock_init(&export->exp_bl_list_lock);
742         INIT_LIST_HEAD(&export->exp_bl_list);
743
744         export->exp_sp_peer = LUSTRE_SP_ANY;
745         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
746         export->exp_client_uuid = *cluuid;
747         obd_init_export(export);
748
749         spin_lock(&obd->obd_dev_lock);
750         /* shouldn't happen, but might race */
751         if (obd->obd_stopping) {
752                 rc = -ENODEV;
753                 goto exit_unlock;
754         }
755
756         hash = cfs_hash_getref(obd->obd_uuid_hash);
757         if (!hash) {
758                 rc = -ENODEV;
759                 goto exit_unlock;
760         }
761         spin_unlock(&obd->obd_dev_lock);
762
763         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
764                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
765                 if (rc != 0) {
766                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
767                                       obd->obd_name, cluuid->uuid, rc);
768                         rc = -EALREADY;
769                         goto exit_err;
770                 }
771         }
772
773         spin_lock(&obd->obd_dev_lock);
774         if (obd->obd_stopping) {
775                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
776                 rc = -ENODEV;
777                 goto exit_unlock;
778         }
779
780         class_incref(obd, "export", export);
781         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
782         export->exp_obd->obd_num_exports++;
783         spin_unlock(&obd->obd_dev_lock);
784         cfs_hash_putref(hash);
785         return export;
786
787 exit_unlock:
788         spin_unlock(&obd->obd_dev_lock);
789 exit_err:
790         if (hash)
791                 cfs_hash_putref(hash);
792         class_handle_unhash(&export->exp_handle);
793         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
794         obd_destroy_export(export);
795         kfree(export);
796         return ERR_PTR(rc);
797 }
798 EXPORT_SYMBOL(class_new_export);
799
800 void class_unlink_export(struct obd_export *exp)
801 {
802         class_handle_unhash(&exp->exp_handle);
803
804         spin_lock(&exp->exp_obd->obd_dev_lock);
805         /* delete an uuid-export hashitem from hashtables */
806         if (!hlist_unhashed(&exp->exp_uuid_hash))
807                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
808                              &exp->exp_client_uuid,
809                              &exp->exp_uuid_hash);
810
811         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
812         exp->exp_obd->obd_num_exports--;
813         spin_unlock(&exp->exp_obd->obd_dev_lock);
814         class_export_put(exp);
815 }
816
817 /* Import management functions */
818 static void class_import_destroy(struct obd_import *imp)
819 {
820         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
821                imp->imp_obd->obd_name);
822
823         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
824
825         ptlrpc_put_connection_superhack(imp->imp_connection);
826
827         while (!list_empty(&imp->imp_conn_list)) {
828                 struct obd_import_conn *imp_conn;
829
830                 imp_conn = list_entry(imp->imp_conn_list.next,
831                                       struct obd_import_conn, oic_item);
832                 list_del_init(&imp_conn->oic_item);
833                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
834                 kfree(imp_conn);
835         }
836
837         LASSERT(!imp->imp_sec);
838         class_decref(imp->imp_obd, "import", imp);
839         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
840 }
841
/* Handle callback: take an import reference for a void-typed object. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
846
847 static struct portals_handle_ops import_handle_ops = {
848         .hop_addref = import_handle_addref,
849         .hop_free   = NULL,
850 };
851
852 struct obd_import *class_import_get(struct obd_import *import)
853 {
854         atomic_inc(&import->imp_refcount);
855         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
856                atomic_read(&import->imp_refcount),
857                import->imp_obd->obd_name);
858         return import;
859 }
860 EXPORT_SYMBOL(class_import_get);
861
862 void class_import_put(struct obd_import *imp)
863 {
864         LASSERT(list_empty(&imp->imp_zombie_chain));
865         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
866
867         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
868                atomic_read(&imp->imp_refcount) - 1,
869                imp->imp_obd->obd_name);
870
871         if (atomic_dec_and_test(&imp->imp_refcount)) {
872                 CDEBUG(D_INFO, "final put import %p\n", imp);
873                 obd_zombie_import_add(imp);
874         }
875
876         /* catch possible import put race */
877         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
878 }
879 EXPORT_SYMBOL(class_import_put);
880
881 static void init_imp_at(struct imp_at *at)
882 {
883         int i;
884
885         at_init(&at->iat_net_latency, 0, 0);
886         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
887                 /* max service estimates are tracked on the server side, so
888                  * don't use the AT history here, just use the last reported
889                  * val. (But keep hist for proc histogram, worst_ever)
890                  */
891                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
892                         AT_FLG_NOHIST);
893         }
894 }
895
896 struct obd_import *class_new_import(struct obd_device *obd)
897 {
898         struct obd_import *imp;
899
900         imp = kzalloc(sizeof(*imp), GFP_NOFS);
901         if (!imp)
902                 return NULL;
903
904         INIT_LIST_HEAD(&imp->imp_pinger_chain);
905         INIT_LIST_HEAD(&imp->imp_zombie_chain);
906         INIT_LIST_HEAD(&imp->imp_replay_list);
907         INIT_LIST_HEAD(&imp->imp_sending_list);
908         INIT_LIST_HEAD(&imp->imp_delayed_list);
909         INIT_LIST_HEAD(&imp->imp_committed_list);
910         INIT_LIST_HEAD(&imp->imp_unreplied_list);
911         imp->imp_known_replied_xid = 0;
912         imp->imp_replay_cursor = &imp->imp_committed_list;
913         spin_lock_init(&imp->imp_lock);
914         imp->imp_last_success_conn = 0;
915         imp->imp_state = LUSTRE_IMP_NEW;
916         imp->imp_obd = class_incref(obd, "import", imp);
917         mutex_init(&imp->imp_sec_mutex);
918         init_waitqueue_head(&imp->imp_recovery_waitq);
919
920         atomic_set(&imp->imp_refcount, 2);
921         atomic_set(&imp->imp_unregistering, 0);
922         atomic_set(&imp->imp_inflight, 0);
923         atomic_set(&imp->imp_replay_inflight, 0);
924         atomic_set(&imp->imp_inval_count, 0);
925         INIT_LIST_HEAD(&imp->imp_conn_list);
926         INIT_LIST_HEAD(&imp->imp_handle.h_link);
927         class_handle_hash(&imp->imp_handle, &import_handle_ops);
928         init_imp_at(&imp->imp_at);
929
930         /* the default magic is V2, will be used in connect RPC, and
931          * then adjusted according to the flags in request/reply.
932          */
933         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
934
935         return imp;
936 }
937 EXPORT_SYMBOL(class_new_import);
938
939 void class_destroy_import(struct obd_import *import)
940 {
941         LASSERT(import);
942         LASSERT(import != LP_POISON);
943
944         class_handle_unhash(&import->imp_handle);
945
946         spin_lock(&import->imp_lock);
947         import->imp_generation++;
948         spin_unlock(&import->imp_lock);
949         class_import_put(import);
950 }
951 EXPORT_SYMBOL(class_destroy_import);
952
953 #if LUSTRE_TRACKS_LOCK_EXP_REFS
954
955 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
956 {
957         spin_lock(&exp->exp_locks_list_guard);
958
959         LASSERT(lock->l_exp_refs_nr >= 0);
960
961         if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
962                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
963                               exp, lock, lock->l_exp_refs_target);
964         }
965         if ((lock->l_exp_refs_nr++) == 0) {
966                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
967                 lock->l_exp_refs_target = exp;
968         }
969         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
970                lock, exp, lock->l_exp_refs_nr);
971         spin_unlock(&exp->exp_locks_list_guard);
972 }
973
974 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
975 {
976         spin_lock(&exp->exp_locks_list_guard);
977         LASSERT(lock->l_exp_refs_nr > 0);
978         if (lock->l_exp_refs_target != exp) {
979                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
980                               lock, lock->l_exp_refs_target, exp);
981         }
982         if (-- lock->l_exp_refs_nr == 0) {
983                 list_del_init(&lock->l_exp_refs_link);
984                 lock->l_exp_refs_target = NULL;
985         }
986         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
987                lock, exp, lock->l_exp_refs_nr);
988         spin_unlock(&exp->exp_locks_list_guard);
989 }
990 #endif
991
992 /* A connection defines an export context in which preallocation can
993  * be managed. This releases the export pointer reference, and returns
994  * the export handle, so the export refcount is 1 when this function
995  * returns.
996  */
997 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
998                   struct obd_uuid *cluuid)
999 {
1000         struct obd_export *export;
1001
1002         LASSERT(conn);
1003         LASSERT(obd);
1004         LASSERT(cluuid);
1005
1006         export = class_new_export(obd, cluuid);
1007         if (IS_ERR(export))
1008                 return PTR_ERR(export);
1009
1010         conn->cookie = export->exp_handle.h_cookie;
1011         class_export_put(export);
1012
1013         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1014                cluuid->uuid, conn->cookie);
1015         return 0;
1016 }
1017 EXPORT_SYMBOL(class_connect);
1018
1019 /* This function removes 1-3 references from the export:
1020  * 1 - for export pointer passed
1021  * and if disconnect really need
1022  * 2 - removing from hash
1023  * 3 - in client_unlink_export
1024  * The export pointer passed to this function can destroyed
1025  */
1026 int class_disconnect(struct obd_export *export)
1027 {
1028         int already_disconnected;
1029
1030         if (!export) {
1031                 CWARN("attempting to free NULL export %p\n", export);
1032                 return -EINVAL;
1033         }
1034
1035         spin_lock(&export->exp_lock);
1036         already_disconnected = export->exp_disconnected;
1037         export->exp_disconnected = 1;
1038         spin_unlock(&export->exp_lock);
1039
1040         /* class_cleanup(), abort_recovery(), and class_fail_export()
1041          * all end up in here, and if any of them race we shouldn't
1042          * call extra class_export_puts().
1043          */
1044         if (already_disconnected)
1045                 goto no_disconn;
1046
1047         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1048                export->exp_handle.h_cookie);
1049
1050         class_unlink_export(export);
1051 no_disconn:
1052         class_export_put(export);
1053         return 0;
1054 }
1055 EXPORT_SYMBOL(class_disconnect);
1056
1057 void class_fail_export(struct obd_export *exp)
1058 {
1059         int rc, already_failed;
1060
1061         spin_lock(&exp->exp_lock);
1062         already_failed = exp->exp_failed;
1063         exp->exp_failed = 1;
1064         spin_unlock(&exp->exp_lock);
1065
1066         if (already_failed) {
1067                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1068                        exp, exp->exp_client_uuid.uuid);
1069                 return;
1070         }
1071
1072         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1073                exp, exp->exp_client_uuid.uuid);
1074
1075         if (obd_dump_on_timeout)
1076                 libcfs_debug_dumplog();
1077
1078         /* need for safe call CDEBUG after obd_disconnect */
1079         class_export_get(exp);
1080
1081         /* Most callers into obd_disconnect are removing their own reference
1082          * (request, for example) in addition to the one from the hash table.
1083          * We don't have such a reference here, so make one.
1084          */
1085         class_export_get(exp);
1086         rc = obd_disconnect(exp);
1087         if (rc)
1088                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1089         else
1090                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1091                        exp, exp->exp_client_uuid.uuid);
1092         class_export_put(exp);
1093 }
1094 EXPORT_SYMBOL(class_fail_export);
1095
1096 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional export dump callback; NULL (static zero-initialisation) until set. */
void (*class_export_dump_hook)(struct obd_export *);
1098 #endif
1099
1100 /* Total amount of zombies to be destroyed */
1101 static int zombies_count;
1102
1103 /**
1104  * kill zombie imports and exports
1105  */
1106 static void obd_zombie_impexp_cull(void)
1107 {
1108         struct obd_import *import;
1109         struct obd_export *export;
1110
1111         do {
1112                 spin_lock(&obd_zombie_impexp_lock);
1113
1114                 import = NULL;
1115                 if (!list_empty(&obd_zombie_imports)) {
1116                         import = list_entry(obd_zombie_imports.next,
1117                                             struct obd_import,
1118                                             imp_zombie_chain);
1119                         list_del_init(&import->imp_zombie_chain);
1120                 }
1121
1122                 export = NULL;
1123                 if (!list_empty(&obd_zombie_exports)) {
1124                         export = list_entry(obd_zombie_exports.next,
1125                                             struct obd_export,
1126                                             exp_obd_chain);
1127                         list_del_init(&export->exp_obd_chain);
1128                 }
1129
1130                 spin_unlock(&obd_zombie_impexp_lock);
1131
1132                 if (import) {
1133                         class_import_destroy(import);
1134                         spin_lock(&obd_zombie_impexp_lock);
1135                         zombies_count--;
1136                         spin_unlock(&obd_zombie_impexp_lock);
1137                 }
1138
1139                 if (export) {
1140                         class_export_destroy(export);
1141                         spin_lock(&obd_zombie_impexp_lock);
1142                         zombies_count--;
1143                         spin_unlock(&obd_zombie_impexp_lock);
1144                 }
1145
1146                 cond_resched();
1147         } while (import || export);
1148 }
1149
1150 static struct completion        obd_zombie_start;
1151 static struct completion        obd_zombie_stop;
1152 static unsigned long            obd_zombie_flags;
1153 static wait_queue_head_t                obd_zombie_waitq;
1154 static pid_t                    obd_zombie_pid;
1155
1156 enum {
1157         OBD_ZOMBIE_STOP         = 0x0001,
1158 };
1159
1160 /**
1161  * check for work for kill zombie import/export thread.
1162  */
1163 static int obd_zombie_impexp_check(void *arg)
1164 {
1165         int rc;
1166
1167         spin_lock(&obd_zombie_impexp_lock);
1168         rc = (zombies_count == 0) &&
1169              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1170         spin_unlock(&obd_zombie_impexp_lock);
1171
1172         return rc;
1173 }
1174
1175 /**
1176  * Add export to the obd_zombie thread and notify it.
1177  */
1178 static void obd_zombie_export_add(struct obd_export *exp)
1179 {
1180         spin_lock(&exp->exp_obd->obd_dev_lock);
1181         LASSERT(!list_empty(&exp->exp_obd_chain));
1182         list_del_init(&exp->exp_obd_chain);
1183         spin_unlock(&exp->exp_obd->obd_dev_lock);
1184         spin_lock(&obd_zombie_impexp_lock);
1185         zombies_count++;
1186         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1187         spin_unlock(&obd_zombie_impexp_lock);
1188
1189         obd_zombie_impexp_notify();
1190 }
1191
1192 /**
1193  * Add import to the obd_zombie thread and notify it.
1194  */
1195 static void obd_zombie_import_add(struct obd_import *imp)
1196 {
1197         LASSERT(!imp->imp_sec);
1198         spin_lock(&obd_zombie_impexp_lock);
1199         LASSERT(list_empty(&imp->imp_zombie_chain));
1200         zombies_count++;
1201         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1202         spin_unlock(&obd_zombie_impexp_lock);
1203
1204         obd_zombie_impexp_notify();
1205 }
1206
1207 /**
1208  * notify import/export destroy thread about new zombie.
1209  */
1210 static void obd_zombie_impexp_notify(void)
1211 {
1212         /*
1213          * Make sure obd_zombie_impexp_thread get this notification.
1214          * It is possible this signal only get by obd_zombie_barrier, and
1215          * barrier gulps this notification and sleeps away and hangs ensues
1216          */
1217         wake_up_all(&obd_zombie_waitq);
1218 }
1219
1220 /**
1221  * check whether obd_zombie is idle
1222  */
1223 static int obd_zombie_is_idle(void)
1224 {
1225         int rc;
1226
1227         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1228         spin_lock(&obd_zombie_impexp_lock);
1229         rc = (zombies_count == 0);
1230         spin_unlock(&obd_zombie_impexp_lock);
1231         return rc;
1232 }
1233
1234 /**
1235  * wait when obd_zombie import/export queues become empty
1236  */
1237 void obd_zombie_barrier(void)
1238 {
1239         struct l_wait_info lwi = { 0 };
1240
1241         if (obd_zombie_pid == current_pid())
1242                 /* don't wait for myself */
1243                 return;
1244         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1245 }
1246 EXPORT_SYMBOL(obd_zombie_barrier);
1247
1248 /**
1249  * destroy zombie export/import thread.
1250  */
1251 static int obd_zombie_impexp_thread(void *unused)
1252 {
1253         unshare_fs_struct();
1254         complete(&obd_zombie_start);
1255
1256         obd_zombie_pid = current_pid();
1257
1258         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1259                 struct l_wait_info lwi = { 0 };
1260
1261                 l_wait_event(obd_zombie_waitq,
1262                              !obd_zombie_impexp_check(NULL), &lwi);
1263                 obd_zombie_impexp_cull();
1264
1265                 /*
1266                  * Notify obd_zombie_barrier callers that queues
1267                  * may be empty.
1268                  */
1269                 wake_up(&obd_zombie_waitq);
1270         }
1271
1272         complete(&obd_zombie_stop);
1273
1274         return 0;
1275 }
1276
1277 /**
1278  * start destroy zombie import/export thread
1279  */
1280 int obd_zombie_impexp_init(void)
1281 {
1282         struct task_struct *task;
1283
1284         INIT_LIST_HEAD(&obd_zombie_imports);
1285         INIT_LIST_HEAD(&obd_zombie_exports);
1286         spin_lock_init(&obd_zombie_impexp_lock);
1287         init_completion(&obd_zombie_start);
1288         init_completion(&obd_zombie_stop);
1289         init_waitqueue_head(&obd_zombie_waitq);
1290         obd_zombie_pid = 0;
1291
1292         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1293         if (IS_ERR(task))
1294                 return PTR_ERR(task);
1295
1296         wait_for_completion(&obd_zombie_start);
1297         return 0;
1298 }
1299
1300 /**
1301  * stop destroy zombie import/export thread
1302  */
1303 void obd_zombie_impexp_stop(void)
1304 {
1305         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1306         obd_zombie_impexp_notify();
1307         wait_for_completion(&obd_zombie_stop);
1308 }
1309
1310 struct obd_request_slot_waiter {
1311         struct list_head        orsw_entry;
1312         wait_queue_head_t       orsw_waitq;
1313         bool                    orsw_signaled;
1314 };
1315
1316 static bool obd_request_slot_avail(struct client_obd *cli,
1317                                    struct obd_request_slot_waiter *orsw)
1318 {
1319         bool avail;
1320
1321         spin_lock(&cli->cl_loi_list_lock);
1322         avail = !!list_empty(&orsw->orsw_entry);
1323         spin_unlock(&cli->cl_loi_list_lock);
1324
1325         return avail;
1326 };
1327
1328 /*
1329  * For network flow control, the RPC sponsor needs to acquire a credit
1330  * before sending the RPC. The credits count for a connection is defined
1331  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1332  * the subsequent RPC sponsors need to wait until others released their
1333  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1334  */
1335 int obd_get_request_slot(struct client_obd *cli)
1336 {
1337         struct obd_request_slot_waiter orsw;
1338         struct l_wait_info lwi;
1339         int rc;
1340
1341         spin_lock(&cli->cl_loi_list_lock);
1342         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1343                 cli->cl_r_in_flight++;
1344                 spin_unlock(&cli->cl_loi_list_lock);
1345                 return 0;
1346         }
1347
1348         init_waitqueue_head(&orsw.orsw_waitq);
1349         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1350         orsw.orsw_signaled = false;
1351         spin_unlock(&cli->cl_loi_list_lock);
1352
1353         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1354         rc = l_wait_event(orsw.orsw_waitq,
1355                           obd_request_slot_avail(cli, &orsw) ||
1356                           orsw.orsw_signaled,
1357                           &lwi);
1358
1359         /*
1360          * Here, we must take the lock to avoid the on-stack 'orsw' to be
1361          * freed but other (such as obd_put_request_slot) is using it.
1362          */
1363         spin_lock(&cli->cl_loi_list_lock);
1364         if (rc) {
1365                 if (!orsw.orsw_signaled) {
1366                         if (list_empty(&orsw.orsw_entry))
1367                                 cli->cl_r_in_flight--;
1368                         else
1369                                 list_del(&orsw.orsw_entry);
1370                 }
1371         }
1372
1373         if (orsw.orsw_signaled) {
1374                 LASSERT(list_empty(&orsw.orsw_entry));
1375
1376                 rc = -EINTR;
1377         }
1378         spin_unlock(&cli->cl_loi_list_lock);
1379
1380         return rc;
1381 }
1382 EXPORT_SYMBOL(obd_get_request_slot);
1383
1384 void obd_put_request_slot(struct client_obd *cli)
1385 {
1386         struct obd_request_slot_waiter *orsw;
1387
1388         spin_lock(&cli->cl_loi_list_lock);
1389         cli->cl_r_in_flight--;
1390
1391         /* If there is free slot, wakeup the first waiter. */
1392         if (!list_empty(&cli->cl_loi_read_list) &&
1393             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1394                 orsw = list_entry(cli->cl_loi_read_list.next,
1395                                   struct obd_request_slot_waiter, orsw_entry);
1396                 list_del_init(&orsw->orsw_entry);
1397                 cli->cl_r_in_flight++;
1398                 wake_up(&orsw->orsw_waitq);
1399         }
1400         spin_unlock(&cli->cl_loi_list_lock);
1401 }
1402 EXPORT_SYMBOL(obd_put_request_slot);
1403
1404 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1405 {
1406         return cli->cl_max_rpcs_in_flight;
1407 }
1408 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1409
1410 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1411 {
1412         struct obd_request_slot_waiter *orsw;
1413         const char *typ_name;
1414         __u32 old;
1415         int diff;
1416         int rc;
1417         int i;
1418
1419         if (max > OBD_MAX_RIF_MAX || max < 1)
1420                 return -ERANGE;
1421
1422         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1423         if (!strcmp(typ_name, LUSTRE_MDC_NAME)) {
1424                 /*
1425                  * adjust max_mod_rpcs_in_flight to ensure it is always
1426                  * strictly lower that max_rpcs_in_flight
1427                  */
1428                 if (max < 2) {
1429                         CERROR("%s: cannot set max_rpcs_in_flight to 1 because it must be higher than max_mod_rpcs_in_flight value\n",
1430                                cli->cl_import->imp_obd->obd_name);
1431                         return -ERANGE;
1432                 }
1433                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1434                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1435                         if (rc)
1436                                 return rc;
1437                 }
1438         }
1439
1440         spin_lock(&cli->cl_loi_list_lock);
1441         old = cli->cl_max_rpcs_in_flight;
1442         cli->cl_max_rpcs_in_flight = max;
1443         diff = max - old;
1444
1445         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1446         for (i = 0; i < diff; i++) {
1447                 if (list_empty(&cli->cl_loi_read_list))
1448                         break;
1449
1450                 orsw = list_entry(cli->cl_loi_read_list.next,
1451                                   struct obd_request_slot_waiter, orsw_entry);
1452                 list_del_init(&orsw->orsw_entry);
1453                 cli->cl_r_in_flight++;
1454                 wake_up(&orsw->orsw_waitq);
1455         }
1456         spin_unlock(&cli->cl_loi_list_lock);
1457
1458         return 0;
1459 }
1460 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
1461
1462 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
1463 {
1464         struct obd_connect_data *ocd;
1465         u16 maxmodrpcs;
1466         u16 prev;
1467
1468         if (max > OBD_MAX_RIF_MAX || max < 1)
1469                 return -ERANGE;
1470
1471         /* cannot exceed or equal max_rpcs_in_flight */
1472         if (max >= cli->cl_max_rpcs_in_flight) {
1473                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) higher or equal to max_rpcs_in_flight value (%u)\n",
1474                        cli->cl_import->imp_obd->obd_name,
1475                        max, cli->cl_max_rpcs_in_flight);
1476                 return -ERANGE;
1477         }
1478
1479         /* cannot exceed max modify RPCs in flight supported by the server */
1480         ocd = &cli->cl_import->imp_connect_data;
1481         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
1482                 maxmodrpcs = ocd->ocd_maxmodrpcs;
1483         else
1484                 maxmodrpcs = 1;
1485         if (max > maxmodrpcs) {
1486                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) higher than max_mod_rpcs_per_client value (%hu) returned by the server at connection\n",
1487                        cli->cl_import->imp_obd->obd_name,
1488                        max, maxmodrpcs);
1489                 return -ERANGE;
1490         }
1491
1492         spin_lock(&cli->cl_mod_rpcs_lock);
1493
1494         prev = cli->cl_max_mod_rpcs_in_flight;
1495         cli->cl_max_mod_rpcs_in_flight = max;
1496
1497         /* wakeup waiters if limit has been increased */
1498         if (cli->cl_max_mod_rpcs_in_flight > prev)
1499                 wake_up(&cli->cl_mod_rpcs_waitq);
1500
1501         spin_unlock(&cli->cl_mod_rpcs_lock);
1502
1503         return 0;
1504 }
1505 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
1506
1507 #define pct(a, b) (b ? (a * 100) / b : 0)
1508
1509 int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq)
1510 {
1511         unsigned long mod_tot = 0, mod_cum;
1512         struct timespec64 now;
1513         int i;
1514
1515         ktime_get_real_ts64(&now);
1516
1517         spin_lock(&cli->cl_mod_rpcs_lock);
1518
1519         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
1520                    (s64)now.tv_sec, (unsigned long)now.tv_nsec);
1521         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
1522                    cli->cl_mod_rpcs_in_flight);
1523
1524         seq_puts(seq, "\n\t\t\tmodify\n");
1525         seq_puts(seq, "rpcs in flight        rpcs   %% cum %%\n");
1526
1527         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
1528
1529         mod_cum = 0;
1530         for (i = 0; i < OBD_HIST_MAX; i++) {
1531                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
1532
1533                 mod_cum += mod;
1534                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
1535                            i, mod, pct(mod, mod_tot),
1536                            pct(mod_cum, mod_tot));
1537                 if (mod_cum == mod_tot)
1538                         break;
1539         }
1540
1541         spin_unlock(&cli->cl_mod_rpcs_lock);
1542
1543         return 0;
1544 }
1545 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
1546 #undef pct
1547
1548 /*
1549  * The number of modify RPCs sent in parallel is limited
1550  * because the server has a finite number of slots per client to
1551  * store request result and ensure reply reconstruction when needed.
1552  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
1553  * that takes into account server limit and cl_max_rpcs_in_flight
1554  * value.
1555  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
1556  * one close request is allowed above the maximum.
1557  */
1558 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
1559                                                  bool close_req)
1560 {
1561         bool avail;
1562
1563         /* A slot is available if
1564          * - number of modify RPCs in flight is less than the max
1565          * - it's a close RPC and no other close request is in flight
1566          */
1567         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
1568                 (close_req && !cli->cl_close_rpcs_in_flight);
1569
1570         return avail;
1571 }
1572
1573 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
1574                                           bool close_req)
1575 {
1576         bool avail;
1577
1578         spin_lock(&cli->cl_mod_rpcs_lock);
1579         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
1580         spin_unlock(&cli->cl_mod_rpcs_lock);
1581         return avail;
1582 }
1583
1584 /* Get a modify RPC slot from the obd client @cli according
1585  * to the kind of operation @opc that is going to be sent
1586  * and the intent @it of the operation if it applies.
1587  * If the maximum number of modify RPCs in flight is reached
1588  * the thread is put to sleep.
1589  * Returns the tag to be set in the request message. Tag 0
1590  * is reserved for non-modifying requests.
1591  */
1592 u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
1593                          struct lookup_intent *it)
1594 {
1595         struct l_wait_info lwi = LWI_INTR(NULL, NULL);
1596         bool close_req = false;
1597         u16 i, max;
1598
1599         /* read-only metadata RPCs don't consume a slot on MDT
1600          * for reply reconstruction
1601          */
1602         if (it && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
1603                    it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
1604                 return 0;
1605
1606         if (opc == MDS_CLOSE)
1607                 close_req = true;
1608
1609         do {
1610                 spin_lock(&cli->cl_mod_rpcs_lock);
1611                 max = cli->cl_max_mod_rpcs_in_flight;
1612                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
1613                         /* there is a slot available */
1614                         cli->cl_mod_rpcs_in_flight++;
1615                         if (close_req)
1616                                 cli->cl_close_rpcs_in_flight++;
1617                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
1618                                          cli->cl_mod_rpcs_in_flight);
1619                         /* find a free tag */
1620                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
1621                                                 max + 1);
1622                         LASSERT(i < OBD_MAX_RIF_MAX);
1623                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
1624                         spin_unlock(&cli->cl_mod_rpcs_lock);
1625                         /* tag 0 is reserved for non-modify RPCs */
1626                         return i + 1;
1627                 }
1628                 spin_unlock(&cli->cl_mod_rpcs_lock);
1629
1630                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot opc %u, max %hu\n",
1631                        cli->cl_import->imp_obd->obd_name, opc, max);
1632
1633                 l_wait_event(cli->cl_mod_rpcs_waitq,
1634                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
1635         } while (true);
1636 }
1637 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
1638
1639 /*
1640  * Put a modify RPC slot from the obd client @cli according
1641  * to the kind of operation @opc that has been sent and the
1642  * intent @it of the operation if it applies.
1643  */
1644 void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc,
1645                           struct lookup_intent *it, u16 tag)
1646 {
1647         bool close_req = false;
1648
1649         if (it && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
1650                    it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
1651                 return;
1652
1653         if (opc == MDS_CLOSE)
1654                 close_req = true;
1655
1656         spin_lock(&cli->cl_mod_rpcs_lock);
1657         cli->cl_mod_rpcs_in_flight--;
1658         if (close_req)
1659                 cli->cl_close_rpcs_in_flight--;
1660         /* release the tag in the bitmap */
1661         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
1662         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
1663         spin_unlock(&cli->cl_mod_rpcs_lock);
1664         wake_up(&cli->cl_mod_rpcs_waitq);
1665 }
1666 EXPORT_SYMBOL(obd_put_mod_rpc_slot);