Linux-libre 4.9.46-gnu
[librecmc/linux-libre.git] / drivers / staging / lustre / lnet / klnds / o2iblnd / o2iblnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/klnds/o2iblnd/o2iblnd_cb.c
33  *
34  * Author: Eric Barton <eric@bartonsoftware.com>
35  */
36
37 #include "o2iblnd.h"
38
39 #define MAX_CONN_RACES_BEFORE_ABORT 20
40
41 static void kiblnd_peer_alive(struct kib_peer *peer);
42 static void kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error);
43 static void kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx,
44                                int type, int body_nob);
45 static int kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
46                             int resid, struct kib_rdma_desc *dstrd,
47                             __u64 dstcookie);
48 static void kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn);
49 static void kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn);
50 static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx);
51 static void kiblnd_check_sends_locked(struct kib_conn *conn);
52
53 static void
54 kiblnd_tx_done(lnet_ni_t *ni, struct kib_tx *tx)
55 {
56         lnet_msg_t *lntmsg[2];
57         struct kib_net *net = ni->ni_data;
58         int rc;
59         int i;
60
61         LASSERT(net);
62         LASSERT(!in_interrupt());
63         LASSERT(!tx->tx_queued);               /* mustn't be queued for sending */
64         LASSERT(!tx->tx_sending);         /* mustn't be awaiting sent callback */
65         LASSERT(!tx->tx_waiting);             /* mustn't be awaiting peer response */
66         LASSERT(tx->tx_pool);
67
68         kiblnd_unmap_tx(ni, tx);
69
70         /* tx may have up to 2 lnet msgs to finalise */
71         lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
72         lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
73         rc = tx->tx_status;
74
75         if (tx->tx_conn) {
76                 LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);
77
78                 kiblnd_conn_decref(tx->tx_conn);
79                 tx->tx_conn = NULL;
80         }
81
82         tx->tx_nwrq = 0;
83         tx->tx_status = 0;
84
85         kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
86
87         /* delay finalize until my descs have been freed */
88         for (i = 0; i < 2; i++) {
89                 if (!lntmsg[i])
90                         continue;
91
92                 lnet_finalize(ni, lntmsg[i], rc);
93         }
94 }
95
96 void
97 kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
98 {
99         struct kib_tx *tx;
100
101         while (!list_empty(txlist)) {
102                 tx = list_entry(txlist->next, struct kib_tx, tx_list);
103
104                 list_del(&tx->tx_list);
105                 /* complete now */
106                 tx->tx_waiting = 0;
107                 tx->tx_status = status;
108                 kiblnd_tx_done(ni, tx);
109         }
110 }
111
112 static struct kib_tx *
113 kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
114 {
115         struct kib_net *net = (struct kib_net *)ni->ni_data;
116         struct list_head *node;
117         struct kib_tx *tx;
118         struct kib_tx_poolset *tps;
119
120         tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
121         node = kiblnd_pool_alloc_node(&tps->tps_poolset);
122         if (!node)
123                 return NULL;
124         tx = list_entry(node, struct kib_tx, tx_list);
125
126         LASSERT(!tx->tx_nwrq);
127         LASSERT(!tx->tx_queued);
128         LASSERT(!tx->tx_sending);
129         LASSERT(!tx->tx_waiting);
130         LASSERT(!tx->tx_status);
131         LASSERT(!tx->tx_conn);
132         LASSERT(!tx->tx_lntmsg[0]);
133         LASSERT(!tx->tx_lntmsg[1]);
134         LASSERT(!tx->tx_nfrags);
135
136         return tx;
137 }
138
139 static void
140 kiblnd_drop_rx(struct kib_rx *rx)
141 {
142         struct kib_conn *conn = rx->rx_conn;
143         struct kib_sched_info *sched = conn->ibc_sched;
144         unsigned long flags;
145
146         spin_lock_irqsave(&sched->ibs_lock, flags);
147         LASSERT(conn->ibc_nrx > 0);
148         conn->ibc_nrx--;
149         spin_unlock_irqrestore(&sched->ibs_lock, flags);
150
151         kiblnd_conn_decref(conn);
152 }
153
154 int
155 kiblnd_post_rx(struct kib_rx *rx, int credit)
156 {
157         struct kib_conn *conn = rx->rx_conn;
158         struct kib_net *net = conn->ibc_peer->ibp_ni->ni_data;
159         struct ib_recv_wr *bad_wrq = NULL;
160         struct ib_mr *mr = conn->ibc_hdev->ibh_mrs;
161         int rc;
162
163         LASSERT(net);
164         LASSERT(!in_interrupt());
165         LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
166                 credit == IBLND_POSTRX_PEER_CREDIT ||
167                 credit == IBLND_POSTRX_RSRVD_CREDIT);
168         LASSERT(mr);
169
170         rx->rx_sge.lkey   = mr->lkey;
171         rx->rx_sge.addr   = rx->rx_msgaddr;
172         rx->rx_sge.length = IBLND_MSG_SIZE;
173
174         rx->rx_wrq.next    = NULL;
175         rx->rx_wrq.sg_list = &rx->rx_sge;
176         rx->rx_wrq.num_sge = 1;
177         rx->rx_wrq.wr_id   = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);
178
179         LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
180         LASSERT(rx->rx_nob >= 0);             /* not posted */
181
182         if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
183                 kiblnd_drop_rx(rx);          /* No more posts for this rx */
184                 return 0;
185         }
186
187         rx->rx_nob = -1;                        /* flag posted */
188
189         /* NB: need an extra reference after ib_post_recv because we don't
190          * own this rx (and rx::rx_conn) anymore, LU-5678.
191          */
192         kiblnd_conn_addref(conn);
193         rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
194         if (unlikely(rc)) {
195                 CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
196                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
197                 rx->rx_nob = 0;
198         }
199
200         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
201                 goto out;
202
203         if (unlikely(rc)) {
204                 kiblnd_close_conn(conn, rc);
205                 kiblnd_drop_rx(rx);          /* No more posts for this rx */
206                 goto out;
207         }
208
209         if (credit == IBLND_POSTRX_NO_CREDIT)
210                 goto out;
211
212         spin_lock(&conn->ibc_lock);
213         if (credit == IBLND_POSTRX_PEER_CREDIT)
214                 conn->ibc_outstanding_credits++;
215         else
216                 conn->ibc_reserved_credits++;
217         kiblnd_check_sends_locked(conn);
218         spin_unlock(&conn->ibc_lock);
219
220 out:
221         kiblnd_conn_decref(conn);
222         return rc;
223 }
224
225 static struct kib_tx *
226 kiblnd_find_waiting_tx_locked(struct kib_conn *conn, int txtype, __u64 cookie)
227 {
228         struct list_head *tmp;
229
230         list_for_each(tmp, &conn->ibc_active_txs) {
231                 struct kib_tx *tx = list_entry(tmp, struct kib_tx, tx_list);
232
233                 LASSERT(!tx->tx_queued);
234                 LASSERT(tx->tx_sending || tx->tx_waiting);
235
236                 if (tx->tx_cookie != cookie)
237                         continue;
238
239                 if (tx->tx_waiting &&
240                     tx->tx_msg->ibm_type == txtype)
241                         return tx;
242
243                 CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
244                       tx->tx_waiting ? "" : "NOT ",
245                       tx->tx_msg->ibm_type, txtype);
246         }
247         return NULL;
248 }
249
250 static void
251 kiblnd_handle_completion(struct kib_conn *conn, int txtype, int status, __u64 cookie)
252 {
253         struct kib_tx *tx;
254         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
255         int idle;
256
257         spin_lock(&conn->ibc_lock);
258
259         tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
260         if (!tx) {
261                 spin_unlock(&conn->ibc_lock);
262
263                 CWARN("Unmatched completion type %x cookie %#llx from %s\n",
264                       txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
265                 kiblnd_close_conn(conn, -EPROTO);
266                 return;
267         }
268
269         if (!tx->tx_status) {          /* success so far */
270                 if (status < 0) /* failed? */
271                         tx->tx_status = status;
272                 else if (txtype == IBLND_MSG_GET_REQ)
273                         lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
274         }
275
276         tx->tx_waiting = 0;
277
278         idle = !tx->tx_queued && !tx->tx_sending;
279         if (idle)
280                 list_del(&tx->tx_list);
281
282         spin_unlock(&conn->ibc_lock);
283
284         if (idle)
285                 kiblnd_tx_done(ni, tx);
286 }
287
288 static void
289 kiblnd_send_completion(struct kib_conn *conn, int type, int status, __u64 cookie)
290 {
291         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
292         struct kib_tx *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
293
294         if (!tx) {
295                 CERROR("Can't get tx for completion %x for %s\n",
296                        type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
297                 return;
298         }
299
300         tx->tx_msg->ibm_u.completion.ibcm_status = status;
301         tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
302         kiblnd_init_tx_msg(ni, tx, type, sizeof(struct kib_completion_msg));
303
304         kiblnd_queue_tx(tx, conn);
305 }
306
307 static void
308 kiblnd_handle_rx(struct kib_rx *rx)
309 {
310         struct kib_msg *msg = rx->rx_msg;
311         struct kib_conn *conn = rx->rx_conn;
312         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
313         int credits = msg->ibm_credits;
314         struct kib_tx *tx;
315         int rc = 0;
316         int rc2;
317         int post_credit;
318
319         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
320
321         CDEBUG(D_NET, "Received %x[%d] from %s\n",
322                msg->ibm_type, credits,
323                libcfs_nid2str(conn->ibc_peer->ibp_nid));
324
325         if (credits) {
326                 /* Have I received credits that will let me send? */
327                 spin_lock(&conn->ibc_lock);
328
329                 if (conn->ibc_credits + credits >
330                     conn->ibc_queue_depth) {
331                         rc2 = conn->ibc_credits;
332                         spin_unlock(&conn->ibc_lock);
333
334                         CERROR("Bad credits from %s: %d + %d > %d\n",
335                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
336                                rc2, credits, conn->ibc_queue_depth);
337
338                         kiblnd_close_conn(conn, -EPROTO);
339                         kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
340                         return;
341                 }
342
343                 conn->ibc_credits += credits;
344
345                 /* This ensures the credit taken by NOOP can be returned */
346                 if (msg->ibm_type == IBLND_MSG_NOOP &&
347                     !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
348                         conn->ibc_outstanding_credits++;
349
350                 kiblnd_check_sends_locked(conn);
351                 spin_unlock(&conn->ibc_lock);
352         }
353
354         switch (msg->ibm_type) {
355         default:
356                 CERROR("Bad IBLND message type %x from %s\n",
357                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
358                 post_credit = IBLND_POSTRX_NO_CREDIT;
359                 rc = -EPROTO;
360                 break;
361
362         case IBLND_MSG_NOOP:
363                 if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
364                         post_credit = IBLND_POSTRX_NO_CREDIT;
365                         break;
366                 }
367
368                 if (credits) /* credit already posted */
369                         post_credit = IBLND_POSTRX_NO_CREDIT;
370                 else          /* a keepalive NOOP */
371                         post_credit = IBLND_POSTRX_PEER_CREDIT;
372                 break;
373
374         case IBLND_MSG_IMMEDIATE:
375                 post_credit = IBLND_POSTRX_DONT_POST;
376                 rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
377                                 msg->ibm_srcnid, rx, 0);
378                 if (rc < 0)                  /* repost on error */
379                         post_credit = IBLND_POSTRX_PEER_CREDIT;
380                 break;
381
382         case IBLND_MSG_PUT_REQ:
383                 post_credit = IBLND_POSTRX_DONT_POST;
384                 rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
385                                 msg->ibm_srcnid, rx, 1);
386                 if (rc < 0)                  /* repost on error */
387                         post_credit = IBLND_POSTRX_PEER_CREDIT;
388                 break;
389
390         case IBLND_MSG_PUT_NAK:
391                 CWARN("PUT_NACK from %s\n",
392                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
393                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
394                 kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
395                                          msg->ibm_u.completion.ibcm_status,
396                                          msg->ibm_u.completion.ibcm_cookie);
397                 break;
398
399         case IBLND_MSG_PUT_ACK:
400                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
401
402                 spin_lock(&conn->ibc_lock);
403                 tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
404                                                    msg->ibm_u.putack.ibpam_src_cookie);
405                 if (tx)
406                         list_del(&tx->tx_list);
407                 spin_unlock(&conn->ibc_lock);
408
409                 if (!tx) {
410                         CERROR("Unmatched PUT_ACK from %s\n",
411                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
412                         rc = -EPROTO;
413                         break;
414                 }
415
416                 LASSERT(tx->tx_waiting);
417                 /*
418                  * CAVEAT EMPTOR: I could be racing with tx_complete, but...
419                  * (a) I can overwrite tx_msg since my peer has received it!
420                  * (b) tx_waiting set tells tx_complete() it's not done.
421                  */
422                 tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
423
424                 rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
425                                        kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
426                                        &msg->ibm_u.putack.ibpam_rd,
427                                        msg->ibm_u.putack.ibpam_dst_cookie);
428                 if (rc2 < 0)
429                         CERROR("Can't setup rdma for PUT to %s: %d\n",
430                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
431
432                 spin_lock(&conn->ibc_lock);
433                 tx->tx_waiting = 0;     /* clear waiting and queue atomically */
434                 kiblnd_queue_tx_locked(tx, conn);
435                 spin_unlock(&conn->ibc_lock);
436                 break;
437
438         case IBLND_MSG_PUT_DONE:
439                 post_credit = IBLND_POSTRX_PEER_CREDIT;
440                 kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
441                                          msg->ibm_u.completion.ibcm_status,
442                                          msg->ibm_u.completion.ibcm_cookie);
443                 break;
444
445         case IBLND_MSG_GET_REQ:
446                 post_credit = IBLND_POSTRX_DONT_POST;
447                 rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
448                                 msg->ibm_srcnid, rx, 1);
449                 if (rc < 0)                  /* repost on error */
450                         post_credit = IBLND_POSTRX_PEER_CREDIT;
451                 break;
452
453         case IBLND_MSG_GET_DONE:
454                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
455                 kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
456                                          msg->ibm_u.completion.ibcm_status,
457                                          msg->ibm_u.completion.ibcm_cookie);
458                 break;
459         }
460
461         if (rc < 0)                          /* protocol error */
462                 kiblnd_close_conn(conn, rc);
463
464         if (post_credit != IBLND_POSTRX_DONT_POST)
465                 kiblnd_post_rx(rx, post_credit);
466 }
467
468 static void
469 kiblnd_rx_complete(struct kib_rx *rx, int status, int nob)
470 {
471         struct kib_msg *msg = rx->rx_msg;
472         struct kib_conn *conn = rx->rx_conn;
473         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
474         struct kib_net *net = ni->ni_data;
475         int rc;
476         int err = -EIO;
477
478         LASSERT(net);
479         LASSERT(rx->rx_nob < 0);               /* was posted */
480         rx->rx_nob = 0;                  /* isn't now */
481
482         if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
483                 goto ignore;
484
485         if (status != IB_WC_SUCCESS) {
486                 CNETERR("Rx from %s failed: %d\n",
487                         libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
488                 goto failed;
489         }
490
491         LASSERT(nob >= 0);
492         rx->rx_nob = nob;
493
494         rc = kiblnd_unpack_msg(msg, rx->rx_nob);
495         if (rc) {
496                 CERROR("Error %d unpacking rx from %s\n",
497                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
498                 goto failed;
499         }
500
501         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
502             msg->ibm_dstnid != ni->ni_nid ||
503             msg->ibm_srcstamp != conn->ibc_incarnation ||
504             msg->ibm_dststamp != net->ibn_incarnation) {
505                 CERROR("Stale rx from %s\n",
506                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
507                 err = -ESTALE;
508                 goto failed;
509         }
510
511         /* set time last known alive */
512         kiblnd_peer_alive(conn->ibc_peer);
513
514         /* racing with connection establishment/teardown! */
515
516         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
517                 rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
518                 unsigned long flags;
519
520                 write_lock_irqsave(g_lock, flags);
521                 /* must check holding global lock to eliminate race */
522                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
523                         list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
524                         write_unlock_irqrestore(g_lock, flags);
525                         return;
526                 }
527                 write_unlock_irqrestore(g_lock, flags);
528         }
529         kiblnd_handle_rx(rx);
530         return;
531
532  failed:
533         CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
534         kiblnd_close_conn(conn, err);
535  ignore:
536         kiblnd_drop_rx(rx);                  /* Don't re-post rx. */
537 }
538
539 static struct page *
540 kiblnd_kvaddr_to_page(unsigned long vaddr)
541 {
542         struct page *page;
543
544         if (is_vmalloc_addr((void *)vaddr)) {
545                 page = vmalloc_to_page((void *)vaddr);
546                 LASSERT(page);
547                 return page;
548         }
549 #ifdef CONFIG_HIGHMEM
550         if (vaddr >= PKMAP_BASE &&
551             vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
552                 /* No highmem pages only used for bulk (kiov) I/O */
553                 CERROR("find page for address in highmem\n");
554                 LBUG();
555         }
556 #endif
557         page = virt_to_page(vaddr);
558         LASSERT(page);
559         return page;
560 }
561
562 static int
563 kiblnd_fmr_map_tx(struct kib_net *net, struct kib_tx *tx, struct kib_rdma_desc *rd, __u32 nob)
564 {
565         struct kib_hca_dev *hdev;
566         struct kib_fmr_poolset *fps;
567         int cpt;
568         int rc;
569
570         LASSERT(tx->tx_pool);
571         LASSERT(tx->tx_pool->tpo_pool.po_owner);
572
573         hdev = tx->tx_pool->tpo_hdev;
574         cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;
575
576         fps = net->ibn_fmr_ps[cpt];
577         rc = kiblnd_fmr_pool_map(fps, tx, rd, nob, 0, &tx->fmr);
578         if (rc) {
579                 CERROR("Can't map %u bytes: %d\n", nob, rc);
580                 return rc;
581         }
582
583         /*
584          * If rd is not tx_rd, it's going to get sent to a peer, who will need
585          * the rkey
586          */
587         rd->rd_key = tx->fmr.fmr_key;
588         rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
589         rd->rd_frags[0].rf_nob = nob;
590         rd->rd_nfrags = 1;
591
592         return 0;
593 }
594
595 static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx)
596 {
597         struct kib_net *net = ni->ni_data;
598
599         LASSERT(net);
600
601         if (net->ibn_fmr_ps)
602                 kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);
603
604         if (tx->tx_nfrags) {
605                 kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
606                                     tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
607                 tx->tx_nfrags = 0;
608         }
609 }
610
611 static int kiblnd_map_tx(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
612                          int nfrags)
613 {
614         struct kib_net *net = ni->ni_data;
615         struct kib_hca_dev *hdev = net->ibn_dev->ibd_hdev;
616         struct ib_mr *mr    = NULL;
617         __u32 nob;
618         int i;
619
620         /*
621          * If rd is not tx_rd, it's going to get sent to a peer and I'm the
622          * RDMA sink
623          */
624         tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
625         tx->tx_nfrags = nfrags;
626
627         rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
628                                           tx->tx_nfrags, tx->tx_dmadir);
629
630         for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
631                 rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
632                         hdev->ibh_ibdev, &tx->tx_frags[i]);
633                 rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
634                         hdev->ibh_ibdev, &tx->tx_frags[i]);
635                 nob += rd->rd_frags[i].rf_nob;
636         }
637
638         mr = kiblnd_find_rd_dma_mr(ni, rd, tx->tx_conn ?
639                                    tx->tx_conn->ibc_max_frags : -1);
640         if (mr) {
641                 /* found pre-mapping MR */
642                 rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
643                 return 0;
644         }
645
646         if (net->ibn_fmr_ps)
647                 return kiblnd_fmr_map_tx(net, tx, rd, nob);
648
649         return -EINVAL;
650 }
651
652 static int
653 kiblnd_setup_rd_iov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
654                     unsigned int niov, const struct kvec *iov, int offset, int nob)
655 {
656         struct kib_net *net = ni->ni_data;
657         struct page *page;
658         struct scatterlist *sg;
659         unsigned long vaddr;
660         int fragnob;
661         int page_offset;
662
663         LASSERT(nob > 0);
664         LASSERT(niov > 0);
665         LASSERT(net);
666
667         while (offset >= iov->iov_len) {
668                 offset -= iov->iov_len;
669                 niov--;
670                 iov++;
671                 LASSERT(niov > 0);
672         }
673
674         sg = tx->tx_frags;
675         do {
676                 LASSERT(niov > 0);
677
678                 vaddr = ((unsigned long)iov->iov_base) + offset;
679                 page_offset = vaddr & (PAGE_SIZE - 1);
680                 page = kiblnd_kvaddr_to_page(vaddr);
681                 if (!page) {
682                         CERROR("Can't find page\n");
683                         return -EFAULT;
684                 }
685
686                 fragnob = min((int)(iov->iov_len - offset), nob);
687                 fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);
688
689                 sg_set_page(sg, page, fragnob, page_offset);
690                 sg = sg_next(sg);
691                 if (!sg) {
692                         CERROR("lacking enough sg entries to map tx\n");
693                         return -EFAULT;
694                 }
695
696                 if (offset + fragnob < iov->iov_len) {
697                         offset += fragnob;
698                 } else {
699                         offset = 0;
700                         iov++;
701                         niov--;
702                 }
703                 nob -= fragnob;
704         } while (nob > 0);
705
706         return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
707 }
708
709 static int
710 kiblnd_setup_rd_kiov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
711                      int nkiov, const lnet_kiov_t *kiov, int offset, int nob)
712 {
713         struct kib_net *net = ni->ni_data;
714         struct scatterlist *sg;
715         int fragnob;
716
717         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
718
719         LASSERT(nob > 0);
720         LASSERT(nkiov > 0);
721         LASSERT(net);
722
723         while (offset >= kiov->bv_len) {
724                 offset -= kiov->bv_len;
725                 nkiov--;
726                 kiov++;
727                 LASSERT(nkiov > 0);
728         }
729
730         sg = tx->tx_frags;
731         do {
732                 LASSERT(nkiov > 0);
733
734                 fragnob = min((int)(kiov->bv_len - offset), nob);
735
736                 sg_set_page(sg, kiov->bv_page, fragnob,
737                             kiov->bv_offset + offset);
738                 sg = sg_next(sg);
739                 if (!sg) {
740                         CERROR("lacking enough sg entries to map tx\n");
741                         return -EFAULT;
742                 }
743
744                 offset = 0;
745                 kiov++;
746                 nkiov--;
747                 nob -= fragnob;
748         } while (nob > 0);
749
750         return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
751 }
752
753 static int
754 kiblnd_post_tx_locked(struct kib_conn *conn, struct kib_tx *tx, int credit)
755         __must_hold(&conn->ibc_lock)
756 {
757         struct kib_msg *msg = tx->tx_msg;
758         struct kib_peer *peer = conn->ibc_peer;
759         struct lnet_ni *ni = peer->ibp_ni;
760         int ver = conn->ibc_version;
761         int rc;
762         int done;
763
764         LASSERT(tx->tx_queued);
765         /* We rely on this for QP sizing */
766         LASSERT(tx->tx_nwrq > 0);
767
768         LASSERT(!credit || credit == 1);
769         LASSERT(conn->ibc_outstanding_credits >= 0);
770         LASSERT(conn->ibc_outstanding_credits <= conn->ibc_queue_depth);
771         LASSERT(conn->ibc_credits >= 0);
772         LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);
773
774         if (conn->ibc_nsends_posted == kiblnd_concurrent_sends(ver, ni)) {
775                 /* tx completions outstanding... */
776                 CDEBUG(D_NET, "%s: posted enough\n",
777                        libcfs_nid2str(peer->ibp_nid));
778                 return -EAGAIN;
779         }
780
781         if (credit && !conn->ibc_credits) {   /* no credits */
782                 CDEBUG(D_NET, "%s: no credits\n",
783                        libcfs_nid2str(peer->ibp_nid));
784                 return -EAGAIN;
785         }
786
787         if (credit && !IBLND_OOB_CAPABLE(ver) &&
788             conn->ibc_credits == 1 &&   /* last credit reserved */
789             msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
790                 CDEBUG(D_NET, "%s: not using last credit\n",
791                        libcfs_nid2str(peer->ibp_nid));
792                 return -EAGAIN;
793         }
794
795         /* NB don't drop ibc_lock before bumping tx_sending */
796         list_del(&tx->tx_list);
797         tx->tx_queued = 0;
798
799         if (msg->ibm_type == IBLND_MSG_NOOP &&
800             (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
801              (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
802               conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
803                 /*
804                  * OK to drop when posted enough NOOPs, since
805                  * kiblnd_check_sends_locked will queue NOOP again when
806                  * posted NOOPs complete
807                  */
808                 spin_unlock(&conn->ibc_lock);
809                 kiblnd_tx_done(peer->ibp_ni, tx);
810                 spin_lock(&conn->ibc_lock);
811                 CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
812                        libcfs_nid2str(peer->ibp_nid),
813                        conn->ibc_noops_posted);
814                 return 0;
815         }
816
817         kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
818                         peer->ibp_nid, conn->ibc_incarnation);
819
820         conn->ibc_credits -= credit;
821         conn->ibc_outstanding_credits = 0;
822         conn->ibc_nsends_posted++;
823         if (msg->ibm_type == IBLND_MSG_NOOP)
824                 conn->ibc_noops_posted++;
825
826         /*
827          * CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
828          * PUT.  If so, it was first queued here as a PUT_REQ, sent and
829          * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
830          * and then re-queued here.  It's (just) possible that
831          * tx_sending is non-zero if we've not done the tx_complete()
832          * from the first send; hence the ++ rather than = below.
833          */
834         tx->tx_sending++;
835         list_add(&tx->tx_list, &conn->ibc_active_txs);
836
837         /* I'm still holding ibc_lock! */
838         if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
839                 rc = -ECONNABORTED;
840         } else if (tx->tx_pool->tpo_pool.po_failed ||
841                  conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
842                 /* close_conn will launch failover */
843                 rc = -ENETDOWN;
844         } else {
845                 struct kib_fast_reg_descriptor *frd = tx->fmr.fmr_frd;
846                 struct ib_send_wr *bad = &tx->tx_wrq[tx->tx_nwrq - 1].wr;
847                 struct ib_send_wr *wrq = &tx->tx_wrq[0].wr;
848
849                 if (frd) {
850                         if (!frd->frd_valid) {
851                                 wrq = &frd->frd_inv_wr;
852                                 wrq->next = &frd->frd_fastreg_wr.wr;
853                         } else {
854                                 wrq = &frd->frd_fastreg_wr.wr;
855                         }
856                         frd->frd_fastreg_wr.wr.next = &tx->tx_wrq[0].wr;
857                 }
858
859                 LASSERTF(bad->wr_id == kiblnd_ptr2wreqid(tx, IBLND_WID_TX),
860                          "bad wr_id %llx, opc %d, flags %d, peer: %s\n",
861                          bad->wr_id, bad->opcode, bad->send_flags,
862                          libcfs_nid2str(conn->ibc_peer->ibp_nid));
863                 bad = NULL;
864                 rc = ib_post_send(conn->ibc_cmid->qp, wrq, &bad);
865         }
866
867         conn->ibc_last_send = jiffies;
868
869         if (!rc)
870                 return 0;
871
872         /*
873          * NB credits are transferred in the actual
874          * message, which can only be the last work item
875          */
876         conn->ibc_credits += credit;
877         conn->ibc_outstanding_credits += msg->ibm_credits;
878         conn->ibc_nsends_posted--;
879         if (msg->ibm_type == IBLND_MSG_NOOP)
880                 conn->ibc_noops_posted--;
881
882         tx->tx_status = rc;
883         tx->tx_waiting = 0;
884         tx->tx_sending--;
885
886         done = !tx->tx_sending;
887         if (done)
888                 list_del(&tx->tx_list);
889
890         spin_unlock(&conn->ibc_lock);
891
892         if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
893                 CERROR("Error %d posting transmit to %s\n",
894                        rc, libcfs_nid2str(peer->ibp_nid));
895         else
896                 CDEBUG(D_NET, "Error %d posting transmit to %s\n",
897                        rc, libcfs_nid2str(peer->ibp_nid));
898
899         kiblnd_close_conn(conn, rc);
900
901         if (done)
902                 kiblnd_tx_done(peer->ibp_ni, tx);
903
904         spin_lock(&conn->ibc_lock);
905
906         return -EIO;
907 }
908
909 static void
910 kiblnd_check_sends_locked(struct kib_conn *conn)
911 {
912         int ver = conn->ibc_version;
913         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
914         struct kib_tx *tx;
915
916         /* Don't send anything until after the connection is established */
917         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
918                 CDEBUG(D_NET, "%s too soon\n",
919                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
920                 return;
921         }
922
923         LASSERT(conn->ibc_nsends_posted <= kiblnd_concurrent_sends(ver, ni));
924         LASSERT(!IBLND_OOB_CAPABLE(ver) ||
925                 conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
926         LASSERT(conn->ibc_reserved_credits >= 0);
927
928         while (conn->ibc_reserved_credits > 0 &&
929                !list_empty(&conn->ibc_tx_queue_rsrvd)) {
930                 tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
931                                 struct kib_tx, tx_list);
932                 list_del(&tx->tx_list);
933                 list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
934                 conn->ibc_reserved_credits--;
935         }
936
937         if (kiblnd_need_noop(conn)) {
938                 spin_unlock(&conn->ibc_lock);
939
940                 tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
941                 if (tx)
942                         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
943
944                 spin_lock(&conn->ibc_lock);
945                 if (tx)
946                         kiblnd_queue_tx_locked(tx, conn);
947         }
948
949         for (;;) {
950                 int credit;
951
952                 if (!list_empty(&conn->ibc_tx_queue_nocred)) {
953                         credit = 0;
954                         tx = list_entry(conn->ibc_tx_queue_nocred.next,
955                                         struct kib_tx, tx_list);
956                 } else if (!list_empty(&conn->ibc_tx_noops)) {
957                         LASSERT(!IBLND_OOB_CAPABLE(ver));
958                         credit = 1;
959                         tx = list_entry(conn->ibc_tx_noops.next,
960                                         struct kib_tx, tx_list);
961                 } else if (!list_empty(&conn->ibc_tx_queue)) {
962                         credit = 1;
963                         tx = list_entry(conn->ibc_tx_queue.next,
964                                         struct kib_tx, tx_list);
965                 } else {
966                         break;
967                 }
968
969                 if (kiblnd_post_tx_locked(conn, tx, credit))
970                         break;
971         }
972 }
973
974 static void
975 kiblnd_tx_complete(struct kib_tx *tx, int status)
976 {
977         int failed = (status != IB_WC_SUCCESS);
978         struct kib_conn *conn = tx->tx_conn;
979         int idle;
980
981         LASSERT(tx->tx_sending > 0);
982
983         if (failed) {
984                 if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
985                         CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
986                                 libcfs_nid2str(conn->ibc_peer->ibp_nid),
987                                 tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
988                                 status);
989
990                 kiblnd_close_conn(conn, -EIO);
991         } else {
992                 kiblnd_peer_alive(conn->ibc_peer);
993         }
994
995         spin_lock(&conn->ibc_lock);
996
997         /*
998          * I could be racing with rdma completion.  Whoever makes 'tx' idle
999          * gets to free it, which also drops its ref on 'conn'.
1000          */
1001         tx->tx_sending--;
1002         conn->ibc_nsends_posted--;
1003         if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
1004                 conn->ibc_noops_posted--;
1005
1006         if (failed) {
1007                 tx->tx_waiting = 0;          /* don't wait for peer */
1008                 tx->tx_status = -EIO;
1009         }
1010
1011         idle = !tx->tx_sending &&        /* This is the final callback */
1012                !tx->tx_waiting &&              /* Not waiting for peer */
1013                !tx->tx_queued;            /* Not re-queued (PUT_DONE) */
1014         if (idle)
1015                 list_del(&tx->tx_list);
1016
1017         kiblnd_check_sends_locked(conn);
1018         spin_unlock(&conn->ibc_lock);
1019
1020         if (idle)
1021                 kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
1022 }
1023
1024 static void
1025 kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx, int type, int body_nob)
1026 {
1027         struct kib_hca_dev *hdev = tx->tx_pool->tpo_hdev;
1028         struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
1029         struct ib_rdma_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
1030         int nob = offsetof(struct kib_msg, ibm_u) + body_nob;
1031         struct ib_mr *mr = hdev->ibh_mrs;
1032
1033         LASSERT(tx->tx_nwrq >= 0);
1034         LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
1035         LASSERT(nob <= IBLND_MSG_SIZE);
1036         LASSERT(mr);
1037
1038         kiblnd_init_msg(tx->tx_msg, type, body_nob);
1039
1040         sge->lkey   = mr->lkey;
1041         sge->addr   = tx->tx_msgaddr;
1042         sge->length = nob;
1043
1044         memset(wrq, 0, sizeof(*wrq));
1045
1046         wrq->wr.next       = NULL;
1047         wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
1048         wrq->wr.sg_list    = sge;
1049         wrq->wr.num_sge    = 1;
1050         wrq->wr.opcode     = IB_WR_SEND;
1051         wrq->wr.send_flags = IB_SEND_SIGNALED;
1052
1053         tx->tx_nwrq++;
1054 }
1055
1056 static int
1057 kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
1058                  int resid, struct kib_rdma_desc *dstrd, __u64 dstcookie)
1059 {
1060         struct kib_msg *ibmsg = tx->tx_msg;
1061         struct kib_rdma_desc *srcrd = tx->tx_rd;
1062         struct ib_sge *sge = &tx->tx_sge[0];
1063         struct ib_rdma_wr *wrq, *next;
1064         int rc  = resid;
1065         int srcidx = 0;
1066         int dstidx = 0;
1067         int wrknob;
1068
1069         LASSERT(!in_interrupt());
1070         LASSERT(!tx->tx_nwrq);
1071         LASSERT(type == IBLND_MSG_GET_DONE ||
1072                 type == IBLND_MSG_PUT_DONE);
1073
1074         if (kiblnd_rd_size(srcrd) > conn->ibc_max_frags << PAGE_SHIFT) {
1075                 CERROR("RDMA is too large for peer %s (%d), src size: %d dst size: %d\n",
1076                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
1077                        conn->ibc_max_frags << PAGE_SHIFT,
1078                        kiblnd_rd_size(srcrd), kiblnd_rd_size(dstrd));
1079                 rc = -EMSGSIZE;
1080                 goto too_big;
1081         }
1082
1083         while (resid > 0) {
1084                 if (srcidx >= srcrd->rd_nfrags) {
1085                         CERROR("Src buffer exhausted: %d frags\n", srcidx);
1086                         rc = -EPROTO;
1087                         break;
1088                 }
1089
1090                 if (dstidx == dstrd->rd_nfrags) {
1091                         CERROR("Dst buffer exhausted: %d frags\n", dstidx);
1092                         rc = -EPROTO;
1093                         break;
1094                 }
1095
1096                 if (tx->tx_nwrq >= IBLND_MAX_RDMA_FRAGS) {
1097                         CERROR("RDMA has too many fragments for peer %s (%d), src idx/frags: %d/%d dst idx/frags: %d/%d\n",
1098                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1099                                IBLND_MAX_RDMA_FRAGS,
1100                                srcidx, srcrd->rd_nfrags,
1101                                dstidx, dstrd->rd_nfrags);
1102                         rc = -EMSGSIZE;
1103                         break;
1104                 }
1105
1106                 wrknob = min(min(kiblnd_rd_frag_size(srcrd, srcidx),
1107                                  kiblnd_rd_frag_size(dstrd, dstidx)),
1108                              (__u32)resid);
1109
1110                 sge = &tx->tx_sge[tx->tx_nwrq];
1111                 sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
1112                 sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
1113                 sge->length = wrknob;
1114
1115                 wrq = &tx->tx_wrq[tx->tx_nwrq];
1116                 next = wrq + 1;
1117
1118                 wrq->wr.next       = &next->wr;
1119                 wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
1120                 wrq->wr.sg_list    = sge;
1121                 wrq->wr.num_sge    = 1;
1122                 wrq->wr.opcode     = IB_WR_RDMA_WRITE;
1123                 wrq->wr.send_flags = 0;
1124
1125                 wrq->remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
1126                 wrq->rkey        = kiblnd_rd_frag_key(dstrd, dstidx);
1127
1128                 srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
1129                 dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);
1130
1131                 resid -= wrknob;
1132
1133                 tx->tx_nwrq++;
1134                 wrq++;
1135                 sge++;
1136         }
1137 too_big:
1138         if (rc < 0)                          /* no RDMA if completing with failure */
1139                 tx->tx_nwrq = 0;
1140
1141         ibmsg->ibm_u.completion.ibcm_status = rc;
1142         ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
1143         kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
1144                            type, sizeof(struct kib_completion_msg));
1145
1146         return rc;
1147 }
1148
1149 static void
1150 kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
1151 {
1152         struct list_head *q;
1153
1154         LASSERT(tx->tx_nwrq > 0);             /* work items set up */
1155         LASSERT(!tx->tx_queued);               /* not queued for sending already */
1156         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1157
1158         tx->tx_queued = 1;
1159         tx->tx_deadline = jiffies +
1160                           msecs_to_jiffies(*kiblnd_tunables.kib_timeout *
1161                                            MSEC_PER_SEC);
1162
1163         if (!tx->tx_conn) {
1164                 kiblnd_conn_addref(conn);
1165                 tx->tx_conn = conn;
1166                 LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
1167         } else {
1168                 /* PUT_DONE first attached to conn as a PUT_REQ */
1169                 LASSERT(tx->tx_conn == conn);
1170                 LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
1171         }
1172
1173         switch (tx->tx_msg->ibm_type) {
1174         default:
1175                 LBUG();
1176
1177         case IBLND_MSG_PUT_REQ:
1178         case IBLND_MSG_GET_REQ:
1179                 q = &conn->ibc_tx_queue_rsrvd;
1180                 break;
1181
1182         case IBLND_MSG_PUT_NAK:
1183         case IBLND_MSG_PUT_ACK:
1184         case IBLND_MSG_PUT_DONE:
1185         case IBLND_MSG_GET_DONE:
1186                 q = &conn->ibc_tx_queue_nocred;
1187                 break;
1188
1189         case IBLND_MSG_NOOP:
1190                 if (IBLND_OOB_CAPABLE(conn->ibc_version))
1191                         q = &conn->ibc_tx_queue_nocred;
1192                 else
1193                         q = &conn->ibc_tx_noops;
1194                 break;
1195
1196         case IBLND_MSG_IMMEDIATE:
1197                 q = &conn->ibc_tx_queue;
1198                 break;
1199         }
1200
1201         list_add_tail(&tx->tx_list, q);
1202 }
1203
1204 static void
1205 kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn)
1206 {
1207         spin_lock(&conn->ibc_lock);
1208         kiblnd_queue_tx_locked(tx, conn);
1209         kiblnd_check_sends_locked(conn);
1210         spin_unlock(&conn->ibc_lock);
1211 }
1212
1213 static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
1214                                struct sockaddr_in *srcaddr,
1215                                struct sockaddr_in *dstaddr,
1216                                int timeout_ms)
1217 {
1218         unsigned short port;
1219         int rc;
1220
1221         /* allow the port to be reused */
1222         rc = rdma_set_reuseaddr(cmid, 1);
1223         if (rc) {
1224                 CERROR("Unable to set reuse on cmid: %d\n", rc);
1225                 return rc;
1226         }
1227
1228         /* look for a free privileged port */
1229         for (port = PROT_SOCK - 1; port > 0; port--) {
1230                 srcaddr->sin_port = htons(port);
1231                 rc = rdma_resolve_addr(cmid,
1232                                        (struct sockaddr *)srcaddr,
1233                                        (struct sockaddr *)dstaddr,
1234                                        timeout_ms);
1235                 if (!rc) {
1236                         CDEBUG(D_NET, "bound to port %hu\n", port);
1237                         return 0;
1238                 } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
1239                         CDEBUG(D_NET, "bind to port %hu failed: %d\n",
1240                                port, rc);
1241                 } else {
1242                         return rc;
1243                 }
1244         }
1245
1246         CERROR("Failed to bind to a free privileged port\n");
1247         return rc;
1248 }
1249
1250 static void
1251 kiblnd_connect_peer(struct kib_peer *peer)
1252 {
1253         struct rdma_cm_id *cmid;
1254         struct kib_dev *dev;
1255         struct kib_net *net = peer->ibp_ni->ni_data;
1256         struct sockaddr_in srcaddr;
1257         struct sockaddr_in dstaddr;
1258         int rc;
1259
1260         LASSERT(net);
1261         LASSERT(peer->ibp_connecting > 0);
1262         LASSERT(!peer->ibp_reconnecting);
1263
1264         cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
1265                                      IB_QPT_RC);
1266
1267         if (IS_ERR(cmid)) {
1268                 CERROR("Can't create CMID for %s: %ld\n",
1269                        libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
1270                 rc = PTR_ERR(cmid);
1271                 goto failed;
1272         }
1273
1274         dev = net->ibn_dev;
1275         memset(&srcaddr, 0, sizeof(srcaddr));
1276         srcaddr.sin_family = AF_INET;
1277         srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);
1278
1279         memset(&dstaddr, 0, sizeof(dstaddr));
1280         dstaddr.sin_family = AF_INET;
1281         dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
1282         dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));
1283
1284         kiblnd_peer_addref(peer);              /* cmid's ref */
1285
1286         if (*kiblnd_tunables.kib_use_priv_port) {
1287                 rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
1288                                          *kiblnd_tunables.kib_timeout * 1000);
1289         } else {
1290                 rc = rdma_resolve_addr(cmid,
1291                                        (struct sockaddr *)&srcaddr,
1292                                        (struct sockaddr *)&dstaddr,
1293                                        *kiblnd_tunables.kib_timeout * 1000);
1294         }
1295         if (rc) {
1296                 /* Can't initiate address resolution:  */
1297                 CERROR("Can't resolve addr for %s: %d\n",
1298                        libcfs_nid2str(peer->ibp_nid), rc);
1299                 goto failed2;
1300         }
1301
1302         LASSERT(cmid->device);
1303         CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
1304                libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
1305                &dev->ibd_ifip, cmid->device->name);
1306
1307         return;
1308
1309  failed2:
1310         kiblnd_peer_connect_failed(peer, 1, rc);
1311         kiblnd_peer_decref(peer);              /* cmid's ref */
1312         rdma_destroy_id(cmid);
1313         return;
1314  failed:
1315         kiblnd_peer_connect_failed(peer, 1, rc);
1316 }
1317
1318 bool
1319 kiblnd_reconnect_peer(struct kib_peer *peer)
1320 {
1321         rwlock_t *glock = &kiblnd_data.kib_global_lock;
1322         char *reason = NULL;
1323         struct list_head txs;
1324         unsigned long flags;
1325
1326         INIT_LIST_HEAD(&txs);
1327
1328         write_lock_irqsave(glock, flags);
1329         if (!peer->ibp_reconnecting) {
1330                 if (peer->ibp_accepting)
1331                         reason = "accepting";
1332                 else if (peer->ibp_connecting)
1333                         reason = "connecting";
1334                 else if (!list_empty(&peer->ibp_conns))
1335                         reason = "connected";
1336                 else /* connected then closed */
1337                         reason = "closed";
1338
1339                 goto no_reconnect;
1340         }
1341
1342         LASSERT(!peer->ibp_accepting && !peer->ibp_connecting &&
1343                 list_empty(&peer->ibp_conns));
1344         peer->ibp_reconnecting = 0;
1345
1346         if (!kiblnd_peer_active(peer)) {
1347                 list_splice_init(&peer->ibp_tx_queue, &txs);
1348                 reason = "unlinked";
1349                 goto no_reconnect;
1350         }
1351
1352         peer->ibp_connecting++;
1353         peer->ibp_reconnected++;
1354         write_unlock_irqrestore(glock, flags);
1355
1356         kiblnd_connect_peer(peer);
1357         return true;
1358
1359 no_reconnect:
1360         write_unlock_irqrestore(glock, flags);
1361
1362         CWARN("Abort reconnection of %s: %s\n",
1363               libcfs_nid2str(peer->ibp_nid), reason);
1364         kiblnd_txlist_done(peer->ibp_ni, &txs, -ECONNABORTED);
1365         return false;
1366 }
1367
1368 void
1369 kiblnd_launch_tx(lnet_ni_t *ni, struct kib_tx *tx, lnet_nid_t nid)
1370 {
1371         struct kib_peer *peer;
1372         struct kib_peer *peer2;
1373         struct kib_conn *conn;
1374         rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
1375         unsigned long flags;
1376         int rc;
1377
1378         /*
1379          * If I get here, I've committed to send, so I complete the tx with
1380          * failure on any problems
1381          */
1382         LASSERT(!tx || !tx->tx_conn); /* only set when assigned a conn */
1383         LASSERT(!tx || tx->tx_nwrq > 0);     /* work items have been set up */
1384
1385         /*
1386          * First time, just use a read lock since I expect to find my peer
1387          * connected
1388          */
1389         read_lock_irqsave(g_lock, flags);
1390
1391         peer = kiblnd_find_peer_locked(nid);
1392         if (peer && !list_empty(&peer->ibp_conns)) {
1393                 /* Found a peer with an established connection */
1394                 conn = kiblnd_get_conn_locked(peer);
1395                 kiblnd_conn_addref(conn); /* 1 ref for me... */
1396
1397                 read_unlock_irqrestore(g_lock, flags);
1398
1399                 if (tx)
1400                         kiblnd_queue_tx(tx, conn);
1401                 kiblnd_conn_decref(conn); /* ...to here */
1402                 return;
1403         }
1404
1405         read_unlock(g_lock);
1406         /* Re-try with a write lock */
1407         write_lock(g_lock);
1408
1409         peer = kiblnd_find_peer_locked(nid);
1410         if (peer) {
1411                 if (list_empty(&peer->ibp_conns)) {
1412                         /* found a peer, but it's still connecting... */
1413                         LASSERT(kiblnd_peer_connecting(peer));
1414                         if (tx)
1415                                 list_add_tail(&tx->tx_list,
1416                                               &peer->ibp_tx_queue);
1417                         write_unlock_irqrestore(g_lock, flags);
1418                 } else {
1419                         conn = kiblnd_get_conn_locked(peer);
1420                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1421
1422                         write_unlock_irqrestore(g_lock, flags);
1423
1424                         if (tx)
1425                                 kiblnd_queue_tx(tx, conn);
1426                         kiblnd_conn_decref(conn); /* ...to here */
1427                 }
1428                 return;
1429         }
1430
1431         write_unlock_irqrestore(g_lock, flags);
1432
1433         /* Allocate a peer ready to add to the peer table and retry */
1434         rc = kiblnd_create_peer(ni, &peer, nid);
1435         if (rc) {
1436                 CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1437                 if (tx) {
1438                         tx->tx_status = -EHOSTUNREACH;
1439                         tx->tx_waiting = 0;
1440                         kiblnd_tx_done(ni, tx);
1441                 }
1442                 return;
1443         }
1444
1445         write_lock_irqsave(g_lock, flags);
1446
1447         peer2 = kiblnd_find_peer_locked(nid);
1448         if (peer2) {
1449                 if (list_empty(&peer2->ibp_conns)) {
1450                         /* found a peer, but it's still connecting... */
1451                         LASSERT(kiblnd_peer_connecting(peer2));
1452                         if (tx)
1453                                 list_add_tail(&tx->tx_list,
1454                                               &peer2->ibp_tx_queue);
1455                         write_unlock_irqrestore(g_lock, flags);
1456                 } else {
1457                         conn = kiblnd_get_conn_locked(peer2);
1458                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1459
1460                         write_unlock_irqrestore(g_lock, flags);
1461
1462                         if (tx)
1463                                 kiblnd_queue_tx(tx, conn);
1464                         kiblnd_conn_decref(conn); /* ...to here */
1465                 }
1466
1467                 kiblnd_peer_decref(peer);
1468                 return;
1469         }
1470
1471         /* Brand new peer */
1472         LASSERT(!peer->ibp_connecting);
1473         peer->ibp_connecting = 1;
1474
1475         /* always called with a ref on ni, which prevents ni being shutdown */
1476         LASSERT(!((struct kib_net *)ni->ni_data)->ibn_shutdown);
1477
1478         if (tx)
1479                 list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1480
1481         kiblnd_peer_addref(peer);
1482         list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1483
1484         write_unlock_irqrestore(g_lock, flags);
1485
1486         kiblnd_connect_peer(peer);
1487         kiblnd_peer_decref(peer);
1488 }
1489
1490 int
1491 kiblnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1492 {
1493         lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1494         int type = lntmsg->msg_type;
1495         lnet_process_id_t target = lntmsg->msg_target;
1496         int target_is_router = lntmsg->msg_target_is_router;
1497         int routing = lntmsg->msg_routing;
1498         unsigned int payload_niov = lntmsg->msg_niov;
1499         struct kvec *payload_iov = lntmsg->msg_iov;
1500         lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1501         unsigned int payload_offset = lntmsg->msg_offset;
1502         unsigned int payload_nob = lntmsg->msg_len;
1503         struct iov_iter from;
1504         struct kib_msg *ibmsg;
1505         struct kib_rdma_desc  *rd;
1506         struct kib_tx *tx;
1507         int nob;
1508         int rc;
1509
1510         /* NB 'private' is different depending on what we're sending.... */
1511
1512         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1513                payload_nob, payload_niov, libcfs_id2str(target));
1514
1515         LASSERT(!payload_nob || payload_niov > 0);
1516         LASSERT(payload_niov <= LNET_MAX_IOV);
1517
1518         /* Thread context */
1519         LASSERT(!in_interrupt());
1520         /* payload is either all vaddrs or all pages */
1521         LASSERT(!(payload_kiov && payload_iov));
1522
1523         if (payload_kiov)
1524                 iov_iter_bvec(&from, ITER_BVEC | WRITE,
1525                               payload_kiov, payload_niov,
1526                               payload_nob + payload_offset);
1527         else
1528                 iov_iter_kvec(&from, ITER_KVEC | WRITE,
1529                               payload_iov, payload_niov,
1530                               payload_nob + payload_offset);
1531
1532         iov_iter_advance(&from, payload_offset);
1533
1534         switch (type) {
1535         default:
1536                 LBUG();
1537                 return -EIO;
1538
1539         case LNET_MSG_ACK:
1540                 LASSERT(!payload_nob);
1541                 break;
1542
1543         case LNET_MSG_GET:
1544                 if (routing || target_is_router)
1545                         break;            /* send IMMEDIATE */
1546
1547                 /* is the REPLY message too small for RDMA? */
1548                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1549                 if (nob <= IBLND_MSG_SIZE)
1550                         break;            /* send IMMEDIATE */
1551
1552                 tx = kiblnd_get_idle_tx(ni, target.nid);
1553                 if (!tx) {
1554                         CERROR("Can't allocate txd for GET to %s\n",
1555                                libcfs_nid2str(target.nid));
1556                         return -ENOMEM;
1557                 }
1558
1559                 ibmsg = tx->tx_msg;
1560                 rd = &ibmsg->ibm_u.get.ibgm_rd;
1561                 if (!(lntmsg->msg_md->md_options & LNET_MD_KIOV))
1562                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1563                                                  lntmsg->msg_md->md_niov,
1564                                                  lntmsg->msg_md->md_iov.iov,
1565                                                  0, lntmsg->msg_md->md_length);
1566                 else
1567                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1568                                                   lntmsg->msg_md->md_niov,
1569                                                   lntmsg->msg_md->md_iov.kiov,
1570                                                   0, lntmsg->msg_md->md_length);
1571                 if (rc) {
1572                         CERROR("Can't setup GET sink for %s: %d\n",
1573                                libcfs_nid2str(target.nid), rc);
1574                         kiblnd_tx_done(ni, tx);
1575                         return -EIO;
1576                 }
1577
1578                 nob = offsetof(struct kib_get_msg, ibgm_rd.rd_frags[rd->rd_nfrags]);
1579                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1580                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1581
1582                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1583
1584                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1585                 if (!tx->tx_lntmsg[1]) {
1586                         CERROR("Can't create reply for GET -> %s\n",
1587                                libcfs_nid2str(target.nid));
1588                         kiblnd_tx_done(ni, tx);
1589                         return -EIO;
1590                 }
1591
1592                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1593                 tx->tx_waiting = 1;          /* waiting for GET_DONE */
1594                 kiblnd_launch_tx(ni, tx, target.nid);
1595                 return 0;
1596
1597         case LNET_MSG_REPLY:
1598         case LNET_MSG_PUT:
1599                 /* Is the payload small enough not to need RDMA? */
1600                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob]);
1601                 if (nob <= IBLND_MSG_SIZE)
1602                         break;            /* send IMMEDIATE */
1603
1604                 tx = kiblnd_get_idle_tx(ni, target.nid);
1605                 if (!tx) {
1606                         CERROR("Can't allocate %s txd for %s\n",
1607                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1608                                libcfs_nid2str(target.nid));
1609                         return -ENOMEM;
1610                 }
1611
1612                 if (!payload_kiov)
1613                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1614                                                  payload_niov, payload_iov,
1615                                                  payload_offset, payload_nob);
1616                 else
1617                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1618                                                   payload_niov, payload_kiov,
1619                                                   payload_offset, payload_nob);
1620                 if (rc) {
1621                         CERROR("Can't setup PUT src for %s: %d\n",
1622                                libcfs_nid2str(target.nid), rc);
1623                         kiblnd_tx_done(ni, tx);
1624                         return -EIO;
1625                 }
1626
1627                 ibmsg = tx->tx_msg;
1628                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1629                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1630                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(struct kib_putreq_msg));
1631
1632                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1633                 tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1634                 kiblnd_launch_tx(ni, tx, target.nid);
1635                 return 0;
1636         }
1637
1638         /* send IMMEDIATE */
1639
1640         LASSERT(offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob])
1641                  <= IBLND_MSG_SIZE);
1642
1643         tx = kiblnd_get_idle_tx(ni, target.nid);
1644         if (!tx) {
1645                 CERROR("Can't send %d to %s: tx descs exhausted\n",
1646                        type, libcfs_nid2str(target.nid));
1647                 return -ENOMEM;
1648         }
1649
1650         ibmsg = tx->tx_msg;
1651         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1652
1653         rc = copy_from_iter(&ibmsg->ibm_u.immediate.ibim_payload, payload_nob,
1654                             &from);
1655         if (rc != payload_nob) {
1656                 kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
1657                 return -EFAULT;
1658         }
1659
1660         nob = offsetof(struct kib_immediate_msg, ibim_payload[payload_nob]);
1661         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1662
1663         tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1664         kiblnd_launch_tx(ni, tx, target.nid);
1665         return 0;
1666 }
1667
1668 static void
1669 kiblnd_reply(lnet_ni_t *ni, struct kib_rx *rx, lnet_msg_t *lntmsg)
1670 {
1671         lnet_process_id_t target = lntmsg->msg_target;
1672         unsigned int niov = lntmsg->msg_niov;
1673         struct kvec *iov = lntmsg->msg_iov;
1674         lnet_kiov_t *kiov = lntmsg->msg_kiov;
1675         unsigned int offset = lntmsg->msg_offset;
1676         unsigned int nob = lntmsg->msg_len;
1677         struct kib_tx *tx;
1678         int rc;
1679
1680         tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1681         if (!tx) {
1682                 CERROR("Can't get tx for REPLY to %s\n",
1683                        libcfs_nid2str(target.nid));
1684                 goto failed_0;
1685         }
1686
1687         if (!nob)
1688                 rc = 0;
1689         else if (!kiov)
1690                 rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1691                                          niov, iov, offset, nob);
1692         else
1693                 rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1694                                           niov, kiov, offset, nob);
1695
1696         if (rc) {
1697                 CERROR("Can't setup GET src for %s: %d\n",
1698                        libcfs_nid2str(target.nid), rc);
1699                 goto failed_1;
1700         }
1701
1702         rc = kiblnd_init_rdma(rx->rx_conn, tx,
1703                               IBLND_MSG_GET_DONE, nob,
1704                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1705                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1706         if (rc < 0) {
1707                 CERROR("Can't setup rdma for GET from %s: %d\n",
1708                        libcfs_nid2str(target.nid), rc);
1709                 goto failed_1;
1710         }
1711
1712         if (!nob) {
1713                 /* No RDMA: local completion may happen now! */
1714                 lnet_finalize(ni, lntmsg, 0);
1715         } else {
1716                 /* RDMA: lnet_finalize(lntmsg) when it completes */
1717                 tx->tx_lntmsg[0] = lntmsg;
1718         }
1719
1720         kiblnd_queue_tx(tx, rx->rx_conn);
1721         return;
1722
1723  failed_1:
1724         kiblnd_tx_done(ni, tx);
1725  failed_0:
1726         lnet_finalize(ni, lntmsg, -EIO);
1727 }
1728
1729 int
1730 kiblnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1731             struct iov_iter *to, unsigned int rlen)
1732 {
1733         struct kib_rx *rx = private;
1734         struct kib_msg *rxmsg = rx->rx_msg;
1735         struct kib_conn *conn = rx->rx_conn;
1736         struct kib_tx *tx;
1737         int nob;
1738         int post_credit = IBLND_POSTRX_PEER_CREDIT;
1739         int rc = 0;
1740
1741         LASSERT(iov_iter_count(to) <= rlen);
1742         LASSERT(!in_interrupt());
1743         /* Either all pages or all vaddrs */
1744
1745         switch (rxmsg->ibm_type) {
1746         default:
1747                 LBUG();
1748
1749         case IBLND_MSG_IMMEDIATE:
1750                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[rlen]);
1751                 if (nob > rx->rx_nob) {
1752                         CERROR("Immediate message from %s too big: %d(%d)\n",
1753                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1754                                nob, rx->rx_nob);
1755                         rc = -EPROTO;
1756                         break;
1757                 }
1758
1759                 rc = copy_to_iter(&rxmsg->ibm_u.immediate.ibim_payload, rlen,
1760                                   to);
1761                 if (rc != rlen) {
1762                         rc = -EFAULT;
1763                         break;
1764                 }
1765
1766                 rc = 0;
1767                 lnet_finalize(ni, lntmsg, 0);
1768                 break;
1769
1770         case IBLND_MSG_PUT_REQ: {
1771                 struct kib_msg  *txmsg;
1772                 struct kib_rdma_desc *rd;
1773
1774                 if (!iov_iter_count(to)) {
1775                         lnet_finalize(ni, lntmsg, 0);
1776                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1777                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1778                         break;
1779                 }
1780
1781                 tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1782                 if (!tx) {
1783                         CERROR("Can't allocate tx for %s\n",
1784                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1785                         /* Not replying will break the connection */
1786                         rc = -ENOMEM;
1787                         break;
1788                 }
1789
1790                 txmsg = tx->tx_msg;
1791                 rd = &txmsg->ibm_u.putack.ibpam_rd;
1792                 if (!(to->type & ITER_BVEC))
1793                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1794                                                  to->nr_segs, to->kvec,
1795                                                  to->iov_offset,
1796                                                  iov_iter_count(to));
1797                 else
1798                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1799                                                   to->nr_segs, to->bvec,
1800                                                   to->iov_offset,
1801                                                   iov_iter_count(to));
1802                 if (rc) {
1803                         CERROR("Can't setup PUT sink for %s: %d\n",
1804                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1805                         kiblnd_tx_done(ni, tx);
1806                         /* tell peer it's over */
1807                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1808                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1809                         break;
1810                 }
1811
1812                 nob = offsetof(struct kib_putack_msg, ibpam_rd.rd_frags[rd->rd_nfrags]);
1813                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1814                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1815
1816                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1817
1818                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1819                 tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1820                 kiblnd_queue_tx(tx, conn);
1821
1822                 /* reposted buffer reserved for PUT_DONE */
1823                 post_credit = IBLND_POSTRX_NO_CREDIT;
1824                 break;
1825                 }
1826
1827         case IBLND_MSG_GET_REQ:
1828                 if (lntmsg) {
1829                         /* Optimized GET; RDMA lntmsg's payload */
1830                         kiblnd_reply(ni, rx, lntmsg);
1831                 } else {
1832                         /* GET didn't match anything */
1833                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1834                                                -ENODATA,
1835                                                rxmsg->ibm_u.get.ibgm_cookie);
1836                 }
1837                 break;
1838         }
1839
1840         kiblnd_post_rx(rx, post_credit);
1841         return rc;
1842 }
1843
1844 int
1845 kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1846 {
1847         struct task_struct *task = kthread_run(fn, arg, "%s", name);
1848
1849         if (IS_ERR(task))
1850                 return PTR_ERR(task);
1851
1852         atomic_inc(&kiblnd_data.kib_nthreads);
1853         return 0;
1854 }
1855
1856 static void
1857 kiblnd_thread_fini(void)
1858 {
1859         atomic_dec(&kiblnd_data.kib_nthreads);
1860 }
1861
1862 static void
1863 kiblnd_peer_alive(struct kib_peer *peer)
1864 {
1865         /* This is racy, but everyone's only writing cfs_time_current() */
1866         peer->ibp_last_alive = cfs_time_current();
1867         mb();
1868 }
1869
1870 static void
1871 kiblnd_peer_notify(struct kib_peer *peer)
1872 {
1873         int error = 0;
1874         unsigned long last_alive = 0;
1875         unsigned long flags;
1876
1877         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1878
1879         if (kiblnd_peer_idle(peer) && peer->ibp_error) {
1880                 error = peer->ibp_error;
1881                 peer->ibp_error = 0;
1882
1883                 last_alive = peer->ibp_last_alive;
1884         }
1885
1886         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1887
1888         if (error)
1889                 lnet_notify(peer->ibp_ni,
1890                             peer->ibp_nid, 0, last_alive);
1891 }
1892
1893 void
1894 kiblnd_close_conn_locked(struct kib_conn *conn, int error)
1895 {
1896         /*
1897          * This just does the immediate housekeeping. 'error' is zero for a
1898          * normal shutdown which can happen only after the connection has been
1899          * established.  If the connection is established, schedule the
1900          * connection to be finished off by the connd. Otherwise the connd is
1901          * already dealing with it (either to set it up or tear it down).
1902          * Caller holds kib_global_lock exclusively in irq context
1903          */
1904         struct kib_peer *peer = conn->ibc_peer;
1905         struct kib_dev *dev;
1906         unsigned long flags;
1907
1908         LASSERT(error || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1909
1910         if (error && !conn->ibc_comms_error)
1911                 conn->ibc_comms_error = error;
1912
1913         if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1914                 return; /* already being handled  */
1915
1916         if (!error &&
1917             list_empty(&conn->ibc_tx_noops) &&
1918             list_empty(&conn->ibc_tx_queue) &&
1919             list_empty(&conn->ibc_tx_queue_rsrvd) &&
1920             list_empty(&conn->ibc_tx_queue_nocred) &&
1921             list_empty(&conn->ibc_active_txs)) {
1922                 CDEBUG(D_NET, "closing conn to %s\n",
1923                        libcfs_nid2str(peer->ibp_nid));
1924         } else {
1925                 CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1926                         libcfs_nid2str(peer->ibp_nid), error,
1927                         list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1928                         list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1929                         list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1930                         list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1931                         list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1932         }
1933
1934         dev = ((struct kib_net *)peer->ibp_ni->ni_data)->ibn_dev;
1935         list_del(&conn->ibc_list);
1936         /* connd (see below) takes over ibc_list's ref */
1937
1938         if (list_empty(&peer->ibp_conns) &&    /* no more conns */
1939             kiblnd_peer_active(peer)) {  /* still in peer table */
1940                 kiblnd_unlink_peer_locked(peer);
1941
1942                 /* set/clear error on last conn */
1943                 peer->ibp_error = conn->ibc_comms_error;
1944         }
1945
1946         kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1947
1948         if (error &&
1949             kiblnd_dev_can_failover(dev)) {
1950                 list_add_tail(&dev->ibd_fail_list,
1951                               &kiblnd_data.kib_failed_devs);
1952                 wake_up(&kiblnd_data.kib_failover_waitq);
1953         }
1954
1955         spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1956
1957         list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1958         wake_up(&kiblnd_data.kib_connd_waitq);
1959
1960         spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1961 }
1962
1963 void
1964 kiblnd_close_conn(struct kib_conn *conn, int error)
1965 {
1966         unsigned long flags;
1967
1968         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1969
1970         kiblnd_close_conn_locked(conn, error);
1971
1972         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1973 }
1974
1975 static void
1976 kiblnd_handle_early_rxs(struct kib_conn *conn)
1977 {
1978         unsigned long flags;
1979         struct kib_rx *rx;
1980         struct kib_rx *tmp;
1981
1982         LASSERT(!in_interrupt());
1983         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1984
1985         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1986         list_for_each_entry_safe(rx, tmp, &conn->ibc_early_rxs, rx_list) {
1987                 list_del(&rx->rx_list);
1988                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1989
1990                 kiblnd_handle_rx(rx);
1991
1992                 write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1993         }
1994         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1995 }
1996
1997 static void
1998 kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
1999 {
2000         LIST_HEAD(zombies);
2001         struct list_head *tmp;
2002         struct list_head *nxt;
2003         struct kib_tx *tx;
2004
2005         spin_lock(&conn->ibc_lock);
2006
2007         list_for_each_safe(tmp, nxt, txs) {
2008                 tx = list_entry(tmp, struct kib_tx, tx_list);
2009
2010                 if (txs == &conn->ibc_active_txs) {
2011                         LASSERT(!tx->tx_queued);
2012                         LASSERT(tx->tx_waiting || tx->tx_sending);
2013                 } else {
2014                         LASSERT(tx->tx_queued);
2015                 }
2016
2017                 tx->tx_status = -ECONNABORTED;
2018                 tx->tx_waiting = 0;
2019
2020                 if (!tx->tx_sending) {
2021                         tx->tx_queued = 0;
2022                         list_del(&tx->tx_list);
2023                         list_add(&tx->tx_list, &zombies);
2024                 }
2025         }
2026
2027         spin_unlock(&conn->ibc_lock);
2028
2029         kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
2030 }
2031
2032 static void
2033 kiblnd_finalise_conn(struct kib_conn *conn)
2034 {
2035         LASSERT(!in_interrupt());
2036         LASSERT(conn->ibc_state > IBLND_CONN_INIT);
2037
2038         kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2039
2040         /*
2041          * abort_receives moves QP state to IB_QPS_ERR.  This is only required
2042          * for connections that didn't get as far as being connected, because
2043          * rdma_disconnect() does this for free.
2044          */
2045         kiblnd_abort_receives(conn);
2046
2047         /*
2048          * Complete all tx descs not waiting for sends to complete.
2049          * NB we should be safe from RDMA now that the QP has changed state
2050          */
2051         kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2052         kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2053         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2054         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2055         kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2056
2057         kiblnd_handle_early_rxs(conn);
2058 }
2059
2060 static void
2061 kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error)
2062 {
2063         LIST_HEAD(zombies);
2064         unsigned long flags;
2065
2066         LASSERT(error);
2067         LASSERT(!in_interrupt());
2068
2069         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2070
2071         if (active) {
2072                 LASSERT(peer->ibp_connecting > 0);
2073                 peer->ibp_connecting--;
2074         } else {
2075                 LASSERT(peer->ibp_accepting > 0);
2076                 peer->ibp_accepting--;
2077         }
2078
2079         if (kiblnd_peer_connecting(peer)) {
2080                 /* another connection attempt under way... */
2081                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2082                                         flags);
2083                 return;
2084         }
2085
2086         peer->ibp_reconnected = 0;
2087         if (list_empty(&peer->ibp_conns)) {
2088                 /* Take peer's blocked transmits to complete with error */
2089                 list_add(&zombies, &peer->ibp_tx_queue);
2090                 list_del_init(&peer->ibp_tx_queue);
2091
2092                 if (kiblnd_peer_active(peer))
2093                         kiblnd_unlink_peer_locked(peer);
2094
2095                 peer->ibp_error = error;
2096         } else {
2097                 /* Can't have blocked transmits if there are connections */
2098                 LASSERT(list_empty(&peer->ibp_tx_queue));
2099         }
2100
2101         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2102
2103         kiblnd_peer_notify(peer);
2104
2105         if (list_empty(&zombies))
2106                 return;
2107
2108         CNETERR("Deleting messages for %s: connection failed\n",
2109                 libcfs_nid2str(peer->ibp_nid));
2110
2111         kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2112 }
2113
2114 static void
2115 kiblnd_connreq_done(struct kib_conn *conn, int status)
2116 {
2117         struct kib_peer *peer = conn->ibc_peer;
2118         struct kib_tx *tx;
2119         struct kib_tx *tmp;
2120         struct list_head txs;
2121         unsigned long flags;
2122         int active;
2123
2124         active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2125
2126         CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2127                libcfs_nid2str(peer->ibp_nid), active,
2128                conn->ibc_version, status);
2129
2130         LASSERT(!in_interrupt());
2131         LASSERT((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2132                  peer->ibp_connecting > 0) ||
2133                  (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2134                  peer->ibp_accepting > 0));
2135
2136         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2137         conn->ibc_connvars = NULL;
2138
2139         if (status) {
2140                 /* failed to establish connection */
2141                 kiblnd_peer_connect_failed(peer, active, status);
2142                 kiblnd_finalise_conn(conn);
2143                 return;
2144         }
2145
2146         /* connection established */
2147         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2148
2149         conn->ibc_last_send = jiffies;
2150         kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2151         kiblnd_peer_alive(peer);
2152
2153         /*
2154          * Add conn to peer's list and nuke any dangling conns from a different
2155          * peer instance...
2156          */
2157         kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2158         list_add(&conn->ibc_list, &peer->ibp_conns);
2159         peer->ibp_reconnected = 0;
2160         if (active)
2161                 peer->ibp_connecting--;
2162         else
2163                 peer->ibp_accepting--;
2164
2165         if (!peer->ibp_version) {
2166                 peer->ibp_version     = conn->ibc_version;
2167                 peer->ibp_incarnation = conn->ibc_incarnation;
2168         }
2169
2170         if (peer->ibp_version     != conn->ibc_version ||
2171             peer->ibp_incarnation != conn->ibc_incarnation) {
2172                 kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2173                                                 conn->ibc_incarnation);
2174                 peer->ibp_version     = conn->ibc_version;
2175                 peer->ibp_incarnation = conn->ibc_incarnation;
2176         }
2177
2178         /* grab pending txs while I have the lock */
2179         list_add(&txs, &peer->ibp_tx_queue);
2180         list_del_init(&peer->ibp_tx_queue);
2181
2182         if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2183             conn->ibc_comms_error) {       /* error has happened already */
2184                 lnet_ni_t *ni = peer->ibp_ni;
2185
2186                 /* start to shut down connection */
2187                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
2188                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2189
2190                 kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2191
2192                 return;
2193         }
2194
2195         /*
2196          * +1 ref for myself, this connection is visible to other threads
2197          * now, refcount of peer:ibp_conns can be released by connection
2198          * close from either a different thread, or the calling of
2199          * kiblnd_check_sends_locked() below. See bz21911 for details.
2200          */
2201         kiblnd_conn_addref(conn);
2202         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2203
2204         /* Schedule blocked txs */
2205         spin_lock(&conn->ibc_lock);
2206         list_for_each_entry_safe(tx, tmp, &txs, tx_list) {
2207                 list_del(&tx->tx_list);
2208
2209                 kiblnd_queue_tx_locked(tx, conn);
2210         }
2211         kiblnd_check_sends_locked(conn);
2212         spin_unlock(&conn->ibc_lock);
2213
2214         /* schedule blocked rxs */
2215         kiblnd_handle_early_rxs(conn);
2216
2217         kiblnd_conn_decref(conn);
2218 }
2219
2220 static void
2221 kiblnd_reject(struct rdma_cm_id *cmid, struct kib_rej *rej)
2222 {
2223         int rc;
2224
2225         rc = rdma_reject(cmid, rej, sizeof(*rej));
2226
2227         if (rc)
2228                 CWARN("Error %d sending reject\n", rc);
2229 }
2230
2231 static int
2232 kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2233 {
2234         rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2235         struct kib_msg *reqmsg = priv;
2236         struct kib_msg *ackmsg;
2237         struct kib_dev *ibdev;
2238         struct kib_peer *peer;
2239         struct kib_peer *peer2;
2240         struct kib_conn *conn;
2241         lnet_ni_t *ni  = NULL;
2242         struct kib_net *net = NULL;
2243         lnet_nid_t nid;
2244         struct rdma_conn_param cp;
2245         struct kib_rej rej;
2246         int version = IBLND_MSG_VERSION;
2247         unsigned long flags;
2248         int max_frags;
2249         int rc;
2250         struct sockaddr_in *peer_addr;
2251
2252         LASSERT(!in_interrupt());
2253
2254         /* cmid inherits 'context' from the corresponding listener id */
2255         ibdev = (struct kib_dev *)cmid->context;
2256         LASSERT(ibdev);
2257
2258         memset(&rej, 0, sizeof(rej));
2259         rej.ibr_magic = IBLND_MSG_MAGIC;
2260         rej.ibr_why = IBLND_REJECT_FATAL;
2261         rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2262
2263         peer_addr = (struct sockaddr_in *)&cmid->route.addr.dst_addr;
2264         if (*kiblnd_tunables.kib_require_priv_port &&
2265             ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2266                 __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2267
2268                 CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2269                        &ip, ntohs(peer_addr->sin_port));
2270                 goto failed;
2271         }
2272
2273         if (priv_nob < offsetof(struct kib_msg, ibm_type)) {
2274                 CERROR("Short connection request\n");
2275                 goto failed;
2276         }
2277
2278         /*
2279          * Future protocol version compatibility support!  If the
2280          * o2iblnd-specific protocol changes, or when LNET unifies
2281          * protocols over all LNDs, the initial connection will
2282          * negotiate a protocol version.  I trap this here to avoid
2283          * console errors; the reject tells the peer which protocol I
2284          * speak.
2285          */
2286         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2287             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2288                 goto failed;
2289         if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2290             reqmsg->ibm_version != IBLND_MSG_VERSION &&
2291             reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2292                 goto failed;
2293         if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2294             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2295             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2296                 goto failed;
2297
2298         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2299         if (rc) {
2300                 CERROR("Can't parse connection request: %d\n", rc);
2301                 goto failed;
2302         }
2303
2304         nid = reqmsg->ibm_srcnid;
2305         ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2306
2307         if (ni) {
2308                 net = (struct kib_net *)ni->ni_data;
2309                 rej.ibr_incarnation = net->ibn_incarnation;
2310         }
2311
2312         if (!ni ||                       /* no matching net */
2313             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2314             net->ibn_dev != ibdev) {          /* wrong device */
2315                 CERROR("Can't accept conn from %s on %s (%s:%d:%pI4h): bad dst nid %s\n",
2316                        libcfs_nid2str(nid),
2317                        !ni ? "NA" : libcfs_nid2str(ni->ni_nid),
2318                        ibdev->ibd_ifname, ibdev->ibd_nnets,
2319                        &ibdev->ibd_ifip,
2320                        libcfs_nid2str(reqmsg->ibm_dstnid));
2321
2322                 goto failed;
2323         }
2324
2325        /* check time stamp as soon as possible */
2326         if (reqmsg->ibm_dststamp &&
2327             reqmsg->ibm_dststamp != net->ibn_incarnation) {
2328                 CWARN("Stale connection request\n");
2329                 rej.ibr_why = IBLND_REJECT_CONN_STALE;
2330                 goto failed;
2331         }
2332
2333         /* I can accept peer's version */
2334         version = reqmsg->ibm_version;
2335
2336         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2337                 CERROR("Unexpected connreq msg type: %x from %s\n",
2338                        reqmsg->ibm_type, libcfs_nid2str(nid));
2339                 goto failed;
2340         }
2341
2342         if (reqmsg->ibm_u.connparams.ibcp_queue_depth >
2343             kiblnd_msg_queue_size(version, ni)) {
2344                 CERROR("Can't accept conn from %s, queue depth too large: %d (<=%d wanted)\n",
2345                        libcfs_nid2str(nid),
2346                        reqmsg->ibm_u.connparams.ibcp_queue_depth,
2347                        kiblnd_msg_queue_size(version, ni));
2348
2349                 if (version == IBLND_MSG_VERSION)
2350                         rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2351
2352                 goto failed;
2353         }
2354
2355         max_frags = reqmsg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2356         if (max_frags > kiblnd_rdma_frags(version, ni)) {
2357                 CWARN("Can't accept conn from %s (version %x): max message size %d is too large (%d wanted)\n",
2358                       libcfs_nid2str(nid), version, max_frags,
2359                       kiblnd_rdma_frags(version, ni));
2360
2361                 if (version >= IBLND_MSG_VERSION)
2362                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2363
2364                 goto failed;
2365         } else if (max_frags < kiblnd_rdma_frags(version, ni) &&
2366                    !net->ibn_fmr_ps) {
2367                 CWARN("Can't accept conn from %s (version %x): max message size %d incompatible without FMR pool (%d wanted)\n",
2368                       libcfs_nid2str(nid), version, max_frags,
2369                       kiblnd_rdma_frags(version, ni));
2370
2371                 if (version == IBLND_MSG_VERSION)
2372                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2373
2374                 goto failed;
2375         }
2376
2377         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2378                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2379                        libcfs_nid2str(nid),
2380                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2381                        IBLND_MSG_SIZE);
2382                 goto failed;
2383         }
2384
2385         /* assume 'nid' is a new peer; create  */
2386         rc = kiblnd_create_peer(ni, &peer, nid);
2387         if (rc) {
2388                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2389                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2390                 goto failed;
2391         }
2392
2393         /* We have validated the peer's parameters so use those */
2394         peer->ibp_max_frags = max_frags;
2395         peer->ibp_queue_depth = reqmsg->ibm_u.connparams.ibcp_queue_depth;
2396
2397         write_lock_irqsave(g_lock, flags);
2398
2399         peer2 = kiblnd_find_peer_locked(nid);
2400         if (peer2) {
2401                 if (!peer2->ibp_version) {
2402                         peer2->ibp_version     = version;
2403                         peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2404                 }
2405
2406                 /* not the guy I've talked with */
2407                 if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2408                     peer2->ibp_version     != version) {
2409                         kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2410
2411                         if (kiblnd_peer_active(peer2)) {
2412                                 peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2413                                 peer2->ibp_version = version;
2414                         }
2415                         write_unlock_irqrestore(g_lock, flags);
2416
2417                         CWARN("Conn stale %s version %x/%x incarnation %llu/%llu\n",
2418                               libcfs_nid2str(nid), peer2->ibp_version, version,
2419                               peer2->ibp_incarnation, reqmsg->ibm_srcstamp);
2420
2421                         kiblnd_peer_decref(peer);
2422                         rej.ibr_why = IBLND_REJECT_CONN_STALE;
2423                         goto failed;
2424                 }
2425
2426                 /*
2427                  * Tie-break connection race in favour of the higher NID.
2428                  * If we keep running into a race condition multiple times,
2429                  * we have to assume that the connection attempt with the
2430                  * higher NID is stuck in a connecting state and will never
2431                  * recover.  As such, we pass through this if-block and let
2432                  * the lower NID connection win so we can move forward.
2433                  */
2434                 if (peer2->ibp_connecting &&
2435                     nid < ni->ni_nid && peer2->ibp_races <
2436                     MAX_CONN_RACES_BEFORE_ABORT) {
2437                         peer2->ibp_races++;
2438                         write_unlock_irqrestore(g_lock, flags);
2439
2440                         CDEBUG(D_NET, "Conn race %s\n",
2441                                libcfs_nid2str(peer2->ibp_nid));
2442
2443                         kiblnd_peer_decref(peer);
2444                         rej.ibr_why = IBLND_REJECT_CONN_RACE;
2445                         goto failed;
2446                 }
2447                 if (peer2->ibp_races >= MAX_CONN_RACES_BEFORE_ABORT)
2448                         CNETERR("Conn race %s: unresolved after %d attempts, letting lower NID win\n",
2449                                 libcfs_nid2str(peer2->ibp_nid),
2450                                 MAX_CONN_RACES_BEFORE_ABORT);
2451                 /**
2452                  * passive connection is allowed even this peer is waiting for
2453                  * reconnection.
2454                  */
2455                 peer2->ibp_reconnecting = 0;
2456                 peer2->ibp_races = 0;
2457                 peer2->ibp_accepting++;
2458                 kiblnd_peer_addref(peer2);
2459
2460                 /**
2461                  * Race with kiblnd_launch_tx (active connect) to create peer
2462                  * so copy validated parameters since we now know what the
2463                  * peer's limits are
2464                  */
2465                 peer2->ibp_max_frags = peer->ibp_max_frags;
2466                 peer2->ibp_queue_depth = peer->ibp_queue_depth;
2467
2468                 write_unlock_irqrestore(g_lock, flags);
2469                 kiblnd_peer_decref(peer);
2470                 peer = peer2;
2471         } else {
2472                 /* Brand new peer */
2473                 LASSERT(!peer->ibp_accepting);
2474                 LASSERT(!peer->ibp_version &&
2475                         !peer->ibp_incarnation);
2476
2477                 peer->ibp_accepting   = 1;
2478                 peer->ibp_version     = version;
2479                 peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2480
2481                 /* I have a ref on ni that prevents it being shutdown */
2482                 LASSERT(!net->ibn_shutdown);
2483
2484                 kiblnd_peer_addref(peer);
2485                 list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2486
2487                 write_unlock_irqrestore(g_lock, flags);
2488         }
2489
2490         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT,
2491                                   version);
2492         if (!conn) {
2493                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2494                 kiblnd_peer_decref(peer);
2495                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2496                 goto failed;
2497         }
2498
2499         /*
2500          * conn now "owns" cmid, so I return success from here on to ensure the
2501          * CM callback doesn't destroy cmid.
2502          */
2503         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2504         conn->ibc_credits          = conn->ibc_queue_depth;
2505         conn->ibc_reserved_credits = conn->ibc_queue_depth;
2506         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2507                 IBLND_OOB_MSGS(version) <= IBLND_RX_MSGS(conn));
2508
2509         ackmsg = &conn->ibc_connvars->cv_msg;
2510         memset(ackmsg, 0, sizeof(*ackmsg));
2511
2512         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2513                         sizeof(ackmsg->ibm_u.connparams));
2514         ackmsg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2515         ackmsg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2516         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2517
2518         kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2519
2520         memset(&cp, 0, sizeof(cp));
2521         cp.private_data = ackmsg;
2522         cp.private_data_len = ackmsg->ibm_nob;
2523         cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2524         cp.initiator_depth = 0;
2525         cp.flow_control = 1;
2526         cp.retry_count = *kiblnd_tunables.kib_retry_count;
2527         cp.rnr_retry_count = *kiblnd_tunables.kib_rnr_retry_count;
2528
2529         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2530
2531         rc = rdma_accept(cmid, &cp);
2532         if (rc) {
2533                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2534                 rej.ibr_version = version;
2535                 rej.ibr_why     = IBLND_REJECT_FATAL;
2536
2537                 kiblnd_reject(cmid, &rej);
2538                 kiblnd_connreq_done(conn, rc);
2539                 kiblnd_conn_decref(conn);
2540         }
2541
2542         lnet_ni_decref(ni);
2543         return 0;
2544
2545  failed:
2546         if (ni) {
2547                 rej.ibr_cp.ibcp_queue_depth = kiblnd_msg_queue_size(version, ni);
2548                 rej.ibr_cp.ibcp_max_frags = kiblnd_rdma_frags(version, ni);
2549                 lnet_ni_decref(ni);
2550         }
2551
2552         rej.ibr_version             = version;
2553         kiblnd_reject(cmid, &rej);
2554
2555         return -ECONNREFUSED;
2556 }
2557
2558 static void
2559 kiblnd_check_reconnect(struct kib_conn *conn, int version,
2560                        __u64 incarnation, int why, struct kib_connparams *cp)
2561 {
2562         rwlock_t *glock = &kiblnd_data.kib_global_lock;
2563         struct kib_peer *peer = conn->ibc_peer;
2564         char *reason;
2565         int msg_size = IBLND_MSG_SIZE;
2566         int frag_num = -1;
2567         int queue_dep = -1;
2568         bool reconnect;
2569         unsigned long flags;
2570
2571         LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2572         LASSERT(peer->ibp_connecting > 0);     /* 'conn' at least */
2573         LASSERT(!peer->ibp_reconnecting);
2574
2575         if (cp) {
2576                 msg_size = cp->ibcp_max_msg_size;
2577                 frag_num        = cp->ibcp_max_frags << IBLND_FRAG_SHIFT;
2578                 queue_dep = cp->ibcp_queue_depth;
2579         }
2580
2581         write_lock_irqsave(glock, flags);
2582         /**
2583          * retry connection if it's still needed and no other connection
2584          * attempts (active or passive) are in progress
2585          * NB: reconnect is still needed even when ibp_tx_queue is
2586          * empty if ibp_version != version because reconnect may be
2587          * initiated by kiblnd_query()
2588          */
2589         reconnect = (!list_empty(&peer->ibp_tx_queue) ||
2590                      peer->ibp_version != version) &&
2591                     peer->ibp_connecting == 1 &&
2592                     !peer->ibp_accepting;
2593         if (!reconnect) {
2594                 reason = "no need";
2595                 goto out;
2596         }
2597
2598         switch (why) {
2599         default:
2600                 reason = "Unknown";
2601                 break;
2602
2603         case IBLND_REJECT_RDMA_FRAGS: {
2604                 struct lnet_ioctl_config_lnd_tunables *tunables;
2605
2606                 if (!cp) {
2607                         reason = "can't negotiate max frags";
2608                         goto out;
2609                 }
2610                 tunables = peer->ibp_ni->ni_lnd_tunables;
2611                 if (!tunables->lt_tun_u.lt_o2ib.lnd_map_on_demand) {
2612                         reason = "map_on_demand must be enabled";
2613                         goto out;
2614                 }
2615                 if (conn->ibc_max_frags <= frag_num) {
2616                         reason = "unsupported max frags";
2617                         goto out;
2618                 }
2619
2620                 peer->ibp_max_frags = frag_num;
2621                 reason = "rdma fragments";
2622                 break;
2623         }
2624         case IBLND_REJECT_MSG_QUEUE_SIZE:
2625                 if (!cp) {
2626                         reason = "can't negotiate queue depth";
2627                         goto out;
2628                 }
2629                 if (conn->ibc_queue_depth <= queue_dep) {
2630                         reason = "unsupported queue depth";
2631                         goto out;
2632                 }
2633
2634                 peer->ibp_queue_depth = queue_dep;
2635                 reason = "queue depth";
2636                 break;
2637
2638         case IBLND_REJECT_CONN_STALE:
2639                 reason = "stale";
2640                 break;
2641
2642         case IBLND_REJECT_CONN_RACE:
2643                 reason = "conn race";
2644                 break;
2645
2646         case IBLND_REJECT_CONN_UNCOMPAT:
2647                 reason = "version negotiation";
2648                 break;
2649         }
2650
2651         conn->ibc_reconnect = 1;
2652         peer->ibp_reconnecting = 1;
2653         peer->ibp_version = version;
2654         if (incarnation)
2655                 peer->ibp_incarnation = incarnation;
2656 out:
2657         write_unlock_irqrestore(glock, flags);
2658
2659         CNETERR("%s: %s (%s), %x, %x, msg_size: %d, queue_depth: %d/%d, max_frags: %d/%d\n",
2660                 libcfs_nid2str(peer->ibp_nid),
2661                 reconnect ? "reconnect" : "don't reconnect",
2662                 reason, IBLND_MSG_VERSION, version, msg_size,
2663                 conn->ibc_queue_depth, queue_dep,
2664                 conn->ibc_max_frags, frag_num);
2665         /**
2666          * if conn::ibc_reconnect is TRUE, connd will reconnect to the peer
2667          * while destroying the zombie
2668          */
2669 }
2670
2671 static void
2672 kiblnd_rejected(struct kib_conn *conn, int reason, void *priv, int priv_nob)
2673 {
2674         struct kib_peer *peer = conn->ibc_peer;
2675
2676         LASSERT(!in_interrupt());
2677         LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2678
2679         switch (reason) {
2680         case IB_CM_REJ_STALE_CONN:
2681                 kiblnd_check_reconnect(conn, IBLND_MSG_VERSION, 0,
2682                                        IBLND_REJECT_CONN_STALE, NULL);
2683                 break;
2684
2685         case IB_CM_REJ_INVALID_SERVICE_ID:
2686                 CNETERR("%s rejected: no listener at %d\n",
2687                         libcfs_nid2str(peer->ibp_nid),
2688                         *kiblnd_tunables.kib_service);
2689                 break;
2690
2691         case IB_CM_REJ_CONSUMER_DEFINED:
2692                 if (priv_nob >= offsetof(struct kib_rej, ibr_padding)) {
2693                         struct kib_rej *rej = priv;
2694                         struct kib_connparams *cp = NULL;
2695                         int flip = 0;
2696                         __u64 incarnation = -1;
2697
2698                         /* NB. default incarnation is -1 because:
2699                          * a) V1 will ignore dst incarnation in connreq.
2700                          * b) V2 will provide incarnation while rejecting me,
2701                          *    -1 will be overwrote.
2702                          *
2703                          * if I try to connect to a V1 peer with V2 protocol,
2704                          * it rejected me then upgrade to V2, I have no idea
2705                          * about the upgrading and try to reconnect with V1,
2706                          * in this case upgraded V2 can find out I'm trying to
2707                          * talk to the old guy and reject me(incarnation is -1).
2708                          */
2709
2710                         if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2711                             rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2712                                 __swab32s(&rej->ibr_magic);
2713                                 __swab16s(&rej->ibr_version);
2714                                 flip = 1;
2715                         }
2716
2717                         if (priv_nob >= sizeof(struct kib_rej) &&
2718                             rej->ibr_version > IBLND_MSG_VERSION_1) {
2719                                 /*
2720                                  * priv_nob is always 148 in current version
2721                                  * of OFED, so we still need to check version.
2722                                  * (define of IB_CM_REJ_PRIVATE_DATA_SIZE)
2723                                  */
2724                                 cp = &rej->ibr_cp;
2725
2726                                 if (flip) {
2727                                         __swab64s(&rej->ibr_incarnation);
2728                                         __swab16s(&cp->ibcp_queue_depth);
2729                                         __swab16s(&cp->ibcp_max_frags);
2730                                         __swab32s(&cp->ibcp_max_msg_size);
2731                                 }
2732
2733                                 incarnation = rej->ibr_incarnation;
2734                         }
2735
2736                         if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2737                             rej->ibr_magic != LNET_PROTO_MAGIC) {
2738                                 CERROR("%s rejected: consumer defined fatal error\n",
2739                                        libcfs_nid2str(peer->ibp_nid));
2740                                 break;
2741                         }
2742
2743                         if (rej->ibr_version != IBLND_MSG_VERSION &&
2744                             rej->ibr_version != IBLND_MSG_VERSION_1) {
2745                                 CERROR("%s rejected: o2iblnd version %x error\n",
2746                                        libcfs_nid2str(peer->ibp_nid),
2747                                        rej->ibr_version);
2748                                 break;
2749                         }
2750
2751                         if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2752                             rej->ibr_version == IBLND_MSG_VERSION_1) {
2753                                 CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2754                                        libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2755
2756                                 if (conn->ibc_version != IBLND_MSG_VERSION_1)
2757                                         rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2758                         }
2759
2760                         switch (rej->ibr_why) {
2761                         case IBLND_REJECT_CONN_RACE:
2762                         case IBLND_REJECT_CONN_STALE:
2763                         case IBLND_REJECT_CONN_UNCOMPAT:
2764                         case IBLND_REJECT_MSG_QUEUE_SIZE:
2765                         case IBLND_REJECT_RDMA_FRAGS:
2766                                 kiblnd_check_reconnect(conn, rej->ibr_version,
2767                                                        incarnation,
2768                                                        rej->ibr_why, cp);
2769                                 break;
2770
2771                         case IBLND_REJECT_NO_RESOURCES:
2772                                 CERROR("%s rejected: o2iblnd no resources\n",
2773                                        libcfs_nid2str(peer->ibp_nid));
2774                                 break;
2775
2776                         case IBLND_REJECT_FATAL:
2777                                 CERROR("%s rejected: o2iblnd fatal error\n",
2778                                        libcfs_nid2str(peer->ibp_nid));
2779                                 break;
2780
2781                         default:
2782                                 CERROR("%s rejected: o2iblnd reason %d\n",
2783                                        libcfs_nid2str(peer->ibp_nid),
2784                                        rej->ibr_why);
2785                                 break;
2786                         }
2787                         break;
2788                 }
2789                 /* fall through */
2790         default:
2791                 CNETERR("%s rejected: reason %d, size %d\n",
2792                         libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2793                 break;
2794         }
2795
2796         kiblnd_connreq_done(conn, -ECONNREFUSED);
2797 }
2798
2799 static void
2800 kiblnd_check_connreply(struct kib_conn *conn, void *priv, int priv_nob)
2801 {
2802         struct kib_peer *peer = conn->ibc_peer;
2803         lnet_ni_t *ni = peer->ibp_ni;
2804         struct kib_net *net = ni->ni_data;
2805         struct kib_msg *msg = priv;
2806         int ver = conn->ibc_version;
2807         int rc = kiblnd_unpack_msg(msg, priv_nob);
2808         unsigned long flags;
2809
2810         LASSERT(net);
2811
2812         if (rc) {
2813                 CERROR("Can't unpack connack from %s: %d\n",
2814                        libcfs_nid2str(peer->ibp_nid), rc);
2815                 goto failed;
2816         }
2817
2818         if (msg->ibm_type != IBLND_MSG_CONNACK) {
2819                 CERROR("Unexpected message %d from %s\n",
2820                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2821                 rc = -EPROTO;
2822                 goto failed;
2823         }
2824
2825         if (ver != msg->ibm_version) {
2826                 CERROR("%s replied version %x is different with requested version %x\n",
2827                        libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2828                 rc = -EPROTO;
2829                 goto failed;
2830         }
2831
2832         if (msg->ibm_u.connparams.ibcp_queue_depth >
2833             conn->ibc_queue_depth) {
2834                 CERROR("%s has incompatible queue depth %d (<=%d wanted)\n",
2835                        libcfs_nid2str(peer->ibp_nid),
2836                        msg->ibm_u.connparams.ibcp_queue_depth,
2837                        conn->ibc_queue_depth);
2838                 rc = -EPROTO;
2839                 goto failed;
2840         }
2841
2842         if ((msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT) >
2843             conn->ibc_max_frags) {
2844                 CERROR("%s has incompatible max_frags %d (<=%d wanted)\n",
2845                        libcfs_nid2str(peer->ibp_nid),
2846                        msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT,
2847                        conn->ibc_max_frags);
2848                 rc = -EPROTO;
2849                 goto failed;
2850         }
2851
2852         if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2853                 CERROR("%s max message size %d too big (%d max)\n",
2854                        libcfs_nid2str(peer->ibp_nid),
2855                        msg->ibm_u.connparams.ibcp_max_msg_size,
2856                        IBLND_MSG_SIZE);
2857                 rc = -EPROTO;
2858                 goto failed;
2859         }
2860
2861         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2862         if (msg->ibm_dstnid == ni->ni_nid &&
2863             msg->ibm_dststamp == net->ibn_incarnation)
2864                 rc = 0;
2865         else
2866                 rc = -ESTALE;
2867         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2868
2869         if (rc) {
2870                 CERROR("Bad connection reply from %s, rc = %d, version: %x max_frags: %d\n",
2871                        libcfs_nid2str(peer->ibp_nid), rc,
2872                        msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2873                 goto failed;
2874         }
2875
2876         conn->ibc_incarnation = msg->ibm_srcstamp;
2877         conn->ibc_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2878         conn->ibc_reserved_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2879         conn->ibc_queue_depth = msg->ibm_u.connparams.ibcp_queue_depth;
2880         conn->ibc_max_frags = msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2881         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2882                 IBLND_OOB_MSGS(ver) <= IBLND_RX_MSGS(conn));
2883
2884         kiblnd_connreq_done(conn, 0);
2885         return;
2886
2887  failed:
2888         /*
2889          * NB My QP has already established itself, so I handle anything going
2890          * wrong here by setting ibc_comms_error.
2891          * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2892          * immediately tears it down.
2893          */
2894         LASSERT(rc);
2895         conn->ibc_comms_error = rc;
2896         kiblnd_connreq_done(conn, 0);
2897 }
2898
2899 static int
2900 kiblnd_active_connect(struct rdma_cm_id *cmid)
2901 {
2902         struct kib_peer *peer = (struct kib_peer *)cmid->context;
2903         struct kib_conn *conn;
2904         struct kib_msg *msg;
2905         struct rdma_conn_param cp;
2906         int version;
2907         __u64 incarnation;
2908         unsigned long flags;
2909         int rc;
2910
2911         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2912
2913         incarnation = peer->ibp_incarnation;
2914         version = !peer->ibp_version ? IBLND_MSG_VERSION :
2915                                        peer->ibp_version;
2916
2917         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2918
2919         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT,
2920                                   version);
2921         if (!conn) {
2922                 kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2923                 kiblnd_peer_decref(peer); /* lose cmid's ref */
2924                 return -ENOMEM;
2925         }
2926
2927         /*
2928          * conn "owns" cmid now, so I return success from here on to ensure the
2929          * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2930          * on peer
2931          */
2932         msg = &conn->ibc_connvars->cv_msg;
2933
2934         memset(msg, 0, sizeof(*msg));
2935         kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2936         msg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2937         msg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2938         msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2939
2940         kiblnd_pack_msg(peer->ibp_ni, msg, version,
2941                         0, peer->ibp_nid, incarnation);
2942
2943         memset(&cp, 0, sizeof(cp));
2944         cp.private_data = msg;
2945         cp.private_data_len    = msg->ibm_nob;
2946         cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2947         cp.initiator_depth     = 0;
2948         cp.flow_control        = 1;
2949         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2950         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2951
2952         LASSERT(cmid->context == (void *)conn);
2953         LASSERT(conn->ibc_cmid == cmid);
2954
2955         rc = rdma_connect(cmid, &cp);
2956         if (rc) {
2957                 CERROR("Can't connect to %s: %d\n",
2958                        libcfs_nid2str(peer->ibp_nid), rc);
2959                 kiblnd_connreq_done(conn, rc);
2960                 kiblnd_conn_decref(conn);
2961         }
2962
2963         return 0;
2964 }
2965
2966 int
2967 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2968 {
2969         struct kib_peer *peer;
2970         struct kib_conn *conn;
2971         int rc;
2972
2973         switch (event->event) {
2974         default:
2975                 CERROR("Unexpected event: %d, status: %d\n",
2976                        event->event, event->status);
2977                 LBUG();
2978
2979         case RDMA_CM_EVENT_CONNECT_REQUEST:
2980                 /* destroy cmid on failure */
2981                 rc = kiblnd_passive_connect(cmid,
2982                                             (void *)KIBLND_CONN_PARAM(event),
2983                                             KIBLND_CONN_PARAM_LEN(event));
2984                 CDEBUG(D_NET, "connreq: %d\n", rc);
2985                 return rc;
2986
2987         case RDMA_CM_EVENT_ADDR_ERROR:
2988                 peer = (struct kib_peer *)cmid->context;
2989                 CNETERR("%s: ADDR ERROR %d\n",
2990                         libcfs_nid2str(peer->ibp_nid), event->status);
2991                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2992                 kiblnd_peer_decref(peer);
2993                 return -EHOSTUNREACH;      /* rc destroys cmid */
2994
2995         case RDMA_CM_EVENT_ADDR_RESOLVED:
2996                 peer = (struct kib_peer *)cmid->context;
2997
2998                 CDEBUG(D_NET, "%s Addr resolved: %d\n",
2999                        libcfs_nid2str(peer->ibp_nid), event->status);
3000
3001                 if (event->status) {
3002                         CNETERR("Can't resolve address for %s: %d\n",
3003                                 libcfs_nid2str(peer->ibp_nid), event->status);
3004                         rc = event->status;
3005                 } else {
3006                         rc = rdma_resolve_route(
3007                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
3008                         if (!rc)
3009                                 return 0;
3010                         /* Can't initiate route resolution */
3011                         CERROR("Can't resolve route for %s: %d\n",
3012                                libcfs_nid2str(peer->ibp_nid), rc);
3013                 }
3014                 kiblnd_peer_connect_failed(peer, 1, rc);
3015                 kiblnd_peer_decref(peer);
3016                 return rc;                    /* rc destroys cmid */
3017
3018         case RDMA_CM_EVENT_ROUTE_ERROR:
3019                 peer = (struct kib_peer *)cmid->context;
3020                 CNETERR("%s: ROUTE ERROR %d\n",
3021                         libcfs_nid2str(peer->ibp_nid), event->status);
3022                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
3023                 kiblnd_peer_decref(peer);
3024                 return -EHOSTUNREACH;      /* rc destroys cmid */
3025
3026         case RDMA_CM_EVENT_ROUTE_RESOLVED:
3027                 peer = (struct kib_peer *)cmid->context;
3028                 CDEBUG(D_NET, "%s Route resolved: %d\n",
3029                        libcfs_nid2str(peer->ibp_nid), event->status);
3030
3031                 if (!event->status)
3032                         return kiblnd_active_connect(cmid);
3033
3034                 CNETERR("Can't resolve route for %s: %d\n",
3035                         libcfs_nid2str(peer->ibp_nid), event->status);
3036                 kiblnd_peer_connect_failed(peer, 1, event->status);
3037                 kiblnd_peer_decref(peer);
3038                 return event->status;      /* rc destroys cmid */
3039
3040         case RDMA_CM_EVENT_UNREACHABLE:
3041                 conn = (struct kib_conn *)cmid->context;
3042                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3043                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3044                 CNETERR("%s: UNREACHABLE %d\n",
3045                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3046                 kiblnd_connreq_done(conn, -ENETDOWN);
3047                 kiblnd_conn_decref(conn);
3048                 return 0;
3049
3050         case RDMA_CM_EVENT_CONNECT_ERROR:
3051                 conn = (struct kib_conn *)cmid->context;
3052                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3053                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3054                 CNETERR("%s: CONNECT ERROR %d\n",
3055                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3056                 kiblnd_connreq_done(conn, -ENOTCONN);
3057                 kiblnd_conn_decref(conn);
3058                 return 0;
3059
3060         case RDMA_CM_EVENT_REJECTED:
3061                 conn = (struct kib_conn *)cmid->context;
3062                 switch (conn->ibc_state) {
3063                 default:
3064                         LBUG();
3065
3066                 case IBLND_CONN_PASSIVE_WAIT:
3067                         CERROR("%s: REJECTED %d\n",
3068                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
3069                                event->status);
3070                         kiblnd_connreq_done(conn, -ECONNRESET);
3071                         break;
3072
3073                 case IBLND_CONN_ACTIVE_CONNECT:
3074                         kiblnd_rejected(conn, event->status,
3075                                         (void *)KIBLND_CONN_PARAM(event),
3076                                         KIBLND_CONN_PARAM_LEN(event));
3077                         break;
3078                 }
3079                 kiblnd_conn_decref(conn);
3080                 return 0;
3081
3082         case RDMA_CM_EVENT_ESTABLISHED:
3083                 conn = (struct kib_conn *)cmid->context;
3084                 switch (conn->ibc_state) {
3085                 default:
3086                         LBUG();
3087
3088                 case IBLND_CONN_PASSIVE_WAIT:
3089                         CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
3090                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3091                         kiblnd_connreq_done(conn, 0);
3092                         break;
3093
3094                 case IBLND_CONN_ACTIVE_CONNECT:
3095                         CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
3096                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3097                         kiblnd_check_connreply(conn,
3098                                                (void *)KIBLND_CONN_PARAM(event),
3099                                                KIBLND_CONN_PARAM_LEN(event));
3100                         break;
3101                 }
3102                 /* net keeps its ref on conn! */
3103                 return 0;
3104
3105         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
3106                 CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
3107                 return 0;
3108         case RDMA_CM_EVENT_DISCONNECTED:
3109                 conn = (struct kib_conn *)cmid->context;
3110                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
3111                         CERROR("%s DISCONNECTED\n",
3112                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3113                         kiblnd_connreq_done(conn, -ECONNRESET);
3114                 } else {
3115                         kiblnd_close_conn(conn, 0);
3116                 }
3117                 kiblnd_conn_decref(conn);
3118                 cmid->context = NULL;
3119                 return 0;
3120
3121         case RDMA_CM_EVENT_DEVICE_REMOVAL:
3122                 LCONSOLE_ERROR_MSG(0x131,
3123                                    "Received notification of device removal\n"
3124                                    "Please shutdown LNET to allow this to proceed\n");
3125                 /*
3126                  * Can't remove network from underneath LNET for now, so I have
3127                  * to ignore this
3128                  */
3129                 return 0;
3130
3131         case RDMA_CM_EVENT_ADDR_CHANGE:
3132                 LCONSOLE_INFO("Physical link changed (eg hca/port)\n");
3133                 return 0;
3134         }
3135 }
3136
3137 static int
3138 kiblnd_check_txs_locked(struct kib_conn *conn, struct list_head *txs)
3139 {
3140         struct kib_tx *tx;
3141         struct list_head *ttmp;
3142
3143         list_for_each(ttmp, txs) {
3144                 tx = list_entry(ttmp, struct kib_tx, tx_list);
3145
3146                 if (txs != &conn->ibc_active_txs) {
3147                         LASSERT(tx->tx_queued);
3148                 } else {
3149                         LASSERT(!tx->tx_queued);
3150                         LASSERT(tx->tx_waiting || tx->tx_sending);
3151                 }
3152
3153                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
3154                         CERROR("Timed out tx: %s, %lu seconds\n",
3155                                kiblnd_queue2str(conn, txs),
3156                                cfs_duration_sec(jiffies - tx->tx_deadline));
3157                         return 1;
3158                 }
3159         }
3160
3161         return 0;
3162 }
3163
3164 static int
3165 kiblnd_conn_timed_out_locked(struct kib_conn *conn)
3166 {
3167         return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3168                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3169                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3170                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3171                 kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3172 }
3173
3174 static void
3175 kiblnd_check_conns(int idx)
3176 {
3177         LIST_HEAD(closes);
3178         LIST_HEAD(checksends);
3179         struct list_head *peers = &kiblnd_data.kib_peers[idx];
3180         struct list_head *ptmp;
3181         struct kib_peer *peer;
3182         struct kib_conn *conn;
3183         struct kib_conn *temp;
3184         struct kib_conn *tmp;
3185         struct list_head *ctmp;
3186         unsigned long flags;
3187
3188         /*
3189          * NB. We expect to have a look at all the peers and not find any
3190          * RDMAs to time out, so we just use a shared lock while we
3191          * take a look...
3192          */
3193         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3194
3195         list_for_each(ptmp, peers) {
3196                 peer = list_entry(ptmp, struct kib_peer, ibp_list);
3197
3198                 list_for_each(ctmp, &peer->ibp_conns) {
3199                         int timedout;
3200                         int sendnoop;
3201
3202                         conn = list_entry(ctmp, struct kib_conn, ibc_list);
3203
3204                         LASSERT(conn->ibc_state == IBLND_CONN_ESTABLISHED);
3205
3206                         spin_lock(&conn->ibc_lock);
3207
3208                         sendnoop = kiblnd_need_noop(conn);
3209                         timedout = kiblnd_conn_timed_out_locked(conn);
3210                         if (!sendnoop && !timedout) {
3211                                 spin_unlock(&conn->ibc_lock);
3212                                 continue;
3213                         }
3214
3215                         if (timedout) {
3216                                 CERROR("Timed out RDMA with %s (%lu): c: %u, oc: %u, rc: %u\n",
3217                                        libcfs_nid2str(peer->ibp_nid),
3218                                        cfs_duration_sec(cfs_time_current() -
3219                                                         peer->ibp_last_alive),
3220                                        conn->ibc_credits,
3221                                        conn->ibc_outstanding_credits,
3222                                        conn->ibc_reserved_credits);
3223                                 list_add(&conn->ibc_connd_list, &closes);
3224                         } else {
3225                                 list_add(&conn->ibc_connd_list, &checksends);
3226                         }
3227                         /* +ref for 'closes' or 'checksends' */
3228                         kiblnd_conn_addref(conn);
3229
3230                         spin_unlock(&conn->ibc_lock);
3231                 }
3232         }
3233
3234         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3235
3236         /*
3237          * Handle timeout by closing the whole
3238          * connection. We can only be sure RDMA activity
3239          * has ceased once the QP has been modified.
3240          */
3241         list_for_each_entry_safe(conn, tmp, &closes, ibc_connd_list) {
3242                 list_del(&conn->ibc_connd_list);
3243                 kiblnd_close_conn(conn, -ETIMEDOUT);
3244                 kiblnd_conn_decref(conn);
3245         }
3246
3247         /*
3248          * In case we have enough credits to return via a
3249          * NOOP, but there were no non-blocking tx descs
3250          * free to do it last time...
3251          */
3252         list_for_each_entry_safe(conn, temp, &checksends, ibc_connd_list) {
3253                 list_del(&conn->ibc_connd_list);
3254
3255                 spin_lock(&conn->ibc_lock);
3256                 kiblnd_check_sends_locked(conn);
3257                 spin_unlock(&conn->ibc_lock);
3258
3259                 kiblnd_conn_decref(conn);
3260         }
3261 }
3262
3263 static void
3264 kiblnd_disconnect_conn(struct kib_conn *conn)
3265 {
3266         LASSERT(!in_interrupt());
3267         LASSERT(current == kiblnd_data.kib_connd);
3268         LASSERT(conn->ibc_state == IBLND_CONN_CLOSING);
3269
3270         rdma_disconnect(conn->ibc_cmid);
3271         kiblnd_finalise_conn(conn);
3272
3273         kiblnd_peer_notify(conn->ibc_peer);
3274 }
3275
3276 /**
3277  * High-water for reconnection to the same peer, reconnection attempt should
3278  * be delayed after trying more than KIB_RECONN_HIGH_RACE.
3279  */
3280 #define KIB_RECONN_HIGH_RACE    10
3281 /**
3282  * Allow connd to take a break and handle other things after consecutive
3283  * reconnection attemps.
3284  */
3285 #define KIB_RECONN_BREAK        100
3286
3287 int
3288 kiblnd_connd(void *arg)
3289 {
3290         spinlock_t *lock= &kiblnd_data.kib_connd_lock;
3291         wait_queue_t wait;
3292         unsigned long flags;
3293         struct kib_conn *conn;
3294         int timeout;
3295         int i;
3296         int dropped_lock;
3297         int peer_index = 0;
3298         unsigned long deadline = jiffies;
3299
3300         cfs_block_allsigs();
3301
3302         init_waitqueue_entry(&wait, current);
3303         kiblnd_data.kib_connd = current;
3304
3305         spin_lock_irqsave(lock, flags);
3306
3307         while (!kiblnd_data.kib_shutdown) {
3308                 int reconn = 0;
3309
3310                 dropped_lock = 0;
3311
3312                 if (!list_empty(&kiblnd_data.kib_connd_zombies)) {
3313                         struct kib_peer *peer = NULL;
3314
3315                         conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3316                                           struct kib_conn, ibc_list);
3317                         list_del(&conn->ibc_list);
3318                         if (conn->ibc_reconnect) {
3319                                 peer = conn->ibc_peer;
3320                                 kiblnd_peer_addref(peer);
3321                         }
3322
3323                         spin_unlock_irqrestore(lock, flags);
3324                         dropped_lock = 1;
3325
3326                         kiblnd_destroy_conn(conn, !peer);
3327
3328                         spin_lock_irqsave(lock, flags);
3329                         if (!peer)
3330                                 continue;
3331
3332                         conn->ibc_peer = peer;
3333                         if (peer->ibp_reconnected < KIB_RECONN_HIGH_RACE)
3334                                 list_add_tail(&conn->ibc_list,
3335                                               &kiblnd_data.kib_reconn_list);
3336                         else
3337                                 list_add_tail(&conn->ibc_list,
3338                                               &kiblnd_data.kib_reconn_wait);
3339                 }
3340
3341                 if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3342                         conn = list_entry(kiblnd_data.kib_connd_conns.next,
3343                                           struct kib_conn, ibc_list);
3344                         list_del(&conn->ibc_list);
3345
3346                         spin_unlock_irqrestore(lock, flags);
3347                         dropped_lock = 1;
3348
3349                         kiblnd_disconnect_conn(conn);
3350                         kiblnd_conn_decref(conn);
3351
3352                         spin_lock_irqsave(lock, flags);
3353                 }
3354
3355                 while (reconn < KIB_RECONN_BREAK) {
3356                         if (kiblnd_data.kib_reconn_sec !=
3357                             ktime_get_real_seconds()) {
3358                                 kiblnd_data.kib_reconn_sec = ktime_get_real_seconds();
3359                                 list_splice_init(&kiblnd_data.kib_reconn_wait,
3360                                                  &kiblnd_data.kib_reconn_list);
3361                         }
3362
3363                         if (list_empty(&kiblnd_data.kib_reconn_list))
3364                                 break;
3365
3366                         conn = list_entry(kiblnd_data.kib_reconn_list.next,
3367                                           struct kib_conn, ibc_list);
3368                         list_del(&conn->ibc_list);
3369
3370                         spin_unlock_irqrestore(lock, flags);
3371                         dropped_lock = 1;
3372
3373                         reconn += kiblnd_reconnect_peer(conn->ibc_peer);
3374                         kiblnd_peer_decref(conn->ibc_peer);
3375                         LIBCFS_FREE(conn, sizeof(*conn));
3376
3377                         spin_lock_irqsave(lock, flags);
3378                 }
3379
3380                 /* careful with the jiffy wrap... */
3381                 timeout = (int)(deadline - jiffies);
3382                 if (timeout <= 0) {
3383                         const int n = 4;
3384                         const int p = 1;
3385                         int chunk = kiblnd_data.kib_peer_hash_size;
3386
3387                         spin_unlock_irqrestore(lock, flags);
3388                         dropped_lock = 1;
3389
3390                         /*
3391                          * Time to check for RDMA timeouts on a few more
3392                          * peers: I do checks every 'p' seconds on a
3393                          * proportion of the peer table and I need to check
3394                          * every connection 'n' times within a timeout
3395                          * interval, to ensure I detect a timeout on any
3396                          * connection within (n+1)/n times the timeout
3397                          * interval.
3398                          */
3399                         if (*kiblnd_tunables.kib_timeout > n * p)
3400                                 chunk = (chunk * n * p) /
3401                                         *kiblnd_tunables.kib_timeout;
3402                         if (!chunk)
3403                                 chunk = 1;
3404
3405                         for (i = 0; i < chunk; i++) {
3406                                 kiblnd_check_conns(peer_index);
3407                                 peer_index = (peer_index + 1) %
3408                                              kiblnd_data.kib_peer_hash_size;
3409                         }
3410
3411                         deadline += msecs_to_jiffies(p * MSEC_PER_SEC);
3412                         spin_lock_irqsave(lock, flags);
3413                 }
3414
3415                 if (dropped_lock)
3416                         continue;
3417
3418                 /* Nothing to do for 'timeout'  */
3419                 set_current_state(TASK_INTERRUPTIBLE);
3420                 add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3421                 spin_unlock_irqrestore(lock, flags);
3422
3423                 schedule_timeout(timeout);
3424
3425                 remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3426                 spin_lock_irqsave(lock, flags);
3427         }
3428
3429         spin_unlock_irqrestore(lock, flags);
3430
3431         kiblnd_thread_fini();
3432         return 0;
3433 }
3434
3435 void
3436 kiblnd_qp_event(struct ib_event *event, void *arg)
3437 {
3438         struct kib_conn *conn = arg;
3439
3440         switch (event->event) {
3441         case IB_EVENT_COMM_EST:
3442                 CDEBUG(D_NET, "%s established\n",
3443                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3444                 /*
3445                  * We received a packet but connection isn't established
3446                  * probably handshake packet was lost, so free to
3447                  * force make connection established
3448                  */
3449                 rdma_notify(conn->ibc_cmid, IB_EVENT_COMM_EST);
3450                 return;
3451
3452         default:
3453                 CERROR("%s: Async QP event type %d\n",
3454                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3455                 return;
3456         }
3457 }
3458
3459 static void
3460 kiblnd_complete(struct ib_wc *wc)
3461 {
3462         switch (kiblnd_wreqid2type(wc->wr_id)) {
3463         default:
3464                 LBUG();
3465
3466         case IBLND_WID_MR:
3467                 if (wc->status != IB_WC_SUCCESS &&
3468                     wc->status != IB_WC_WR_FLUSH_ERR)
3469                         CNETERR("FastReg failed: %d\n", wc->status);
3470                 break;
3471
3472         case IBLND_WID_RDMA:
3473                 /*
3474                  * We only get RDMA completion notification if it fails.  All
3475                  * subsequent work items, including the final SEND will fail
3476                  * too.  However we can't print out any more info about the
3477                  * failing RDMA because 'tx' might be back on the idle list or
3478                  * even reused already if we didn't manage to post all our work
3479                  * items
3480                  */
3481                 CNETERR("RDMA (tx: %p) failed: %d\n",
3482                         kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3483                 return;
3484
3485         case IBLND_WID_TX:
3486                 kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3487                 return;
3488
3489         case IBLND_WID_RX:
3490                 kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3491                                    wc->byte_len);
3492                 return;
3493         }
3494 }
3495
3496 void
3497 kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3498 {
3499         /*
3500          * NB I'm not allowed to schedule this conn once its refcount has
3501          * reached 0.  Since fundamentally I'm racing with scheduler threads
3502          * consuming my CQ I could be called after all completions have
3503          * occurred.  But in this case, !ibc_nrx && !ibc_nsends_posted
3504          * and this CQ is about to be destroyed so I NOOP.
3505          */
3506         struct kib_conn *conn = arg;
3507         struct kib_sched_info *sched = conn->ibc_sched;
3508         unsigned long flags;
3509
3510         LASSERT(cq == conn->ibc_cq);
3511
3512         spin_lock_irqsave(&sched->ibs_lock, flags);
3513
3514         conn->ibc_ready = 1;
3515
3516         if (!conn->ibc_scheduled &&
3517             (conn->ibc_nrx > 0 ||
3518              conn->ibc_nsends_posted > 0)) {
3519                 kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3520                 conn->ibc_scheduled = 1;
3521                 list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3522
3523                 if (waitqueue_active(&sched->ibs_waitq))
3524                         wake_up(&sched->ibs_waitq);
3525         }
3526
3527         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3528 }
3529
3530 void
3531 kiblnd_cq_event(struct ib_event *event, void *arg)
3532 {
3533         struct kib_conn *conn = arg;
3534
3535         CERROR("%s: async CQ event type %d\n",
3536                libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3537 }
3538
3539 int
3540 kiblnd_scheduler(void *arg)
3541 {
3542         long id = (long)arg;
3543         struct kib_sched_info *sched;
3544         struct kib_conn *conn;
3545         wait_queue_t wait;
3546         unsigned long flags;
3547         struct ib_wc wc;
3548         int did_something;
3549         int busy_loops = 0;
3550         int rc;
3551
3552         cfs_block_allsigs();
3553
3554         init_waitqueue_entry(&wait, current);
3555
3556         sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3557
3558         rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3559         if (rc) {
3560                 CWARN("Failed to bind on CPT %d, please verify whether all CPUs are healthy and reload modules if necessary, otherwise your system might under risk of low performance\n",
3561                       sched->ibs_cpt);
3562         }
3563
3564         spin_lock_irqsave(&sched->ibs_lock, flags);
3565
3566         while (!kiblnd_data.kib_shutdown) {
3567                 if (busy_loops++ >= IBLND_RESCHED) {
3568                         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3569
3570                         cond_resched();
3571                         busy_loops = 0;
3572
3573                         spin_lock_irqsave(&sched->ibs_lock, flags);
3574                 }
3575
3576                 did_something = 0;
3577
3578                 if (!list_empty(&sched->ibs_conns)) {
3579                         conn = list_entry(sched->ibs_conns.next, struct kib_conn,
3580                                           ibc_sched_list);
3581                         /* take over kib_sched_conns' ref on conn... */
3582                         LASSERT(conn->ibc_scheduled);
3583                         list_del(&conn->ibc_sched_list);
3584                         conn->ibc_ready = 0;
3585
3586                         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3587
3588                         wc.wr_id = IBLND_WID_INVAL;
3589
3590                         rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3591                         if (!rc) {
3592                                 rc = ib_req_notify_cq(conn->ibc_cq,
3593                                                       IB_CQ_NEXT_COMP);
3594                                 if (rc < 0) {
3595                                         CWARN("%s: ib_req_notify_cq failed: %d, closing connection\n",
3596                                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3597                                         kiblnd_close_conn(conn, -EIO);
3598                                         kiblnd_conn_decref(conn);
3599                                         spin_lock_irqsave(&sched->ibs_lock,
3600                                                           flags);
3601                                         continue;
3602                                 }
3603
3604                                 rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3605                         }
3606
3607                         if (unlikely(rc > 0 && wc.wr_id == IBLND_WID_INVAL)) {
3608                                 LCONSOLE_ERROR("ib_poll_cq (rc: %d) returned invalid wr_id, opcode %d, status: %d, vendor_err: %d, conn: %s status: %d\nplease upgrade firmware and OFED or contact vendor.\n",
3609                                                rc, wc.opcode, wc.status,
3610                                                wc.vendor_err,
3611                                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
3612                                                conn->ibc_state);
3613                                 rc = -EINVAL;
3614                         }
3615
3616                         if (rc < 0) {
3617                                 CWARN("%s: ib_poll_cq failed: %d, closing connection\n",
3618                                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
3619                                       rc);
3620                                 kiblnd_close_conn(conn, -EIO);
3621                                 kiblnd_conn_decref(conn);
3622                                 spin_lock_irqsave(&sched->ibs_lock, flags);
3623                                 continue;
3624                         }
3625
3626                         spin_lock_irqsave(&sched->ibs_lock, flags);
3627
3628                         if (rc || conn->ibc_ready) {
3629                                 /*
3630                                  * There may be another completion waiting; get
3631                                  * another scheduler to check while I handle
3632                                  * this one...
3633                                  */
3634                                 /* +1 ref for sched_conns */
3635                                 kiblnd_conn_addref(conn);
3636                                 list_add_tail(&conn->ibc_sched_list,
3637                                               &sched->ibs_conns);
3638                                 if (waitqueue_active(&sched->ibs_waitq))
3639                                         wake_up(&sched->ibs_waitq);
3640                         } else {
3641                                 conn->ibc_scheduled = 0;
3642                         }
3643
3644                         if (rc) {
3645                                 spin_unlock_irqrestore(&sched->ibs_lock, flags);
3646                                 kiblnd_complete(&wc);
3647
3648                                 spin_lock_irqsave(&sched->ibs_lock, flags);
3649                         }
3650
3651                         kiblnd_conn_decref(conn); /* ...drop my ref from above */
3652                         did_something = 1;
3653                 }
3654
3655                 if (did_something)
3656                         continue;
3657
3658                 set_current_state(TASK_INTERRUPTIBLE);
3659                 add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3660                 spin_unlock_irqrestore(&sched->ibs_lock, flags);
3661
3662                 schedule();
3663                 busy_loops = 0;
3664
3665                 remove_wait_queue(&sched->ibs_waitq, &wait);
3666                 spin_lock_irqsave(&sched->ibs_lock, flags);
3667         }
3668
3669         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3670
3671         kiblnd_thread_fini();
3672         return 0;
3673 }
3674
3675 int
3676 kiblnd_failover_thread(void *arg)
3677 {
3678         rwlock_t *glock = &kiblnd_data.kib_global_lock;
3679         struct kib_dev *dev;
3680         wait_queue_t wait;
3681         unsigned long flags;
3682         int rc;
3683
3684         LASSERT(*kiblnd_tunables.kib_dev_failover);
3685
3686         cfs_block_allsigs();
3687
3688         init_waitqueue_entry(&wait, current);
3689         write_lock_irqsave(glock, flags);
3690
3691         while (!kiblnd_data.kib_shutdown) {
3692                 int do_failover = 0;
3693                 int long_sleep;
3694
3695                 list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3696                                     ibd_fail_list) {
3697                         if (time_before(cfs_time_current(),
3698                                         dev->ibd_next_failover))
3699                                 continue;
3700                         do_failover = 1;
3701                         break;
3702                 }
3703
3704                 if (do_failover) {
3705                         list_del_init(&dev->ibd_fail_list);
3706                         dev->ibd_failover = 1;
3707                         write_unlock_irqrestore(glock, flags);
3708
3709                         rc = kiblnd_dev_failover(dev);
3710
3711                         write_lock_irqsave(glock, flags);
3712
3713                         LASSERT(dev->ibd_failover);
3714                         dev->ibd_failover = 0;
3715                         if (rc >= 0) { /* Device is OK or failover succeed */
3716                                 dev->ibd_next_failover = cfs_time_shift(3);
3717                                 continue;
3718                         }
3719
3720                         /* failed to failover, retry later */
3721                         dev->ibd_next_failover =
3722                                 cfs_time_shift(min(dev->ibd_failed_failover, 10));
3723                         if (kiblnd_dev_can_failover(dev)) {
3724                                 list_add_tail(&dev->ibd_fail_list,
3725                                               &kiblnd_data.kib_failed_devs);
3726                         }
3727
3728                         continue;
3729                 }
3730
3731                 /* long sleep if no more pending failover */
3732                 long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3733
3734                 set_current_state(TASK_INTERRUPTIBLE);
3735                 add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3736                 write_unlock_irqrestore(glock, flags);
3737
3738                 rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3739                                                    cfs_time_seconds(1));
3740                 remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3741                 write_lock_irqsave(glock, flags);
3742
3743                 if (!long_sleep || rc)
3744                         continue;
3745
3746                 /*
3747                  * have a long sleep, routine check all active devices,
3748                  * we need checking like this because if there is not active
3749                  * connection on the dev and no SEND from local, we may listen
3750                  * on wrong HCA for ever while there is a bonding failover
3751                  */
3752                 list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3753                         if (kiblnd_dev_can_failover(dev)) {
3754                                 list_add_tail(&dev->ibd_fail_list,
3755                                               &kiblnd_data.kib_failed_devs);
3756                         }
3757                 }
3758         }
3759
3760         write_unlock_irqrestore(glock, flags);
3761
3762         kiblnd_thread_fini();
3763         return 0;
3764 }