Linux-libre 4.9.123-gnu
[librecmc/linux-libre.git] / drivers / staging / lustre / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_LOV
34
35 #include "../../include/linux/libcfs/libcfs.h"
36
37 #include "../include/obd_class.h"
38 #include "../include/lustre/lustre_idl.h"
39 #include "lov_internal.h"
40
41 static void lov_init_set(struct lov_request_set *set)
42 {
43         set->set_count = 0;
44         atomic_set(&set->set_completes, 0);
45         atomic_set(&set->set_success, 0);
46         atomic_set(&set->set_finish_checked, 0);
47         set->set_cookies = NULL;
48         INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50         init_waitqueue_head(&set->set_waitq);
51 }
52
53 void lov_finish_set(struct lov_request_set *set)
54 {
55         struct list_head *pos, *n;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos,
60                                                          struct lov_request,
61                                                          rq_link);
62                 list_del_init(&req->rq_link);
63
64                 if (req->rq_oi.oi_oa)
65                         kmem_cache_free(obdo_cachep, req->rq_oi.oi_oa);
66                 kfree(req->rq_oi.oi_osfs);
67                 kfree(req);
68         }
69         kfree(set);
70 }
71
72 static int lov_set_finished(struct lov_request_set *set, int idempotent)
73 {
74         int completes = atomic_read(&set->set_completes);
75
76         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
77
78         if (completes == set->set_count) {
79                 if (idempotent)
80                         return 1;
81                 if (atomic_inc_return(&set->set_finish_checked) == 1)
82                         return 1;
83         }
84         return 0;
85 }
86
87 static void lov_update_set(struct lov_request_set *set,
88                            struct lov_request *req, int rc)
89 {
90         req->rq_complete = 1;
91         req->rq_rc = rc;
92
93         atomic_inc(&set->set_completes);
94         if (rc == 0)
95                 atomic_inc(&set->set_success);
96
97         wake_up(&set->set_waitq);
98 }
99
100 int lov_update_common_set(struct lov_request_set *set,
101                           struct lov_request *req, int rc)
102 {
103         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
104
105         lov_update_set(set, req, rc);
106
107         /* grace error on inactive ost */
108         if (rc && !(lov->lov_tgts[req->rq_idx] &&
109                     lov->lov_tgts[req->rq_idx]->ltd_active))
110                 rc = 0;
111
112         /* FIXME in raid1 regime, should return 0 */
113         return rc;
114 }
115
116 static void lov_set_add_req(struct lov_request *req,
117                             struct lov_request_set *set)
118 {
119         list_add_tail(&req->rq_link, &set->set_list);
120         set->set_count++;
121         req->rq_rqset = set;
122 }
123
124 static int lov_check_set(struct lov_obd *lov, int idx)
125 {
126         int rc;
127         struct lov_tgt_desc *tgt;
128
129         mutex_lock(&lov->lov_lock);
130         tgt = lov->lov_tgts[idx];
131         rc = !tgt || tgt->ltd_active ||
132                 (tgt->ltd_exp &&
133                  class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
134         mutex_unlock(&lov->lov_lock);
135
136         return rc;
137 }
138
139 /* Check if the OSC connection exists and is active.
140  * If the OSC has not yet had a chance to connect to the OST the first time,
141  * wait once for it to connect instead of returning an error.
142  */
143 static int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
144 {
145         wait_queue_head_t waitq;
146         struct l_wait_info lwi;
147         struct lov_tgt_desc *tgt;
148         int rc = 0;
149
150         mutex_lock(&lov->lov_lock);
151
152         tgt = lov->lov_tgts[ost_idx];
153
154         if (unlikely(!tgt)) {
155                 rc = 0;
156                 goto out;
157         }
158
159         if (likely(tgt->ltd_active)) {
160                 rc = 1;
161                 goto out;
162         }
163
164         if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
165                 rc = 0;
166                 goto out;
167         }
168
169         mutex_unlock(&lov->lov_lock);
170
171         init_waitqueue_head(&waitq);
172         lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
173                                    cfs_time_seconds(1), NULL, NULL);
174
175         rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
176         if (tgt->ltd_active)
177                 return 1;
178
179         return 0;
180
181 out:
182         mutex_unlock(&lov->lov_lock);
183         return rc;
184 }
185
186 static int common_attr_done(struct lov_request_set *set)
187 {
188         struct lov_request *req;
189         struct obdo *tmp_oa;
190         int rc = 0, attrset = 0;
191
192         if (!set->set_oi->oi_oa)
193                 return 0;
194
195         if (!atomic_read(&set->set_success))
196                 return -EIO;
197
198         tmp_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
199         if (!tmp_oa) {
200                 rc = -ENOMEM;
201                 goto out;
202         }
203
204         list_for_each_entry(req, &set->set_list, rq_link) {
205                 if (!req->rq_complete || req->rq_rc)
206                         continue;
207                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
208                         continue;
209                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
210                                 req->rq_oi.oi_oa->o_valid,
211                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
212         }
213         if (!attrset) {
214                 CERROR("No stripes had valid attrs\n");
215                 rc = -EIO;
216         }
217         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
218             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
219                 /* When we take attributes of some epoch, we require all the
220                  * ost to be active.
221                  */
222                 CERROR("Not all the stripes had valid attrs\n");
223                 rc = -EIO;
224                 goto out;
225         }
226
227         tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
228         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
229 out:
230         if (tmp_oa)
231                 kmem_cache_free(obdo_cachep, tmp_oa);
232         return rc;
233 }
234
235 int lov_fini_getattr_set(struct lov_request_set *set)
236 {
237         int rc = 0;
238
239         if (!set)
240                 return 0;
241         LASSERT(set->set_exp);
242         if (atomic_read(&set->set_completes))
243                 rc = common_attr_done(set);
244
245         lov_put_reqset(set);
246
247         return rc;
248 }
249
250 /* The callback for osc_getattr_async that finalizes a request info when a
251  * response is received.
252  */
253 static int cb_getattr_update(void *cookie, int rc)
254 {
255         struct obd_info *oinfo = cookie;
256         struct lov_request *lovreq;
257
258         lovreq = container_of(oinfo, struct lov_request, rq_oi);
259         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
260 }
261
262 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
263                          struct lov_request_set **reqset)
264 {
265         struct lov_request_set *set;
266         struct lov_obd *lov = &exp->exp_obd->u.lov;
267         int rc = 0, i;
268
269         set = kzalloc(sizeof(*set), GFP_NOFS);
270         if (!set)
271                 return -ENOMEM;
272         lov_init_set(set);
273
274         set->set_exp = exp;
275         set->set_oi = oinfo;
276
277         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
278                 struct lov_oinfo *loi;
279                 struct lov_request *req;
280
281                 loi = oinfo->oi_md->lsm_oinfo[i];
282                 if (lov_oinfo_is_dummy(loi))
283                         continue;
284
285                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
286                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
287                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
288                                 /* SOM requires all the OSTs to be active. */
289                                 rc = -EIO;
290                                 goto out_set;
291                         }
292                         continue;
293                 }
294
295                 req = kzalloc(sizeof(*req), GFP_NOFS);
296                 if (!req) {
297                         rc = -ENOMEM;
298                         goto out_set;
299                 }
300
301                 req->rq_stripe = i;
302                 req->rq_idx = loi->loi_ost_idx;
303
304                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
305                 if (!req->rq_oi.oi_oa) {
306                         kfree(req);
307                         rc = -ENOMEM;
308                         goto out_set;
309                 }
310                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
311                        sizeof(*req->rq_oi.oi_oa));
312                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
313                 req->rq_oi.oi_cb_up = cb_getattr_update;
314
315                 lov_set_add_req(req, set);
316         }
317         if (!set->set_count) {
318                 rc = -EIO;
319                 goto out_set;
320         }
321         *reqset = set;
322         return rc;
323 out_set:
324         lov_fini_getattr_set(set);
325         return rc;
326 }
327
328 int lov_fini_setattr_set(struct lov_request_set *set)
329 {
330         int rc = 0;
331
332         if (!set)
333                 return 0;
334         LASSERT(set->set_exp);
335         if (atomic_read(&set->set_completes)) {
336                 rc = common_attr_done(set);
337                 /* FIXME update qos data here */
338         }
339
340         lov_put_reqset(set);
341         return rc;
342 }
343
344 int lov_update_setattr_set(struct lov_request_set *set,
345                            struct lov_request *req, int rc)
346 {
347         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
348         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
349
350         lov_update_set(set, req, rc);
351
352         /* grace error on inactive ost */
353         if (rc && !(lov->lov_tgts[req->rq_idx] &&
354                     lov->lov_tgts[req->rq_idx]->ltd_active))
355                 rc = 0;
356
357         if (rc == 0) {
358                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
359                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
360                                 req->rq_oi.oi_oa->o_ctime;
361                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
362                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
363                                 req->rq_oi.oi_oa->o_mtime;
364                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
365                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
366                                 req->rq_oi.oi_oa->o_atime;
367         }
368
369         return rc;
370 }
371
372 /* The callback for osc_setattr_async that finalizes a request info when a
373  * response is received.
374  */
375 static int cb_setattr_update(void *cookie, int rc)
376 {
377         struct obd_info *oinfo = cookie;
378         struct lov_request *lovreq;
379
380         lovreq = container_of(oinfo, struct lov_request, rq_oi);
381         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
382 }
383
384 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
385                          struct obd_trans_info *oti,
386                          struct lov_request_set **reqset)
387 {
388         struct lov_request_set *set;
389         struct lov_obd *lov = &exp->exp_obd->u.lov;
390         int rc = 0, i;
391
392         set = kzalloc(sizeof(*set), GFP_NOFS);
393         if (!set)
394                 return -ENOMEM;
395         lov_init_set(set);
396
397         set->set_exp = exp;
398         set->set_oi = oinfo;
399         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
400                 set->set_cookies = oti->oti_logcookies;
401
402         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
403                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
404                 struct lov_request *req;
405
406                 if (lov_oinfo_is_dummy(loi))
407                         continue;
408
409                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
410                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
411                         continue;
412                 }
413
414                 req = kzalloc(sizeof(*req), GFP_NOFS);
415                 if (!req) {
416                         rc = -ENOMEM;
417                         goto out_set;
418                 }
419                 req->rq_stripe = i;
420                 req->rq_idx = loi->loi_ost_idx;
421
422                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
423                 if (!req->rq_oi.oi_oa) {
424                         kfree(req);
425                         rc = -ENOMEM;
426                         goto out_set;
427                 }
428                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
429                        sizeof(*req->rq_oi.oi_oa));
430                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
431                 req->rq_oi.oi_oa->o_stripe_idx = i;
432                 req->rq_oi.oi_cb_up = cb_setattr_update;
433
434                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
435                         int off = lov_stripe_offset(oinfo->oi_md,
436                                                     oinfo->oi_oa->o_size, i,
437                                                     &req->rq_oi.oi_oa->o_size);
438
439                         if (off < 0 && req->rq_oi.oi_oa->o_size)
440                                 req->rq_oi.oi_oa->o_size--;
441
442                         CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
443                                i, req->rq_oi.oi_oa->o_size,
444                                oinfo->oi_oa->o_size);
445                 }
446                 lov_set_add_req(req, set);
447         }
448         if (!set->set_count) {
449                 rc = -EIO;
450                 goto out_set;
451         }
452         *reqset = set;
453         return rc;
454 out_set:
455         lov_fini_setattr_set(set);
456         return rc;
457 }
458
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating 64-bit accumulation: add @add to the __u64 lvalue @tot, and
 * pin @tot at LOV_U64_MAX if the addition would wrap around.
 *
 * Both arguments are evaluated exactly once, so expressions with side
 * effects are safe to pass.
 */
