Linux-libre 4.9.18-gnu
[librecmc/linux-libre.git] / drivers / staging / lustre / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_ECHO
34 #include "../../include/linux/libcfs/libcfs.h"
35
36 #include "../include/obd.h"
37 #include "../include/obd_support.h"
38 #include "../include/obd_class.h"
39 #include "../include/lustre_debug.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/cl_object.h"
42 #include "../include/lustre_fid.h"
43 #include "../include/lustre_acl.h"
44 #include "../include/lustre/lustre_ioctl.h"
45 #include "../include/lustre_net.h"
46
47 #include "echo_internal.h"
48
49 /** \defgroup echo_client Echo Client
50  * @{
51  */
52
53 struct echo_device {
54         struct cl_device        ed_cl;
55         struct echo_client_obd *ed_ec;
56
57         struct cl_site    ed_site_myself;
58         struct cl_site   *ed_site;
59         struct lu_device       *ed_next;
60 };
61
62 struct echo_object {
63         struct cl_object        eo_cl;
64         struct cl_object_header eo_hdr;
65
66         struct echo_device     *eo_dev;
67         struct list_head              eo_obj_chain;
68         struct lov_oinfo       *eo_oinfo;
69         atomic_t            eo_npages;
70         int                  eo_deleted;
71 };
72
73 struct echo_object_conf {
74         struct cl_object_conf  eoc_cl;
75         struct lov_oinfo      **eoc_oinfo;
76 };
77
78 struct echo_page {
79         struct cl_page_slice   ep_cl;
80         struct mutex            ep_lock;
81 };
82
83 struct echo_lock {
84         struct cl_lock_slice   el_cl;
85         struct list_head             el_chain;
86         struct echo_object    *el_object;
87         __u64             el_cookie;
88         atomic_t           el_refcount;
89 };
90
/*
 * Forward declarations: both are called from the device alloc/free paths
 * below, ahead of their definitions.
 */
static int echo_client_cleanup(struct obd_device *obddev);
static int echo_client_setup(const struct lu_env *env,
			     struct obd_device *obddev,
			     struct lustre_cfg *lcfg);
95
96 /** \defgroup echo_helpers Helper functions
97  * @{
98  */
99 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
100 {
101         return container_of0(dev, struct echo_device, ed_cl);
102 }
103
104 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
105 {
106         return &d->ed_cl;
107 }
108
109 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
110 {
111         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
112 }
113
114 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
115 {
116         return &eco->eo_cl;
117 }
118
119 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
120 {
121         return container_of(o, struct echo_object, eo_cl);
122 }
123
124 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
125 {
126         return container_of(s, struct echo_page, ep_cl);
127 }
128
129 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
130 {
131         return container_of(s, struct echo_lock, el_cl);
132 }
133
134 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
135 {
136         return ecl->el_cl.cls_lock;
137 }
138
139 static struct lu_context_key echo_thread_key;
140 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
141 {
142         struct echo_thread_info *info;
143
144         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
145         LASSERT(info);
146         return info;
147 }
148
149 static inline
150 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
151 {
152         return container_of(c, struct echo_object_conf, eoc_cl);
153 }
154
155 /** @} echo_helpers */
156 static int cl_echo_object_put(struct echo_object *eco);
157 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
158                               struct page **pages, int npages, int async);
159
160 struct echo_thread_info {
161         struct echo_object_conf eti_conf;
162         struct lustre_md        eti_md;
163
164         struct cl_2queue        eti_queue;
165         struct cl_io        eti_io;
166         struct cl_lock          eti_lock;
167         struct lu_fid      eti_fid;
168         struct lu_fid           eti_fid2;
169 };
170
/*
 * Per-session data. No session state is used right now, so this only
 * carries a placeholder member.
 */
