4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/llog.c
34 * OST<->MDS recovery logging infrastructure.
35 * Invariants in implementation:
36 * - we do not share logs among different OST<->MDS connections, so that
37 * if an OST or MDS fails it need only look at log(s) relevant to itself
39 * Author: Andreas Dilger <adilger@clusterfs.com>
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mikhail Pershin <tappro@whamcloud.com>
44 #define DEBUG_SUBSYSTEM S_LOG
46 #include <llog_swab.h>
47 #include <lustre_log.h>
48 #include <obd_class.h>
49 #include "llog_internal.h"
52 * Allocate a new log or catalog handle
53 * Used inside llog_open().
/*
 * Allocate a zeroed llog handle and initialize its rwsem, header
 * spinlock, plain-handle list entry and reference count.  The caller
 * owns the initial reference (dropped via llog_handle_put()).
 * NOTE(review): the allocation-failure check, braces and return
 * statement are not visible in this excerpt; code left byte-identical.
 */
55 static struct llog_handle *llog_alloc_handle(void)
57 struct llog_handle *loghandle;
59 loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
63 init_rwsem(&loghandle->lgh_lock);
64 spin_lock_init(&loghandle->lgh_hdr_lock);
65 INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
/* first reference belongs to the caller */
66 atomic_set(&loghandle->lgh_refcount, 1);
/*
 * Free the header buffer attached to a handle (if llog_init_handle()
 * succeeded).  Sanity-checks that a PLAIN log is no longer linked into
 * a catalog and that a CAT log has no child entries before freeing.
 * Reached via the final llog_handle_put().
 * NOTE(review): the early-return body for a missing header and the
 * freeing of the handle structure itself are not visible here.
 */
74 static void llog_free_handle(struct llog_handle *loghandle)
76 /* failed llog_init_handle */
77 if (!loghandle->lgh_hdr)
80 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
81 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
82 else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
83 LASSERT(list_empty(&loghandle->u.chd.chd_head));
/* header was allocated with libcfs_kvzalloc() in llog_init_handle() */
84 kvfree(loghandle->lgh_hdr);
/* Take an extra reference on @loghandle; paired with llog_handle_put(). */
89 void llog_handle_get(struct llog_handle *loghandle)
91 atomic_inc(&loghandle->lgh_refcount);
/*
 * Drop a reference on @loghandle; the last put frees the handle's
 * header via llog_free_handle().
 */
94 void llog_handle_put(struct llog_handle *loghandle)
96 LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
97 if (atomic_dec_and_test(&loghandle->lgh_refcount))
98 llog_free_handle(loghandle);
/*
 * Read the llog header from the backend via lop_read_header().
 *
 * If the backend reports LLOG_EEMPTY (log object exists but holds no
 * header yet), build a fresh in-memory header: the header occupies
 * record index 0, llh_count includes it, bit 0 of the bitmap marks it,
 * and the tail mirrors the header's length and index.
 * NOTE(review): the rc declaration, error-path returns and a possible
 * NULL-check around @uuid are not visible in this excerpt.
 */
