Linux-libre 4.14.138-gnu
[librecmc/linux-libre.git] / drivers / staging / lustre / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/llog.c
33  *
34  * OST<->MDS recovery logging infrastructure.
35  * Invariants in implementation:
36  * - we do not share logs among different OST<->MDS connections, so that
37  *   if an OST or MDS fails it need only look at log(s) relevant to itself
38  *
39  * Author: Andreas Dilger <adilger@clusterfs.com>
40  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41  * Author: Mikhail Pershin <tappro@whamcloud.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #include <llog_swab.h>
47 #include <lustre_log.h>
48 #include <obd_class.h>
49 #include "llog_internal.h"
50
51 /*
52  * Allocate a new log or catalog handle
53  * Used inside llog_open().
54  */
55 static struct llog_handle *llog_alloc_handle(void)
56 {
57         struct llog_handle *loghandle;
58
59         loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
60         if (!loghandle)
61                 return NULL;
62
63         init_rwsem(&loghandle->lgh_lock);
64         spin_lock_init(&loghandle->lgh_hdr_lock);
65         INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
66         atomic_set(&loghandle->lgh_refcount, 1);
67
68         return loghandle;
69 }
70
71 /*
72  * Free llog handle and header data if exists. Used in llog_close() only
73  */
74 static void llog_free_handle(struct llog_handle *loghandle)
75 {
76         /* failed llog_init_handle */
77         if (!loghandle->lgh_hdr)
78                 goto out;
79
80         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
81                 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
82         else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
83                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
84         kvfree(loghandle->lgh_hdr);
85 out:
86         kfree(loghandle);
87 }
88
89 void llog_handle_get(struct llog_handle *loghandle)
90 {
91         atomic_inc(&loghandle->lgh_refcount);
92 }
93
94 void llog_handle_put(struct llog_handle *loghandle)
95 {
96         LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
97         if (atomic_dec_and_test(&loghandle->lgh_refcount))
98                 llog_free_handle(loghandle);
99 }
100
101 static int llog_read_header(const struct lu_env *env,
102                             struct llog_handle *handle,
103                             struct obd_uuid *uuid)
104 {
105         struct llog_operations *lop;
106         int rc;
107
108         rc = llog_handle2ops(handle, &lop);
109         if (rc)
110                 return rc;
111
112         if (!lop->lop_read_header)
113                 return -EOPNOTSUPP;
114
115         rc = lop->lop_read_header(env, handle);
116         if (rc == LLOG_EEMPTY) {
117                 struct llog_log_hdr *llh = handle->lgh_hdr;
118                 size_t len;
119
120                 /* lrh_len should be initialized in llog_init_handle */
121                 handle->lgh_last_idx = 0; /* header is record with index 0 */
122                 llh->llh_count = 1;      /* for the header record */
123                 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
124                 LASSERT(handle->lgh_ctxt->loc_chunk_size >= LLOG_MIN_CHUNK_SIZE);
125                 llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
126                 llh->llh_hdr.lrh_index = 0;
127                 llh->llh_timestamp = ktime_get_real_seconds();
128                 if (uuid)
129                         memcpy(&llh->llh_tgtuuid, uuid,
130                                sizeof(llh->llh_tgtuuid));
131                 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
132                 /*
133                  * Since update llog header might also call this function,
134                  * let's reset the bitmap to 0 here
135                  */
136                 len = llh->llh_hdr.lrh_len - llh->llh_bitmap_offset;
137                 memset(LLOG_HDR_BITMAP(llh), 0, len - sizeof(llh->llh_tail));
138                 ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
139                 LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
140                 LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
141                 rc = 0;
142         }
143         return rc;
144 }
145
146 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
147                      int flags, struct obd_uuid *uuid)
148 {
149         int chunk_size = handle->lgh_ctxt->loc_chunk_size;
150         enum llog_flag fmt = flags & LLOG_F_EXT_MASK;
151         struct llog_log_hdr     *llh;
152         int                      rc;
153
154         LASSERT(!handle->lgh_hdr);
155
156         LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
157         llh = libcfs_kvzalloc(sizeof(*llh), GFP_NOFS);
158         if (!llh)
159                 return -ENOMEM;
160         handle->lgh_hdr = llh;
161         handle->lgh_hdr_size = chunk_size;
162         /* first assign flags to use llog_client_ops */
163         llh->llh_flags = flags;
164         rc = llog_read_header(env, handle, uuid);
165         if (rc == 0) {
166                 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
167                               flags & LLOG_F_IS_CAT) ||
168                              (llh->llh_flags & LLOG_F_IS_CAT &&
169                               flags & LLOG_F_IS_PLAIN))) {
170                         CERROR("%s: llog type is %s but initializing %s\n",
171                                handle->lgh_ctxt->loc_obd->obd_name,
172                                llh->llh_flags & LLOG_F_IS_CAT ?
173                                "catalog" : "plain",
174                                flags & LLOG_F_IS_CAT ? "catalog" : "plain");
175                         rc = -EINVAL;
176                         goto out;
177                 } else if (llh->llh_flags &
178                            (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
179                         /*
180                          * it is possible to open llog without specifying llog
181                          * type so it is taken from llh_flags
182                          */
183                         flags = llh->llh_flags;
184                 } else {
185                         /* for some reason the llh_flags has no type set */
186                         CERROR("llog type is not specified!\n");
187                         rc = -EINVAL;
188                         goto out;
189                 }
190                 if (unlikely(uuid &&
191                              !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
192                         CERROR("%s: llog uuid mismatch: %s/%s\n",
193                                handle->lgh_ctxt->loc_obd->obd_name,
194                                (char *)uuid->uuid,
195                                (char *)llh->llh_tgtuuid.uuid);
196                         rc = -EEXIST;
197                         goto out;
198                 }
199         }
200         if (flags & LLOG_F_IS_CAT) {
201                 LASSERT(list_empty(&handle->u.chd.chd_head));
202                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
203                 llh->llh_size = sizeof(struct llog_logid_rec);
204                 llh->llh_flags |= LLOG_F_IS_FIXSIZE;
205         } else if (!(flags & LLOG_F_IS_PLAIN)) {
206                 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
207                        handle->lgh_ctxt->loc_obd->obd_name,
208                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
209                 rc = -EINVAL;
210         }
211         llh->llh_flags |= fmt;
212 out:
213         if (rc) {
214                 kvfree(llh);
215                 handle->lgh_hdr = NULL;
216         }
217         return rc;
218 }
219 EXPORT_SYMBOL(llog_init_handle);
220
221 static int llog_process_thread(void *arg)
222 {
223         struct llog_process_info        *lpi = arg;
224         struct llog_handle              *loghandle = lpi->lpi_loghandle;
225         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
226         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
227         char                            *buf;
228         u64 cur_offset, tmp_offset;
229         int chunk_size;
230         int                              rc = 0, index = 1, last_index;
231         int                              saved_index = 0;
232         int                              last_called_index = 0;
233
234         if (!llh)
235                 return -EINVAL;
236
237         cur_offset = llh->llh_hdr.lrh_len;
238         chunk_size = llh->llh_hdr.lrh_len;
239         /* expect chunk_size to be power of two */
240         LASSERT(is_power_of_2(chunk_size));
241
242         buf = libcfs_kvzalloc(chunk_size, GFP_NOFS);
243         if (!buf) {
244                 lpi->lpi_rc = -ENOMEM;
245                 return 0;
246         }
247
248         if (cd) {
249                 last_called_index = cd->lpcd_first_idx;
250                 index = cd->lpcd_first_idx + 1;
251         }
252         if (cd && cd->lpcd_last_idx)
253                 last_index = cd->lpcd_last_idx;
254         else
255                 last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
256
257         while (rc == 0) {
258                 unsigned int buf_offset = 0;
259                 struct llog_rec_hdr *rec;
260                 bool partial_chunk;
261                 off_t chunk_offset;
262
263                 /* skip records not set in bitmap */
264                 while (index <= last_index &&
265                        !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
266                         ++index;
267
268                 if (index > last_index)
269                         break;
270
271                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
272                        index, last_index);
273 repeat:
274                 /* get the buf with our target record; avoid old garbage */
275                 memset(buf, 0, chunk_size);
276                 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
277                                      index, &cur_offset, buf, chunk_size);
278                 if (rc)
279                         goto out;
280
281                 /*
282                  * NB: after llog_next_block() call the cur_offset is the
283                  * offset of the next block after read one.
284                  * The absolute offset of the current chunk is calculated
285                  * from cur_offset value and stored in chunk_offset variable.
286                  */
287                 tmp_offset = cur_offset;
288                 if (do_div(tmp_offset, chunk_size)) {
289                         partial_chunk = true;
290                         chunk_offset = cur_offset & ~(chunk_size - 1);
291                 } else {
292                         partial_chunk = false;
293                         chunk_offset = cur_offset - chunk_size;
294                 }
295
296                 /* NB: when rec->lrh_len is accessed it is already swabbed
297                  * since it is used at the "end" of the loop and the rec
298                  * swabbing is done at the beginning of the loop.
299                  */
300                 for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
301                      (char *)rec < buf + chunk_size;
302                      rec = llog_rec_hdr_next(rec)) {
303                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
304                                rec, rec->lrh_type);
305
306                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
307                                 lustre_swab_llog_rec(rec);
308
309                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
310                                rec->lrh_type, rec->lrh_index);
311
312                         /*
313                          * for partial chunk the end of it is zeroed, check
314                          * for index 0 to distinguish it.
315                          */
316                         if (partial_chunk && !rec->lrh_index) {
317                                 /* concurrent llog_add() might add new records
318                                  * while llog_processing, check this is not
319                                  * the case and re-read the current chunk
320                                  * otherwise.
321                                  */
322                                 if (index > loghandle->lgh_last_idx) {
323                                         rc = 0;
324                                         goto out;
325                                 }
326                                 CDEBUG(D_OTHER, "Re-read last llog buffer for new records, index %u, last %u\n",
327                                        index, loghandle->lgh_last_idx);
328                                 /* save offset inside buffer for the re-read */
329                                 buf_offset = (char *)rec - (char *)buf;
330                                 cur_offset = chunk_offset;
331                                 goto repeat;
332                         }
333
334                         if (!rec->lrh_len || rec->lrh_len > chunk_size) {
335                                 CWARN("invalid length %d in llog record for index %d/%d\n",
336                                       rec->lrh_len,
337                                       rec->lrh_index, index);
338                                 rc = -EINVAL;
339                                 goto out;
340                         }
341
342                         if (rec->lrh_index < index) {
343                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
344                                        rec->lrh_index);
345                                 continue;
346                         }
347
348                         if (rec->lrh_index != index) {
349                                 CERROR("%s: Invalid record: index %u but expected %u\n",
350                                        loghandle->lgh_ctxt->loc_obd->obd_name,
351                                        rec->lrh_index, index);
352                                 rc = -ERANGE;
353                                 goto out;
354                         }
355
356                         CDEBUG(D_OTHER,
357                                "lrh_index: %d lrh_len: %d (%d remains)\n",
358                                rec->lrh_index, rec->lrh_len,
359                                (int)(buf + chunk_size - (char *)rec));
360
361                         loghandle->lgh_cur_idx = rec->lrh_index;
362                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
363                                                     chunk_offset;
364
365                         /* if set, process the callback on this record */
366                         if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
367                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
368                                                  lpi->lpi_cbdata);
369                                 last_called_index = index;
370                                 if (rc)
371                                         goto out;
372                         }
373
374                         /* exit if the last index is reached */
375                         if (index >= last_index) {
376                                 rc = 0;
377                                 goto out;
378                         }
379                         index++;
380                 }
381         }
382
383 out:
384         if (cd)
385                 cd->lpcd_last_idx = last_called_index;
386
387         kfree(buf);
388         lpi->lpi_rc = rc;
389         return 0;
390 }
391
392 static int llog_process_thread_daemonize(void *arg)
393 {
394         struct llog_process_info        *lpi = arg;
395         struct lu_env                    env;
396         int                              rc;
397
398         unshare_fs_struct();
399
400         /* client env has no keys, tags is just 0 */
401         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
402         if (rc)
403                 goto out;
404         lpi->lpi_env = &env;
405
406         rc = llog_process_thread(arg);
407
408         lu_env_fini(&env);
409 out:
410         complete(&lpi->lpi_completion);
411         return rc;
412 }
413
414 int llog_process_or_fork(const struct lu_env *env,
415                          struct llog_handle *loghandle,
416                          llog_cb_t cb, void *data, void *catdata, bool fork)
417 {
418         struct llog_process_info *lpi;
419         int                   rc;
420
421         lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
422         if (!lpi)
423                 return -ENOMEM;
424         lpi->lpi_loghandle = loghandle;
425         lpi->lpi_cb     = cb;
426         lpi->lpi_cbdata    = data;
427         lpi->lpi_catdata   = catdata;
428
429         if (fork) {
430                 struct task_struct *task;
431
432                 /* The new thread can't use parent env,
433                  * init the new one in llog_process_thread_daemonize.
434                  */
435                 lpi->lpi_env = NULL;
436                 init_completion(&lpi->lpi_completion);
437                 task = kthread_run(llog_process_thread_daemonize, lpi,
438                                    "llog_process_thread");
439                 if (IS_ERR(task)) {
440                         rc = PTR_ERR(task);
441                         CERROR("%s: cannot start thread: rc = %d\n",
442                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
443                         goto out_lpi;
444                 }
445                 wait_for_completion(&lpi->lpi_completion);
446         } else {
447                 lpi->lpi_env = env;
448                 llog_process_thread(lpi);
449         }
450         rc = lpi->lpi_rc;
451 out_lpi:
452         kfree(lpi);
453         return rc;
454 }
455 EXPORT_SYMBOL(llog_process_or_fork);
456
457 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
458                  llog_cb_t cb, void *data, void *catdata)
459 {
460         return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
461 }
462 EXPORT_SYMBOL(llog_process);
463
464 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
465               struct llog_handle **lgh, struct llog_logid *logid,
466               char *name, enum llog_open_param open_param)
467 {
468         int      raised;
469         int      rc;
470
471         LASSERT(ctxt);
472         LASSERT(ctxt->loc_logops);
473
474         if (!ctxt->loc_logops->lop_open) {
475                 *lgh = NULL;
476                 return -EOPNOTSUPP;
477         }
478
479         *lgh = llog_alloc_handle();
480         if (!*lgh)
481                 return -ENOMEM;
482         (*lgh)->lgh_ctxt = ctxt;
483         (*lgh)->lgh_logops = ctxt->loc_logops;
484
485         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
486         if (!raised)
487                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
488         rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
489         if (!raised)
490                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
491         if (rc) {
492                 llog_free_handle(*lgh);
493                 *lgh = NULL;
494         }
495         return rc;
496 }
497 EXPORT_SYMBOL(llog_open);
498
499 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
500 {
501         struct llog_operations  *lop;
502         int                      rc;
503
504         rc = llog_handle2ops(loghandle, &lop);
505         if (rc)
506                 goto out;
507         if (!lop->lop_close) {
508                 rc = -EOPNOTSUPP;
509                 goto out;
510         }
511         rc = lop->lop_close(env, loghandle);
512 out:
513         llog_handle_put(loghandle);
514         return rc;
515 }
516 EXPORT_SYMBOL(llog_close);