struct echo_session_info {
	unsigned long	dummy;
};
175
/* Slab caches of the echo client; each one is described in echo_caches[]. */
static struct kmem_cache	*echo_lock_kmem;
static struct kmem_cache	*echo_object_kmem;
static struct kmem_cache	*echo_thread_kmem;
static struct kmem_cache	*echo_session_kmem;
180
181 static struct lu_kmem_descr echo_caches[] = {
182         {
183                 .ckd_cache = &echo_lock_kmem,
184                 .ckd_name  = "echo_lock_kmem",
185                 .ckd_size  = sizeof(struct echo_lock)
186         },
187         {
188                 .ckd_cache = &echo_object_kmem,
189                 .ckd_name  = "echo_object_kmem",
190                 .ckd_size  = sizeof(struct echo_object)
191         },
192         {
193                 .ckd_cache = &echo_thread_kmem,
194                 .ckd_name  = "echo_thread_kmem",
195                 .ckd_size  = sizeof(struct echo_thread_info)
196         },
197         {
198                 .ckd_cache = &echo_session_kmem,
199                 .ckd_name  = "echo_session_kmem",
200                 .ckd_size  = sizeof(struct echo_session_info)
201         },
202         {
203                 .ckd_cache = NULL
204         }
205 };
206
207 /** \defgroup echo_page Page operations
208  *
209  * Echo page operations.
210  *
211  * @{
212  */
213 static int echo_page_own(const struct lu_env *env,
214                          const struct cl_page_slice *slice,
215                          struct cl_io *io, int nonblock)
216 {
217         struct echo_page *ep = cl2echo_page(slice);
218
219         if (!nonblock)
220                 mutex_lock(&ep->ep_lock);
221         else if (!mutex_trylock(&ep->ep_lock))
222                 return -EAGAIN;
223         return 0;
224 }
225
226 static void echo_page_disown(const struct lu_env *env,
227                              const struct cl_page_slice *slice,
228                              struct cl_io *io)
229 {
230         struct echo_page *ep = cl2echo_page(slice);
231
232         LASSERT(mutex_is_locked(&ep->ep_lock));
233         mutex_unlock(&ep->ep_lock);
234 }
235
236 static void echo_page_discard(const struct lu_env *env,
237                               const struct cl_page_slice *slice,
238                               struct cl_io *unused)
239 {
240         cl_page_delete(env, slice->cpl_page);
241 }
242
243 static int echo_page_is_vmlocked(const struct lu_env *env,
244                                  const struct cl_page_slice *slice)
245 {
246         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
247                 return -EBUSY;
248         return -ENODATA;
249 }
250
251 static void echo_page_completion(const struct lu_env *env,
252                                  const struct cl_page_slice *slice,
253                                  int ioret)
254 {
255         LASSERT(slice->cpl_page->cp_sync_io);
256 }
257
258 static void echo_page_fini(const struct lu_env *env,
259                            struct cl_page_slice *slice)
260 {
261         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
262
263         atomic_dec(&eco->eo_npages);
264         put_page(slice->cpl_page->cp_vmpage);
265 }
266
/* cpo_prep method (read and write): no preparation needed, always succeeds. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	/* Nothing to prepare for an echo page. */
	return 0;
}
273
274 static int echo_page_print(const struct lu_env *env,
275                            const struct cl_page_slice *slice,
276                            void *cookie, lu_printer_t printer)
277 {
278         struct echo_page *ep = cl2echo_page(slice);
279
280         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
281                    ep, mutex_is_locked(&ep->ep_lock),
282                    slice->cpl_page->cp_vmpage);
283         return 0;
284 }
285
286 static const struct cl_page_operations echo_page_ops = {
287         .cpo_own           = echo_page_own,
288         .cpo_disown     = echo_page_disown,
289         .cpo_discard       = echo_page_discard,
290         .cpo_fini         = echo_page_fini,
291         .cpo_print       = echo_page_print,
292         .cpo_is_vmlocked   = echo_page_is_vmlocked,
293         .io = {
294                 [CRT_READ] = {
295                         .cpo_prep       = echo_page_prep,
296                         .cpo_completion  = echo_page_completion,
297                 },
298                 [CRT_WRITE] = {
299                         .cpo_prep       = echo_page_prep,
300                         .cpo_completion  = echo_page_completion,
301                 }
302         }
303 };
304
305 /** @} echo_page */
306
307 /** \defgroup echo_lock Locking
308  *
309  * echo lock operations
310  *
311  * @{
312  */
313 static void echo_lock_fini(const struct lu_env *env,
314                            struct cl_lock_slice *slice)
315 {
316         struct echo_lock *ecl = cl2echo_lock(slice);
317
318         LASSERT(list_empty(&ecl->el_chain));
319         kmem_cache_free(echo_lock_kmem, ecl);
320 }
321
322 static struct cl_lock_operations echo_lock_ops = {
323         .clo_fini      = echo_lock_fini,
324 };
325
326 /** @} echo_lock */
327
328 /** \defgroup echo_cl_ops cl_object operations
329  *
330  * operations for cl_object
331  *
332  * @{
333  */
334 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
335                           struct cl_page *page, pgoff_t index)
336 {
337         struct echo_page *ep = cl_object_page_slice(obj, page);
338         struct echo_object *eco = cl2echo_obj(obj);
339
340         get_page(page->cp_vmpage);
341         mutex_init(&ep->ep_lock);
342         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
343         atomic_inc(&eco->eo_npages);
344         return 0;
345 }
346
/* coo_io_init method: the echo layer keeps no per-IO state; always succeeds. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	/* Nothing to initialise. */
	return 0;
}
352
353 static int echo_lock_init(const struct lu_env *env,
354                           struct cl_object *obj, struct cl_lock *lock,
355                           const struct cl_io *unused)
356 {
357         struct echo_lock *el;
358
359         el = kmem_cache_zalloc(echo_lock_kmem, GFP_NOFS);
360         if (el) {
361                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
362                 el->el_object = cl2echo_obj(obj);
363                 INIT_LIST_HEAD(&el->el_chain);
364                 atomic_set(&el->el_refcount, 0);
365         }
366         return !el ? -ENOMEM : 0;
367 }
368
/* coo_conf_set method: configuration changes are ignored; always succeeds. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	/* Nothing to apply. */
	return 0;
}
374
375 static const struct cl_object_operations echo_cl_obj_ops = {
376         .coo_page_init = echo_page_init,
377         .coo_lock_init = echo_lock_init,
378         .coo_io_init   = echo_io_init,
379         .coo_conf_set  = echo_conf_set
380 };
381
382 /** @} echo_cl_ops */
383
384 /** \defgroup echo_lu_ops lu_object operations
385  *
386  * operations for echo lu object.
387  *
388  * @{
389  */
390 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
391                             const struct lu_object_conf *conf)
392 {
393         struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
394         struct echo_client_obd *ec     = ed->ed_ec;
395         struct echo_object *eco = cl2echo_obj(lu2cl(obj));
396         const struct cl_object_conf *cconf;
397         struct echo_object_conf *econf;
398
399         if (ed->ed_next) {
400                 struct lu_object  *below;
401                 struct lu_device  *under;
402
403                 under = ed->ed_next;
404                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
405                                                         under);
406                 if (!below)
407                         return -ENOMEM;
408                 lu_object_add(obj, below);
409         }
410
411         cconf = lu2cl_conf(conf);
412         econf = cl2echo_conf(cconf);
413
414         LASSERT(econf->eoc_oinfo);
415         /*
416          * Transfer the oinfo pointer to eco that it won't be
417          * freed.
418          */
419         eco->eo_oinfo = *econf->eoc_oinfo;
420         *econf->eoc_oinfo = NULL;
421
422         eco->eo_dev = ed;
423         atomic_set(&eco->eo_npages, 0);
424         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
425
426         spin_lock(&ec->ec_lock);
427         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
428         spin_unlock(&ec->ec_lock);
429
430         return 0;
431 }
432
433 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
434 {
435         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
436         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
437
438         LASSERT(atomic_read(&eco->eo_npages) == 0);
439
440         spin_lock(&ec->ec_lock);
441         list_del_init(&eco->eo_obj_chain);
442         spin_unlock(&ec->ec_lock);
443
444         lu_object_fini(obj);
445         lu_object_header_fini(obj->lo_header);
446
447         kfree(eco->eo_oinfo);
448         kmem_cache_free(echo_object_kmem, eco);
449 }
450
451 static int echo_object_print(const struct lu_env *env, void *cookie,
452                              lu_printer_t p, const struct lu_object *o)
453 {
454         struct echo_object *obj = cl2echo_obj(lu2cl(o));
455
456         return (*p)(env, cookie, "echoclient-object@%p", obj);
457 }
458
459 static const struct lu_object_operations echo_lu_obj_ops = {
460         .loo_object_init      = echo_object_init,
461         .loo_object_delete    = NULL,
462         .loo_object_release   = NULL,
463         .loo_object_free      = echo_object_free,
464         .loo_object_print     = echo_object_print,
465         .loo_object_invariant = NULL
466 };
467
468 /** @} echo_lu_ops */
469
470 /** \defgroup echo_lu_dev_ops  lu_device operations
471  *
472  * Operations for echo lu device.
473  *
474  * @{
475  */
476 static struct lu_object *echo_object_alloc(const struct lu_env *env,
477                                            const struct lu_object_header *hdr,
478                                            struct lu_device *dev)
479 {
480         struct echo_object *eco;
481         struct lu_object *obj = NULL;
482
483         /* we're the top dev. */
484         LASSERT(!hdr);
485         eco = kmem_cache_zalloc(echo_object_kmem, GFP_NOFS);
486         if (eco) {
487                 struct cl_object_header *hdr = &eco->eo_hdr;
488
489                 obj = &echo_obj2cl(eco)->co_lu;
490                 cl_object_header_init(hdr);
491                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
492
493                 lu_object_init(obj, &hdr->coh_lu, dev);
494                 lu_object_add_top(&hdr->coh_lu, obj);
495
496                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
497                 obj->lo_ops       = &echo_lu_obj_ops;
498         }
499         return obj;
500 }
501
502 static const struct lu_device_operations echo_device_lu_ops = {
503         .ldo_object_alloc   = echo_object_alloc,
504 };
505
506 /** @} echo_lu_dev_ops */
507
508 static const struct cl_device_operations echo_device_cl_ops = {
509 };
510
511 /** \defgroup echo_init Setup and teardown
512  *
513  * Init and fini functions for echo client.
514  *
515  * @{
516  */
517 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
518 {
519         struct cl_site *site = &ed->ed_site_myself;
520         int rc;
521
522         /* initialize site */
523         rc = cl_site_init(site, &ed->ed_cl);
524         if (rc) {
525                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
526                 return rc;
527         }
528
529         rc = lu_site_init_finish(&site->cs_lu);
530         if (rc)
531                 return rc;
532
533         ed->ed_site = site;
534         return 0;
535 }
536
537 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
538 {
539         if (ed->ed_site) {
540                 cl_site_fini(ed->ed_site);
541                 ed->ed_site = NULL;
542         }
543 }
544
545 static void *echo_thread_key_init(const struct lu_context *ctx,
546                                   struct lu_context_key *key)
547 {
548         struct echo_thread_info *info;
549
550         info = kmem_cache_zalloc(echo_thread_kmem, GFP_NOFS);
551         if (!info)
552                 info = ERR_PTR(-ENOMEM);
553         return info;
554 }
555
556 static void echo_thread_key_fini(const struct lu_context *ctx,
557                                  struct lu_context_key *key, void *data)
558 {
559         struct echo_thread_info *info = data;
560
561         kmem_cache_free(echo_thread_kmem, info);
562 }
563
/* lct_exit hook of echo_thread_key: intentionally empty. */
static void echo_thread_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
	/* Nothing to reset. */
}
568
569 static struct lu_context_key echo_thread_key = {
570         .lct_tags = LCT_CL_THREAD,
571         .lct_init = echo_thread_key_init,
572         .lct_fini = echo_thread_key_fini,
573         .lct_exit = echo_thread_key_exit
574 };
575
576 static void *echo_session_key_init(const struct lu_context *ctx,
577                                    struct lu_context_key *key)
578 {
579         struct echo_session_info *session;
580
581         session = kmem_cache_zalloc(echo_session_kmem, GFP_NOFS);
582         if (!session)
583                 session = ERR_PTR(-ENOMEM);
584         return session;
585 }
586
587 static void echo_session_key_fini(const struct lu_context *ctx,
588                                   struct lu_context_key *key, void *data)
589 {
590         struct echo_session_info *session = data;
591
592         kmem_cache_free(echo_session_kmem, session);
593 }
594
/* lct_exit hook of echo_session_key: intentionally empty. */
static void echo_session_key_exit(const struct lu_context *ctx,
				  struct lu_context_key *key, void *data)
{
	/* Nothing to reset. */
}
599
600 static struct lu_context_key echo_session_key = {
601         .lct_tags = LCT_SESSION,
602         .lct_init = echo_session_key_init,
603         .lct_fini = echo_session_key_fini,
604         .lct_exit = echo_session_key_exit
605 };
606
607 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
608
609 static struct lu_device *echo_device_alloc(const struct lu_env *env,
610                                            struct lu_device_type *t,
611                                            struct lustre_cfg *cfg)
612 {
613         struct lu_device   *next;
614         struct echo_device *ed;
615         struct cl_device   *cd;
616         struct obd_device  *obd = NULL; /* to keep compiler happy */
617         struct obd_device  *tgt;
618         const char *tgt_type_name;
619         int rc, err;
620
621         ed = kzalloc(sizeof(*ed), GFP_NOFS);
622         if (!ed) {
623                 rc = -ENOMEM;
624                 goto out;
625         }
626
627         cd = &ed->ed_cl;
628         rc = cl_device_init(cd, t);
629         if (rc)
630                 goto out_free;
631
632         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
633         cd->cd_ops = &echo_device_cl_ops;
634
635         obd = class_name2obd(lustre_cfg_string(cfg, 0));
636         LASSERT(obd);
637         LASSERT(env);
638
639         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
640         if (!tgt) {
641                 CERROR("Can not find tgt device %s\n",
642                        lustre_cfg_string(cfg, 1));
643                 rc = -ENODEV;
644                 goto out_device_fini;
645         }
646
647         next = tgt->obd_lu_dev;
648         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
649                 CERROR("echo MDT client must be run on server\n");
650                 rc = -EOPNOTSUPP;
651                 goto out_device_fini;
652         }
653
654         rc = echo_site_init(env, ed);
655         if (rc)
656                 goto out_device_fini;
657
658         rc = echo_client_setup(env, obd, cfg);
659         if (rc)
660                 goto out_site_fini;
661
662         ed->ed_ec = &obd->u.echo_client;
663
664         /* if echo client is to be stacked upon ost device, the next is
665          * NULL since ost is not a clio device so far
666          */
667         if (next && !lu_device_is_cl(next))
668                 next = NULL;
669
670         tgt_type_name = tgt->obd_type->typ_name;
671         if (next) {
672                 if (next->ld_site) {
673                         rc = -EBUSY;
674                         goto out_cleanup;
675                 }
676
677                 next->ld_site = &ed->ed_site->cs_lu;
678                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
679                                                 next->ld_type->ldt_name,
680                                                               NULL);
681                 if (rc)
682                         goto out_cleanup;
683
684         } else {
685                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
686         }
687
688         ed->ed_next = next;
689         return &cd->cd_lu_dev;
690
691 out_cleanup:
692         err = echo_client_cleanup(obd);
693         if (err)
694                 CERROR("Cleanup obd device %s error(%d)\n",
695                        obd->obd_name, err);
696 out_site_fini:
697         echo_site_fini(env, ed);
698 out_device_fini:
699         cl_device_fini(&ed->ed_cl);
700 out_free:
701         kfree(ed);
702 out:
703         return ERR_PTR(rc);
704 }
705
/*
 * ldto_device_init method: must never be called for the echo device, so
 * reaching it triggers LBUG().
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			    const char *name, struct lu_device *next)
{
	/* Not expected to be reached. */
	LBUG();
	return 0;
}
712
713 static struct lu_device *echo_device_fini(const struct lu_env *env,
714                                           struct lu_device *d)
715 {
716         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
717         struct lu_device *next = ed->ed_next;
718
719         while (next)
720                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
721         return NULL;
722 }
723
/*
 * Release the cl_lock behind echo lock @ecl. @still_used is accepted but
 * not consulted.
 */
