1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 drbd_receiver.c
4
5 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11 */
12
13
14 #include <linux/module.h>
15
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #include <linux/unistd.h>
31 #include <linux/vmalloc.h>
32 #include <linux/random.h>
33 #include <linux/string.h>
34 #include <linux/scatterlist.h>
35 #include <linux/part_stat.h>
36 #include <linux/mempool.h>
37 #include "drbd_int.h"
38 #include "drbd_protocol.h"
39 #include "drbd_req.h"
40 #include "drbd_vli.h"
41
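/* Optional protocol features we advertise during the feature handshake.
 * Only features announced by both sides end up as agreed features. */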
42 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
43
44 struct packet_info {
45 enum drbd_packet cmd;
46 unsigned int size;
47 unsigned int vnr;
48 void *data;
49 };
50
51 enum finish_epoch {
52 FE_STILL_LIVE,
53 FE_DESTROYED,
54 FE_RECYCLED,
55 };
56
57 static int drbd_do_features(struct drbd_connection *connection);
58 static int drbd_do_auth(struct drbd_connection *connection);
59 static int drbd_disconnected(struct drbd_peer_device *);
60 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
61 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
62 static int e_end_block(struct drbd_work *, int);
63
64
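/* Allocation mask for receive buffers: highmem is fine, no allocation-failure
 * warnings, and no direct reclaim, so a failing allocation cannot trigger
 * write-out (see the comment in __drbd_alloc_pages()). */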
65 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
66
67 static struct page *__drbd_alloc_pages(unsigned int number)
68 {
69 struct page *page = NULL;
70 struct page *tmp = NULL;
71 unsigned int i = 0;
72
73 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
74 * "criss-cross" setup, that might cause write-out on some other DRBD,
75 * which in turn might block on the other node at this very place. */
76 for (i = 0; i < number; i++) {
77 tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY);
78 if (!tmp)
79 goto fail;
80 set_page_private(tmp, (unsigned long)page);
81 page = tmp;
82 }
83 return page;
84 fail:
85 page_chain_for_each_safe(page, tmp) {
86 set_page_private(page, 0);
87 mempool_free(page, &drbd_buffer_page_pool);
88 }
89 return NULL;
90 }
91
92 /**
93 * drbd_alloc_pages() - Returns @number pages, or NULL if they cannot be allocated
94 * @peer_device: DRBD device.
95 * @number: number of pages requested
96 * @retry: whether to retry, if not enough pages are available right now
97 *
98 * Tries to allocate number pages, first from our own page pool, then from
99 * the kernel.
100 * A NULL return (allocation failure) must be handled by the caller.
101 *
102 * If this allocation would exceed the max_buffers setting, we throttle
103 * allocation (schedule_timeout) to give the system some room to breathe.
104 *
105 * We do not use max-buffers as hard limit, because it could lead to
106 * congestion and further to a distributed deadlock during online-verify or
107 * (checksum based) resync, if the max-buffers, socket buffer sizes and
108 * resync-rate settings are mis-configured.
109 *
110 * Returns a page chain linked via page->private.
111 */
112 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
113 bool retry)
114 {
115 struct drbd_device *device = peer_device->device;
116 struct page *page;
117 struct net_conf *nc;
118 unsigned int mxb;
119
120 rcu_read_lock();
121 nc = rcu_dereference(peer_device->connection->net_conf);
122 mxb = nc ? nc->max_buffers : 1000000;
123 rcu_read_unlock();
124
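/* max_buffers is a soft limit: if we are already at or above it, back off
 * briefly to give the system room to breathe instead of failing outright. */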
125 if (atomic_read(&device->pp_in_use) >= mxb)
126 schedule_timeout_interruptible(HZ / 10);
127 page = __drbd_alloc_pages(number);
128
129 if (page)
130 atomic_add(number, &device->pp_in_use);
131 return page;
132 }
133
134 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
135 * Is also used from inside another spin_lock_irq(&resource->req_lock);
136 * Either links the page chain back to the global pool,
137 * or returns all pages to the system. */
138 static void drbd_free_pages(struct drbd_device *device, struct page *page)
139 {
140 struct page *tmp;
141 int i = 0;
142
143 if (page == NULL)
144 return;
145
146 page_chain_for_each_safe(page, tmp) {
147 set_page_private(page, 0);
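/* Only pages we hold the last reference to may go back into the pool;
 * otherwise (someone else, e.g. the network layer, may still reference
 * the page) just drop our reference. */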
148 if (page_count(page) == 1)
149 mempool_free(page, &drbd_buffer_page_pool);
150 else
151 put_page(page);
152 i++;
153 }
154 i = atomic_sub_return(i, &device->pp_in_use);
155 if (i < 0)
156 drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
157 }
158
159 /*
160 You need to hold the req_lock:
161 _drbd_wait_ee_list_empty()
162
163 You must not have the req_lock:
164 drbd_free_peer_req()
165 drbd_alloc_peer_req()
166 drbd_free_peer_reqs()
167 drbd_ee_fix_bhs()
168 drbd_finish_peer_reqs()
169 drbd_clear_done_ee()
170 drbd_wait_ee_list_empty()
171 */
172
173 /* normal: payload_size == request size (bi_size)
174 * w_same: payload_size == logical_block_size
175 * trim: payload_size == 0 */
176 struct drbd_peer_request *
177 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
178 unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
179 {
180 struct drbd_device *device = peer_device->device;
181 struct drbd_peer_request *peer_req;
182 struct page *page = NULL;
183 unsigned int nr_pages = PFN_UP(payload_size);
184
185 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
186 return NULL;
187
188 peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
189 if (!peer_req) {
190 if (!(gfp_mask & __GFP_NOWARN))
191 drbd_err(device, "%s: allocation failed\n", __func__);
192 return NULL;
193 }
194
195 if (nr_pages) {
196 page = drbd_alloc_pages(peer_device, nr_pages,
197 gfpflags_allow_blocking(gfp_mask));
198 if (!page)
199 goto fail;
200 if (!mempool_is_saturated(&drbd_buffer_page_pool))
201 peer_req->flags |= EE_RELEASE_TO_MEMPOOL;
202 }
203
204 memset(peer_req, 0, sizeof(*peer_req));
205 INIT_LIST_HEAD(&peer_req->w.list);
206 drbd_clear_interval(&peer_req->i);
207 peer_req->i.size = request_size;
208 peer_req->i.sector = sector;
209 peer_req->submit_jif = jiffies;
210 peer_req->peer_device = peer_device;
211 peer_req->pages = page;
212 /*
213 * The block_id is opaque to the receiver. It is not endianness
214 * converted, and sent back to the sender unchanged.
215 */
216 peer_req->block_id = id;
217
218 return peer_req;
219
220 fail:
221 mempool_free(peer_req, &drbd_ee_mempool);
222 return NULL;
223 }
224
225 void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req)
226 {
227 might_sleep();
228 if (peer_req->flags & EE_HAS_DIGEST)
229 kfree(peer_req->digest);
230 drbd_free_pages(device, peer_req->pages);
231 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
232 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
233 if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
234 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
235 drbd_al_complete_io(device, &peer_req->i);
236 }
237 mempool_free(peer_req, &drbd_ee_mempool);
238 }
239
240 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
241 {
242 LIST_HEAD(work_list);
243 struct drbd_peer_request *peer_req, *t;
244 int count = 0;
245
246 spin_lock_irq(&device->resource->req_lock);
247 list_splice_init(list, &work_list);
248 spin_unlock_irq(&device->resource->req_lock);
249
250 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
251 drbd_free_peer_req(device, peer_req);
252 count++;
253 }
254 return count;
255 }
256
257 /*
258 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
259 */
260 static int drbd_finish_peer_reqs(struct drbd_device *device)
261 {
262 LIST_HEAD(work_list);
263 struct drbd_peer_request *peer_req, *t;
264 int err = 0;
265
266 spin_lock_irq(&device->resource->req_lock);
267 list_splice_init(&device->done_ee, &work_list);
268 spin_unlock_irq(&device->resource->req_lock);
269
270 /* possible callbacks here:
271 * e_end_block, and e_end_resync_block, e_send_superseded.
272 * all ignore the last argument.
273 */
274 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
275 int err2;
276
277 /* list_del not necessary, next/prev members not touched */
278 err2 = peer_req->w.cb(&peer_req->w, !!err);
279 if (!err)
280 err = err2;
281 drbd_free_peer_req(device, peer_req);
282 }
283 wake_up(&device->ee_wait);
284
285 return err;
286 }
287
288 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
289 struct list_head *head)
290 {
291 DEFINE_WAIT(wait);
292
293 /* avoids spin_lock/unlock
294 * and calling prepare_to_wait in the fast path */
295 while (!list_empty(head)) {
296 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
297 spin_unlock_irq(&device->resource->req_lock);
298 io_schedule();
299 finish_wait(&device->ee_wait, &wait);
300 spin_lock_irq(&device->resource->req_lock);
301 }
302 }
303
304 static void drbd_wait_ee_list_empty(struct drbd_device *device,
305 struct list_head *head)
306 {
307 spin_lock_irq(&device->resource->req_lock);
308 _drbd_wait_ee_list_empty(device, head);
309 spin_unlock_irq(&device->resource->req_lock);
310 }
311
312 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
313 {
314 struct kvec iov = {
315 .iov_base = buf,
316 .iov_len = size,
317 };
318 struct msghdr msg = {
319 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
320 };
321 iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size);
322 return sock_recvmsg(sock, &msg, msg.msg_flags);
323 }
324
325 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
326 {
327 int rv;
328
329 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
330
331 if (rv < 0) {
332 if (rv == -ECONNRESET)
333 drbd_info(connection, "sock was reset by peer\n");
334 else if (rv != -ERESTARTSYS)
335 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
336 } else if (rv == 0) {
337 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
338 long t;
339 rcu_read_lock();
340 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
341 rcu_read_unlock();
342
343 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
344
345 if (t)
346 goto out;
347 }
348 drbd_info(connection, "sock was shut down by peer\n");
349 }
350
351 if (rv != size)
352 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
353
354 out:
355 return rv;
356 }
357
358 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
359 {
360 int err;
361
362 err = drbd_recv(connection, buf, size);
363 if (err != size) {
364 if (err >= 0)
365 err = -EIO;
366 } else
367 err = 0;
368 return err;
369 }
370
371 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
372 {
373 int err;
374
375 err = drbd_recv_all(connection, buf, size);
376 if (err && !signal_pending(current))
377 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
378 return err;
379 }
380
381 /* quoting tcp(7):
382 * On individual connections, the socket buffer size must be set prior to the
383 * listen(2) or connect(2) calls in order to have it take effect.
384 * This is our wrapper to do so.
385 */
386 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
387 unsigned int rcv)
388 {
389 /* open coded SO_SNDBUF, SO_RCVBUF */
390 if (snd) {
391 sock->sk->sk_sndbuf = snd;
392 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
393 }
394 if (rcv) {
395 sock->sk->sk_rcvbuf = rcv;
396 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
397 }
398 }
399
400 static struct socket *drbd_try_connect(struct drbd_connection *connection)
401 {
402 const char *what;
403 struct socket *sock;
404 struct sockaddr_in6 src_in6;
405 struct sockaddr_in6 peer_in6;
406 struct net_conf *nc;
407 int err, peer_addr_len, my_addr_len;
408 int sndbuf_size, rcvbuf_size, connect_int;
409 int disconnect_on_error = 1;
410
411 rcu_read_lock();
412 nc = rcu_dereference(connection->net_conf);
413 if (!nc) {
414 rcu_read_unlock();
415 return NULL;
416 }
417 sndbuf_size = nc->sndbuf_size;
418 rcvbuf_size = nc->rcvbuf_size;
419 connect_int = nc->connect_int;
420 rcu_read_unlock();
421
422 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
423 memcpy(&src_in6, &connection->my_addr, my_addr_len);
424
425 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
426 src_in6.sin6_port = 0;
427 else
428 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
429
430 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
431 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
432
433 what = "sock_create_kern";
434 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
435 SOCK_STREAM, IPPROTO_TCP, &sock);
436 if (err < 0) {
437 sock = NULL;
438 goto out;
439 }
440
441 sock->sk->sk_rcvtimeo =
442 sock->sk->sk_sndtimeo = connect_int * HZ;
443 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
444
445 /* explicitly bind to the configured IP as source IP
446 * for the outgoing connections.
447 * This is needed for multihomed hosts and to be
448 * able to use lo: interfaces for drbd.
449 * Make sure to use 0 as port number, so linux selects
450 * a free one dynamically.
451 */
452 what = "bind before connect";
453 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
454 if (err < 0)
455 goto out;
456
457 /* connect may fail, peer not yet available.
458 * stay C_WF_CONNECTION, don't go Disconnecting! */
459 disconnect_on_error = 0;
460 what = "connect";
461 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
462
463 out:
464 if (err < 0) {
465 if (sock) {
466 sock_release(sock);
467 sock = NULL;
468 }
469 switch (-err) {
470 /* timeout, busy, signal pending */
471 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
472 case EINTR: case ERESTARTSYS:
473 /* peer not (yet) available, network problem */
474 case ECONNREFUSED: case ENETUNREACH:
475 case EHOSTDOWN: case EHOSTUNREACH:
476 disconnect_on_error = 0;
477 break;
478 default:
479 drbd_err(connection, "%s failed, err = %d\n", what, err);
480 }
481 if (disconnect_on_error)
482 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
483 }
484
485 return sock;
486 }
487
488 struct accept_wait_data {
489 struct drbd_connection *connection;
490 struct socket *s_listen;
491 struct completion door_bell;
492 void (*original_sk_state_change)(struct sock *sk);
493
494 };
495
496 static void drbd_incoming_connection(struct sock *sk)
497 {
498 struct accept_wait_data *ad = sk->sk_user_data;
499 void (*state_change)(struct sock *sk);
500
501 state_change = ad->original_sk_state_change;
502 if (sk->sk_state == TCP_ESTABLISHED)
503 complete(&ad->door_bell);
504 state_change(sk);
505 }
506
507 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
508 {
509 int err, sndbuf_size, rcvbuf_size, my_addr_len;
510 struct sockaddr_in6 my_addr;
511 struct socket *s_listen;
512 struct net_conf *nc;
513 const char *what;
514
515 rcu_read_lock();
516 nc = rcu_dereference(connection->net_conf);
517 if (!nc) {
518 rcu_read_unlock();
519 return -EIO;
520 }
521 sndbuf_size = nc->sndbuf_size;
522 rcvbuf_size = nc->rcvbuf_size;
523 rcu_read_unlock();
524
525 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
526 memcpy(&my_addr, &connection->my_addr, my_addr_len);
527
528 what = "sock_create_kern";
529 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
530 SOCK_STREAM, IPPROTO_TCP, &s_listen);
531 if (err) {
532 s_listen = NULL;
533 goto out;
534 }
535
536 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
537 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
538
539 what = "bind before listen";
540 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
541 if (err < 0)
542 goto out;
543
544 ad->s_listen = s_listen;
545 write_lock_bh(&s_listen->sk->sk_callback_lock);
546 ad->original_sk_state_change = s_listen->sk->sk_state_change;
547 s_listen->sk->sk_state_change = drbd_incoming_connection;
548 s_listen->sk->sk_user_data = ad;
549 write_unlock_bh(&s_listen->sk->sk_callback_lock);
550
551 what = "listen";
552 err = s_listen->ops->listen(s_listen, 5);
553 if (err < 0)
554 goto out;
555
556 return 0;
557 out:
558 if (s_listen)
559 sock_release(s_listen);
560 if (err < 0) {
561 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
562 drbd_err(connection, "%s failed, err = %d\n", what, err);
563 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
564 }
565 }
566
567 return -EIO;
568 }
569
570 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
571 {
572 write_lock_bh(&sk->sk_callback_lock);
573 sk->sk_state_change = ad->original_sk_state_change;
574 sk->sk_user_data = NULL;
575 write_unlock_bh(&sk->sk_callback_lock);
576 }
577
578 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
579 {
580 int timeo, connect_int, err = 0;
581 struct socket *s_estab = NULL;
582 struct net_conf *nc;
583
584 rcu_read_lock();
585 nc = rcu_dereference(connection->net_conf);
586 if (!nc) {
587 rcu_read_unlock();
588 return NULL;
589 }
590 connect_int = nc->connect_int;
591 rcu_read_unlock();
592
593 timeo = connect_int * HZ;
594 /* 28.5% random jitter */
595 timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7;
596
597 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
598 if (err <= 0)
599 return NULL;
600
601 err = kernel_accept(ad->s_listen, &s_estab, 0);
602 if (err < 0) {
603 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
604 drbd_err(connection, "accept failed, err = %d\n", err);
605 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
606 }
607 }
608
609 if (s_estab)
610 unregister_state_change(s_estab->sk, ad);
611
612 return s_estab;
613 }
614
615 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
616
617 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
618 enum drbd_packet cmd)
619 {
620 if (!conn_prepare_command(connection, sock))
621 return -EIO;
622 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
623 }
624
625 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
626 {
627 unsigned int header_size = drbd_header_size(connection);
628 struct packet_info pi;
629 struct net_conf *nc;
630 int err;
631
632 rcu_read_lock();
633 nc = rcu_dereference(connection->net_conf);
634 if (!nc) {
635 rcu_read_unlock();
636 return -EIO;
637 }
638 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
639 rcu_read_unlock();
640
641 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
642 if (err != header_size) {
643 if (err >= 0)
644 err = -EIO;
645 return err;
646 }
647 err = decode_header(connection, connection->data.rbuf, &pi);
648 if (err)
649 return err;
650 return pi.cmd;
651 }
652
653 /**
654 * drbd_socket_okay() - Free the socket if its connection is not okay
655 * @sock: pointer to the pointer to the socket.
656 */
657 static bool drbd_socket_okay(struct socket **sock)
658 {
659 int rr;
660 char tb[4];
661
662 if (!*sock)
663 return false;
664
665 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
666
667 if (rr > 0 || rr == -EAGAIN) {
668 return true;
669 } else {
670 sock_release(*sock);
671 *sock = NULL;
672 return false;
673 }
674 }
675
676 static bool connection_established(struct drbd_connection *connection,
677 struct socket **sock1,
678 struct socket **sock2)
679 {
680 struct net_conf *nc;
681 int timeout;
682 bool ok;
683
684 if (!*sock1 || !*sock2)
685 return false;
686
687 rcu_read_lock();
688 nc = rcu_dereference(connection->net_conf);
689 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
690 rcu_read_unlock();
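/* Give the peer a moment to set up its second connection, then verify
 * that both of our sockets are still usable. */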
691 schedule_timeout_interruptible(timeout);
692
693 ok = drbd_socket_okay(sock1);
694 ok = drbd_socket_okay(sock2) && ok;
695
696 return ok;
697 }
698
699 /* Gets called if a connection is established, or if a new minor gets created
700 in a connection */
701 int drbd_connected(struct drbd_peer_device *peer_device)
702 {
703 struct drbd_device *device = peer_device->device;
704 int err;
705
706 atomic_set(&device->packet_seq, 0);
707 device->peer_seq = 0;
708
709 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
710 &peer_device->connection->cstate_mutex :
711 &device->own_state_mutex;
712
713 err = drbd_send_sync_param(peer_device);
714 if (!err)
715 err = drbd_send_sizes(peer_device, 0, 0);
716 if (!err)
717 err = drbd_send_uuids(peer_device);
718 if (!err)
719 err = drbd_send_current_state(peer_device);
720 clear_bit(USE_DEGR_WFC_T, &device->flags);
721 clear_bit(RESIZE_PENDING, &device->flags);
722 atomic_set(&device->ap_in_flight, 0);
723 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
724 return err;
725 }
726
727 /*
728 * return values:
729 * 1 yes, we have a valid connection
730 * 0 oops, did not work out, please try again
731 * -1 peer talks different language,
732 * no point in trying again, please go standalone.
733 * -2 We do not have a network config...
734 */
735 static int conn_connect(struct drbd_connection *connection)
736 {
737 struct drbd_socket sock, msock;
738 struct drbd_peer_device *peer_device;
739 struct net_conf *nc;
740 int vnr, timeout, h;
741 bool discard_my_data, ok;
742 enum drbd_state_rv rv;
743 struct accept_wait_data ad = {
744 .connection = connection,
745 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
746 };
747
748 clear_bit(DISCONNECT_SENT, &connection->flags);
749 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
750 return -2;
751
752 mutex_init(&sock.mutex);
753 sock.sbuf = connection->data.sbuf;
754 sock.rbuf = connection->data.rbuf;
755 sock.socket = NULL;
756 mutex_init(&msock.mutex);
757 msock.sbuf = connection->meta.sbuf;
758 msock.rbuf = connection->meta.rbuf;
759 msock.socket = NULL;
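/* DRBD uses two TCP connections per peer: "sock" carries the actual data,
 * "msock" carries meta data such as acks and pings (see also the different
 * socket priorities assigned further down). */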
760
761 /* Assume that the peer only understands protocol 80 until we know better. */
762 connection->agreed_pro_version = 80;
763
764 if (prepare_listen_socket(connection, &ad))
765 return 0;
766
767 do {
768 struct socket *s;
769
770 s = drbd_try_connect(connection);
771 if (s) {
772 if (!sock.socket) {
773 sock.socket = s;
774 send_first_packet(connection, &sock, P_INITIAL_DATA);
775 } else if (!msock.socket) {
776 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
777 msock.socket = s;
778 send_first_packet(connection, &msock, P_INITIAL_META);
779 } else {
780 drbd_err(connection, "Logic error in conn_connect()\n");
781 goto out_release_sockets;
782 }
783 }
784
785 if (connection_established(connection, &sock.socket, &msock.socket))
786 break;
787
788 retry:
789 s = drbd_wait_for_connect(connection, &ad);
790 if (s) {
791 int fp = receive_first_packet(connection, s);
792 drbd_socket_okay(&sock.socket);
793 drbd_socket_okay(&msock.socket);
794 switch (fp) {
795 case P_INITIAL_DATA:
796 if (sock.socket) {
797 drbd_warn(connection, "initial packet S crossed\n");
798 sock_release(sock.socket);
799 sock.socket = s;
800 goto randomize;
801 }
802 sock.socket = s;
803 break;
804 case P_INITIAL_META:
805 set_bit(RESOLVE_CONFLICTS, &connection->flags);
806 if (msock.socket) {
807 drbd_warn(connection, "initial packet M crossed\n");
808 sock_release(msock.socket);
809 msock.socket = s;
810 goto randomize;
811 }
812 msock.socket = s;
813 break;
814 default:
815 drbd_warn(connection, "Error receiving initial packet\n");
816 sock_release(s);
817 randomize:
818 if (get_random_u32_below(2))
819 goto retry;
820 }
821 }
822
823 if (connection->cstate <= C_DISCONNECTING)
824 goto out_release_sockets;
825 if (signal_pending(current)) {
826 flush_signals(current);
827 smp_rmb();
828 if (get_t_state(&connection->receiver) == EXITING)
829 goto out_release_sockets;
830 }
831
832 ok = connection_established(connection, &sock.socket, &msock.socket);
833 } while (!ok);
834
835 if (ad.s_listen)
836 sock_release(ad.s_listen);
837
838 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
839 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
840
841 sock.socket->sk->sk_allocation = GFP_NOIO;
842 msock.socket->sk->sk_allocation = GFP_NOIO;
843
844 sock.socket->sk->sk_use_task_frag = false;
845 msock.socket->sk->sk_use_task_frag = false;
846
847 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
848 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
849
850 /* NOT YET ...
851 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
852 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
853 * first set it to the P_CONNECTION_FEATURES timeout,
854 * which we set to 4x the configured ping_timeout. */
855 rcu_read_lock();
856 nc = rcu_dereference(connection->net_conf);
857
858 sock.socket->sk->sk_sndtimeo =
859 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
860
861 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
862 timeout = nc->timeout * HZ / 10;
863 discard_my_data = nc->discard_my_data;
864 rcu_read_unlock();
865
866 msock.socket->sk->sk_sndtimeo = timeout;
867
868 /* we don't want delays.
869 * we use TCP_CORK where appropriate, though */
870 tcp_sock_set_nodelay(sock.socket->sk);
871 tcp_sock_set_nodelay(msock.socket->sk);
872
873 connection->data.socket = sock.socket;
874 connection->meta.socket = msock.socket;
875 connection->last_received = jiffies;
876
877 h = drbd_do_features(connection);
878 if (h <= 0)
879 return h;
880
881 if (connection->cram_hmac_tfm) {
882 /* drbd_request_state(device, NS(conn, WFAuth)); */
883 switch (drbd_do_auth(connection)) {
884 case -1:
885 drbd_err(connection, "Authentication of peer failed\n");
886 return -1;
887 case 0:
888 drbd_err(connection, "Authentication of peer failed, trying again.\n");
889 return 0;
890 }
891 }
892
893 connection->data.socket->sk->sk_sndtimeo = timeout;
894 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
895
896 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
897 return -1;
898
899 /* Prevent a race between resync-handshake and
900 * being promoted to Primary.
901 *
902 * Grab and release the state mutex, so we know that any current
903 * drbd_set_role() is finished, and any incoming drbd_set_role
904 * will see the STATE_SENT flag, and wait for it to be cleared.
905 */
906 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
907 mutex_lock(peer_device->device->state_mutex);
908
909 /* avoid a race with conn_request_state( C_DISCONNECTING ) */
910 spin_lock_irq(&connection->resource->req_lock);
911 set_bit(STATE_SENT, &connection->flags);
912 spin_unlock_irq(&connection->resource->req_lock);
913
914 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
915 mutex_unlock(peer_device->device->state_mutex);
916
917 rcu_read_lock();
918 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
919 struct drbd_device *device = peer_device->device;
920 kref_get(&device->kref);
921 rcu_read_unlock();
922
923 if (discard_my_data)
924 set_bit(DISCARD_MY_DATA, &device->flags);
925 else
926 clear_bit(DISCARD_MY_DATA, &device->flags);
927
928 drbd_connected(peer_device);
929 kref_put(&device->kref, drbd_destroy_device);
930 rcu_read_lock();
931 }
932 rcu_read_unlock();
933
934 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
935 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
936 clear_bit(STATE_SENT, &connection->flags);
937 return 0;
938 }
939
940 drbd_thread_start(&connection->ack_receiver);
941 /* opencoded create_singlethread_workqueue(),
942 * to be able to use format string arguments */
943 connection->ack_sender =
944 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
945 if (!connection->ack_sender) {
946 drbd_err(connection, "Failed to create workqueue ack_sender\n");
947 return 0;
948 }
949
950 mutex_lock(&connection->resource->conf_update);
951 /* The discard_my_data flag is a single-shot modifier to the next
952 * connection attempt, the handshake of which is now well underway.
953 * No need for rcu style copying of the whole struct
954 * just to clear a single value. */
955 connection->net_conf->discard_my_data = 0;
956 mutex_unlock(&connection->resource->conf_update);
957
958 return h;
959
960 out_release_sockets:
961 if (ad.s_listen)
962 sock_release(ad.s_listen);
963 if (sock.socket)
964 sock_release(sock.socket);
965 if (msock.socket)
966 sock_release(msock.socket);
967 return -1;
968 }
969
970 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
971 {
972 unsigned int header_size = drbd_header_size(connection);
973
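/* The expected header format follows from the agreed protocol version
 * (drbd_header_size()); the magic value is an additional sanity check:
 * p_header100 carries a volume number, p_header95 is the "big" packet
 * header, p_header80 is the original format. */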
974 if (header_size == sizeof(struct p_header100) &&
975 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
976 struct p_header100 *h = header;
977 if (h->pad != 0) {
978 drbd_err(connection, "Header padding is not zero\n");
979 return -EINVAL;
980 }
981 pi->vnr = be16_to_cpu(h->volume);
982 pi->cmd = be16_to_cpu(h->command);
983 pi->size = be32_to_cpu(h->length);
984 } else if (header_size == sizeof(struct p_header95) &&
985 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
986 struct p_header95 *h = header;
987 pi->cmd = be16_to_cpu(h->command);
988 pi->size = be32_to_cpu(h->length);
989 pi->vnr = 0;
990 } else if (header_size == sizeof(struct p_header80) &&
991 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
992 struct p_header80 *h = header;
993 pi->cmd = be16_to_cpu(h->command);
994 pi->size = be16_to_cpu(h->length);
995 pi->vnr = 0;
996 } else {
997 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
998 be32_to_cpu(*(__be32 *)header),
999 connection->agreed_pro_version);
1000 return -EINVAL;
1001 }
1002 pi->data = header + header_size;
1003 return 0;
1004 }
1005
1006 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1007 {
1008 if (current->plug == &connection->receiver_plug) {
1009 blk_finish_plug(&connection->receiver_plug);
1010 blk_start_plug(&connection->receiver_plug);
1011 } /* else: maybe just schedule() ?? */
1012 }
1013
1014 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1015 {
1016 void *buffer = connection->data.rbuf;
1017 int err;
1018
1019 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1020 if (err)
1021 return err;
1022
1023 err = decode_header(connection, buffer, pi);
1024 connection->last_received = jiffies;
1025
1026 return err;
1027 }
1028
1029 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1030 {
1031 void *buffer = connection->data.rbuf;
1032 unsigned int size = drbd_header_size(connection);
1033 int err;
1034
1035 err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1036 if (err != size) {
1037 /* If we have nothing in the receive buffer now, to reduce
1038 * application latency, try to drain the backend queues as
1039 * quickly as possible, and let remote TCP know what we have
1040 * received so far. */
1041 if (err == -EAGAIN) {
1042 tcp_sock_set_quickack(connection->data.socket->sk, 2);
1043 drbd_unplug_all_devices(connection);
1044 }
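/* We may have received part of the header already; account for that
 * and then block until the rest has arrived. */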
1045 if (err > 0) {
1046 buffer += err;
1047 size -= err;
1048 }
1049 err = drbd_recv_all_warn(connection, buffer, size);
1050 if (err)
1051 return err;
1052 }
1053
1054 err = decode_header(connection, connection->data.rbuf, pi);
1055 connection->last_received = jiffies;
1056
1057 return err;
1058 }
1059 /* This is blkdev_issue_flush, but asynchronous.
1060 * We want to submit to all component volumes in parallel,
1061 * then wait for all completions.
1062 */
1063 struct issue_flush_context {
1064 atomic_t pending;
1065 int error;
1066 struct completion done;
1067 };
1068 struct one_flush_context {
1069 struct drbd_device *device;
1070 struct issue_flush_context *ctx;
1071 };
1072
1073 static void one_flush_endio(struct bio *bio)
1074 {
1075 struct one_flush_context *octx = bio->bi_private;
1076 struct drbd_device *device = octx->device;
1077 struct issue_flush_context *ctx = octx->ctx;
1078
1079 if (bio->bi_status) {
1080 ctx->error = blk_status_to_errno(bio->bi_status);
1081 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1082 }
1083 kfree(octx);
1084 bio_put(bio);
1085
1086 clear_bit(FLUSH_PENDING, &device->flags);
1087 put_ldev(device);
1088 kref_put(&device->kref, drbd_destroy_device);
1089
1090 if (atomic_dec_and_test(&ctx->pending))
1091 complete(&ctx->done);
1092 }
1093
1094 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1095 {
1096 struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
1097 REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO);
1098 struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1099
1100 if (!octx) {
1101 drbd_warn(device, "Could not allocate an octx, CANNOT ISSUE FLUSH\n");
1102 /* FIXME: what else can I do now? disconnecting or detaching
1103 * really does not help to improve the state of the world, either.
1104 */
1105 bio_put(bio);
1106
1107 ctx->error = -ENOMEM;
1108 put_ldev(device);
1109 kref_put(&device->kref, drbd_destroy_device);
1110 return;
1111 }
1112
1113 octx->device = device;
1114 octx->ctx = ctx;
1115 bio->bi_private = octx;
1116 bio->bi_end_io = one_flush_endio;
1117
1118 device->flush_jif = jiffies;
1119 set_bit(FLUSH_PENDING, &device->flags);
1120 atomic_inc(&ctx->pending);
1121 submit_bio(bio);
1122 }
1123
1124 static void drbd_flush(struct drbd_connection *connection)
1125 {
1126 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1127 struct drbd_peer_device *peer_device;
1128 struct issue_flush_context ctx;
1129 int vnr;
1130
1131 atomic_set(&ctx.pending, 1);
1132 ctx.error = 0;
1133 init_completion(&ctx.done);
1134
1135 rcu_read_lock();
1136 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1137 struct drbd_device *device = peer_device->device;
1138
1139 if (!get_ldev(device))
1140 continue;
1141 kref_get(&device->kref);
1142 rcu_read_unlock();
1143
1144 submit_one_flush(device, &ctx);
1145
1146 rcu_read_lock();
1147 }
1148 rcu_read_unlock();
1149
1150 /* Do we want to add a timeout,
1151 * if disk-timeout is set? */
1152 if (!atomic_dec_and_test(&ctx.pending))
1153 wait_for_completion(&ctx.done);
1154
1155 if (ctx.error) {
1156 /* would rather check on EOPNOTSUPP, but that is not reliable.
1157 * don't try again for ANY return value != 0
1158 * if (rv == -EOPNOTSUPP) */
1159 /* Any error is already reported by bio_endio callback. */
1160 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1161 }
1162 }
1163 }
1164
1165 /**
1166 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1167 * @connection: DRBD connection.
1168 * @epoch: Epoch object.
1169 * @ev: Epoch event.
1170 */
1171 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1172 struct drbd_epoch *epoch,
1173 enum epoch_event ev)
1174 {
1175 int epoch_size;
1176 struct drbd_epoch *next_epoch;
1177 enum finish_epoch rv = FE_STILL_LIVE;
1178
1179 spin_lock(&connection->epoch_lock);
1180 do {
1181 next_epoch = NULL;
1182
1183 epoch_size = atomic_read(&epoch->epoch_size);
1184
1185 switch (ev & ~EV_CLEANUP) {
1186 case EV_PUT:
1187 atomic_dec(&epoch->active);
1188 break;
1189 case EV_GOT_BARRIER_NR:
1190 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1191 break;
1192 case EV_BECAME_LAST:
1193 /* nothing to do */
1194 break;
1195 }
1196
1197 if (epoch_size != 0 &&
1198 atomic_read(&epoch->active) == 0 &&
1199 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1200 if (!(ev & EV_CLEANUP)) {
1201 spin_unlock(&connection->epoch_lock);
1202 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1203 spin_lock(&connection->epoch_lock);
1204 }
1205 #if 0
1206 /* FIXME: dec unacked on connection, once we have
1207 * something to count pending connection packets in. */
1208 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1209 dec_unacked(epoch->connection);
1210 #endif
1211
1212 if (connection->current_epoch != epoch) {
1213 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1214 list_del(&epoch->list);
1215 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1216 connection->epochs--;
1217 kfree(epoch);
1218
1219 if (rv == FE_STILL_LIVE)
1220 rv = FE_DESTROYED;
1221 } else {
1222 epoch->flags = 0;
1223 atomic_set(&epoch->epoch_size, 0);
1224 /* atomic_set(&epoch->active, 0); is already zero */
1225 if (rv == FE_STILL_LIVE)
1226 rv = FE_RECYCLED;
1227 }
1228 }
1229
1230 if (!next_epoch)
1231 break;
1232
1233 epoch = next_epoch;
1234 } while (1);
1235
1236 spin_unlock(&connection->epoch_lock);
1237
1238 return rv;
1239 }
1240
1241 static enum write_ordering_e
1242 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1243 {
1244 struct disk_conf *dc;
1245
1246 dc = rcu_dereference(bdev->disk_conf);
1247
1248 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1249 wo = WO_DRAIN_IO;
1250 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1251 wo = WO_NONE;
1252
1253 return wo;
1254 }
1255
1256 /*
1257 * drbd_bump_write_ordering() - Fall back to another write ordering method
1258 * @wo: Write ordering method to try.
1259 */
1260 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1261 enum write_ordering_e wo)
1262 {
1263 struct drbd_device *device;
1264 enum write_ordering_e pwo;
1265 int vnr;
1266 static char *write_ordering_str[] = {
1267 [WO_NONE] = "none",
1268 [WO_DRAIN_IO] = "drain",
1269 [WO_BDEV_FLUSH] = "flush",
1270 };
1271
1272 pwo = resource->write_ordering;
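/* Never silently upgrade to a stronger method; only an explicit request for
 * WO_BDEV_FLUSH may move us back up, and even that is clamped below to what
 * the backing devices are configured to allow. */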
1273 if (wo != WO_BDEV_FLUSH)
1274 wo = min(pwo, wo);
1275 rcu_read_lock();
1276 idr_for_each_entry(&resource->devices, device, vnr) {
1277 if (get_ldev(device)) {
1278 wo = max_allowed_wo(device->ldev, wo);
1279 if (device->ldev == bdev)
1280 bdev = NULL;
1281 put_ldev(device);
1282 }
1283 }
1284
1285 if (bdev)
1286 wo = max_allowed_wo(bdev, wo);
1287
1288 rcu_read_unlock();
1289
1290 resource->write_ordering = wo;
1291 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1292 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1293 }
1294
1295 /*
1296 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1297 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1298 * will directly go to fallback mode, submitting normal writes, and
1299 * never even try to UNMAP.
1300 *
1301 * And dm-thin does not do this (yet), mostly because in general it has
1302 * to assume that "skip_block_zeroing" is set. See also:
1303 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1304 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1305 *
1306 * We *may* ignore the discard-zeroes-data setting, if so configured.
1307 *
1308 * Assumption is that this "discard_zeroes_data=0" is only because the backend
1309 * may ignore partial unaligned discards.
1310 *
1311 * LVM/DM thin as of at least
1312 * LVM version: 2.02.115(2)-RHEL7 (2015-01-28)
1313 * Library version: 1.02.93-RHEL7 (2015-01-28)
1314 * Driver version: 4.29.0
1315 * still behaves this way.
1316 *
1317 * For unaligned (wrt. alignment and granularity) or too small discards,
1318 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1319 * but discard all the aligned full chunks.
1320 *
1321 * At least for LVM/DM thin, with skip_block_zeroing=false,
1322 * the result is effectively "discard_zeroes_data=1".
1323 */
1324 /* flags: EE_TRIM|EE_ZEROOUT */
1325 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1326 {
1327 struct block_device *bdev = device->ldev->backing_bdev;
1328 sector_t tmp, nr;
1329 unsigned int max_discard_sectors, granularity;
1330 int alignment;
1331 int err = 0;
1332
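/* Strategy: if zero-out was requested, or trimming was not, zero out the
 * whole range.  Otherwise zero out only the unaligned head and tail and
 * discard all aligned, granularity-sized chunks in between. */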
1333 if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1334 goto zero_out;
1335
1336 /* Zero-sector (unknown) and one-sector granularities are the same. */
1337 granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
1338 alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1339
1340 max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
1341 max_discard_sectors -= max_discard_sectors % granularity;
1342 if (unlikely(!max_discard_sectors))
1343 goto zero_out;
1344
1345 if (nr_sectors < granularity)
1346 goto zero_out;
1347
1348 tmp = start;
1349 if (sector_div(tmp, granularity) != alignment) {
1350 if (nr_sectors < 2*granularity)
1351 goto zero_out;
1352 /* start + gran - (start + gran - align) % gran */
1353 tmp = start + granularity - alignment;
1354 tmp = start + granularity - sector_div(tmp, granularity);
1355
1356 nr = tmp - start;
1357 /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1358 * layers are below us, some may have smaller granularity */
1359 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1360 nr_sectors -= nr;
1361 start = tmp;
1362 }
1363 while (nr_sectors >= max_discard_sectors) {
1364 err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
1365 GFP_NOIO);
1366 nr_sectors -= max_discard_sectors;
1367 start += max_discard_sectors;
1368 }
1369 if (nr_sectors) {
1370 /* max_discard_sectors is unsigned int (and a multiple of
1371 * granularity, we made sure of that above already);
1372 * nr is < max_discard_sectors;
1373 * I don't need sector_div here, even though nr is sector_t */
1374 nr = nr_sectors;
1375 nr -= (unsigned int)nr % granularity;
1376 if (nr) {
1377 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
1378 nr_sectors -= nr;
1379 start += nr;
1380 }
1381 }
1382 zero_out:
1383 if (nr_sectors) {
1384 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1385 (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1386 }
1387 return err != 0;
1388 }
1389
1390 static bool can_do_reliable_discards(struct drbd_device *device)
1391 {
1392 struct disk_conf *dc;
1393 bool can_do;
1394
1395 if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
1396 return false;
1397
1398 rcu_read_lock();
1399 dc = rcu_dereference(device->ldev->disk_conf);
1400 can_do = dc->discard_zeroes_if_aligned;
1401 rcu_read_unlock();
1402 return can_do;
1403 }
1404
1405 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1406 {
1407 /* If the backend cannot discard, or does not guarantee
1408 * read-back zeroes in discarded ranges, we fall back to
1409 * zero-out. Unless configuration specifically requested
1410 * otherwise. */
1411 if (!can_do_reliable_discards(device))
1412 peer_req->flags |= EE_ZEROOUT;
1413
1414 if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1415 peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1416 peer_req->flags |= EE_WAS_ERROR;
1417 drbd_endio_write_sec_final(peer_req);
1418 }
1419
1420 static int peer_request_fault_type(struct drbd_peer_request *peer_req)
1421 {
1422 if (peer_req_op(peer_req) == REQ_OP_READ) {
1423 return peer_req->flags & EE_APPLICATION ?
1424 DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD;
1425 } else {
1426 return peer_req->flags & EE_APPLICATION ?
1427 DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR;
1428 }
1429 }
1430
1431 /**
1432 * drbd_submit_peer_request() - submit the peer request to the local backing device
1433 * @peer_req: peer request
1434 *
1435 * May spread the pages to multiple bios,
1436 * depending on bio_add_page restrictions.
1437 *
1438 * Returns 0 if all bios have been submitted,
1439 * -ENOMEM if we could not allocate enough bios,
1440 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1441 * single page to an empty bio (which should never happen and likely indicates
1442 * that the lower level IO stack is in some way broken). This has been observed
1443 * on certain Xen deployments.
1444 */
1445 /* TODO allocate from our own bio_set. */
1446 int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
1447 {
1448 struct drbd_device *device = peer_req->peer_device->device;
1449 struct bio *bios = NULL;
1450 struct bio *bio;
1451 struct page *page = peer_req->pages;
1452 sector_t sector = peer_req->i.sector;
1453 unsigned int data_size = peer_req->i.size;
1454 unsigned int n_bios = 0;
1455 unsigned int nr_pages = PFN_UP(data_size);
1456
1457 /* TRIM/DISCARD: for now, always use the helper function
1458 * blkdev_issue_zeroout(..., discard=true).
1459 * It's synchronous, but it does the right thing wrt. bio splitting.
1460 * Correctness first, performance later. Next step is to code an
1461 * asynchronous variant of the same.
1462 */
1463 if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
1464 /* wait for all pending IO completions, before we start
1465 * zeroing things out. */
1466 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1467 /* add it to the active list now,
1468 * so we can find it to present it in debugfs */
1469 peer_req->submit_jif = jiffies;
1470 peer_req->flags |= EE_SUBMITTED;
1471
1472 /* If this was a resync request from receive_rs_deallocated(),
1473 * it is already on the sync_ee list */
1474 if (list_empty(&peer_req->w.list)) {
1475 spin_lock_irq(&device->resource->req_lock);
1476 list_add_tail(&peer_req->w.list, &device->active_ee);
1477 spin_unlock_irq(&device->resource->req_lock);
1478 }
1479
1480 drbd_issue_peer_discard_or_zero_out(device, peer_req);
1481 return 0;
1482 }
1483
1484 /* In most cases, we will only need one bio. But in case the lower
1485 * level restrictions happen to be different at this offset on this
1486 * side than those of the sending peer, we may need to submit the
1487 * request in more than one bio.
1488 *
1489 * Plain bio_alloc is good enough here, this is no DRBD internally
1490 * generated bio, but a bio allocated on behalf of the peer.
1491 */
1492 next_bio:
1493 /* _DISCARD, _WRITE_ZEROES handled above.
1494 * REQ_OP_FLUSH (empty flush) not expected,
1495 * should have been mapped to a "drbd protocol barrier".
1496 * REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
1497 */
1498 if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
1499 peer_req_op(peer_req) == REQ_OP_READ)) {
1500 drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf);
1501 return -EINVAL;
1502 }
1503
1504 bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO);
1505 /* > peer_req->i.sector, unless this is the first bio */
1506 bio->bi_iter.bi_sector = sector;
1507 bio->bi_private = peer_req;
1508 bio->bi_end_io = drbd_peer_request_endio;
1509
1510 bio->bi_next = bios;
1511 bios = bio;
1512 ++n_bios;
1513
1514 page_chain_for_each(page) {
1515 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1516 if (!bio_add_page(bio, page, len, 0))
1517 goto next_bio;
1518 data_size -= len;
1519 sector += len >> 9;
1520 --nr_pages;
1521 }
1522 D_ASSERT(device, data_size == 0);
1523 D_ASSERT(device, page == NULL);
1524
1525 atomic_set(&peer_req->pending_bios, n_bios);
1526 /* for debugfs: update timestamp, mark as submitted */
1527 peer_req->submit_jif = jiffies;
1528 peer_req->flags |= EE_SUBMITTED;
1529 do {
1530 bio = bios;
1531 bios = bios->bi_next;
1532 bio->bi_next = NULL;
1533
1534 drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio);
1535 } while (bios);
1536 return 0;
1537 }
1538
1539 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1540 struct drbd_peer_request *peer_req)
1541 {
1542 struct drbd_interval *i = &peer_req->i;
1543
1544 drbd_remove_interval(&device->write_requests, i);
1545 drbd_clear_interval(i);
1546
1547 /* Wake up any processes waiting for this peer request to complete. */
1548 if (i->waiting)
1549 wake_up(&device->misc_wait);
1550 }
1551
1552 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1553 {
1554 struct drbd_peer_device *peer_device;
1555 int vnr;
1556
1557 rcu_read_lock();
1558 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1559 struct drbd_device *device = peer_device->device;
1560
1561 kref_get(&device->kref);
1562 rcu_read_unlock();
1563 drbd_wait_ee_list_empty(device, &device->active_ee);
1564 kref_put(&device->kref, drbd_destroy_device);
1565 rcu_read_lock();
1566 }
1567 rcu_read_unlock();
1568 }
1569
1570 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1571 {
1572 int rv;
1573 struct p_barrier *p = pi->data;
1574 struct drbd_epoch *epoch;
1575
1576 /* FIXME these are unacked on connection,
1577 * not a specific (peer)device.
1578 */
1579 connection->current_epoch->barrier_nr = p->barrier;
1580 connection->current_epoch->connection = connection;
1581 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1582
1583 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1584 * the activity log, which means it would not be resynced in case the
1585 * R_PRIMARY crashes now.
1586 * Therefore we must send the barrier_ack after the barrier request was
1587 * completed. */
1588 switch (connection->resource->write_ordering) {
1589 case WO_NONE:
1590 if (rv == FE_RECYCLED)
1591 return 0;
1592
1593 /* receiver context, in the writeout path of the other node.
1594 * avoid potential distributed deadlock */
1595 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1596 if (epoch)
1597 break;
1598 else
1599 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1600 fallthrough;
1601
1602 case WO_BDEV_FLUSH:
1603 case WO_DRAIN_IO:
1604 conn_wait_active_ee_empty(connection);
1605 drbd_flush(connection);
1606
1607 if (atomic_read(&connection->current_epoch->epoch_size)) {
1608 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1609 if (epoch)
1610 break;
1611 }
1612
1613 return 0;
1614 default:
1615 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1616 connection->resource->write_ordering);
1617 return -EIO;
1618 }
1619
1620 epoch->flags = 0;
1621 atomic_set(&epoch->epoch_size, 0);
1622 atomic_set(&epoch->active, 0);
1623
1624 spin_lock(&connection->epoch_lock);
1625 if (atomic_read(&connection->current_epoch->epoch_size)) {
1626 list_add(&epoch->list, &connection->current_epoch->list);
1627 connection->current_epoch = epoch;
1628 connection->epochs++;
1629 } else {
1630 /* The current_epoch got recycled while we allocated this one... */
1631 kfree(epoch);
1632 }
1633 spin_unlock(&connection->epoch_lock);
1634
1635 return 0;
1636 }
1637
1638 /* quick wrapper in case payload size != request_size (write same) */
1639 static void drbd_csum_ee_size(struct crypto_shash *h,
1640 struct drbd_peer_request *r, void *d,
1641 unsigned int payload_size)
1642 {
1643 unsigned int tmp = r->i.size;
1644 r->i.size = payload_size;
1645 drbd_csum_ee(h, r, d);
1646 r->i.size = tmp;
1647 }
1648
1649 /* used from receive_RSDataReply (recv_resync_read)
1650 * and from receive_Data.
1651 * data_size: actual payload ("data in")
1652 * for normal writes that is bi_size.
1653 * for discards, that is zero.
1654 * for write same, it is logical_block_size.
1655 * both trim and write same have the bi_size ("data len to be affected")
1656 * as extra argument in the packet header.
1657 */
1658 static struct drbd_peer_request *
1659 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1660 struct packet_info *pi) __must_hold(local)
1661 {
1662 struct drbd_device *device = peer_device->device;
1663 const sector_t capacity = get_capacity(device->vdisk);
1664 struct drbd_peer_request *peer_req;
1665 struct page *page;
1666 int digest_size, err;
1667 unsigned int data_size = pi->size, ds;
1668 void *dig_in = peer_device->connection->int_dig_in;
1669 void *dig_vv = peer_device->connection->int_dig_vv;
1670 unsigned long *data;
1671 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1672 struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1673
1674 digest_size = 0;
1675 if (!trim && peer_device->connection->peer_integrity_tfm) {
1676 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1677 /*
1678 * FIXME: Receive the incoming digest into the receive buffer
1679 * here, together with its struct p_data?
1680 */
1681 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1682 if (err)
1683 return NULL;
1684 data_size -= digest_size;
1685 }
1686
1687 /* assume request_size == data_size, but special case trim. */
1688 ds = data_size;
1689 if (trim) {
1690 if (!expect(peer_device, data_size == 0))
1691 return NULL;
1692 ds = be32_to_cpu(trim->size);
1693 } else if (zeroes) {
1694 if (!expect(peer_device, data_size == 0))
1695 return NULL;
1696 ds = be32_to_cpu(zeroes->size);
1697 }
1698
1699 if (!expect(peer_device, IS_ALIGNED(ds, 512)))
1700 return NULL;
1701 if (trim || zeroes) {
1702 if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1703 return NULL;
1704 } else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE))
1705 return NULL;
1706
1707 /* even though we trust our peer,
1708 * we sometimes have to double check. */
1709 if (sector + (ds>>9) > capacity) {
1710 drbd_err(device, "request from peer beyond end of local disk: "
1711 "capacity: %llus < sector: %llus + size: %u\n",
1712 (unsigned long long)capacity,
1713 (unsigned long long)sector, ds);
1714 return NULL;
1715 }
1716
1717 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1718 * "criss-cross" setup, that might cause write-out on some other DRBD,
1719 * which in turn might block on the other node at this very place. */
1720 peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1721 if (!peer_req)
1722 return NULL;
1723
1724 peer_req->flags |= EE_WRITE;
1725 if (trim) {
1726 peer_req->flags |= EE_TRIM;
1727 return peer_req;
1728 }
1729 if (zeroes) {
1730 peer_req->flags |= EE_ZEROOUT;
1731 return peer_req;
1732 }
1733
1734 /* receive payload size bytes into page chain */
1735 ds = data_size;
1736 page = peer_req->pages;
1737 page_chain_for_each(page) {
1738 unsigned len = min_t(int, ds, PAGE_SIZE);
1739 data = kmap(page);
1740 err = drbd_recv_all_warn(peer_device->connection, data, len);
1741 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1742 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1743 data[0] = data[0] ^ (unsigned long)-1;
1744 }
1745 kunmap(page);
1746 if (err) {
1747 drbd_free_peer_req(device, peer_req);
1748 return NULL;
1749 }
1750 ds -= len;
1751 }
1752
1753 if (digest_size) {
1754 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1755 if (memcmp(dig_in, dig_vv, digest_size)) {
1756 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1757 (unsigned long long)sector, data_size);
1758 drbd_free_peer_req(device, peer_req);
1759 return NULL;
1760 }
1761 }
1762 device->recv_cnt += data_size >> 9;
1763 return peer_req;
1764 }
1765
1766 /* drbd_drain_block() just takes a data block
1767 * out of the socket input buffer, and discards it.
1768 */
1769 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1770 {
1771 struct page *page;
1772 int err = 0;
1773 void *data;
1774
1775 if (!data_size)
1776 return 0;
1777
1778 page = drbd_alloc_pages(peer_device, 1, 1);
1779
1780 data = kmap(page);
1781 while (data_size) {
1782 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1783
1784 err = drbd_recv_all_warn(peer_device->connection, data, len);
1785 if (err)
1786 break;
1787 data_size -= len;
1788 }
1789 kunmap(page);
1790 drbd_free_pages(peer_device->device, page);
1791 return err;
1792 }
1793
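/* Copy the payload of a data reply directly into the page segments of the
 * original request's master bio ("dless" = disk-less read). */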
1794 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1795 sector_t sector, int data_size)
1796 {
1797 struct bio_vec bvec;
1798 struct bvec_iter iter;
1799 struct bio *bio;
1800 int digest_size, err, expect;
1801 void *dig_in = peer_device->connection->int_dig_in;
1802 void *dig_vv = peer_device->connection->int_dig_vv;
1803
1804 digest_size = 0;
1805 if (peer_device->connection->peer_integrity_tfm) {
1806 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1807 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1808 if (err)
1809 return err;
1810 data_size -= digest_size;
1811 }
1812
1813 /* optimistically update recv_cnt. if receiving fails below,
1814 * we disconnect anyways, and counters will be reset. */
1815 peer_device->device->recv_cnt += data_size>>9;
1816
1817 bio = req->master_bio;
1818 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1819
1820 bio_for_each_segment(bvec, bio, iter) {
1821 void *mapped = bvec_kmap_local(&bvec);
1822 expect = min_t(int, data_size, bvec.bv_len);
1823 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1824 kunmap_local(mapped);
1825 if (err)
1826 return err;
1827 data_size -= expect;
1828 }
1829
1830 if (digest_size) {
1831 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1832 if (memcmp(dig_in, dig_vv, digest_size)) {
1833 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1834 return -EINVAL;
1835 }
1836 }
1837
1838 D_ASSERT(peer_device->device, data_size == 0);
1839 return 0;
1840 }
1841
1842 /*
1843 * e_end_resync_block() is called in ack_sender context via
1844 * drbd_finish_peer_reqs().
1845 */
1846 static int e_end_resync_block(struct drbd_work *w, int unused)
1847 {
1848 struct drbd_peer_request *peer_req =
1849 container_of(w, struct drbd_peer_request, w);
1850 struct drbd_peer_device *peer_device = peer_req->peer_device;
1851 struct drbd_device *device = peer_device->device;
1852 sector_t sector = peer_req->i.sector;
1853 int err;
1854
1855 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1856
1857 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1858 drbd_set_in_sync(peer_device, sector, peer_req->i.size);
1859 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1860 } else {
1861 /* Record failure to sync */
1862 drbd_rs_failed_io(peer_device, sector, peer_req->i.size);
1863
1864 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1865 }
1866 dec_unacked(device);
1867
1868 return err;
1869 }
1870
1871 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1872 struct packet_info *pi) __releases(local)
1873 {
1874 struct drbd_device *device = peer_device->device;
1875 struct drbd_peer_request *peer_req;
1876
1877 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1878 if (!peer_req)
1879 goto fail;
1880
1881 dec_rs_pending(peer_device);
1882
1883 inc_unacked(device);
1884 /* corresponding dec_unacked() in e_end_resync_block()
1885 * respective _drbd_clear_done_ee */
1886
1887 peer_req->w.cb = e_end_resync_block;
1888 peer_req->opf = REQ_OP_WRITE;
1889 peer_req->submit_jif = jiffies;
1890
1891 spin_lock_irq(&device->resource->req_lock);
1892 list_add_tail(&peer_req->w.list, &device->sync_ee);
1893 spin_unlock_irq(&device->resource->req_lock);
1894
1895 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1896 if (drbd_submit_peer_request(peer_req) == 0)
1897 return 0;
1898
1899 /* don't care for the reason here */
1900 drbd_err(device, "submit failed, triggering re-connect\n");
1901 spin_lock_irq(&device->resource->req_lock);
1902 list_del(&peer_req->w.list);
1903 spin_unlock_irq(&device->resource->req_lock);
1904
1905 drbd_free_peer_req(device, peer_req);
1906 fail:
1907 put_ldev(device);
1908 return -EIO;
1909 }
1910
1911 static struct drbd_request *
1912 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1913 sector_t sector, bool missing_ok, const char *func)
1914 {
1915 struct drbd_request *req;
1916
1917 /* Request object according to our peer */
1918 req = (struct drbd_request *)(unsigned long)id;
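/* The peer echoes back the pointer value we used as block_id; verify it
 * really is one of our requests via the interval tree before using it. */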
1919 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1920 return req;
1921 if (!missing_ok) {
1922 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1923 (unsigned long)id, (unsigned long long)sector);
1924 }
1925 return NULL;
1926 }
1927
1928 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1929 {
1930 struct drbd_peer_device *peer_device;
1931 struct drbd_device *device;
1932 struct drbd_request *req;
1933 sector_t sector;
1934 int err;
1935 struct p_data *p = pi->data;
1936
1937 peer_device = conn_peer_device(connection, pi->vnr);
1938 if (!peer_device)
1939 return -EIO;
1940 device = peer_device->device;
1941
1942 sector = be64_to_cpu(p->sector);
1943
1944 spin_lock_irq(&device->resource->req_lock);
1945 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1946 spin_unlock_irq(&device->resource->req_lock);
1947 if (unlikely(!req))
1948 return -EIO;
1949
1950 err = recv_dless_read(peer_device, req, sector, pi->size);
1951 if (!err)
1952 req_mod(req, DATA_RECEIVED, peer_device);
1953 /* else: nothing. handled from drbd_disconnect...
1954 * I don't think we may complete this just yet
1955 * in case we are "on-disconnect: freeze" */
1956
1957 return err;
1958 }
1959
1960 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1961 {
1962 struct drbd_peer_device *peer_device;
1963 struct drbd_device *device;
1964 sector_t sector;
1965 int err;
1966 struct p_data *p = pi->data;
1967
1968 peer_device = conn_peer_device(connection, pi->vnr);
1969 if (!peer_device)
1970 return -EIO;
1971 device = peer_device->device;
1972
1973 sector = be64_to_cpu(p->sector);
1974 D_ASSERT(device, p->block_id == ID_SYNCER);
1975
1976 if (get_ldev(device)) {
1977 /* data is submitted to disk within recv_resync_read.
1978 * corresponding put_ldev done below on error,
1979 * or in drbd_peer_request_endio. */
1980 err = recv_resync_read(peer_device, sector, pi);
1981 } else {
1982 if (drbd_ratelimit())
1983 drbd_err(device, "Can not write resync data to local disk.\n");
1984
1985 err = drbd_drain_block(peer_device, pi->size);
1986
1987 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1988 }
1989
1990 atomic_add(pi->size >> 9, &device->rs_sect_in);
1991
1992 return err;
1993 }
1994
1995 static void restart_conflicting_writes(struct drbd_device *device,
1996 sector_t sector, int size)
1997 {
1998 struct drbd_interval *i;
1999 struct drbd_request *req;
2000
2001 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2002 if (!i->local)
2003 continue;
2004 req = container_of(i, struct drbd_request, i);
2005 if (req->rq_state & RQ_LOCAL_PENDING ||
2006 !(req->rq_state & RQ_POSTPONED))
2007 continue;
2008 /* as it is RQ_POSTPONED, this will cause it to
2009 * be queued on the retry workqueue. */
2010 __req_mod(req, CONFLICT_RESOLVED, NULL, NULL);
2011 }
2012 }
2013
2014 /*
2015 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2016 */
2017 static int e_end_block(struct drbd_work *w, int cancel)
2018 {
2019 struct drbd_peer_request *peer_req =
2020 container_of(w, struct drbd_peer_request, w);
2021 struct drbd_peer_device *peer_device = peer_req->peer_device;
2022 struct drbd_device *device = peer_device->device;
2023 sector_t sector = peer_req->i.sector;
2024 int err = 0, pcmd;
2025
2026 if (peer_req->flags & EE_SEND_WRITE_ACK) {
2027 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2028 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2029 device->state.conn <= C_PAUSED_SYNC_T &&
2030 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2031 P_RS_WRITE_ACK : P_WRITE_ACK;
2032 err = drbd_send_ack(peer_device, pcmd, peer_req);
2033 if (pcmd == P_RS_WRITE_ACK)
2034 drbd_set_in_sync(peer_device, sector, peer_req->i.size);
2035 } else {
2036 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2037 /* we expect it to be marked out of sync anyways...
2038 * maybe assert this? */
2039 }
2040 dec_unacked(device);
2041 }
2042
2043 /* we delete from the conflict detection hash _after_ we sent out the
2044 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
2045 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2046 spin_lock_irq(&device->resource->req_lock);
2047 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2048 drbd_remove_epoch_entry_interval(device, peer_req);
2049 if (peer_req->flags & EE_RESTART_REQUESTS)
2050 restart_conflicting_writes(device, sector, peer_req->i.size);
2051 spin_unlock_irq(&device->resource->req_lock);
2052 } else
2053 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2054
2055 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2056
2057 return err;
2058 }
2059
2060 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2061 {
2062 struct drbd_peer_request *peer_req =
2063 container_of(w, struct drbd_peer_request, w);
2064 struct drbd_peer_device *peer_device = peer_req->peer_device;
2065 int err;
2066
2067 err = drbd_send_ack(peer_device, ack, peer_req);
2068 dec_unacked(peer_device->device);
2069
2070 return err;
2071 }
2072
2073 static int e_send_superseded(struct drbd_work *w, int unused)
2074 {
2075 return e_send_ack(w, P_SUPERSEDED);
2076 }
2077
2078 static int e_send_retry_write(struct drbd_work *w, int unused)
2079 {
2080 struct drbd_peer_request *peer_req =
2081 container_of(w, struct drbd_peer_request, w);
2082 struct drbd_connection *connection = peer_req->peer_device->connection;
2083
2084 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2085 P_RETRY_WRITE : P_SUPERSEDED);
2086 }
2087
2088 static bool seq_greater(u32 a, u32 b)
2089 {
2090 /*
2091 * We assume 32-bit wrap-around here.
2092 * For 24-bit wrap-around, we would have to shift:
2093 * a <<= 8; b <<= 8;
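 * Example: a == 2 just after a wrap, b == 0xfffffffe:
 * (s32)2 - (s32)0xfffffffe == 2 - (-2) == 4 > 0, so a is considered newer.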
2094 */
2095 return (s32)a - (s32)b > 0;
2096 }
2097
2098 static u32 seq_max(u32 a, u32 b)
2099 {
2100 return seq_greater(a, b) ? a : b;
2101 }
2102
2103 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2104 {
2105 struct drbd_device *device = peer_device->device;
2106 unsigned int newest_peer_seq;
2107
2108 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2109 spin_lock(&device->peer_seq_lock);
2110 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2111 device->peer_seq = newest_peer_seq;
2112 spin_unlock(&device->peer_seq_lock);
2113 /* wake up only if we actually changed device->peer_seq */
2114 if (peer_seq == newest_peer_seq)
2115 wake_up(&device->seq_wait);
2116 }
2117 }
2118
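/* s1 and s2 are sector numbers, l1 and l2 are lengths in bytes,
 * hence the >>9 to convert the lengths to sectors. */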
2119 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2120 {
2121 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2122 }
2123
2124 /* maybe change sync_ee into interval trees as well? */
2125 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2126 {
2127 struct drbd_peer_request *rs_req;
2128 bool rv = false;
2129
2130 spin_lock_irq(&device->resource->req_lock);
2131 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2132 if (overlaps(peer_req->i.sector, peer_req->i.size,
2133 rs_req->i.sector, rs_req->i.size)) {
2134 rv = true;
2135 break;
2136 }
2137 }
2138 spin_unlock_irq(&device->resource->req_lock);
2139
2140 return rv;
2141 }
2142
2143 /* Called from receive_Data.
2144 * Synchronize packets on sock with packets on msock.
2145 *
2146 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2147 * packet traveling on msock, they are still processed in the order they have
2148 * been sent.
2149 *
2150 * Note: we don't care for Ack packets overtaking P_DATA packets.
2151 *
2152 * In case packet_seq is larger than device->peer_seq number, there are
2153 * outstanding packets on the msock. We wait for them to arrive.
2154 * In case we are the logically next packet, we update device->peer_seq
2155 * ourselves. Correctly handles 32bit wrap around.
2156 *
2157 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2158 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2159 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2160 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2161 *
2162 * returns 0 if we may process the packet,
2163 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2164 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2165 {
2166 struct drbd_device *device = peer_device->device;
2167 DEFINE_WAIT(wait);
2168 long timeout;
2169 int ret = 0, tp;
2170
2171 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2172 return 0;
2173
2174 spin_lock(&device->peer_seq_lock);
2175 for (;;) {
2176 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2177 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2178 break;
2179 }
2180
2181 if (signal_pending(current)) {
2182 ret = -ERESTARTSYS;
2183 break;
2184 }
2185
2186 rcu_read_lock();
2187 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2188 rcu_read_unlock();
2189
2190 if (!tp)
2191 break;
2192
2193 /* Only need to wait if two_primaries is enabled */
2194 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2195 spin_unlock(&device->peer_seq_lock);
2196 rcu_read_lock();
2197 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2198 rcu_read_unlock();
2199 timeout = schedule_timeout(timeout);
2200 spin_lock(&device->peer_seq_lock);
2201 if (!timeout) {
2202 ret = -ETIMEDOUT;
2203 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2204 break;
2205 }
2206 }
2207 spin_unlock(&device->peer_seq_lock);
2208 finish_wait(&device->seq_wait, &wait);
2209 return ret;
2210 }
2211
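/* Map the peer's data packet flags (DP_*) to the local bio operation. */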
2212 static enum req_op wire_flags_to_bio_op(u32 dpf)
2213 {
2214 if (dpf & DP_ZEROES)
2215 return REQ_OP_WRITE_ZEROES;
2216 if (dpf & DP_DISCARD)
2217 return REQ_OP_DISCARD;
2218 else
2219 return REQ_OP_WRITE;
2220 }
2221
2222 /* see also bio_flags_to_wire() */
2223 static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf)
2224 {
2225 return wire_flags_to_bio_op(dpf) |
2226 (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2227 (dpf & DP_FUA ? REQ_FUA : 0) |
2228 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2229 }
2230
2231 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2232 unsigned int size)
2233 {
2234 struct drbd_peer_device *peer_device = first_peer_device(device);
2235 struct drbd_interval *i;
2236
2237 repeat:
2238 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2239 struct drbd_request *req;
2240 struct bio_and_error m;
2241
2242 if (!i->local)
2243 continue;
2244 req = container_of(i, struct drbd_request, i);
2245 if (!(req->rq_state & RQ_POSTPONED))
2246 continue;
2247 req->rq_state &= ~RQ_POSTPONED;
2248 __req_mod(req, NEG_ACKED, peer_device, &m);
2249 spin_unlock_irq(&device->resource->req_lock);
2250 if (m.bio)
2251 complete_master_bio(device, &m);
2252 spin_lock_irq(&device->resource->req_lock);
2253 goto repeat;
2254 }
2255 }
2256
2257 static int handle_write_conflicts(struct drbd_device *device,
2258 struct drbd_peer_request *peer_req)
2259 {
2260 struct drbd_connection *connection = peer_req->peer_device->connection;
2261 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2262 sector_t sector = peer_req->i.sector;
2263 const unsigned int size = peer_req->i.size;
2264 struct drbd_interval *i;
2265 bool equal;
2266 int err;
2267
2268 /*
2269 * Inserting the peer request into the write_requests tree will prevent
2270 * new conflicting local requests from being added.
2271 */
2272 drbd_insert_interval(&device->write_requests, &peer_req->i);
2273
2274 repeat:
2275 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2276 if (i == &peer_req->i)
2277 continue;
2278 if (i->completed)
2279 continue;
2280
2281 if (!i->local) {
2282 /*
2283 * Our peer has sent a conflicting remote request; this
2284 * should not happen in a two-node setup. Wait for the
2285 * earlier peer request to complete.
2286 */
2287 err = drbd_wait_misc(device, i);
2288 if (err)
2289 goto out;
2290 goto repeat;
2291 }
2292
2293 equal = i->sector == sector && i->size == size;
2294 if (resolve_conflicts) {
2295 /*
2296 * If the peer request is fully contained within the
2297 * overlapping request, it can be considered overwritten
2298 * and thus superseded; otherwise, it will be retried
2299 * once all overlapping requests have completed.
2300 */
2301 bool superseded = i->sector <= sector && i->sector +
2302 (i->size >> 9) >= sector + (size >> 9);
2303
2304 if (!equal)
2305 drbd_alert(device, "Concurrent writes detected: "
2306 "local=%llus +%u, remote=%llus +%u, "
2307 "assuming %s came first\n",
2308 (unsigned long long)i->sector, i->size,
2309 (unsigned long long)sector, size,
2310 superseded ? "local" : "remote");
2311
2312 peer_req->w.cb = superseded ? e_send_superseded :
2313 e_send_retry_write;
2314 list_add_tail(&peer_req->w.list, &device->done_ee);
2315 /* put is in drbd_send_acks_wf() */
2316 kref_get(&device->kref);
2317 if (!queue_work(connection->ack_sender,
2318 &peer_req->peer_device->send_acks_work))
2319 kref_put(&device->kref, drbd_destroy_device);
2320
2321 err = -ENOENT;
2322 goto out;
2323 } else {
2324 struct drbd_request *req =
2325 container_of(i, struct drbd_request, i);
2326
2327 if (!equal)
2328 drbd_alert(device, "Concurrent writes detected: "
2329 "local=%llus +%u, remote=%llus +%u\n",
2330 (unsigned long long)i->sector, i->size,
2331 (unsigned long long)sector, size);
2332
2333 if (req->rq_state & RQ_LOCAL_PENDING ||
2334 !(req->rq_state & RQ_POSTPONED)) {
2335 /*
2336 * Wait for the node with the discard flag to
2337 * decide if this request has been superseded
2338 * or needs to be retried.
2339 * Requests that have been superseded will
2340 * disappear from the write_requests tree.
2341 *
2342 * In addition, wait for the conflicting
2343 * request to finish locally before submitting
2344 * the conflicting peer request.
2345 */
2346 err = drbd_wait_misc(device, &req->i);
2347 if (err) {
2348 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2349 fail_postponed_requests(device, sector, size);
2350 goto out;
2351 }
2352 goto repeat;
2353 }
2354 /*
2355 * Remember to restart the conflicting requests after
2356 * the new peer request has completed.
2357 */
2358 peer_req->flags |= EE_RESTART_REQUESTS;
2359 }
2360 }
2361 err = 0;
2362
2363 out:
2364 if (err)
2365 drbd_remove_epoch_entry_interval(device, peer_req);
2366 return err;
2367 }
2368
2369 /* mirrored write */
2370 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2371 {
2372 struct drbd_peer_device *peer_device;
2373 struct drbd_device *device;
2374 struct net_conf *nc;
2375 sector_t sector;
2376 struct drbd_peer_request *peer_req;
2377 struct p_data *p = pi->data;
2378 u32 peer_seq = be32_to_cpu(p->seq_num);
2379 u32 dp_flags;
2380 int err, tp;
2381
2382 peer_device = conn_peer_device(connection, pi->vnr);
2383 if (!peer_device)
2384 return -EIO;
2385 device = peer_device->device;
2386
2387 if (!get_ldev(device)) {
2388 int err2;
2389
2390 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2391 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2392 atomic_inc(&connection->current_epoch->epoch_size);
2393 err2 = drbd_drain_block(peer_device, pi->size);
2394 if (!err)
2395 err = err2;
2396 return err;
2397 }
2398
2399 /*
2400 * Corresponding put_ldev done either below (on various errors), or in
2401 * drbd_peer_request_endio, if we successfully submit the data at the
2402 * end of this function.
2403 */
2404
2405 sector = be64_to_cpu(p->sector);
2406 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2407 if (!peer_req) {
2408 put_ldev(device);
2409 return -EIO;
2410 }
2411
2412 peer_req->w.cb = e_end_block;
2413 peer_req->submit_jif = jiffies;
2414 peer_req->flags |= EE_APPLICATION;
2415
2416 dp_flags = be32_to_cpu(p->dp_flags);
2417 peer_req->opf = wire_flags_to_bio(connection, dp_flags);
2418 if (pi->cmd == P_TRIM) {
2419 D_ASSERT(peer_device, peer_req->i.size > 0);
2420 D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD);
2421 D_ASSERT(peer_device, peer_req->pages == NULL);
2422 /* need to play safe: an older DRBD sender
2423 * may mean zero-out while sending P_TRIM. */
2424 if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2425 peer_req->flags |= EE_ZEROOUT;
2426 } else if (pi->cmd == P_ZEROES) {
2427 D_ASSERT(peer_device, peer_req->i.size > 0);
2428 D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES);
2429 D_ASSERT(peer_device, peer_req->pages == NULL);
2430 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2431 if (dp_flags & DP_DISCARD)
2432 peer_req->flags |= EE_TRIM;
2433 } else if (peer_req->pages == NULL) {
2434 D_ASSERT(device, peer_req->i.size == 0);
2435 D_ASSERT(device, dp_flags & DP_FLUSH);
2436 }
2437
2438 if (dp_flags & DP_MAY_SET_IN_SYNC)
2439 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2440
2441 spin_lock(&connection->epoch_lock);
2442 peer_req->epoch = connection->current_epoch;
2443 atomic_inc(&peer_req->epoch->epoch_size);
2444 atomic_inc(&peer_req->epoch->active);
2445 spin_unlock(&connection->epoch_lock);
2446
2447 rcu_read_lock();
2448 nc = rcu_dereference(peer_device->connection->net_conf);
2449 tp = nc->two_primaries;
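	/* For peers speaking a protocol older than 100, derive the ack policy
	 * from the configured wire protocol (C: write ack, B: receive ack). */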
2450 if (peer_device->connection->agreed_pro_version < 100) {
2451 switch (nc->wire_protocol) {
2452 case DRBD_PROT_C:
2453 dp_flags |= DP_SEND_WRITE_ACK;
2454 break;
2455 case DRBD_PROT_B:
2456 dp_flags |= DP_SEND_RECEIVE_ACK;
2457 break;
2458 }
2459 }
2460 rcu_read_unlock();
2461
2462 if (dp_flags & DP_SEND_WRITE_ACK) {
2463 peer_req->flags |= EE_SEND_WRITE_ACK;
2464 inc_unacked(device);
2465 /* corresponding dec_unacked() in e_end_block()
2466 * respective _drbd_clear_done_ee */
2467 }
2468
2469 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2470 /* I really don't like it that the receiver thread
2471 * sends on the msock, but anyways */
2472 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2473 }
2474
2475 if (tp) {
2476 /* two primaries implies protocol C */
2477 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2478 peer_req->flags |= EE_IN_INTERVAL_TREE;
2479 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2480 if (err)
2481 goto out_interrupted;
2482 spin_lock_irq(&device->resource->req_lock);
2483 err = handle_write_conflicts(device, peer_req);
2484 if (err) {
2485 spin_unlock_irq(&device->resource->req_lock);
2486 if (err == -ENOENT) {
2487 put_ldev(device);
2488 return 0;
2489 }
2490 goto out_interrupted;
2491 }
2492 } else {
2493 update_peer_seq(peer_device, peer_seq);
2494 spin_lock_irq(&device->resource->req_lock);
2495 }
2496 /* TRIM and ZEROES are processed synchronously,
2497 * we wait for all pending requests, respectively wait for
2498 * active_ee to become empty in drbd_submit_peer_request();
2499 * better not add ourselves here. */
2500 if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
2501 list_add_tail(&peer_req->w.list, &device->active_ee);
2502 spin_unlock_irq(&device->resource->req_lock);
2503
2504 if (device->state.conn == C_SYNC_TARGET)
2505 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2506
2507 if (device->state.pdsk < D_INCONSISTENT) {
2508 /* In case we have the only disk of the cluster, */
2509 drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
2510 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2511 drbd_al_begin_io(device, &peer_req->i);
2512 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2513 }
2514
2515 err = drbd_submit_peer_request(peer_req);
2516 if (!err)
2517 return 0;
2518
2519 /* don't care for the reason here */
2520 drbd_err(device, "submit failed, triggering re-connect\n");
2521 spin_lock_irq(&device->resource->req_lock);
2522 list_del(&peer_req->w.list);
2523 drbd_remove_epoch_entry_interval(device, peer_req);
2524 spin_unlock_irq(&device->resource->req_lock);
2525 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2526 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2527 drbd_al_complete_io(device, &peer_req->i);
2528 }
2529
2530 out_interrupted:
2531 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2532 put_ldev(device);
2533 drbd_free_peer_req(device, peer_req);
2534 return err;
2535 }
2536
2537 /* We may throttle resync, if the lower device seems to be busy,
2538 * and current sync rate is above c_min_rate.
2539 *
2540 * To decide whether or not the lower device is busy, we use a scheme similar
2541 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2542 * (more than 64 sectors) of activity we cannot account for with our own resync
2543 * activity, it obviously is "busy".
2544 *
2545 * The current sync rate used here uses only the most recent two step marks,
2546 * to have a short time average so we can react faster.
2547 */
2548 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector,
2549 bool throttle_if_app_is_waiting)
2550 {
2551 struct drbd_device *device = peer_device->device;
2552 struct lc_element *tmp;
2553 bool throttle = drbd_rs_c_min_rate_throttle(device);
2554
2555 if (!throttle || throttle_if_app_is_waiting)
2556 return throttle;
2557
2558 spin_lock_irq(&device->al_lock);
2559 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2560 if (tmp) {
2561 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2562 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2563 throttle = false;
2564 /* Do not slow down if app IO is already waiting for this extent,
2565 * and our progress is necessary for application IO to complete. */
2566 }
2567 spin_unlock_irq(&device->al_lock);
2568
2569 return throttle;
2570 }
2571
2572 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2573 {
2574 struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2575 unsigned long db, dt, dbdt;
2576 unsigned int c_min_rate;
2577 int curr_events;
2578
2579 rcu_read_lock();
2580 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2581 rcu_read_unlock();
2582
2583 /* feature disabled? */
2584 if (c_min_rate == 0)
2585 return false;
2586
2587 curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2588 atomic_read(&device->rs_sect_ev);
2589
2590 if (atomic_read(&device->ap_actlog_cnt)
2591 || curr_events - device->rs_last_events > 64) {
2592 unsigned long rs_left;
2593 int i;
2594
2595 device->rs_last_events = curr_events;
2596
2597 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2598 * approx. */
2599 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2600
2601 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2602 rs_left = device->ov_left;
2603 else
2604 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2605
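/* dt: seconds elapsed since that mark; db: bitmap bits cleared since then;
 * dbdt: the implied resync rate in KiB/s, compared against c_min_rate. */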
2606 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2607 if (!dt)
2608 dt++;
2609 db = device->rs_mark_left[i] - rs_left;
2610 dbdt = Bit2KB(db/dt);
2611
2612 if (dbdt > c_min_rate)
2613 return true;
2614 }
2615 return false;
2616 }
2617
2618 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2619 {
2620 struct drbd_peer_device *peer_device;
2621 struct drbd_device *device;
2622 sector_t sector;
2623 sector_t capacity;
2624 struct drbd_peer_request *peer_req;
2625 struct digest_info *di = NULL;
2626 int size, verb;
2627 struct p_block_req *p = pi->data;
2628
2629 peer_device = conn_peer_device(connection, pi->vnr);
2630 if (!peer_device)
2631 return -EIO;
2632 device = peer_device->device;
2633 capacity = get_capacity(device->vdisk);
2634
2635 sector = be64_to_cpu(p->sector);
2636 size = be32_to_cpu(p->blksize);
2637
2638 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2639 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2640 (unsigned long long)sector, size);
2641 return -EINVAL;
2642 }
2643 if (sector + (size>>9) > capacity) {
2644 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2645 (unsigned long long)sector, size);
2646 return -EINVAL;
2647 }
2648
2649 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2650 verb = 1;
2651 switch (pi->cmd) {
2652 case P_DATA_REQUEST:
2653 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2654 break;
2655 case P_RS_THIN_REQ:
2656 case P_RS_DATA_REQUEST:
2657 case P_CSUM_RS_REQUEST:
2658 case P_OV_REQUEST:
2659 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2660 break;
2661 case P_OV_REPLY:
2662 verb = 0;
2663 dec_rs_pending(peer_device);
2664 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2665 break;
2666 default:
2667 BUG();
2668 }
2669 if (verb && drbd_ratelimit())
2670 drbd_err(device, "Can not satisfy peer's read request, "
2671 "no local data.\n");
2672
2673 /* drain possibly payload */
2674 return drbd_drain_block(peer_device, pi->size);
2675 }
2676
2677 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2678 * "criss-cross" setup, that might cause write-out on some other DRBD,
2679 * which in turn might block on the other node at this very place. */
2680 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2681 size, GFP_NOIO);
2682 if (!peer_req) {
2683 put_ldev(device);
2684 return -ENOMEM;
2685 }
2686 peer_req->opf = REQ_OP_READ;
2687
2688 switch (pi->cmd) {
2689 case P_DATA_REQUEST:
2690 peer_req->w.cb = w_e_end_data_req;
2691 /* application IO, don't drbd_rs_begin_io */
2692 peer_req->flags |= EE_APPLICATION;
2693 goto submit;
2694
2695 case P_RS_THIN_REQ:
2696 /* If at some point in the future we have a smart way to
2697 find out if this data block is completely deallocated,
2698 then we would do something smarter here than reading
2699 the block... */
2700 peer_req->flags |= EE_RS_THIN_REQ;
2701 fallthrough;
2702 case P_RS_DATA_REQUEST:
2703 peer_req->w.cb = w_e_end_rsdata_req;
2704 /* used in the sector offset progress display */
2705 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2706 break;
2707
2708 case P_OV_REPLY:
2709 case P_CSUM_RS_REQUEST:
2710 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2711 if (!di)
2712 goto out_free_e;
2713
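/* The digest bytes live directly behind struct digest_info in the same
 * allocation; point di->digest there and receive the peer's digest into it. */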
2714 di->digest_size = pi->size;
2715 di->digest = (((char *)di)+sizeof(struct digest_info));
2716
2717 peer_req->digest = di;
2718 peer_req->flags |= EE_HAS_DIGEST;
2719
2720 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2721 goto out_free_e;
2722
2723 if (pi->cmd == P_CSUM_RS_REQUEST) {
2724 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2725 peer_req->w.cb = w_e_end_csum_rs_req;
2726 /* used in the sector offset progress display */
2727 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2728 /* remember to report stats in drbd_resync_finished */
2729 device->use_csums = true;
2730 } else if (pi->cmd == P_OV_REPLY) {
2731 /* track progress, we may need to throttle */
2732 atomic_add(size >> 9, &device->rs_sect_in);
2733 peer_req->w.cb = w_e_end_ov_reply;
2734 dec_rs_pending(peer_device);
2735 /* drbd_rs_begin_io done when we sent this request,
2736 * but accounting still needs to be done. */
2737 goto submit_for_resync;
2738 }
2739 break;
2740
2741 case P_OV_REQUEST:
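/* First verify request of this run: initialize the start position and
 * progress marks for the online-verify. */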
2742 if (device->ov_start_sector == ~(sector_t)0 &&
2743 peer_device->connection->agreed_pro_version >= 90) {
2744 unsigned long now = jiffies;
2745 int i;
2746 device->ov_start_sector = sector;
2747 device->ov_position = sector;
2748 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2749 device->rs_total = device->ov_left;
2750 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2751 device->rs_mark_left[i] = device->ov_left;
2752 device->rs_mark_time[i] = now;
2753 }
2754 drbd_info(device, "Online Verify start sector: %llu\n",
2755 (unsigned long long)sector);
2756 }
2757 peer_req->w.cb = w_e_end_ov_req;
2758 break;
2759
2760 default:
2761 BUG();
2762 }
2763
2764 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2765 * wrt the receiver, but it is not as straightforward as it may seem.
2766 * Various places in the resync start and stop logic assume resync
2767 * requests are processed in order, requeuing this on the worker thread
2768 * introduces a bunch of new code for synchronization between threads.
2769 *
2770 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2771 * "forever", throttling after drbd_rs_begin_io will lock that extent
2772 * for application writes for the same time. For now, just throttle
2773 * here, where the rest of the code expects the receiver to sleep for
2774 * a while, anyways.
2775 */
2776
2777 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2778 * this defers syncer requests for some time, before letting at least
2779 * one request through. The resync controller on the receiving side
2780 * will adapt to the incoming rate accordingly.
2781 *
2782 * We cannot throttle here if remote is Primary/SyncTarget:
2783 * we would also throttle its application reads.
2784 * In that case, throttling is done on the SyncTarget only.
2785 */
2786
2787 /* Even though this may be a resync request, we do add to "read_ee";
2788 * "sync_ee" is only used for resync WRITEs.
2789 * Add to list early, so debugfs can find this request
2790 * even if we have to sleep below. */
2791 spin_lock_irq(&device->resource->req_lock);
2792 list_add_tail(&peer_req->w.list, &device->read_ee);
2793 spin_unlock_irq(&device->resource->req_lock);
2794
2795 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2796 if (device->state.peer != R_PRIMARY
2797 && drbd_rs_should_slow_down(peer_device, sector, false))
2798 schedule_timeout_uninterruptible(HZ/10);
2799 update_receiver_timing_details(connection, drbd_rs_begin_io);
2800 if (drbd_rs_begin_io(device, sector))
2801 goto out_free_e;
2802
2803 submit_for_resync:
2804 atomic_add(size >> 9, &device->rs_sect_ev);
2805
2806 submit:
2807 update_receiver_timing_details(connection, drbd_submit_peer_request);
2808 inc_unacked(device);
2809 if (drbd_submit_peer_request(peer_req) == 0)
2810 return 0;
2811
2812 /* don't care for the reason here */
2813 drbd_err(device, "submit failed, triggering re-connect\n");
2814
2815 out_free_e:
2816 spin_lock_irq(&device->resource->req_lock);
2817 list_del(&peer_req->w.list);
2818 spin_unlock_irq(&device->resource->req_lock);
2819 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2820
2821 put_ldev(device);
2822 drbd_free_peer_req(device, peer_req);
2823 return -EIO;
2824 }
2825
2826 /*
2827 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2828 */
2829 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2830 {
2831 struct drbd_device *device = peer_device->device;
2832 int self, peer, rv = -100;
2833 unsigned long ch_self, ch_peer;
2834 enum drbd_after_sb_p after_sb_0p;
2835
2836 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2837 peer = device->p_uuid[UI_BITMAP] & 1;
2838
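/* ch_self/ch_peer feed the "zero changes"/"least changes" strategies below:
 * our own count cached in comm_bm_set, the peer's taken from the UI_SIZE slot. */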
2839 ch_peer = device->p_uuid[UI_SIZE];
2840 ch_self = device->comm_bm_set;
2841
2842 rcu_read_lock();
2843 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2844 rcu_read_unlock();
2845 switch (after_sb_0p) {
2846 case ASB_CONSENSUS:
2847 case ASB_DISCARD_SECONDARY:
2848 case ASB_CALL_HELPER:
2849 case ASB_VIOLENTLY:
2850 drbd_err(device, "Configuration error.\n");
2851 break;
2852 case ASB_DISCONNECT:
2853 break;
2854 case ASB_DISCARD_YOUNGER_PRI:
2855 if (self == 0 && peer == 1) {
2856 rv = -1;
2857 break;
2858 }
2859 if (self == 1 && peer == 0) {
2860 rv = 1;
2861 break;
2862 }
2863 fallthrough; /* to one of the other strategies */
2864 case ASB_DISCARD_OLDER_PRI:
2865 if (self == 0 && peer == 1) {
2866 rv = 1;
2867 break;
2868 }
2869 if (self == 1 && peer == 0) {
2870 rv = -1;
2871 break;
2872 }
2873 /* Else fall through to one of the other strategies... */
2874 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2875 "Using discard-least-changes instead\n");
2876 fallthrough;
2877 case ASB_DISCARD_ZERO_CHG:
2878 if (ch_peer == 0 && ch_self == 0) {
2879 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2880 ? -1 : 1;
2881 break;
2882 } else {
2883 if (ch_peer == 0) { rv = 1; break; }
2884 if (ch_self == 0) { rv = -1; break; }
2885 }
2886 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2887 break;
2888 fallthrough;
2889 case ASB_DISCARD_LEAST_CHG:
2890 if (ch_self < ch_peer)
2891 rv = -1;
2892 else if (ch_self > ch_peer)
2893 rv = 1;
2894 else /* ( ch_self == ch_peer ) */
2895 /* Well, then use something else. */
2896 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2897 ? -1 : 1;
2898 break;
2899 case ASB_DISCARD_LOCAL:
2900 rv = -1;
2901 break;
2902 case ASB_DISCARD_REMOTE:
2903 rv = 1;
2904 }
2905
2906 return rv;
2907 }
2908
2909 /*
2910 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2911 */
2912 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2913 {
2914 struct drbd_device *device = peer_device->device;
2915 int hg, rv = -100;
2916 enum drbd_after_sb_p after_sb_1p;
2917
2918 rcu_read_lock();
2919 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2920 rcu_read_unlock();
2921 switch (after_sb_1p) {
2922 case ASB_DISCARD_YOUNGER_PRI:
2923 case ASB_DISCARD_OLDER_PRI:
2924 case ASB_DISCARD_LEAST_CHG:
2925 case ASB_DISCARD_LOCAL:
2926 case ASB_DISCARD_REMOTE:
2927 case ASB_DISCARD_ZERO_CHG:
2928 drbd_err(device, "Configuration error.\n");
2929 break;
2930 case ASB_DISCONNECT:
2931 break;
2932 case ASB_CONSENSUS:
2933 hg = drbd_asb_recover_0p(peer_device);
2934 if (hg == -1 && device->state.role == R_SECONDARY)
2935 rv = hg;
2936 if (hg == 1 && device->state.role == R_PRIMARY)
2937 rv = hg;
2938 break;
2939 case ASB_VIOLENTLY:
2940 rv = drbd_asb_recover_0p(peer_device);
2941 break;
2942 case ASB_DISCARD_SECONDARY:
2943 return device->state.role == R_PRIMARY ? 1 : -1;
2944 case ASB_CALL_HELPER:
2945 hg = drbd_asb_recover_0p(peer_device);
2946 if (hg == -1 && device->state.role == R_PRIMARY) {
2947 enum drbd_state_rv rv2;
2948
2949 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2950 * we might be here in C_WF_REPORT_PARAMS which is transient.
2951 * we do not need to wait for the after state change work either. */
2952 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2953 if (rv2 != SS_SUCCESS) {
2954 drbd_khelper(device, "pri-lost-after-sb");
2955 } else {
2956 drbd_warn(device, "Successfully gave up primary role.\n");
2957 rv = hg;
2958 }
2959 } else
2960 rv = hg;
2961 }
2962
2963 return rv;
2964 }
2965
2966 /*
2967 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2968 */
2969 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2970 {
2971 struct drbd_device *device = peer_device->device;
2972 int hg, rv = -100;
2973 enum drbd_after_sb_p after_sb_2p;
2974
2975 rcu_read_lock();
2976 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2977 rcu_read_unlock();
2978 switch (after_sb_2p) {
2979 case ASB_DISCARD_YOUNGER_PRI:
2980 case ASB_DISCARD_OLDER_PRI:
2981 case ASB_DISCARD_LEAST_CHG:
2982 case ASB_DISCARD_LOCAL:
2983 case ASB_DISCARD_REMOTE:
2984 case ASB_CONSENSUS:
2985 case ASB_DISCARD_SECONDARY:
2986 case ASB_DISCARD_ZERO_CHG:
2987 drbd_err(device, "Configuration error.\n");
2988 break;
2989 case ASB_VIOLENTLY:
2990 rv = drbd_asb_recover_0p(peer_device);
2991 break;
2992 case ASB_DISCONNECT:
2993 break;
2994 case ASB_CALL_HELPER:
2995 hg = drbd_asb_recover_0p(peer_device);
2996 if (hg == -1) {
2997 enum drbd_state_rv rv2;
2998
2999 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3000 * we might be here in C_WF_REPORT_PARAMS which is transient.
3001 * we do not need to wait for the after state change work either. */
3002 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3003 if (rv2 != SS_SUCCESS) {
3004 drbd_khelper(device, "pri-lost-after-sb");
3005 } else {
3006 drbd_warn(device, "Successfully gave up primary role.\n");
3007 rv = hg;
3008 }
3009 } else
3010 rv = hg;
3011 }
3012
3013 return rv;
3014 }
3015
3016 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3017 u64 bits, u64 flags)
3018 {
3019 if (!uuid) {
3020 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3021 return;
3022 }
3023 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3024 text,
3025 (unsigned long long)uuid[UI_CURRENT],
3026 (unsigned long long)uuid[UI_BITMAP],
3027 (unsigned long long)uuid[UI_HISTORY_START],
3028 (unsigned long long)uuid[UI_HISTORY_END],
3029 (unsigned long long)bits,
3030 (unsigned long long)flags);
3031 }
3032
3033 /*
3034 100 after split brain try auto recover
3035 2 C_SYNC_SOURCE set BitMap
3036 1 C_SYNC_SOURCE use BitMap
3037 0 no Sync
3038 -1 C_SYNC_TARGET use BitMap
3039 -2 C_SYNC_TARGET set BitMap
3040 -100 after split brain, disconnect
3041 -1000 unrelated data
3042 -1091 requires proto 91
3043 -1096 requires proto 96
3044 */
3045
3046 static int drbd_uuid_compare(struct drbd_peer_device *const peer_device,
3047 enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3048 {
3049 struct drbd_connection *const connection = peer_device->connection;
3050 struct drbd_device *device = peer_device->device;
3051 u64 self, peer;
3052 int i, j;
3053
3054 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3055 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3056
3057 *rule_nr = 10;
3058 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3059 return 0;
3060
3061 *rule_nr = 20;
3062 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3063 peer != UUID_JUST_CREATED)
3064 return -2;
3065
3066 *rule_nr = 30;
3067 if (self != UUID_JUST_CREATED &&
3068 (peer == UUID_JUST_CREATED || peer == (u64)0))
3069 return 2;
3070
3071 if (self == peer) {
3072 int rct, dc; /* roles at crash time */
3073
3074 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3075
3076 if (connection->agreed_pro_version < 91)
3077 return -1091;
3078
3079 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3080 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3081 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3082 drbd_uuid_move_history(device);
3083 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3084 device->ldev->md.uuid[UI_BITMAP] = 0;
3085
3086 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3087 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3088 *rule_nr = 34;
3089 } else {
3090 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3091 *rule_nr = 36;
3092 }
3093
3094 return 1;
3095 }
3096
3097 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3098
3099 if (connection->agreed_pro_version < 91)
3100 return -1091;
3101
3102 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3103 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3104 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3105
3106 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3107 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3108 device->p_uuid[UI_BITMAP] = 0UL;
3109
3110 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3111 *rule_nr = 35;
3112 } else {
3113 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3114 *rule_nr = 37;
3115 }
3116
3117 return -1;
3118 }
3119
3120 /* Common power [off|failure] */
3121 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3122 (device->p_uuid[UI_FLAGS] & 2);
3123 /* lowest bit is set when we were primary,
3124 * next bit (weight 2) is set when peer was primary */
3125 *rule_nr = 40;
3126
3127 /* Neither has the "crashed primary" flag set,
3128 * only a replication link hiccup. */
3129 if (rct == 0)
3130 return 0;
3131
3132 /* Current UUID equal and no bitmap uuid; does not necessarily
3133 * mean this was a "simultaneous hard crash", maybe IO was
3134 * frozen, so no UUID-bump happened.
3135 * This is a protocol change, overload DRBD_FF_WSAME as flag
3136 * for "new-enough" peer DRBD version. */
3137 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3138 *rule_nr = 41;
3139 if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3140 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
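/* Encoded as -(0x10000 | feature flags << 8 | required protocol version);
 * decoded and reported in drbd_sync_handshake(). */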
3141 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3142 }
3143 if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3144 /* At least one has the "crashed primary" bit set,
3145 * both are primary now, but neither has rotated its UUIDs?
3146 * "Can not happen." */
3147 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3148 return -100;
3149 }
3150 if (device->state.role == R_PRIMARY)
3151 return 1;
3152 return -1;
3153 }
3154
3155 /* Both are secondary.
3156 * Really looks like recovery from simultaneous hard crash.
3157 * Check which had been primary before, and arbitrate. */
3158 switch (rct) {
3159 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3160 case 1: /* self_pri && !peer_pri */ return 1;
3161 case 2: /* !self_pri && peer_pri */ return -1;
3162 case 3: /* self_pri && peer_pri */
3163 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3164 return dc ? -1 : 1;
3165 }
3166 }
3167
3168 *rule_nr = 50;
3169 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3170 if (self == peer)
3171 return -1;
3172
3173 *rule_nr = 51;
3174 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3175 if (self == peer) {
3176 if (connection->agreed_pro_version < 96 ?
3177 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3178 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3179 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3180 /* The last P_SYNC_UUID did not get through. Undo the last start of
3181 resync as sync source modifications of the peer's UUIDs. */
3182
3183 if (connection->agreed_pro_version < 91)
3184 return -1091;
3185
3186 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3187 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3188
3189 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3190 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3191
3192 return -1;
3193 }
3194 }
3195
3196 *rule_nr = 60;
3197 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3198 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3199 peer = device->p_uuid[i] & ~((u64)1);
3200 if (self == peer)
3201 return -2;
3202 }
3203
3204 *rule_nr = 70;
3205 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3206 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3207 if (self == peer)
3208 return 1;
3209
3210 *rule_nr = 71;
3211 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3212 if (self == peer) {
3213 if (connection->agreed_pro_version < 96 ?
3214 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3215 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3216 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3217 /* The last P_SYNC_UUID did not get through. Undo the last start of
3218 resync as sync source modifications of our UUIDs. */
3219
3220 if (connection->agreed_pro_version < 91)
3221 return -1091;
3222
3223 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3224 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3225
3226 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3227 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3228 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3229
3230 return 1;
3231 }
3232 }
3233
3234
3235 *rule_nr = 80;
3236 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3237 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3238 self = device->ldev->md.uuid[i] & ~((u64)1);
3239 if (self == peer)
3240 return 2;
3241 }
3242
3243 *rule_nr = 90;
3244 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3245 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3246 if (self == peer && self != ((u64)0))
3247 return 100;
3248
3249 *rule_nr = 100;
3250 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3251 self = device->ldev->md.uuid[i] & ~((u64)1);
3252 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3253 peer = device->p_uuid[j] & ~((u64)1);
3254 if (self == peer)
3255 return -100;
3256 }
3257 }
3258
3259 return -1000;
3260 }
3261
3262 /* drbd_sync_handshake() returns the new conn state on success, or
3263 CONN_MASK (-1) on failure.
3264 */
3265 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3266 enum drbd_role peer_role,
3267 enum drbd_disk_state peer_disk) __must_hold(local)
3268 {
3269 struct drbd_device *device = peer_device->device;
3270 enum drbd_conns rv = C_MASK;
3271 enum drbd_disk_state mydisk;
3272 struct net_conf *nc;
3273 int hg, rule_nr, rr_conflict, tentative, always_asbp;
3274
3275 mydisk = device->state.disk;
3276 if (mydisk == D_NEGOTIATING)
3277 mydisk = device->new_state_tmp.disk;
3278
3279 drbd_info(device, "drbd_sync_handshake:\n");
3280
3281 spin_lock_irq(&device->ldev->md.uuid_lock);
3282 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3283 drbd_uuid_dump(device, "peer", device->p_uuid,
3284 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3285
3286 hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr);
3287 spin_unlock_irq(&device->ldev->md.uuid_lock);
3288
3289 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3290
3291 if (hg == -1000) {
3292 drbd_alert(device, "Unrelated data, aborting!\n");
3293 return C_MASK;
3294 }
3295 if (hg < -0x10000) {
3296 int proto, fflags;
3297 hg = -hg;
3298 proto = hg & 0xff;
3299 fflags = (hg >> 8) & 0xff;
3300 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3301 proto, fflags);
3302 return C_MASK;
3303 }
3304 if (hg < -1000) {
3305 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3306 return C_MASK;
3307 }
3308
3309 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3310 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3311 int f = (hg == -100) || abs(hg) == 2;
3312 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3313 if (f)
3314 hg = hg*2;
3315 drbd_info(device, "Becoming sync %s due to disk states.\n",
3316 hg > 0 ? "source" : "target");
3317 }
3318
3319 if (abs(hg) == 100)
3320 drbd_khelper(device, "initial-split-brain");
3321
3322 rcu_read_lock();
3323 nc = rcu_dereference(peer_device->connection->net_conf);
3324 always_asbp = nc->always_asbp;
3325 rr_conflict = nc->rr_conflict;
3326 tentative = nc->tentative;
3327 rcu_read_unlock();
3328
3329 if (hg == 100 || (hg == -100 && always_asbp)) {
3330 int pcount = (device->state.role == R_PRIMARY)
3331 + (peer_role == R_PRIMARY);
3332 int forced = (hg == -100);
3333
3334 switch (pcount) {
3335 case 0:
3336 hg = drbd_asb_recover_0p(peer_device);
3337 break;
3338 case 1:
3339 hg = drbd_asb_recover_1p(peer_device);
3340 break;
3341 case 2:
3342 hg = drbd_asb_recover_2p(peer_device);
3343 break;
3344 }
3345 if (abs(hg) < 100) {
3346 drbd_warn(device, "Split-Brain detected, %d primaries, "
3347 "automatically solved. Sync from %s node\n",
3348 pcount, (hg < 0) ? "peer" : "this");
3349 if (forced) {
3350 drbd_warn(device, "Doing a full sync, since"
3351 " UUIDs were ambiguous.\n");
3352 hg = hg*2;
3353 }
3354 }
3355 }
3356
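/* Still unresolved: fall back to the manual resolution.  If exactly one
 * side was configured to discard its data (our DISCARD_MY_DATA flag vs.
 * bit 0 of the peer's UUID flags), that side becomes the sync target. */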
3357 if (hg == -100) {
3358 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3359 hg = -1;
3360 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3361 hg = 1;
3362
3363 if (abs(hg) < 100)
3364 drbd_warn(device, "Split-Brain detected, manually solved. "
3365 "Sync from %s node\n",
3366 (hg < 0) ? "peer" : "this");
3367 }
3368
3369 if (hg == -100) {
3370 /* FIXME this log message is not correct if we end up here
3371 * after an attempted attach on a diskless node.
3372 * We just refuse to attach -- well, we drop the "connection"
3373 * to that disk, in a way... */
3374 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3375 drbd_khelper(device, "split-brain");
3376 return C_MASK;
3377 }
3378
3379 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3380 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3381 return C_MASK;
3382 }
3383
3384 if (hg < 0 && /* by intention we do not use mydisk here. */
3385 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3386 switch (rr_conflict) {
3387 case ASB_CALL_HELPER:
3388 drbd_khelper(device, "pri-lost");
3389 fallthrough;
3390 case ASB_DISCONNECT:
3391 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3392 return C_MASK;
3393 case ASB_VIOLENTLY:
3394 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3395 " assumption\n");
3396 }
3397 }
3398
3399 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3400 if (hg == 0)
3401 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3402 else
3403 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3404 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3405 abs(hg) >= 2 ? "full" : "bit-map based");
3406 return C_MASK;
3407 }
3408
3409 if (abs(hg) >= 2) {
3410 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3411 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3412 BM_LOCKED_SET_ALLOWED, NULL))
3413 return C_MASK;
3414 }
3415
3416 if (hg > 0) { /* become sync source. */
3417 rv = C_WF_BITMAP_S;
3418 } else if (hg < 0) { /* become sync target */
3419 rv = C_WF_BITMAP_T;
3420 } else {
3421 rv = C_CONNECTED;
3422 if (drbd_bm_total_weight(device)) {
3423 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3424 drbd_bm_total_weight(device));
3425 }
3426 }
3427
3428 return rv;
3429 }
3430
3431 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3432 {
3433 /* the peer's ASB_DISCARD_REMOTE pairs with our ASB_DISCARD_LOCAL; that combination is valid */
3434 if (peer == ASB_DISCARD_REMOTE)
3435 return ASB_DISCARD_LOCAL;
3436
3437 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3438 if (peer == ASB_DISCARD_LOCAL)
3439 return ASB_DISCARD_REMOTE;
3440
3441 /* everything else is valid if they are equal on both sides. */
3442 return peer;
3443 }
3444
3445 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3446 {
3447 struct p_protocol *p = pi->data;
3448 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3449 int p_proto, p_discard_my_data, p_two_primaries, cf;
3450 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3451 char integrity_alg[SHARED_SECRET_MAX] = "";
3452 struct crypto_shash *peer_integrity_tfm = NULL;
3453 void *int_dig_in = NULL, *int_dig_vv = NULL;
3454
3455 p_proto = be32_to_cpu(p->protocol);
3456 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3457 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3458 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3459 p_two_primaries = be32_to_cpu(p->two_primaries);
3460 cf = be32_to_cpu(p->conn_flags);
3461 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3462
3463 if (connection->agreed_pro_version >= 87) {
3464 int err;
3465
3466 if (pi->size > sizeof(integrity_alg))
3467 return -EIO;
3468 err = drbd_recv_all(connection, integrity_alg, pi->size);
3469 if (err)
3470 return err;
3471 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3472 }
3473
3474 if (pi->cmd != P_PROTOCOL_UPDATE) {
3475 clear_bit(CONN_DRY_RUN, &connection->flags);
3476
3477 if (cf & CF_DRY_RUN)
3478 set_bit(CONN_DRY_RUN, &connection->flags);
3479
3480 rcu_read_lock();
3481 nc = rcu_dereference(connection->net_conf);
3482
3483 if (p_proto != nc->wire_protocol) {
3484 drbd_err(connection, "incompatible %s settings\n", "protocol");
3485 goto disconnect_rcu_unlock;
3486 }
3487
3488 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3489 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3490 goto disconnect_rcu_unlock;
3491 }
3492
3493 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3494 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3495 goto disconnect_rcu_unlock;
3496 }
3497
3498 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3499 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3500 goto disconnect_rcu_unlock;
3501 }
3502
3503 if (p_discard_my_data && nc->discard_my_data) {
3504 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3505 goto disconnect_rcu_unlock;
3506 }
3507
3508 if (p_two_primaries != nc->two_primaries) {
3509 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3510 goto disconnect_rcu_unlock;
3511 }
3512
3513 if (strcmp(integrity_alg, nc->integrity_alg)) {
3514 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3515 goto disconnect_rcu_unlock;
3516 }
3517
3518 rcu_read_unlock();
3519 }
3520
3521 if (integrity_alg[0]) {
3522 int hash_size;
3523
3524 /*
3525 * We can only change the peer data integrity algorithm
3526 * here. Changing our own data integrity algorithm
3527 * requires that we send a P_PROTOCOL_UPDATE packet at
3528 * the same time; otherwise, the peer has no way to
3529 * tell between which packets the algorithm should
3530 * change.
3531 */
3532
3533 peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3534 if (IS_ERR(peer_integrity_tfm)) {
3535 peer_integrity_tfm = NULL;
3536 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3537 integrity_alg);
3538 goto disconnect;
3539 }
3540
3541 hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3542 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3543 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3544 if (!(int_dig_in && int_dig_vv)) {
3545 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3546 goto disconnect;
3547 }
3548 }
3549
3550 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3551 if (!new_net_conf)
3552 goto disconnect;
3553
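/* Publish the agreed-upon settings with the usual RCU update idiom,
 * roughly (a sketch; the real code below additionally holds the data
 * mutex and conf_update):
 *
 *	new = kmalloc(sizeof(*new), GFP_KERNEL);
 *	*new = *old;					// copy
 *	new->field = negotiated_value;			// modify
 *	rcu_assign_pointer(connection->net_conf, new);	// publish
 *	kvfree_rcu_mightsleep(old);			// free after a grace period
 */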
3554 mutex_lock(&connection->data.mutex);
3555 mutex_lock(&connection->resource->conf_update);
3556 old_net_conf = connection->net_conf;
3557 *new_net_conf = *old_net_conf;
3558
3559 new_net_conf->wire_protocol = p_proto;
3560 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3561 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3562 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3563 new_net_conf->two_primaries = p_two_primaries;
3564
3565 rcu_assign_pointer(connection->net_conf, new_net_conf);
3566 mutex_unlock(&connection->resource->conf_update);
3567 mutex_unlock(&connection->data.mutex);
3568
3569 crypto_free_shash(connection->peer_integrity_tfm);
3570 kfree(connection->int_dig_in);
3571 kfree(connection->int_dig_vv);
3572 connection->peer_integrity_tfm = peer_integrity_tfm;
3573 connection->int_dig_in = int_dig_in;
3574 connection->int_dig_vv = int_dig_vv;
3575
3576 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3577 drbd_info(connection, "peer data-integrity-alg: %s\n",
3578 integrity_alg[0] ? integrity_alg : "(none)");
3579
3580 kvfree_rcu_mightsleep(old_net_conf);
3581 return 0;
3582
3583 disconnect_rcu_unlock:
3584 rcu_read_unlock();
3585 disconnect:
3586 crypto_free_shash(peer_integrity_tfm);
3587 kfree(int_dig_in);
3588 kfree(int_dig_vv);
3589 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3590 return -EIO;
3591 }
3592
3593 /* helper function
3594 * input: alg name, feature name
3595 * return: NULL (alg name was "")
3596 * ERR_PTR(error) if something goes wrong
3597 * or the crypto hash ptr, if it worked out ok. */
3598 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3599 const struct drbd_device *device,
3600 const char *alg, const char *name)
3601 {
3602 struct crypto_shash *tfm;
3603
3604 if (!alg[0])
3605 return NULL;
3606
3607 tfm = crypto_alloc_shash(alg, 0, 0);
3608 if (IS_ERR(tfm)) {
3609 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3610 alg, name, PTR_ERR(tfm));
3611 return tfm;
3612 }
3613 return tfm;
3614 }
3615
3616 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3617 {
3618 void *buffer = connection->data.rbuf;
3619 int size = pi->size;
3620
3621 while (size) {
3622 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3623 s = drbd_recv(connection, buffer, s);
3624 if (s <= 0) {
3625 if (s < 0)
3626 return s;
3627 break;
3628 }
3629 size -= s;
3630 }
3631 if (size)
3632 return -EIO;
3633 return 0;
3634 }
3635
3636 /*
3637 * config_unknown_volume - device configuration command for unknown volume
3638 *
3639 * When a device is added to an existing connection, the node on which the
3640 * device is added first will send configuration commands to its peer but the
3641 * peer will not know about the device yet. It will warn and ignore these
3642 * commands. Once the device is added on the second node, the second node will
3643 * send the same device configuration commands, but in the other direction.
3644 *
3645 * (We can also end up here if drbd is misconfigured.)
3646 */
3647 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3648 {
3649 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3650 cmdname(pi->cmd), pi->vnr);
3651 return ignore_remaining_packet(connection, pi);
3652 }
3653
3654 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3655 {
3656 struct drbd_peer_device *peer_device;
3657 struct drbd_device *device;
3658 struct p_rs_param_95 *p;
3659 unsigned int header_size, data_size, exp_max_sz;
3660 struct crypto_shash *verify_tfm = NULL;
3661 struct crypto_shash *csums_tfm = NULL;
3662 struct net_conf *old_net_conf, *new_net_conf = NULL;
3663 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3664 const int apv = connection->agreed_pro_version;
3665 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3666 unsigned int fifo_size = 0;
3667 int err;
3668
3669 peer_device = conn_peer_device(connection, pi->vnr);
3670 if (!peer_device)
3671 return config_unknown_volume(connection, pi);
3672 device = peer_device->device;
3673
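/* The on-the-wire size of a SyncParam packet grew over the protocol
 * versions: apv <= 87 carries only the resync rate, apv 88 appends a
 * variable-length verify-alg string, apv 89..94 use fixed-size
 * verify-alg/csums-alg fields, and apv >= 95 additionally carries the
 * dynamic resync controller settings (c_plan_ahead and friends). */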
3674 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3675 : apv == 88 ? sizeof(struct p_rs_param)
3676 + SHARED_SECRET_MAX
3677 : apv <= 94 ? sizeof(struct p_rs_param_89)
3678 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3679
3680 if (pi->size > exp_max_sz) {
3681 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3682 pi->size, exp_max_sz);
3683 return -EIO;
3684 }
3685
3686 if (apv <= 88) {
3687 header_size = sizeof(struct p_rs_param);
3688 data_size = pi->size - header_size;
3689 } else if (apv <= 94) {
3690 header_size = sizeof(struct p_rs_param_89);
3691 data_size = pi->size - header_size;
3692 D_ASSERT(device, data_size == 0);
3693 } else {
3694 header_size = sizeof(struct p_rs_param_95);
3695 data_size = pi->size - header_size;
3696 D_ASSERT(device, data_size == 0);
3697 }
3698
3699 /* initialize verify_alg and csums_alg */
3700 p = pi->data;
3701 BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
3702 memset(&p->algs, 0, sizeof(p->algs));
3703
3704 err = drbd_recv_all(peer_device->connection, p, header_size);
3705 if (err)
3706 return err;
3707
3708 mutex_lock(&connection->resource->conf_update);
3709 old_net_conf = peer_device->connection->net_conf;
3710 if (get_ldev(device)) {
3711 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3712 if (!new_disk_conf) {
3713 put_ldev(device);
3714 mutex_unlock(&connection->resource->conf_update);
3715 drbd_err(device, "Allocation of new disk_conf failed\n");
3716 return -ENOMEM;
3717 }
3718
3719 old_disk_conf = device->ldev->disk_conf;
3720 *new_disk_conf = *old_disk_conf;
3721
3722 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3723 }
3724
3725 if (apv >= 88) {
3726 if (apv == 88) {
3727 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3728 drbd_err(device, "verify-alg of wrong size, "
3729 "peer wants %u, accepting only up to %u bytes\n",
3730 data_size, SHARED_SECRET_MAX);
3731 goto reconnect;
3732 }
3733
3734 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3735 if (err)
3736 goto reconnect;
3737 /* we expect NUL terminated string */
3738 /* but just in case someone tries to be evil */
3739 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3740 p->verify_alg[data_size-1] = 0;
3741
3742 } else /* apv >= 89 */ {
3743 /* we still expect NUL terminated strings */
3744 /* but just in case someone tries to be evil */
3745 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3746 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3747 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3748 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3749 }
3750
3751 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3752 if (device->state.conn == C_WF_REPORT_PARAMS) {
3753 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3754 old_net_conf->verify_alg, p->verify_alg);
3755 goto disconnect;
3756 }
3757 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3758 p->verify_alg, "verify-alg");
3759 if (IS_ERR(verify_tfm)) {
3760 verify_tfm = NULL;
3761 goto disconnect;
3762 }
3763 }
3764
3765 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3766 if (device->state.conn == C_WF_REPORT_PARAMS) {
3767 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3768 old_net_conf->csums_alg, p->csums_alg);
3769 goto disconnect;
3770 }
3771 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3772 p->csums_alg, "csums-alg");
3773 if (IS_ERR(csums_tfm)) {
3774 csums_tfm = NULL;
3775 goto disconnect;
3776 }
3777 }
3778
3779 if (apv > 94 && new_disk_conf) {
3780 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3781 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3782 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3783 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3784
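/* Size the resync controller's plan fifo to cover the plan-ahead window,
 * one slot per controller step.  Assuming c_plan_ahead is given in tenths
 * of a second and SLEEP_TIME is HZ/10, e.g. c_plan_ahead = 20 (2 seconds)
 * yields 20 slots. */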
3785 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3786 if (fifo_size != device->rs_plan_s->size) {
3787 new_plan = fifo_alloc(fifo_size);
3788 if (!new_plan) {
3789 drbd_err(device, "kmalloc of fifo_buffer failed");
3790 put_ldev(device);
3791 goto disconnect;
3792 }
3793 }
3794 }
3795
3796 if (verify_tfm || csums_tfm) {
3797 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3798 if (!new_net_conf)
3799 goto disconnect;
3800
3801 *new_net_conf = *old_net_conf;
3802
3803 if (verify_tfm) {
3804 strcpy(new_net_conf->verify_alg, p->verify_alg);
3805 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3806 crypto_free_shash(peer_device->connection->verify_tfm);
3807 peer_device->connection->verify_tfm = verify_tfm;
3808 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3809 }
3810 if (csums_tfm) {
3811 strcpy(new_net_conf->csums_alg, p->csums_alg);
3812 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3813 crypto_free_shash(peer_device->connection->csums_tfm);
3814 peer_device->connection->csums_tfm = csums_tfm;
3815 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3816 }
3817 rcu_assign_pointer(connection->net_conf, new_net_conf);
3818 }
3819 }
3820
3821 if (new_disk_conf) {
3822 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3823 put_ldev(device);
3824 }
3825
3826 if (new_plan) {
3827 old_plan = device->rs_plan_s;
3828 rcu_assign_pointer(device->rs_plan_s, new_plan);
3829 }
3830
3831 mutex_unlock(&connection->resource->conf_update);
3832 synchronize_rcu();
3833 if (new_net_conf)
3834 kfree(old_net_conf);
3835 kfree(old_disk_conf);
3836 kfree(old_plan);
3837
3838 return 0;
3839
3840 reconnect:
3841 if (new_disk_conf) {
3842 put_ldev(device);
3843 kfree(new_disk_conf);
3844 }
3845 mutex_unlock(&connection->resource->conf_update);
3846 return -EIO;
3847
3848 disconnect:
3849 kfree(new_plan);
3850 if (new_disk_conf) {
3851 put_ldev(device);
3852 kfree(new_disk_conf);
3853 }
3854 mutex_unlock(&connection->resource->conf_update);
3855 /* just for completeness: actually not needed,
3856 * as this is not reached if csums_tfm was ok. */
3857 crypto_free_shash(csums_tfm);
3858 /* but free the verify_tfm again, if csums_tfm did not work out */
3859 crypto_free_shash(verify_tfm);
3860 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3861 return -EIO;
3862 }
3863
3864 /* warn if the arguments differ by more than 12.5% */
3865 static void warn_if_differ_considerably(struct drbd_device *device,
3866 const char *s, sector_t a, sector_t b)
3867 {
3868 sector_t d;
3869 if (a == 0 || b == 0)
3870 return;
3871 d = (a > b) ? (a - b) : (b - a);
3872 if (d > (a>>3) || d > (b>>3))
3873 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3874 (unsigned long long)a, (unsigned long long)b);
3875 }
3876
3877 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3878 {
3879 struct drbd_peer_device *peer_device;
3880 struct drbd_device *device;
3881 struct p_sizes *p = pi->data;
3882 struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3883 enum determine_dev_size dd = DS_UNCHANGED;
3884 sector_t p_size, p_usize, p_csize, my_usize;
3885 sector_t new_size, cur_size;
3886 int ldsc = 0; /* local disk size changed */
3887 enum dds_flags ddsf;
3888
3889 peer_device = conn_peer_device(connection, pi->vnr);
3890 if (!peer_device)
3891 return config_unknown_volume(connection, pi);
3892 device = peer_device->device;
3893 cur_size = get_capacity(device->vdisk);
3894
3895 p_size = be64_to_cpu(p->d_size);
3896 p_usize = be64_to_cpu(p->u_size);
3897 p_csize = be64_to_cpu(p->c_size);
3898
3899 /* just store the peer's disk size for now.
3900 * we still need to figure out whether we accept that. */
3901 device->p_size = p_size;
3902
3903 if (get_ldev(device)) {
3904 rcu_read_lock();
3905 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3906 rcu_read_unlock();
3907
3908 warn_if_differ_considerably(device, "lower level device sizes",
3909 p_size, drbd_get_max_capacity(device->ldev));
3910 warn_if_differ_considerably(device, "user requested size",
3911 p_usize, my_usize);
3912
3913 /* if this is the first connect, or an otherwise expected
3914 * param exchange, choose the minimum */
3915 if (device->state.conn == C_WF_REPORT_PARAMS)
3916 p_usize = min_not_zero(my_usize, p_usize);
3917
3918 /* Never shrink a device with usable data during connect,
3919 * or "attach" on the peer.
3920 * But allow online shrinking if we are connected. */
3921 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3922 if (new_size < cur_size &&
3923 device->state.disk >= D_OUTDATED &&
3924 (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
3925 drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3926 (unsigned long long)new_size, (unsigned long long)cur_size);
3927 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3928 put_ldev(device);
3929 return -EIO;
3930 }
3931
3932 if (my_usize != p_usize) {
3933 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3934
3935 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3936 if (!new_disk_conf) {
3937 put_ldev(device);
3938 return -ENOMEM;
3939 }
3940
3941 mutex_lock(&connection->resource->conf_update);
3942 old_disk_conf = device->ldev->disk_conf;
3943 *new_disk_conf = *old_disk_conf;
3944 new_disk_conf->disk_size = p_usize;
3945
3946 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3947 mutex_unlock(&connection->resource->conf_update);
3948 kvfree_rcu_mightsleep(old_disk_conf);
3949
3950 drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
3951 (unsigned long)p_usize, (unsigned long)my_usize);
3952 }
3953
3954 put_ldev(device);
3955 }
3956
3957 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3958 /* Keep the call to drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
3959 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3960 drbd_reconsider_queue_parameters(), we can be sure that after
3961 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3962
3963 ddsf = be16_to_cpu(p->dds_flags);
3964 if (get_ldev(device)) {
3965 drbd_reconsider_queue_parameters(device, device->ldev, o);
3966 dd = drbd_determine_dev_size(device, ddsf, NULL);
3967 put_ldev(device);
3968 if (dd == DS_ERROR)
3969 return -EIO;
3970 drbd_md_sync(device);
3971 } else {
3972 /*
3973 * I am diskless, need to accept the peer's *current* size.
3974 * I must NOT accept the peer's backing disk size,
3975 * it may have been larger than mine all along...
3976 *
3977 * At this point, the peer knows more about my disk, or at
3978 * least about what we last agreed upon, than myself.
3979 * So if his c_size is less than his d_size, the most likely
3980 * reason is that *my* d_size was smaller last time we checked.
3981 *
3982 * However, if he sends a zero current size,
3983 * take his (user-capped or) backing disk size anyways.
3984 *
3985 * Unless of course he does not have a disk himself.
3986 * In which case we ignore this completely.
3987 */
3988 sector_t new_size = p_csize ?: p_usize ?: p_size;
3989 drbd_reconsider_queue_parameters(device, NULL, o);
3990 if (new_size == 0) {
3991 /* Ignore, the peer has no size information yet. */
3992 } else if (new_size == cur_size) {
3993 /* nothing to do */
3994 } else if (cur_size != 0 && p_size == 0) {
3995 drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
3996 (unsigned long long)new_size, (unsigned long long)cur_size);
3997 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
3998 drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
3999 (unsigned long long)new_size, (unsigned long long)cur_size);
4000 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4001 return -EIO;
4002 } else {
4003 /* I believe the peer, if
4004 * - I don't have a current size myself
4005 * - we agree on the size anyways
4006 * - I do have a current size, am Secondary,
4007 * and he has the only disk
4008 * - I do have a current size, am Primary,
4009 * and he has the only disk,
4010 * which is larger than my current size
4011 */
4012 drbd_set_my_capacity(device, new_size);
4013 }
4014 }
4015
4016 if (get_ldev(device)) {
4017 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4018 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4019 ldsc = 1;
4020 }
4021
4022 put_ldev(device);
4023 }
4024
4025 if (device->state.conn > C_WF_REPORT_PARAMS) {
4026 if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4027 ldsc) {
4028 /* we have different sizes, probably the peer
4029 * needs to know my new size... */
4030 drbd_send_sizes(peer_device, 0, ddsf);
4031 }
4032 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4033 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4034 if (device->state.pdsk >= D_INCONSISTENT &&
4035 device->state.disk >= D_INCONSISTENT) {
4036 if (ddsf & DDSF_NO_RESYNC)
4037 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4038 else
4039 resync_after_online_grow(device);
4040 } else
4041 set_bit(RESYNC_AFTER_NEG, &device->flags);
4042 }
4043 }
4044
4045 return 0;
4046 }
4047
4048 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4049 {
4050 struct drbd_peer_device *peer_device;
4051 struct drbd_device *device;
4052 struct p_uuids *p = pi->data;
4053 u64 *p_uuid;
4054 int i, updated_uuids = 0;
4055
4056 peer_device = conn_peer_device(connection, pi->vnr);
4057 if (!peer_device)
4058 return config_unknown_volume(connection, pi);
4059 device = peer_device->device;
4060
4061 p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4062 if (!p_uuid)
4063 return false;
4064
4065 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4066 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4067
4068 kfree(device->p_uuid);
4069 device->p_uuid = p_uuid;
4070
4071 if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4072 device->state.disk < D_INCONSISTENT &&
4073 device->state.role == R_PRIMARY &&
4074 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4075 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4076 (unsigned long long)device->ed_uuid);
4077 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4078 return -EIO;
4079 }
4080
4081 if (get_ldev(device)) {
4082 int skip_initial_sync =
4083 device->state.conn == C_CONNECTED &&
4084 peer_device->connection->agreed_pro_version >= 90 &&
4085 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4086 (p_uuid[UI_FLAGS] & 8);
4087 if (skip_initial_sync) {
4088 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4089 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4090 "clear_n_write from receive_uuids",
4091 BM_LOCKED_TEST_ALLOWED, NULL);
4092 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4093 _drbd_uuid_set(device, UI_BITMAP, 0);
4094 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4095 CS_VERBOSE, NULL);
4096 drbd_md_sync(device);
4097 updated_uuids = 1;
4098 }
4099 put_ldev(device);
4100 } else if (device->state.disk < D_INCONSISTENT &&
4101 device->state.role == R_PRIMARY) {
4102 /* I am a diskless primary, the peer just created a new current UUID
4103 for me. */
4104 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4105 }
4106
4107 /* Before we test for the disk state, we should wait until any potentially
4108 ongoing cluster wide state change is finished. That is important if
4109 we are primary and are detaching from our disk. We need to see the
4110 new disk state... */
4111 mutex_lock(device->state_mutex);
4112 mutex_unlock(device->state_mutex);
4113 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4114 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4115
4116 if (updated_uuids)
4117 drbd_print_uuids(device, "receiver updated UUIDs to");
4118
4119 return 0;
4120 }
4121
4122 /**
4123 * convert_state() - Converts the peer's view of the cluster state to our point of view
4124 * @ps: The state as seen by the peer.
4125 */
4126 static union drbd_state convert_state(union drbd_state ps)
4127 {
4128 union drbd_state ms;
4129
4130 static enum drbd_conns c_tab[] = {
4131 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4132 [C_CONNECTED] = C_CONNECTED,
4133
4134 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4135 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4136 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4137 [C_VERIFY_S] = C_VERIFY_T,
4138 [C_MASK] = C_MASK,
4139 };
4140
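/* Symmetric fields simply swap sides: the peer's "role" is our "peer", its
 * "disk" is our "pdsk", and vice versa.  Direction-specific connection
 * states are mapped to their mirror image via c_tab. */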
4141 ms.i = ps.i;
4142
4143 ms.conn = c_tab[ps.conn];
4144 ms.peer = ps.role;
4145 ms.role = ps.peer;
4146 ms.pdsk = ps.disk;
4147 ms.disk = ps.pdsk;
4148 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4149
4150 return ms;
4151 }
4152
4153 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4154 {
4155 struct drbd_peer_device *peer_device;
4156 struct drbd_device *device;
4157 struct p_req_state *p = pi->data;
4158 union drbd_state mask, val;
4159 enum drbd_state_rv rv;
4160
4161 peer_device = conn_peer_device(connection, pi->vnr);
4162 if (!peer_device)
4163 return -EIO;
4164 device = peer_device->device;
4165
4166 mask.i = be32_to_cpu(p->mask);
4167 val.i = be32_to_cpu(p->val);
4168
4169 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4170 mutex_is_locked(device->state_mutex)) {
4171 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4172 return 0;
4173 }
4174
4175 mask = convert_state(mask);
4176 val = convert_state(val);
4177
4178 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4179 drbd_send_sr_reply(peer_device, rv);
4180
4181 drbd_md_sync(device);
4182
4183 return 0;
4184 }
4185
4186 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4187 {
4188 struct p_req_state *p = pi->data;
4189 union drbd_state mask, val;
4190 enum drbd_state_rv rv;
4191
4192 mask.i = be32_to_cpu(p->mask);
4193 val.i = be32_to_cpu(p->val);
4194
4195 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4196 mutex_is_locked(&connection->cstate_mutex)) {
4197 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4198 return 0;
4199 }
4200
4201 mask = convert_state(mask);
4202 val = convert_state(val);
4203
4204 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4205 conn_send_sr_reply(connection, rv);
4206
4207 return 0;
4208 }
4209
4210 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4211 {
4212 struct drbd_peer_device *peer_device;
4213 struct drbd_device *device;
4214 struct p_state *p = pi->data;
4215 union drbd_state os, ns, peer_state;
4216 enum drbd_disk_state real_peer_disk;
4217 enum chg_state_flags cs_flags;
4218 int rv;
4219
4220 peer_device = conn_peer_device(connection, pi->vnr);
4221 if (!peer_device)
4222 return config_unknown_volume(connection, pi);
4223 device = peer_device->device;
4224
4225 peer_state.i = be32_to_cpu(p->state);
4226
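/* A peer that is still D_NEGOTIATING has not settled on a disk state yet;
 * fall back to the UUID flags it sent earlier (bit value 4 records whether
 * its disk was inconsistent at that point) as our best guess. */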
4227 real_peer_disk = peer_state.disk;
4228 if (peer_state.disk == D_NEGOTIATING) {
4229 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4230 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4231 }
4232
4233 spin_lock_irq(&device->resource->req_lock);
4234 retry:
4235 os = ns = drbd_read_state(device);
4236 spin_unlock_irq(&device->resource->req_lock);
4237
4238 /* If some other part of the code (ack_receiver thread, timeout)
4239 * already decided to close the connection again,
4240 * we must not "re-establish" it here. */
4241 if (os.conn <= C_TEAR_DOWN)
4242 return -ECONNRESET;
4243
4244 /* If this is the "end of sync" confirmation, usually the peer disk
4245 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4246 * set) resync started in PausedSyncT, or if the timing of pause-/
4247 * unpause-sync events has been "just right", the peer disk may
4248 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4249 */
4250 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4251 real_peer_disk == D_UP_TO_DATE &&
4252 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4253 /* If we are (becoming) SyncSource, but peer is still in sync
4254 * preparation, ignore its uptodate-ness to avoid flapping, it
4255 * will change to inconsistent once the peer reaches active
4256 * syncing states.
4257 * It may have changed syncer-paused flags, however, so we
4258 * cannot ignore this completely. */
4259 if (peer_state.conn > C_CONNECTED &&
4260 peer_state.conn < C_SYNC_SOURCE)
4261 real_peer_disk = D_INCONSISTENT;
4262
4263 /* if peer_state changes to connected at the same time,
4264 * it explicitly notifies us that it finished resync.
4265 * Maybe we should finish it up, too? */
4266 else if (os.conn >= C_SYNC_SOURCE &&
4267 peer_state.conn == C_CONNECTED) {
4268 if (drbd_bm_total_weight(device) <= device->rs_failed)
4269 drbd_resync_finished(peer_device);
4270 return 0;
4271 }
4272 }
4273
4274 /* explicit verify finished notification, stop sector reached. */
4275 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4276 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4277 ov_out_of_sync_print(peer_device);
4278 drbd_resync_finished(peer_device);
4279 return 0;
4280 }
4281
4282 /* peer says his disk is inconsistent, while we think it is uptodate,
4283 * and this happens while the peer still thinks we have a sync going on,
4284 * but we think we are already done with the sync.
4285 * We ignore this to avoid flapping pdsk.
4286 * This should not happen, if the peer is a recent version of drbd. */
4287 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4288 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4289 real_peer_disk = D_UP_TO_DATE;
4290
4291 if (ns.conn == C_WF_REPORT_PARAMS)
4292 ns.conn = C_CONNECTED;
4293
4294 if (peer_state.conn == C_AHEAD)
4295 ns.conn = C_BEHIND;
4296
4297 /* TODO:
4298 * if (primary and diskless and peer uuid != effective uuid)
4299 * abort attach on peer;
4300 *
4301 * If this node does not have good data, was already connected, but
4302 * the peer did a late attach only now, trying to "negotiate" with me,
4303 * AND I am currently Primary, possibly frozen, with some specific
4304 * "effective" uuid, this should never be reached, really, because
4305 * we first send the uuids, then the current state.
4306 *
4307 * In this scenario, we already dropped the connection hard
4308 * when we received the unsuitable uuids (receive_uuids()).
4309 *
4310 * Should we want to change this, that is: not drop the connection in
4311 * receive_uuids() already, then we would need to add a branch here
4312 * that aborts the attach of "unsuitable uuids" on the peer in case
4313 * this node is currently Diskless Primary.
4314 */
4315
4316 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4317 get_ldev_if_state(device, D_NEGOTIATING)) {
4318 int cr; /* consider resync */
4319
4320 /* if we established a new connection */
4321 cr = (os.conn < C_CONNECTED);
4322 /* if we had an established connection
4323 * and one of the nodes newly attaches a disk */
4324 cr |= (os.conn == C_CONNECTED &&
4325 (peer_state.disk == D_NEGOTIATING ||
4326 os.disk == D_NEGOTIATING));
4327 /* if we have both been inconsistent, and the peer has been
4328 * forced to be UpToDate with --force */
4329 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4330 /* if we had been plain connected, and the admin requested to
4331 * start a sync by "invalidate" or "invalidate-remote" */
4332 cr |= (os.conn == C_CONNECTED &&
4333 (peer_state.conn >= C_STARTING_SYNC_S &&
4334 peer_state.conn <= C_WF_BITMAP_T));
4335
4336 if (cr)
4337 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4338
4339 put_ldev(device);
4340 if (ns.conn == C_MASK) {
4341 ns.conn = C_CONNECTED;
4342 if (device->state.disk == D_NEGOTIATING) {
4343 drbd_force_state(device, NS(disk, D_FAILED));
4344 } else if (peer_state.disk == D_NEGOTIATING) {
4345 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4346 peer_state.disk = D_DISKLESS;
4347 real_peer_disk = D_DISKLESS;
4348 } else {
4349 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4350 return -EIO;
4351 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4352 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4353 return -EIO;
4354 }
4355 }
4356 }
4357
4358 spin_lock_irq(&device->resource->req_lock);
4359 if (os.i != drbd_read_state(device).i)
4360 goto retry;
4361 clear_bit(CONSIDER_RESYNC, &device->flags);
4362 ns.peer = peer_state.role;
4363 ns.pdsk = real_peer_disk;
4364 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4365 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4366 ns.disk = device->new_state_tmp.disk;
4367 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4368 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4369 test_bit(NEW_CUR_UUID, &device->flags)) {
4370 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4371 for temporary network outages! */
4372 spin_unlock_irq(&device->resource->req_lock);
4373 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4374 tl_clear(peer_device->connection);
4375 drbd_uuid_new_current(device);
4376 clear_bit(NEW_CUR_UUID, &device->flags);
4377 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4378 return -EIO;
4379 }
4380 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4381 ns = drbd_read_state(device);
4382 spin_unlock_irq(&device->resource->req_lock);
4383
4384 if (rv < SS_SUCCESS) {
4385 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4386 return -EIO;
4387 }
4388
4389 if (os.conn > C_WF_REPORT_PARAMS) {
4390 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4391 peer_state.disk != D_NEGOTIATING ) {
4392 /* we want resync, peer has not yet decided to sync... */
4393 /* Nowadays only used when forcing a node into primary role and
4394 setting its disk to UpToDate with that */
4395 drbd_send_uuids(peer_device);
4396 drbd_send_current_state(peer_device);
4397 }
4398 }
4399
4400 clear_bit(DISCARD_MY_DATA, &device->flags);
4401
4402 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4403
4404 return 0;
4405 }
4406
4407 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4408 {
4409 struct drbd_peer_device *peer_device;
4410 struct drbd_device *device;
4411 struct p_rs_uuid *p = pi->data;
4412
4413 peer_device = conn_peer_device(connection, pi->vnr);
4414 if (!peer_device)
4415 return -EIO;
4416 device = peer_device->device;
4417
4418 wait_event(device->misc_wait,
4419 device->state.conn == C_WF_SYNC_UUID ||
4420 device->state.conn == C_BEHIND ||
4421 device->state.conn < C_CONNECTED ||
4422 device->state.disk < D_NEGOTIATING);
4423
4424 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4425
4426 /* Here the _drbd_uuid_ functions are right, current should
4427 _not_ be rotated into the history */
4428 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4429 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4430 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4431
4432 drbd_print_uuids(device, "updated sync uuid");
4433 drbd_start_resync(device, C_SYNC_TARGET);
4434
4435 put_ldev(device);
4436 } else
4437 drbd_err(device, "Ignoring SyncUUID packet!\n");
4438
4439 return 0;
4440 }
4441
4442 /*
4443 * receive_bitmap_plain
4444 *
4445 * Return 0 when done, 1 when another iteration is needed, and a negative error
4446 * code upon failure.
4447 */
4448 static int
4449 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4450 unsigned long *p, struct bm_xfer_ctx *c)
4451 {
4452 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4453 drbd_header_size(peer_device->connection);
4454 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4455 c->bm_words - c->word_offset);
4456 unsigned int want = num_words * sizeof(*p);
4457 int err;
4458
4459 if (want != size) {
4460 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4461 return -EIO;
4462 }
4463 if (want == 0)
4464 return 0;
4465 err = drbd_recv_all(peer_device->connection, p, want);
4466 if (err)
4467 return err;
4468
4469 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4470
4471 c->word_offset += num_words;
4472 c->bit_offset = c->word_offset * BITS_PER_LONG;
4473 if (c->bit_offset > c->bm_bits)
4474 c->bit_offset = c->bm_bits;
4475
4476 return 1;
4477 }
4478
4479 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4480 {
4481 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4482 }
4483
4484 static int dcbp_get_start(struct p_compressed_bm *p)
4485 {
4486 return (p->encoding & 0x80) != 0;
4487 }
4488
4489 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4490 {
4491 return (p->encoding >> 4) & 0x7;
4492 }
4493
4494 /*
4495 * recv_bm_rle_bits
4496 *
4497 * Return 0 when done, 1 when another iteration is needed, and a negative error
4498 * code upon failure.
4499 */
4500 static int
4501 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4502 struct p_compressed_bm *p,
4503 struct bm_xfer_ctx *c,
4504 unsigned int len)
4505 {
4506 struct bitstream bs;
4507 u64 look_ahead;
4508 u64 rl;
4509 u64 tmp;
4510 unsigned long s = c->bit_offset;
4511 unsigned long e;
4512 int toggle = dcbp_get_start(p);
4513 int have;
4514 int bits;
4515
4516 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4517
4518 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4519 if (bits < 0)
4520 return -EIO;
4521
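/* The payload is a sequence of VLI encoded run lengths describing
 * alternating runs of clear and set bits; "toggle" tracks which kind the
 * current run is, and only the set runs are merged into our bitmap.
 * E.g. runs of 5,3,10 with a clear start describe 5 clear bits, 3 set
 * bits, then 10 clear bits. */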
4522 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4523 bits = vli_decode_bits(&rl, look_ahead);
4524 if (bits <= 0)
4525 return -EIO;
4526
4527 if (toggle) {
4528 e = s + rl -1;
4529 if (e >= c->bm_bits) {
4530 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4531 return -EIO;
4532 }
4533 _drbd_bm_set_bits(peer_device->device, s, e);
4534 }
4535
4536 if (have < bits) {
4537 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4538 have, bits, look_ahead,
4539 (unsigned int)(bs.cur.b - p->code),
4540 (unsigned int)bs.buf_len);
4541 return -EIO;
4542 }
4543 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4544 if (likely(bits < 64))
4545 look_ahead >>= bits;
4546 else
4547 look_ahead = 0;
4548 have -= bits;
4549
4550 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4551 if (bits < 0)
4552 return -EIO;
4553 look_ahead |= tmp << have;
4554 have += bits;
4555 }
4556
4557 c->bit_offset = s;
4558 bm_xfer_ctx_bit_to_word_offset(c);
4559
4560 return (s != c->bm_bits);
4561 }
4562
4563 /*
4564 * decode_bitmap_c
4565 *
4566 * Return 0 when done, 1 when another iteration is needed, and a negative error
4567 * code upon failure.
4568 */
4569 static int
4570 decode_bitmap_c(struct drbd_peer_device *peer_device,
4571 struct p_compressed_bm *p,
4572 struct bm_xfer_ctx *c,
4573 unsigned int len)
4574 {
4575 if (dcbp_get_code(p) == RLE_VLI_Bits)
4576 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4577
4578 /* other variants had been implemented for evaluation,
4579 * but have been dropped as this one turned out to be "best"
4580 * during all our tests. */
4581
4582 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4583 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4584 return -EIO;
4585 }
4586
4587 void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device,
4588 const char *direction, struct bm_xfer_ctx *c)
4589 {
4590 /* what would it take to transfer it "plaintext" */
4591 unsigned int header_size = drbd_header_size(peer_device->connection);
4592 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4593 unsigned int plain =
4594 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4595 c->bm_words * sizeof(unsigned long);
4596 unsigned int total = c->bytes[0] + c->bytes[1];
4597 unsigned int r;
4598
4599 /* total cannot be zero, but just in case: */
4600 if (total == 0)
4601 return;
4602
4603 /* don't report if not compressed */
4604 if (total >= plain)
4605 return;
4606
4607 /* total < plain. check for overflow, still */
4608 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4609 : (1000 * total / plain);
4610
4611 if (r > 1000)
4612 r = 1000;
4613
4614 r = 1000 - r;
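/* r now holds the space saved in per mille; it is printed below as a
 * percentage with one decimal place. */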
4615 drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4616 "total %u; compression: %u.%u%%\n",
4617 direction,
4618 c->bytes[1], c->packets[1],
4619 c->bytes[0], c->packets[0],
4620 total, r/10, r % 10);
4621 }
4622
4623 /* Since we are processing the bitfield from lower addresses to higher,
4624 it does not matter whether we process it in 32 bit chunks or 64 bit
4625 chunks, as long as it is little endian. (Understand it as a byte stream,
4626 beginning with the lowest byte...) If we used big endian
4627 we would need to process it from the highest address to the lowest,
4628 in order to be agnostic to the 32 vs 64 bits issue.
4629
4630 returns 0 on success, or a negative error code otherwise. */
4631 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4632 {
4633 struct drbd_peer_device *peer_device;
4634 struct drbd_device *device;
4635 struct bm_xfer_ctx c;
4636 int err;
4637
4638 peer_device = conn_peer_device(connection, pi->vnr);
4639 if (!peer_device)
4640 return -EIO;
4641 device = peer_device->device;
4642
4643 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4644 /* you are supposed to send additional out-of-sync information
4645 * if you actually set bits during this phase */
4646
4647 c = (struct bm_xfer_ctx) {
4648 .bm_bits = drbd_bm_bits(device),
4649 .bm_words = drbd_bm_words(device),
4650 };
4651
4652 for(;;) {
4653 if (pi->cmd == P_BITMAP)
4654 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4655 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4656 /* MAYBE: sanity check that we speak proto >= 90,
4657 * and the feature is enabled! */
4658 struct p_compressed_bm *p = pi->data;
4659
4660 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4661 drbd_err(device, "ReportCBitmap packet too large\n");
4662 err = -EIO;
4663 goto out;
4664 }
4665 if (pi->size <= sizeof(*p)) {
4666 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4667 err = -EIO;
4668 goto out;
4669 }
4670 err = drbd_recv_all(peer_device->connection, p, pi->size);
4671 if (err)
4672 goto out;
4673 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4674 } else {
4675 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4676 err = -EIO;
4677 goto out;
4678 }
4679
4680 c.packets[pi->cmd == P_BITMAP]++;
4681 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4682
4683 if (err <= 0) {
4684 if (err < 0)
4685 goto out;
4686 break;
4687 }
4688 err = drbd_recv_header(peer_device->connection, pi);
4689 if (err)
4690 goto out;
4691 }
4692
4693 INFO_bm_xfer_stats(peer_device, "receive", &c);
4694
4695 if (device->state.conn == C_WF_BITMAP_T) {
4696 enum drbd_state_rv rv;
4697
4698 err = drbd_send_bitmap(device, peer_device);
4699 if (err)
4700 goto out;
4701 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4702 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4703 D_ASSERT(device, rv == SS_SUCCESS);
4704 } else if (device->state.conn != C_WF_BITMAP_S) {
4705 /* admin may have requested C_DISCONNECTING,
4706 * other threads may have noticed network errors */
4707 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4708 drbd_conn_str(device->state.conn));
4709 }
4710 err = 0;
4711
4712 out:
4713 drbd_bm_unlock(device);
4714 if (!err && device->state.conn == C_WF_BITMAP_S)
4715 drbd_start_resync(device, C_SYNC_SOURCE);
4716 return err;
4717 }
4718
4719 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4720 {
4721 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4722 pi->cmd, pi->size);
4723
4724 return ignore_remaining_packet(connection, pi);
4725 }
4726
4727 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4728 {
4729 /* Make sure we've acked all the TCP data associated
4730 * with the data requests being unplugged */
4731 tcp_sock_set_quickack(connection->data.socket->sk, 2);
4732 return 0;
4733 }
4734
4735 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4736 {
4737 struct drbd_peer_device *peer_device;
4738 struct drbd_device *device;
4739 struct p_block_desc *p = pi->data;
4740
4741 peer_device = conn_peer_device(connection, pi->vnr);
4742 if (!peer_device)
4743 return -EIO;
4744 device = peer_device->device;
4745
4746 switch (device->state.conn) {
4747 case C_WF_SYNC_UUID:
4748 case C_WF_BITMAP_T:
4749 case C_BEHIND:
4750 break;
4751 default:
4752 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4753 drbd_conn_str(device->state.conn));
4754 }
4755
4756 drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4757
4758 return 0;
4759 }
4760
4761 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4762 {
4763 struct drbd_peer_device *peer_device;
4764 struct p_block_desc *p = pi->data;
4765 struct drbd_device *device;
4766 sector_t sector;
4767 int size, err = 0;
4768
4769 peer_device = conn_peer_device(connection, pi->vnr);
4770 if (!peer_device)
4771 return -EIO;
4772 device = peer_device->device;
4773
4774 sector = be64_to_cpu(p->sector);
4775 size = be32_to_cpu(p->blksize);
4776
4777 dec_rs_pending(peer_device);
4778
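/* The resync source told us this range is deallocated on its side
 * (typically in reply to a thin-resync request).  Mirror that locally by
 * submitting a discard instead of transferring the data. */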
4779 if (get_ldev(device)) {
4780 struct drbd_peer_request *peer_req;
4781
4782 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4783 size, 0, GFP_NOIO);
4784 if (!peer_req) {
4785 put_ldev(device);
4786 return -ENOMEM;
4787 }
4788
4789 peer_req->w.cb = e_end_resync_block;
4790 peer_req->opf = REQ_OP_DISCARD;
4791 peer_req->submit_jif = jiffies;
4792 peer_req->flags |= EE_TRIM;
4793
4794 spin_lock_irq(&device->resource->req_lock);
4795 list_add_tail(&peer_req->w.list, &device->sync_ee);
4796 spin_unlock_irq(&device->resource->req_lock);
4797
4798 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4799 err = drbd_submit_peer_request(peer_req);
4800
4801 if (err) {
4802 spin_lock_irq(&device->resource->req_lock);
4803 list_del(&peer_req->w.list);
4804 spin_unlock_irq(&device->resource->req_lock);
4805
4806 drbd_free_peer_req(device, peer_req);
4807 put_ldev(device);
4808 err = 0;
4809 goto fail;
4810 }
4811
4812 inc_unacked(device);
4813
4814 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4815 as well as drbd_rs_complete_io() */
4816 } else {
4817 fail:
4818 drbd_rs_complete_io(device, sector);
4819 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4820 }
4821
4822 atomic_add(size >> 9, &device->rs_sect_in);
4823
4824 return err;
4825 }
4826
4827 struct data_cmd {
4828 int expect_payload;
4829 unsigned int pkt_size;
4830 int (*fn)(struct drbd_connection *, struct packet_info *);
4831 };
4832
4833 static struct data_cmd drbd_cmd_handler[] = {
4834 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4835 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4836 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4837 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4838 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4839 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4840 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4841 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4842 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4843 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4844 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4845 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4846 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4847 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4848 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4849 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4850 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4851 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4852 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4853 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4854 [P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4855 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4856 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4857 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4858 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4859 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4860 [P_ZEROES] = { 0, sizeof(struct p_trim), receive_Data },
4861 [P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4862 };
4863
4864 static void drbdd(struct drbd_connection *connection)
4865 {
4866 struct packet_info pi;
4867 size_t shs; /* sub header size */
4868 int err;
4869
4870 while (get_t_state(&connection->receiver) == RUNNING) {
4871 struct data_cmd const *cmd;
4872
4873 drbd_thread_current_set_cpu(&connection->receiver);
4874 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4875 if (drbd_recv_header_maybe_unplug(connection, &pi))
4876 goto err_out;
4877
4878 cmd = &drbd_cmd_handler[pi.cmd];
4879 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4880 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4881 cmdname(pi.cmd), pi.cmd);
4882 goto err_out;
4883 }
4884
4885 shs = cmd->pkt_size;
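/* When the WSAME feature was negotiated, P_SIZES carries an extra o_qlim
 * trailer describing the peer's queue limits; account for it in the
 * expected sub header size. */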
4886 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4887 shs += sizeof(struct o_qlim);
4888 if (pi.size > shs && !cmd->expect_payload) {
4889 drbd_err(connection, "No payload expected %s l:%d\n",
4890 cmdname(pi.cmd), pi.size);
4891 goto err_out;
4892 }
4893 if (pi.size < shs) {
4894 drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4895 cmdname(pi.cmd), (int)shs, pi.size);
4896 goto err_out;
4897 }
4898
4899 if (shs) {
4900 update_receiver_timing_details(connection, drbd_recv_all_warn);
4901 err = drbd_recv_all_warn(connection, pi.data, shs);
4902 if (err)
4903 goto err_out;
4904 pi.size -= shs;
4905 }
4906
4907 update_receiver_timing_details(connection, cmd->fn);
4908 err = cmd->fn(connection, &pi);
4909 if (err) {
4910 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4911 cmdname(pi.cmd), err, pi.size);
4912 goto err_out;
4913 }
4914 }
4915 return;
4916
4917 err_out:
4918 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4919 }
4920
4921 static void conn_disconnect(struct drbd_connection *connection)
4922 {
4923 struct drbd_peer_device *peer_device;
4924 enum drbd_conns oc;
4925 int vnr;
4926
4927 if (connection->cstate == C_STANDALONE)
4928 return;
4929
4930 /* We are about to start the cleanup after connection loss.
4931 * Make sure drbd_make_request knows about that.
4932 * Usually we should be in some network failure state already,
4933 * but just in case we are not, we fix it up here.
4934 */
4935 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4936
4937 /* ack_receiver does not clean up anything. it must not interfere, either */
4938 drbd_thread_stop(&connection->ack_receiver);
4939 if (connection->ack_sender) {
4940 destroy_workqueue(connection->ack_sender);
4941 connection->ack_sender = NULL;
4942 }
4943 drbd_free_sock(connection);
4944
4945 rcu_read_lock();
4946 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4947 struct drbd_device *device = peer_device->device;
4948 kref_get(&device->kref);
4949 rcu_read_unlock();
4950 drbd_disconnected(peer_device);
4951 kref_put(&device->kref, drbd_destroy_device);
4952 rcu_read_lock();
4953 }
4954 rcu_read_unlock();
4955
4956 if (!list_empty(&connection->current_epoch->list))
4957 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4958 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4959 atomic_set(&connection->current_epoch->epoch_size, 0);
4960 connection->send.seen_any_write_yet = false;
4961
4962 drbd_info(connection, "Connection closed\n");
4963
4964 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4965 conn_try_outdate_peer_async(connection);
4966
4967 spin_lock_irq(&connection->resource->req_lock);
4968 oc = connection->cstate;
4969 if (oc >= C_UNCONNECTED)
4970 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4971
4972 spin_unlock_irq(&connection->resource->req_lock);
4973
4974 if (oc == C_DISCONNECTING)
4975 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4976 }
4977
4978 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4979 {
4980 struct drbd_device *device = peer_device->device;
4981 unsigned int i;
4982
4983 /* wait for current activity to cease. */
4984 spin_lock_irq(&device->resource->req_lock);
4985 _drbd_wait_ee_list_empty(device, &device->active_ee);
4986 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4987 _drbd_wait_ee_list_empty(device, &device->read_ee);
4988 spin_unlock_irq(&device->resource->req_lock);
4989
4990 /* We do not have data structures that would allow us to
4991 * get the rs_pending_cnt down to 0 again.
4992 * * On C_SYNC_TARGET we do not have any data structures describing
4993 * the pending RSDataRequest's we have sent.
4994 * * On C_SYNC_SOURCE there is no data structure that tracks
4995 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4996 * And no, it is not the sum of the reference counts in the
4997 * resync_LRU. The resync_LRU tracks the whole operation including
4998 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4999 * on the fly. */
5000 drbd_rs_cancel_all(device);
5001 device->rs_total = 0;
5002 device->rs_failed = 0;
5003 atomic_set(&device->rs_pending_cnt, 0);
5004 wake_up(&device->misc_wait);
5005
5006 timer_delete_sync(&device->resync_timer);
5007 resync_timer_fn(&device->resync_timer);
5008
5009 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5010 * w_make_resync_request etc. which may still be on the worker queue
5011 * to be "canceled" */
5012 drbd_flush_workqueue(&peer_device->connection->sender_work);
5013
5014 drbd_finish_peer_reqs(device);
5015
5016 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5017 might have queued more work. The flush before drbd_finish_peer_reqs() is
5018 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5019 drbd_flush_workqueue(&peer_device->connection->sender_work);
5020
5021 /* need to do it again, drbd_finish_peer_reqs() may have populated it
5022 * again via drbd_try_clear_on_disk_bm(). */
5023 drbd_rs_cancel_all(device);
5024
5025 kfree(device->p_uuid);
5026 device->p_uuid = NULL;
5027
5028 if (!drbd_suspended(device))
5029 tl_clear(peer_device->connection);
5030
5031 drbd_md_sync(device);
5032
5033 if (get_ldev(device)) {
5034 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5035 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL);
5036 put_ldev(device);
5037 }
5038
5039 i = atomic_read(&device->pp_in_use_by_net);
5040 if (i)
5041 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5042 i = atomic_read(&device->pp_in_use);
5043 if (i)
5044 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5045
5046 D_ASSERT(device, list_empty(&device->read_ee));
5047 D_ASSERT(device, list_empty(&device->active_ee));
5048 D_ASSERT(device, list_empty(&device->sync_ee));
5049 D_ASSERT(device, list_empty(&device->done_ee));
5050
5051 return 0;
5052 }
5053
5054 /*
5055 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5056 * we can agree on is stored in agreed_pro_version.
5057 *
5058 * feature flags and the reserved array should be enough room for future
5059 * enhancements of the handshake protocol, and possible plugins...
5060 *
5061 * for now, they are expected to be zero, but ignored.
5062 */
5063 static int drbd_send_features(struct drbd_connection *connection)
5064 {
5065 struct drbd_socket *sock;
5066 struct p_connection_features *p;
5067
5068 sock = &connection->data;
5069 p = conn_prepare_command(connection, sock);
5070 if (!p)
5071 return -EIO;
5072 memset(p, 0, sizeof(*p));
5073 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5074 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5075 p->feature_flags = cpu_to_be32(PRO_FEATURES);
5076 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5077 }
5078
5079 /*
5080 * return values:
5081 * 1 yes, we have a valid connection
5082 * 0 oops, did not work out, please try again
5083 * -1 peer talks different language,
5084 * no point in trying again, please go standalone.
5085 */
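/*
 * Worked example (hypothetical peer values, for illustration only): if the
 * peer announces protocol_min=86 and protocol_max=96 while we support
 * PRO_VERSION_MIN..PRO_VERSION_MAX, the ranges overlap and we agree on
 * min_t(int, PRO_VERSION_MAX, 96).  agreed_features ends up as
 * PRO_FEATURES & peer->feature_flags, i.e. only flags both sides advertise.
 */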
5086 static int drbd_do_features(struct drbd_connection *connection)
5087 {
5088 /* ASSERT current == connection->receiver ... */
5089 struct p_connection_features *p;
5090 const int expect = sizeof(struct p_connection_features);
5091 struct packet_info pi;
5092 int err;
5093
5094 err = drbd_send_features(connection);
5095 if (err)
5096 return 0;
5097
5098 err = drbd_recv_header(connection, &pi);
5099 if (err)
5100 return 0;
5101
5102 if (pi.cmd != P_CONNECTION_FEATURES) {
5103 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5104 cmdname(pi.cmd), pi.cmd);
5105 return -1;
5106 }
5107
5108 if (pi.size != expect) {
5109 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5110 expect, pi.size);
5111 return -1;
5112 }
5113
5114 p = pi.data;
5115 err = drbd_recv_all_warn(connection, p, expect);
5116 if (err)
5117 return 0;
5118
5119 p->protocol_min = be32_to_cpu(p->protocol_min);
5120 p->protocol_max = be32_to_cpu(p->protocol_max);
5121 if (p->protocol_max == 0)
5122 p->protocol_max = p->protocol_min;
5123
5124 if (PRO_VERSION_MAX < p->protocol_min ||
5125 PRO_VERSION_MIN > p->protocol_max)
5126 goto incompat;
5127
5128 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5129 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5130
5131 drbd_info(connection, "Handshake successful: "
5132 "Agreed network protocol version %d\n", connection->agreed_pro_version);
5133
5134 drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5135 connection->agreed_features,
5136 connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5137 connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5138 connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5139 connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5140 connection->agreed_features ? "" : " none");
5141
5142 return 1;
5143
5144 incompat:
5145 drbd_err(connection, "incompatible DRBD dialects: "
5146 "I support %d-%d, peer supports %d-%d\n",
5147 PRO_VERSION_MIN, PRO_VERSION_MAX,
5148 p->protocol_min, p->protocol_max);
5149 return -1;
5150 }
5151
5152 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5153 static int drbd_do_auth(struct drbd_connection *connection)
5154 {
5155 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5156 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5157 return -1;
5158 }
5159 #else
5160 #define CHALLENGE_LEN 64
5161
5162 /* Return value:
5163 1 - auth succeeded,
5164 0 - failed, try again (network error),
5165 -1 - auth failed, don't try again.
5166 */
5167
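/* Rough sketch of the challenge/response flow implemented below (both peers
 * run it symmetrically over the data socket):
 *
 *   local -> peer : P_AUTH_CHALLENGE, CHALLENGE_LEN random bytes
 *   peer -> local : P_AUTH_CHALLENGE, the peer's random challenge
 *   local -> peer : P_AUTH_RESPONSE,  HMAC(shared_secret, peer's challenge)
 *   peer -> local : P_AUTH_RESPONSE,  HMAC(shared_secret, local challenge)
 *
 * Each side recomputes the HMAC over its own challenge and compares it with
 * the peer's response; a peer that echoes back our own challenge is rejected.
 */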
5168 static int drbd_do_auth(struct drbd_connection *connection)
5169 {
5170 struct drbd_socket *sock;
5171 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
5172 char *response = NULL;
5173 char *right_response = NULL;
5174 char *peers_ch = NULL;
5175 unsigned int key_len;
5176 char secret[SHARED_SECRET_MAX]; /* 64 byte */
5177 unsigned int resp_size;
5178 struct shash_desc *desc;
5179 struct packet_info pi;
5180 struct net_conf *nc;
5181 int err, rv;
5182
5183 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
5184
5185 rcu_read_lock();
5186 nc = rcu_dereference(connection->net_conf);
5187 key_len = strlen(nc->shared_secret);
5188 memcpy(secret, nc->shared_secret, key_len);
5189 rcu_read_unlock();
5190
5191 desc = kmalloc(sizeof(struct shash_desc) +
5192 crypto_shash_descsize(connection->cram_hmac_tfm),
5193 GFP_KERNEL);
5194 if (!desc) {
5195 rv = -1;
5196 goto fail;
5197 }
5198 desc->tfm = connection->cram_hmac_tfm;
5199
5200 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5201 if (rv) {
5202 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5203 rv = -1;
5204 goto fail;
5205 }
5206
5207 get_random_bytes(my_challenge, CHALLENGE_LEN);
5208
5209 sock = &connection->data;
5210 if (!conn_prepare_command(connection, sock)) {
5211 rv = 0;
5212 goto fail;
5213 }
5214 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5215 my_challenge, CHALLENGE_LEN);
5216 if (!rv)
5217 goto fail;
5218
5219 err = drbd_recv_header(connection, &pi);
5220 if (err) {
5221 rv = 0;
5222 goto fail;
5223 }
5224
5225 if (pi.cmd != P_AUTH_CHALLENGE) {
5226 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5227 cmdname(pi.cmd), pi.cmd);
5228 rv = -1;
5229 goto fail;
5230 }
5231
5232 if (pi.size > CHALLENGE_LEN * 2) {
5233 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5234 rv = -1;
5235 goto fail;
5236 }
5237
5238 if (pi.size < CHALLENGE_LEN) {
5239 drbd_err(connection, "AuthChallenge payload too small.\n");
5240 rv = -1;
5241 goto fail;
5242 }
5243
5244 peers_ch = kmalloc(pi.size, GFP_NOIO);
5245 if (!peers_ch) {
5246 rv = -1;
5247 goto fail;
5248 }
5249
5250 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5251 if (err) {
5252 rv = 0;
5253 goto fail;
5254 }
5255
5256 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5257 drbd_err(connection, "Peer presented the same challenge!\n");
5258 rv = -1;
5259 goto fail;
5260 }
5261
5262 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5263 response = kmalloc(resp_size, GFP_NOIO);
5264 if (!response) {
5265 rv = -1;
5266 goto fail;
5267 }
5268
5269 rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5270 if (rv) {
5271 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5272 rv = -1;
5273 goto fail;
5274 }
5275
5276 if (!conn_prepare_command(connection, sock)) {
5277 rv = 0;
5278 goto fail;
5279 }
5280 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5281 response, resp_size);
5282 if (!rv)
5283 goto fail;
5284
5285 err = drbd_recv_header(connection, &pi);
5286 if (err) {
5287 rv = 0;
5288 goto fail;
5289 }
5290
5291 if (pi.cmd != P_AUTH_RESPONSE) {
5292 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5293 cmdname(pi.cmd), pi.cmd);
5294 rv = 0;
5295 goto fail;
5296 }
5297
5298 if (pi.size != resp_size) {
5299 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5300 rv = 0;
5301 goto fail;
5302 }
5303
5304 err = drbd_recv_all_warn(connection, response, resp_size);
5305 if (err) {
5306 rv = 0;
5307 goto fail;
5308 }
5309
5310 right_response = kmalloc(resp_size, GFP_NOIO);
5311 if (!right_response) {
5312 rv = -1;
5313 goto fail;
5314 }
5315
5316 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5317 right_response);
5318 if (rv) {
5319 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5320 rv = -1;
5321 goto fail;
5322 }
5323
5324 rv = !memcmp(response, right_response, resp_size);
5325
5326 if (rv)
5327 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5328 resp_size);
5329 else
5330 rv = -1;
5331
5332 fail:
5333 kfree(peers_ch);
5334 kfree(response);
5335 kfree(right_response);
5336 if (desc) {
5337 shash_desc_zero(desc);
5338 kfree(desc);
5339 }
5340
5341 return rv;
5342 }
5343 #endif
5344
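/* Receiver thread main loop: retry while conn_connect() reports a transient
 * failure (0), give up and drop the network configuration when the peers are
 * incompatible (-1), and run the packet loop in drbdd() once a connection is
 * established (>0); compare the return value convention of drbd_do_features()
 * above. */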
5345 int drbd_receiver(struct drbd_thread *thi)
5346 {
5347 struct drbd_connection *connection = thi->connection;
5348 int h;
5349
5350 drbd_info(connection, "receiver (re)started\n");
5351
5352 do {
5353 h = conn_connect(connection);
5354 if (h == 0) {
5355 conn_disconnect(connection);
5356 schedule_timeout_interruptible(HZ);
5357 }
5358 if (h == -1) {
5359 drbd_warn(connection, "Discarding network configuration.\n");
5360 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5361 }
5362 } while (h == 0);
5363
5364 if (h > 0) {
5365 blk_start_plug(&connection->receiver_plug);
5366 drbdd(connection);
5367 blk_finish_plug(&connection->receiver_plug);
5368 }
5369
5370 conn_disconnect(connection);
5371
5372 drbd_info(connection, "receiver terminated\n");
5373 return 0;
5374 }
5375
5376 /* ********* acknowledge sender ******** */
5377
5378 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5379 {
5380 struct p_req_state_reply *p = pi->data;
5381 int retcode = be32_to_cpu(p->retcode);
5382
5383 if (retcode >= SS_SUCCESS) {
5384 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5385 } else {
5386 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5387 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5388 drbd_set_st_err_str(retcode), retcode);
5389 }
5390 wake_up(&connection->ping_wait);
5391
5392 return 0;
5393 }
5394
5395 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5396 {
5397 struct drbd_peer_device *peer_device;
5398 struct drbd_device *device;
5399 struct p_req_state_reply *p = pi->data;
5400 int retcode = be32_to_cpu(p->retcode);
5401
5402 peer_device = conn_peer_device(connection, pi->vnr);
5403 if (!peer_device)
5404 return -EIO;
5405 device = peer_device->device;
5406
5407 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5408 D_ASSERT(device, connection->agreed_pro_version < 100);
5409 return got_conn_RqSReply(connection, pi);
5410 }
5411
5412 if (retcode >= SS_SUCCESS) {
5413 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5414 } else {
5415 set_bit(CL_ST_CHG_FAIL, &device->flags);
5416 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5417 drbd_set_st_err_str(retcode), retcode);
5418 }
5419 wake_up(&device->state_wait);
5420
5421 return 0;
5422 }
5423
5424 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5425 {
5426 return drbd_send_ping_ack(connection);
5427
5428 }
5429
5430 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5431 {
5432 /* restore idle timeout */
5433 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5434 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5435 wake_up(&connection->ping_wait);
5436
5437 return 0;
5438 }
5439
5440 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5441 {
5442 struct drbd_peer_device *peer_device;
5443 struct drbd_device *device;
5444 struct p_block_ack *p = pi->data;
5445 sector_t sector = be64_to_cpu(p->sector);
5446 int blksize = be32_to_cpu(p->blksize);
5447
5448 peer_device = conn_peer_device(connection, pi->vnr);
5449 if (!peer_device)
5450 return -EIO;
5451 device = peer_device->device;
5452
5453 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5454
5455 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5456
5457 if (get_ldev(device)) {
5458 drbd_rs_complete_io(device, sector);
5459 drbd_set_in_sync(peer_device, sector, blksize);
5460 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5461 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5462 put_ldev(device);
5463 }
5464 dec_rs_pending(peer_device);
5465 atomic_add(blksize >> 9, &device->rs_sect_in);
5466
5467 return 0;
5468 }
5469
5470 static int
5471 validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
5472 struct rb_root *root, const char *func,
5473 enum drbd_req_event what, bool missing_ok)
5474 {
5475 struct drbd_device *device = peer_device->device;
5476 struct drbd_request *req;
5477 struct bio_and_error m;
5478
5479 spin_lock_irq(&device->resource->req_lock);
5480 req = find_request(device, root, id, sector, missing_ok, func);
5481 if (unlikely(!req)) {
5482 spin_unlock_irq(&device->resource->req_lock);
5483 return -EIO;
5484 }
5485 __req_mod(req, what, peer_device, &m);
5486 spin_unlock_irq(&device->resource->req_lock);
5487
5488 if (m.bio)
5489 complete_master_bio(device, &m);
5490 return 0;
5491 }
5492
5493 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5494 {
5495 struct drbd_peer_device *peer_device;
5496 struct drbd_device *device;
5497 struct p_block_ack *p = pi->data;
5498 sector_t sector = be64_to_cpu(p->sector);
5499 int blksize = be32_to_cpu(p->blksize);
5500 enum drbd_req_event what;
5501
5502 peer_device = conn_peer_device(connection, pi->vnr);
5503 if (!peer_device)
5504 return -EIO;
5505 device = peer_device->device;
5506
5507 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5508
5509 if (p->block_id == ID_SYNCER) {
5510 drbd_set_in_sync(peer_device, sector, blksize);
5511 dec_rs_pending(peer_device);
5512 return 0;
5513 }
5514 switch (pi->cmd) {
5515 case P_RS_WRITE_ACK:
5516 what = WRITE_ACKED_BY_PEER_AND_SIS;
5517 break;
5518 case P_WRITE_ACK:
5519 what = WRITE_ACKED_BY_PEER;
5520 break;
5521 case P_RECV_ACK:
5522 what = RECV_ACKED_BY_PEER;
5523 break;
5524 case P_SUPERSEDED:
5525 what = CONFLICT_RESOLVED;
5526 break;
5527 case P_RETRY_WRITE:
5528 what = POSTPONE_WRITE;
5529 break;
5530 default:
5531 BUG();
5532 }
5533
5534 return validate_req_change_req_state(peer_device, p->block_id, sector,
5535 &device->write_requests, __func__,
5536 what, false);
5537 }
5538
5539 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5540 {
5541 struct drbd_peer_device *peer_device;
5542 struct drbd_device *device;
5543 struct p_block_ack *p = pi->data;
5544 sector_t sector = be64_to_cpu(p->sector);
5545 int size = be32_to_cpu(p->blksize);
5546 int err;
5547
5548 peer_device = conn_peer_device(connection, pi->vnr);
5549 if (!peer_device)
5550 return -EIO;
5551 device = peer_device->device;
5552
5553 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5554
5555 if (p->block_id == ID_SYNCER) {
5556 dec_rs_pending(peer_device);
5557 drbd_rs_failed_io(peer_device, sector, size);
5558 return 0;
5559 }
5560
5561 err = validate_req_change_req_state(peer_device, p->block_id, sector,
5562 &device->write_requests, __func__,
5563 NEG_ACKED, true);
5564 if (err) {
5565 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5566 The master bio might already be completed, therefore the
5567 request is no longer in the collision hash. */
5568 /* In Protocol B we might already have got a P_RECV_ACK
5569 but then get a P_NEG_ACK afterwards. */
5570 drbd_set_out_of_sync(peer_device, sector, size);
5571 }
5572 return 0;
5573 }
5574
5575 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5576 {
5577 struct drbd_peer_device *peer_device;
5578 struct drbd_device *device;
5579 struct p_block_ack *p = pi->data;
5580 sector_t sector = be64_to_cpu(p->sector);
5581
5582 peer_device = conn_peer_device(connection, pi->vnr);
5583 if (!peer_device)
5584 return -EIO;
5585 device = peer_device->device;
5586
5587 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5588
5589 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5590 (unsigned long long)sector, be32_to_cpu(p->blksize));
5591
5592 return validate_req_change_req_state(peer_device, p->block_id, sector,
5593 &device->read_requests, __func__,
5594 NEG_ACKED, false);
5595 }
5596
5597 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5598 {
5599 struct drbd_peer_device *peer_device;
5600 struct drbd_device *device;
5601 sector_t sector;
5602 int size;
5603 struct p_block_ack *p = pi->data;
5604
5605 peer_device = conn_peer_device(connection, pi->vnr);
5606 if (!peer_device)
5607 return -EIO;
5608 device = peer_device->device;
5609
5610 sector = be64_to_cpu(p->sector);
5611 size = be32_to_cpu(p->blksize);
5612
5613 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5614
5615 dec_rs_pending(peer_device);
5616
5617 if (get_ldev_if_state(device, D_FAILED)) {
5618 drbd_rs_complete_io(device, sector);
5619 switch (pi->cmd) {
5620 case P_NEG_RS_DREPLY:
5621 drbd_rs_failed_io(peer_device, sector, size);
5622 break;
5623 case P_RS_CANCEL:
5624 break;
5625 default:
5626 BUG();
5627 }
5628 put_ldev(device);
5629 }
5630
5631 return 0;
5632 }
5633
5634 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5635 {
5636 struct p_barrier_ack *p = pi->data;
5637 struct drbd_peer_device *peer_device;
5638 int vnr;
5639
5640 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5641
5642 rcu_read_lock();
5643 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5644 struct drbd_device *device = peer_device->device;
5645
5646 if (device->state.conn == C_AHEAD &&
5647 atomic_read(&device->ap_in_flight) == 0 &&
5648 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5649 device->start_resync_timer.expires = jiffies + HZ;
5650 add_timer(&device->start_resync_timer);
5651 }
5652 }
5653 rcu_read_unlock();
5654
5655 return 0;
5656 }
5657
5658 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5659 {
5660 struct drbd_peer_device *peer_device;
5661 struct drbd_device *device;
5662 struct p_block_ack *p = pi->data;
5663 struct drbd_device_work *dw;
5664 sector_t sector;
5665 int size;
5666
5667 peer_device = conn_peer_device(connection, pi->vnr);
5668 if (!peer_device)
5669 return -EIO;
5670 device = peer_device->device;
5671
5672 sector = be64_to_cpu(p->sector);
5673 size = be32_to_cpu(p->blksize);
5674
5675 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5676
5677 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5678 drbd_ov_out_of_sync_found(peer_device, sector, size);
5679 else
5680 ov_out_of_sync_print(peer_device);
5681
5682 if (!get_ldev(device))
5683 return 0;
5684
5685 drbd_rs_complete_io(device, sector);
5686 dec_rs_pending(peer_device);
5687
5688 --device->ov_left;
5689
5690 /* let's advance progress step marks only for every other megabyte */
5691 if ((device->ov_left & 0x200) == 0x200)
5692 drbd_advance_rs_marks(peer_device, device->ov_left);
5693
5694 if (device->ov_left == 0) {
5695 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5696 if (dw) {
5697 dw->w.cb = w_ov_finished;
5698 dw->device = device;
5699 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5700 } else {
5701 drbd_err(device, "kmalloc(dw) failed.");
5702 ov_out_of_sync_print(peer_device);
5703 drbd_resync_finished(peer_device);
5704 }
5705 }
5706 put_ldev(device);
5707 return 0;
5708 }
5709
5710 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5711 {
5712 return 0;
5713 }
5714
5715 struct meta_sock_cmd {
5716 size_t pkt_size;
5717 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5718 };
5719
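/* Set the receive timeout on the meta socket.  Judging by the conversion
 * below, ping_timeo is configured in tenths of a second (hence the division
 * by ten), while ping_int is in whole seconds; both end up as jiffies. */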
5720 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5721 {
5722 long t;
5723 struct net_conf *nc;
5724
5725 rcu_read_lock();
5726 nc = rcu_dereference(connection->net_conf);
5727 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5728 rcu_read_unlock();
5729
5730 t *= HZ;
5731 if (ping_timeout)
5732 t /= 10;
5733
5734 connection->meta.socket->sk->sk_rcvtimeo = t;
5735 }
5736
5737 static void set_ping_timeout(struct drbd_connection *connection)
5738 {
5739 set_rcvtimeo(connection, 1);
5740 }
5741
5742 static void set_idle_timeout(struct drbd_connection *connection)
5743 {
5744 set_rcvtimeo(connection, 0);
5745 }
5746
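/* Dispatch table for the meta (ack) socket: pkt_size is the payload size
 * expected after the packet header.  drbd_ack_receiver() below reads the
 * header, checks that pi.size matches pkt_size, and then calls fn. */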
5747 static struct meta_sock_cmd ack_receiver_tbl[] = {
5748 [P_PING] = { 0, got_Ping },
5749 [P_PING_ACK] = { 0, got_PingAck },
5750 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5751 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5752 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5753 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5754 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5755 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5756 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5757 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5758 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5759 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5760 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5761 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5762 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5763 [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5764 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5765 };
5766
5767 int drbd_ack_receiver(struct drbd_thread *thi)
5768 {
5769 struct drbd_connection *connection = thi->connection;
5770 struct meta_sock_cmd *cmd = NULL;
5771 struct packet_info pi;
5772 unsigned long pre_recv_jif;
5773 int rv;
5774 void *buf = connection->meta.rbuf;
5775 int received = 0;
5776 unsigned int header_size = drbd_header_size(connection);
5777 int expect = header_size;
5778 bool ping_timeout_active = false;
5779
5780 sched_set_fifo_low(current);
5781
5782 while (get_t_state(thi) == RUNNING) {
5783 drbd_thread_current_set_cpu(thi);
5784
5785 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5786 if (drbd_send_ping(connection)) {
5787 drbd_err(connection, "drbd_send_ping has failed\n");
5788 goto reconnect;
5789 }
5790 set_ping_timeout(connection);
5791 ping_timeout_active = true;
5792 }
5793
5794 pre_recv_jif = jiffies;
5795 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5796
5797 /* Note:
5798 * -EINTR (on meta) we got a signal
5799 * -EAGAIN (on meta) rcvtimeo expired
5800 * -ECONNRESET other side closed the connection
5801 * -ERESTARTSYS (on data) we got a signal
5802 * rv < 0 other than above: unexpected error!
5803 * rv == expected: full header or command
5804 * rv < expected: "woken" by signal during receive
5805 * rv == 0 : "connection shut down by peer"
5806 */
5807 if (likely(rv > 0)) {
5808 received += rv;
5809 buf += rv;
5810 } else if (rv == 0) {
5811 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5812 long t;
5813 rcu_read_lock();
5814 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5815 rcu_read_unlock();
5816
5817 t = wait_event_timeout(connection->ping_wait,
5818 connection->cstate < C_WF_REPORT_PARAMS,
5819 t);
5820 if (t)
5821 break;
5822 }
5823 drbd_err(connection, "meta connection shut down by peer.\n");
5824 goto reconnect;
5825 } else if (rv == -EAGAIN) {
5826 /* If the data socket received something meanwhile,
5827 * that is good enough: peer is still alive. */
5828 if (time_after(connection->last_received, pre_recv_jif))
5829 continue;
5830 if (ping_timeout_active) {
5831 drbd_err(connection, "PingAck did not arrive in time.\n");
5832 goto reconnect;
5833 }
5834 set_bit(SEND_PING, &connection->flags);
5835 continue;
5836 } else if (rv == -EINTR) {
5837 /* maybe drbd_thread_stop(): the while condition will notice.
5838 * maybe woken for send_ping: we'll send a ping above,
5839 * and change the rcvtimeo */
5840 flush_signals(current);
5841 continue;
5842 } else {
5843 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5844 goto reconnect;
5845 }
5846
5847 if (received == expect && cmd == NULL) {
5848 if (decode_header(connection, connection->meta.rbuf, &pi))
5849 goto reconnect;
5850 cmd = &ack_receiver_tbl[pi.cmd];
5851 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5852 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5853 cmdname(pi.cmd), pi.cmd);
5854 goto disconnect;
5855 }
5856 expect = header_size + cmd->pkt_size;
5857 if (pi.size != expect - header_size) {
5858 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5859 pi.cmd, pi.size);
5860 goto reconnect;
5861 }
5862 }
5863 if (received == expect) {
5864 bool err;
5865
5866 err = cmd->fn(connection, &pi);
5867 if (err) {
5868 drbd_err(connection, "%ps failed\n", cmd->fn);
5869 goto reconnect;
5870 }
5871
5872 connection->last_received = jiffies;
5873
5874 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5875 set_idle_timeout(connection);
5876 ping_timeout_active = false;
5877 }
5878
5879 buf = connection->meta.rbuf;
5880 received = 0;
5881 expect = header_size;
5882 cmd = NULL;
5883 }
5884 }
5885
5886 if (0) {
5887 reconnect:
5888 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5889 conn_md_sync(connection);
5890 }
5891 if (0) {
5892 disconnect:
5893 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5894 }
5895
5896 drbd_info(connection, "ack_receiver terminated\n");
5897
5898 return 0;
5899 }
5900
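/* Deferred work: optionally cork the meta socket, send out the pending acks
 * for completed peer requests via drbd_finish_peer_reqs(), then uncork so
 * that the acks leave the socket in one burst. */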
5901 void drbd_send_acks_wf(struct work_struct *ws)
5902 {
5903 struct drbd_peer_device *peer_device =
5904 container_of(ws, struct drbd_peer_device, send_acks_work);
5905 struct drbd_connection *connection = peer_device->connection;
5906 struct drbd_device *device = peer_device->device;
5907 struct net_conf *nc;
5908 int tcp_cork, err;
5909
5910 rcu_read_lock();
5911 nc = rcu_dereference(connection->net_conf);
5912 tcp_cork = nc->tcp_cork;
5913 rcu_read_unlock();
5914
5915 if (tcp_cork)
5916 tcp_sock_set_cork(connection->meta.socket->sk, true);
5917
5918 err = drbd_finish_peer_reqs(device);
5919 kref_put(&device->kref, drbd_destroy_device);
5920 /* The matching kref_get() is in drbd_endio_write_sec_final(). It is needed to keep
5921 the struct work_struct send_acks_work alive, which lives in the peer_device object. */
5922
5923 if (err) {
5924 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5925 return;
5926 }
5927
5928 if (tcp_cork)
5929 tcp_sock_set_cork(connection->meta.socket->sk, false);
5930
5931 return;
5932 }
5933