1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * RDMA Transport Layer
4 *
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8 */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/rculist.h>
15 #include <linux/random.h>
16
17 #include "rtrs-clt.h"
18 #include "rtrs-log.h"
19
20 #define RTRS_CONNECT_TIMEOUT_MS 30000
21 /*
22 * Wait a bit before trying to reconnect after a failure
23 * in order to give the server time to finish its clean up,
24 * which otherwise leads to "false positive" failed reconnect attempts
25 */
26 #define RTRS_RECONNECT_BACKOFF 1000
27 /*
28 * Wait for additional random time between 0 and 8 seconds
29 * before starting to reconnect to avoid clients reconnecting
30 * all at once in case of a major network outage
31 */
32 #define RTRS_RECONNECT_SEED 8
33
34 MODULE_DESCRIPTION("RDMA Transport Client");
35 MODULE_LICENSE("GPL");
36
37 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
38 static struct rtrs_rdma_dev_pd dev_pd = {
39 .ops = &dev_pd_ops
40 };
41
42 static struct workqueue_struct *rtrs_wq;
43 static struct class *rtrs_clt_dev_class;
44
45 static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
46 {
47 struct rtrs_clt_sess *sess;
48 bool connected = false;
49
50 rcu_read_lock();
51 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
52 connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
53 rcu_read_unlock();
54
55 return connected;
56 }
57
58 static struct rtrs_permit *
59 __rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
60 {
61 size_t max_depth = clt->queue_depth;
62 struct rtrs_permit *permit;
63 int bit;
64
65 /*
66 * Adapted from null_blk get_tag(). Callers from different cpus may
67 * grab the same bit, since find_first_zero_bit is not atomic.
68 * But then the test_and_set_bit_lock will fail for all the
69 * callers but one, so that they will loop again.
70 * This way an explicit spinlock is not required.
71 */
72 do {
73 bit = find_first_zero_bit(clt->permits_map, max_depth);
74 if (unlikely(bit >= max_depth))
75 return NULL;
76 } while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));
77
78 permit = get_permit(clt, bit);
79 WARN_ON(permit->mem_id != bit);
80 permit->cpu_id = raw_smp_processor_id();
81 permit->con_type = con_type;
82
83 return permit;
84 }
85
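/*
 * Clears the permit bit with release semantics, pairing with the
 * test_and_set_bit_lock() in __rtrs_get_permit(). Waking up possible
 * waiters is left to the caller, see rtrs_clt_put_permit().
 */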
86 static inline void __rtrs_put_permit(struct rtrs_clt *clt,
87 struct rtrs_permit *permit)
88 {
89 clear_bit_unlock(permit->mem_id, clt->permits_map);
90 }
91
92 /**
93 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
94 * @clt: Current session
95 * @con_type: Type of connection to use with the permit
96 * @can_wait: Wait type
97 *
98 * Description:
99 * Allocates permit for the following RDMA operation. Permit is used
100 * to preallocate all resources and to propagate memory pressure
101 * up earlier.
102 *
103 * Context:
104 * Can sleep if @can_wait == RTRS_PERMIT_WAIT
105 */
106 struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
107 enum rtrs_clt_con_type con_type,
108 int can_wait)
109 {
110 struct rtrs_permit *permit;
111 DEFINE_WAIT(wait);
112
113 permit = __rtrs_get_permit(clt, con_type);
114 if (likely(permit) || !can_wait)
115 return permit;
116
117 do {
118 prepare_to_wait(&clt->permits_wait, &wait,
119 TASK_UNINTERRUPTIBLE);
120 permit = __rtrs_get_permit(clt, con_type);
121 if (likely(permit))
122 break;
123
124 io_schedule();
125 } while (1);
126
127 finish_wait(&clt->permits_wait, &wait);
128
129 return permit;
130 }
131 EXPORT_SYMBOL(rtrs_clt_get_permit);
132
133 /**
134 * rtrs_clt_put_permit() - puts allocated permit
135 * @clt: Current session
136 * @permit: Permit to be freed
137 *
138 * Context:
139 * Does not matter
140 */
141 void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
142 {
143 if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
144 return;
145
146 __rtrs_put_permit(clt, permit);
147
148 /*
149 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
150 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
151 * it must have added itself to &clt->permits_wait before
152 * __rtrs_put_permit() finished.
153 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
154 */
155 if (waitqueue_active(&clt->permits_wait))
156 wake_up(&clt->permits_wait);
157 }
158 EXPORT_SYMBOL(rtrs_clt_put_permit);
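
/*
 * Illustrative sketch only (not part of this file): a typical user of the
 * two helpers above, e.g. a block driver on top of RTRS, pairs them roughly
 * like this, assuming the RTRS_PERMIT_WAIT/RTRS_PERMIT_NOWAIT constants
 * from rtrs.h:
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON,
 *				     can_sleep ? RTRS_PERMIT_WAIT :
 *						 RTRS_PERMIT_NOWAIT);
 *	if (!permit)
 *		return -EBUSY;	(only possible in the NOWAIT case)
 *	... submit the IO, e.g. via rtrs_clt_request(), using the permit ...
 *	rtrs_clt_put_permit(clt, permit);
 */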
159
160 void *rtrs_permit_to_pdu(struct rtrs_permit *permit)
161 {
162 return permit + 1;
163 }
164 EXPORT_SYMBOL(rtrs_permit_to_pdu);
165
166 /**
167 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
168 * @sess: client session pointer
169 * @permit: permit for the allocation of the RDMA buffer
170 * Note:
171 * IO connections start from 1.
172 * Connection 0 is for user messages.
173 */
174 static
175 struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
176 struct rtrs_permit *permit)
177 {
178 int id = 0;
179
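/*
 * Spread IO permits over the IO connections using the CPU the permit
 * was taken on; connection 0 is skipped since it carries only user
 * (service) messages.
 */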
180 if (likely(permit->con_type == RTRS_IO_CON))
181 id = (permit->cpu_id % (sess->s.con_num - 1)) + 1;
182
183 return to_clt_con(sess->s.con[id]);
184 }
185
186 /**
187 * __rtrs_clt_change_state() - change the session state through session state
188 * machine.
189 *
190 * @sess: client session to change the state of.
191 * @new_state: state to change to.
192 *
193 * returns true if successful, false if the requested state can not be set.
194 *
195 * Locks:
196 * state_wq lock must be held.
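*
* Allowed transitions (a summary derived from the switch below,
* written as old state -> new state):
*   RECONNECTING -> CONNECTING
*   CONNECTED | CONNECTING_ERR | CLOSED -> RECONNECTING
*   CONNECTING -> CONNECTED | CONNECTING_ERR
*   CONNECTING | CONNECTING_ERR | RECONNECTING | CONNECTED -> CLOSING
*   CLOSING -> CLOSED
*   CLOSED -> DEAD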
197 */
198 static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess,
199 enum rtrs_clt_state new_state)
200 {
201 enum rtrs_clt_state old_state;
202 bool changed = false;
203
204 lockdep_assert_held(&sess->state_wq.lock);
205
206 old_state = sess->state;
207 switch (new_state) {
208 case RTRS_CLT_CONNECTING:
209 switch (old_state) {
210 case RTRS_CLT_RECONNECTING:
211 changed = true;
212 fallthrough;
213 default:
214 break;
215 }
216 break;
217 case RTRS_CLT_RECONNECTING:
218 switch (old_state) {
219 case RTRS_CLT_CONNECTED:
220 case RTRS_CLT_CONNECTING_ERR:
221 case RTRS_CLT_CLOSED:
222 changed = true;
223 fallthrough;
224 default:
225 break;
226 }
227 break;
228 case RTRS_CLT_CONNECTED:
229 switch (old_state) {
230 case RTRS_CLT_CONNECTING:
231 changed = true;
232 fallthrough;
233 default:
234 break;
235 }
236 break;
237 case RTRS_CLT_CONNECTING_ERR:
238 switch (old_state) {
239 case RTRS_CLT_CONNECTING:
240 changed = true;
241 fallthrough;
242 default:
243 break;
244 }
245 break;
246 case RTRS_CLT_CLOSING:
247 switch (old_state) {
248 case RTRS_CLT_CONNECTING:
249 case RTRS_CLT_CONNECTING_ERR:
250 case RTRS_CLT_RECONNECTING:
251 case RTRS_CLT_CONNECTED:
252 changed = true;
253 fallthrough;
254 default:
255 break;
256 }
257 break;
258 case RTRS_CLT_CLOSED:
259 switch (old_state) {
260 case RTRS_CLT_CLOSING:
261 changed = true;
262 fallthrough;
263 default:
264 break;
265 }
266 break;
267 case RTRS_CLT_DEAD:
268 switch (old_state) {
269 case RTRS_CLT_CLOSED:
270 changed = true;
271 fallthrough;
272 default:
273 break;
274 }
275 break;
276 default:
277 break;
278 }
279 if (changed) {
280 sess->state = new_state;
281 wake_up_locked(&sess->state_wq);
282 }
283
284 return changed;
285 }
286
287 static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
288 enum rtrs_clt_state old_state,
289 enum rtrs_clt_state new_state)
290 {
291 bool changed = false;
292
293 spin_lock_irq(&sess->state_wq.lock);
294 if (sess->state == old_state)
295 changed = __rtrs_clt_change_state(sess, new_state);
296 spin_unlock_irq(&sess->state_wq.lock);
297
298 return changed;
299 }
300
301 static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
302 {
303 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
304
305 if (rtrs_clt_change_state_from_to(sess,
306 RTRS_CLT_CONNECTED,
307 RTRS_CLT_RECONNECTING)) {
308 struct rtrs_clt *clt = sess->clt;
309 unsigned int delay_ms;
310
311 /*
312 * Normal scenario, reconnect if we were successfully connected
313 */
314 delay_ms = clt->reconnect_delay_sec * 1000;
315 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
316 msecs_to_jiffies(delay_ms +
317 prandom_u32() % RTRS_RECONNECT_SEED));
318 } else {
319 /*
320 * An error can happen only while establishing a new connection,
321 * so notify the waiter with the error state; the waiter is responsible
322 * for cleaning up the rest and reconnecting if needed.
323 */
324 rtrs_clt_change_state_from_to(sess,
325 RTRS_CLT_CONNECTING,
326 RTRS_CLT_CONNECTING_ERR);
327 }
328 }
329
330 static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
331 {
332 struct rtrs_clt_con *con = cq->cq_context;
333
334 if (unlikely(wc->status != IB_WC_SUCCESS)) {
335 rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
336 ib_wc_status_msg(wc->status));
337 rtrs_rdma_error_recovery(con);
338 }
339 }
340
341 static struct ib_cqe fast_reg_cqe = {
342 .done = rtrs_clt_fast_reg_done
343 };
344
345 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
346 bool notify, bool can_wait);
347
348 static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
349 {
350 struct rtrs_clt_io_req *req =
351 container_of(wc->wr_cqe, typeof(*req), inv_cqe);
352 struct rtrs_clt_con *con = cq->cq_context;
353
354 if (unlikely(wc->status != IB_WC_SUCCESS)) {
355 rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
356 ib_wc_status_msg(wc->status));
357 rtrs_rdma_error_recovery(con);
358 }
359 req->need_inv = false;
360 if (likely(req->need_inv_comp))
361 complete(&req->inv_comp);
362 else
363 /* Complete request from INV callback */
364 complete_rdma_req(req, req->inv_errno, true, false);
365 }
366
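/*
 * Post an IB_WR_LOCAL_INV work request for the rkey of the request's MR;
 * the completion is delivered to rtrs_clt_inv_rkey_done() via req->inv_cqe.
 */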
367 static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
368 {
369 struct rtrs_clt_con *con = req->con;
370 struct ib_send_wr wr = {
371 .opcode = IB_WR_LOCAL_INV,
372 .wr_cqe = &req->inv_cqe,
373 .send_flags = IB_SEND_SIGNALED,
374 .ex.invalidate_rkey = req->mr->rkey,
375 };
376 req->inv_cqe.done = rtrs_clt_inv_rkey_done;
377
378 return ib_post_send(con->c.qp, &wr, NULL);
379 }
380
381 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
382 bool notify, bool can_wait)
383 {
384 struct rtrs_clt_con *con = req->con;
385 struct rtrs_clt_sess *sess;
386 int err;
387
388 if (WARN_ON(!req->in_use))
389 return;
390 if (WARN_ON(!req->con))
391 return;
392 sess = to_clt_sess(con->c.sess);
393
394 if (req->sg_cnt) {
395 if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
396 /*
397 * We are here to invalidate read requests
398 * ourselves. In the normal scenario the server should
399 * send INV for all read requests, but since
400 * we got here, one of two things has happened:
401 *
402 * 1. this is failover, when errno != 0
403 * and can_wait == 1,
404 *
405 * 2. something totally bad happened and
406 * server forgot to send INV, so we
407 * should do that ourselves.
408 */
409
410 if (likely(can_wait)) {
411 req->need_inv_comp = true;
412 } else {
413 /* This should be IO path, so always notify */
414 WARN_ON(!notify);
415 /* Save errno for INV callback */
416 req->inv_errno = errno;
417 }
418
419 err = rtrs_inv_rkey(req);
420 if (unlikely(err)) {
421 rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
422 req->mr->rkey, err);
423 } else if (likely(can_wait)) {
424 wait_for_completion(&req->inv_comp);
425 } else {
426 /*
427 * Something went wrong, so request will be
428 * completed from INV callback.
429 */
430 WARN_ON_ONCE(1);
431
432 return;
433 }
434 }
435 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
436 req->sg_cnt, req->dir);
437 }
438 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
439 atomic_dec(&sess->stats->inflight);
440
441 req->in_use = false;
442 req->con = NULL;
443
444 if (notify)
445 req->conf(req->priv, errno);
446 }
447
448 static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
449 struct rtrs_clt_io_req *req,
450 struct rtrs_rbuf *rbuf, u32 off,
451 u32 imm, struct ib_send_wr *wr)
452 {
453 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
454 enum ib_send_flags flags;
455 struct ib_sge sge;
456
457 if (unlikely(!req->sg_size)) {
458 rtrs_wrn(con->c.sess,
459 "Doing RDMA Write failed, no data supplied\n");
460 return -EINVAL;
461 }
462
463 /* user data and user message in the first list element */
464 sge.addr = req->iu->dma_addr;
465 sge.length = req->sg_size;
466 sge.lkey = sess->s.dev->ib_pd->local_dma_lkey;
467
468 /*
469 * From time to time we have to post signalled sends,
470 * or send queue will fill up and only QP reset can help.
471 */
472 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
473 0 : IB_SEND_SIGNALED;
474
475 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
476 req->sg_size, DMA_TO_DEVICE);
477
478 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
479 rbuf->rkey, rbuf->addr + off,
480 imm, flags, wr);
481 }
482
483 static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
484 s16 errno, bool w_inval)
485 {
486 struct rtrs_clt_io_req *req;
487
488 if (WARN_ON(msg_id >= sess->queue_depth))
489 return;
490
491 req = &sess->reqs[msg_id];
492 /* Drop need_inv if server responded with send with invalidation */
493 req->need_inv &= !w_inval;
494 complete_rdma_req(req, errno, true, false);
495 }
496
497 static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
498 {
499 struct rtrs_iu *iu;
500 int err;
501 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
502
503 WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
504 iu = container_of(wc->wr_cqe, struct rtrs_iu,
505 cqe);
506 err = rtrs_iu_post_recv(&con->c, iu);
507 if (unlikely(err)) {
508 rtrs_err(con->c.sess, "post iu failed %d\n", err);
509 rtrs_rdma_error_recovery(con);
510 }
511 }
512
513 static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
514 {
515 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
516 struct rtrs_msg_rkey_rsp *msg;
517 u32 imm_type, imm_payload;
518 bool w_inval = false;
519 struct rtrs_iu *iu;
520 u32 buf_id;
521 int err;
522
523 WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
524
525 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
526
527 if (unlikely(wc->byte_len < sizeof(*msg))) {
528 rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
529 wc->byte_len);
530 goto out;
531 }
532 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
533 iu->size, DMA_FROM_DEVICE);
534 msg = iu->buf;
535 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
536 rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
537 le16_to_cpu(msg->type));
538 goto out;
539 }
540 buf_id = le16_to_cpu(msg->buf_id);
541 if (WARN_ON(buf_id >= sess->queue_depth))
542 goto out;
543
544 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
545 if (likely(imm_type == RTRS_IO_RSP_IMM ||
546 imm_type == RTRS_IO_RSP_W_INV_IMM)) {
547 u32 msg_id;
548
549 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
550 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
551
552 if (WARN_ON(buf_id != msg_id))
553 goto out;
554 sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
555 process_io_rsp(sess, msg_id, err, w_inval);
556 }
557 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
558 iu->size, DMA_FROM_DEVICE);
559 return rtrs_clt_recv_done(con, wc);
560 out:
561 rtrs_rdma_error_recovery(con);
562 }
563
564 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
565
566 static struct ib_cqe io_comp_cqe = {
567 .done = rtrs_clt_rdma_done
568 };
569
570 /*
571 * Post x2 empty WRs: first is for this RDMA with IMM,
572 * second is for RECV with INV, which happened earlier.
573 */
574 static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
575 {
576 struct ib_recv_wr wr_arr[2], *wr;
577 int i;
578
579 memset(wr_arr, 0, sizeof(wr_arr));
580 for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
581 wr = &wr_arr[i];
582 wr->wr_cqe = cqe;
583 if (i)
584 /* Chain backwards */
585 wr->next = &wr_arr[i - 1];
586 }
587
588 return ib_post_recv(con->qp, wr, NULL);
589 }
590
591 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
592 {
593 struct rtrs_clt_con *con = cq->cq_context;
594 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
595 u32 imm_type, imm_payload;
596 bool w_inval = false;
597 int err;
598
599 if (unlikely(wc->status != IB_WC_SUCCESS)) {
600 if (wc->status != IB_WC_WR_FLUSH_ERR) {
601 rtrs_err(sess->clt, "RDMA failed: %s\n",
602 ib_wc_status_msg(wc->status));
603 rtrs_rdma_error_recovery(con);
604 }
605 return;
606 }
607 rtrs_clt_update_wc_stats(con);
608
609 switch (wc->opcode) {
610 case IB_WC_RECV_RDMA_WITH_IMM:
611 /*
612 * post_recv() RDMA write completions of IO reqs (read/write)
613 * and hb
614 */
615 if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
616 return;
617 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
618 &imm_type, &imm_payload);
619 if (likely(imm_type == RTRS_IO_RSP_IMM ||
620 imm_type == RTRS_IO_RSP_W_INV_IMM)) {
621 u32 msg_id;
622
623 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
624 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
625
626 process_io_rsp(sess, msg_id, err, w_inval);
627 } else if (imm_type == RTRS_HB_MSG_IMM) {
628 WARN_ON(con->c.cid);
629 rtrs_send_hb_ack(&sess->s);
630 if (sess->flags == RTRS_MSG_NEW_RKEY_F)
631 return rtrs_clt_recv_done(con, wc);
632 } else if (imm_type == RTRS_HB_ACK_IMM) {
633 WARN_ON(con->c.cid);
634 sess->s.hb_missed_cnt = 0;
635 if (sess->flags == RTRS_MSG_NEW_RKEY_F)
636 return rtrs_clt_recv_done(con, wc);
637 } else {
638 rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
639 imm_type);
640 }
641 if (w_inval)
642 /*
643 * Post x2 empty WRs: first is for this RDMA with IMM,
644 * second is for RECV with INV, which happened earlier.
645 */
646 err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
647 else
648 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
649 if (unlikely(err)) {
650 rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
651 err);
652 rtrs_rdma_error_recovery(con);
653 break;
654 }
655 break;
656 case IB_WC_RECV:
657 /*
658 * Key invalidations from server side
659 */
660 WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
661 wc->wc_flags & IB_WC_WITH_IMM));
662 WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
663 if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
664 if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
665 return rtrs_clt_recv_done(con, wc);
666
667 return rtrs_clt_rkey_rsp_done(con, wc);
668 }
669 break;
670 case IB_WC_RDMA_WRITE:
671 /*
672 * post_send() RDMA write completions of IO reqs (read/write)
673 * and hb
674 */
675 break;
676
677 default:
678 rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
679 return;
680 }
681 }
682
683 static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
684 {
685 int err, i;
686 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
687
688 for (i = 0; i < q_size; i++) {
689 if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
690 struct rtrs_iu *iu = &con->rsp_ius[i];
691
692 err = rtrs_iu_post_recv(&con->c, iu);
693 } else {
694 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
695 }
696 if (unlikely(err))
697 return err;
698 }
699
700 return 0;
701 }
702
703 static int post_recv_sess(struct rtrs_clt_sess *sess)
704 {
705 size_t q_size = 0;
706 int err, cid;
707
708 for (cid = 0; cid < sess->s.con_num; cid++) {
709 if (cid == 0)
710 q_size = SERVICE_CON_QUEUE_DEPTH;
711 else
712 q_size = sess->queue_depth;
713
714 /*
715 * x2 for RDMA read responses + FR key invalidations,
716 * RDMA writes do not require any FR registrations.
717 */
718 q_size *= 2;
719
720 err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
721 if (unlikely(err)) {
722 rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
723 return err;
724 }
725 }
726
727 return 0;
728 }
729
730 struct path_it {
731 int i;
732 struct list_head skip_list;
733 struct rtrs_clt *clt;
734 struct rtrs_clt_sess *(*next_path)(struct path_it *it);
735 };
736
737 /**
738 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
739 * @head: the head for the list.
740 * @ptr: the list head to take the next element from.
741 * @type: the type of the struct this is embedded in.
742 * @memb: the name of the list_head within the struct.
743 *
744 * Next element returned in round-robin fashion, i.e. head will be skipped,
745 * but if list is observed as empty, NULL will be returned.
746 *
747 * This primitive may safely run concurrently with the _rcu list-mutation
748 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
749 */
750 #define list_next_or_null_rr_rcu(head, ptr, type, memb) \
751 ({ \
752 list_next_or_null_rcu(head, ptr, type, memb) ?: \
753 list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
754 type, memb); \
755 })
756
757 /**
758 * get_next_path_rr() - Returns path in round-robin fashion.
759 * @it: the path pointer
760 *
761 * Related to @MP_POLICY_RR
762 *
763 * Locks:
764 * rcu_read_lock() must be held.
765 */
766 static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
767 {
768 struct rtrs_clt_sess __rcu **ppcpu_path;
769 struct rtrs_clt_sess *path;
770 struct rtrs_clt *clt;
771
772 clt = it->clt;
773
774 /*
775 * Here we use two RCU objects: @paths_list and @pcpu_path
776 * pointer. See rtrs_clt_remove_path_from_arr() for details
777 * how that is handled.
778 */
779
780 ppcpu_path = this_cpu_ptr(clt->pcpu_path);
781 path = rcu_dereference(*ppcpu_path);
782 if (unlikely(!path))
783 path = list_first_or_null_rcu(&clt->paths_list,
784 typeof(*path), s.entry);
785 else
786 path = list_next_or_null_rr_rcu(&clt->paths_list,
787 &path->s.entry,
788 typeof(*path),
789 s.entry);
790 rcu_assign_pointer(*ppcpu_path, path);
791
792 return path;
793 }
794
795 /**
796 * get_next_path_min_inflight() - Returns path with minimal inflight count.
797 * @it: the path pointer
798 *
799 * Related to @MP_POLICY_MIN_INFLIGHT
800 *
801 * Locks:
802 * rcu_read_lock() must be held.
803 */
804 static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
805 {
806 struct rtrs_clt_sess *min_path = NULL;
807 struct rtrs_clt *clt = it->clt;
808 struct rtrs_clt_sess *sess;
809 int min_inflight = INT_MAX;
810 int inflight;
811
812 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
813 if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
814 continue;
815
816 inflight = atomic_read(&sess->stats->inflight);
817
818 if (inflight < min_inflight) {
819 min_inflight = inflight;
820 min_path = sess;
821 }
822 }
823
824 /*
825 * add the path to the skip list, so that next time we can get
826 * a different one
827 */
828 if (min_path)
829 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
830
831 return min_path;
832 }
833
834 static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
835 {
836 INIT_LIST_HEAD(&it->skip_list);
837 it->clt = clt;
838 it->i = 0;
839
840 if (clt->mp_policy == MP_POLICY_RR)
841 it->next_path = get_next_path_rr;
842 else
843 it->next_path = get_next_path_min_inflight;
844 }
845
846 static inline void path_it_deinit(struct path_it *it)
847 {
848 struct list_head *skip, *tmp;
849 /*
850 * The skip_list is used only for the MIN_INFLIGHT policy.
851 * We need to remove paths from it, so that next IO can insert
852 * paths (->mp_skip_entry) into a skip_list again.
853 */
854 list_for_each_safe(skip, tmp, &it->skip_list)
855 list_del_init(skip);
856 }
857
858 /**
859 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
860 * about an inflight IO.
861 * The user buffer holding user control message (not data) is copied into
862 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
863 * also hold the control message of rtrs.
864 * @req: an io request holding information about IO.
865 * @sess: client session
866 * @conf: confirmation callback function to notify upper layer.
867 * @permit: permit for allocation of RDMA remote buffer
868 * @priv: private pointer
869 * @vec: kernel vector containing control message
870 * @usr_len: length of the user message
871 * @sg: scatter list for IO data
872 * @sg_cnt: number of scatter list entries
873 * @data_len: length of the IO data
874 * @dir: direction of the IO.
875 */
876 static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
877 struct rtrs_clt_sess *sess,
878 void (*conf)(void *priv, int errno),
879 struct rtrs_permit *permit, void *priv,
880 const struct kvec *vec, size_t usr_len,
881 struct scatterlist *sg, size_t sg_cnt,
882 size_t data_len, int dir)
883 {
884 struct iov_iter iter;
885 size_t len;
886
887 req->permit = permit;
888 req->in_use = true;
889 req->usr_len = usr_len;
890 req->data_len = data_len;
891 req->sglist = sg;
892 req->sg_cnt = sg_cnt;
893 req->priv = priv;
894 req->dir = dir;
895 req->con = rtrs_permit_to_clt_con(sess, permit);
896 req->conf = conf;
897 req->need_inv = false;
898 req->need_inv_comp = false;
899 req->inv_errno = 0;
900
901 iov_iter_kvec(&iter, READ, vec, 1, usr_len);
902 len = _copy_from_iter(req->iu->buf, usr_len, &iter);
903 WARN_ON(len != usr_len);
904
905 reinit_completion(&req->inv_comp);
906 }
907
908 static struct rtrs_clt_io_req *
909 rtrs_clt_get_req(struct rtrs_clt_sess *sess,
910 void (*conf)(void *priv, int errno),
911 struct rtrs_permit *permit, void *priv,
912 const struct kvec *vec, size_t usr_len,
913 struct scatterlist *sg, size_t sg_cnt,
914 size_t data_len, int dir)
915 {
916 struct rtrs_clt_io_req *req;
917
918 req = &sess->reqs[permit->mem_id];
919 rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
920 sg, sg_cnt, data_len, dir);
921 return req;
922 }
923
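/*
 * Prepare a copy of a failed request on an alive path for failover: the
 * same permit (and therefore the same mem_id slot) is reused, only the
 * session and connection are taken from the alive path.
 */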
924 static struct rtrs_clt_io_req *
925 rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
926 struct rtrs_clt_io_req *fail_req)
927 {
928 struct rtrs_clt_io_req *req;
929 struct kvec vec = {
930 .iov_base = fail_req->iu->buf,
931 .iov_len = fail_req->usr_len
932 };
933
934 req = &alive_sess->reqs[fail_req->permit->mem_id];
935 rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
936 fail_req->priv, &vec, fail_req->usr_len,
937 fail_req->sglist, fail_req->sg_cnt,
938 fail_req->data_len, fail_req->dir);
939 return req;
940 }
941
942 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
943 struct rtrs_clt_io_req *req,
944 struct rtrs_rbuf *rbuf,
945 u32 size, u32 imm)
946 {
947 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
948 struct ib_sge *sge = req->sge;
949 enum ib_send_flags flags;
950 struct scatterlist *sg;
951 size_t num_sge;
952 int i;
953
954 for_each_sg(req->sglist, sg, req->sg_cnt, i) {
955 sge[i].addr = sg_dma_address(sg);
956 sge[i].length = sg_dma_len(sg);
957 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
958 }
959 sge[i].addr = req->iu->dma_addr;
960 sge[i].length = size;
961 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
962
963 num_sge = 1 + req->sg_cnt;
964
965 /*
966 * From time to time we have to post signalled sends,
967 * or send queue will fill up and only QP reset can help.
968 */
969 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
970 0 : IB_SEND_SIGNALED;
971
972 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
973 size, DMA_TO_DEVICE);
974
975 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
976 rbuf->rkey, rbuf->addr, imm,
977 flags, NULL);
978 }
979
980 static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
981 {
982 struct rtrs_clt_con *con = req->con;
983 struct rtrs_sess *s = con->c.sess;
984 struct rtrs_clt_sess *sess = to_clt_sess(s);
985 struct rtrs_msg_rdma_write *msg;
986
987 struct rtrs_rbuf *rbuf;
988 int ret, count = 0;
989 u32 imm, buf_id;
990
991 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
992
993 if (unlikely(tsize > sess->chunk_size)) {
994 rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
995 tsize, sess->chunk_size);
996 return -EMSGSIZE;
997 }
998 if (req->sg_cnt) {
999 count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
1000 req->sg_cnt, req->dir);
1001 if (unlikely(!count)) {
1002 rtrs_wrn(s, "Write request failed, map failed\n");
1003 return -EINVAL;
1004 }
1005 }
1006 /* put rtrs msg after sg and user message */
1007 msg = req->iu->buf + req->usr_len;
1008 msg->type = cpu_to_le16(RTRS_MSG_WRITE);
1009 msg->usr_len = cpu_to_le16(req->usr_len);
1010
1011 /* rtrs message on server side will be after user data and message */
1012 imm = req->permit->mem_off + req->data_len + req->usr_len;
1013 imm = rtrs_to_io_req_imm(imm);
1014 buf_id = req->permit->mem_id;
1015 req->sg_size = tsize;
1016 rbuf = &sess->rbufs[buf_id];
1017
1018 /*
1019 * Update stats now, after request is successfully sent it is not
1020 * safe anymore to touch it.
1021 */
1022 rtrs_clt_update_all_stats(req, WRITE);
1023
1024 ret = rtrs_post_rdma_write_sg(req->con, req, rbuf,
1025 req->usr_len + sizeof(*msg),
1026 imm);
1027 if (unlikely(ret)) {
1028 rtrs_err(s, "Write request failed: %d\n", ret);
1029 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1030 atomic_dec(&sess->stats->inflight);
1031 if (req->sg_cnt)
1032 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1033 req->sg_cnt, req->dir);
1034 }
1035
1036 return ret;
1037 }
1038
1039 static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
1040 {
1041 int nr;
1042
1043 /* Align the MR to a 4K page size to match the block virt boundary */
1044 nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
1045 if (nr < 0)
1046 return nr;
1047 if (unlikely(nr < req->sg_cnt))
1048 return -EINVAL;
1049 ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));
1050
1051 return nr;
1052 }
1053
1054 static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
1055 {
1056 struct rtrs_clt_con *con = req->con;
1057 struct rtrs_sess *s = con->c.sess;
1058 struct rtrs_clt_sess *sess = to_clt_sess(s);
1059 struct rtrs_msg_rdma_read *msg;
1060 struct rtrs_ib_dev *dev;
1061
1062 struct ib_reg_wr rwr;
1063 struct ib_send_wr *wr = NULL;
1064
1065 int ret, count = 0;
1066 u32 imm, buf_id;
1067
1068 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1069
1070 s = &sess->s;
1071 dev = sess->s.dev;
1072
1073 if (unlikely(tsize > sess->chunk_size)) {
1074 rtrs_wrn(s,
1075 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1076 tsize, sess->chunk_size);
1077 return -EMSGSIZE;
1078 }
1079
1080 if (req->sg_cnt) {
1081 count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1082 req->dir);
1083 if (unlikely(!count)) {
1084 rtrs_wrn(s,
1085 "Read request failed, dma map failed\n");
1086 return -EINVAL;
1087 }
1088 }
1089 /* put our message into req->buf after user message */
1090 msg = req->iu->buf + req->usr_len;
1091 msg->type = cpu_to_le16(RTRS_MSG_READ);
1092 msg->usr_len = cpu_to_le16(req->usr_len);
1093
1094 if (count) {
1095 ret = rtrs_map_sg_fr(req, count);
1096 if (ret < 0) {
1097 rtrs_err_rl(s,
1098 "Read request failed, failed to map fast reg. data, err: %d\n",
1099 ret);
1100 ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1101 req->dir);
1102 return ret;
1103 }
1104 rwr = (struct ib_reg_wr) {
1105 .wr.opcode = IB_WR_REG_MR,
1106 .wr.wr_cqe = &fast_reg_cqe,
1107 .mr = req->mr,
1108 .key = req->mr->rkey,
1109 .access = (IB_ACCESS_LOCAL_WRITE |
1110 IB_ACCESS_REMOTE_WRITE),
1111 };
1112 wr = &rwr.wr;
1113
1114 msg->sg_cnt = cpu_to_le16(1);
1115 msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);
1116
1117 msg->desc[0].addr = cpu_to_le64(req->mr->iova);
1118 msg->desc[0].key = cpu_to_le32(req->mr->rkey);
1119 msg->desc[0].len = cpu_to_le32(req->mr->length);
1120
1121 /* Further invalidation is required */
1122 req->need_inv = !!RTRS_MSG_NEED_INVAL_F;
1123
1124 } else {
1125 msg->sg_cnt = 0;
1126 msg->flags = 0;
1127 }
1128 /*
1129 * rtrs message will be after the space reserved for disk data and
1130 * user message
1131 */
1132 imm = req->permit->mem_off + req->data_len + req->usr_len;
1133 imm = rtrs_to_io_req_imm(imm);
1134 buf_id = req->permit->mem_id;
1135
1136 req->sg_size = sizeof(*msg);
1137 req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
1138 req->sg_size += req->usr_len;
1139
1140 /*
1141 * Update stats now, after request is successfully sent it is not
1142 * safe anymore to touch it.
1143 */
1144 rtrs_clt_update_all_stats(req, READ);
1145
1146 ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
1147 req->data_len, imm, wr);
1148 if (unlikely(ret)) {
1149 rtrs_err(s, "Read request failed: %d\n", ret);
1150 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1151 atomic_dec(&sess->stats->inflight);
1152 req->need_inv = false;
1153 if (req->sg_cnt)
1154 ib_dma_unmap_sg(dev->ib_dev, req->sglist,
1155 req->sg_cnt, req->dir);
1156 }
1157
1158 return ret;
1159 }
1160
1161 /**
1162 * rtrs_clt_failover_req() - Try to find an active path for a failed request
1163 * @clt: clt context
1164 * @fail_req: a failed io request.
1165 */
1166 static int rtrs_clt_failover_req(struct rtrs_clt *clt,
1167 struct rtrs_clt_io_req *fail_req)
1168 {
1169 struct rtrs_clt_sess *alive_sess;
1170 struct rtrs_clt_io_req *req;
1171 int err = -ECONNABORTED;
1172 struct path_it it;
1173
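/*
 * Walk the paths in the order given by the multipath policy, trying at
 * most paths_num of them and skipping paths that are not CONNECTED.
 */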
1174 rcu_read_lock();
1175 for (path_it_init(&it, clt);
1176 (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
1177 it.i++) {
1178 if (unlikely(READ_ONCE(alive_sess->state) !=
1179 RTRS_CLT_CONNECTED))
1180 continue;
1181 req = rtrs_clt_get_copy_req(alive_sess, fail_req);
1182 if (req->dir == DMA_TO_DEVICE)
1183 err = rtrs_clt_write_req(req);
1184 else
1185 err = rtrs_clt_read_req(req);
1186 if (unlikely(err)) {
1187 req->in_use = false;
1188 continue;
1189 }
1190 /* Success path */
1191 rtrs_clt_inc_failover_cnt(alive_sess->stats);
1192 break;
1193 }
1194 path_it_deinit(&it);
1195 rcu_read_unlock();
1196
1197 return err;
1198 }
1199
1200 static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
1201 {
1202 struct rtrs_clt *clt = sess->clt;
1203 struct rtrs_clt_io_req *req;
1204 int i, err;
1205
1206 if (!sess->reqs)
1207 return;
1208 for (i = 0; i < sess->queue_depth; ++i) {
1209 req = &sess->reqs[i];
1210 if (!req->in_use)
1211 continue;
1212
1213 /*
1214 * Safely (without notification) complete failed request.
1215 * After completion this request is still usable and can
1216 * be failed over to another path.
1217 */
1218 complete_rdma_req(req, -ECONNABORTED, false, true);
1219
1220 err = rtrs_clt_failover_req(clt, req);
1221 if (unlikely(err))
1222 /* Failover failed, notify anyway */
1223 req->conf(req->priv, err);
1224 }
1225 }
1226
1227 static void free_sess_reqs(struct rtrs_clt_sess *sess)
1228 {
1229 struct rtrs_clt_io_req *req;
1230 int i;
1231
1232 if (!sess->reqs)
1233 return;
1234 for (i = 0; i < sess->queue_depth; ++i) {
1235 req = &sess->reqs[i];
1236 if (req->mr)
1237 ib_dereg_mr(req->mr);
1238 kfree(req->sge);
1239 rtrs_iu_free(req->iu, DMA_TO_DEVICE,
1240 sess->s.dev->ib_dev, 1);
1241 }
1242 kfree(sess->reqs);
1243 sess->reqs = NULL;
1244 }
1245
1246 static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
1247 {
1248 struct rtrs_clt_io_req *req;
1249 struct rtrs_clt *clt = sess->clt;
1250 int i, err = -ENOMEM;
1251
1252 sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
1253 GFP_KERNEL);
1254 if (!sess->reqs)
1255 return -ENOMEM;
1256
1257 for (i = 0; i < sess->queue_depth; ++i) {
1258 req = &sess->reqs[i];
1259 req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
1260 sess->s.dev->ib_dev,
1261 DMA_TO_DEVICE,
1262 rtrs_clt_rdma_done);
1263 if (!req->iu)
1264 goto out;
1265
1266 req->sge = kmalloc_array(clt->max_segments + 1,
1267 sizeof(*req->sge), GFP_KERNEL);
1268 if (!req->sge)
1269 goto out;
1270
1271 req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
1272 sess->max_pages_per_mr);
1273 if (IS_ERR(req->mr)) {
1274 err = PTR_ERR(req->mr);
1275 req->mr = NULL;
1276 pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
1277 sess->max_pages_per_mr);
1278 goto out;
1279 }
1280
1281 init_completion(&req->inv_comp);
1282 }
1283
1284 return 0;
1285
1286 out:
1287 free_sess_reqs(sess);
1288
1289 return err;
1290 }
1291
1292 static int alloc_permits(struct rtrs_clt *clt)
1293 {
1294 unsigned int chunk_bits;
1295 int err, i;
1296
1297 clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
1298 sizeof(long), GFP_KERNEL);
1299 if (!clt->permits_map) {
1300 err = -ENOMEM;
1301 goto out_err;
1302 }
1303 clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
1304 if (!clt->permits) {
1305 err = -ENOMEM;
1306 goto err_map;
1307 }
1308 chunk_bits = ilog2(clt->queue_depth - 1) + 1;
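/*
 * mem_off places the permit index in the upper bits of the immediate
 * payload; the lower bits are left for the byte offset added when a
 * request is posted, see rtrs_clt_write_req()/rtrs_clt_read_req().
 */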
1309 for (i = 0; i < clt->queue_depth; i++) {
1310 struct rtrs_permit *permit;
1311
1312 permit = get_permit(clt, i);
1313 permit->mem_id = i;
1314 permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
1315 }
1316
1317 return 0;
1318
1319 err_map:
1320 kfree(clt->permits_map);
1321 clt->permits_map = NULL;
1322 out_err:
1323 return err;
1324 }
1325
1326 static void free_permits(struct rtrs_clt *clt)
1327 {
1328 kfree(clt->permits_map);
1329 clt->permits_map = NULL;
1330 kfree(clt->permits);
1331 clt->permits = NULL;
1332 }
1333
1334 static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
1335 {
1336 struct ib_device *ib_dev;
1337 u64 max_pages_per_mr;
1338 int mr_page_shift;
1339
1340 ib_dev = sess->s.dev->ib_dev;
1341
1342 /*
1343 * Use the smallest page size supported by the HCA, down to a
1344 * minimum of 4096 bytes. We're unlikely to build large sglists
1345 * out of smaller entries.
1346 */
1347 mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
1348 max_pages_per_mr = ib_dev->attrs.max_mr_size;
1349 do_div(max_pages_per_mr, (1ull << mr_page_shift));
1350 sess->max_pages_per_mr =
1351 min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
1352 ib_dev->attrs.max_fast_reg_page_list_len);
1353 sess->max_send_sge = ib_dev->attrs.max_send_sge;
1354 }
1355
1356 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
1357 enum rtrs_clt_state new_state,
1358 enum rtrs_clt_state *old_state)
1359 {
1360 bool changed;
1361
1362 spin_lock_irq(&sess->state_wq.lock);
1363 *old_state = sess->state;
1364 changed = __rtrs_clt_change_state(sess, new_state);
1365 spin_unlock_irq(&sess->state_wq.lock);
1366
1367 return changed;
1368 }
1369
1370 static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
1371 enum rtrs_clt_state new_state)
1372 {
1373 enum rtrs_clt_state old_state;
1374
1375 return rtrs_clt_change_state_get_old(sess, new_state, &old_state);
1376 }
1377
1378 static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
1379 {
1380 struct rtrs_clt_con *con = container_of(c, typeof(*con), c);
1381
1382 rtrs_rdma_error_recovery(con);
1383 }
1384
1385 static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
1386 {
1387 rtrs_init_hb(&sess->s, &io_comp_cqe,
1388 RTRS_HB_INTERVAL_MS,
1389 RTRS_HB_MISSED_MAX,
1390 rtrs_clt_hb_err_handler,
1391 rtrs_wq);
1392 }
1393
1394 static void rtrs_clt_start_hb(struct rtrs_clt_sess *sess)
1395 {
1396 rtrs_start_hb(&sess->s);
1397 }
1398
1399 static void rtrs_clt_stop_hb(struct rtrs_clt_sess *sess)
1400 {
1401 rtrs_stop_hb(&sess->s);
1402 }
1403
1404 static void rtrs_clt_reconnect_work(struct work_struct *work);
1405 static void rtrs_clt_close_work(struct work_struct *work);
1406
1407 static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
1408 const struct rtrs_addr *path,
1409 size_t con_num, u16 max_segments,
1410 size_t max_segment_size)
1411 {
1412 struct rtrs_clt_sess *sess;
1413 int err = -ENOMEM;
1414 int cpu;
1415
1416 sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1417 if (!sess)
1418 goto err;
1419
1420 /* Extra connection for user messages */
1421 con_num += 1;
1422
1423 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
1424 if (!sess->s.con)
1425 goto err_free_sess;
1426
1427 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1428 if (!sess->stats)
1429 goto err_free_con;
1430
1431 mutex_init(&sess->init_mutex);
1432 uuid_gen(&sess->s.uuid);
1433 memcpy(&sess->s.dst_addr, path->dst,
1434 rdma_addr_size((struct sockaddr *)path->dst));
1435
1436 /*
1437 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1438 * checks the sa_family to be non-zero. If user passed src_addr=NULL
1439 * the sess->src_addr will contain only zeros, which is then fine.
1440 */
1441 if (path->src)
1442 memcpy(&sess->s.src_addr, path->src,
1443 rdma_addr_size((struct sockaddr *)path->src));
1444 strlcpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
1445 sess->s.con_num = con_num;
1446 sess->clt = clt;
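/* Convert the maximum transfer size into the number of 4K pages per MR */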
1447 sess->max_pages_per_mr = max_segments * max_segment_size >> 12;
1448 init_waitqueue_head(&sess->state_wq);
1449 sess->state = RTRS_CLT_CONNECTING;
1450 atomic_set(&sess->connected_cnt, 0);
1451 INIT_WORK(&sess->close_work, rtrs_clt_close_work);
1452 INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
1453 rtrs_clt_init_hb(sess);
1454
1455 sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
1456 if (!sess->mp_skip_entry)
1457 goto err_free_stats;
1458
1459 for_each_possible_cpu(cpu)
1460 INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));
1461
1462 err = rtrs_clt_init_stats(sess->stats);
1463 if (err)
1464 goto err_free_percpu;
1465
1466 return sess;
1467
1468 err_free_percpu:
1469 free_percpu(sess->mp_skip_entry);
1470 err_free_stats:
1471 kfree(sess->stats);
1472 err_free_con:
1473 kfree(sess->s.con);
1474 err_free_sess:
1475 kfree(sess);
1476 err:
1477 return ERR_PTR(err);
1478 }
1479
1480 void free_sess(struct rtrs_clt_sess *sess)
1481 {
1482 free_percpu(sess->mp_skip_entry);
1483 mutex_destroy(&sess->init_mutex);
1484 kfree(sess->s.con);
1485 kfree(sess->rbufs);
1486 kfree(sess);
1487 }
1488
1489 static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
1490 {
1491 struct rtrs_clt_con *con;
1492
1493 con = kzalloc(sizeof(*con), GFP_KERNEL);
1494 if (!con)
1495 return -ENOMEM;
1496
1497 /* Map first two connections to the first CPU */
1498 con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids;
1499 con->c.cid = cid;
1500 con->c.sess = &sess->s;
1501 atomic_set(&con->io_cnt, 0);
1502
1503 sess->s.con[cid] = &con->c;
1504
1505 return 0;
1506 }
1507
1508 static void destroy_con(struct rtrs_clt_con *con)
1509 {
1510 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1511
1512 sess->s.con[con->c.cid] = NULL;
1513 kfree(con);
1514 }
1515
1516 static int create_con_cq_qp(struct rtrs_clt_con *con)
1517 {
1518 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1519 u16 wr_queue_size;
1520 int err, cq_vector;
1521 struct rtrs_msg_rkey_rsp *rsp;
1522
1523 /*
1524 * This function can fail, but still destroy_con_cq_qp() should
1525 * be called, this is because create_con_cq_qp() is called on cm
1526 * event path, thus caller/waiter never knows: have we failed before
1527 * create_con_cq_qp() or after. To solve this dilemma without
1528 * creating any additional flags, just allow destroy_con_cq_qp() to be
1529 * called many times.
1530 */
1531
1532 if (con->c.cid == 0) {
1533 /*
1534 * One completion for each receive and two for each send
1535 * (send request + registration)
1536 * + 2 for drain and heartbeat
1537 * in case qp gets into error state
1538 */
1539 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
1540 /* We must be the first here */
1541 if (WARN_ON(sess->s.dev))
1542 return -EINVAL;
1543
1544 /*
1545 * The whole session uses device from user connection.
1546 * Be careful not to close user connection before ib dev
1547 * is gracefully put.
1548 */
1549 sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
1550 &dev_pd);
1551 if (!sess->s.dev) {
1552 rtrs_wrn(sess->clt,
1553 "rtrs_ib_dev_find_or_add(): no memory\n");
1554 return -ENOMEM;
1555 }
1556 sess->s.dev_ref = 1;
1557 query_fast_reg_mode(sess);
1558 } else {
1559 /*
1560 * Here we assume that session members are correctly set.
1561 * This is always true if user connection (cid == 0) is
1562 * established first.
1563 */
1564 if (WARN_ON(!sess->s.dev))
1565 return -EINVAL;
1566 if (WARN_ON(!sess->queue_depth))
1567 return -EINVAL;
1568
1569 /* Shared between connections */
1570 sess->s.dev_ref++;
1571 wr_queue_size =
1572 min_t(int, sess->s.dev->ib_dev->attrs.max_qp_wr,
1573 /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1574 sess->queue_depth * 3 + 1);
1575 }
1576 /* alloc iu to recv new rkey reply when server reports flags set */
1577 if (sess->flags == RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
1578 con->rsp_ius = rtrs_iu_alloc(wr_queue_size, sizeof(*rsp),
1579 GFP_KERNEL, sess->s.dev->ib_dev,
1580 DMA_FROM_DEVICE,
1581 rtrs_clt_rdma_done);
1582 if (!con->rsp_ius)
1583 return -ENOMEM;
1584 con->queue_size = wr_queue_size;
1585 }
1586 cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
1587 err = rtrs_cq_qp_create(&sess->s, &con->c, sess->max_send_sge,
1588 cq_vector, wr_queue_size, wr_queue_size,
1589 IB_POLL_SOFTIRQ);
1590 /*
1591 * In case of error we do not bother to clean previous allocations,
1592 * since destroy_con_cq_qp() must be called.
1593 */
1594 return err;
1595 }
1596
1597 static void destroy_con_cq_qp(struct rtrs_clt_con *con)
1598 {
1599 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1600
1601 /*
1602 * Be careful here: destroy_con_cq_qp() can be called even
1603 * create_con_cq_qp() failed, see comments there.
1604 */
1605
1606 rtrs_cq_qp_destroy(&con->c);
1607 if (con->rsp_ius) {
1608 rtrs_iu_free(con->rsp_ius, DMA_FROM_DEVICE,
1609 sess->s.dev->ib_dev, con->queue_size);
1610 con->rsp_ius = NULL;
1611 con->queue_size = 0;
1612 }
1613 if (sess->s.dev_ref && !--sess->s.dev_ref) {
1614 rtrs_ib_dev_put(sess->s.dev);
1615 sess->s.dev = NULL;
1616 }
1617 }
1618
1619 static void stop_cm(struct rtrs_clt_con *con)
1620 {
1621 rdma_disconnect(con->c.cm_id);
1622 if (con->c.qp)
1623 ib_drain_qp(con->c.qp);
1624 }
1625
1626 static void destroy_cm(struct rtrs_clt_con *con)
1627 {
1628 rdma_destroy_id(con->c.cm_id);
1629 con->c.cm_id = NULL;
1630 }
1631
1632 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
1633 {
1634 struct rtrs_sess *s = con->c.sess;
1635 int err;
1636
1637 err = create_con_cq_qp(con);
1638 if (err) {
1639 rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
1640 return err;
1641 }
1642 err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
1643 if (err) {
1644 rtrs_err(s, "Resolving route failed, err: %d\n", err);
1645 destroy_con_cq_qp(con);
1646 }
1647
1648 return err;
1649 }
1650
1651 static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
1652 {
1653 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1654 struct rtrs_clt *clt = sess->clt;
1655 struct rtrs_msg_conn_req msg;
1656 struct rdma_conn_param param;
1657
1658 int err;
1659
1660 param = (struct rdma_conn_param) {
1661 .retry_count = 7,
1662 .rnr_retry_count = 7,
1663 .private_data = &msg,
1664 .private_data_len = sizeof(msg),
1665 };
1666
1667 msg = (struct rtrs_msg_conn_req) {
1668 .magic = cpu_to_le16(RTRS_MAGIC),
1669 .version = cpu_to_le16(RTRS_PROTO_VER),
1670 .cid = cpu_to_le16(con->c.cid),
1671 .cid_num = cpu_to_le16(sess->s.con_num),
1672 .recon_cnt = cpu_to_le16(sess->s.recon_cnt),
1673 };
1674 uuid_copy(&msg.sess_uuid, &sess->s.uuid);
1675 uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
1676
1677 err = rdma_connect_locked(con->c.cm_id, &param);
1678 if (err)
1679 rtrs_err(clt, "rdma_connect_locked(): %d\n", err);
1680
1681 return err;
1682 }
1683
1684 static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
1685 struct rdma_cm_event *ev)
1686 {
1687 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1688 struct rtrs_clt *clt = sess->clt;
1689 const struct rtrs_msg_conn_rsp *msg;
1690 u16 version, queue_depth;
1691 int errno;
1692 u8 len;
1693
1694 msg = ev->param.conn.private_data;
1695 len = ev->param.conn.private_data_len;
1696 if (len < sizeof(*msg)) {
1697 rtrs_err(clt, "Invalid RTRS connection response\n");
1698 return -ECONNRESET;
1699 }
1700 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1701 rtrs_err(clt, "Invalid RTRS magic\n");
1702 return -ECONNRESET;
1703 }
1704 version = le16_to_cpu(msg->version);
1705 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1706 rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
1707 version >> 8, RTRS_PROTO_VER_MAJOR);
1708 return -ECONNRESET;
1709 }
1710 errno = le16_to_cpu(msg->errno);
1711 if (errno) {
1712 rtrs_err(clt, "Invalid RTRS message: errno %d\n",
1713 errno);
1714 return -ECONNRESET;
1715 }
1716 if (con->c.cid == 0) {
1717 queue_depth = le16_to_cpu(msg->queue_depth);
1718
1719 if (queue_depth > MAX_SESS_QUEUE_DEPTH) {
1720 rtrs_err(clt, "Invalid RTRS message: queue=%d\n",
1721 queue_depth);
1722 return -ECONNRESET;
1723 }
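/*
 * (Re)allocate the receive buffer descriptors on the first connect or
 * when the server reports a larger queue depth than was allocated for
 * previously.
 */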
1724 if (!sess->rbufs || sess->queue_depth < queue_depth) {
1725 kfree(sess->rbufs);
1726 sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
1727 GFP_KERNEL);
1728 if (!sess->rbufs)
1729 return -ENOMEM;
1730 }
1731 sess->queue_depth = queue_depth;
1732 sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
1733 sess->max_io_size = le32_to_cpu(msg->max_io_size);
1734 sess->flags = le32_to_cpu(msg->flags);
1735 sess->chunk_size = sess->max_io_size + sess->max_hdr_size;
1736
1737 /*
1738 * Global queue depth and IO size is always a minimum.
1739 * If during a reconnection the server sends us a slightly
1740 * higher value - the client does not care and uses the cached minimum.
1741 *
1742 * Since we can have several sessions (paths) re-establishing
1743 * connections in parallel, use a lock.
1744 */
1745 mutex_lock(&clt->paths_mutex);
1746 clt->queue_depth = min_not_zero(sess->queue_depth,
1747 clt->queue_depth);
1748 clt->max_io_size = min_not_zero(sess->max_io_size,
1749 clt->max_io_size);
1750 mutex_unlock(&clt->paths_mutex);
1751
1752 /*
1753 * Cache the hca_port and hca_name for sysfs
1754 */
1755 sess->hca_port = con->c.cm_id->port_num;
1756 scnprintf(sess->hca_name, sizeof(sess->hca_name),
1757 sess->s.dev->ib_dev->name);
1758 sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
1759 }
1760
1761 return 0;
1762 }
1763
1764 static inline void flag_success_on_conn(struct rtrs_clt_con *con)
1765 {
1766 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1767
1768 atomic_inc(&sess->connected_cnt);
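/*
 * cm_err == 1 marks a successfully established connection; see
 * flag_error_on_conn(), which tests for exactly this value.
 */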
1769 con->cm_err = 1;
1770 }
1771
1772 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
1773 struct rdma_cm_event *ev)
1774 {
1775 struct rtrs_sess *s = con->c.sess;
1776 const struct rtrs_msg_conn_rsp *msg;
1777 const char *rej_msg;
1778 int status, errno;
1779 u8 data_len;
1780
1781 status = ev->status;
1782 rej_msg = rdma_reject_msg(con->c.cm_id, status);
1783 msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);
1784
1785 if (msg && data_len >= sizeof(*msg)) {
1786 errno = (int16_t)le16_to_cpu(msg->errno);
1787 if (errno == -EBUSY)
1788 rtrs_err(s,
1789 "Previous session still exists on the server, please reconnect later\n");
1790 else
1791 rtrs_err(s,
1792 "Connect rejected: status %d (%s), rtrs errno %d\n",
1793 status, rej_msg, errno);
1794 } else {
1795 rtrs_err(s,
1796 "Connect rejected but with malformed message: status %d (%s)\n",
1797 status, rej_msg);
1798 }
1799
1800 return -ECONNRESET;
1801 }
1802
1803 static void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
1804 {
1805 if (rtrs_clt_change_state(sess, RTRS_CLT_CLOSING))
1806 queue_work(rtrs_wq, &sess->close_work);
1807 if (wait)
1808 flush_work(&sess->close_work);
1809 }
1810
1811 static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
1812 {
1813 if (con->cm_err == 1) {
1814 struct rtrs_clt_sess *sess;
1815
1816 sess = to_clt_sess(con->c.sess);
1817 if (atomic_dec_and_test(&sess->connected_cnt))
1818
1819 wake_up(&sess->state_wq);
1820 }
1821 con->cm_err = cm_err;
1822 }
1823
1824 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
1825 struct rdma_cm_event *ev)
1826 {
1827 struct rtrs_clt_con *con = cm_id->context;
1828 struct rtrs_sess *s = con->c.sess;
1829 struct rtrs_clt_sess *sess = to_clt_sess(s);
1830 int cm_err = 0;
1831
1832 switch (ev->event) {
1833 case RDMA_CM_EVENT_ADDR_RESOLVED:
1834 cm_err = rtrs_rdma_addr_resolved(con);
1835 break;
1836 case RDMA_CM_EVENT_ROUTE_RESOLVED:
1837 cm_err = rtrs_rdma_route_resolved(con);
1838 break;
1839 case RDMA_CM_EVENT_ESTABLISHED:
1840 con->cm_err = rtrs_rdma_conn_established(con, ev);
1841 if (likely(!con->cm_err)) {
1842 /*
1843 * Report success and wake up. Here we abuse state_wq,
1844 * i.e. wake up without state change, but we set cm_err.
1845 */
1846 flag_success_on_conn(con);
1847 wake_up(&sess->state_wq);
1848 return 0;
1849 }
1850 break;
1851 case RDMA_CM_EVENT_REJECTED:
1852 cm_err = rtrs_rdma_conn_rejected(con, ev);
1853 break;
1854 case RDMA_CM_EVENT_CONNECT_ERROR:
1855 case RDMA_CM_EVENT_UNREACHABLE:
1856 rtrs_wrn(s, "CM error event %d\n", ev->event);
1857 cm_err = -ECONNRESET;
1858 break;
1859 case RDMA_CM_EVENT_ADDR_ERROR:
1860 case RDMA_CM_EVENT_ROUTE_ERROR:
1861 cm_err = -EHOSTUNREACH;
1862 break;
1863 case RDMA_CM_EVENT_DISCONNECTED:
1864 case RDMA_CM_EVENT_ADDR_CHANGE:
1865 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1866 cm_err = -ECONNRESET;
1867 break;
1868 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1869 /*
1870 * Device removal is a special case. Queue close and return 0.
1871 */
1872 rtrs_clt_close_conns(sess, false);
1873 return 0;
1874 default:
1875 rtrs_err(s, "Unexpected RDMA CM event (%d)\n", ev->event);
1876 cm_err = -ECONNRESET;
1877 break;
1878 }
1879
1880 if (cm_err) {
1881 /*
1882 * cm error makes sense only on connection establishing,
1883 * in other cases we rely on normal procedure of reconnecting.
1884 */
1885 flag_error_on_conn(con, cm_err);
1886 rtrs_rdma_error_recovery(con);
1887 }
1888
1889 return 0;
1890 }
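/*
 * Quick summary of the mapping implemented by the switch above: address and
 * route resolution failures are reported as -EHOSTUNREACH, reject/connect
 * error/disconnect style events as -ECONNRESET, ESTABLISHED only flags
 * success, and DEVICE_REMOVAL bypasses the error path entirely by queueing
 * a full session close instead.
 */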
1891
1892 static int create_cm(struct rtrs_clt_con *con)
1893 {
1894 struct rtrs_sess *s = con->c.sess;
1895 struct rtrs_clt_sess *sess = to_clt_sess(s);
1896 struct rdma_cm_id *cm_id;
1897 int err;
1898
1899 cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
1900 sess->s.dst_addr.ss_family == AF_IB ?
1901 RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
1902 if (IS_ERR(cm_id)) {
1903 err = PTR_ERR(cm_id);
1904 rtrs_err(s, "Failed to create CM ID, err: %d\n", err);
1905
1906 return err;
1907 }
1908 con->c.cm_id = cm_id;
1909 con->cm_err = 0;
1910 /* allow the port to be reused */
1911 err = rdma_set_reuseaddr(cm_id, 1);
1912 if (err != 0) {
1913 rtrs_err(s, "Set address reuse failed, err: %d\n", err);
1914 goto destroy_cm;
1915 }
1916 err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
1917 (struct sockaddr *)&sess->s.dst_addr,
1918 RTRS_CONNECT_TIMEOUT_MS);
1919 if (err) {
1920 rtrs_err(s, "Failed to resolve address, err: %d\n", err);
1921 goto destroy_cm;
1922 }
1923 /*
1924 * Combine connection status and session events. This is needed
1925 * to wait for two possible cases: cm_err has something meaningful
1926 * or the session state was really changed to an error by device removal.
1927 */
1928 err = wait_event_interruptible_timeout(
1929 sess->state_wq,
1930 con->cm_err || sess->state != RTRS_CLT_CONNECTING,
1931 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
1932 if (err == 0 || err == -ERESTARTSYS) {
1933 if (err == 0)
1934 err = -ETIMEDOUT;
1935 /* Timed out or interrupted */
1936 goto errr;
1937 }
1938 if (con->cm_err < 0) {
1939 err = con->cm_err;
1940 goto errr;
1941 }
1942 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
1943 /* Device removal */
1944 err = -ECONNABORTED;
1945 goto errr;
1946 }
1947
1948 return 0;
1949
1950 errr:
1951 stop_cm(con);
1952 /* It is safe to call destroy even if cq_qp was not initialized */
1953 destroy_con_cq_qp(con);
1954 destroy_cm:
1955 destroy_cm(con);
1956
1957 return err;
1958 }
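/*
 * For orientation (a sketch of the usual RDMA CM client sequence, which the
 * handlers referenced above are assumed to follow): rdma_resolve_addr()
 * starts the asynchronous state machine, ADDR_RESOLVED leads to
 * rtrs_rdma_addr_resolved() resolving the route, ROUTE_RESOLVED leads to
 * rtrs_rdma_route_resolved() issuing the connect request, and ESTABLISHED
 * finally sets con->cm_err to 1 and wakes up the wait above.
 */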
1959
1960 static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
1961 {
1962 struct rtrs_clt *clt = sess->clt;
1963 int up;
1964
1965 /*
1966 * We can fire RECONNECTED event only when all paths were
1967 * connected on rtrs_clt_open(), then each was disconnected
1968 * and the first one connected again. That's why this nasty
1969 * game with counter value.
1970 */
1971
1972 mutex_lock(&clt->paths_ev_mutex);
1973 up = ++clt->paths_up;
1974 /*
1975 * Here it is safe to access paths num directly since up counter
1976 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
1977 * in progress, thus paths removals are impossible.
1978 */
1979 if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
1980 clt->paths_up = clt->paths_num;
1981 else if (up == 1)
1982 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
1983 mutex_unlock(&clt->paths_ev_mutex);
1984
1985 /* Mark session as established */
1986 sess->established = true;
1987 sess->reconnect_attempts = 0;
1988 sess->stats->reconnects.successful_cnt++;
1989 }
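/*
 * A worked example of the counter game described above: with paths_num == 2,
 * alloc_clt() starts paths_up at MAX_PATHS_NUM. The two rtrs_clt_sess_up()
 * calls made during rtrs_clt_open() take it to MAX_PATHS_NUM + 2, which is
 * then clamped back to paths_num. Only after all paths went down
 * (paths_up == 0) and the first one came back up (up == 1) is
 * RTRS_CLT_LINK_EV_RECONNECTED delivered.
 */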
1990
1991 static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
1992 {
1993 struct rtrs_clt *clt = sess->clt;
1994
1995 if (!sess->established)
1996 return;
1997
1998 sess->established = false;
1999 mutex_lock(&clt->paths_ev_mutex);
2000 WARN_ON(!clt->paths_up);
2001 if (--clt->paths_up == 0)
2002 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
2003 mutex_unlock(&clt->paths_ev_mutex);
2004 }
2005
2006 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
2007 {
2008 struct rtrs_clt_con *con;
2009 unsigned int cid;
2010
2011 WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);
2012
2013 /*
2014 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2015 * exactly in between. Start destroying after it finishes.
2016 */
2017 mutex_lock(&sess->init_mutex);
2018 mutex_unlock(&sess->init_mutex);
2019
2020 /*
2021 * All IO paths must observe !CONNECTED state before we
2022 * free everything.
2023 */
2024 synchronize_rcu();
2025
2026 rtrs_clt_stop_hb(sess);
2027
2028 /*
2029 * The order is utterly crucial: first disconnect and complete all
2030 * rdma requests with an error (thus setting in_use=false for requests),
2031 * then fail outstanding requests checking in_use for each, and
2032 * eventually notify the upper layer about the session disconnection.
2033 */
2034
2035 for (cid = 0; cid < sess->s.con_num; cid++) {
2036 if (!sess->s.con[cid])
2037 break;
2038 con = to_clt_con(sess->s.con[cid]);
2039 stop_cm(con);
2040 }
2041 fail_all_outstanding_reqs(sess);
2042 free_sess_reqs(sess);
2043 rtrs_clt_sess_down(sess);
2044
2045 /*
2046 * Wait for graceful shutdown, namely when peer side invokes
2047 * rdma_disconnect(). 'connected_cnt' is decremented only on
2048 * CM events, thus if the other side has crashed and hb has detected
2049 * something is wrong, we will be stuck here for exactly timeout ms,
2050 * since CM does not fire anything. That is fine, we are in no
2051 * hurry.
2052 */
2053 wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
2054 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2055
2056 for (cid = 0; cid < sess->s.con_num; cid++) {
2057 if (!sess->s.con[cid])
2058 break;
2059 con = to_clt_con(sess->s.con[cid]);
2060 destroy_con_cq_qp(con);
2061 destroy_cm(con);
2062 destroy_con(con);
2063 }
2064 }
2065
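/*
 * Atomically switch a per-CPU path pointer from @sess to @next, but only if
 * it still points at @sess. A true return means the swap actually happened,
 * i.e. some CPU might still be dereferencing @sess, so the caller has to
 * wait for a grace period before freeing it (see
 * rtrs_clt_remove_path_from_arr()).
 */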
2066 static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
2067 struct rtrs_clt_sess *sess,
2068 struct rtrs_clt_sess *next)
2069 {
2070 struct rtrs_clt_sess **ppcpu_path;
2071
2072 /* Call cmpxchg() without sparse warnings */
2073 ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
2074 return sess == cmpxchg(ppcpu_path, sess, next);
2075 }
2076
2077 static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
2078 {
2079 struct rtrs_clt *clt = sess->clt;
2080 struct rtrs_clt_sess *next;
2081 bool wait_for_grace = false;
2082 int cpu;
2083
2084 mutex_lock(&clt->paths_mutex);
2085 list_del_rcu(&sess->s.entry);
2086
2087 /* Make sure everybody observes path removal. */
2088 synchronize_rcu();
2089
2090 /*
2091 * At this point nobody sees @sess in the list, but still we have
2092 * dangling pointer @pcpu_path which _can_ point to @sess. Since
2093 * nobody can observe @sess in the list, we guarantee that IO path
2094 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
2095 * to @sess, but can never again become @sess.
2096 */
2097
2098 /*
2099 * Decrement the paths number only after the grace period, because
2100 * the caller of do_each_path() must first observe the list without
2101 * the path and only then the decremented paths number.
2102 *
2103 * Otherwise there can be the following situation:
2104 * o Two paths exist and IO is coming.
2105 * o One path is removed:
2106 * CPU#0 CPU#1
2107 * do_each_path(): rtrs_clt_remove_path_from_arr():
2108 * path = get_next_path()
2109 * ^^^ list_del_rcu(path)
2110 * [!CONNECTED path] clt->paths_num--
2111 * ^^^^^^^^^
2112 * load clt->paths_num from 2 to 1
2113 * ^^^^^^^^^
2114 * sees 1
2115 *
2116 * path is observed as !CONNECTED, but the do_each_path() loop
2117 * ends, because the expression i < clt->paths_num is false.
2118 */
2119 clt->paths_num--;
2120
2121 /*
2122 * Get @next connection from current @sess which is going to be
2123 * removed. If @sess is the last element, then @next is NULL.
2124 */
2125 rcu_read_lock();
2126 next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
2127 typeof(*next), s.entry);
2128 rcu_read_unlock();
2129
2130 /*
2131 * @pcpu paths can still point to the path which is going to be
2132 * removed, so change the pointer manually.
2133 */
2134 for_each_possible_cpu(cpu) {
2135 struct rtrs_clt_sess __rcu **ppcpu_path;
2136
2137 ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
2138 if (rcu_dereference_protected(*ppcpu_path,
2139 lockdep_is_held(&clt->paths_mutex)) != sess)
2140 /*
2141 * synchronize_rcu() was called just after deleting
2142 * entry from the list, thus IO code path cannot
2143 * change pointer back to the pointer which is going
2144 * to be removed, we are safe here.
2145 */
2146 continue;
2147
2148 /*
2149 * We race with IO code path, which also changes pointer,
2150 * thus we have to be careful not to overwrite it.
2151 */
2152 if (xchg_sessions(ppcpu_path, sess, next))
2153 /*
2154 * @ppcpu_path was successfully replaced with @next,
2155 * which means that someone could also have picked up
2156 * @sess and be dereferencing it right now, so waiting for
2157 * a grace period is required.
2158 */
2159 wait_for_grace = true;
2160 }
2161 if (wait_for_grace)
2162 synchronize_rcu();
2163
2164 mutex_unlock(&clt->paths_mutex);
2165 }
2166
2167 static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess,
2168 struct rtrs_addr *addr)
2169 {
2170 struct rtrs_clt *clt = sess->clt;
2171
2172 mutex_lock(&clt->paths_mutex);
2173 clt->paths_num++;
2174
2175 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2176 mutex_unlock(&clt->paths_mutex);
2177 }
2178
2179 static void rtrs_clt_close_work(struct work_struct *work)
2180 {
2181 struct rtrs_clt_sess *sess;
2182
2183 sess = container_of(work, struct rtrs_clt_sess, close_work);
2184
2185 cancel_delayed_work_sync(&sess->reconnect_dwork);
2186 rtrs_clt_stop_and_destroy_conns(sess);
2187 rtrs_clt_change_state(sess, RTRS_CLT_CLOSED);
2188 }
2189
2190 static int init_conns(struct rtrs_clt_sess *sess)
2191 {
2192 unsigned int cid;
2193 int err;
2194
2195 /*
2196 * On every new set of session connections increase the reconnect
2197 * counter to avoid clashes with previous sessions that are not
2198 * yet closed on the server side.
2199 */
2200 sess->s.recon_cnt++;
2201
2202 /* Establish all RDMA connections */
2203 for (cid = 0; cid < sess->s.con_num; cid++) {
2204 err = create_con(sess, cid);
2205 if (err)
2206 goto destroy;
2207
2208 err = create_cm(to_clt_con(sess->s.con[cid]));
2209 if (err) {
2210 destroy_con(to_clt_con(sess->s.con[cid]));
2211 goto destroy;
2212 }
2213 }
2214 err = alloc_sess_reqs(sess);
2215 if (err)
2216 goto destroy;
2217
2218 rtrs_clt_start_hb(sess);
2219
2220 return 0;
2221
2222 destroy:
2223 while (cid--) {
2224 struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);
2225
2226 stop_cm(con);
2227 destroy_con_cq_qp(con);
2228 destroy_cm(con);
2229 destroy_con(con);
2230 }
2231 /*
2232 * If we've never taken the async path and got an error, say,
2233 * doing rdma_resolve_addr(), switch to the CONNECTING_ERR state
2234 * manually to keep reconnecting.
2235 */
2236 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
2237
2238 return err;
2239 }
2240
2241 static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
2242 {
2243 struct rtrs_clt_con *con = cq->cq_context;
2244 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2245 struct rtrs_iu *iu;
2246
2247 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2248 rtrs_iu_free(iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
2249
2250 if (unlikely(wc->status != IB_WC_SUCCESS)) {
2251 rtrs_err(sess->clt, "Sess info request send failed: %s\n",
2252 ib_wc_status_msg(wc->status));
2253 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
2254 return;
2255 }
2256
2257 rtrs_clt_update_wc_stats(con);
2258 }
2259
2260 static int process_info_rsp(struct rtrs_clt_sess *sess,
2261 const struct rtrs_msg_info_rsp *msg)
2262 {
2263 unsigned int sg_cnt, total_len;
2264 int i, sgi;
2265
2266 sg_cnt = le16_to_cpu(msg->sg_cnt);
2267 if (unlikely(!sg_cnt))
2268 return -EINVAL;
2269 /*
2270 * Check if IB immediate data size is enough to hold the mem_id and
2271 * the offset inside the memory chunk.
2272 */
2273 if (unlikely((ilog2(sg_cnt - 1) + 1) +
2274 (ilog2(sess->chunk_size - 1) + 1) >
2275 MAX_IMM_PAYL_BITS)) {
2276 rtrs_err(sess->clt,
2277 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2278 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
2279 return -EINVAL;
2280 }
2281 if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) {
2282 rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n",
2283 sg_cnt);
2284 return -EINVAL;
2285 }
2286 total_len = 0;
2287 for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
2288 const struct rtrs_sg_desc *desc = &msg->desc[sgi];
2289 u32 len, rkey;
2290 u64 addr;
2291
2292 addr = le64_to_cpu(desc->addr);
2293 rkey = le32_to_cpu(desc->key);
2294 len = le32_to_cpu(desc->len);
2295
2296 total_len += len;
2297
2298 if (unlikely(!len || (len % sess->chunk_size))) {
2299 rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
2300 len);
2301 return -EINVAL;
2302 }
2303 for ( ; len && i < sess->queue_depth; i++) {
2304 sess->rbufs[i].addr = addr;
2305 sess->rbufs[i].rkey = rkey;
2306
2307 len -= sess->chunk_size;
2308 addr += sess->chunk_size;
2309 }
2310 }
2311 /* Sanity check */
2312 if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) {
2313 rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
2314 return -EINVAL;
2315 }
2316 if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) {
2317 rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
2318 return -EINVAL;
2319 }
2320
2321 return 0;
2322 }
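/*
 * A small worked example of the checks above (numbers purely illustrative):
 * with chunk_size = 128 KiB the in-chunk offset needs ilog2(128K - 1) + 1 =
 * 17 bits, and with sg_cnt = 512 the buffer id needs 9 more, so 26 bits of
 * immediate payload must fit into MAX_IMM_PAYL_BITS. Each descriptor whose
 * len is N * chunk_size then fills N consecutive rbufs[] entries, and the
 * sanity checks insist that all descriptors together cover exactly
 * queue_depth chunks of total length chunk_size * queue_depth.
 */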
2323
2324 static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
2325 {
2326 struct rtrs_clt_con *con = cq->cq_context;
2327 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2328 struct rtrs_msg_info_rsp *msg;
2329 enum rtrs_clt_state state;
2330 struct rtrs_iu *iu;
2331 size_t rx_sz;
2332 int err;
2333
2334 state = RTRS_CLT_CONNECTING_ERR;
2335
2336 WARN_ON(con->c.cid);
2337 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2338 if (unlikely(wc->status != IB_WC_SUCCESS)) {
2339 rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
2340 ib_wc_status_msg(wc->status));
2341 goto out;
2342 }
2343 WARN_ON(wc->opcode != IB_WC_RECV);
2344
2345 if (unlikely(wc->byte_len < sizeof(*msg))) {
2346 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2347 wc->byte_len);
2348 goto out;
2349 }
2350 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
2351 iu->size, DMA_FROM_DEVICE);
2352 msg = iu->buf;
2353 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
2354 rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
2355 le16_to_cpu(msg->type));
2356 goto out;
2357 }
2358 rx_sz = sizeof(*msg);
2359 rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
2360 if (unlikely(wc->byte_len < rx_sz)) {
2361 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2362 wc->byte_len);
2363 goto out;
2364 }
2365 err = process_info_rsp(sess, msg);
2366 if (unlikely(err))
2367 goto out;
2368
2369 err = post_recv_sess(sess);
2370 if (unlikely(err))
2371 goto out;
2372
2373 state = RTRS_CLT_CONNECTED;
2374
2375 out:
2376 rtrs_clt_update_wc_stats(con);
2377 rtrs_iu_free(iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
2378 rtrs_clt_change_state(sess, state);
2379 }
2380
2381 static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
2382 {
2383 struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
2384 struct rtrs_msg_info_req *msg;
2385 struct rtrs_iu *tx_iu, *rx_iu;
2386 size_t rx_sz;
2387 int err;
2388
2389 rx_sz = sizeof(struct rtrs_msg_info_rsp);
2390 rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH;
2391
2392 tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
2393 sess->s.dev->ib_dev, DMA_TO_DEVICE,
2394 rtrs_clt_info_req_done);
2395 rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
2396 DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
2397 if (unlikely(!tx_iu || !rx_iu)) {
2398 err = -ENOMEM;
2399 goto out;
2400 }
2401 /* Prepare for getting info response */
2402 err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
2403 if (unlikely(err)) {
2404 rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
2405 goto out;
2406 }
2407 rx_iu = NULL;
2408
2409 msg = tx_iu->buf;
2410 msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
2411 memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));
2412
2413 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
2414 tx_iu->size, DMA_TO_DEVICE);
2415
2416 /* Send info request */
2417 err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
2418 if (unlikely(err)) {
2419 rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
2420 goto out;
2421 }
2422 tx_iu = NULL;
2423
2424 /* Wait for state change */
2425 wait_event_interruptible_timeout(sess->state_wq,
2426 sess->state != RTRS_CLT_CONNECTING,
2427 msecs_to_jiffies(
2428 RTRS_CONNECT_TIMEOUT_MS));
2429 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
2430 if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
2431 err = -ECONNRESET;
2432 else
2433 err = -ETIMEDOUT;
2434 goto out;
2435 }
2436
2437 out:
2438 if (tx_iu)
2439 rtrs_iu_free(tx_iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
2440 if (rx_iu)
2441 rtrs_iu_free(rx_iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
2442 if (unlikely(err))
2443 /* If we've never taken the async path because of an allocation failure */
2444 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
2445
2446 return err;
2447 }
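/*
 * Handshake recap for the function above: a receive for the (potentially
 * large) RTRS_MSG_INFO_RSP is posted first, then an RTRS_MSG_INFO_REQ
 * carrying only the session name is sent over the user connection (cid 0).
 * The response is handled in rtrs_clt_info_rsp_done(), where
 * process_info_rsp() turns the server's sg descriptors into the rbufs[]
 * addr/rkey table, after which the session is flipped to CONNECTED.
 */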
2448
2449 /**
2450 * init_sess() - establishes all session connections and does handshake
2451 * @sess: client session.
2452 * In case of error a full close or a reconnect procedure should be taken,
2453 * because the async reconnect or close work may already have been started.
2454 */
2455 static int init_sess(struct rtrs_clt_sess *sess)
2456 {
2457 int err;
2458
2459 mutex_lock(&sess->init_mutex);
2460 err = init_conns(sess);
2461 if (err) {
2462 rtrs_err(sess->clt, "init_conns(), err: %d\n", err);
2463 goto out;
2464 }
2465 err = rtrs_send_sess_info(sess);
2466 if (err) {
2467 rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err);
2468 goto out;
2469 }
2470 rtrs_clt_sess_up(sess);
2471 out:
2472 mutex_unlock(&sess->init_mutex);
2473
2474 return err;
2475 }
2476
2477 static void rtrs_clt_reconnect_work(struct work_struct *work)
2478 {
2479 struct rtrs_clt_sess *sess;
2480 struct rtrs_clt *clt;
2481 unsigned int delay_ms;
2482 int err;
2483
2484 sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
2485 reconnect_dwork);
2486 clt = sess->clt;
2487
2488 if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
2489 return;
2490
2491 if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
2492 /* Close the session completely if the maximum number of attempts is reached */
2493 rtrs_clt_close_conns(sess, false);
2494 return;
2495 }
2496 sess->reconnect_attempts++;
2497
2498 /* Stop everything */
2499 rtrs_clt_stop_and_destroy_conns(sess);
2500 msleep(RTRS_RECONNECT_BACKOFF);
2501 if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) {
2502 err = init_sess(sess);
2503 if (err)
2504 goto reconnect_again;
2505 }
2506
2507 return;
2508
2509 reconnect_again:
2510 if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) {
2511 sess->stats->reconnects.fail_cnt++;
2512 delay_ms = clt->reconnect_delay_sec * 1000;
2513 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
2514 msecs_to_jiffies(delay_ms +
2515 prandom_u32() %
2516 RTRS_RECONNECT_SEED * 1000));
2517 }
2518 }
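/*
 * Scheduling example for the re-queue above (illustrative): with
 * reconnect_delay_sec == 30 the next attempt is armed roughly 30 seconds
 * later, plus the random jitter bounded by RTRS_RECONNECT_SEED so that many
 * clients do not hammer a recovering server at the same instant, on top of
 * the fixed RTRS_RECONNECT_BACKOFF sleep taken before every attempt.
 */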
2519
2520 static void rtrs_clt_dev_release(struct device *dev)
2521 {
2522 struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);
2523
2524 kfree(clt);
2525 }
2526
2527 static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
2528 u16 port, size_t pdu_sz, void *priv,
2529 void (*link_ev)(void *priv,
2530 enum rtrs_clt_link_ev ev),
2531 unsigned int max_segments,
2532 size_t max_segment_size,
2533 unsigned int reconnect_delay_sec,
2534 unsigned int max_reconnect_attempts)
2535 {
2536 struct rtrs_clt *clt;
2537 int err;
2538
2539 if (!paths_num || paths_num > MAX_PATHS_NUM)
2540 return ERR_PTR(-EINVAL);
2541
2542 if (strlen(sessname) >= sizeof(clt->sessname))
2543 return ERR_PTR(-EINVAL);
2544
2545 clt = kzalloc(sizeof(*clt), GFP_KERNEL);
2546 if (!clt)
2547 return ERR_PTR(-ENOMEM);
2548
2549 clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
2550 if (!clt->pcpu_path) {
2551 kfree(clt);
2552 return ERR_PTR(-ENOMEM);
2553 }
2554
2555 uuid_gen(&clt->paths_uuid);
2556 INIT_LIST_HEAD_RCU(&clt->paths_list);
2557 clt->paths_num = paths_num;
2558 clt->paths_up = MAX_PATHS_NUM;
2559 clt->port = port;
2560 clt->pdu_sz = pdu_sz;
2561 clt->max_segments = max_segments;
2562 clt->max_segment_size = max_segment_size;
2563 clt->reconnect_delay_sec = reconnect_delay_sec;
2564 clt->max_reconnect_attempts = max_reconnect_attempts;
2565 clt->priv = priv;
2566 clt->link_ev = link_ev;
2567 clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
2568 strlcpy(clt->sessname, sessname, sizeof(clt->sessname));
2569 init_waitqueue_head(&clt->permits_wait);
2570 mutex_init(&clt->paths_ev_mutex);
2571 mutex_init(&clt->paths_mutex);
2572
2573 clt->dev.class = rtrs_clt_dev_class;
2574 clt->dev.release = rtrs_clt_dev_release;
2575 err = dev_set_name(&clt->dev, "%s", sessname);
2576 if (err) {
2577 free_percpu(clt->pcpu_path);
2578 kfree(clt);
2579 return ERR_PTR(err);
2580 }
2581 /*
2582 * Suppress user space notification until
2583 * sysfs files are created
2584 */
2585 dev_set_uevent_suppress(&clt->dev, true);
2586 err = device_register(&clt->dev);
2587 if (err) {
2588 free_percpu(clt->pcpu_path);
2589 put_device(&clt->dev);
2590 return ERR_PTR(err);
2591 }
2592
2593 clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
2594 if (!clt->kobj_paths) {
2595 free_percpu(clt->pcpu_path);
2596 device_unregister(&clt->dev);
2597 return NULL;
2598 }
2599 err = rtrs_clt_create_sysfs_root_files(clt);
2600 if (err) {
2601 free_percpu(clt->pcpu_path);
2602 kobject_del(clt->kobj_paths);
2603 kobject_put(clt->kobj_paths);
2604 device_unregister(&clt->dev);
2605 return ERR_PTR(err);
2606 }
2607 dev_set_uevent_suppress(&clt->dev, false);
2608 kobject_uevent(&clt->dev.kobj, KOBJ_ADD);
2609
2610 return clt;
2611 }
2612
2613 static void wait_for_inflight_permits(struct rtrs_clt *clt)
2614 {
2615 if (clt->permits_map) {
2616 size_t sz = clt->queue_depth;
2617
2618 wait_event(clt->permits_wait,
2619 find_first_bit(clt->permits_map, sz) >= sz);
2620 }
2621 }
2622
2623 static void free_clt(struct rtrs_clt *clt)
2624 {
2625 wait_for_inflight_permits(clt);
2626 free_permits(clt);
2627 free_percpu(clt->pcpu_path);
2628 mutex_destroy(&clt->paths_ev_mutex);
2629 mutex_destroy(&clt->paths_mutex);
2630 /* release callback will free clt in last put */
2631 device_unregister(&clt->dev);
2632 }
2633
2634 /**
2635 * rtrs_clt_open() - Open a session to an RTRS server
2636 * @ops: holds the link event callback and the private pointer.
2637 * @sessname: name of the session
2638 * @paths: Paths to be established defined by their src and dst addresses
2639 * @paths_num: Number of elements in the @paths array
2640 * @port: port to be used by the RTRS session
2641 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
2642 * @reconnect_delay_sec: time between reconnect tries
2643 * @max_segments: Max. number of segments per IO request
2644 * @max_segment_size: Max. size of one segment
2645 * @max_reconnect_attempts: Number of times to reconnect on error before giving
2646 * up, 0 for disabled, -1 for forever
2647 *
2648 * Starts session establishment with the rtrs_server. The function can block
2649 * up to ~2000ms before it returns.
2650 *
2651 * Return: a valid pointer on success, otherwise an ERR_PTR().
2652 */
2653 struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
2654 const char *sessname,
2655 const struct rtrs_addr *paths,
2656 size_t paths_num, u16 port,
2657 size_t pdu_sz, u8 reconnect_delay_sec,
2658 u16 max_segments,
2659 size_t max_segment_size,
2660 s16 max_reconnect_attempts)
2661 {
2662 struct rtrs_clt_sess *sess, *tmp;
2663 struct rtrs_clt *clt;
2664 int err, i;
2665
2666 clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
2667 ops->link_ev,
2668 max_segments, max_segment_size, reconnect_delay_sec,
2669 max_reconnect_attempts);
2670 if (IS_ERR(clt)) {
2671 err = PTR_ERR(clt);
2672 goto out;
2673 }
2674 for (i = 0; i < paths_num; i++) {
2675 struct rtrs_clt_sess *sess;
2676
2677 sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
2678 max_segments, max_segment_size);
2679 if (IS_ERR(sess)) {
2680 err = PTR_ERR(sess);
2681 goto close_all_sess;
2682 }
2683 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2684
2685 err = init_sess(sess);
2686 if (err) {
2687 list_del_rcu(&sess->s.entry);
2688 rtrs_clt_close_conns(sess, true);
2689 free_sess(sess);
2690 goto close_all_sess;
2691 }
2692
2693 err = rtrs_clt_create_sess_files(sess);
2694 if (err) {
2695 list_del_rcu(&sess->s.entry);
2696 rtrs_clt_close_conns(sess, true);
2697 free_sess(sess);
2698 goto close_all_sess;
2699 }
2700 }
2701 err = alloc_permits(clt);
2702 if (err)
2703 goto close_all_sess;
2704
2705 return clt;
2706
2707 close_all_sess:
2708 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2709 rtrs_clt_destroy_sess_files(sess, NULL);
2710 rtrs_clt_close_conns(sess, true);
2711 kobject_put(&sess->kobj);
2712 }
2713 rtrs_clt_destroy_sysfs_root_files(clt);
2714 rtrs_clt_destroy_sysfs_root_folders(clt);
2715 free_clt(clt);
2716
2717 out:
2718 return ERR_PTR(err);
2719 }
2720 EXPORT_SYMBOL(rtrs_clt_open);
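/*
 * Minimal usage sketch (hypothetical ULP code; all names below are invented
 * for illustration and are not part of this file):
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv	 = my_dev,
 *		.link_ev = my_link_event_handler,
 *	};
 *
 *	clt = rtrs_clt_open(&ops, "my_session", paths, paths_num, port,
 *			    sizeof(struct my_iu), MY_RECONNECT_DELAY_SEC,
 *			    MY_MAX_SEGMENTS, MY_MAX_SEGMENT_SIZE,
 *			    MY_MAX_RECONNECT_ATTEMPTS);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */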
2721
2722 /**
2723 * rtrs_clt_close() - Close a session
2724 * @clt: Session handle. Session is freed upon return.
2725 */
2726 void rtrs_clt_close(struct rtrs_clt *clt)
2727 {
2728 struct rtrs_clt_sess *sess, *tmp;
2729
2730 /* Firstly forbid sysfs access */
2731 rtrs_clt_destroy_sysfs_root_files(clt);
2732 rtrs_clt_destroy_sysfs_root_folders(clt);
2733
2734 /* Now it is safe to iterate over all paths without locks */
2735 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2736 rtrs_clt_destroy_sess_files(sess, NULL);
2737 rtrs_clt_close_conns(sess, true);
2738 kobject_put(&sess->kobj);
2739 }
2740 free_clt(clt);
2741 }
2742 EXPORT_SYMBOL(rtrs_clt_close);
2743
2744 int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
2745 {
2746 enum rtrs_clt_state old_state;
2747 int err = -EBUSY;
2748 bool changed;
2749
2750 changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
2751 &old_state);
2752 if (changed) {
2753 sess->reconnect_attempts = 0;
2754 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
2755 }
2756 if (changed || old_state == RTRS_CLT_RECONNECTING) {
2757 /*
2758 * flush_delayed_work() queues pending work for immediate
2759 * execution, so do the flush if we have queued something
2760 * right now or work is pending.
2761 */
2762 flush_delayed_work(&sess->reconnect_dwork);
2763 err = (READ_ONCE(sess->state) ==
2764 RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
2765 }
2766
2767 return err;
2768 }
2769
2770 int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess)
2771 {
2772 rtrs_clt_close_conns(sess, true);
2773
2774 return 0;
2775 }
2776
2777 int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
2778 const struct attribute *sysfs_self)
2779 {
2780 enum rtrs_clt_state old_state;
2781 bool changed;
2782
2783 /*
2784 * Continue stopping the path until its state is changed to DEAD or
2785 * the state is observed as DEAD:
2786 * 1. State was changed to DEAD - we were fast and nobody
2787 * invoked rtrs_clt_reconnect(), which can again start
2788 * reconnecting.
2789 * 2. State was observed as DEAD - we have someone in parallel
2790 * removing the path.
2791 */
2792 do {
2793 rtrs_clt_close_conns(sess, true);
2794 changed = rtrs_clt_change_state_get_old(sess,
2795 RTRS_CLT_DEAD,
2796 &old_state);
2797 } while (!changed && old_state != RTRS_CLT_DEAD);
2798
2799 if (likely(changed)) {
2800 rtrs_clt_destroy_sess_files(sess, sysfs_self);
2801 rtrs_clt_remove_path_from_arr(sess);
2802 kobject_put(&sess->kobj);
2803 }
2804
2805 return 0;
2806 }
2807
2808 void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
2809 {
2810 clt->max_reconnect_attempts = (unsigned int)value;
2811 }
2812
2813 int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
2814 {
2815 return (int)clt->max_reconnect_attempts;
2816 }
2817
2818 /**
2819 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
2820 *
2821 * @dir: READ/WRITE
2822 * @ops: callback function to be called as confirmation, and the pointer.
2823 * @clt: Session
2824 * @permit: Preallocated permit
2825 * @vec: Message that is sent to server together with the request.
2826 * Sum of len of all @vec elements limited to <= IO_MSG_SIZE.
2827 * Since the msg is copied internally it can be allocated on stack.
2828 * @nr: Number of elements in @vec.
2829 * @data_len: length of data sent to/from server
2830 * @sg: Pages to be sent/received to/from server.
2831 * @sg_cnt: Number of elements in the @sg
2832 *
2833 * Return:
2834 * 0: Success
2835 * <0: Error
2836 *
2837 * On dir=READ rtrs client will request a data transfer from Server to client.
2838 * The data that the server will respond with will be stored in @sg when
2839 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
2840 * On dir=WRITE rtrs client will rdma write data in sg to server side.
2841 */
2842 int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
2843 struct rtrs_clt *clt, struct rtrs_permit *permit,
2844 const struct kvec *vec, size_t nr, size_t data_len,
2845 struct scatterlist *sg, unsigned int sg_cnt)
2846 {
2847 struct rtrs_clt_io_req *req;
2848 struct rtrs_clt_sess *sess;
2849
2850 enum dma_data_direction dma_dir;
2851 int err = -ECONNABORTED, i;
2852 size_t usr_len, hdr_len;
2853 struct path_it it;
2854
2855 /* Get kvec length */
2856 for (i = 0, usr_len = 0; i < nr; i++)
2857 usr_len += vec[i].iov_len;
2858
2859 if (dir == READ) {
2860 hdr_len = sizeof(struct rtrs_msg_rdma_read) +
2861 sg_cnt * sizeof(struct rtrs_sg_desc);
2862 dma_dir = DMA_FROM_DEVICE;
2863 } else {
2864 hdr_len = sizeof(struct rtrs_msg_rdma_write);
2865 dma_dir = DMA_TO_DEVICE;
2866 }
2867
2868 rcu_read_lock();
2869 for (path_it_init(&it, clt);
2870 (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
2871 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
2872 continue;
2873
2874 if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
2875 rtrs_wrn_rl(sess->clt,
2876 "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
2877 dir == READ ? "Read" : "Write",
2878 usr_len, hdr_len, sess->max_hdr_size);
2879 err = -EMSGSIZE;
2880 break;
2881 }
2882 req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
2883 vec, usr_len, sg, sg_cnt, data_len,
2884 dma_dir);
2885 if (dir == READ)
2886 err = rtrs_clt_read_req(req);
2887 else
2888 err = rtrs_clt_write_req(req);
2889 if (unlikely(err)) {
2890 req->in_use = false;
2891 continue;
2892 }
2893 /* Success path */
2894 break;
2895 }
2896 path_it_deinit(&it);
2897 rcu_read_unlock();
2898
2899 return err;
2900 }
2901 EXPORT_SYMBOL(rtrs_clt_request);
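/*
 * Typical call sequence (a hypothetical caller, shown only for illustration;
 * req_ops, vec, sg, etc. are assumed to be set up by that caller): a permit
 * is taken with rtrs_clt_get_permit(), the transfer is issued here,
 * ops->conf_fn() is invoked on completion, and the caller then returns the
 * permit with rtrs_clt_put_permit(), e.g.:
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, can_wait);
 *	err = rtrs_clt_request(WRITE, &req_ops, clt, permit, vec, nr,
 *			       data_len, sg, sg_cnt);
 */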
2902
2903 /**
2904 * rtrs_clt_query() - queries RTRS session attributes
2905 * @clt: session pointer
2906 * @attr: query results for session attributes.
2907 * Returns:
2908 * 0 on success
2909 * -ECOMM no connection to the server
2910 */
2911 int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
2912 {
2913 if (!rtrs_clt_is_connected(clt))
2914 return -ECOMM;
2915
2916 attr->queue_depth = clt->queue_depth;
2917 attr->max_io_size = clt->max_io_size;
2918 attr->sess_kobj = &clt->dev.kobj;
2919 strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname));
2920
2921 return 0;
2922 }
2923 EXPORT_SYMBOL(rtrs_clt_query);
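/*
 * For example, a ULP would typically call this right after rtrs_clt_open()
 * to size its queues (setup_queues() below is a made-up placeholder):
 *
 *	struct rtrs_attrs attrs;
 *
 *	if (!rtrs_clt_query(clt, &attrs))
 *		setup_queues(attrs.queue_depth, attrs.max_io_size);
 */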
2924
2925 int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
2926 struct rtrs_addr *addr)
2927 {
2928 struct rtrs_clt_sess *sess;
2929 int err;
2930
2931 sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments,
2932 clt->max_segment_size);
2933 if (IS_ERR(sess))
2934 return PTR_ERR(sess);
2935
2936 /*
2937 * It is totally safe to add a path in the CONNECTING state: incoming
2938 * IO will never grab it. Also it is very important to add the
2939 * path before init, since init fires the LINK_CONNECTED event.
2940 */
2941 rtrs_clt_add_path_to_arr(sess, addr);
2942
2943 err = init_sess(sess);
2944 if (err)
2945 goto close_sess;
2946
2947 err = rtrs_clt_create_sess_files(sess);
2948 if (err)
2949 goto close_sess;
2950
2951 return 0;
2952
2953 close_sess:
2954 rtrs_clt_remove_path_from_arr(sess);
2955 rtrs_clt_close_conns(sess, true);
2956 free_sess(sess);
2957
2958 return err;
2959 }
2960
2961 static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
2962 {
2963 if (!(dev->ib_dev->attrs.device_cap_flags &
2964 IB_DEVICE_MEM_MGT_EXTENSIONS)) {
2965 pr_err("Memory registrations not supported.\n");
2966 return -ENOTSUPP;
2967 }
2968
2969 return 0;
2970 }
2971
2972 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
2973 .init = rtrs_clt_ib_dev_init
2974 };
2975
2976 static int __init rtrs_client_init(void)
2977 {
2978 rtrs_rdma_dev_pd_init(0, &dev_pd);
2979
2980 rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
2981 if (IS_ERR(rtrs_clt_dev_class)) {
2982 pr_err("Failed to create rtrs-client dev class\n");
2983 return PTR_ERR(rtrs_clt_dev_class);
2984 }
2985 rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
2986 if (!rtrs_wq) {
2987 class_destroy(rtrs_clt_dev_class);
2988 return -ENOMEM;
2989 }
2990
2991 return 0;
2992 }
2993
2994 static void __exit rtrs_client_exit(void)
2995 {
2996 destroy_workqueue(rtrs_wq);
2997 class_destroy(rtrs_clt_dev_class);
2998 rtrs_rdma_dev_pd_deinit(&dev_pd);
2999 }
3000
3001 module_init(rtrs_client_init);
3002 module_exit(rtrs_client_exit);
3003