static void echo_lock_release(const struct lu_env *env,
			      struct echo_lock *ecl,
			      int still_used)
{
	cl_lock_release(env, echo_lock2cl(ecl));
}
732
733 static struct lu_device *echo_device_free(const struct lu_env *env,
734                                           struct lu_device *d)
735 {
736         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
737         struct echo_client_obd *ec   = ed->ed_ec;
738         struct echo_object     *eco;
739         struct lu_device       *next = ed->ed_next;
740
741         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
742                ed, next);
743
744         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
745
746         /* check if there are objects still alive.
747          * It shouldn't have any object because lu_site_purge would cleanup
748          * all of cached objects. Anyway, probably the echo device is being
749          * parallelly accessed.
750          */
751         spin_lock(&ec->ec_lock);
752         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
753                 eco->eo_deleted = 1;
754         spin_unlock(&ec->ec_lock);
755
756         /* purge again */
757         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
758
759         CDEBUG(D_INFO,
760                "Waiting for the reference of echo object to be dropped\n");
761
762         /* Wait for the last reference to be dropped. */
763         spin_lock(&ec->ec_lock);
764         while (!list_empty(&ec->ec_objects)) {
765                 spin_unlock(&ec->ec_lock);
766                 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
767                 set_current_state(TASK_UNINTERRUPTIBLE);
768                 schedule_timeout(cfs_time_seconds(1));
769                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
770                 spin_lock(&ec->ec_lock);
771         }
772         spin_unlock(&ec->ec_lock);
773
774         LASSERT(list_empty(&ec->ec_locks));
775
776         CDEBUG(D_INFO, "No object exists, exiting...\n");
777
778         echo_client_cleanup(d->ld_obd);
779
780         while (next)
781                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
782
783         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
784         echo_site_fini(env, ed);
785         cl_device_fini(&ed->ed_cl);
786         kfree(ed);
787
788         return NULL;
789 }
790
791 static const struct lu_device_type_operations echo_device_type_ops = {
792         .ldto_init = echo_type_init,
793         .ldto_fini = echo_type_fini,
794
795         .ldto_start = echo_type_start,
796         .ldto_stop  = echo_type_stop,
797
798         .ldto_device_alloc = echo_device_alloc,
799         .ldto_device_free  = echo_device_free,
800         .ldto_device_init  = echo_device_init,
801         .ldto_device_fini  = echo_device_fini
802 };
803
804 static struct lu_device_type echo_device_type = {
805         .ldt_tags     = LU_DEVICE_CL,
806         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
807         .ldt_ops      = &echo_device_type_ops,
808         .ldt_ctx_tags = LCT_CL_THREAD,
809 };
810
811 /** @} echo_init */
812
813 /** \defgroup echo_exports Exported operations
814  *
815  * exporting functions to echo client
816  *
817  * @{
818  */
819
820 /* Interfaces to echo client obd device */
821 static struct echo_object *
822 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
823 {
824         struct lu_env *env;
825         struct echo_thread_info *info;
826         struct echo_object_conf *conf;
827         struct lov_oinfo *oinfo = NULL;
828         struct echo_object *eco;
829         struct cl_object   *obj;
830         struct lu_fid *fid;
831         int refcheck;
832         int rc;
833
834         LASSERTF(ostid_id(oi), DOSTID "\n", POSTID(oi));
835         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID "\n", POSTID(oi));
836
837         /* Never return an object if the obd is to be freed. */
838         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
839                 return ERR_PTR(-ENODEV);
840
841         env = cl_env_get(&refcheck);
842         if (IS_ERR(env))
843                 return (void *)env;
844
845         info = echo_env_info(env);
846         conf = &info->eti_conf;
847         if (d->ed_next) {
848                 oinfo = kzalloc(sizeof(*oinfo), GFP_NOFS);
849                 if (!oinfo) {
850                         eco = ERR_PTR(-ENOMEM);
851                         goto out;
852                 }
853
854                 oinfo->loi_oi = *oi;
855                 conf->eoc_cl.u.coc_oinfo = oinfo;
856         }
857
858         /*
859          * If echo_object_init() is successful then ownership of oinfo
860          * is transferred to the object.
861          */
862         conf->eoc_oinfo = &oinfo;
863
864         fid  = &info->eti_fid;
865         rc = ostid_to_fid(fid, (struct ost_id *)oi, 0);
866         if (rc != 0) {
867                 eco = ERR_PTR(rc);
868                 goto out;
869         }
870
871         /* In the function below, .hs_keycmp resolves to
872          * lu_obj_hop_keycmp()
873          */
874         /* coverity[overrun-buffer-val] */
875         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
876         if (IS_ERR(obj)) {
877                 eco = (void *)obj;
878                 goto out;
879         }
880
881         eco = cl2echo_obj(obj);
882         if (eco->eo_deleted) {
883                 cl_object_put(env, obj);
884                 eco = ERR_PTR(-EAGAIN);
885         }
886
887 out:
888         kfree(oinfo);
889         cl_env_put(env, &refcheck);
890         return eco;
891 }
892
893 static int cl_echo_object_put(struct echo_object *eco)
894 {
895         struct lu_env *env;
896         struct cl_object *obj = echo_obj2cl(eco);
897         int refcheck;
898
899         env = cl_env_get(&refcheck);
900         if (IS_ERR(env))
901                 return PTR_ERR(env);
902
903         /* an external function to kill an object? */
904         if (eco->eo_deleted) {
905                 struct lu_object_header *loh = obj->co_lu.lo_header;
906
907                 LASSERT(&eco->eo_hdr == luh2coh(loh));
908                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
909         }
910
911         cl_object_put(env, obj);
912         cl_env_put(env, &refcheck);
913         return 0;
914 }
915
916 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
917                             u64 start, u64 end, int mode,
918                             __u64 *cookie, __u32 enqflags)
919 {
920         struct cl_io *io;
921         struct cl_lock *lck;
922         struct cl_object *obj;
923         struct cl_lock_descr *descr;
924         struct echo_thread_info *info;
925         int rc = -ENOMEM;
926
927         info = echo_env_info(env);
928         io = &info->eti_io;
929         lck = &info->eti_lock;
930         obj = echo_obj2cl(eco);
931
932         memset(lck, 0, sizeof(*lck));
933         descr = &lck->cll_descr;
934         descr->cld_obj   = obj;
935         descr->cld_start = cl_index(obj, start);
936         descr->cld_end   = cl_index(obj, end);
937         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
938         descr->cld_enq_flags = enqflags;
939         io->ci_obj = obj;
940
941         rc = cl_lock_request(env, io, lck);
942         if (rc == 0) {
943                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
944                 struct echo_lock *el;
945
946                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
947                 spin_lock(&ec->ec_lock);
948                 if (list_empty(&el->el_chain)) {
949                         list_add(&el->el_chain, &ec->ec_locks);
950                         el->el_cookie = ++ec->ec_unique;
951                 }
952                 atomic_inc(&el->el_refcount);
953                 *cookie = el->el_cookie;
954                 spin_unlock(&ec->ec_lock);
955         }
956         return rc;
957 }
958
959 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
960                            __u64 cookie)
961 {
962         struct echo_client_obd *ec = ed->ed_ec;
963         struct echo_lock       *ecl = NULL;
964         struct list_head             *el;
965         int found = 0, still_used = 0;
966
967         spin_lock(&ec->ec_lock);
968         list_for_each(el, &ec->ec_locks) {
969                 ecl = list_entry(el, struct echo_lock, el_chain);
970                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
971                 found = (ecl->el_cookie == cookie);
972                 if (found) {
973                         if (atomic_dec_and_test(&ecl->el_refcount))
974                                 list_del_init(&ecl->el_chain);
975                         else
976                                 still_used = 1;
977                         break;
978                 }
979         }
980         spin_unlock(&ec->ec_lock);
981
982         if (!found)
983                 return -ENOENT;
984
985         echo_lock_release(env, ecl, still_used);
986         return 0;
987 }
988
989 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
990                                  struct cl_page *page)
991 {
992         struct echo_thread_info *info;
993         struct cl_2queue *queue;
994
995         info = echo_env_info(env);
996         LASSERT(io == &info->eti_io);
997
998         queue = &info->eti_queue;
999         cl_page_list_add(&queue->c2_qout, page);
1000 }
1001
1002 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1003                               struct page **pages, int npages, int async)
1004 {
1005         struct lu_env      *env;
1006         struct echo_thread_info *info;
1007         struct cl_object        *obj = echo_obj2cl(eco);
1008         struct echo_device      *ed  = eco->eo_dev;
1009         struct cl_2queue        *queue;
1010         struct cl_io        *io;
1011         struct cl_page    *clp;
1012         struct lustre_handle    lh = { 0 };
1013         size_t page_size = cl_page_size(obj);
1014         int refcheck;
1015         int rc;
1016         int i;
1017
1018         LASSERT((offset & ~PAGE_MASK) == 0);
1019         LASSERT(ed->ed_next);
1020         env = cl_env_get(&refcheck);
1021         if (IS_ERR(env))
1022                 return PTR_ERR(env);
1023
1024         info    = echo_env_info(env);
1025         io      = &info->eti_io;
1026         queue   = &info->eti_queue;
1027
1028         cl_2queue_init(queue);
1029
1030         io->ci_ignore_layout = 1;
1031         rc = cl_io_init(env, io, CIT_MISC, obj);
1032         if (rc < 0)
1033                 goto out;
1034         LASSERT(rc == 0);
1035
1036         rc = cl_echo_enqueue0(env, eco, offset,
1037                               offset + npages * PAGE_SIZE - 1,
1038                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1039                               CEF_NEVER);
1040         if (rc < 0)
1041                 goto error_lock;
1042
1043         for (i = 0; i < npages; i++) {
1044                 LASSERT(pages[i]);
1045                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1046                                    pages[i], CPT_TRANSIENT);
1047                 if (IS_ERR(clp)) {
1048                         rc = PTR_ERR(clp);
1049                         break;
1050                 }
1051                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1052
1053                 rc = cl_page_own(env, io, clp);
1054                 if (rc) {
1055                         LASSERT(clp->cp_state == CPS_FREEING);
1056                         cl_page_put(env, clp);
1057                         break;
1058                 }
1059                 /*
1060                  * Add a page to the incoming page list of 2-queue.
1061                  */
1062                 cl_page_list_add(&queue->c2_qin, clp);
1063
1064                 /* drop the reference count for cl_page_find, so that the page
1065                  * will be freed in cl_2queue_fini.
1066                  */
1067                 cl_page_put(env, clp);
1068                 cl_page_clip(env, clp, 0, page_size);
1069
1070                 offset += page_size;
1071         }
1072
1073         if (rc == 0) {
1074                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1075
1076                 async = async && (typ == CRT_WRITE);
1077                 if (async)
1078                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1079                                                 0, PAGE_SIZE,
1080                                                 echo_commit_callback);
1081                 else
1082                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1083                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1084                        async ? "async" : "sync", rc);
1085         }
1086
1087         cl_echo_cancel0(env, ed, lh.cookie);
1088 error_lock:
1089         cl_2queue_discard(env, io, queue);
1090         cl_2queue_disown(env, io, queue);
1091         cl_2queue_fini(env, queue);
1092         cl_io_fini(env, io);
1093 out:
1094         cl_env_put(env, &refcheck);
1095         return rc;
1096 }
1097
1098 /** @} echo_exports */
1099
1100 static u64 last_object_id;
1101
1102 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1103                               struct obdo *oa, struct obd_trans_info *oti)
1104 {
1105         struct echo_object     *eco;
1106         struct echo_client_obd *ec = ed->ed_ec;
1107         int                  rc;
1108         int                  created = 0;
1109
1110         if (!(oa->o_valid & OBD_MD_FLID) ||
1111             !(oa->o_valid & OBD_MD_FLGROUP) ||
1112             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1113                 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1114                 return -EINVAL;
1115         }
1116
1117         if (!ostid_id(&oa->o_oi))
1118                 ostid_set_id(&oa->o_oi, ++last_object_id);
1119
1120         rc = obd_create(env, ec->ec_exp, oa, oti);
1121         if (rc != 0) {
1122                 CERROR("Cannot create objects: rc = %d\n", rc);
1123                 goto failed;
1124         }
1125         created = 1;
1126
1127         oa->o_valid |= OBD_MD_FLID;
1128
1129         eco = cl_echo_object_find(ed, &oa->o_oi);
1130         if (IS_ERR(eco)) {
1131                 rc = PTR_ERR(eco);
1132                 goto failed;
1133         }
1134         cl_echo_object_put(eco);
1135
1136         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1137
1138  failed:
1139         if (created && rc)
1140                 obd_destroy(env, ec->ec_exp, oa, oti);
1141         if (rc)
1142                 CERROR("create object failed with: rc = %d\n", rc);
1143         return rc;
1144 }
1145
1146 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1147                            struct obdo *oa)
1148 {
1149         struct echo_object     *eco;
1150         int                  rc;
1151
1152         if (!(oa->o_valid & OBD_MD_FLID) || !(oa->o_valid & OBD_MD_FLGROUP) ||
1153             !ostid_id(&oa->o_oi)) {
1154                 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1155                 return -EINVAL;
1156         }
1157
1158         rc = 0;
1159         eco = cl_echo_object_find(ed, &oa->o_oi);
1160         if (!IS_ERR(eco))
1161                 *ecop = eco;
1162         else
1163                 rc = PTR_ERR(eco);
1164         return rc;
1165 }
1166
1167 static void echo_put_object(struct echo_object *eco)
1168 {
1169         int rc;
1170
1171         rc = cl_echo_object_put(eco);
1172         if (rc)
1173                 CERROR("%s: echo client drop an object failed: rc = %d\n",
1174                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
1175 }
1176
1177 static void
1178 echo_client_page_debug_setup(struct page *page, int rw, u64 id,
1179                              u64 offset, u64 count)
1180 {
1181         char    *addr;
1182         u64      stripe_off;
1183         u64      stripe_id;
1184         int      delta;
1185
1186         /* no partial pages on the client */
1187         LASSERT(count == PAGE_SIZE);
1188
1189         addr = kmap(page);
1190
1191         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1192                 if (rw == OBD_BRW_WRITE) {
1193                         stripe_off = offset + delta;
1194                         stripe_id = id;
1195                 } else {
1196                         stripe_off = 0xdeadbeef00c0ffeeULL;
1197                         stripe_id = 0xdeadbeef00c0ffeeULL;
1198                 }
1199                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1200                                   stripe_off, stripe_id);
1201         }
1202
1203         kunmap(page);
1204 }
1205
1206 static int echo_client_page_debug_check(struct page *page, u64 id,
1207                                         u64 offset, u64 count)
1208 {
1209         u64     stripe_off;
1210         u64     stripe_id;
1211         char   *addr;
1212         int     delta;
1213         int     rc;
1214         int     rc2;
1215
1216         /* no partial pages on the client */
1217         LASSERT(count == PAGE_SIZE);
1218
1219         addr = kmap(page);
1220
1221         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1222                 stripe_off = offset + delta;
1223                 stripe_id = id;
1224
1225                 rc2 = block_debug_check("test_brw",
1226                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1227                                         stripe_off, stripe_id);
1228                 if (rc2 != 0) {
1229                         CERROR("Error in echo object %#llx\n", id);
1230                         rc = rc2;
1231                 }
1232         }
1233
1234         kunmap(page);
1235         return rc;
1236 }
1237
1238 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1239                             struct echo_object *eco, u64 offset,
1240                             u64 count, int async,
1241                             struct obd_trans_info *oti)
1242 {
1243         u32            npages;
1244         struct brw_page *pga;
1245         struct brw_page *pgp;
1246         struct page         **pages;
1247         u64              off;
1248         int                  i;
1249         int                  rc;
1250         int                  verify;
1251         gfp_t                gfp_mask;
1252         int                  brw_flags = 0;
1253
1254         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1255                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1256                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1257
1258         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1259
1260         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1261
1262         if (count <= 0 ||
1263             (count & (~PAGE_MASK)) != 0)
1264                 return -EINVAL;
1265
1266         /* XXX think again with misaligned I/O */
1267         npages = count >> PAGE_SHIFT;
1268
1269         if (rw == OBD_BRW_WRITE)
1270                 brw_flags = OBD_BRW_ASYNC;
1271
1272         pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1273         if (!pga)
1274                 return -ENOMEM;
1275
1276         pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1277         if (!pages) {
1278                 kfree(pga);
1279                 return -ENOMEM;
1280         }
1281
1282         for (i = 0, pgp = pga, off = offset;
1283              i < npages;
1284              i++, pgp++, off += PAGE_SIZE) {
1285                 LASSERT(!pgp->pg);      /* for cleanup */
1286
1287                 rc = -ENOMEM;
1288                 pgp->pg = alloc_page(gfp_mask);
1289                 if (!pgp->pg)
1290                         goto out;
1291
1292                 pages[i] = pgp->pg;
1293                 pgp->count = PAGE_SIZE;
1294                 pgp->off = off;
1295                 pgp->flag = brw_flags;
1296
1297                 if (verify)
1298                         echo_client_page_debug_setup(pgp->pg, rw,
1299                                                      ostid_id(&oa->o_oi), off,
1300                                                      pgp->count);
1301         }
1302
1303         /* brw mode can only be used at client */
1304         LASSERT(ed->ed_next);
1305         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1306
1307  out:
1308         if (rc != 0 || rw != OBD_BRW_READ)
1309                 verify = 0;
1310
1311         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1312                 if (!pgp->pg)
1313                         continue;
1314
1315                 if (verify) {
1316                         int vrc;
1317
1318                         vrc = echo_client_page_debug_check(pgp->pg,
1319                                                            ostid_id(&oa->o_oi),
1320                                                            pgp->off, pgp->count);
1321                         if (vrc != 0 && rc == 0)
1322                                 rc = vrc;
1323                 }
1324                 __free_page(pgp->pg);
1325         }
1326         kfree(pga);
1327         kfree(pages);
1328         return rc;
1329 }
1330
1331 static int echo_client_prep_commit(const struct lu_env *env,
1332                                    struct obd_export *exp, int rw,
1333                                    struct obdo *oa, struct echo_object *eco,
1334                                    u64 offset, u64 count,
1335                                    u64 batch, struct obd_trans_info *oti,
1336                                    int async)
1337 {
1338         struct obd_ioobj ioo;
1339         struct niobuf_local *lnb;
1340         struct niobuf_remote *rnb;
1341         u64 off;
1342         u64 npages, tot_pages;
1343         int i, ret = 0, brw_flags = 0;
1344
1345         if (count <= 0 || (count & (~PAGE_MASK)) != 0)
1346                 return -EINVAL;
1347
1348         npages = batch >> PAGE_SHIFT;
1349         tot_pages = count >> PAGE_SHIFT;
1350
1351         lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1352         rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);
1353
1354         if (!lnb || !rnb) {
1355                 ret = -ENOMEM;
1356                 goto out;
1357         }
1358
1359         if (rw == OBD_BRW_WRITE && async)
1360                 brw_flags |= OBD_BRW_ASYNC;
1361
1362         obdo_to_ioobj(oa, &ioo);
1363
1364         off = offset;
1365
1366         for (; tot_pages; tot_pages -= npages) {
1367                 int lpages;
1368
1369                 if (tot_pages < npages)
1370                         npages = tot_pages;
1371
1372                 for (i = 0; i < npages; i++, off += PAGE_SIZE) {
1373                         rnb[i].rnb_offset = off;
1374                         rnb[i].rnb_len = PAGE_SIZE;
1375                         rnb[i].rnb_flags = brw_flags;
1376                 }
1377
1378                 ioo.ioo_bufcnt = npages;
1379
1380                 lpages = npages;
1381                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1382                                  lnb, oti);
1383                 if (ret != 0)
1384                         goto out;
1385                 LASSERT(lpages == npages);
1386
1387                 for (i = 0; i < lpages; i++) {
1388                         struct page *page = lnb[i].lnb_page;
1389
1390                         /* read past eof? */
1391                         if (!page  && lnb[i].lnb_rc == 0)
1392                                 continue;
1393
1394                         if (async)
1395                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
1396
1397                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1398                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1399                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1400                                 continue;
1401
1402                         if (rw == OBD_BRW_WRITE)
1403                                 echo_client_page_debug_setup(page, rw,
1404                                                             ostid_id(&oa->o_oi),
1405                                                              rnb[i].rnb_offset,
1406                                                              rnb[i].rnb_len);
1407                         else
1408                                 echo_client_page_debug_check(page,
1409                                                             ostid_id(&oa->o_oi),
1410                                                              rnb[i].rnb_offset,
1411                                                              rnb[i].rnb_len);
1412                 }
1413
1414                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1415                                    rnb, npages, lnb, oti, ret);
1416                 if (ret != 0)
1417                         goto out;
1418
1419                 /* Reset oti otherwise it would confuse ldiskfs. */
1420                 memset(oti, 0, sizeof(*oti));
1421
1422                 /* Reuse env context. */
1423                 lu_context_exit((struct lu_context *)&env->le_ctx);
1424                 lu_context_enter((struct lu_context *)&env->le_ctx);
1425         }
1426
1427 out:
1428         kfree(lnb);
1429         kfree(rnb);
1430         return ret;
1431 }
1432
1433 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1434                                  struct obd_export *exp,
1435                                  struct obd_ioctl_data *data,
1436                                  struct obd_trans_info *dummy_oti)
1437 {
1438         struct obd_device *obd = class_exp2obd(exp);
1439         struct echo_device *ed = obd2echo_dev(obd);
1440         struct echo_client_obd *ec = ed->ed_ec;
1441         struct obdo *oa = &data->ioc_obdo1;
1442         struct echo_object *eco;
1443         int rc;
1444         int async = 1;
1445         long test_mode;
1446
1447         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1448
1449         rc = echo_get_object(&eco, ed, oa);
1450         if (rc)
1451                 return rc;
1452
1453         oa->o_valid &= ~OBD_MD_FLHANDLE;
1454
1455         /* OFD/obdfilter works only via prep/commit */
1456         test_mode = (long)data->ioc_pbuf1;
1457         if (test_mode == 1)
1458                 async = 0;
1459
1460         if (!ed->ed_next && test_mode != 3) {
1461                 test_mode = 3;
1462                 data->ioc_plen1 = data->ioc_count;
1463         }
1464
1465         /* Truncate batch size to maximum */
1466         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1467                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1468
1469         switch (test_mode) {
1470         case 1:
1471                 /* fall through */
1472         case 2:
1473                 rc = echo_client_kbrw(ed, rw, oa,
1474                                       eco, data->ioc_offset,
1475                                       data->ioc_count, async, dummy_oti);
1476                 break;
1477         case 3:
1478                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1479                                              eco, data->ioc_offset,
1480                                              data->ioc_count, data->ioc_plen1,
1481                                              dummy_oti, async);
1482                 break;
1483         default:
1484                 rc = -EINVAL;
1485         }
1486         echo_put_object(eco);
1487         return rc;
1488 }
1489
1490 static int
1491 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1492                       void *karg, void __user *uarg)
1493 {
1494         struct obd_device      *obd = exp->exp_obd;
1495         struct echo_device     *ed = obd2echo_dev(obd);
1496         struct echo_client_obd *ec = ed->ed_ec;
1497         struct echo_object     *eco;
1498         struct obd_ioctl_data  *data = karg;
1499         struct obd_trans_info   dummy_oti;
1500         struct lu_env     *env;
1501         struct oti_req_ack_lock *ack_lock;
1502         struct obdo         *oa;
1503         struct lu_fid      fid;
1504         int                  rw = OBD_BRW_READ;
1505         int                  rc = 0;
1506         int                  i;
1507
1508         memset(&dummy_oti, 0, sizeof(dummy_oti));
1509
1510         oa = &data->ioc_obdo1;
1511         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1512                 oa->o_valid |= OBD_MD_FLGROUP;
1513                 ostid_set_seq_echo(&oa->o_oi);
1514         }
1515
1516         /* This FID is unpacked just for validation at this point */
1517         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1518         if (rc < 0)
1519                 return rc;
1520
1521         env = kzalloc(sizeof(*env), GFP_NOFS);
1522         if (!env)
1523                 return -ENOMEM;
1524
1525         rc = lu_env_init(env, LCT_DT_THREAD);
1526         if (rc) {
1527                 rc = -ENOMEM;
1528                 goto out;
1529         }
1530
1531         switch (cmd) {
1532         case OBD_IOC_CREATE:                /* may create echo object */
1533                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1534                         rc = -EPERM;
1535                         goto out;
1536                 }
1537
1538                 rc = echo_create_object(env, ed, oa, &dummy_oti);
1539                 goto out;
1540
1541         case OBD_IOC_DESTROY:
1542                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1543                         rc = -EPERM;
1544                         goto out;
1545                 }
1546
1547                 rc = echo_get_object(&eco, ed, oa);
1548                 if (rc == 0) {
1549                         rc = obd_destroy(env, ec->ec_exp, oa, &dummy_oti);
1550                         if (rc == 0)
1551                                 eco->eo_deleted = 1;
1552                         echo_put_object(eco);
1553                 }
1554                 goto out;
1555
1556         case OBD_IOC_GETATTR:
1557                 rc = echo_get_object(&eco, ed, oa);
1558                 if (rc == 0) {
1559                         struct obd_info oinfo = {
1560                                 .oi_oa = oa,
1561                         };
1562
1563                         rc = obd_getattr(env, ec->ec_exp, &oinfo);
1564                         echo_put_object(eco);
1565                 }
1566                 goto out;
1567
1568         case OBD_IOC_SETATTR:
1569                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1570                         rc = -EPERM;
1571                         goto out;
1572                 }
1573
1574                 rc = echo_get_object(&eco, ed, oa);
1575                 if (rc == 0) {
1576                         struct obd_info oinfo = {
1577                                 .oi_oa = oa,
1578                         };
1579
1580                         rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1581                         echo_put_object(eco);
1582                 }
1583                 goto out;
1584
1585         case OBD_IOC_BRW_WRITE:
1586                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1587                         rc = -EPERM;
1588                         goto out;
1589                 }
1590
1591                 rw = OBD_BRW_WRITE;
1592                 /* fall through */
1593         case OBD_IOC_BRW_READ:
1594                 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1595                 goto out;
1596
1597         default:
1598                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1599                 rc = -ENOTTY;
1600                 goto out;
1601         }
1602
1603 out:
1604         lu_env_fini(env);
1605         kfree(env);
1606
1607         /* XXX this should be in a helper also called by target_send_reply */
1608         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1609              i++, ack_lock++) {
1610                 if (!ack_lock->mode)
1611                         break;
1612                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1613         }
1614
1615         return rc;
1616 }
1617
1618 static int echo_client_setup(const struct lu_env *env,
1619                              struct obd_device *obddev, struct lustre_cfg *lcfg)
1620 {
1621         struct echo_client_obd *ec = &obddev->u.echo_client;
1622         struct obd_device *tgt;
1623         struct obd_uuid echo_uuid = { "ECHO_UUID" };
1624         struct obd_connect_data *ocd = NULL;
1625         int rc;
1626
1627         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1628                 CERROR("requires a TARGET OBD name\n");
1629                 return -EINVAL;
1630         }
1631
1632         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1633         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1634                 CERROR("device not attached or not set up (%s)\n",
1635                        lustre_cfg_string(lcfg, 1));
1636                 return -EINVAL;
1637         }
1638
1639         spin_lock_init(&ec->ec_lock);
1640         INIT_LIST_HEAD(&ec->ec_objects);
1641         INIT_LIST_HEAD(&ec->ec_locks);
1642         ec->ec_unique = 0;
1643
1644         ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
1645         if (!ocd)
1646                 return -ENOMEM;
1647
1648         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1649                                  OBD_CONNECT_BRW_SIZE |
1650                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
1651                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
1652                                  OBD_CONNECT_FID;
1653         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
1654         ocd->ocd_version = LUSTRE_VERSION_CODE;
1655         ocd->ocd_group = FID_SEQ_ECHO;
1656
1657         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
1658
1659         kfree(ocd);
1660
1661         if (rc != 0) {
1662                 CERROR("fail to connect to device %s\n",
1663                        lustre_cfg_string(lcfg, 1));
1664                 return rc;
1665         }
1666
1667         return rc;
1668 }
1669
1670 static int echo_client_cleanup(struct obd_device *obddev)
1671 {
1672         struct echo_client_obd *ec = &obddev->u.echo_client;
1673         int rc;
1674
1675         if (!list_empty(&obddev->obd_exports)) {
1676                 CERROR("still has clients!\n");
1677                 return -EBUSY;
1678         }
1679
1680         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
1681         rc = obd_disconnect(ec->ec_exp);
1682         if (rc != 0)
1683                 CERROR("fail to disconnect device: %d\n", rc);
1684
1685         return rc;
1686 }
1687
1688 static int echo_client_connect(const struct lu_env *env,
1689                                struct obd_export **exp,
1690                                struct obd_device *src, struct obd_uuid *cluuid,
1691                                struct obd_connect_data *data, void *localdata)
1692 {
1693         int             rc;
1694         struct lustre_handle conn = { 0 };
1695
1696         rc = class_connect(&conn, src, cluuid);
1697         if (rc == 0) {
1698                 *exp = class_conn2export(&conn);
1699         }
1700
1701         return rc;
1702 }
1703
1704 static int echo_client_disconnect(struct obd_export *exp)
1705 {
1706         int                  rc;
1707
1708         if (!exp) {
1709                 rc = -EINVAL;
1710                 goto out;
1711         }
1712
1713         rc = class_disconnect(exp);
1714         goto out;
1715  out:
1716         return rc;
1717 }
1718
1719 static struct obd_ops echo_client_obd_ops = {
1720         .owner          = THIS_MODULE,
1721         .iocontrol      = echo_client_iocontrol,
1722         .connect        = echo_client_connect,
1723         .disconnect     = echo_client_disconnect
1724 };
1725
1726 static int echo_client_init(void)
1727 {
1728         int rc;
1729
1730         rc = lu_kmem_init(echo_caches);
1731         if (rc == 0) {
1732                 rc = class_register_type(&echo_client_obd_ops, NULL,
1733                                          LUSTRE_ECHO_CLIENT_NAME,
1734                                          &echo_device_type);
1735                 if (rc)
1736                         lu_kmem_fini(echo_caches);
1737         }
1738         return rc;
1739 }
1740
1741 static void echo_client_exit(void)
1742 {
1743         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
1744         lu_kmem_fini(echo_caches);
1745 }
1746
1747 static int __init obdecho_init(void)
1748 {
1749         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1750
1751         LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
1752
1753         return echo_client_init();
1754 }
1755
/* Module exit point: shuts the echo client down via echo_client_exit(). */
static void /*__exit*/ obdecho_exit(void)
{
        echo_client_exit();
}
1760
/* Module metadata. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Echo Client test driver");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

/* obdecho_init() runs at module load, obdecho_exit() at unload. */
module_init(obdecho_init);
module_exit(obdecho_exit);
1768
1769 /** @} echo_client */