101 static int llog_read_header(const struct lu_env *env,
102 struct llog_handle *handle,
103 struct obd_uuid *uuid)
105 struct llog_operations *lop;
108 rc = llog_handle2ops(handle, &lop);
/* backend must implement header reads to be usable here */
112 if (!lop->lop_read_header)
115 rc = lop->lop_read_header(env, handle);
116 if (rc == LLOG_EEMPTY) {
117 struct llog_log_hdr *llh = handle->lgh_hdr;
120 /* lrh_len should be initialized in llog_init_handle */
121 handle->lgh_last_idx = 0; /* header is record with index 0 */
122 llh->llh_count = 1; /* for the header record */
123 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
124 LASSERT(handle->lgh_ctxt->loc_chunk_size >= LLOG_MIN_CHUNK_SIZE);
/* header record spans one full chunk */
125 llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
126 llh->llh_hdr.lrh_index = 0;
127 llh->llh_timestamp = ktime_get_real_seconds();
129 memcpy(&llh->llh_tgtuuid, uuid,
130 sizeof(llh->llh_tgtuuid));
131 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
133 * Since update llog header might also call this function,
134 * let's reset the bitmap to 0 here
136 len = llh->llh_hdr.lrh_len - llh->llh_bitmap_offset;
/* clear everything between the bitmap start and the tail */
137 memset(LLOG_HDR_BITMAP(llh), 0, len - sizeof(llh->llh_tail));
138 ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
/* tail duplicates the header's length/index for validation */
139 LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
140 LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
/*
 * Allocate and initialize the header for an opened llog handle.
 *
 * @flags requests PLAIN or CAT (plus optional LLOG_F_EXT_MASK format
 * bits); the on-disk header read by llog_read_header() must agree.  If
 * the caller passed no type, the type is taken from the on-disk
 * llh_flags.  A uuid mismatch against the on-disk llh_tgtuuid is an
 * error.  For catalogs the child list is set up and records are
 * fixed-size (llog_logid_rec).
 * NOTE(review): the rc declaration, allocation-failure check, error
 * gotos/returns and final cleanup path are not visible in this excerpt;
 * L124 suggests lgh_hdr is reset on some failure path.
 */
146 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
147 int flags, struct obd_uuid *uuid)
149 int chunk_size = handle->lgh_ctxt->loc_chunk_size;
/* extended-format bits requested by the caller, applied at the end */
150 enum llog_flag fmt = flags & LLOG_F_EXT_MASK;
151 struct llog_log_hdr *llh;
/* a handle must be initialized at most once */
154 LASSERT(!handle->lgh_hdr);
156 LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
157 llh = libcfs_kvzalloc(sizeof(*llh), GFP_NOFS);
160 handle->lgh_hdr = llh;
161 handle->lgh_hdr_size = chunk_size;
162 /* first assign flags to use llog_client_ops */
163 llh->llh_flags = flags;
164 rc = llog_read_header(env, handle, uuid);
/* requested type must not contradict the on-disk type */
166 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
167 flags & LLOG_F_IS_CAT) ||
168 (llh->llh_flags & LLOG_F_IS_CAT &&
169 flags & LLOG_F_IS_PLAIN))) {
170 CERROR("%s: llog type is %s but initializing %s\n",
171 handle->lgh_ctxt->loc_obd->obd_name,
172 llh->llh_flags & LLOG_F_IS_CAT ?
174 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
177 } else if (llh->llh_flags &
178 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
180 * it is possible to open llog without specifying llog
181 * type so it is taken from llh_flags
183 flags = llh->llh_flags;
185 /* for some reason the llh_flags has no type set */
186 CERROR("llog type is not specified!\n");
/* caller-supplied uuid (if any) must match the on-disk target uuid */
191 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
192 CERROR("%s: llog uuid mismatch: %s/%s\n",
193 handle->lgh_ctxt->loc_obd->obd_name,
195 (char *)llh->llh_tgtuuid.uuid);
200 if (flags & LLOG_F_IS_CAT) {
201 LASSERT(list_empty(&handle->u.chd.chd_head));
202 INIT_LIST_HEAD(&handle->u.chd.chd_head);
/* catalog entries are fixed-size logid records */
203 llh->llh_size = sizeof(struct llog_logid_rec);
204 llh->llh_flags |= LLOG_F_IS_FIXSIZE;
205 } else if (!(flags & LLOG_F_IS_PLAIN)) {
206 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
207 handle->lgh_ctxt->loc_obd->obd_name,
208 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
/* merge the requested extended-format bits into the header */
211 llh->llh_flags |= fmt;
/* presumably the error path: detach header on failure — TODO confirm */
215 handle->lgh_hdr = NULL;
219 EXPORT_SYMBOL(llog_init_handle);
/*
 * Worker that walks a plain llog and invokes the caller's callback
 * (lpi->lpi_cb) on every record whose index bit is set in the header
 * bitmap.
 *
 * Iteration range comes from the optional llog_process_cat_data:
 * lpcd_first_idx/lpcd_last_idx, defaulting to [1, bitmap size - 1].
 * Records are read chunk-by-chunk with llog_next_block(); a chunk that
 * ends mid-way (partial_chunk) is zero-padded, and a zero lrh_index
 * inside it either means end-of-data or that a concurrent llog_add()
 * appended records, in which case the chunk is re-read from the saved
 * buf_offset.  Record headers are byte-swapped on demand before use.
 * On exit the last callback-visited index is stored back into
 * cd->lpcd_last_idx so a catalog caller can resume.
 * NOTE(review): many lines (declarations of buf/chunk_size/saved_index/
 * partial_chunk/chunk_offset, loop construct, error returns, frees) are
 * not visible in this excerpt; code left byte-identical.
 */
221 static int llog_process_thread(void *arg)
223 struct llog_process_info *lpi = arg;
224 struct llog_handle *loghandle = lpi->lpi_loghandle;
225 struct llog_log_hdr *llh = loghandle->lgh_hdr;
226 struct llog_process_cat_data *cd = lpi->lpi_catdata;
228 u64 cur_offset, tmp_offset;
230 int rc = 0, index = 1, last_index;
232 int last_called_index = 0;
/* first data block starts right after the (chunk-sized) header */
237 cur_offset = llh->llh_hdr.lrh_len;
238 chunk_size = llh->llh_hdr.lrh_len;
239 /* expect chunk_size to be power of two */
240 LASSERT(is_power_of_2(chunk_size));
242 buf = libcfs_kvzalloc(chunk_size, GFP_NOFS);
244 lpi->lpi_rc = -ENOMEM;
/* caller-specified resume point, if a cat-data cookie was passed */
249 last_called_index = cd->lpcd_first_idx;
250 index = cd->lpcd_first_idx + 1;
252 if (cd && cd->lpcd_last_idx)
253 last_index = cd->lpcd_last_idx;
255 last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
258 unsigned int buf_offset = 0;
259 struct llog_rec_hdr *rec;
263 /* skip records not set in bitmap */
264 while (index <= last_index &&
265 !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
268 if (index > last_index)
271 CDEBUG(D_OTHER, "index: %d last_index %d\n",
274 /* get the buf with our target record; avoid old garbage */
275 memset(buf, 0, chunk_size);
276 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
277 index, &cur_offset, buf, chunk_size);
282 * NB: after llog_next_block() call the cur_offset is the
283 * offset of the next block after read one.
284 * The absolute offset of the current chunk is calculated
285 * from cur_offset value and stored in chunk_offset variable.
287 tmp_offset = cur_offset;
288 if (do_div(tmp_offset, chunk_size)) {
289 partial_chunk = true;
/* round down to chunk boundary (chunk_size is a power of two) */
290 chunk_offset = cur_offset & ~(chunk_size - 1);
292 partial_chunk = false;
293 chunk_offset = cur_offset - chunk_size;
296 /* NB: when rec->lrh_len is accessed it is already swabbed
297 * since it is used at the "end" of the loop and the rec
298 * swabbing is done at the beginning of the loop.
300 for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
301 (char *)rec < buf + chunk_size;
302 rec = llog_rec_hdr_next(rec)) {
303 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
306 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
307 lustre_swab_llog_rec(rec);
309 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
310 rec->lrh_type, rec->lrh_index);
313 * for partial chunk the end of it is zeroed, check
314 * for index 0 to distinguish it.
316 if (partial_chunk && !rec->lrh_index) {
317 /* concurrent llog_add() might add new records
318 * while llog_processing, check this is not
319 * the case and re-read the current chunk
322 if (index > loghandle->lgh_last_idx) {
326 CDEBUG(D_OTHER, "Re-read last llog buffer for new records, index %u, last %u\n",
327 index, loghandle->lgh_last_idx);
328 /* save offset inside buffer for the re-read */
329 buf_offset = (char *)rec - (char *)buf;
330 cur_offset = chunk_offset;
/* reject corrupt record lengths before stepping over them */
334 if (!rec->lrh_len || rec->lrh_len > chunk_size) {
335 CWARN("invalid length %d in llog record for index %d/%d\n",
337 rec->lrh_index, index);
/* records before the target index are skipped, not processed */
342 if (rec->lrh_index < index) {
343 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
/* a gap or jump in indices indicates a corrupt log */
348 if (rec->lrh_index != index) {
349 CERROR("%s: Invalid record: index %u but expected %u\n",
350 loghandle->lgh_ctxt->loc_obd->obd_name,
351 rec->lrh_index, index);
357 "lrh_index: %d lrh_len: %d (%d remains)\n",
358 rec->lrh_index, rec->lrh_len,
359 (int)(buf + chunk_size - (char *)rec));
/* record current position so callbacks can query it */
361 loghandle->lgh_cur_idx = rec->lrh_index;
362 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
365 /* if set, process the callback on this record */
366 if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
367 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
369 last_called_index = index;
374 /* exit if the last index is reached */
375 if (index >= last_index) {
/* report resume point back to the caller */
385 cd->lpcd_last_idx = last_called_index;
/*
 * kthread entry point used by llog_process_or_fork(): a spawned thread
 * cannot reuse the parent's lu_env, so initialize a fresh one, run
 * llog_process_thread(), then signal lpi_completion to wake the forker.
 * NOTE(review): the env declaration, error handling for lu_env_init()
 * failure and lu_env_fini()/return are not visible in this excerpt.
 */
392 static int llog_process_thread_daemonize(void *arg)
394 struct llog_process_info *lpi = arg;
400 /* client env has no keys, tags is just 0 */
401 rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
406 rc = llog_process_thread(arg);
410 complete(&lpi->lpi_completion);
/*
 * Process a llog with callback @cb, either inline or in a separate
 * kthread.
 *
 * A llog_process_info is allocated to carry the handle, callback and
 * cookies.  With @fork true, llog_process_thread_daemonize runs in a
 * new kthread (which builds its own lu_env) and we block on
 * lpi_completion until it finishes; otherwise llog_process_thread()
 * runs directly in this context.
 * NOTE(review): the rc declaration, allocation-failure return, lpi_cb
 * assignment, kfree(lpi) and the final return are not visible in this
 * excerpt.
 */
414 int llog_process_or_fork(const struct lu_env *env,
415 struct llog_handle *loghandle,
416 llog_cb_t cb, void *data, void *catdata, bool fork)
418 struct llog_process_info *lpi;
421 lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
424 lpi->lpi_loghandle = loghandle;
426 lpi->lpi_cbdata = data;
427 lpi->lpi_catdata = catdata;
430 struct task_struct *task;
432 /* The new thread can't use parent env,
433 * init the new one in llog_process_thread_daemonize.
436 init_completion(&lpi->lpi_completion);
437 task = kthread_run(llog_process_thread_daemonize, lpi,
438 "llog_process_thread");
441 CERROR("%s: cannot start thread: rc = %d\n",
442 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
/* wait for the worker thread to finish before freeing lpi */
445 wait_for_completion(&lpi->lpi_completion);
/* non-fork path: process synchronously in the caller's context */
448 llog_process_thread(lpi);
455 EXPORT_SYMBOL(llog_process_or_fork);
/*
 * Convenience wrapper: process @loghandle with @cb in a forked kthread
 * (llog_process_or_fork() with fork = true).
 */
457 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
458 llog_cb_t cb, void *data, void *catdata)
460 return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
462 EXPORT_SYMBOL(llog_process);
/*
 * Open a llog through the context's backend lop_open() method.
 *
 * Allocates a new handle into *lgh, wires it to @ctxt and its log ops,
 * then calls lop_open() with CFS_CAP_SYS_RESOURCE raised (restored
 * afterwards unless it was already raised).  The handle is freed via
 * llog_free_handle() — presumably on open failure; the guarding
 * condition is not visible in this excerpt.
 * NOTE(review): declarations (rc, raised), the -EOPNOTSUPP-style branch
 * body for a missing lop_open, the *lgh allocation-failure check and
 * returns are not visible here.
 */
464 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
465 struct llog_handle **lgh, struct llog_logid *logid,
466 char *name, enum llog_open_param open_param)
472 LASSERT(ctxt->loc_logops);
474 if (!ctxt->loc_logops->lop_open) {
479 *lgh = llog_alloc_handle();
482 (*lgh)->lgh_ctxt = ctxt;
483 (*lgh)->lgh_logops = ctxt->loc_logops;
/* remember prior capability state so we only lower what we raised */
485 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
487 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
488 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
490 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
491 llog_free_handle(*lgh);
/*
 * Close a llog handle: invoke the backend's lop_close() if provided,
 * then drop the caller's reference with llog_handle_put() (which frees
 * the header on the last reference).
 * NOTE(review): the rc declaration, error-branch bodies, out label and
 * return are not visible in this excerpt.
 */
499 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
501 struct llog_operations *lop;
504 rc = llog_handle2ops(loghandle, &lop);
507 if (!lop->lop_close) {
511 rc = lop->lop_close(env, loghandle);
513 llog_handle_put(loghandle);
516 EXPORT_SYMBOL(llog_close);