#define LOV_SUM_MAX(tot, add)						\
	do {								\
		__u64 *_lov_tot = &(tot);				\
		__u64 _lov_add = (add);					\
									\
		if (*_lov_tot + _lov_add < *_lov_tot)			\
			*_lov_tot = LOV_U64_MAX;			\
		else							\
			*_lov_tot += _lov_add;				\
	} while (0)
467
468 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
469                     int success)
470 {
471         if (success) {
472                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
473                                                            LOV_MAGIC, 0);
474                 if (osfs->os_files != LOV_U64_MAX)
475                         lov_do_div64(osfs->os_files, expected_stripes);
476                 if (osfs->os_ffree != LOV_U64_MAX)
477                         lov_do_div64(osfs->os_ffree, expected_stripes);
478
479                 spin_lock(&obd->obd_osfs_lock);
480                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
481                 obd->obd_osfs_age = cfs_time_current_64();
482                 spin_unlock(&obd->obd_osfs_lock);
483                 return 0;
484         }
485
486         return -EIO;
487 }
488
489 int lov_fini_statfs_set(struct lov_request_set *set)
490 {
491         int rc = 0;
492
493         if (!set)
494                 return 0;
495
496         if (atomic_read(&set->set_completes)) {
497                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
498                                      atomic_read(&set->set_success));
499         }
500         lov_put_reqset(set);
501         return rc;
502 }
503
504 static void lov_update_statfs(struct obd_statfs *osfs,
505                               struct obd_statfs *lov_sfs,
506                               int success)
507 {
508         int shift = 0, quit = 0;
509         __u64 tmp;
510
511         if (success == 0) {
512                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
513         } else {
514                 if (osfs->os_bsize != lov_sfs->os_bsize) {
515                         /* assume all block sizes are always powers of 2 */
516                         /* get the bits difference */
517                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
518                         for (shift = 0; shift <= 64; ++shift) {
519                                 if (tmp & 1) {
520                                         if (quit)
521                                                 break;
522                                         quit = 1;
523                                         shift = 0;
524                                 }
525                                 tmp >>= 1;
526                         }
527                 }
528
529                 if (osfs->os_bsize < lov_sfs->os_bsize) {
530                         osfs->os_bsize = lov_sfs->os_bsize;
531
532                         osfs->os_bfree  >>= shift;
533                         osfs->os_bavail >>= shift;
534                         osfs->os_blocks >>= shift;
535                 } else if (shift != 0) {
536                         lov_sfs->os_bfree  >>= shift;
537                         lov_sfs->os_bavail >>= shift;
538                         lov_sfs->os_blocks >>= shift;
539                 }
540                 osfs->os_bfree += lov_sfs->os_bfree;
541                 osfs->os_bavail += lov_sfs->os_bavail;
542                 osfs->os_blocks += lov_sfs->os_blocks;
543                 /* XXX not sure about this one - depends on policy.
544                  *   - could be minimum if we always stripe on all OBDs
545                  *     (but that would be wrong for any other policy,
546                  *     if one of the OBDs has no more objects left)
547                  *   - could be sum if we stripe whole objects
548                  *   - could be average, just to give a nice number
549                  *
550                  * To give a "reasonable" (if not wholly accurate)
551                  * number, we divide the total number of free objects
552                  * by expected stripe count (watch out for overflow).
553                  */
554                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
555                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
556         }
557 }
558
559 /* The callback for osc_statfs_async that finalizes a request info when a
560  * response is received.
561  */
562 static int cb_statfs_update(void *cookie, int rc)
563 {
564         struct obd_info *oinfo = cookie;
565         struct lov_request *lovreq;
566         struct lov_request_set *set;
567         struct obd_statfs *osfs, *lov_sfs;
568         struct lov_obd *lov;
569         struct lov_tgt_desc *tgt;
570         struct obd_device *lovobd, *tgtobd;
571         int success;
572
573         lovreq = container_of(oinfo, struct lov_request, rq_oi);
574         set = lovreq->rq_rqset;
575         lovobd = set->set_obd;
576         lov = &lovobd->u.lov;
577         osfs = set->set_oi->oi_osfs;
578         lov_sfs = oinfo->oi_osfs;
579         success = atomic_read(&set->set_success);
580         /* XXX: the same is done in lov_update_common_set, however
581          * lovset->set_exp is not initialized.
582          */
583         lov_update_set(set, lovreq, rc);
584         if (rc)
585                 goto out;
586
587         obd_getref(lovobd);
588         tgt = lov->lov_tgts[lovreq->rq_idx];
589         if (!tgt || !tgt->ltd_active)
590                 goto out_update;
591
592         tgtobd = class_exp2obd(tgt->ltd_exp);
593         spin_lock(&tgtobd->obd_osfs_lock);
594         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
595         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
596                 tgtobd->obd_osfs_age = cfs_time_current_64();
597         spin_unlock(&tgtobd->obd_osfs_lock);
598
599 out_update:
600         lov_update_statfs(osfs, lov_sfs, success);
601         obd_putref(lovobd);
602
603 out:
604         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
605             lov_set_finished(set, 0)) {
606                 lov_statfs_interpret(NULL, set, set->set_count !=
607                                      atomic_read(&set->set_success));
608         }
609
610         return 0;
611 }
612
613 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
614                         struct lov_request_set **reqset)
615 {
616         struct lov_request_set *set;
617         struct lov_obd *lov = &obd->u.lov;
618         int rc = 0, i;
619
620         set = kzalloc(sizeof(*set), GFP_NOFS);
621         if (!set)
622                 return -ENOMEM;
623         lov_init_set(set);
624
625         set->set_obd = obd;
626         set->set_oi = oinfo;
627
628         /* We only get block data from the OBD */
629         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
630                 struct lov_request *req;
631
632                 if (!lov->lov_tgts[i] ||
633                     (oinfo->oi_flags & OBD_STATFS_NODELAY &&
634                      !lov->lov_tgts[i]->ltd_active)) {
635                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
636                         continue;
637                 }
638
639                 if (!lov->lov_tgts[i]->ltd_active)
640                         lov_check_and_wait_active(lov, i);
641
642                 /* skip targets that have been explicitly disabled by the
643                  * administrator
644                  */
645                 if (!lov->lov_tgts[i]->ltd_exp) {
646                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
647                         continue;
648                 }
649
650                 req = kzalloc(sizeof(*req), GFP_NOFS);
651                 if (!req) {
652                         rc = -ENOMEM;
653                         goto out_set;
654                 }
655
656                 req->rq_oi.oi_osfs = kzalloc(sizeof(*req->rq_oi.oi_osfs),
657                                              GFP_NOFS);
658                 if (!req->rq_oi.oi_osfs) {
659                         kfree(req);
660                         rc = -ENOMEM;
661                         goto out_set;
662                 }
663
664                 req->rq_idx = i;
665                 req->rq_oi.oi_cb_up = cb_statfs_update;
666                 req->rq_oi.oi_flags = oinfo->oi_flags;
667
668                 lov_set_add_req(req, set);
669         }
670         if (!set->set_count) {
671                 rc = -EIO;
672                 goto out_set;
673         }
674         *reqset = set;
675         return rc;
676 out_set:
677         lov_fini_statfs_set(set);
678         return rc;
679 }