Linux-libre 4.9.135-gnu
drivers/staging/lustre/lustre/ptlrpc/client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 /** Implementation of client-side PortalRPC interfaces */
34
35 #define DEBUG_SUBSYSTEM S_RPC
36
37 #include "../include/obd_support.h"
38 #include "../include/obd_class.h"
39 #include "../include/lustre_lib.h"
40 #include "../include/lustre_ha.h"
41 #include "../include/lustre_import.h"
42 #include "../include/lustre_req_layout.h"
43
44 #include "ptlrpc_internal.h"
45
46 static int ptlrpc_send_new_req(struct ptlrpc_request *req);
47 static int ptlrpcd_check_work(struct ptlrpc_request *req);
48 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
49
50 /**
51  * Initialize the passed-in client structure \a cl.
52  */
53 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
54                         struct ptlrpc_client *cl)
55 {
56         cl->cli_request_portal = req_portal;
57         cl->cli_reply_portal = rep_portal;
58         cl->cli_name = name;
59 }
60 EXPORT_SYMBOL(ptlrpc_init_client);
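
/*
 * Example (illustrative sketch, not part of this file): a client obd sets up
 * its struct ptlrpc_client during obd setup roughly like this.  The portal
 * constants, the "ost_client" name and the obd_ldlm_client field mirror how
 * OSC-style clients are expected to use this helper, but treat the exact
 * names as assumptions of this sketch rather than a quote of that code.
 *
 *        ptlrpc_init_client(OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
 *                           "ost_client", &obd->obd_ldlm_client);
 */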
61
62 /**
63  * Return PortalRPC connection for remote uud \a uuid
64  */
65 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
66 {
67         struct ptlrpc_connection *c;
68         lnet_nid_t self;
69         lnet_process_id_t peer;
70         int err;
71
72         /*
73          * ptlrpc_uuid_to_peer() initializes its 2nd parameter
74          * before accessing its values.
75          * coverity[uninit_use_in_call]
76          */
77         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
78         if (err != 0) {
79                 CNETERR("cannot find peer %s!\n", uuid->uuid);
80                 return NULL;
81         }
82
83         c = ptlrpc_connection_get(peer, self, uuid);
84         if (c) {
85                 memcpy(c->c_remote_uuid.uuid,
86                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
87         }
88
89         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
90
91         return c;
92 }
93
94 /**
95  * Allocate and initialize new bulk descriptor on the sender.
96  * Returns pointer to the descriptor or NULL on error.
97  */
98 struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
99                                          unsigned type, unsigned portal)
100 {
101         struct ptlrpc_bulk_desc *desc;
102         int i;
103
104         desc = kzalloc(offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]),
105                        GFP_NOFS);
106         if (!desc)
107                 return NULL;
108
109         spin_lock_init(&desc->bd_lock);
110         init_waitqueue_head(&desc->bd_waitq);
111         desc->bd_max_iov = npages;
112         desc->bd_iov_count = 0;
113         desc->bd_portal = portal;
114         desc->bd_type = type;
115         desc->bd_md_count = 0;
116         LASSERT(max_brw > 0);
117         desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
118         /*
119          * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
120          * node. Negotiated ocd_brw_size will always be <= this number.
121          */
122         for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
123                 LNetInvalidateHandle(&desc->bd_mds[i]);
124
125         return desc;
126 }
127
128 /**
129  * Prepare a bulk descriptor for the specified outgoing request \a req that
130  * can fit \a npages pages. \a type is the bulk type and \a portal is where
131  * the bulk is to be sent. Used on the client side.
132  * Returns a pointer to the newly allocated, initialized bulk descriptor or
133  * NULL on error.
134  */
135 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
136                                               unsigned npages, unsigned max_brw,
137                                               unsigned type, unsigned portal)
138 {
139         struct obd_import *imp = req->rq_import;
140         struct ptlrpc_bulk_desc *desc;
141
142         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
143         desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
144         if (!desc)
145                 return NULL;
146
147         desc->bd_import_generation = req->rq_import_generation;
148         desc->bd_import = class_import_get(imp);
149         desc->bd_req = req;
150
151         desc->bd_cbid.cbid_fn = client_bulk_callback;
152         desc->bd_cbid.cbid_arg = desc;
153
154         /* This makes req own desc; it is freed when req itself is freed */
155         req->rq_bulk = desc;
156
157         return desc;
158 }
159 EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
160
161 /**
162  * Add a page \a page to the bulk descriptor \a desc.
163  * Data to transfer in the page starts at offset \a pageoffset and the
164  * amount of data to transfer from the page is \a len bytes.
165  */
166 void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
167                              struct page *page, int pageoffset, int len, int pin)
168 {
169         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
170         LASSERT(page);
171         LASSERT(pageoffset >= 0);
172         LASSERT(len > 0);
173         LASSERT(pageoffset + len <= PAGE_SIZE);
174
175         desc->bd_nob += len;
176
177         if (pin)
178                 get_page(page);
179
180         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
181 }
182 EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
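
/*
 * Example (illustrative sketch, not part of this driver): a client-side
 * caller that already holds a packed request "req" and an array "pages[]"
 * of "npages" full data pages could attach a bulk descriptor like this.
 * The caller-provided variables, the OST_BULK_PORTAL destination and the
 * max_brw value of 1 are assumptions made for the example.
 *
 *        struct ptlrpc_bulk_desc *desc;
 *        int i;
 *
 *        desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
 *                                    OST_BULK_PORTAL);
 *        if (!desc)
 *                return -ENOMEM;
 *        for (i = 0; i < npages; i++)
 *                __ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE, 1);
 *
 * From this point on req owns desc (req->rq_bulk), so the descriptor is
 * released together with the request rather than by an explicit
 * __ptlrpc_free_bulk() call on this path.
 */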
183
184 /**
185  * Uninitialize and free bulk descriptor \a desc.
186  * Works on bulk descriptors both from server and client side.
187  */
188 void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
189 {
190         int i;
191
192         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
193         LASSERT(desc->bd_md_count == 0);         /* network hands off */
194         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
195
196         sptlrpc_enc_pool_put_pages(desc);
197
198         if (desc->bd_export)
199                 class_export_put(desc->bd_export);
200         else
201                 class_import_put(desc->bd_import);
202
203         if (unpin) {
204                 for (i = 0; i < desc->bd_iov_count; i++)
205                         put_page(desc->bd_iov[i].bv_page);
206         }
207
208         kfree(desc);
209 }
210 EXPORT_SYMBOL(__ptlrpc_free_bulk);
211
212 /**
213  * Set the server time limit for this req, i.e. how long we are willing to
214  * wait for the reply before timing out this request.
215  */
216 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
217 {
218         __u32 serv_est;
219         int idx;
220         struct imp_at *at;
221
222         LASSERT(req->rq_import);
223
224         if (AT_OFF) {
225                 /*
226                  * non-AT settings
227                  *
228                  * \a imp_server_timeout means this is a reverse import on
229                  * which we send (currently only) ASTs to the client and
230                  * cannot afford to wait too long for the reply; otherwise
231                  * the other client (on whose behalf we are sending this
232                  * request) would time out waiting for us.
233                  */
234                 req->rq_timeout = req->rq_import->imp_server_timeout ?
235                                   obd_timeout / 2 : obd_timeout;
236         } else {
237                 at = &req->rq_import->imp_at;
238                 idx = import_at_get_index(req->rq_import,
239                                           req->rq_request_portal);
240                 serv_est = at_get(&at->iat_service_estimate[idx]);
241                 req->rq_timeout = at_est2timeout(serv_est);
242         }
243         /*
244          * We could get even fancier here, using history to predict increased
245          * loading...
246          */
247
248         /*
249          * Let the server know what this RPC timeout is by putting it in the
250          * reqmsg
251          */
252         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
253 }
254 EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
255
256 /* Adjust max service estimate based on server value */
257 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
258                                   unsigned int serv_est)
259 {
260         int idx;
261         unsigned int oldse;
262         struct imp_at *at;
263
264         LASSERT(req->rq_import);
265         at = &req->rq_import->imp_at;
266
267         idx = import_at_get_index(req->rq_import, req->rq_request_portal);
268         /*
269          * max service estimates are tracked on the server side,
270          * so just keep minimal history here
271          */
272         oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
273         if (oldse != 0)
274                 CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
275                        req->rq_import->imp_obd->obd_name, req->rq_request_portal,
276                        oldse, at_get(&at->iat_service_estimate[idx]));
277 }
278
279 /* Expected network latency per remote node (secs) */
280 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
281 {
282         return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
283 }
284
285 /* Adjust expected network latency */
286 void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
287                                unsigned int service_time)
288 {
289         unsigned int nl, oldnl;
290         struct imp_at *at;
291         time64_t now = ktime_get_real_seconds();
292
293         LASSERT(req->rq_import);
294
295         if (service_time > now - req->rq_sent + 3) {
296                 /*
297                  * bz16408; however, this can also happen if an early reply
298                  * is lost and the client RPC expires and is resent: the
299                  * early reply or the reply to the original RPC can still
300                  * fit in the reply buffer of the resent RPC.  The client is
301                  * now measuring time from the resend time, but the server
302                  * sent back the service time of the original RPC.
303                  */
304                 CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
305                        D_ADAPTTO : D_WARNING,
306                        "Reported service time %u > total measured time "
307                        CFS_DURATION_T"\n", service_time,
308                        (long)(now - req->rq_sent));
309                 return;
310         }
311
312         /* Network latency is total time less server processing time */
313         nl = max_t(int, now - req->rq_sent -
314                         service_time, 0) + 1; /* st rounding */
315         at = &req->rq_import->imp_at;
316
317         oldnl = at_measured(&at->iat_net_latency, nl);
318         if (oldnl != 0)
319                 CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) has changed from %d to %d\n",
320                        req->rq_import->imp_obd->obd_name,
321                        obd_uuid2str(
322                                &req->rq_import->imp_connection->c_remote_uuid),
323                        oldnl, at_get(&at->iat_net_latency));
324 }
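
/*
 * Worked example for the calculation above (numbers are made up): if a
 * request was sent at rq_sent = 1000s, the reply is processed at
 * now = 1007s and the server reported service_time = 5s, then
 * nl = max(1007 - 1000 - 5, 0) + 1 = 3 seconds; the +1 compensates for the
 * server's whole-second rounding of the service time.
 */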
325
326 static int unpack_reply(struct ptlrpc_request *req)
327 {
328         int rc;
329
330         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
331                 rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
332                 if (rc) {
333                         DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
334                         return -EPROTO;
335                 }
336         }
337
338         rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
339         if (rc) {
340                 DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
341                 return -EPROTO;
342         }
343         return 0;
344 }
345
346 /**
347  * Handle an early reply message, called with the rq_lock held.
348  * If anything goes wrong just ignore it - same as if it never happened
349  */
350 static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
351         __must_hold(&req->rq_lock)
352 {
353         struct ptlrpc_request *early_req;
354         time64_t olddl;
355         int rc;
356
357         req->rq_early = 0;
358         spin_unlock(&req->rq_lock);
359
360         rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
361         if (rc) {
362                 spin_lock(&req->rq_lock);
363                 return rc;
364         }
365
366         rc = unpack_reply(early_req);
367         if (rc) {
368                 sptlrpc_cli_finish_early_reply(early_req);
369                 spin_lock(&req->rq_lock);
370                 return rc;
371         }
372
373         /*
374          * Use the new timeout value only to adjust the local value for this
375          * request; don't include it in at_history. It is not yet clear why
376          * the service time increased, or whether it should be counted or
377          * skipped (it could be a recovery case or some error on the server);
378          * the real reply will add all the new data if it is worth adding.
379          */
380         req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg);
381         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
382
383         /* Network latency can be adjusted, it is pure network delays */
384         ptlrpc_at_adj_net_latency(req,
385                                   lustre_msg_get_service_time(early_req->rq_repmsg));
386
387         sptlrpc_cli_finish_early_reply(early_req);
388
389         spin_lock(&req->rq_lock);
390         olddl = req->rq_deadline;
391         /*
392          * The server assumes it now has rq_timeout from when the request
393          * arrived, so the client should give it at least that long.
394          * Since we don't know the arrival time, we'll use the original
395          * sent time.
396          */
397         req->rq_deadline = req->rq_sent + req->rq_timeout +
398                            ptlrpc_at_get_net_latency(req);
399
400         DEBUG_REQ(D_ADAPTTO, req,
401                   "Early reply #%d, new deadline in %lds (%lds)",
402                   req->rq_early_count,
403                   (long)(req->rq_deadline - ktime_get_real_seconds()),
404                   (long)(req->rq_deadline - olddl));
405
406         return rc;
407 }
408
409 static struct kmem_cache *request_cache;
410
411 int ptlrpc_request_cache_init(void)
412 {
413         request_cache = kmem_cache_create("ptlrpc_cache",
414                                           sizeof(struct ptlrpc_request),
415                                           0, SLAB_HWCACHE_ALIGN, NULL);
416         return !request_cache ? -ENOMEM : 0;
417 }
418
419 void ptlrpc_request_cache_fini(void)
420 {
421         kmem_cache_destroy(request_cache);
422 }
423
424 struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
425 {
426         struct ptlrpc_request *req;
427
428         req = kmem_cache_zalloc(request_cache, flags);
429         return req;
430 }
431
432 void ptlrpc_request_cache_free(struct ptlrpc_request *req)
433 {
434         kmem_cache_free(request_cache, req);
435 }
436
437 /**
438  * Wind down request pool \a pool.
439  * Frees all requests from the pool too
440  */
441 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
442 {
443         struct list_head *l, *tmp;
444         struct ptlrpc_request *req;
445
446         spin_lock(&pool->prp_lock);
447         list_for_each_safe(l, tmp, &pool->prp_req_list) {
448                 req = list_entry(l, struct ptlrpc_request, rq_list);
449                 list_del(&req->rq_list);
450                 LASSERT(req->rq_reqbuf);
451                 LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
452                 kvfree(req->rq_reqbuf);
453                 ptlrpc_request_cache_free(req);
454         }
455         spin_unlock(&pool->prp_lock);
456         kfree(pool);
457 }
458 EXPORT_SYMBOL(ptlrpc_free_rq_pool);
459
460 /**
461  * Allocates, initializes and adds \a num_rq requests to the pool \a pool
462  */
463 int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
464 {
465         int i;
466         int size = 1;
467
468         while (size < pool->prp_rq_size)
469                 size <<= 1;
470
471         LASSERTF(list_empty(&pool->prp_req_list) ||
472                  size == pool->prp_rq_size,
473                  "Trying to change pool size with nonempty pool from %d to %d bytes\n",
474                  pool->prp_rq_size, size);
475
476         spin_lock(&pool->prp_lock);
477         pool->prp_rq_size = size;
478         for (i = 0; i < num_rq; i++) {
479                 struct ptlrpc_request *req;
480                 struct lustre_msg *msg;
481
482                 spin_unlock(&pool->prp_lock);
483                 req = ptlrpc_request_cache_alloc(GFP_NOFS);
484                 if (!req)
485                         return i;
486                 msg = libcfs_kvzalloc(size, GFP_NOFS);
487                 if (!msg) {
488                         ptlrpc_request_cache_free(req);
489                         return i;
490                 }
491                 req->rq_reqbuf = msg;
492                 req->rq_reqbuf_len = size;
493                 req->rq_pool = pool;
494                 spin_lock(&pool->prp_lock);
495                 list_add_tail(&req->rq_list, &pool->prp_req_list);
496         }
497         spin_unlock(&pool->prp_lock);
498         return num_rq;
499 }
500 EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
501
502 /**
503  * Create and initialize new request pool with given attributes:
504  * \a num_rq - initial number of requests to create for the pool
505  * \a msgsize - maximum message size possible for requests in this pool
506  * \a populate_pool - function to be called when more requests need to be added
507  *                  to the pool
508  * Returns pointer to newly created pool or NULL on error.
509  */
510 struct ptlrpc_request_pool *
511 ptlrpc_init_rq_pool(int num_rq, int msgsize,
512                     int (*populate_pool)(struct ptlrpc_request_pool *, int))
513 {
514         struct ptlrpc_request_pool *pool;
515
516         pool = kzalloc(sizeof(struct ptlrpc_request_pool), GFP_NOFS);
517         if (!pool)
518                 return NULL;
519
520         /*
521          * Request the next power of two for the allocation, because the
522          * kernel would do exactly this internally anyway
523          */
524
525         spin_lock_init(&pool->prp_lock);
526         INIT_LIST_HEAD(&pool->prp_req_list);
527         pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
528         pool->prp_populate = populate_pool;
529
530         populate_pool(pool, num_rq);
531
532         return pool;
533 }
534 EXPORT_SYMBOL(ptlrpc_init_rq_pool);
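
/*
 * Example (illustrative sketch, not part of this file): a client that wants
 * an emergency pool of pre-allocated requests, e.g. for the writeout path,
 * can combine the pool helpers above as follows.  The counts, the msgsize of
 * 4096, and the caller-side "imp" and "fmt" variables (an import and a
 * request format) are assumptions made for the example.
 *
 *        struct ptlrpc_request_pool *pool;
 *        struct ptlrpc_request *req;
 *
 *        pool = ptlrpc_init_rq_pool(4, 4096, ptlrpc_add_rqs_to_pool);
 *        if (!pool)
 *                return -ENOMEM;
 *        ...
 *        req = ptlrpc_request_alloc_pool(imp, pool, fmt);
 *        ...
 *        ptlrpc_free_rq_pool(pool);
 *
 * Note that each pooled buffer is prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD
 * bytes, rounded up to a power of two by ptlrpc_add_rqs_to_pool().
 */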
535
536 /**
537  * Fetches one request from pool \a pool
538  */
539 static struct ptlrpc_request *
540 ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
541 {
542         struct ptlrpc_request *request;
543         struct lustre_msg *reqbuf;
544
545         if (!pool)
546                 return NULL;
547
548         spin_lock(&pool->prp_lock);
549
550         /*
551          * See if we have anything in the pool, and bail out if not.
552          * In the writeout path, where this matters, this is safe to do
553          * because nothing is lost in this case, and when some in-flight
554          * requests complete, this code will be called again.
555          */
556         if (unlikely(list_empty(&pool->prp_req_list))) {
557                 spin_unlock(&pool->prp_lock);
558                 return NULL;
559         }
560
561         request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
562                              rq_list);
563         list_del_init(&request->rq_list);
564         spin_unlock(&pool->prp_lock);
565
566         LASSERT(request->rq_reqbuf);
567         LASSERT(request->rq_pool);
568
569         reqbuf = request->rq_reqbuf;
570         memset(request, 0, sizeof(*request));
571         request->rq_reqbuf = reqbuf;
572         request->rq_reqbuf_len = pool->prp_rq_size;
573         request->rq_pool = pool;
574
575         return request;
576 }
577
578 /**
579  * Return freed \a request back to its pool.
580  */
581 static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
582 {
583         struct ptlrpc_request_pool *pool = request->rq_pool;
584
585         spin_lock(&pool->prp_lock);
586         LASSERT(list_empty(&request->rq_list));
587         LASSERT(!request->rq_receiving_reply);
588         list_add_tail(&request->rq_list, &pool->prp_req_list);
589         spin_unlock(&pool->prp_lock);
590 }
591
592 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
593                              __u32 version, int opcode, char **bufs,
594                              struct ptlrpc_cli_ctx *ctx)
595 {
596         int count;
597         struct obd_import *imp;
598         __u32 *lengths;
599         int rc;
600
601         count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
602         imp = request->rq_import;
603         lengths = request->rq_pill.rc_area[RCL_CLIENT];
604
605         if (unlikely(ctx)) {
606                 request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
607         } else {
608                 rc = sptlrpc_req_get_ctx(request);
609                 if (rc)
610                         goto out_free;
611         }
612         sptlrpc_req_set_flavor(request, opcode);
613
614         rc = lustre_pack_request(request, imp->imp_msg_magic, count,
615                                  lengths, bufs);
616         if (rc)
617                 goto out_ctx;
618
619         lustre_msg_add_version(request->rq_reqmsg, version);
620         request->rq_send_state = LUSTRE_IMP_FULL;
621         request->rq_type = PTL_RPC_MSG_REQUEST;
622
623         request->rq_req_cbid.cbid_fn = request_out_callback;
624         request->rq_req_cbid.cbid_arg = request;
625
626         request->rq_reply_cbid.cbid_fn = reply_in_callback;
627         request->rq_reply_cbid.cbid_arg = request;
628
629         request->rq_reply_deadline = 0;
630         request->rq_bulk_deadline = 0;
631         request->rq_req_deadline = 0;
632         request->rq_phase = RQ_PHASE_NEW;
633         request->rq_next_phase = RQ_PHASE_UNDEFINED;
634
635         request->rq_request_portal = imp->imp_client->cli_request_portal;
636         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
637
638         ptlrpc_at_set_req_timeout(request);
639
640         request->rq_xid = ptlrpc_next_xid();
641         lustre_msg_set_opc(request->rq_reqmsg, opcode);
642
643         /* Let's setup deadline for req/reply/bulk unlink for opcode. */
644         if (cfs_fail_val == opcode) {
645                 time_t *fail_t = NULL, *fail2_t = NULL;
646
647                 if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
648                         fail_t = &request->rq_bulk_deadline;
649                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
650                         fail_t = &request->rq_reply_deadline;
651                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) {
652                         fail_t = &request->rq_req_deadline;
653                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
654                         fail_t = &request->rq_reply_deadline;
655                         fail2_t = &request->rq_bulk_deadline;
656                 }
657
658                 if (fail_t) {
659                         *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
660
661                         if (fail2_t)
662                                 *fail2_t = ktime_get_real_seconds() +
663                                                  LONG_UNLINK;
664
665                         /* The RPC is infected, let the test change the
666                          * fail_loc
667                          */
668                         set_current_state(TASK_UNINTERRUPTIBLE);
669                         schedule_timeout(cfs_time_seconds(2));
670                         set_current_state(TASK_RUNNING);
671                 }
672         }
673
674         return 0;
675
676 out_ctx:
677         LASSERT(!request->rq_pool);
678         sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
679 out_free:
680         class_import_put(imp);
681         return rc;
682 }
683 EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
684
685 /**
686  * Pack request buffers for network transfer, performing any necessary
687  * encryption steps.
688  */
689 int ptlrpc_request_pack(struct ptlrpc_request *request,
690                         __u32 version, int opcode)
691 {
692         int rc;
693
694         rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
695         if (rc)
696                 return rc;
697
698         /*
699          * Some old 1.8 clients (< 1.8.7) LASSERT that the size of the
700          * ptlrpc_body sent from the server equals the local ptlrpc_body
701          * size, so we have to send the old ptlrpc_body to keep
702          * interoperability with these clients.
703          *
704          * Only three kinds of server->client RPCs so far:
705          *  - LDLM_BL_CALLBACK
706          *  - LDLM_CP_CALLBACK
707          *  - LDLM_GL_CALLBACK
708          *
709          * XXX This should be removed whenever we drop the interoperability with
710  *     these old clients.
711          */
712         if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
713             opcode == LDLM_GL_CALLBACK)
714                 req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
715                                    sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
716
717         return rc;
718 }
719 EXPORT_SYMBOL(ptlrpc_request_pack);
720
721 /**
722  * Helper function to allocate a new request on import \a imp,
723  * possibly using an existing request from pool \a pool if provided.
724  * Returns the allocated request structure with the import field filled,
725  * or NULL on error.
726  */
727 static inline
728 struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
729                                               struct ptlrpc_request_pool *pool)
730 {
731         struct ptlrpc_request *request;
732
733         request = ptlrpc_request_cache_alloc(GFP_NOFS);
734
735         if (!request && pool)
736                 request = ptlrpc_prep_req_from_pool(pool);
737
738         if (request) {
739                 ptlrpc_cli_req_init(request);
740
741                 LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
742                 LASSERT(imp != LP_POISON);
743                 LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
744                          imp->imp_client);
745                 LASSERT(imp->imp_client != LP_POISON);
746
747                 request->rq_import = class_import_get(imp);
748         } else {
749                 CERROR("request allocation out of memory\n");
750         }
751
752         return request;
753 }
754
755 /**
756  * Helper function for creating a request.
757  * Calls __ptlrpc_request_alloc to allocate new request structure and inits
758  * buffer structures according to capsule template \a format.
759  * Returns allocated request structure pointer or NULL on error.
760  */
761 static struct ptlrpc_request *
762 ptlrpc_request_alloc_internal(struct obd_import *imp,
763                               struct ptlrpc_request_pool *pool,
764                               const struct req_format *format)
765 {
766         struct ptlrpc_request *request;
767
768         request = __ptlrpc_request_alloc(imp, pool);
769         if (!request)
770                 return NULL;
771
772         req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
773         req_capsule_set(&request->rq_pill, format);
774         return request;
775 }
776
777 /**
778  * Allocate new request structure for import \a imp and initialize its
779  * buffer structure according to capsule template \a format.
780  */
781 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
782                                             const struct req_format *format)
783 {
784         return ptlrpc_request_alloc_internal(imp, NULL, format);
785 }
786 EXPORT_SYMBOL(ptlrpc_request_alloc);
787
788 /**
789  * Allocate new request structure for import \a imp from pool \a pool and
790  * initialize its buffer structure according to capsule template \a format.
791  */
792 struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
793                                                  struct ptlrpc_request_pool *pool,
794                                                  const struct req_format *format)
795 {
796         return ptlrpc_request_alloc_internal(imp, pool, format);
797 }
798 EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
799
800 /**
801  * For requests not from a pool, free the memory of the request structure.
802  * For requests obtained from a pool earlier, return the request back to its pool.
803  */
804 void ptlrpc_request_free(struct ptlrpc_request *request)
805 {
806         if (request->rq_pool)
807                 __ptlrpc_free_req_to_pool(request);
808         else
809                 ptlrpc_request_cache_free(request);
810 }
811 EXPORT_SYMBOL(ptlrpc_request_free);
812
813 /**
814  * Allocate new request for operation \a opcode and immediately pack it for
815  * network transfer.
816  * Only used for simple requests like OBD_PING where the only important
817  * part of the request is the operation itself.
818  * Returns allocated request or NULL on error.
819  */
820 struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
821                                                  const struct req_format *format,
822                                                  __u32 version, int opcode)
823 {
824         struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
825         int rc;
826
827         if (req) {
828                 rc = ptlrpc_request_pack(req, version, opcode);
829                 if (rc) {
830                         ptlrpc_request_free(req);
831                         req = NULL;
832                 }
833         }
834         return req;
835 }
836 EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
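
/*
 * Example (illustrative sketch, not part of this file): the simplest consumer
 * of the helper above is a ping-style request, where the opcode is the only
 * interesting part.  The RQF_OBD_PING, LUSTRE_OBD_VERSION and OBD_PING names
 * reflect how a ping is expected to be built, and ptlrpc_request_set_replen()
 * and ptlrpc_queue_wait() are defined elsewhere in ptlrpc; treat the exact
 * combination as an assumption of this sketch.
 *
 *        struct ptlrpc_request *req;
 *        int rc;
 *
 *        req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING,
 *                                        LUSTRE_OBD_VERSION, OBD_PING);
 *        if (!req)
 *                return -ENOMEM;
 *        ptlrpc_request_set_replen(req);
 *        rc = ptlrpc_queue_wait(req);
 *        ptlrpc_req_finished(req);
 */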
837
838 /**
839  * Allocate and initialize new request set structure on the current CPT.
840  * Returns a pointer to the newly allocated set structure or NULL on error.
841  */
842 struct ptlrpc_request_set *ptlrpc_prep_set(void)
843 {
844         struct ptlrpc_request_set *set;
845         int cpt;
846
847         cpt = cfs_cpt_current(cfs_cpt_table, 0);
848         set = kzalloc_node(sizeof(*set), GFP_NOFS,
849                            cfs_cpt_spread_node(cfs_cpt_table, cpt));
850         if (!set)
851                 return NULL;
852         atomic_set(&set->set_refcount, 1);
853         INIT_LIST_HEAD(&set->set_requests);
854         init_waitqueue_head(&set->set_waitq);
855         atomic_set(&set->set_new_count, 0);
856         atomic_set(&set->set_remaining, 0);
857         spin_lock_init(&set->set_new_req_lock);
858         INIT_LIST_HEAD(&set->set_new_requests);
859         INIT_LIST_HEAD(&set->set_cblist);
860         set->set_max_inflight = UINT_MAX;
861         set->set_producer = NULL;
862         set->set_producer_arg = NULL;
863         set->set_rc = 0;
864
865         return set;
866 }
867 EXPORT_SYMBOL(ptlrpc_prep_set);
868
869 /**
870  * Allocate and initialize a new request set structure with the flow-control
871  * extension. This extension allows controlling the number of requests
872  * in flight for the whole set. A callback function to generate requests must
873  * be provided, and the request set will keep the number of requests sent over
874  * the wire at or below @max_inflight.
875  * Returns a pointer to the newly allocated set structure or NULL on error.
876  */
877 struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
878                                              void *arg)
879
880 {
881         struct ptlrpc_request_set *set;
882
883         set = ptlrpc_prep_set();
884         if (!set)
885                 return NULL;
886
887         set->set_max_inflight = max;
888         set->set_producer = func;
889         set->set_producer_arg = arg;
890
891         return set;
892 }
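
/*
 * Example (illustrative sketch, not part of this file): a producer callback
 * for the flow-controlled set above queues one request per call and returns
 * -ENOENT once it has nothing left to generate, which is how
 * ptlrpc_set_producer() below detects completion.  The "my_ctx" structure and
 * build_next_req() helper are inventions of this sketch.
 *
 *        static int my_producer(struct ptlrpc_request_set *set, void *arg)
 *        {
 *                struct my_ctx *ctx = arg;
 *                struct ptlrpc_request *req;
 *
 *                req = build_next_req(ctx);
 *                if (!req)
 *                        return -ENOENT;
 *                ptlrpc_set_add_req(set, req);
 *                return 0;
 *        }
 *
 *        set = ptlrpc_prep_fcset(8, my_producer, ctx);
 */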
893
894 /**
895  * Wind down and free request set structure previously allocated with
896  * ptlrpc_prep_set.
897  * Ensures that all requests on the set have completed and removes
898  * all requests from the set's request list.
899  * If any unsent requests happen to be on the list, pretend that they got
900  * an error in flight and call their completion handlers.
901  */
902 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
903 {
904         struct list_head *tmp;
905         struct list_head *next;
906         int expected_phase;
907         int n = 0;
908
909         /* Requests on the set should either all be completed, or all be new */
910         expected_phase = (atomic_read(&set->set_remaining) == 0) ?
911                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
912         list_for_each(tmp, &set->set_requests) {
913                 struct ptlrpc_request *req =
914                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
915
916                 LASSERT(req->rq_phase == expected_phase);
917                 n++;
918         }
919
920         LASSERTF(atomic_read(&set->set_remaining) == 0 ||
921                  atomic_read(&set->set_remaining) == n, "%d / %d\n",
922                  atomic_read(&set->set_remaining), n);
923
924         list_for_each_safe(tmp, next, &set->set_requests) {
925                 struct ptlrpc_request *req =
926                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
927                 list_del_init(&req->rq_set_chain);
928
929                 LASSERT(req->rq_phase == expected_phase);
930
931                 if (req->rq_phase == RQ_PHASE_NEW) {
932                         ptlrpc_req_interpret(NULL, req, -EBADR);
933                         atomic_dec(&set->set_remaining);
934                 }
935
936                 spin_lock(&req->rq_lock);
937                 req->rq_set = NULL;
938                 req->rq_invalid_rqset = 0;
939                 spin_unlock(&req->rq_lock);
940
941                 ptlrpc_req_finished(req);
942         }
943
944         LASSERT(atomic_read(&set->set_remaining) == 0);
945
946         ptlrpc_reqset_put(set);
947 }
948 EXPORT_SYMBOL(ptlrpc_set_destroy);
949
950 /**
951  * Add a new request to the general purpose request set.
952  * Assumes request reference from the caller.
953  */
954 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
955                         struct ptlrpc_request *req)
956 {
957         LASSERT(list_empty(&req->rq_set_chain));
958
959         /* The set takes over the caller's request reference */
960         list_add_tail(&req->rq_set_chain, &set->set_requests);
961         req->rq_set = set;
962         atomic_inc(&set->set_remaining);
963         req->rq_queued_time = cfs_time_current();
964
965         if (req->rq_reqmsg)
966                 lustre_msg_set_jobid(req->rq_reqmsg, NULL);
967
968         if (set->set_producer)
969                 /*
970                  * If the request set has a producer callback, the RPC must be
971                  * sent straight away
972                  */
973                 ptlrpc_send_new_req(req);
974 }
975 EXPORT_SYMBOL(ptlrpc_set_add_req);
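
/*
 * Example (illustrative sketch, not part of this file): the usual synchronous
 * pattern built from the set primitives above.  ptlrpc_set_wait() is defined
 * further down in this file; the requests req1 and req2 are assumed to have
 * been allocated and packed by the caller.
 *
 *        struct ptlrpc_request_set *set;
 *        int rc;
 *
 *        set = ptlrpc_prep_set();
 *        if (!set)
 *                return -ENOMEM;
 *        ptlrpc_set_add_req(set, req1);
 *        ptlrpc_set_add_req(set, req2);
 *        rc = ptlrpc_set_wait(set);
 *        ptlrpc_set_destroy(set);
 */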
976
977 /**
978  * Add a request to a request set with a dedicated server thread
979  * and wake the thread to do any necessary processing.
980  * Currently only used for ptlrpcd.
981  */
982 void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
983                             struct ptlrpc_request *req)
984 {
985         struct ptlrpc_request_set *set = pc->pc_set;
986         int count, i;
987
988         LASSERT(!req->rq_set);
989         LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
990
991         spin_lock(&set->set_new_req_lock);
992         /* The set takes over the caller's request reference.  */
993         req->rq_set = set;
994         req->rq_queued_time = cfs_time_current();
995         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
996         count = atomic_inc_return(&set->set_new_count);
997         spin_unlock(&set->set_new_req_lock);
998
999         /* Only need to call wakeup once for the first entry. */
1000         if (count == 1) {
1001                 wake_up(&set->set_waitq);
1002
1003                 /*
1004                  * XXX: It may be unnecessary to wake up all the partners, but
1005                  *      to guarantee the async RPC is processed ASAP we have
1006                  *      no better choice. This may be fixed in the future.
1007                  */
1008                 for (i = 0; i < pc->pc_npartners; i++)
1009                         wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
1010         }
1011 }
1012
1013 /**
1014  * Based on the current state of the import, determine if the request
1015  * can be sent, is an error, or should be delayed.
1016  *
1017  * Returns true if this request should be delayed. If false and
1018  * *status is set, then the request cannot be sent and *status is the
1019  * error code.  If false and *status is 0, then the request can be sent.
1020  *
1021  * The imp->imp_lock must be held.
1022  */
1023 static int ptlrpc_import_delay_req(struct obd_import *imp,
1024                                    struct ptlrpc_request *req, int *status)
1025 {
1026         int delay = 0;
1027
1028         *status = 0;
1029
1030         if (req->rq_ctx_init || req->rq_ctx_fini) {
1031                 /* always allow ctx init/fini rpc go through */
1032         } else if (imp->imp_state == LUSTRE_IMP_NEW) {
1033                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
1034                 *status = -EIO;
1035         } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1036                 /* pings may safely race with umount */
1037                 DEBUG_REQ(lustre_msg_get_opc(req->rq_reqmsg) == OBD_PING ?
1038                           D_HA : D_ERROR, req, "IMP_CLOSED ");
1039                 *status = -EIO;
1040         } else if (ptlrpc_send_limit_expired(req)) {
1041                 /* probably doesn't need to be a D_ERROR after initial testing */
1042                 DEBUG_REQ(D_HA, req, "send limit expired ");
1043                 *status = -ETIMEDOUT;
1044         } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
1045                    imp->imp_state == LUSTRE_IMP_CONNECTING) {
1046                 /* allow CONNECT even if import is invalid */
1047                 if (atomic_read(&imp->imp_inval_count) != 0) {
1048                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1049                         *status = -EIO;
1050                 }
1051         } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) {
1052                 if (!imp->imp_deactive)
1053                         DEBUG_REQ(D_NET, req, "IMP_INVALID");
1054                 *status = -ESHUTDOWN; /* bz 12940 */
1055         } else if (req->rq_import_generation != imp->imp_generation) {
1056                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
1057                 *status = -EIO;
1058         } else if (req->rq_send_state != imp->imp_state) {
1059                 /* invalidate in progress - any requests should be dropped */
1060                 if (atomic_read(&imp->imp_inval_count) != 0) {
1061                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1062                         *status = -EIO;
1063                 } else if (imp->imp_dlm_fake || req->rq_no_delay) {
1064                         *status = -EWOULDBLOCK;
1065                 } else if (req->rq_allow_replay &&
1066                           (imp->imp_state == LUSTRE_IMP_REPLAY ||
1067                            imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
1068                            imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
1069                            imp->imp_state == LUSTRE_IMP_RECOVER)) {
1070                         DEBUG_REQ(D_HA, req, "allow during recovery.\n");
1071                 } else {
1072                         delay = 1;
1073                 }
1074         }
1075
1076         return delay;
1077 }
1078
1079 /**
1080  * Decide if the error message should be printed to the console or not.
1081  * Makes its decision based on request type, status, and failure frequency.
1082  *
1083  * \param[in] req  request that failed and may need a console message
1084  *
1085  * \retval false if no message should be printed
1086  * \retval true  if console message should be printed
1087  */
1088 static bool ptlrpc_console_allow(struct ptlrpc_request *req)
1089 {
1090         __u32 opc;
1091
1092         LASSERT(req->rq_reqmsg);
1093         opc = lustre_msg_get_opc(req->rq_reqmsg);
1094
1095         /* Suppress particular reconnect errors which are to be expected. */
1096         if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) {
1097                 int err;
1098
1099                 /* Suppress timed out reconnect requests */
1100                 if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) ||
1101                     req->rq_timedout)
1102                         return false;
1103
1104                 /*
1105                  * Suppress most unavailable/again reconnect requests, but
1106                  * print occasionally so it is clear client is trying to
1107                  * connect to a server where no target is running.
1108                  */
1109                 err = lustre_msg_get_status(req->rq_repmsg);
1110                 if ((err == -ENODEV || err == -EAGAIN) &&
1111                     req->rq_import->imp_conn_cnt % 30 != 20)
1112                         return false;
1113         }
1114
1115         return true;
1116 }
1117
1118 /**
1119  * Check request processing status.
1120  * Returns the status.
1121  */
1122 static int ptlrpc_check_status(struct ptlrpc_request *req)
1123 {
1124         int err;
1125
1126         err = lustre_msg_get_status(req->rq_repmsg);
1127         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
1128                 struct obd_import *imp = req->rq_import;
1129                 lnet_nid_t nid = imp->imp_connection->c_peer.nid;
1130                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1131
1132                 if (ptlrpc_console_allow(req))
1133                         LCONSOLE_ERROR_MSG(0x011, "%s: operation %s to node %s failed: rc = %d\n",
1134                                            imp->imp_obd->obd_name,
1135                                            ll_opcode2str(opc),
1136                                            libcfs_nid2str(nid), err);
1137                 return err < 0 ? err : -EINVAL;
1138         }
1139
1140         if (err < 0)
1141                 DEBUG_REQ(D_INFO, req, "status is %d", err);
1142         else if (err > 0)
1143                 /* XXX: translate this error from net to host */
1144                 DEBUG_REQ(D_INFO, req, "status is %d", err);
1145
1146         return err;
1147 }
1148
1149 /**
1150  * Save pre-versions of objects into the request for replay.
1151  * Versions are obtained from the server reply.
1152  * Used for VBR (version-based recovery).
1153  */
1154 static void ptlrpc_save_versions(struct ptlrpc_request *req)
1155 {
1156         struct lustre_msg *repmsg = req->rq_repmsg;
1157         struct lustre_msg *reqmsg = req->rq_reqmsg;
1158         __u64 *versions = lustre_msg_get_versions(repmsg);
1159
1160         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1161                 return;
1162
1163         LASSERT(versions);
1164         lustre_msg_set_versions(reqmsg, versions);
1165         CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
1166                versions[0], versions[1]);
1167 }
1168
1169 /**
1170  * Callback function called when client receives RPC reply for \a req.
1171  * Returns 0 on success or error code.
1172  * The return value is assigned to req->rq_status by the caller
1173  * as the request processing status.
1174  * This function also decides if the request needs to be saved for later replay.
1175  */
1176 static int after_reply(struct ptlrpc_request *req)
1177 {
1178         struct obd_import *imp = req->rq_import;
1179         struct obd_device *obd = req->rq_import->imp_obd;
1180         int rc;
1181         struct timespec64 work_start;
1182         long timediff;
1183
1184         LASSERT(obd);
1185         /* repbuf must be unlinked */
1186         LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked);
1187
1188         if (req->rq_reply_truncated) {
1189                 if (ptlrpc_no_resend(req)) {
1190                         DEBUG_REQ(D_ERROR, req, "reply buffer overflow, expected: %d, actual size: %d",
1191                                   req->rq_nob_received, req->rq_repbuf_len);
1192                         return -EOVERFLOW;
1193                 }
1194
1195                 sptlrpc_cli_free_repbuf(req);
1196                 /*
1197                  * Pass the required reply buffer size (include space for early
1198                  * reply).  NB: no need to round up because alloc_repbuf will
1199                  * round it up
1200                  */
1201                 req->rq_replen       = req->rq_nob_received;
1202                 req->rq_nob_received = 0;
1203                 spin_lock(&req->rq_lock);
1204                 req->rq_resend       = 1;
1205                 spin_unlock(&req->rq_lock);
1206                 return 0;
1207         }
1208
1209         /*
1210          * NB Until this point, the whole of the incoming message,
1211          * including buflens, status etc is in the sender's byte order.
1212          */
1213         rc = sptlrpc_cli_unwrap_reply(req);
1214         if (rc) {
1215                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
1216                 return rc;
1217         }
1218
1219         /* The security layer unwrap might ask to resend this request. */
1220         if (req->rq_resend)
1221                 return 0;
1222
1223         rc = unpack_reply(req);
1224         if (rc)
1225                 return rc;
1226
1227         /* retry indefinitely on EINPROGRESS */
1228         if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
1229             ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
1230                 time64_t now = ktime_get_real_seconds();
1231
1232                 DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
1233                 spin_lock(&req->rq_lock);
1234                 req->rq_resend = 1;
1235                 spin_unlock(&req->rq_lock);
1236                 req->rq_nr_resend++;
1237
1238                 /* allocate new xid to avoid reply reconstruction */
1239                 if (!req->rq_bulk) {
1240                         /* new xid is already allocated for bulk in ptlrpc_check_set() */
1241                         req->rq_xid = ptlrpc_next_xid();
1242                         DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
1243                 }
1244
1245                 /* Readjust the timeout for current conditions */
1246                 ptlrpc_at_set_req_timeout(req);
1247                 /*
1248                  * delay resend to give a chance to the server to get ready.
1249                  * The delay is increased by 1s on every resend and is capped to
1250                  * the current request timeout (i.e. obd_timeout if AT is off,
1251                  * or AT service time x 125% + 5s, see at_est2timeout)
1252                  */
1253                 if (req->rq_nr_resend > req->rq_timeout)
1254                         req->rq_sent = now + req->rq_timeout;
1255                 else
1256                         req->rq_sent = now + req->rq_nr_resend;
1257
1258                 return 0;
1259         }
1260
1261         ktime_get_real_ts64(&work_start);
1262         timediff = (work_start.tv_sec - req->rq_sent_tv.tv_sec) * USEC_PER_SEC +
1263                    (work_start.tv_nsec - req->rq_sent_tv.tv_nsec) /
1264                                                                  NSEC_PER_USEC;
1265         if (obd->obd_svc_stats) {
1266                 lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1267                                     timediff);
1268                 ptlrpc_lprocfs_rpc_sent(req, timediff);
1269         }
1270
1271         if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1272             lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1273                 DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1274                           lustre_msg_get_type(req->rq_repmsg));
1275                 return -EPROTO;
1276         }
1277
1278         if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1279                 CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
1280         ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1281         ptlrpc_at_adj_net_latency(req,
1282                                   lustre_msg_get_service_time(req->rq_repmsg));
1283
1284         rc = ptlrpc_check_status(req);
1285         imp->imp_connect_error = rc;
1286
1287         if (rc) {
1288                 /*
1289                  * Either we've been evicted, or the server has failed for
1290                  * some reason. Try to reconnect, and if that fails, punt to
1291                  * the upcall.
1292                  */
1293                 if (ptlrpc_recoverable_error(rc)) {
1294                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
1295                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1296                                 return rc;
1297                         }
1298                         ptlrpc_request_handle_notconn(req);
1299                         return rc;
1300                 }
1301         } else {
1302                 /*
1303                  * See if the server sent an SLV (server lock volume) value.
1304                  * Do it only for RPCs with rc == 0.
1305                  */
1306                 ldlm_cli_update_pool(req);
1307         }
1308
1309         /* Store transno in reqmsg for replay. */
1310         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1311                 req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1312                 lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1313         }
1314
1315         if (imp->imp_replayable) {
1316                 spin_lock(&imp->imp_lock);
1317                 /*
1318                  * No point in adding already-committed requests to the replay
1319                  * list, we will just remove them immediately. b=9829
1320                  */
1321                 if (req->rq_transno != 0 &&
1322                     (req->rq_transno >
1323                      lustre_msg_get_last_committed(req->rq_repmsg) ||
1324                      req->rq_replay)) {
1325                         /* version recovery */
1326                         ptlrpc_save_versions(req);
1327                         ptlrpc_retain_replayable_request(req, imp);
1328                 } else if (req->rq_commit_cb &&
1329                            list_empty(&req->rq_replay_list)) {
1330                         /*
1331                          * NB: don't call rq_commit_cb if it's already on
1332                          * rq_replay_list, ptlrpc_free_committed() will call
1333                          * it later, see LU-3618 for details
1334                          */
1335                         spin_unlock(&imp->imp_lock);
1336                         req->rq_commit_cb(req);
1337                         spin_lock(&imp->imp_lock);
1338                 }
1339
1340                 /* Replay-enabled imports return commit-status information. */
1341                 if (lustre_msg_get_last_committed(req->rq_repmsg)) {
1342                         imp->imp_peer_committed_transno =
1343                                 lustre_msg_get_last_committed(req->rq_repmsg);
1344                 }
1345
1346                 ptlrpc_free_committed(imp);
1347
1348                 if (!list_empty(&imp->imp_replay_list)) {
1349                         struct ptlrpc_request *last;
1350
1351                         last = list_entry(imp->imp_replay_list.prev,
1352                                           struct ptlrpc_request,
1353                                           rq_replay_list);
1354                         /*
1355                          * Requests with rq_replay stay on the list even if no
1356                          * commit is expected.
1357                          */
1358                         if (last->rq_transno > imp->imp_peer_committed_transno)
1359                                 ptlrpc_pinger_commit_expected(imp);
1360                 }
1361
1362                 spin_unlock(&imp->imp_lock);
1363         }
1364
1365         return rc;
1366 }
1367
1368 /**
1369  * Helper function to send request \a req over the network for the first time.
1370  * Also adjusts the request phase.
1371  * Returns 0 on success or error code.
1372  */
1373 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1374 {
1375         struct obd_import *imp = req->rq_import;
1376         int rc;
1377
1378         LASSERT(req->rq_phase == RQ_PHASE_NEW);
1379         if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
1380             (!req->rq_generation_set ||
1381              req->rq_import_generation == imp->imp_generation))
1382                 return 0;
1383
1384         ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1385
1386         spin_lock(&imp->imp_lock);
1387
1388         if (!req->rq_generation_set)
1389                 req->rq_import_generation = imp->imp_generation;
1390
1391         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1392                 spin_lock(&req->rq_lock);
1393                 req->rq_waiting = 1;
1394                 spin_unlock(&req->rq_lock);
1395
1396                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: (%s != %s)",
1397                           lustre_msg_get_status(req->rq_reqmsg),
1398                           ptlrpc_import_state_name(req->rq_send_state),
1399                           ptlrpc_import_state_name(imp->imp_state));
1400                 LASSERT(list_empty(&req->rq_list));
1401                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1402                 atomic_inc(&req->rq_import->imp_inflight);
1403                 spin_unlock(&imp->imp_lock);
1404                 return 0;
1405         }
1406
1407         if (rc != 0) {
1408                 spin_unlock(&imp->imp_lock);
1409                 req->rq_status = rc;
1410                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1411                 return rc;
1412         }
1413
1414         LASSERT(list_empty(&req->rq_list));
1415         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1416         atomic_inc(&req->rq_import->imp_inflight);
1417         spin_unlock(&imp->imp_lock);
1418
1419         lustre_msg_set_status(req->rq_reqmsg, current_pid());
1420
1421         rc = sptlrpc_req_refresh_ctx(req, -1);
1422         if (rc) {
1423                 if (req->rq_err) {
1424                         req->rq_status = rc;
1425                         return 1;
1426                 }
1427                 spin_lock(&req->rq_lock);
1428                 req->rq_wait_ctx = 1;
1429                 spin_unlock(&req->rq_lock);
1430                 return 0;
1431         }
1432
1433         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1434                current_comm(),
1435                imp->imp_obd->obd_uuid.uuid,
1436                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1437                libcfs_nid2str(imp->imp_connection->c_peer.nid),
1438                lustre_msg_get_opc(req->rq_reqmsg));
1439
1440         rc = ptl_send_rpc(req, 0);
1441         if (rc) {
1442                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1443                 spin_lock(&req->rq_lock);
1444                 req->rq_net_err = 1;
1445                 spin_unlock(&req->rq_lock);
1446                 return rc;
1447         }
1448         return 0;
1449 }
1450
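/**
 * Keep producing new requests for \a set via the set's producer callback
 * until set_max_inflight RPCs are in flight or the producer returns
 * -ENOENT (nothing left to produce).
 * Returns the number of requests added to the set.
 */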
1451 static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
1452 {
1453         int remaining, rc;
1454
1455         LASSERT(set->set_producer);
1456
1457         remaining = atomic_read(&set->set_remaining);
1458
1459         /*
1460          * populate the ->set_requests list with requests until we
1461          * reach the maximum number of RPCs in flight for this set
1462          */
1463         while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
1464                 rc = set->set_producer(set, set->set_producer_arg);
1465                 if (rc == -ENOENT) {
1466                         /* no more RPC to produce */
1467                         set->set_producer     = NULL;
1468                         set->set_producer_arg = NULL;
1469                         return 0;
1470                 }
1471         }
1472
1473         return (atomic_read(&set->set_remaining) - remaining);
1474 }
1475
1476 /**
1477  * This sends any unsent RPCs in \a set and returns 1 if all are sent
1478  * and no more replies are expected.
1479  * (It is possible to get fewer replies than requests sent, e.g. due to
1480  * timed out requests or requests that we had trouble sending out.)
1481  *
1482  * NOTE: This function contains a potential schedule point (cond_resched()).
1483  */
1484 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
1485 {
1486         struct list_head *tmp, *next;
1487         struct list_head comp_reqs;
1488         int force_timer_recalc = 0;
1489
1490         if (atomic_read(&set->set_remaining) == 0)
1491                 return 1;
1492
1493         INIT_LIST_HEAD(&comp_reqs);
1494         list_for_each_safe(tmp, next, &set->set_requests) {
1495                 struct ptlrpc_request *req =
1496                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1497                 struct obd_import *imp = req->rq_import;
1498                 int unregistered = 0;
1499                 int rc = 0;
1500
1501                 /*
1502                  * This schedule point is mainly for the ptlrpcd caller of this
1503                  * function.  Most ptlrpc sets are neither long-lived nor
1504                  * unbounded in length, but at least the set used by ptlrpcd is.
1505                  * Since the processing time is unbounded, we need to insert an
1506                  * explicit schedule point to make the thread well-behaved.
1507                  */
1508                 cond_resched();
1509
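                /* Kick off any brand-new requests; a non-zero return from
                 * ptlrpc_send_new_req() means the set timeout needs to be
                 * recalculated.
                 */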
1510                 if (req->rq_phase == RQ_PHASE_NEW &&
1511                     ptlrpc_send_new_req(req)) {
1512                         force_timer_recalc = 1;
1513                 }
1514
1515                 /* delayed send - skip */
1516                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1517                         continue;
1518
1519                 /* delayed resend - skip */
1520                 if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
1521                     req->rq_sent > ktime_get_real_seconds())
1522                         continue;
1523
1524                 if (!(req->rq_phase == RQ_PHASE_RPC ||
1525                       req->rq_phase == RQ_PHASE_BULK ||
1526                       req->rq_phase == RQ_PHASE_INTERPRET ||
1527                       req->rq_phase == RQ_PHASE_UNREG_RPC ||
1528                       req->rq_phase == RQ_PHASE_UNREG_BULK ||
1529                       req->rq_phase == RQ_PHASE_COMPLETE)) {
1530                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1531                         LBUG();
1532                 }
1533
1534                 if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
1535                     req->rq_phase == RQ_PHASE_UNREG_BULK) {
1536                         LASSERT(req->rq_next_phase != req->rq_phase);
1537                         LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1538
1539                         if (req->rq_req_deadline &&
1540                             !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
1541                                 req->rq_req_deadline = 0;
1542                         if (req->rq_reply_deadline &&
1543                             !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
1544                                 req->rq_reply_deadline = 0;
1545                         if (req->rq_bulk_deadline &&
1546                             !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
1547                                 req->rq_bulk_deadline = 0;
1548
1549                         /*
1550                          * Skip processing until the reply is unlinked. We
1551                          * can't return the request to the pool or call the
1552                          * interpreter before that. We need to make sure
1553                          * that all RDMA transfers have finished and will
1554                          * not corrupt any data.
1555                          */
1556                         if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
1557                             ptlrpc_client_recv_or_unlink(req))
1558                                 continue;
1559                         if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
1560                             ptlrpc_client_bulk_active(req))
1561                                 continue;
1562
1563                         /*
1564                          * Turn fail_loc off to prevent it from looping
1565                          * forever.
1566                          */
1567                         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1568                                 OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
1569                                                      OBD_FAIL_ONCE);
1570                         }
1571                         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1572                                 OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
1573                                                      OBD_FAIL_ONCE);
1574                         }
1575
1576                         /* Move to next phase if reply was successfully
1577                          * unlinked.
1578                          */
1579                         ptlrpc_rqphase_move(req, req->rq_next_phase);
1580                 }
1581
1582                 if (req->rq_phase == RQ_PHASE_COMPLETE) {
1583                         list_move_tail(&req->rq_set_chain, &comp_reqs);
1584                         continue;
1585                 }
1586
1587                 if (req->rq_phase == RQ_PHASE_INTERPRET)
1588                         goto interpret;
1589
1590                 /* Note that this will also start async reply unlink. */
1591                 if (req->rq_net_err && !req->rq_timedout) {
1592                         ptlrpc_expire_one_request(req, 1);
1593
1594                         /* Check if we still need to wait for unlink. */
1595                         if (ptlrpc_client_recv_or_unlink(req) ||
1596                             ptlrpc_client_bulk_active(req))
1597                                 continue;
1598                         /* If there is no need to resend, fail it now. */
1599                         if (req->rq_no_resend) {
1600                                 if (req->rq_status == 0)
1601                                         req->rq_status = -EIO;
1602                                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1603                                 goto interpret;
1604                         } else {
1605                                 continue;
1606                         }
1607                 }
1608
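                /* A fatal error has been flagged on this request (rq_err);
                 * fail it now so the interpreter sees the final status.
                 */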
1609                 if (req->rq_err) {
1610                         spin_lock(&req->rq_lock);
1611                         req->rq_replied = 0;
1612                         spin_unlock(&req->rq_lock);
1613                         if (req->rq_status == 0)
1614                                 req->rq_status = -EIO;
1615                         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1616                         goto interpret;
1617                 }
1618
1619                 /*
1620                  * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr,
1621                  * so it sets rq_intr regardless of individual RPC
1622                  * timeouts; the synchronous IO waiting path likewise
1623                  * sets rq_intr irrespective of whether ptlrpcd has
1624                  * seen a timeout.  Our policy is to only interpret
1625                  * interrupted RPCs after they have timed out, so we
1626                  * need to enforce that here.
1627                  */
1628
1629                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1630                                      req->rq_wait_ctx)) {
1631                         req->rq_status = -EINTR;
1632                         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1633                         goto interpret;
1634                 }
1635
1636                 if (req->rq_phase == RQ_PHASE_RPC) {
1637                         if (req->rq_timedout || req->rq_resend ||
1638                             req->rq_waiting || req->rq_wait_ctx) {
1639                                 int status;
1640
1641                                 if (!ptlrpc_unregister_reply(req, 1)) {
1642                                         ptlrpc_unregister_bulk(req, 1);
1643                                         continue;
1644                                 }
1645
1646                                 spin_lock(&imp->imp_lock);
1647                                 if (ptlrpc_import_delay_req(imp, req,
1648                                                             &status)) {
1649                                         /*
1650                                          * Put on the delayed list - only while we
1651                                          * wait for recovery to finish - before sending.
1652                                          */
1653                                         list_del_init(&req->rq_list);
1654                                         list_add_tail(&req->rq_list,
1655                                                       &imp->imp_delayed_list);
1656                                         spin_unlock(&imp->imp_lock);
1657                                         continue;
1658                                 }
1659
1660                                 if (status != 0) {
1661                                         req->rq_status = status;
1662                                         ptlrpc_rqphase_move(req,
1663                                                             RQ_PHASE_INTERPRET);
1664                                         spin_unlock(&imp->imp_lock);
1665                                         goto interpret;
1666                                 }
1667                                 if (ptlrpc_no_resend(req) &&
1668                                     !req->rq_wait_ctx) {
1669                                         req->rq_status = -ENOTCONN;
1670                                         ptlrpc_rqphase_move(req,
1671                                                             RQ_PHASE_INTERPRET);
1672                                         spin_unlock(&imp->imp_lock);
1673                                         goto interpret;
1674                                 }
1675
1676                                 list_del_init(&req->rq_list);
1677                                 list_add_tail(&req->rq_list,
1678                                               &imp->imp_sending_list);
1679
1680                                 spin_unlock(&imp->imp_lock);
1681
1682                                 spin_lock(&req->rq_lock);
1683                                 req->rq_waiting = 0;
1684                                 spin_unlock(&req->rq_lock);
1685
1686                                 if (req->rq_timedout || req->rq_resend) {
1687                                         /* This is re-sending anyway, let's mark req as resend. */
1688                                         spin_lock(&req->rq_lock);
1689                                         req->rq_resend = 1;
1690                                         spin_unlock(&req->rq_lock);
1691                                         if (req->rq_bulk) {
1692                                                 __u64 old_xid;
1693
1694                                                 if (!ptlrpc_unregister_bulk(req, 1))
1695                                                         continue;
1696
1697                                                 /* ensure previous bulk fails */
1698                                                 old_xid = req->rq_xid;
1699                                                 req->rq_xid = ptlrpc_next_xid();
1700                                                 CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
1701                                                        old_xid, req->rq_xid);
1702                                         }
1703                                 }
1704                                 /*
1705                                  * rq_wait_ctx is only touched by ptlrpcd,
1706                                  * so no lock is needed here.
1707                                  */
1708                                 status = sptlrpc_req_refresh_ctx(req, -1);
1709                                 if (status) {
1710                                         if (req->rq_err) {
1711                                                 req->rq_status = status;
1712                                                 spin_lock(&req->rq_lock);
1713                                                 req->rq_wait_ctx = 0;
1714                                                 spin_unlock(&req->rq_lock);
1715                                                 force_timer_recalc = 1;
1716                                         } else {
1717                                                 spin_lock(&req->rq_lock);
1718                                                 req->rq_wait_ctx = 1;
1719                                                 spin_unlock(&req->rq_lock);
1720                                         }
1721
1722                                         continue;
1723                                 } else {
1724                                         spin_lock(&req->rq_lock);
1725                                         req->rq_wait_ctx = 0;
1726                                         spin_unlock(&req->rq_lock);
1727                                 }
1728
1729                                 rc = ptl_send_rpc(req, 0);
1730                                 if (rc) {
1731                                         DEBUG_REQ(D_HA, req,
1732                                                   "send failed: rc = %d", rc);
1733                                         force_timer_recalc = 1;
1734                                         spin_lock(&req->rq_lock);
1735                                         req->rq_net_err = 1;
1736                                         spin_unlock(&req->rq_lock);
1737                                         continue;
1738                                 }
1739                                 /* need to reset the timeout */
1740                                 force_timer_recalc = 1;
1741                         }
1742
1743                         spin_lock(&req->rq_lock);
1744
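                        /* An early reply only updates our timeout
                         * expectations; keep waiting for the real reply.
                         */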
1745                         if (ptlrpc_client_early(req)) {
1746                                 ptlrpc_at_recv_early_reply(req);
1747                                 spin_unlock(&req->rq_lock);
1748                                 continue;
1749                         }
1750
1751                         /* Still waiting for a reply? */
1752                         if (ptlrpc_client_recv(req)) {
1753                                 spin_unlock(&req->rq_lock);
1754                                 continue;
1755                         }
1756
1757                         /* Did we actually receive a reply? */
1758                         if (!ptlrpc_client_replied(req)) {
1759                                 spin_unlock(&req->rq_lock);
1760                                 continue;
1761                         }
1762
1763                         spin_unlock(&req->rq_lock);
1764
1765                         /*
1766                          * unlink from net because we are going to
1767                          * swab in-place of reply buffer
1768                          */
1769                         unregistered = ptlrpc_unregister_reply(req, 1);
1770                         if (!unregistered)
1771                                 continue;
1772
1773                         req->rq_status = after_reply(req);
1774                         if (req->rq_resend)
1775                                 continue;
1776
1777                         /*
1778                          * If there is no bulk associated with this request,
1779                          * then we're done and should let the interpreter
1780                          * process the reply. Similarly if the RPC returned
1781                          * an error, and therefore the bulk will never arrive.
1782                          */
1783                         if (!req->rq_bulk || req->rq_status < 0) {
1784                                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1785                                 goto interpret;
1786                         }
1787
1788                         ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
1789                 }
1790
1791                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
1792                 if (ptlrpc_client_bulk_active(req))
1793                         continue;
1794
1795                 if (req->rq_bulk->bd_failure) {
1796                         /*
1797                          * The RPC reply arrived OK, but the bulk transfer
1798                          * failed!  This is odd, since the server told us the
1799                          * RPC was good after getting the REPLY for its GET
1800                          * or the ACK for its PUT.
1801                          */
1802                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1803                         req->rq_status = -EIO;
1804                 }
1805
1806                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1807
1808 interpret:
1809                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1810
1811                 /*
1812                  * This moves to the "unregistering" phase; we need to wait
1813                  * for the reply to be unlinked.
1814                  */
1815                 if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
1816                         /* start async bulk unlink too */
1817                         ptlrpc_unregister_bulk(req, 1);
1818                         continue;
1819                 }
1820
1821                 if (!ptlrpc_unregister_bulk(req, 1))
1822                         continue;
1823
1824                 /* When calling the interpreter, the receive should already be finished. */
1825                 LASSERT(!req->rq_receiving_reply);
1826
1827                 ptlrpc_req_interpret(env, req, req->rq_status);
1828
1829                 if (ptlrpcd_check_work(req)) {
1830                         atomic_dec(&set->set_remaining);
1831                         continue;
1832                 }
1833                 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
1834
1835                 CDEBUG(req->rq_reqmsg ? D_RPCTRACE : 0,
1836                        "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1837                        current_comm(), imp->imp_obd->obd_uuid.uuid,
1838                        lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1839                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1840                        lustre_msg_get_opc(req->rq_reqmsg));
1841
1842                 spin_lock(&imp->imp_lock);
1843                 /*
1844                  * The request may already be off the sending or delayed list.
1845                  * This can happen when it was marked erroneous because
1846                  * ptlrpc_import_delay_req(req, status) found it impossible to
1847                  * allow sending this rpc and returned *status != 0.
1848                  */
1849                 if (!list_empty(&req->rq_list)) {
1850                         list_del_init(&req->rq_list);
1851                         atomic_dec(&imp->imp_inflight);
1852                 }
1853                 spin_unlock(&imp->imp_lock);
1854
1855                 atomic_dec(&set->set_remaining);
1856                 wake_up_all(&imp->imp_recovery_waitq);
1857
1858                 if (set->set_producer) {
1859                         /* produce a new request if possible */
1860                         if (ptlrpc_set_producer(set) > 0)
1861                                 force_timer_recalc = 1;
1862
1863                         /*
1864                          * free the request that has just been completed
1865                          * in order not to pollute set->set_requests
1866                          */
1867                         list_del_init(&req->rq_set_chain);
1868                         spin_lock(&req->rq_lock);
1869                         req->rq_set = NULL;
1870                         req->rq_invalid_rqset = 0;
1871                         spin_unlock(&req->rq_lock);
1872
1873                         /* record rq_status to compute the final status later */
1874                         if (req->rq_status != 0)
1875                                 set->set_rc = req->rq_status;
1876                         ptlrpc_req_finished(req);
1877                 } else {
1878                         list_move_tail(&req->rq_set_chain, &comp_reqs);
1879                 }
1880         }
1881
1882         /*
1883          * Move completed requests to the head of the list so it's easier
1884          * for the caller to find them.
1885          */
1886         list_splice(&comp_reqs, &set->set_requests);
1887
1888         /* If we hit an error, we want to recover promptly. */
1889         return atomic_read(&set->set_remaining) == 0 || force_timer_recalc;
1890 }
1891 EXPORT_SYMBOL(ptlrpc_check_set);
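
/*
 * Illustrative sketch only (not additional API): callers normally do not
 * invoke ptlrpc_check_set() directly.  ptlrpc_set_wait() below drives it
 * from l_wait_event() until it reports completion, roughly:
 *
 *	do {
 *		timeout = ptlrpc_set_next_timeout(set);
 *		... set up lwi from timeout ...
 *		rc = l_wait_event(set->set_waitq,
 *				  ptlrpc_check_set(NULL, set), &lwi);
 *	} while (rc != 0 || atomic_read(&set->set_remaining) != 0);
 *
 * The ptlrpcd threads drive asynchronous sets in a similar loop.
 */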
1892
1893 /**
1894  * Time out request \a req. If \a async_unlink is set, do not wait
1895  * until LNet actually confirms network buffer unlinking.
1896  * Return 1 if we should give up further retry attempts, 0 otherwise.
1897  */
1898 int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
1899 {
1900         struct obd_import *imp = req->rq_import;
1901         int rc = 0;
1902
1903         spin_lock(&req->rq_lock);
1904         req->rq_timedout = 1;
1905         spin_unlock(&req->rq_lock);
1906
1907         DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]",
1908                   req->rq_net_err ? "failed due to network error" :
1909                      ((req->rq_real_sent == 0 ||
1910                        req->rq_real_sent < req->rq_sent ||
1911                        req->rq_real_sent >= req->rq_deadline) ?
1912                       "timed out for sent delay" : "timed out for slow reply"),
1913                   (s64)req->rq_sent, (s64)req->rq_real_sent);
1914
1915         if (imp && obd_debug_peer_on_timeout)
1916                 LNetDebugPeer(imp->imp_connection->c_peer);
1917
1918         ptlrpc_unregister_reply(req, async_unlink);
1919         ptlrpc_unregister_bulk(req, async_unlink);
1920
1921         if (obd_dump_on_timeout)
1922                 libcfs_debug_dumplog();
1923
1924         if (!imp) {
1925                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1926                 return 1;
1927         }
1928
1929         atomic_inc(&imp->imp_timeouts);
1930
1931         /* The DLM server doesn't want recovery run on its imports. */
1932         if (imp->imp_dlm_fake)
1933                 return 1;
1934
1935         /*
1936          * If this request is for recovery or other primordial tasks,
1937          * then error it out here.
1938          */
1939         if (req->rq_ctx_init || req->rq_ctx_fini ||
1940             req->rq_send_state != LUSTRE_IMP_FULL ||
1941             imp->imp_obd->obd_no_recov) {
1942                 DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1943                           ptlrpc_import_state_name(req->rq_send_state),
1944                           ptlrpc_import_state_name(imp->imp_state));
1945                 spin_lock(&req->rq_lock);
1946                 req->rq_status = -ETIMEDOUT;
1947                 req->rq_err = 1;
1948                 spin_unlock(&req->rq_lock);
1949                 return 1;
1950         }
1951
1952         /*
1953          * if a request can't be resent we can't wait for an answer after
1954          * the timeout
1955          */
1956         if (ptlrpc_no_resend(req)) {
1957                 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1958                 rc = 1;
1959         }
1960
1961         ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1962
1963         return rc;
1964 }
1965
1966 /**
1967  * Time out all uncompleted requests in the request set pointed to by \a data.
1968  * Callback used when waiting on sets with l_wait_event.
1969  * Always returns 1.
1970  */
1971 int ptlrpc_expired_set(void *data)
1972 {
1973         struct ptlrpc_request_set *set = data;
1974         struct list_head *tmp;
1975         time64_t now = ktime_get_real_seconds();
1976
1977         /* A timeout expired. See which reqs it applies to...  */
1978         list_for_each(tmp, &set->set_requests) {
1979                 struct ptlrpc_request *req =
1980                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1981
1982                 /* don't expire request waiting for context */
1983                 if (req->rq_wait_ctx)
1984                         continue;
1985
1986                 /* Request in-flight? */
1987                 if (!((req->rq_phase == RQ_PHASE_RPC &&
1988                        !req->rq_waiting && !req->rq_resend) ||
1989                       (req->rq_phase == RQ_PHASE_BULK)))
1990                         continue;
1991
1992                 if (req->rq_timedout ||     /* already dealt with */
1993                     req->rq_deadline > now) /* not expired */
1994                         continue;
1995
1996                 /*
1997                  * Deal with this guy. Do it asynchronously to not block
1998                  * ptlrpcd thread.
1999                  */
2000                 ptlrpc_expire_one_request(req, 1);
2001         }
2002
2003         /*
2004          * When waiting for a whole set, we always break out of the
2005          * sleep so we can recalculate the timeout, or enable interrupts
2006          * if everyone's timed out.
2007          */
2008         return 1;
2009 }
2010
2011 /**
2012  * Sets rq_intr flag in \a req under spinlock.
2013  */
2014 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
2015 {
2016         spin_lock(&req->rq_lock);
2017         req->rq_intr = 1;
2018         spin_unlock(&req->rq_lock);
2019 }
2020 EXPORT_SYMBOL(ptlrpc_mark_interrupted);
2021
2022 /**
2023  * Interrupts (sets interrupted flag) all uncompleted requests in
2024  * a set \a data. Callback for l_wait_event for interruptible waits.
2025  */
2026 static void ptlrpc_interrupted_set(void *data)
2027 {
2028         struct ptlrpc_request_set *set = data;
2029         struct list_head *tmp;
2030
2031         CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
2032
2033         list_for_each(tmp, &set->set_requests) {
2034                 struct ptlrpc_request *req =
2035                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2036
2037                 if (req->rq_phase != RQ_PHASE_RPC &&
2038                     req->rq_phase != RQ_PHASE_UNREG_RPC)
2039                         continue;
2040
2041                 ptlrpc_mark_interrupted(req);
2042         }
2043 }
2044
2045 /**
2046  * Get the smallest timeout in the set; this does NOT set a timeout.
2047  */
2048 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
2049 {
2050         struct list_head *tmp;
2051         time64_t now = ktime_get_real_seconds();
2052         int timeout = 0;
2053         struct ptlrpc_request *req;
2054         time64_t deadline;
2055
2056         list_for_each(tmp, &set->set_requests) {
2057                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2058
2059                 /* Request in-flight? */
2060                 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
2061                       (req->rq_phase == RQ_PHASE_BULK) ||
2062                       (req->rq_phase == RQ_PHASE_NEW)))
2063                         continue;
2064
2065                 /* Already timed out. */
2066                 if (req->rq_timedout)
2067                         continue;
2068
2069                 /* Waiting for ctx. */
2070                 if (req->rq_wait_ctx)
2071                         continue;
2072
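                /*
                 * For not-yet-sent (NEW) and delayed-resend requests rq_sent
                 * holds the earliest time we may (re)send, so that is the
                 * deadline; otherwise the deadline is the send time plus the
                 * request timeout.
                 */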
2073                 if (req->rq_phase == RQ_PHASE_NEW)
2074                         deadline = req->rq_sent;
2075                 else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
2076                         deadline = req->rq_sent;
2077                 else
2078                         deadline = req->rq_sent + req->rq_timeout;
2079
2080                 if (deadline <= now)    /* actually expired already */
2081                         timeout = 1;    /* ASAP */
2082                 else if (timeout == 0 || timeout > deadline - now)
2083                         timeout = deadline - now;
2084         }
2085         return timeout;
2086 }
2087
2088 /**
2089  * Send all unsent requests from the set and then wait until all
2090  * requests in the set complete (either get a reply, time out, get an
2091  * error or otherwise get interrupted).
2092  * Returns 0 on success or an error code otherwise.
2093  */
2094 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
2095 {
2096         struct list_head *tmp;
2097         struct ptlrpc_request *req;
2098         struct l_wait_info lwi;
2099         int rc, timeout;
2100
2101         if (set->set_producer)
2102                 (void)ptlrpc_set_producer(set);
2103         else
2104                 list_for_each(tmp, &set->set_requests) {
2105                         req = list_entry(tmp, struct ptlrpc_request,
2106                                          rq_set_chain);
2107                         if (req->rq_phase == RQ_PHASE_NEW)
2108                                 (void)ptlrpc_send_new_req(req);
2109                 }
2110
2111         if (list_empty(&set->set_requests))
2112                 return 0;
2113
2114         do {
2115                 timeout = ptlrpc_set_next_timeout(set);
2116
2117                 /*
2118                  * wait until all complete, interrupted, or an in-flight
2119                  * req times out
2120                  */
2121                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
2122                        set, timeout);
2123
2124                 if (timeout == 0 && !signal_pending(current))
2125                         /*
2126                          * No requests are in-flight (either timed out
2127                          * or delayed), so we can allow interrupts.
2128                          * We still want to block for a limited time,
2129                          * so we allow interrupts during the timeout.
2130                          */
2131                         lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
2132                                                    ptlrpc_expired_set,
2133                                                    ptlrpc_interrupted_set, set);
2134                 else
2135                         /*
2136                          * At least one request is in flight, so no
2137                          * interrupts are allowed. Wait until all
2138                          * complete, or an in-flight req times out.
2139                          */
2140                         lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
2141                                           ptlrpc_expired_set, set);
2142
2143                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
2144
2145                 /*
2146                  * LU-769 - if we ignored the signal because it was already
2147                  * pending when we started, we need to handle it now or we risk
2148                  * it being ignored forever
2149                  */
2150                 if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
2151                     signal_pending(current)) {
2152                         sigset_t blocked_sigs =
2153                                            cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
2154
2155                         /*
2156                          * In fact we only interrupt for the "fatal" signals
2157                          * like SIGINT or SIGKILL. We still ignore less
2158                          * important signals since ptlrpc set is not easily
2159                          * reentrant from userspace again
2160                          */
2161                         if (signal_pending(current))
2162                                 ptlrpc_interrupted_set(set);
2163                         cfs_restore_sigs(blocked_sigs);
2164                 }
2165
2166                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
2167
2168                 /*
2169                  * -EINTR => all requests have been flagged rq_intr so next
2170                  * check completes.
2171                  * -ETIMEDOUT => someone timed out.  When all reqs have
2172                  * timed out, signals are enabled allowing completion with
2173                  * EINTR.
2174                  * I don't really care if we go once more round the loop in
2175                  * the error cases -eeb.
2176                  */
2177                 if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
2178                         list_for_each(tmp, &set->set_requests) {
2179                                 req = list_entry(tmp, struct ptlrpc_request,
2180                                                  rq_set_chain);
2181                                 spin_lock(&req->rq_lock);
2182                                 req->rq_invalid_rqset = 1;
2183                                 spin_unlock(&req->rq_lock);
2184                         }
2185                 }
2186         } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
2187
2188         LASSERT(atomic_read(&set->set_remaining) == 0);
2189
2190         rc = set->set_rc; /* rq_status of already freed requests if any */
2191         list_for_each(tmp, &set->set_requests) {
2192                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2193
2194                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
2195                 if (req->rq_status != 0)
2196                         rc = req->rq_status;
2197         }
2198
2199         if (set->set_interpret) {
2200                 int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
2201                         set->set_interpret;
2202                 rc = interpreter(set, set->set_arg, rc);
2203         } else {
2204                 struct ptlrpc_set_cbdata *cbdata, *n;
2205                 int err;
2206
2207                 list_for_each_entry_safe(cbdata, n,
2208                                          &set->set_cblist, psc_item) {
2209                         list_del_init(&cbdata->psc_item);
2210                         err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
2211                         if (err && !rc)
2212                                 rc = err;
2213                         kfree(cbdata);
2214                 }
2215         }
2216
2217         return rc;
2218 }
2219 EXPORT_SYMBOL(ptlrpc_set_wait);
2220
2221 /**
2222  * Helper function for request freeing.
2223  * Called when the request reference count reaches zero and the request
2224  * needs to be freed.  Removes the request from any sending/replay lists
2225  * it might be on and frees network buffers if any are present.
2226  * If \a locked is set, the caller already holds the import's imp_lock,
2227  * so we do not need to re-take it (for certain list manipulations).
2228  */
2229 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
2230 {
2231         if (!request)
2232                 return;
2233         LASSERT(!request->rq_srv_req);
2234         LASSERT(!request->rq_export);
2235         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
2236         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
2237         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
2238         LASSERTF(!request->rq_replay, "req %p\n", request);
2239
2240         req_capsule_fini(&request->rq_pill);
2241
2242         /*
2243          * We must take it off the imp_replay_list first.  Otherwise, we'll set
2244          * request->rq_reqmsg to NULL while osc_close is dereferencing it.
2245          */
2246         if (request->rq_import) {
2247                 if (!locked)
2248                         spin_lock(&request->rq_import->imp_lock);
2249                 list_del_init(&request->rq_replay_list);
2250                 if (!locked)
2251                         spin_unlock(&request->rq_import->imp_lock);
2252         }
2253         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
2254
2255         if (atomic_read(&request->rq_refcount) != 0) {
2256                 DEBUG_REQ(D_ERROR, request,
2257                           "freeing request with nonzero refcount");
2258                 LBUG();
2259         }
2260
2261         if (request->rq_repbuf)
2262                 sptlrpc_cli_free_repbuf(request);
2263
2264         if (request->rq_import) {
2265                 class_import_put(request->rq_import);
2266                 request->rq_import = NULL;
2267         }
2268         if (request->rq_bulk)
2269                 ptlrpc_free_bulk_pin(request->rq_bulk);
2270
2271         if (request->rq_reqbuf || request->rq_clrbuf)
2272                 sptlrpc_cli_free_reqbuf(request);
2273
2274         if (request->rq_cli_ctx)
2275                 sptlrpc_req_put_ctx(request, !locked);
2276
2277         if (request->rq_pool)
2278                 __ptlrpc_free_req_to_pool(request);
2279         else
2280                 ptlrpc_request_cache_free(request);
2281 }
2282
2283 /**
2284  * Helper function.
2285  * Drops one reference count for request \a request.
2286  * \a locked set indicates that the caller holds the import's imp_lock.
2287  * Frees the request when reference count reaches zero.
2288  */
2289 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
2290 {
2291         if (!request)
2292                 return 1;
2293
2294         if (request == LP_POISON ||
2295             request->rq_reqmsg == LP_POISON) {
2296                 CERROR("dereferencing freed request (bug 575)\n");
2297                 LBUG();
2298                 return 1;
2299         }
2300
2301         DEBUG_REQ(D_INFO, request, "refcount now %u",
2302                   atomic_read(&request->rq_refcount) - 1);
2303
2304         if (atomic_dec_and_test(&request->rq_refcount)) {
2305                 __ptlrpc_free_req(request, locked);
2306                 return 1;
2307         }
2308
2309         return 0;
2310 }
2311
2312 /**
2313  * Drops one reference count for a request.
2314  */
2315 void ptlrpc_req_finished(struct ptlrpc_request *request)
2316 {
2317         __ptlrpc_req_finished(request, 0);
2318 }
2319 EXPORT_SYMBOL(ptlrpc_req_finished);
2320
2321 /**
2322  * Returns the xid of \a request.
2323  */
2324 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
2325 {
2326         return request->rq_xid;
2327 }
2328 EXPORT_SYMBOL(ptlrpc_req_xid);
2329
2330 /**
2331  * Disengage the client's reply buffer from the network.
2332  * NB does _NOT_ unregister any client-side bulk.
2333  * IDEMPOTENT, but _not_ safe against concurrent callers.
2334  * The request owner (i.e. the thread doing the I/O) must call...
2335  * Returns 1 once the reply is unregistered, or 0 if unlinking is still in progress (async).
2336  */
2337 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
2338 {
2339         int rc;
2340         wait_queue_head_t *wq;
2341         struct l_wait_info lwi;
2342
2343         /* Might sleep. */
2344         LASSERT(!in_interrupt());
2345
2346         /* Let's setup deadline for reply unlink. */
2347         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2348             async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
2349                 request->rq_reply_deadline =
2350                         ktime_get_real_seconds() + LONG_UNLINK;
2351
2352         /* Nothing left to do. */
2353         if (!ptlrpc_client_recv_or_unlink(request))
2354                 return 1;
2355
2356         LNetMDUnlink(request->rq_reply_md_h);
2357
2358         /* Let's check it once again. */
2359         if (!ptlrpc_client_recv_or_unlink(request))
2360                 return 1;
2361
2362         /* Move to "Unregistering" phase as reply was not unlinked yet. */
2363         ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
2364
2365         /* Do not wait for unlink to finish. */
2366         if (async)
2367                 return 0;
2368
2369         /*
2370          * We have to l_wait_event() whatever the result, to give liblustre
2371          * a chance to run reply_in_callback(), and to make sure we've
2372          * unlinked before returning a req to the pool.
2373          */
2374         if (request->rq_set)
2375                 wq = &request->rq_set->set_waitq;
2376         else
2377                 wq = &request->rq_reply_waitq;
2378
2379         for (;;) {
2380                 /*
2381                  * Network access will complete in finite time but the HUGE
2382                  * timeout lets us CWARN for visibility of sluggish NALs
2383                  */
2384                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2385                                            cfs_time_seconds(1), NULL, NULL);
2386                 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
2387                                   &lwi);
2388                 if (rc == 0) {
2389                         ptlrpc_rqphase_move(request, request->rq_next_phase);
2390                         return 1;
2391                 }
2392
2393                 LASSERT(rc == -ETIMEDOUT);
2394                 DEBUG_REQ(D_WARNING, request,
2395                           "Unexpectedly long timeout receiving_reply=%d req_unlinked=%d reply_unlinked=%d",
2396                           request->rq_receiving_reply,
2397                           request->rq_req_unlinked,
2398                           request->rq_reply_unlinked);
2399         }
2400         return 0;
2401 }
2402
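/**
 * Drop a request from the import's replay machinery: clear rq_replay,
 * invoke the commit callback if one is registered, unlink the request
 * from the replay list and release the replay reference.
 * Caller must hold the import's imp_lock.
 */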
2403 static void ptlrpc_free_request(struct ptlrpc_request *req)
2404 {
2405         spin_lock(&req->rq_lock);
2406         req->rq_replay = 0;
2407         spin_unlock(&req->rq_lock);
2408
2409         if (req->rq_commit_cb)
2410                 req->rq_commit_cb(req);
2411         list_del_init(&req->rq_replay_list);
2412
2413         __ptlrpc_req_finished(req, 1);
2414 }
2415
2416 /**
2417  * Drop request \a req from the replay list of its import once it has been committed (or unconditionally if \a force is set).
2418  */
2419 void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
2420 {
2421         struct obd_import       *imp = req->rq_import;
2422
2423         spin_lock(&imp->imp_lock);
2424         if (list_empty(&req->rq_replay_list)) {
2425                 spin_unlock(&imp->imp_lock);
2426                 return;
2427         }
2428
2429         if (force || req->rq_transno <= imp->imp_peer_committed_transno)
2430                 ptlrpc_free_request(req);
2431
2432         spin_unlock(&imp->imp_lock);
2433 }
2434 EXPORT_SYMBOL(ptlrpc_request_committed);
2435
2436 /**
2437  * Iterates through the import's replay_list and prunes
2438  * all requests that have a transno smaller than the import's
2439  * last_committed and don't have rq_replay set.
2440  * Since requests are sorted in transno order, it stops at the first
2441  * transno bigger than last_committed.
2442  * Caller must hold imp->imp_lock.
2443  */
2444 void ptlrpc_free_committed(struct obd_import *imp)
2445 {
2446         struct ptlrpc_request *req, *saved;
2447         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
2448         bool skip_committed_list = true;
2449
2450         assert_spin_locked(&imp->imp_lock);
2451
2452         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
2453             imp->imp_generation == imp->imp_last_generation_checked) {
2454                 CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
2455                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
2456                 return;
2457         }
2458         CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
2459                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
2460                imp->imp_generation);
2461
2462         if (imp->imp_generation != imp->imp_last_generation_checked ||
2463             !imp->imp_last_transno_checked)
2464                 skip_committed_list = false;
2465
2466         imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
2467         imp->imp_last_generation_checked = imp->imp_generation;
2468
2469         list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
2470                                  rq_replay_list) {
2471                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
2472                 LASSERT(req != last_req);
2473                 last_req = req;
2474
2475                 if (req->rq_transno == 0) {
2476                         DEBUG_REQ(D_EMERG, req, "zero transno during replay");
2477                         LBUG();
2478                 }
2479                 if (req->rq_import_generation < imp->imp_generation) {
2480                         DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
2481                         goto free_req;
2482                 }
2483
2484                 /* not yet committed */
2485                 if (req->rq_transno > imp->imp_peer_committed_transno) {
2486                         DEBUG_REQ(D_RPCTRACE, req, "stopping search");
2487                         break;
2488                 }
2489
2490                 if (req->rq_replay) {
2491                         DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
2492                         list_move_tail(&req->rq_replay_list,
2493                                        &imp->imp_committed_list);
2494                         continue;
2495                 }
2496
2497                 DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
2498                           imp->imp_peer_committed_transno);
2499 free_req:
2500                 ptlrpc_free_request(req);
2501         }
2502         if (skip_committed_list)
2503                 return;
2504
2505         list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
2506                                  rq_replay_list) {
2507                 LASSERT(req->rq_transno != 0);
2508                 if (req->rq_import_generation < imp->imp_generation) {
2509                         DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
2510                         ptlrpc_free_request(req);
2511                 } else if (!req->rq_replay) {
2512                         DEBUG_REQ(D_RPCTRACE, req, "free closed open request");
2513                         ptlrpc_free_request(req);
2514                 }
2515         }
2516 }
2517
2518 /**
2519  * Schedule previously sent request for resend.
2520  * For bulk requests we assign a new xid (to avoid problems with
2521  * lost replies causing several transfers to land in the same buffer
2522  * from different sending attempts).
2523  */
2524 void ptlrpc_resend_req(struct ptlrpc_request *req)
2525 {
2526         DEBUG_REQ(D_HA, req, "going to resend");
2527         spin_lock(&req->rq_lock);
2528
2529         /*
2530          * The request got a reply but is still linked to the import list.
2531          * Let ptlrpc_check_set() process it.
2532          */
2533         if (ptlrpc_client_replied(req)) {
2534                 spin_unlock(&req->rq_lock);
2535                 DEBUG_REQ(D_HA, req, "it has reply, so skip it");
2536                 return;
2537         }
2538
2539         lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
2540         req->rq_status = -EAGAIN;
2541
2542         req->rq_resend = 1;
2543         req->rq_net_err = 0;
2544         req->rq_timedout = 0;
2545         if (req->rq_bulk) {
2546                 __u64 old_xid = req->rq_xid;
2547
2548                 /* ensure previous bulk fails */
2549                 req->rq_xid = ptlrpc_next_xid();
2550                 CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
2551                        old_xid, req->rq_xid);
2552         }
2553         ptlrpc_client_wake_req(req);
2554         spin_unlock(&req->rq_lock);
2555 }
2556
2557 /**
2558  * Grab additional reference on a request \a req
2559  */
2560 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
2561 {
2562         atomic_inc(&req->rq_refcount);
2563         return req;
2564 }
2565 EXPORT_SYMBOL(ptlrpc_request_addref);
2566
2567 /**
2568  * Add a request to import replay_list.
2569  * Must be called under imp_lock
2570  */
2571 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2572                                       struct obd_import *imp)
2573 {
2574         struct list_head *tmp;
2575
2576         assert_spin_locked(&imp->imp_lock);
2577
2578         if (req->rq_transno == 0) {
2579                 DEBUG_REQ(D_EMERG, req, "saving request with zero transno");
2580                 LBUG();
2581         }
2582
2583         /*
2584          * clear this for new requests that were resent as well
2585          * as resent replayed requests.
2586          */
2587         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2588
2589         /* don't re-add requests that have been replayed */
2590         if (!list_empty(&req->rq_replay_list))
2591                 return;
2592
2593         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
2594
2595         LASSERT(imp->imp_replayable);
2596         /* Balanced in ptlrpc_free_committed, usually. */
2597         ptlrpc_request_addref(req);
2598         list_for_each_prev(tmp, &imp->imp_replay_list) {
2599                 struct ptlrpc_request *iter =
2600                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
2601
2602                 /*
2603                  * We may have duplicate transnos if we create and then
2604                  * open a file, or for closes retained to match creating
2605                  * opens, so use req->rq_xid as a secondary key.
2606                  * (See bugs 684, 685, and 428.)
2607                  * XXX no longer needed, but all opens need transnos!
2608                  */
2609                 if (iter->rq_transno > req->rq_transno)
2610                         continue;
2611
2612                 if (iter->rq_transno == req->rq_transno) {
2613                         LASSERT(iter->rq_xid != req->rq_xid);
2614                         if (iter->rq_xid > req->rq_xid)
2615                                 continue;
2616                 }
2617
2618                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
2619                 return;
2620         }
2621
2622         list_add(&req->rq_replay_list, &imp->imp_replay_list);
2623 }
2624
2625 /**
2626  * Send request and wait until it completes.
2627  * Returns request processing status.
2628  */
2629 int ptlrpc_queue_wait(struct ptlrpc_request *req)
2630 {
2631         struct ptlrpc_request_set *set;
2632         int rc;
2633
2634         LASSERT(!req->rq_set);
2635         LASSERT(!req->rq_receiving_reply);
2636
2637         set = ptlrpc_prep_set();
2638         if (!set) {
2639                 CERROR("cannot allocate ptlrpc set: rc = %d\n", -ENOMEM);
2640                 return -ENOMEM;
2641         }
2642
2643         /* for distributed debugging */
2644         lustre_msg_set_status(req->rq_reqmsg, current_pid());
2645
2646         /* add a ref for the set (see comment in ptlrpc_set_add_req) */
2647         ptlrpc_request_addref(req);
2648         ptlrpc_set_add_req(set, req);
2649         rc = ptlrpc_set_wait(set);
2650         ptlrpc_set_destroy(set);
2651
2652         return rc;
2653 }
2654 EXPORT_SYMBOL(ptlrpc_queue_wait);
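
/*
 * Illustrative sketch only: a caller that wants several RPCs in flight at
 * once drives a request set directly instead of calling ptlrpc_queue_wait()
 * per request, mirroring what ptlrpc_queue_wait() does above:
 *
 *	set = ptlrpc_prep_set();
 *	if (!set)
 *		return -ENOMEM;
 *	ptlrpc_request_addref(req1);	// ref for the set
 *	ptlrpc_set_add_req(set, req1);
 *	ptlrpc_request_addref(req2);
 *	ptlrpc_set_add_req(set, req2);
 *	rc = ptlrpc_set_wait(set);	// sends everything and waits
 *	ptlrpc_set_destroy(set);
 *
 * req1/req2 are assumed to have been allocated and packed elsewhere by the
 * caller; error handling is omitted for brevity.
 */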
2655
2656 /**
2657  * Callback used for reply processing of replayed requests.
2658  * In case of a successful reply, calls the registered request replay callback.
2659  * In case of an error, restarts the replay process.
2660  */
2661 static int ptlrpc_replay_interpret(const struct lu_env *env,
2662                                    struct ptlrpc_request *req,
2663                                    void *data, int rc)
2664 {
2665         struct ptlrpc_replay_async_args *aa = data;
2666         struct obd_import *imp = req->rq_import;
2667
2668         atomic_dec(&imp->imp_replay_inflight);
2669
2670         if (!ptlrpc_client_replied(req)) {
2671                 CERROR("request replay timed out, restarting recovery\n");
2672                 rc = -ETIMEDOUT;
2673                 goto out;
2674         }
2675
2676         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
2677             (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
2678              lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) {
2679                 rc = lustre_msg_get_status(req->rq_repmsg);
2680                 goto out;
2681         }
2682
2683         /** VBR: check version failure */
2684         if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
2685                 /** replay failed due to version mismatch */
2686                 DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
2687                 spin_lock(&imp->imp_lock);
2688                 imp->imp_vbr_failed = 1;
2689                 imp->imp_no_lock_replay = 1;
2690                 spin_unlock(&imp->imp_lock);
2691                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2692         } else {
2693                 /** The transno had better not change over replay. */
2694                 LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
2695                          lustre_msg_get_transno(req->rq_repmsg) ||
2696                          lustre_msg_get_transno(req->rq_repmsg) == 0,
2697                          "%#llx/%#llx\n",
2698                          lustre_msg_get_transno(req->rq_reqmsg),
2699                          lustre_msg_get_transno(req->rq_repmsg));
2700         }
2701
2702         spin_lock(&imp->imp_lock);
2703         /** if replaying by version, a gap occurred on the server, so don't trust locks */
2704         if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
2705                 imp->imp_no_lock_replay = 1;
2706         imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
2707         spin_unlock(&imp->imp_lock);
2708         LASSERT(imp->imp_last_replay_transno);
2709
2710         /* transaction number shouldn't be bigger than the latest replayed */
2711         if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
2712                 DEBUG_REQ(D_ERROR, req,
2713                           "Reported transno %llu is bigger than the replayed one: %llu",
2714                           req->rq_transno,
2715                           lustre_msg_get_transno(req->rq_reqmsg));
2716                 rc = -EINVAL;
2717                 goto out;
2718         }
2719
2720         DEBUG_REQ(D_HA, req, "got rep");
2721
2722         /* let the callback do fixups, possibly including changes to the request */
2723         if (req->rq_replay_cb)
2724                 req->rq_replay_cb(req);
2725
2726         if (ptlrpc_client_replied(req) &&
2727             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
2728                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
2729                           lustre_msg_get_status(req->rq_repmsg),
2730                           aa->praa_old_status);
2731         } else {
2732                 /* Put it back for re-replay. */
2733                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2734         }
2735
2736         /*
2737          * Errors during replay can set transno to 0, but
2738          * imp_last_replay_transno shouldn't be set to 0 anyway
2739          */
2740         if (req->rq_transno == 0)
2741                 CERROR("Transno is 0 during replay!\n");
2742
2743         /* continue with recovery */
2744         rc = ptlrpc_import_recovery_state_machine(imp);
2745  out:
2746         req->rq_send_state = aa->praa_old_state;
2747
2748         if (rc != 0)
2749                 /* this replay failed, so restart recovery */
2750                 ptlrpc_connect_import(imp);
2751
2752         return rc;
2753 }
2754
2755 /**
2756  * Prepares and queues request for replay.
2757  * Prepares and queues a request for replay.
2758  * Adds it to the ptlrpcd queue for actual sending.
2759  */
2760 int ptlrpc_replay_req(struct ptlrpc_request *req)
2761 {
2762         struct ptlrpc_replay_async_args *aa;
2763
2764         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
2765
2766         LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2767         aa = ptlrpc_req_async_args(req);
2768         memset(aa, 0, sizeof(*aa));
2769
2770         /* Prepare request to be resent with ptlrpcd */
2771         aa->praa_old_state = req->rq_send_state;
2772         req->rq_send_state = LUSTRE_IMP_REPLAY;
2773         req->rq_phase = RQ_PHASE_NEW;
2774         req->rq_next_phase = RQ_PHASE_UNDEFINED;
2775         if (req->rq_repmsg)
2776                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2777         req->rq_status = 0;
2778         req->rq_interpret_reply = ptlrpc_replay_interpret;
2779         /* Readjust the timeout for current conditions */
2780         ptlrpc_at_set_req_timeout(req);
2781
2782         /*
2783          * Tell the server the net_latency, so the server can calculate how long
2784          * it should wait for the next replay
2785          */
2786         lustre_msg_set_service_time(req->rq_reqmsg,
2787                                     ptlrpc_at_get_net_latency(req));
2788         DEBUG_REQ(D_HA, req, "REPLAY");
2789
2790         atomic_inc(&req->rq_import->imp_replay_inflight);
2791         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2792
2793         ptlrpcd_add_req(req);
2794         return 0;
2795 }
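/*
 * Illustrative sketch, not part of the original driver: the pattern used by
 * ptlrpc_replay_req() above - stash per-request state in rq_async_args,
 * point rq_interpret_reply at a callback and hand the request to ptlrpcd -
 * is the generic way to issue an RPC asynchronously.  struct example_args,
 * example_interpret() and example_send_async() are hypothetical names.
 */
struct example_args {
        int ea_saved_status;
};

static int example_interpret(const struct lu_env *env,
                             struct ptlrpc_request *req, void *data, int rc)
{
        struct example_args *ea = data;

        /* runs in ptlrpcd context once the reply (or an error) arrives */
        CDEBUG(D_INFO, "rc %d, saved status %d\n", rc, ea->ea_saved_status);
        return rc;
}

static void example_send_async(struct ptlrpc_request *req)
{
        struct example_args *ea;

        /* the args must fit into the request's embedded scratch space */
        CLASSERT(sizeof(*ea) <= sizeof(req->rq_async_args));
        ea = ptlrpc_req_async_args(req);
        ea->ea_saved_status = req->rq_status;

        req->rq_interpret_reply = example_interpret;
        ptlrpc_request_addref(req);     /* ptlrpcd needs a ref, as above */
        ptlrpcd_add_req(req);
}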
2796
2797 /**
2798  * Aborts all in-flight requests on the sending and delayed lists of import \a imp
2799  */
2800 void ptlrpc_abort_inflight(struct obd_import *imp)
2801 {
2802         struct list_head *tmp, *n;
2803
2804         /*
2805          * Make sure that no new requests get processed for this import.
2806          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2807          * this flag and then putting requests on sending_list or delayed_list.
2808          */
2809         spin_lock(&imp->imp_lock);
2810
2811         /*
2812          * XXX locking?  Maybe we should remove each request with the list
2813          * locked?  Also, how do we know if the requests on the list are
2814          * being freed at this time?
2815          */
2816         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2817                 struct ptlrpc_request *req =
2818                         list_entry(tmp, struct ptlrpc_request, rq_list);
2819
2820                 DEBUG_REQ(D_RPCTRACE, req, "inflight");
2821
2822                 spin_lock(&req->rq_lock);
2823                 if (req->rq_import_generation < imp->imp_generation) {
2824                         req->rq_err = 1;
2825                         req->rq_status = -EIO;
2826                         ptlrpc_client_wake_req(req);
2827                 }
2828                 spin_unlock(&req->rq_lock);
2829         }
2830
2831         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2832                 struct ptlrpc_request *req =
2833                         list_entry(tmp, struct ptlrpc_request, rq_list);
2834
2835                 DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2836
2837                 spin_lock(&req->rq_lock);
2838                 if (req->rq_import_generation < imp->imp_generation) {
2839                         req->rq_err = 1;
2840                         req->rq_status = -EIO;
2841                         ptlrpc_client_wake_req(req);
2842                 }
2843                 spin_unlock(&req->rq_lock);
2844         }
2845
2846         /*
2847          * Last chance to free reqs left on the replay list, but we
2848          * will still leak reqs that haven't committed.
2849          */
2850         if (imp->imp_replayable)
2851                 ptlrpc_free_committed(imp);
2852
2853         spin_unlock(&imp->imp_lock);
2854 }
2855
2856 /**
2857  * Abort all uncompleted requests in request set \a set
2858  */
2859 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
2860 {
2861         struct list_head *tmp, *pos;
2862
2863         list_for_each_safe(pos, tmp, &set->set_requests) {
2864                 struct ptlrpc_request *req =
2865                         list_entry(pos, struct ptlrpc_request, rq_set_chain);
2866
2867                 spin_lock(&req->rq_lock);
2868                 if (req->rq_phase != RQ_PHASE_RPC) {
2869                         spin_unlock(&req->rq_lock);
2870                         continue;
2871                 }
2872
2873                 req->rq_err = 1;
2874                 req->rq_status = -EINTR;
2875                 ptlrpc_client_wake_req(req);
2876                 spin_unlock(&req->rq_lock);
2877         }
2878 }
2879
2880 static __u64 ptlrpc_last_xid;
2881 static spinlock_t ptlrpc_last_xid_lock;
2882
2883 /**
2884  * Initialize the XID for the node.  This is common among all requests on
2885  * this node, and only requires the property that it is monotonically
2886  * increasing.  It does not need to be sequential.  Since this is also used
2887  * as the RDMA match bits, it is important that a single client NOT have
2888  * the same match bits for two different in-flight requests, hence we do
2889  * NOT want to have an XID per target or similar.
2890  *
2891  * To avoid an unlikely collision between match bits after a client reboot
2892  * (which would deliver old data into the wrong RDMA buffer) initialize
2893  * the XID based on the current time, assuming a maximum RPC rate of 1M RPC/s.
2894  * If the time is clearly incorrect, we instead use a 62-bit random number.
2895  * In the worst case the random number will overflow 1M RPCs per second in
2896  * 9133 years, or permutations thereof.
2897  */
2898 #define YEAR_2004 (1ULL << 30)
2899 void ptlrpc_init_xid(void)
2900 {
2901         time64_t now = ktime_get_real_seconds();
2902
2903         spin_lock_init(&ptlrpc_last_xid_lock);
2904         if (now < YEAR_2004) {
2905                 cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
2906                 ptlrpc_last_xid >>= 2;
2907                 ptlrpc_last_xid |= (1ULL << 61);
2908         } else {
2909                 ptlrpc_last_xid = (__u64)now << 20;
2910         }
2911
2912         /* Always need to be aligned to a power-of-two for multi-bulk BRW */
2913         CLASSERT(((PTLRPC_BULK_OPS_COUNT - 1) & PTLRPC_BULK_OPS_COUNT) == 0);
2914         ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
2915 }
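/*
 * Worked example of the seeding above (illustrative only): YEAR_2004 is
 * 2^30 seconds, i.e. roughly January 2004, so any sane clock takes the
 * time-based branch.  With now = 1500000000 (mid 2017) the seed becomes
 * 1500000000 << 20, about 1.57e15, and it grows by 2^20 (~1.05 million)
 * for every second of wall-clock time - that is the "1M RPC/s" budget
 * mentioned above.  The final masking with PTLRPC_BULK_OPS_MASK (assumed
 * here to clear the low bits below PTLRPC_BULK_OPS_COUNT) rounds the seed
 * down to a PTLRPC_BULK_OPS_COUNT multiple, an alignment that
 * ptlrpc_next_xid() below then preserves.
 */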
2916
2917 /**
2918  * Increases the xid and returns the resulting new value to the caller.
2919  *
2920  * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
2921  * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
2922  * itself uses the last bulk xid needed, so the server can determine the
2923  * number of bulk transfers from the RPC XID and a bitmask.  The starting
2924  * xid must be aligned to a power-of-two boundary (PTLRPC_BULK_OPS_COUNT).
2925  *
2926  * This holds because the initial ptlrpc_last_xid value is also aligned
2927  * to that boundary by ptlrpc_init_xid(). LU-1431
2928  */
2929 __u64 ptlrpc_next_xid(void)
2930 {
2931         __u64 next;
2932
2933         spin_lock(&ptlrpc_last_xid_lock);
2934         next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2935         ptlrpc_last_xid = next;
2936         spin_unlock(&ptlrpc_last_xid_lock);
2937
2938         return next;
2939 }
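/*
 * Illustrative sketch, not part of the original driver: given the xid
 * layout described above, the server can recover the bulk-transfer count
 * from the RPC xid alone.  With PTLRPC_BULK_OPS_COUNT assumed to be 16
 * (only an illustrative value), a BRW needing 4 bulk transfers whose first
 * xid is 0x5a0 sends rq_xid = 0x5a3, and (0x5a3 & 15) + 1 = 4 while
 * 0x5a3 & ~15ULL = 0x5a0 recovers the first set of match bits.
 * example_bulk_count_from_xid() is a hypothetical helper name.
 */
static inline unsigned int example_bulk_count_from_xid(__u64 rq_xid)
{
        /* number of bulk transfers implied by an RPC xid (illustration) */
        return (unsigned int)(rq_xid & (PTLRPC_BULK_OPS_COUNT - 1)) + 1;
}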
2940
2941 /**
2942  * Get a glimpse at what the next xid value might be.
2943  * Returns possible next xid.
2944  */
2945 __u64 ptlrpc_sample_next_xid(void)
2946 {
2947 #if BITS_PER_LONG == 32
2948         /* need to avoid possible word tearing on 32-bit systems */
2949         __u64 next;
2950
2951         spin_lock(&ptlrpc_last_xid_lock);
2952         next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2953         spin_unlock(&ptlrpc_last_xid_lock);
2954
2955         return next;
2956 #else
2957         /* No need to lock, since the returned value is racy anyway */
2958         return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2959 #endif
2960 }
2961 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
2962
2963 /**
2964  * Functions for operating ptlrpc workers.
2965  *
2966  * A ptlrpc work item is a function that runs inside ptlrpcd context.
2967  * The callback must not sleep, otherwise it will block that ptlrpcd thread.
2968  *
2969  * 1. After a work item is created, it can be reused many times:
2970  *       handler = ptlrpcd_alloc_work(imp, cb, cbdata);
2971  *       ptlrpcd_queue_work(handler);
2972  *    queue it again whenever necessary:
2973  *       ptlrpcd_queue_work(handler);
2974  *    and free it once it is no longer needed:
2975  *       ptlrpcd_destroy_work(handler);
2976  * 2. ptlrpcd_queue_work() may be called concurrently by multiple processes,
2977  *    but the work will only be queued once at any given time.  As the name
2978  *    implies, it may run with some delay (usage sketch below).
2979  */
2980 struct ptlrpc_work_async_args {
2981         int (*cb)(const struct lu_env *, void *);
2982         void *cbdata;
2983 };
2984
2985 static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
2986 {
2987         /* re-initialize the req */
2988         req->rq_timeout         = obd_timeout;
2989         req->rq_sent            = ktime_get_real_seconds();
2990         req->rq_deadline        = req->rq_sent + req->rq_timeout;
2991         req->rq_phase           = RQ_PHASE_INTERPRET;
2992         req->rq_next_phase      = RQ_PHASE_COMPLETE;
2993         req->rq_xid             = ptlrpc_next_xid();
2994         req->rq_import_generation = req->rq_import->imp_generation;
2995
2996         ptlrpcd_add_req(req);
2997 }
2998
2999 static int work_interpreter(const struct lu_env *env,
3000                             struct ptlrpc_request *req, void *data, int rc)
3001 {
3002         struct ptlrpc_work_async_args *arg = data;
3003
3004         LASSERT(ptlrpcd_check_work(req));
3005
3006         rc = arg->cb(env, arg->cbdata);
3007
3008         list_del_init(&req->rq_set_chain);
3009         req->rq_set = NULL;
3010
3011         if (atomic_dec_return(&req->rq_refcount) > 1) {
3012                 atomic_set(&req->rq_refcount, 2);
3013                 ptlrpcd_add_work_req(req);
3014         }
3015         return rc;
3016 }
3017
3018 static int worker_format;
3019
3020 static int ptlrpcd_check_work(struct ptlrpc_request *req)
3021 {
3022         return req->rq_pill.rc_fmt == (void *)&worker_format;
3023 }
3024
3025 /**
3026  * Create a work for ptlrpc.
3027  */
3028 void *ptlrpcd_alloc_work(struct obd_import *imp,
3029                          int (*cb)(const struct lu_env *, void *), void *cbdata)
3030 {
3031         struct ptlrpc_request    *req = NULL;
3032         struct ptlrpc_work_async_args *args;
3033
3034         might_sleep();
3035
3036         if (!cb)
3037                 return ERR_PTR(-EINVAL);
3038
3039         /* copy some code from deprecated fakereq. */
3040         req = ptlrpc_request_cache_alloc(GFP_NOFS);
3041         if (!req) {
3042                 CERROR("ptlrpc: run out of memory!\n");
3043                 return ERR_PTR(-ENOMEM);
3044         }
3045
3046         ptlrpc_cli_req_init(req);
3047
3048         req->rq_send_state = LUSTRE_IMP_FULL;
3049         req->rq_type = PTL_RPC_MSG_REQUEST;
3050         req->rq_import = class_import_get(imp);
3051         req->rq_interpret_reply = work_interpreter;
3052         /* don't want reply */
3053         req->rq_no_delay = 1;
3054         req->rq_no_resend = 1;
3055         req->rq_pill.rc_fmt = (void *)&worker_format;
3056
3057         CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
3058         args = ptlrpc_req_async_args(req);
3059         args->cb = cb;
3060         args->cbdata = cbdata;
3061
3062         return req;
3063 }
3064 EXPORT_SYMBOL(ptlrpcd_alloc_work);
3065
3066 void ptlrpcd_destroy_work(void *handler)
3067 {
3068         struct ptlrpc_request *req = handler;
3069
3070         if (req)
3071                 ptlrpc_req_finished(req);
3072 }
3073 EXPORT_SYMBOL(ptlrpcd_destroy_work);
3074
3075 int ptlrpcd_queue_work(void *handler)
3076 {
3077         struct ptlrpc_request *req = handler;
3078
3079         /*
3080          * Check if the req is already being queued.
3081          *
3082          * Here comes a trick: ptlrpc lacks a reliable way of checking whether
3083          * a req is already being processed, so the refcount of the req is used
3084          * for this purpose. This is okay because the caller is expected to
3085          * treat the req as opaque data. - Jinshan
3086          */
3087         LASSERT(atomic_read(&req->rq_refcount) > 0);
3088         if (atomic_inc_return(&req->rq_refcount) == 2)
3089                 ptlrpcd_add_work_req(req);
3090         return 0;
3091 }
3092 EXPORT_SYMBOL(ptlrpcd_queue_work);
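/*
 * Illustrative usage sketch for the worker API above, not part of the
 * original driver.  example_cb(), example_setup() and example_teardown()
 * are hypothetical names; the callback must not sleep because it runs on
 * a ptlrpcd thread.
 */
static int example_cb(const struct lu_env *env, void *data)
{
        CDEBUG(D_INFO, "work fired, data %p\n", data);
        return 0;
}

static void *example_setup(struct obd_import *imp, void *data)
{
        void *handler;

        handler = ptlrpcd_alloc_work(imp, example_cb, data);
        if (IS_ERR(handler))
                return handler;         /* -EINVAL or -ENOMEM */

        /* safe to call from several processes; queued at most once */
        ptlrpcd_queue_work(handler);
        return handler;
}

static void example_teardown(void *handler)
{
        /* releases the reference taken at allocation time */
        ptlrpcd_destroy_work(handler);
}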