4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <obd_class.h>
40 #include <lprocfs_status.h>
41 #include <lustre_kernelcomm.h>
/*
 * File-scope state shared by the helpers in this file, plus forward
 * declarations for the zombie import/export machinery.
 */
/* Taken around every walk or update of the obd_types list in this file. */
43 spinlock_t obd_types_lock;
/* Slab caches: created by obd_init_caches(), destroyed by obd_cleanup_caches(). */
45 static struct kmem_cache *obd_device_cachep;
46 struct kmem_cache *obdo_cachep;
47 EXPORT_SYMBOL(obdo_cachep);
48 static struct kmem_cache *import_cachep;
/*
 * Imports and exports whose last reference was dropped are queued on these
 * lists, under obd_zombie_impexp_lock, until obd_zombie_impexp_cull()
 * destroys them.
 */
50 static struct list_head obd_zombie_imports;
51 static struct list_head obd_zombie_exports;
52 static spinlock_t obd_zombie_impexp_lock;
53 static void obd_zombie_impexp_notify(void);
54 static void obd_zombie_export_add(struct obd_export *exp);
55 static void obd_zombie_import_add(struct obd_import *imp);
/*
 * Hook called by the destroy paths in this file on exp_connection,
 * imp_connection and oic_conn.
 * NOTE(review): never assigned in the visible part of this file; presumably
 * installed by another module -- confirm.
 */
57 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
58 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
61 * support functions: we could use inter-module communication, but this
62 * is more portable to other OS's
/*
 * Allocate a zero-filled obd_device from obd_device_cachep (GFP_NOFS) and
 * stamp it with OBD_DEVICE_MAGIC.
 * NOTE(review): the statement between the allocation and the magic store is
 * not visible here; confirm the allocation result is checked before use.
 */
64 static struct obd_device *obd_device_alloc(void)
66 struct obd_device *obd;
68 obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
70 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic is intact, reports (CERROR) a namespace that is
 * still attached to the device, and finalises the obd_reference lu_ref
 * before the memory is released.
 */
74 static void obd_device_free(struct obd_device *obd)
76 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
77 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A non-NULL namespace at this point is only reported, not torn down here. */
78 if (obd->obd_namespace) {
79 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
80 obd, obd->obd_namespace, obd->obd_force);
83 lu_ref_fini(&obd->obd_reference);
84 kmem_cache_free(obd_device_cachep, obd);
/*
 * Look up a registered obd type by name.
 *
 * Walks the obd_types list under obd_types_lock and compares each entry's
 * typ_name with @name using strcmp().  The lock is dropped both on a match
 * and after an unsuccessful walk.
 * NOTE(review): the return statements are not visible here; presumably the
 * matching type, or NULL when nothing matches -- confirm.
 */
