Linux-libre 3.11-gnu
[librecmc/linux-libre.git] / drivers / staging / lustre / lustre / ptlrpc / recover.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/recover.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 # include <linux/libcfs/libcfs.h>
43
44 #include <obd_support.h>
45 #include <lustre_ha.h>
46 #include <lustre_net.h>
47 #include <lustre_import.h>
48 #include <lustre_export.h>
49 #include <obd.h>
50 #include <obd_ost.h>
51 #include <obd_class.h>
52 #include <obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
53 #include <linux/list.h>
54
55 #include "ptlrpc_internal.h"
56
57 /**
58  * Start recovery on disconnected import.
59  * This is done by just attempting a connect
60  */
61 void ptlrpc_initiate_recovery(struct obd_import *imp)
62 {
63         ENTRY;
64
65         CDEBUG(D_HA, "%s: starting recovery\n", obd2cli_tgt(imp->imp_obd));
66         ptlrpc_connect_import(imp);
67
68         EXIT;
69 }
70
71 /**
72  * Identify what request from replay list needs to be replayed next
73  * (based on what we have already replayed) and send it to server.
74  */
75 int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
76 {
77         int rc = 0;
78         struct list_head *tmp, *pos;
79         struct ptlrpc_request *req = NULL;
80         __u64 last_transno;
81         ENTRY;
82
83         *inflight = 0;
84
85         /* It might have committed some after we last spoke, so make sure we
86          * get rid of them now.
87          */
88         spin_lock(&imp->imp_lock);
89         imp->imp_last_transno_checked = 0;
90         ptlrpc_free_committed(imp);
91         last_transno = imp->imp_last_replay_transno;
92         spin_unlock(&imp->imp_lock);
93
94         CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n",
95                imp, obd2cli_tgt(imp->imp_obd),
96                imp->imp_peer_committed_transno, last_transno);
97
98         /* Do I need to hold a lock across this iteration?  We shouldn't be
99          * racing with any additions to the list, because we're in recovery
100          * and are therefore not processing additional requests to add.  Calls
101          * to ptlrpc_free_committed might commit requests, but nothing "newer"
102          * than the one we're replaying (it can't be committed until it's
103          * replayed, and we're doing that here).  l_f_e_safe protects against
104          * problems with the current request being committed, in the unlikely
105          * event of that race.  So, in conclusion, I think that it's safe to
106          * perform this list-walk without the imp_lock held.
107          *
108          * But, the {mdc,osc}_replay_open callbacks both iterate
109          * request lists, and have comments saying they assume the
110          * imp_lock is being held by ptlrpc_replay, but it's not. it's
111          * just a little race...
112          */
113         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
114                 req = list_entry(tmp, struct ptlrpc_request,
115                                      rq_replay_list);
116
117                 /* If need to resend the last sent transno (because a
118                    reconnect has occurred), then stop on the matching
119                    req and send it again. If, however, the last sent
120                    transno has been committed then we continue replay
121                    from the next request. */
122                 if (req->rq_transno > last_transno) {
123                         if (imp->imp_resend_replay)
124                                 lustre_msg_add_flags(req->rq_reqmsg,
125                                                      MSG_RESENT);
126                         break;
127                 }
128                 req = NULL;
129         }
130
131         spin_lock(&imp->imp_lock);
132         imp->imp_resend_replay = 0;
133         spin_unlock(&imp->imp_lock);
134
135         if (req != NULL) {
136                 rc = ptlrpc_replay_req(req);
137                 if (rc) {
138                         CERROR("recovery replay error %d for req "
139                                LPU64"\n", rc, req->rq_xid);
140                         RETURN(rc);
141                 }
142                 *inflight = 1;
143         }
144         RETURN(rc);
145 }
146
147 /**
148  * Schedule resending of request on sending_list. This is done after
149  * we completed replaying of requests and locks.
150  */
151 int ptlrpc_resend(struct obd_import *imp)
152 {
153         struct ptlrpc_request *req, *next;
154
155         ENTRY;
156
157         /* As long as we're in recovery, nothing should be added to the sending
158          * list, so we don't need to hold the lock during this iteration and
159          * resend process.
160          */
161         /* Well... what if lctl recover is called twice at the same time?
162          */
163         spin_lock(&imp->imp_lock);
164         if (imp->imp_state != LUSTRE_IMP_RECOVER) {
165                 spin_unlock(&imp->imp_lock);
166                 RETURN(-1);
167         }
168
169         list_for_each_entry_safe(req, next, &imp->imp_sending_list,
170                                      rq_list) {
171                 LASSERTF((long)req > PAGE_CACHE_SIZE && req != LP_POISON,
172                          "req %p bad\n", req);
173                 LASSERTF(req->rq_type != LI_POISON, "req %p freed\n", req);
174                 if (!ptlrpc_no_resend(req))
175                         ptlrpc_resend_req(req);
176         }
177         spin_unlock(&imp->imp_lock);
178
179         RETURN(0);
180 }
181 EXPORT_SYMBOL(ptlrpc_resend);
182
183 /**
184  * Go through all requests in delayed list and wake their threads
185  * for resending
186  */
187 void ptlrpc_wake_delayed(struct obd_import *imp)
188 {
189         struct list_head *tmp, *pos;
190         struct ptlrpc_request *req;
191
192         spin_lock(&imp->imp_lock);
193         list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
194                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
195
196                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
197                 ptlrpc_client_wake_req(req);
198         }
199         spin_unlock(&imp->imp_lock);
200 }
201 EXPORT_SYMBOL(ptlrpc_wake_delayed);
202
203 void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
204 {
205         struct obd_import *imp = failed_req->rq_import;
206         ENTRY;
207
208         CDEBUG(D_HA, "import %s of %s@%s abruptly disconnected: reconnecting\n",
209                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
210                imp->imp_connection->c_remote_uuid.uuid);
211
212         if (ptlrpc_set_import_discon(imp,
213                               lustre_msg_get_conn_cnt(failed_req->rq_reqmsg))) {
214                 if (!imp->imp_replayable) {
215                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
216                                "auto-deactivating\n",
217                                obd2cli_tgt(imp->imp_obd),
218                                imp->imp_connection->c_remote_uuid.uuid,
219                                imp->imp_obd->obd_name);
220                         ptlrpc_deactivate_import(imp);
221                 }
222                 /* to control recovery via lctl {disable|enable}_recovery */
223                 if (imp->imp_deactive == 0)
224                         ptlrpc_connect_import(imp);
225         }
226
227         /* Wait for recovery to complete and resend. If evicted, then
228            this request will be errored out later.*/
229         spin_lock(&failed_req->rq_lock);
230         if (!failed_req->rq_no_resend)
231                 failed_req->rq_resend = 1;
232         spin_unlock(&failed_req->rq_lock);
233
234         EXIT;
235 }
236
237 /**
238  * Administratively active/deactive a client.
239  * This should only be called by the ioctl interface, currently
240  *  - the lctl deactivate and activate commands
241  *  - echo 0/1 >> /proc/osc/XXX/active
242  *  - client umount -f (ll_umount_begin)
243  */
244 int ptlrpc_set_import_active(struct obd_import *imp, int active)
245 {
246         struct obd_device *obd = imp->imp_obd;
247         int rc = 0;
248
249         ENTRY;
250         LASSERT(obd);
251
252         /* When deactivating, mark import invalid, and abort in-flight
253          * requests. */
254         if (!active) {
255                 LCONSOLE_WARN("setting import %s INACTIVE by administrator "
256                               "request\n", obd2cli_tgt(imp->imp_obd));
257
258                 /* set before invalidate to avoid messages about imp_inval
259                  * set without imp_deactive in ptlrpc_import_delay_req */
260                 spin_lock(&imp->imp_lock);
261                 imp->imp_deactive = 1;
262                 spin_unlock(&imp->imp_lock);
263
264                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DEACTIVATE);
265
266                 ptlrpc_invalidate_import(imp);
267         }
268
269         /* When activating, mark import valid, and attempt recovery */
270         if (active) {
271                 CDEBUG(D_HA, "setting import %s VALID\n",
272                        obd2cli_tgt(imp->imp_obd));
273
274                 spin_lock(&imp->imp_lock);
275                 imp->imp_deactive = 0;
276                 spin_unlock(&imp->imp_lock);
277                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_ACTIVATE);
278
279                 rc = ptlrpc_recover_import(imp, NULL, 0);
280         }
281
282         RETURN(rc);
283 }
284 EXPORT_SYMBOL(ptlrpc_set_import_active);
285
286 /* Attempt to reconnect an import */
287 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid, int async)
288 {
289         int rc = 0;
290         ENTRY;
291
292         spin_lock(&imp->imp_lock);
293         if (imp->imp_state == LUSTRE_IMP_NEW || imp->imp_deactive ||
294             atomic_read(&imp->imp_inval_count))
295                 rc = -EINVAL;
296         spin_unlock(&imp->imp_lock);
297         if (rc)
298                 GOTO(out, rc);
299
300         /* force import to be disconnected. */
301         ptlrpc_set_import_discon(imp, 0);
302
303         if (new_uuid) {
304                 struct obd_uuid uuid;
305
306                 /* intruct import to use new uuid */
307                 obd_str2uuid(&uuid, new_uuid);
308                 rc = import_set_conn_priority(imp, &uuid);
309                 if (rc)
310                         GOTO(out, rc);
311         }
312
313         /* Check if reconnect is already in progress */
314         spin_lock(&imp->imp_lock);
315         if (imp->imp_state != LUSTRE_IMP_DISCON) {
316                 imp->imp_force_verify = 1;
317                 rc = -EALREADY;
318         }
319         spin_unlock(&imp->imp_lock);
320         if (rc)
321                 GOTO(out, rc);
322
323         rc = ptlrpc_connect_import(imp);
324         if (rc)
325                 GOTO(out, rc);
326
327         if (!async) {
328                 struct l_wait_info lwi;
329                 int secs = cfs_time_seconds(obd_timeout);
330
331                 CDEBUG(D_HA, "%s: recovery started, waiting %u seconds\n",
332                        obd2cli_tgt(imp->imp_obd), secs);
333
334                 lwi = LWI_TIMEOUT(secs, NULL, NULL);
335                 rc = l_wait_event(imp->imp_recovery_waitq,
336                                   !ptlrpc_import_in_recovery(imp), &lwi);
337                 CDEBUG(D_HA, "%s: recovery finished\n",
338                        obd2cli_tgt(imp->imp_obd));
339         }
340         EXIT;
341
342 out:
343         return rc;
344 }
345 EXPORT_SYMBOL(ptlrpc_recover_import);
346
347 int ptlrpc_import_in_recovery(struct obd_import *imp)
348 {
349         int in_recovery = 1;
350         spin_lock(&imp->imp_lock);
351         if (imp->imp_state == LUSTRE_IMP_FULL ||
352             imp->imp_state == LUSTRE_IMP_CLOSED ||
353             imp->imp_state == LUSTRE_IMP_DISCON)
354                 in_recovery = 0;
355         spin_unlock(&imp->imp_lock);
356         return in_recovery;
357 }