87 static struct obd_type *class_search_type(const char *name)
89 struct list_head *tmp;
90 struct obd_type *type;
92 spin_lock(&obd_types_lock);
93 list_for_each(tmp, &obd_types) {
94 type = list_entry(tmp, struct obd_type, typ_chain);
95 if (strcmp(type->typ_name, name) == 0) {
96 spin_unlock(&obd_types_lock);
100 spin_unlock(&obd_types_lock);
/*
 * Find the obd type called @name and pin the module that provides it.
 *
 * The type is looked up with class_search_type().  The function can also
 * call request_module() with the type name and, when that returns 0, search
 * again; a load failure is reported on the console.  With the type in hand,
 * a module reference is taken on typ_dt_ops->owner under obd_type_lock.
 * class_put_type() is the counterpart that drops that module reference.
 */
104 static struct obd_type *class_get_type(const char *name)
106 struct obd_type *type = class_search_type(name);
109 const char *modname = name;
/* request_module() returns 0 on success, hence the negation. */
111 if (!request_module("%s", modname)) {
112 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
113 type = class_search_type(name);
115 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
120 spin_lock(&type->obd_type_lock);
/*
 * NOTE(review): the result of try_module_get() is discarded, so a failed
 * get is not detected on this line -- confirm that this is intended.
 */
122 try_module_get(type->typ_dt_ops->owner);
123 spin_unlock(&type->obd_type_lock);
/*
 * Drop the module reference taken by class_get_type(): module_put() on
 * typ_dt_ops->owner, performed under the type's obd_type_lock.
 */
128 void class_put_type(struct obd_type *type)
131 spin_lock(&type->obd_type_lock);
133 module_put(type->typ_dt_ops->owner);
134 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) asserted on the length of a type name. */
137 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * @dt_ops: device operations; copied by value into the new type
 * @md_ops: metadata operations; optional, copied by value when used
 * @ldt:    lu_device_type passed to lu_device_type_init()
 *
 * The type name must be shorter than CLASS_MAX_NAME (asserted) and must not
 * already be registered.  The function allocates the type together with
 * private copies of the operation tables and of the name, creates the
 * debugfs entry and the kobject for the type, and finally links the type on
 * obd_types under obd_types_lock.  The tail of the function releases the
 * kobject, the name and both operation tables.
 */
139 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
141 struct lu_device_type *ldt)
143 struct obd_type *type;
147 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Refuse a second registration under the same name. */
149 if (class_search_type(name)) {
150 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
155 type = kzalloc(sizeof(*type), GFP_NOFS);
/* All three buffers are allocated first and validated together below. */
159 type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
160 type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
161 type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
163 if (!type->typ_dt_ops ||
168 *type->typ_dt_ops = *dt_ops;
169 /* md_ops is optional */
171 *type->typ_md_ops = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so strcpy() cannot overrun. */
172 strcpy(type->typ_name, name);
173 spin_lock_init(&type->obd_type_lock);
175 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
/* Both NULL and ERR_PTR results are treated as a debugfs failure. */
178 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
179 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
181 type->typ_debugfs_entry = NULL;
185 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
186 if (!type->typ_kobj) {
193 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
198 spin_lock(&obd_types_lock);
199 list_add(&type->typ_chain, &obd_types);
200 spin_unlock(&obd_types_lock);
/* Release what was set up above: kobject, name and operation tables. */
206 kobject_put(type->typ_kobj);
207 kfree(type->typ_name);
208 kfree(type->typ_md_ops);
209 kfree(type->typ_dt_ops);
213 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called @name.
 *
 * An unknown name is reported with CERROR.  When the type still has a
 * non-zero typ_refcnt, only the two operation tables are freed and the name
 * is kept for debugging.  Otherwise the kobject and the debugfs entry are
 * released, lu_device_type_fini() is called on typ_lu, the type is unlinked
 * from obd_types under obd_types_lock and its name and operation tables are
 * freed.
 */
215 int class_unregister_type(const char *name)
217 struct obd_type *type = class_search_type(name);
220 CERROR("unknown obd type\n");
224 if (type->typ_refcnt) {
225 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
226 /* This is a bad situation, let's make the best of it */
227 /* Remove ops, but leave the name for debugging */
228 kfree(type->typ_dt_ops);
229 kfree(type->typ_md_ops);
234 kobject_put(type->typ_kobj);
/* class_register_type() may have left the debugfs entry NULL. */
236 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
237 ldebugfs_remove(&type->typ_debugfs_entry);
240 lu_device_type_fini(type->typ_lu);
242 spin_lock(&obd_types_lock);
243 list_del(&type->typ_chain);
244 spin_unlock(&obd_types_lock);
245 kfree(type->typ_name);
246 kfree(type->typ_dt_ops);
247 kfree(type->typ_md_ops);
250 } /* class_unregister_type */
251 EXPORT_SYMBOL(class_unregister_type);
254 * Create a new obd device.
256 * Find an empty slot in ::obd_devs[], create a new obd device in it.
258 * \param[in] type_name obd device type string.
259 * \param[in] name obd device name.
261 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Create a device of type @type_name called @name and install it in the
 * first free slot of obd_devs[].
 *
 * Errors are returned as ERR_PTR values:
 *   -EINVAL    @name is MAX_OBD_NAME bytes or longer
 *   -ENODEV    class_get_type() did not yield a type
 *   -ENOMEM    the device could not be allocated
 *   -EEXIST    another device already uses @name
 *   -EOVERFLOW every slot of obd_devs[] is in use
 *
 * The whole table is scanned under the obd_dev_lock write lock, so that a
 * duplicate name found after a free slot was already claimed can still undo
 * that claim.  The tail of the function frees the new device and drops the
 * type reference.
 */
264 struct obd_device *class_newdev(const char *type_name, const char *name)
266 struct obd_device *result = NULL;
267 struct obd_device *newdev;
268 struct obd_type *type = NULL;
270 int new_obd_minor = 0;
272 if (strlen(name) >= MAX_OBD_NAME) {
273 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
274 return ERR_PTR(-EINVAL);
277 type = class_get_type(type_name);
279 CERROR("OBD: unknown type: %s\n", type_name);
280 return ERR_PTR(-ENODEV);
283 newdev = obd_device_alloc();
285 result = ERR_PTR(-ENOMEM);
289 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
291 write_lock(&obd_dev_lock);
292 for (i = 0; i < class_devno_max(); i++) {
293 struct obd_device *obd = class_num2obd(i);
/* Name clash: give back the slot claimed earlier in this scan, if any. */
295 if (obd && (strcmp(name, obd->obd_name) == 0)) {
296 CERROR("Device %s already exists at %d, won't add\n",
299 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
300 "%p obd_magic %08x != %08x\n", result,
301 result->obd_magic, OBD_DEVICE_MAGIC);
302 LASSERTF(result->obd_minor == new_obd_minor,
303 "%p obd_minor %d != %d\n", result,
304 result->obd_minor, new_obd_minor);
306 obd_devs[result->obd_minor] = NULL;
307 result->obd_name[0] = '\0';
309 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet. */
312 if (!result && !obd) {
314 result->obd_minor = i;
316 result->obd_type = type;
/*
 * At most sizeof(obd_name) - 1 bytes are copied; the terminating NUL relies
 * on the device having been zero-filled by obd_device_alloc().
 */
317 strncpy(result->obd_name, name,
318 sizeof(result->obd_name) - 1);
319 obd_devs[i] = result;
322 write_unlock(&obd_dev_lock);
324 if (!result && i >= class_devno_max()) {
325 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
327 result = ERR_PTR(-EOVERFLOW);
334 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
335 result->obd_name, result);
/* Undo the allocation and the type reference taken above. */
339 obd_device_free(newdev);
341 class_put_type(type);
/*
 * Remove @obd from obd_devs[] and free it.
 *
 * Asserts that the magic is intact and that obd_devs[obd->obd_minor] really
 * points at @obd, clears that slot under the obd_dev_lock write lock, frees
 * the device and finally drops the reference on its type.
 */
345 void class_release_dev(struct obd_device *obd)
/* Saved up front: @obd is freed before the type reference is dropped. */
347 struct obd_type *obd_type = obd->obd_type;
349 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
355 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
356 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
358 write_lock(&obd_dev_lock);
359 obd_devs[obd->obd_minor] = NULL;
360 write_unlock(&obd_dev_lock);
361 obd_device_free(obd);
363 class_put_type(obd_type);
/*
 * Map a device name to its index in obd_devs[].
 *
 * Scans every slot under the obd_dev_lock read lock and compares @name with
 * obd_name using strcmp().  A matching device is only accepted once its
 * obd_attached flag is set.  The read lock is released on both the match
 * path and after an unsuccessful scan.
 * NOTE(review): the return statements are not visible here; class_name2obd()
 * treats a negative result as "not found".
 */
366 int class_name2dev(const char *name)
373 read_lock(&obd_dev_lock);
374 for (i = 0; i < class_devno_max(); i++) {
375 struct obd_device *obd = class_num2obd(i);
377 if (obd && strcmp(name, obd->obd_name) == 0) {
378 /* Make sure we finished attaching before we give
381 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382 if (obd->obd_attached) {
383 read_unlock(&obd_dev_lock);
389 read_unlock(&obd_dev_lock);
/*
 * Look up a device by name: resolve @name to an index with class_name2dev()
 * and return whatever class_num2obd() finds at that index.
 *
 * NOTE(review): the range test uses "dev > class_devno_max()", so it lets
 * dev == class_devno_max() through.  class_num2obd() only indexes obd_devs[]
 * when num < class_devno_max(), so this is harmless today, but ">=" looks
 * like the intended bound -- confirm.
 */
394 struct obd_device *class_name2obd(const char *name)
396 int dev = class_name2dev(name);
398 if (dev < 0 || dev > class_devno_max())
400 return class_num2obd(dev);
402 EXPORT_SYMBOL(class_name2obd);
/*
 * Map a device uuid to its index in obd_devs[].
 *
 * Scans every slot under the obd_dev_lock read lock and compares @uuid with
 * obd_uuid using obd_uuid_equals().  The read lock is released on both the
 * match path and after an unsuccessful scan.
 * NOTE(review): the return statements are not visible here -- confirm the
 * "not found" value against the callers.
 */
404 int class_uuid2dev(struct obd_uuid *uuid)
408 read_lock(&obd_dev_lock);
409 for (i = 0; i < class_devno_max(); i++) {
410 struct obd_device *obd = class_num2obd(i);
412 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
413 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414 read_unlock(&obd_dev_lock);
418 read_unlock(&obd_dev_lock);
424 * Get obd device from ::obd_devs[]
426 * \param num [in] array index
428 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
429 * otherwise return the obd device there.
/*
 * Return the device stored at index @num of obd_devs[].
 *
 * For an index below class_devno_max() the device found there is sanity
 * checked: its magic must be OBD_DEVICE_MAGIC and its obd_minor must equal
 * @num.  The result starts out as NULL, which is what an out-of-range @num
 * is left with.
 * NOTE(review): only the upper bound of @num is tested in the visible
 * lines; confirm that callers never pass a negative index.
 */
431 struct obd_device *class_num2obd(int num)
433 struct obd_device *obd = NULL;
435 if (num < class_devno_max()) {
440 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
441 "%p obd_magic %08x != %08x\n",
442 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
443 LASSERTF(obd->obd_minor == num,
444 "%p obd_minor %0d != %0d\n",
445 obd, obd->obd_minor, num);
451 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
452 * specified, then only the client with that uuid is returned,
453 * otherwise any client connected to the tgt is returned.
/*
 * Find a client device of type @typ_name whose cl_target_uuid equals
 * @tgt_uuid.  A non-NULL @grp_uuid additionally has to equal the device's
 * own obd_uuid.
 *
 * The scan runs under the obd_dev_lock read lock, which is released on both
 * the match path and after an unsuccessful scan.
 */
455 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
456 const char *typ_name,
457 struct obd_uuid *grp_uuid)
461 read_lock(&obd_dev_lock);
462 for (i = 0; i < class_devno_max(); i++) {
463 struct obd_device *obd = class_num2obd(i);
/*
 * Only strlen(typ_name) bytes are compared, so this is a prefix match on
 * the type name rather than an exact match.
 */
467 if ((strncmp(obd->obd_type->typ_name, typ_name,
468 strlen(typ_name)) == 0)) {
469 if (obd_uuid_equals(tgt_uuid,
470 &obd->u.cli.cl_target_uuid) &&
471 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
472 &obd->obd_uuid) : 1)) {
473 read_unlock(&obd_dev_lock);
478 read_unlock(&obd_dev_lock);
482 EXPORT_SYMBOL(class_find_client_obd);
484 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
485 * searching at *next, and if a device is found, the next index to look
486 * at is saved in *next. If next is NULL, then the first matching device
487 * will always be returned.
/*
 * Return a device whose obd_uuid equals @grp_uuid, scanning obd_devs[] under
 * the obd_dev_lock read lock.
 *
 * @next, when given, selects the starting index; it is only honoured when
 * *next lies in [0, class_devno_max()).  The read lock is released on both
 * the match path and after an unsuccessful scan.
 */
489 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
495 else if (*next >= 0 && *next < class_devno_max())
500 read_lock(&obd_dev_lock);
501 for (; i < class_devno_max(); i++) {
502 struct obd_device *obd = class_num2obd(i);
506 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
509 read_unlock(&obd_dev_lock);
513 read_unlock(&obd_dev_lock);
517 EXPORT_SYMBOL(class_devices_in_group);
520 * Notify that the sptlrpc log for \a fsname has changed; let every relevant OBD
521 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF to every set-up, non-stopping mdc/osc/mdt/ost device
 * whose name starts with the first @namelen bytes of @fsname.
 *
 * @namelen must be positive (asserted).  obd_devs[] is walked under the
 * obd_dev_lock read lock; the lock is dropped around each
 * obd_set_info_async() call while a reference pins the device.
 */
523 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
525 struct obd_device *obd;
529 LASSERT(namelen > 0);
531 read_lock(&obd_dev_lock);
532 for (i = 0; i < class_devno_max(); i++) {
533 obd = class_num2obd(i);
535 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
538 /* only notify mdc, osc, mdt, ost */
539 type = obd->obd_type->typ_name;
540 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
541 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
542 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
543 strcmp(type, LUSTRE_OST_NAME) != 0)
/* A non-zero strncmp() means the name does not start with fsname. */
546 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device so the read lock can be dropped across the call below. */
549 class_incref(obd, __func__, obd);
550 read_unlock(&obd_dev_lock);
551 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
552 sizeof(KEY_SPTLRPC_CONF),
553 KEY_SPTLRPC_CONF, 0, NULL, NULL);
555 class_decref(obd, __func__, obd);
556 read_lock(&obd_dev_lock);
558 read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches().  The cache pointers
 * are reset so that the LASSERT(!...cachep) checks in obd_init_caches() hold
 * on a later initialisation.
 */
563 void obd_cleanup_caches(void)
565 kmem_cache_destroy(obd_device_cachep);
566 obd_device_cachep = NULL;
567 kmem_cache_destroy(obdo_cachep);
569 kmem_cache_destroy(import_cachep);
570 import_cachep = NULL;
/*
 * Create the three slab caches used by this file: "ll_obd_dev_cache" for
 * struct obd_device, "ll_obdo_cache" for struct obdo and "ll_import_cache"
 * for struct obd_import.  Each cache pointer must still be NULL on entry
 * (asserted).  The tail of the function calls obd_cleanup_caches() to
 * destroy the caches again.
 */
573 int obd_init_caches(void)
575 LASSERT(!obd_device_cachep);
576 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
577 sizeof(struct obd_device),
579 if (!obd_device_cachep)
582 LASSERT(!obdo_cachep);
583 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
588 LASSERT(!import_cachep);
589 import_cachep = kmem_cache_create("ll_import_cache",
590 sizeof(struct obd_import),
597 obd_cleanup_caches();
601 /* map connection to client */
/*
 * Translate a connection handle into its export.
 *
 * A cookie of -1 is the request for a new connection and is not looked up;
 * any other cookie is resolved with class_handle2object().
 * NOTE(review): the return statements are not visible here -- confirm what
 * the NULL-handle and new-connection paths return.
 */
602 struct obd_export *class_conn2export(struct lustre_handle *conn)
604 struct obd_export *export;
607 CDEBUG(D_CACHE, "looking for null handle\n");
611 if (conn->cookie == -1) { /* this means assign a new connection */
612 CDEBUG(D_CACHE, "want a new connection\n");
616 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
617 export = class_handle2object(conn->cookie, NULL);
620 EXPORT_SYMBOL(class_conn2export);
/*
 * Return the obd_device that belongs to export @exp.
 * NOTE(review): the body is not visible here; presumably it yields
 * exp->exp_obd -- confirm, including how a NULL @exp is treated.
 */
622 struct obd_device *class_exp2obd(struct obd_export *exp)
628 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device that export @exp
 * belongs to.  @exp is dereferenced unconditionally.
 */
630 struct obd_import *class_exp2cliimp(struct obd_export *exp)
632 struct obd_device *obd = exp->exp_obd;
636 return obd->u.cli.cl_import;
638 EXPORT_SYMBOL(class_exp2cliimp);
640 /* Export management functions */
/*
 * Final destruction of an export; called from obd_zombie_impexp_cull().
 *
 * The refcount must already be zero and the reply, replay and high-priority
 * RPC lists must be empty (all asserted).  The connection reference, when
 * there is one, is dropped, obd_destroy_export() is called, the "export"
 * reference on the device is released and the memory is freed through
 * OBD_FREE_RCU.
 */
641 static void class_export_destroy(struct obd_export *exp)
643 struct obd_device *obd = exp->exp_obd;
645 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
648 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
649 exp->exp_client_uuid.uuid, obd->obd_name);
651 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
652 if (exp->exp_connection)
653 ptlrpc_put_connection_superhack(exp->exp_connection);
655 LASSERT(list_empty(&exp->exp_outstanding_replies));
656 LASSERT(list_empty(&exp->exp_uncommitted_replies));
657 LASSERT(list_empty(&exp->exp_req_replay_queue));
658 LASSERT(list_empty(&exp->exp_hp_rpcs));
659 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", export) in class_new_export(). */
660 class_decref(obd, "export", exp);
662 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * hop_addref callback for export handles: takes one reference on the export
 * with class_export_get().
 */
665 static void export_handle_addref(void *export)
667 class_export_get(export);
/* Handle operations registered for every export by class_new_export(). */
670 static struct portals_handle_ops export_handle_ops = {
671 .hop_addref = export_handle_addref,
/*
 * Take one reference on @exp by incrementing exp_refcount; the new count is
 * logged at D_INFO.
 */
675 struct obd_export *class_export_get(struct obd_export *exp)
677 atomic_inc(&exp->exp_refcount);
678 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
679 atomic_read(&exp->exp_refcount));
682 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on @exp.
 *
 * The refcount must lie strictly between 0 and LI_POISON on entry
 * (asserted).  When the last reference goes away the export, which must
 * still be linked on a device list via exp_obd_chain, has its lprocfs state
 * cleaned up and is handed to the zombie thread for destruction.
 */
684 void class_export_put(struct obd_export *exp)
686 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the explicit "- 1". */
687 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
688 atomic_read(&exp->exp_refcount) - 1);
690 if (atomic_dec_and_test(&exp->exp_refcount)) {
691 LASSERT(!list_empty(&exp->exp_obd_chain));
692 CDEBUG(D_IOCTL, "final put %p/%s\n",
693 exp, exp->exp_client_uuid.uuid);
695 /* release nid stat reference */
696 lprocfs_exp_cleanup(exp);
698 obd_zombie_export_add(exp);
701 EXPORT_SYMBOL(class_export_put);
703 /* Creates a new export, adds it to the hash table, and returns a
704 * pointer to it. The refcount is 2: one for the hash reference, and
705 * one for the pointer returned by this function.
/*
 * Allocate and initialise a new export of @obd for the client @cluuid.
 *
 * The export starts with a refcount of 2 and is entered in the handle hash.
 * Unless @cluuid equals the uuid of @obd itself, the export is also added to
 * the uuid hash of @obd with cfs_hash_add_unique(); a duplicate is reported
 * on the console.  Finally the export takes an "export" reference on @obd,
 * is linked on obd_exports and is counted in obd_num_exports.
 *
 * obd_stopping is tested under obd_dev_lock both before and after the
 * uuid-hash insertion; the second test removes the export from the uuid hash
 * again.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails.  The tail of the
 * function drops the hash reference, unhashes the handle and calls
 * obd_destroy_export().
 */
707 struct obd_export *class_new_export(struct obd_device *obd,
708 struct obd_uuid *cluuid)
710 struct obd_export *export;
711 struct cfs_hash *hash = NULL;
714 export = kzalloc(sizeof(*export), GFP_NOFS);
716 return ERR_PTR(-ENOMEM);
718 export->exp_conn_cnt = 0;
719 export->exp_lock_hash = NULL;
720 export->exp_flock_hash = NULL;
721 atomic_set(&export->exp_refcount, 2);
722 atomic_set(&export->exp_rpc_count, 0);
723 atomic_set(&export->exp_cb_count, 0);
724 atomic_set(&export->exp_locks_count, 0);
725 #if LUSTRE_TRACKS_LOCK_EXP_REFS
726 INIT_LIST_HEAD(&export->exp_locks_list);
727 spin_lock_init(&export->exp_locks_list_guard);
729 atomic_set(&export->exp_replay_count, 0);
730 export->exp_obd = obd;
731 INIT_LIST_HEAD(&export->exp_outstanding_replies);
732 spin_lock_init(&export->exp_uncommitted_replies_lock);
733 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
734 INIT_LIST_HEAD(&export->exp_req_replay_queue);
735 INIT_LIST_HEAD(&export->exp_handle.h_link);
736 INIT_LIST_HEAD(&export->exp_hp_rpcs);
737 class_handle_hash(&export->exp_handle, &export_handle_ops);
738 spin_lock_init(&export->exp_lock);
739 spin_lock_init(&export->exp_rpc_lock);
740 INIT_HLIST_NODE(&export->exp_uuid_hash);
741 spin_lock_init(&export->exp_bl_list_lock);
742 INIT_LIST_HEAD(&export->exp_bl_list);
744 export->exp_sp_peer = LUSTRE_SP_ANY;
745 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
746 export->exp_client_uuid = *cluuid;
747 obd_init_export(export);
749 spin_lock(&obd->obd_dev_lock);
750 /* shouldn't happen, but might race */
751 if (obd->obd_stopping) {
/* Hold a reference on the uuid hash while obd_dev_lock is dropped below. */
756 hash = cfs_hash_getref(obd->obd_uuid_hash);
761 spin_unlock(&obd->obd_dev_lock);
/* An export whose client uuid is the device's own uuid is not hashed. */
763 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
764 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
766 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
767 obd->obd_name, cluuid->uuid, rc);
/* Re-check: the device may have started stopping while the lock was dropped. */
773 spin_lock(&obd->obd_dev_lock);
774 if (obd->obd_stopping) {
775 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
780 class_incref(obd, "export", export);
781 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
782 export->exp_obd->obd_num_exports++;
783 spin_unlock(&obd->obd_dev_lock);
784 cfs_hash_putref(hash);
788 spin_unlock(&obd->obd_dev_lock);
791 cfs_hash_putref(hash);
792 class_handle_unhash(&export->exp_handle);
793 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
794 obd_destroy_export(export);
798 EXPORT_SYMBOL(class_new_export);
/*
 * Detach @exp from the lookup structures of its device.
 *
 * The handle is unhashed first.  Then, under obd_dev_lock, the export is
 * removed from the uuid hash (when it is hashed there), moved from its
 * current list to obd_unlinked_exports and no longer counted in
 * obd_num_exports.  One export reference is dropped at the end.
 */
800 void class_unlink_export(struct obd_export *exp)
802 class_handle_unhash(&exp->exp_handle);
804 spin_lock(&exp->exp_obd->obd_dev_lock);
805 /* delete an uuid-export hashitem from hashtables */
806 if (!hlist_unhashed(&exp->exp_uuid_hash))
807 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
808 &exp->exp_client_uuid,
809 &exp->exp_uuid_hash);
811 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
812 exp->exp_obd->obd_num_exports--;
813 spin_unlock(&exp->exp_obd->obd_dev_lock);
814 class_export_put(exp);
817 /* Import management functions */
/*
 * Final destruction of an import; called from obd_zombie_impexp_cull().
 *
 * The refcount must already be zero and imp_sec must be NULL (both
 * asserted).  The reference on imp_connection and on the oic_conn of every
 * entry of imp_conn_list is dropped, the "import" reference on the device is
 * released and the memory is freed through OBD_FREE_RCU.
 */
818 static void class_import_destroy(struct obd_import *imp)
820 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
821 imp->imp_obd->obd_name);
823 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
825 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the list of alternative connections. */
827 while (!list_empty(&imp->imp_conn_list)) {
828 struct obd_import_conn *imp_conn;
830 imp_conn = list_entry(imp->imp_conn_list.next,
831 struct obd_import_conn, oic_item);
832 list_del_init(&imp_conn->oic_item);
833 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
837 LASSERT(!imp->imp_sec);
/* Pairs with class_incref(obd, "import", imp) in class_new_import(). */
838 class_decref(imp->imp_obd, "import", imp);
839 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/*
 * hop_addref callback for import handles: takes one reference on the import
 * with class_import_get().
 */
842 static void import_handle_addref(void *import)
844 class_import_get(import);
/* Handle operations registered for every import by class_new_import(). */
847 static struct portals_handle_ops import_handle_ops = {
848 .hop_addref = import_handle_addref,
/*
 * Take one reference on @import by incrementing imp_refcount; the new count
 * and the owning device name are logged at D_INFO.
 */
852 struct obd_import *class_import_get(struct obd_import *import)
854 atomic_inc(&import->imp_refcount);
855 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
856 atomic_read(&import->imp_refcount),
857 import->imp_obd->obd_name);
860 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on @imp.
 *
 * On entry the import must not be queued on the zombie list and its refcount
 * must lie strictly between 0 and LI_POISON (both asserted).  When the last
 * reference goes away the import is handed to the zombie thread for
 * destruction.
 */
862 void class_import_put(struct obd_import *imp)
864 LASSERT(list_empty(&imp->imp_zombie_chain));
865 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the explicit "- 1". */
867 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
868 atomic_read(&imp->imp_refcount) - 1,
869 imp->imp_obd->obd_name);
871 if (atomic_dec_and_test(&imp->imp_refcount)) {
872 CDEBUG(D_INFO, "final put import %p\n", imp);
873 obd_zombie_import_add(imp);
876 /* catch possible import put race */
877 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
879 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate, and one service estimate per portal (IMP_AT_MAX_PORTALS entries)
 * seeded with INITIAL_CONNECT_TIMEOUT.
 */
881 static void init_imp_at(struct imp_at *at)
885 at_init(&at->iat_net_latency, 0, 0);
886 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
887 /* max service estimates are tracked on the server side, so
888 * don't use the AT history here, just use the last reported
889 * val. (But keep hist for proc histogram, worst_ever)
891 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for @obd.
 *
 * The import is zero-allocated, starts in state LUSTRE_IMP_NEW with a
 * refcount of 2, holds an "import" reference on @obd and is entered in the
 * handle hash.  Its message magic defaults to LUSTRE_MSG_MAGIC_V2.
 */
896 struct obd_import *class_new_import(struct obd_device *obd)
898 struct obd_import *imp;
900 imp = kzalloc(sizeof(*imp), GFP_NOFS);
904 INIT_LIST_HEAD(&imp->imp_pinger_chain);
905 INIT_LIST_HEAD(&imp->imp_zombie_chain);
906 INIT_LIST_HEAD(&imp->imp_replay_list);
907 INIT_LIST_HEAD(&imp->imp_sending_list);
908 INIT_LIST_HEAD(&imp->imp_delayed_list);
909 INIT_LIST_HEAD(&imp->imp_committed_list);
910 INIT_LIST_HEAD(&imp->imp_unreplied_list);
911 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the (empty) committed list. */
912 imp->imp_replay_cursor = &imp->imp_committed_list;
913 spin_lock_init(&imp->imp_lock);
914 imp->imp_last_success_conn = 0;
915 imp->imp_state = LUSTRE_IMP_NEW;
/* Dropped again by class_decref(..., "import", imp) in class_import_destroy(). */
916 imp->imp_obd = class_incref(obd, "import", imp);
917 mutex_init(&imp->imp_sec_mutex);
918 init_waitqueue_head(&imp->imp_recovery_waitq);
920 atomic_set(&imp->imp_refcount, 2);
921 atomic_set(&imp->imp_unregistering, 0);
922 atomic_set(&imp->imp_inflight, 0);
923 atomic_set(&imp->imp_replay_inflight, 0);
924 atomic_set(&imp->imp_inval_count, 0);
925 INIT_LIST_HEAD(&imp->imp_conn_list);
926 INIT_LIST_HEAD(&imp->imp_handle.h_link);
927 class_handle_hash(&imp->imp_handle, &import_handle_ops);
928 init_imp_at(&imp->imp_at);
930 /* the default magic is V2, will be used in connect RPC, and
931 * then adjusted according to the flags in request/reply.
933 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
937 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down @import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference.  The actual
 * destruction happens once the last reference is gone (class_import_put()).
 */
939 void class_destroy_import(struct obd_import *import)
942 LASSERT(import != LP_POISON);
944 class_handle_unhash(&import->imp_handle);
946 spin_lock(&import->imp_lock);
947 import->imp_generation++;
948 spin_unlock(&import->imp_lock);
949 class_import_put(import);
951 EXPORT_SYMBOL(class_destroy_import);
953 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug accounting (LUSTRE_TRACKS_LOCK_EXP_REFS): record one more reference
 * from @lock to @exp.
 *
 * Runs under exp_locks_list_guard.  A lock that already targets a different
 * export only triggers a console warning.  On the first reference the lock
 * is linked on exp_locks_list and @exp becomes its l_exp_refs_target.
 */
955 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
957 spin_lock(&exp->exp_locks_list_guard);
959 LASSERT(lock->l_exp_refs_nr >= 0);
961 if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
962 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
963 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken only for the first reference. */
965 if ((lock->l_exp_refs_nr++) == 0) {
966 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
967 lock->l_exp_refs_target = exp;
969 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
970 lock, exp, lock->l_exp_refs_nr);
971 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Debug accounting (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one reference from
 * @lock to @exp.
 *
 * Runs under exp_locks_list_guard and requires a positive l_exp_refs_nr
 * (asserted).  A target mismatch only triggers a console warning.  When the
 * last reference goes away the lock is unlinked from the export's list and
 * its l_exp_refs_target is cleared.
 */
974 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
976 spin_lock(&exp->exp_locks_list_guard);
977 LASSERT(lock->l_exp_refs_nr > 0);
978 if (lock->l_exp_refs_target != exp) {
979 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
980 lock, lock->l_exp_refs_target, exp);
982 if (-- lock->l_exp_refs_nr == 0) {
983 list_del_init(&lock->l_exp_refs_link);
984 lock->l_exp_refs_target = NULL;
986 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
987 lock, exp, lock->l_exp_refs_nr);
988 spin_unlock(&exp->exp_locks_list_guard);
992 /* A connection defines an export context in which preallocation can
993 * be managed. This releases the export pointer reference, and returns
994 * the export handle, so the export refcount is 1 when this function
/*
 * Create an export of @obd for client @cluuid and hand its handle cookie
 * back through @conn.
 *
 * A failure of class_new_export() is returned as PTR_ERR().  The pointer
 * reference obtained from class_new_export() is dropped before returning, so
 * the caller only keeps the cookie.
 */
997 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
998 struct obd_uuid *cluuid)
1000 struct obd_export *export;
1006 export = class_new_export(obd, cluuid);
1008 return PTR_ERR(export);
1010 conn->cookie = export->exp_handle.h_cookie;
1011 class_export_put(export);
1013 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1014 cluuid->uuid, conn->cookie);
1017 EXPORT_SYMBOL(class_connect);
1019 /* This function removes 1-3 references from the export:
1020 * 1 - for the export pointer passed in
1021 * and, if a disconnect is really needed:
1022 * 2 - removing from hash
1023 * 3 - in class_unlink_export
1024 * The export pointer passed to this function can be destroyed
/*
 * Disconnect @export.
 *
 * exp_disconnected is set under exp_lock and its previous value is
 * remembered, so that only the first caller goes on to unlink the export
 * with class_unlink_export().  The function ends by dropping one export
 * reference.
 */
1026 int class_disconnect(struct obd_export *export)
1028 int already_disconnected;
1031 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set under the lock so that racing callers see a consistent flag. */
1035 spin_lock(&export->exp_lock);
1036 already_disconnected = export->exp_disconnected;
1037 export->exp_disconnected = 1;
1038 spin_unlock(&export->exp_lock);
1040 /* class_cleanup(), abort_recovery(), and class_fail_export()
1041 * all end up in here, and if any of them race we shouldn't
1042 * call extra class_export_puts().
1044 if (already_disconnected)
1047 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1048 export->exp_handle.h_cookie);
1050 class_unlink_export(export);
1052 class_export_put(export);
1055 EXPORT_SYMBOL(class_disconnect);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is set under exp_lock; a caller that finds it already set only
 * logs and skips the disconnect.  Otherwise the debug log is dumped when
 * obd_dump_on_timeout is set, two extra references are taken (one to keep
 * @exp usable for the logging after obd_disconnect(), one for
 * obd_disconnect() itself), obd_disconnect() is called and the logging
 * reference is dropped again.
 */
1057 void class_fail_export(struct obd_export *exp)
1059 int rc, already_failed;
/* Test-and-set under the lock so that only one caller performs the disconnect. */
1061 spin_lock(&exp->exp_lock);
1062 already_failed = exp->exp_failed;
1063 exp->exp_failed = 1;
1064 spin_unlock(&exp->exp_lock);
1066 if (already_failed) {
1067 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1068 exp, exp->exp_client_uuid.uuid);
1072 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1073 exp, exp->exp_client_uuid.uuid);
1075 if (obd_dump_on_timeout)
1076 libcfs_debug_dumplog();
1078 /* needed so that CDEBUG can still safely use exp after obd_disconnect */
1079 class_export_get(exp);
1081 /* Most callers into obd_disconnect are removing their own reference
1082 * (request, for example) in addition to the one from the hash table.
1083 * We don't have such a reference here, so make one.
1085 class_export_get(exp);
1086 rc = obd_disconnect(exp);
1088 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1090 CDEBUG(D_HA, "disconnected export %p/%s\n",
1091 exp, exp->exp_client_uuid.uuid);
1092 class_export_put(exp);
1094 EXPORT_SYMBOL(class_fail_export);
/*
 * Optional export dump hook, only present with LUSTRE_TRACKS_LOCK_EXP_REFS.
 * NOTE(review): not called anywhere in the visible part of this file.
 */
1096 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1097 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1100 /* Total amount of zombies to be destroyed */
/*
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle().
 */
1101 static int zombies_count;
1104 * kill zombie imports and exports
/*
 * Destroy queued zombie imports and exports.
 *
 * Each round takes, under obd_zombie_impexp_lock, the first entry off
 * obd_zombie_imports and the first entry off obd_zombie_exports, and then
 * destroys them with the lock released.  The loop repeats for as long as a
 * round produced an import or an export.
 * NOTE(review): the statements executed when the lock is re-taken after
 * each destroy call are not visible here; presumably they update
 * zombies_count -- confirm.
 */
1106 static void obd_zombie_impexp_cull(void)
1108 struct obd_import *import;
1109 struct obd_export *export;
1112 spin_lock(&obd_zombie_impexp_lock);
1115 if (!list_empty(&obd_zombie_imports)) {
1116 import = list_entry(obd_zombie_imports.next,
1119 list_del_init(&import->imp_zombie_chain);
1123 if (!list_empty(&obd_zombie_exports)) {
1124 export = list_entry(obd_zombie_exports.next,
1127 list_del_init(&export->exp_obd_chain);
/* The destroy calls below run without the zombie lock held. */
1130 spin_unlock(&obd_zombie_impexp_lock);
1133 class_import_destroy(import);
1134 spin_lock(&obd_zombie_impexp_lock);
1136 spin_unlock(&obd_zombie_impexp_lock);
1140 class_export_destroy(export);
1141 spin_lock(&obd_zombie_impexp_lock);
1143 spin_unlock(&obd_zombie_impexp_lock);
1147 } while (import || export);
/*
 * State of the zombie thread: completions signalled when the thread has
 * started and when it has stopped, its flag word, the wait queue shared by
 * the thread and by obd_zombie_barrier(), and the pid of the thread.
 */
1150 static struct completion obd_zombie_start;
1151 static struct completion obd_zombie_stop;
1152 static unsigned long obd_zombie_flags;
1153 static wait_queue_head_t obd_zombie_waitq;
1154 static pid_t obd_zombie_pid;
/*
 * NOTE(review): this constant is handed to set_bit()/test_bit() in this
 * file, which take a bit number; the value 0x0001 therefore selects bit 1
 * of obd_zombie_flags rather than acting as a mask.  The usage is
 * self-consistent -- confirm that this is intended.
 */
1157 OBD_ZOMBIE_STOP = 0x0001,
1161 * check for work for kill zombie import/export thread.
/*
 * Wait condition helper for the zombie thread.
 *
 * Computes, under obd_zombie_impexp_lock, whether the thread may keep
 * sleeping: rc is non-zero when zombies_count is 0 and OBD_ZOMBIE_STOP is
 * not set.  obd_zombie_impexp_thread() waits for the negation of this
 * result.  @arg is not used in the visible lines.
 */
1163 static int obd_zombie_impexp_check(void *arg)
1167 spin_lock(&obd_zombie_impexp_lock);
1168 rc = (zombies_count == 0) &&
1169 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1170 spin_unlock(&obd_zombie_impexp_lock);
1176 * Add export to the obd_zombie thread and notify it.
/*
 * Queue @exp for destruction by the zombie thread.
 *
 * The export is first unlinked from its device list under obd_dev_lock (it
 * must be on one, asserted).  Its exp_obd_chain link is then reused to put
 * it on obd_zombie_exports under obd_zombie_impexp_lock, and the zombie
 * thread is woken.
 */
1178 static void obd_zombie_export_add(struct obd_export *exp)
1180 spin_lock(&exp->exp_obd->obd_dev_lock);
1181 LASSERT(!list_empty(&exp->exp_obd_chain));
1182 list_del_init(&exp->exp_obd_chain);
1183 spin_unlock(&exp->exp_obd->obd_dev_lock);
1184 spin_lock(&obd_zombie_impexp_lock);
1186 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1187 spin_unlock(&obd_zombie_impexp_lock);
1189 obd_zombie_impexp_notify();
1193 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp for destruction by the zombie thread.
 *
 * imp_sec must already be NULL and the import must not be queued yet (both
 * asserted).  The import is put on obd_zombie_imports under
 * obd_zombie_impexp_lock and the zombie thread is woken.
 */
1195 static void obd_zombie_import_add(struct obd_import *imp)
1197 LASSERT(!imp->imp_sec);
1198 spin_lock(&obd_zombie_impexp_lock);
1199 LASSERT(list_empty(&imp->imp_zombie_chain));
1201 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1202 spin_unlock(&obd_zombie_impexp_lock);
1204 obd_zombie_impexp_notify();
1208 * notify import/export destroy thread about new zombie.
/*
 * Wake every waiter on obd_zombie_waitq.  wake_up_all() is used because the
 * queue is shared by the zombie thread and by obd_zombie_barrier() callers.
 */
1210 static void obd_zombie_impexp_notify(void)
1213 * Make sure obd_zombie_impexp_thread get this notification.
1214 * It is possible this signal only get by obd_zombie_barrier, and
1215 * barrier gulps this notification and sleeps away and hangs ensues
1217 wake_up_all(&obd_zombie_waitq);
1221 * check whether obd_zombie is idle
/*
 * Wait condition helper for obd_zombie_barrier(): rc is non-zero when
 * zombies_count is 0, sampled under obd_zombie_impexp_lock.  Must not be
 * called once OBD_ZOMBIE_STOP has been set (asserted).
 */
1223 static int obd_zombie_is_idle(void)
1227 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1228 spin_lock(&obd_zombie_impexp_lock);
1229 rc = (zombies_count == 0);
1230 spin_unlock(&obd_zombie_impexp_lock);
1235 * wait until the obd_zombie import/export queues become empty
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.  The zombie thread itself is special-cased by
 * comparing obd_zombie_pid with the pid of the caller.
 */
1237 void obd_zombie_barrier(void)
1239 struct l_wait_info lwi = { 0 };
1241 if (obd_zombie_pid == current_pid())
1242 /* don't wait for myself */
1244 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1246 EXPORT_SYMBOL(obd_zombie_barrier);
1249 * destroy zombie export/import thread.
/*
 * Body of the zombie thread.
 *
 * Signals obd_zombie_start, records its pid and then loops until
 * OBD_ZOMBIE_STOP is set: sleep until obd_zombie_impexp_check() returns 0,
 * destroy the queued imports and exports, and wake the wait queue for
 * obd_zombie_barrier() callers.  obd_zombie_stop is completed on the way
 * out.
 * NOTE(review): obd_zombie_pid is stored after obd_zombie_start has been
 * completed, so code released by that completion may briefly see the old
 * pid -- confirm that this ordering is harmless.
 */
1251 static int obd_zombie_impexp_thread(void *unused)
1253 unshare_fs_struct();
1254 complete(&obd_zombie_start);
1256 obd_zombie_pid = current_pid();
1258 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1259 struct l_wait_info lwi = { 0 };
1261 l_wait_event(obd_zombie_waitq,
1262 !obd_zombie_impexp_check(NULL), &lwi);
1263 obd_zombie_impexp_cull();
1266 * Notify obd_zombie_barrier callers that queues
1269 wake_up(&obd_zombie_waitq);
1272 complete(&obd_zombie_stop);
1278 * start the thread that destroys zombie imports/exports
/*
 * Initialise the zombie lists, lock, completions and wait queue, start the
 * "obd_zombid" kernel thread and wait until it has signalled
 * obd_zombie_start.  A kthread_run() failure is returned as PTR_ERR().
 */
1280 int obd_zombie_impexp_init(void)
1282 struct task_struct *task;
1284 INIT_LIST_HEAD(&obd_zombie_imports);
1285 INIT_LIST_HEAD(&obd_zombie_exports);
1286 spin_lock_init(&obd_zombie_impexp_lock);
1287 init_completion(&obd_zombie_start);
1288 init_completion(&obd_zombie_stop);
1289 init_waitqueue_head(&obd_zombie_waitq);
1292 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1294 return PTR_ERR(task);
1296 wait_for_completion(&obd_zombie_start);
1301 * stop the thread that destroys zombie imports/exports
/*
 * Ask the zombie thread to exit: set OBD_ZOMBIE_STOP, wake the thread and
 * block until it has completed obd_zombie_stop.
 */
1303 void obd_zombie_impexp_stop(void)
1305 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1306 obd_zombie_impexp_notify();
1307 wait_for_completion(&obd_zombie_stop);
/*
 * One sleeper waiting for an RPC slot in obd_get_request_slot().  The object
 * lives on the stack of the waiting caller.  orsw_entry links it on
 * cl_loi_read_list; orsw_waitq is what obd_put_request_slot() wakes.
 */
1310 struct obd_request_slot_waiter {
1311 struct list_head orsw_entry;
1312 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): true once @orsw is no longer
 * linked on a list, which is what obd_put_request_slot() does
 * (list_del_init) when it hands a slot to the waiter.  Sampled under
 * cl_loi_list_lock.
 */
1316 static bool obd_request_slot_avail(struct client_obd *cli,
1317 struct obd_request_slot_waiter *orsw)
1321 spin_lock(&cli->cl_loi_list_lock);
1322 avail = !!list_empty(&orsw->orsw_entry);
1323 spin_unlock(&cli->cl_loi_list_lock);
1329 * For network flow control, the RPC sponsor needs to acquire a credit
1330 * before sending the RPC. The credits count for a connection is defined
1331 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1332 * the subsequent RPC sponsors need to wait until others release their
1333 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on @cli, sleeping when none is free.
 *
 * Fast path: when cl_r_in_flight is below cl_max_rpcs_in_flight the counter
 * is bumped under cl_loi_list_lock and no waiting is needed.
 *
 * Slow path: an on-stack waiter is queued at the tail of cl_loi_read_list
 * and the caller sleeps interruptibly until obd_request_slot_avail() holds
 * or the waiter is marked as signalled.  After the sleep the waiter is
 * settled under cl_loi_list_lock, so that it is no longer referenced by the
 * list when this stack frame goes away.
 */
1335 int obd_get_request_slot(struct client_obd *cli)
1337 struct obd_request_slot_waiter orsw;
1338 struct l_wait_info lwi;
1341 spin_lock(&cli->cl_loi_list_lock);
1342 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1343 cli->cl_r_in_flight++;
1344 spin_unlock(&cli->cl_loi_list_lock);
1348 init_waitqueue_head(&orsw.orsw_waitq);
1349 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1350 orsw.orsw_signaled = false;
1351 spin_unlock(&cli->cl_loi_list_lock);
1353 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1354 rc = l_wait_event(orsw.orsw_waitq,
1355 obd_request_slot_avail(cli, &orsw) ||
1360 * Here, we must take the lock to avoid the on-stack 'orsw' to be
1361 * freed but other (such as obd_put_request_slot) is using it.
1363 spin_lock(&cli->cl_loi_list_lock);
/*
 * An empty entry means obd_put_request_slot() already dequeued this waiter
 * and counted a slot for it; otherwise the waiter is still queued.
 */
1365 if (!orsw.orsw_signaled) {
1366 if (list_empty(&orsw.orsw_entry))
1367 cli->cl_r_in_flight--;
1369 list_del(&orsw.orsw_entry);
1373 if (orsw.orsw_signaled) {
1374 LASSERT(list_empty(&orsw.orsw_entry));
1378 spin_unlock(&cli->cl_loi_list_lock);
1382 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC request slot on @cli.
 *
 * Decrements cl_r_in_flight under cl_loi_list_lock. If a waiter is queued on
 * cl_loi_read_list and the count is now below cl_max_rpcs_in_flight, the
 * first waiter is unlinked, the slot is charged to it (cl_r_in_flight++) and
 * it is woken, all before the lock is dropped.
 */
1384 void obd_put_request_slot(struct client_obd *cli)
1386 struct obd_request_slot_waiter *orsw;
1388 spin_lock(&cli->cl_loi_list_lock);
1389 cli->cl_r_in_flight--;
1391 /* If there is free slot, wakeup the first waiter. */
1392 if (!list_empty(&cli->cl_loi_read_list) &&
1393 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
/*
 * Pop the head waiter. list_del_init() leaves its entry empty, which is
 * exactly what obd_request_slot_avail() tests for.
 */
1394 orsw = list_entry(cli->cl_loi_read_list.next,
1395 struct obd_request_slot_waiter, orsw_entry);
1396 list_del_init(&orsw->orsw_entry);
1397 cli->cl_r_in_flight++;
1398 wake_up(&orsw->orsw_waitq);
1400 spin_unlock(&cli->cl_loi_list_lock);
1402 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current cl_max_rpcs_in_flight limit of @cli. */
1404 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1406 return cli->cl_max_rpcs_in_flight;
1408 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the cl_max_rpcs_in_flight limit of @cli to @max.
 *
 * @max is tested against the range 1..OBD_MAX_RIF_MAX. When the device type
 * name equals LUSTRE_MDC_NAME and @max is not above
 * cl_max_mod_rpcs_in_flight, the modify-RPC limit is first lowered to
 * @max - 1 through obd_set_max_mod_rpcs_in_flight(). The new limit is then
 * stored under cl_loi_list_lock and up to 'diff' queued waiters are granted
 * a slot and woken.
 *
 * NOTE(review): this listing omits several short lines: the declarations of
 * 'old', 'diff', 'i' and 'rc', the computation of 'diff', the condition in
 * front of the CERROR() and every return statement. Error codes are
 * therefore not documented here.
 */
1410 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1412 struct obd_request_slot_waiter *orsw;
1413 const char *typ_name;
/* Range test on the requested limit. */
1419 if (max > OBD_MAX_RIF_MAX || max < 1)
/* The extra modify-RPC handling below applies only when the type name matches LUSTRE_MDC_NAME. */
1422 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1423 if (!strcmp(typ_name, LUSTRE_MDC_NAME)) {
1425 * adjust max_mod_rpcs_in_flight to ensure it is always
1426 * strictly lower that max_rpcs_in_flight
1429 CERROR("%s: cannot set max_rpcs_in_flight to 1 because it must be higher than max_mod_rpcs_in_flight value\n",
1430 cli->cl_import->imp_obd->obd_name);
1433 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1434 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
/* Publish the new limit; the previous value is kept in 'old'. */
1440 spin_lock(&cli->cl_loi_list_lock);
1441 old = cli->cl_max_rpcs_in_flight;
1442 cli->cl_max_rpcs_in_flight = max;
1445 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
/*
 * Same hand-over as in obd_put_request_slot(): unlink the head waiter,
 * charge the slot to it, then wake it.
 */
1446 for (i = 0; i < diff; i++) {
1447 if (list_empty(&cli->cl_loi_read_list))
1450 orsw = list_entry(cli->cl_loi_read_list.next,
1451 struct obd_request_slot_waiter, orsw_entry);
1452 list_del_init(&orsw->orsw_entry);
1453 cli->cl_r_in_flight++;
1454 wake_up(&orsw->orsw_waitq);
1456 spin_unlock(&cli->cl_loi_list_lock);
1460 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/*
 * Change the cl_max_mod_rpcs_in_flight limit of @cli to @max.
 *
 * @max is tested against the range 1..OBD_MAX_RIF_MAX, against
 * cl_max_rpcs_in_flight (it has to stay strictly below it) and against
 * 'maxmodrpcs', which is taken from the import's connect data
 * (ocd_maxmodrpcs) when the OBD_CONNECT_MULTIMODRPCS flag is set. The new
 * limit is stored under cl_mod_rpcs_lock, and cl_mod_rpcs_waitq is woken
 * when the limit grew.
 *
 * NOTE(review): the listing omits the declarations of 'maxmodrpcs' and
 * 'prev', the value 'maxmodrpcs' takes when the flag is clear, the trailing
 * arguments of the second CERROR() and every return statement.
 */
1462 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
1464 struct obd_connect_data *ocd;
/* Range test on the requested limit. */
1468 if (max > OBD_MAX_RIF_MAX || max < 1)
1471 /* cannot exceed or equal max_rpcs_in_flight */
1472 if (max >= cli->cl_max_rpcs_in_flight) {
1473 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) higher or equal to max_rpcs_in_flight value (%u)\n",
1474 cli->cl_import->imp_obd->obd_name,
1475 max, cli->cl_max_rpcs_in_flight);
1479 /* cannot exceed max modify RPCs in flight supported by the server */
1480 ocd = &cli->cl_import->imp_connect_data;
1481 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
1482 maxmodrpcs = ocd->ocd_maxmodrpcs;
1485 if (max > maxmodrpcs) {
1486 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) higher than max_mod_rpcs_per_client value (%hu) returned by the server at connection\n",
1487 cli->cl_import->imp_obd->obd_name,
/* Publish the new limit; the previous value is kept in 'prev' for the comparison below. */
1492 spin_lock(&cli->cl_mod_rpcs_lock);
1494 prev = cli->cl_max_mod_rpcs_in_flight;
1495 cli->cl_max_mod_rpcs_in_flight = max;
1497 /* wakeup waiters if limit has been increased */
1498 if (cli->cl_max_mod_rpcs_in_flight > prev)
1499 wake_up(&cli->cl_mod_rpcs_waitq);
1501 spin_unlock(&cli->cl_mod_rpcs_lock);
1505 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Integer percentage of @a relative to @b, or 0 when @b is zero.
 * Both parameters are parenthesised so that expression arguments such as
 * pct(x + y, total) expand correctly. @b is still evaluated twice, so do
 * not pass an argument with side effects.
 */
#define pct(a, b) ((b) ? ((a) * 100) / (b) : 0)
1509 int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq)
1511 unsigned long mod_tot = 0, mod_cum;
1512 struct timespec64 now;
1515 ktime_get_real_ts64(&now);
1517 spin_lock(&cli->cl_mod_rpcs_lock);
1519 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
1520 (s64)now.tv_sec, (unsigned long)now.tv_nsec);
1521 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
1522 cli->cl_mod_rpcs_in_flight);
1524 seq_puts(seq, "\n\t\t\tmodify\n");
1525 seq_puts(seq, "rpcs in flight rpcs %% cum %%\n");
1527 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
1530 for (i = 0; i < OBD_HIST_MAX; i++) {
1531 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
1534 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
1535 i, mod, pct(mod, mod_tot),
1536 pct(mod_cum, mod_tot));
1537 if (mod_cum == mod_tot)
1541 spin_unlock(&cli->cl_mod_rpcs_lock);
1545 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
1549 * The number of modify RPCs sent in parallel is limited
1550 * because the server has a finite number of slots per client to
1551 * store request result and ensure reply reconstruction when needed.
1552 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
1553 * that takes into account server limit and cl_max_rpcs_in_flight
1555 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
1556 * one close request is allowed above the maximum.
/*
 * Tell whether a modify RPC slot can be taken on @cli right now.
 *
 * Does no locking itself; both callers visible in this file hold
 * cli->cl_mod_rpcs_lock around the call.
 *
 * NOTE(review): the second parameter (used below as 'close_req'), the
 * declaration of 'avail' and the return statement are among the lines this
 * listing omits; presumably 'avail' is what gets returned.
 */
1558 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
1563 /* A slot is available if
1564 * - number of modify RPCs in flight is less than the max
1565 * - it's a close RPC and no other close request is in flight
1567 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
1568 (close_req && !cli->cl_close_rpcs_in_flight);
/*
 * Locking wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cli->cl_mod_rpcs_lock for the duration of the check. It is the wait
 * condition used by obd_get_mod_rpc_slot().
 *
 * NOTE(review): the second parameter (used below as 'close_req'), the
 * declaration of 'avail' and the return statement are not in this listing.
 */
1573 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
1578 spin_lock(&cli->cl_mod_rpcs_lock);
1579 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
1580 spin_unlock(&cli->cl_mod_rpcs_lock);
1584 /* Get a modify RPC slot from the obd client @cli according
1585 * to the kind of operation @opc that is going to be sent
1586 * and the intent @it of the operation if it applies.
1587 * If the maximum number of modify RPCs in flight is reached
1588 * the thread is put to sleep.
1589 * Returns the tag to be set in the request message. Tag 0
1590 * is reserved for non-modifying requests.
/*
 * Implementation notes for obd_get_mod_rpc_slot():
 * - a slot is claimed under cl_mod_rpcs_lock: cl_mod_rpcs_in_flight is
 *   incremented (and cl_close_rpcs_in_flight, presumably only for close
 *   requests), the new depth is tallied into cl_mod_rpcs_hist and a free bit
 *   is taken from cl_mod_tag_bitmap;
 * - otherwise the lock is dropped and the caller sleeps on cl_mod_rpcs_waitq
 *   until obd_mod_rpc_slot_avail() holds.
 *
 * NOTE(review): this listing omits several short lines: the declarations of
 * 'i' and 'max', the statements controlled by the two leading 'if' tests,
 * the size argument of find_first_zero_bit(), the return statements and
 * whatever loops back after the sleep.
 *
 * NOTE(review): the tag bit is claimed by test_and_set_bit() inside
 * LASSERT(). That is only correct if LASSERT() always evaluates its
 * argument - confirm against the macro definition, or hoist the call out of
 * the assertion.
 */
1592 u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
1593 struct lookup_intent *it)
1595 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
1596 bool close_req = false;
1599 /* read-only metadata RPCs don't consume a slot on MDT
1600 * for reply reconstruction
1602 if (it && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
1603 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
1606 if (opc == MDS_CLOSE)
1610 spin_lock(&cli->cl_mod_rpcs_lock);
1611 max = cli->cl_max_mod_rpcs_in_flight;
1612 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
1613 /* there is a slot available */
1614 cli->cl_mod_rpcs_in_flight++;
1616 cli->cl_close_rpcs_in_flight++;
/* Record the in-flight depth reached by this request. */
1617 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
1618 cli->cl_mod_rpcs_in_flight);
1619 /* find a free tag */
1620 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
1622 LASSERT(i < OBD_MAX_RIF_MAX);
1623 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
1624 spin_unlock(&cli->cl_mod_rpcs_lock);
1625 /* tag 0 is reserved for non-modify RPCs */
1628 spin_unlock(&cli->cl_mod_rpcs_lock);
/* 'max' is the limit sampled under the lock above; it is used only for this message. */
1630 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot opc %u, max %hu\n",
1631 cli->cl_import->imp_obd->obd_name, opc, max);
/* The lock is not held while sleeping; the condition re-takes it on every evaluation. */
1633 l_wait_event(cli->cl_mod_rpcs_waitq,
1634 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
1637 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
1640 * Put a modify RPC slot from the obd client @cli according
1641 * to the kind of operation @opc that has been sent and the
1642 * intent @it of the operation if it applies.
/*
 * Implementation notes for obd_put_mod_rpc_slot():
 * under cl_mod_rpcs_lock the in-flight counter is decremented
 * (cl_close_rpcs_in_flight as well, presumably only for close requests) and
 * bit @tag - 1 is cleared in cl_mod_tag_bitmap. cl_mod_rpcs_waitq is woken
 * after the lock has been dropped.
 *
 * NOTE(review): this listing omits the statements controlled by the two
 * leading 'if' tests and the test in front of the close counter decrement.
 *
 * NOTE(review): the tag bit is released by test_and_clear_bit() inside
 * LASSERT(). That is only correct if LASSERT() always evaluates its
 * argument - confirm against the macro definition, or hoist the call out of
 * the assertion.
 *
 * NOTE(review): with @tag == 0, 'tag - 1' is -1 after integer promotion.
 * Whether the bound assertion rejects it depends on the type of
 * OBD_MAX_RIF_MAX - confirm that tag 0 can never reach this point.
 */
1644 void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc,
1645 struct lookup_intent *it, u16 tag)
1647 bool close_req = false;
1649 if (it && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
1650 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
1653 if (opc == MDS_CLOSE)
1656 spin_lock(&cli->cl_mod_rpcs_lock);
1657 cli->cl_mod_rpcs_in_flight--;
1659 cli->cl_close_rpcs_in_flight--;
1660 /* release the tag in the bitmap */
/* The bitmap index is @tag - 1: obd_get_mod_rpc_slot() notes that tag 0 is reserved. */
1661 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
1662 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
1663 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Let sleepers in obd_get_mod_rpc_slot() re-evaluate their wait condition. */
1664 wake_up(&cli->cl_mod_rpcs_waitq);
1666 EXPORT_SYMBOL(obd_put_mod_rpc_slot);