xref: /linux/drivers/block/drbd/drbd_receiver.c (revision e656ec8ae2c0319b6d52834695f9635217d62de5)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/version.h>
32 #include <linux/drbd.h>
33 #include <linux/fs.h>
34 #include <linux/file.h>
35 #include <linux/in.h>
36 #include <linux/mm.h>
37 #include <linux/memcontrol.h>
38 #include <linux/mm_inline.h>
39 #include <linux/slab.h>
40 #include <linux/smp_lock.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/mm.h>
47 #include <linux/string.h>
48 #include <linux/scatterlist.h>
49 #include "drbd_int.h"
50 #include "drbd_req.h"
51 
52 #include "drbd_vli.h"
53 
54 struct flush_work {
55 	struct drbd_work w;
56 	struct drbd_epoch *epoch;
57 };
58 
59 enum finish_epoch {
60 	FE_STILL_LIVE,
61 	FE_DESTROYED,
62 	FE_RECYCLED,
63 };
64 
65 static int drbd_do_handshake(struct drbd_conf *mdev);
66 static int drbd_do_auth(struct drbd_conf *mdev);
67 
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
70 
71 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
72 {
73 	struct drbd_epoch *prev;
74 	spin_lock(&mdev->epoch_lock);
75 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
76 	if (prev == epoch || prev == mdev->current_epoch)
77 		prev = NULL;
78 	spin_unlock(&mdev->epoch_lock);
79 	return prev;
80 }
81 
82 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
83 
84 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
85 {
86 	struct page *page = NULL;
87 
88 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
89 	 * So what. It saves a spin_lock. */
90 	if (drbd_pp_vacant > 0) {
91 		spin_lock(&drbd_pp_lock);
92 		page = drbd_pp_pool;
93 		if (page) {
94 			drbd_pp_pool = (struct page *)page_private(page);
95 			set_page_private(page, 0); /* just to be polite */
96 			drbd_pp_vacant--;
97 		}
98 		spin_unlock(&drbd_pp_lock);
99 	}
100 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
101 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
102 	 * which in turn might block on the other node at this very place.  */
103 	if (!page)
104 		page = alloc_page(GFP_TRY);
105 	if (page)
106 		atomic_inc(&mdev->pp_in_use);
107 	return page;
108 }
109 
110 /* kick lower level device, if we have more than (arbitrary number)
111  * reference counts on it, which typically are locally submitted io
112  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
113 static void maybe_kick_lo(struct drbd_conf *mdev)
114 {
115 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
116 		drbd_kick_lo(mdev);
117 }
118 
119 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
120 {
121 	struct drbd_epoch_entry *e;
122 	struct list_head *le, *tle;
123 
124 	/* The EEs are always appended to the end of the list. Since
125 	   they are sent in order over the wire, they have to finish
126 	   in order. As soon as we see the first not finished we can
127 	   stop to examine the list... */
128 
129 	list_for_each_safe(le, tle, &mdev->net_ee) {
130 		e = list_entry(le, struct drbd_epoch_entry, w.list);
131 		if (drbd_bio_has_active_page(e->private_bio))
132 			break;
133 		list_move(le, to_be_freed);
134 	}
135 }
136 
137 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
138 {
139 	LIST_HEAD(reclaimed);
140 	struct drbd_epoch_entry *e, *t;
141 
142 	maybe_kick_lo(mdev);
143 	spin_lock_irq(&mdev->req_lock);
144 	reclaim_net_ee(mdev, &reclaimed);
145 	spin_unlock_irq(&mdev->req_lock);
146 
147 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
148 		drbd_free_ee(mdev, e);
149 }
150 
151 /**
152  * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
153  * @mdev:	DRBD device.
154  * @retry:	whether or not to retry allocation forever (or until signalled)
155  *
156  * Tries to allocate a page, first from our own page pool, then from the
157  * kernel, unless this allocation would exceed the max_buffers setting.
158  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
159  */
160 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
161 {
162 	struct page *page = NULL;
163 	DEFINE_WAIT(wait);
164 
165 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
166 		page = drbd_pp_first_page_or_try_alloc(mdev);
167 		if (page)
168 			return page;
169 	}
170 
171 	for (;;) {
172 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
173 
174 		drbd_kick_lo_and_reclaim_net(mdev);
175 
176 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
177 			page = drbd_pp_first_page_or_try_alloc(mdev);
178 			if (page)
179 				break;
180 		}
181 
182 		if (!retry)
183 			break;
184 
185 		if (signal_pending(current)) {
186 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
187 			break;
188 		}
189 
190 		schedule();
191 	}
192 	finish_wait(&drbd_pp_wait, &wait);
193 
194 	return page;
195 }
196 
197 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
198  * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
199 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
200 {
201 	int free_it;
202 
203 	spin_lock(&drbd_pp_lock);
204 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
205 		free_it = 1;
206 	} else {
207 		set_page_private(page, (unsigned long)drbd_pp_pool);
208 		drbd_pp_pool = page;
209 		drbd_pp_vacant++;
210 		free_it = 0;
211 	}
212 	spin_unlock(&drbd_pp_lock);
213 
214 	atomic_dec(&mdev->pp_in_use);
215 
216 	if (free_it)
217 		__free_page(page);
218 
219 	wake_up(&drbd_pp_wait);
220 }
221 
222 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
223 {
224 	struct page *p_to_be_freed = NULL;
225 	struct page *page;
226 	struct bio_vec *bvec;
227 	int i;
228 
229 	spin_lock(&drbd_pp_lock);
230 	__bio_for_each_segment(bvec, bio, i, 0) {
231 		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
232 			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
233 			p_to_be_freed = bvec->bv_page;
234 		} else {
235 			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
236 			drbd_pp_pool = bvec->bv_page;
237 			drbd_pp_vacant++;
238 		}
239 	}
240 	spin_unlock(&drbd_pp_lock);
241 	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
242 
243 	while (p_to_be_freed) {
244 		page = p_to_be_freed;
245 		p_to_be_freed = (struct page *)page_private(page);
246 		set_page_private(page, 0); /* just to be polite */
247 		put_page(page);
248 	}
249 
250 	wake_up(&drbd_pp_wait);
251 }
252 
253 /*
254 You need to hold the req_lock:
255  _drbd_wait_ee_list_empty()
256 
257 You must not have the req_lock:
258  drbd_free_ee()
259  drbd_alloc_ee()
260  drbd_init_ee()
261  drbd_release_ee()
262  drbd_ee_fix_bhs()
263  drbd_process_done_ee()
264  drbd_clear_done_ee()
265  drbd_wait_ee_list_empty()
266 */
267 
268 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
269 				     u64 id,
270 				     sector_t sector,
271 				     unsigned int data_size,
272 				     gfp_t gfp_mask) __must_hold(local)
273 {
274 	struct request_queue *q;
275 	struct drbd_epoch_entry *e;
276 	struct page *page;
277 	struct bio *bio;
278 	unsigned int ds;
279 
280 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
281 		return NULL;
282 
283 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
284 	if (!e) {
285 		if (!(gfp_mask & __GFP_NOWARN))
286 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
287 		return NULL;
288 	}
289 
290 	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
291 	if (!bio) {
292 		if (!(gfp_mask & __GFP_NOWARN))
293 			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
294 		goto fail1;
295 	}
296 
297 	bio->bi_bdev = mdev->ldev->backing_bdev;
298 	bio->bi_sector = sector;
299 
300 	ds = data_size;
301 	while (ds) {
302 		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
303 		if (!page) {
304 			if (!(gfp_mask & __GFP_NOWARN))
305 				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
306 			goto fail2;
307 		}
308 		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
309 			drbd_pp_free(mdev, page);
310 			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
311 			    "data_size=%u,ds=%u) failed\n",
312 			    (unsigned long long)sector, data_size, ds);
313 
314 			q = bdev_get_queue(bio->bi_bdev);
315 			if (q->merge_bvec_fn) {
316 				struct bvec_merge_data bvm = {
317 					.bi_bdev = bio->bi_bdev,
318 					.bi_sector = bio->bi_sector,
319 					.bi_size = bio->bi_size,
320 					.bi_rw = bio->bi_rw,
321 				};
322 				int l = q->merge_bvec_fn(q, &bvm,
323 						&bio->bi_io_vec[bio->bi_vcnt]);
324 				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
325 			}
326 
327 			/* dump more of the bio. */
328 			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
329 			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
330 			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
331 			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
332 
333 			goto fail2;
334 			break;
335 		}
336 		ds -= min_t(int, ds, PAGE_SIZE);
337 	}
338 
339 	D_ASSERT(data_size == bio->bi_size);
340 
341 	bio->bi_private = e;
342 	e->mdev = mdev;
343 	e->sector = sector;
344 	e->size = bio->bi_size;
345 
346 	e->private_bio = bio;
347 	e->block_id = id;
348 	INIT_HLIST_NODE(&e->colision);
349 	e->epoch = NULL;
350 	e->flags = 0;
351 
352 	return e;
353 
354  fail2:
355 	drbd_pp_free_bio_pages(mdev, bio);
356 	bio_put(bio);
357  fail1:
358 	mempool_free(e, drbd_ee_mempool);
359 
360 	return NULL;
361 }
362 
363 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
364 {
365 	struct bio *bio = e->private_bio;
366 	drbd_pp_free_bio_pages(mdev, bio);
367 	bio_put(bio);
368 	D_ASSERT(hlist_unhashed(&e->colision));
369 	mempool_free(e, drbd_ee_mempool);
370 }
371 
372 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
373 {
374 	LIST_HEAD(work_list);
375 	struct drbd_epoch_entry *e, *t;
376 	int count = 0;
377 
378 	spin_lock_irq(&mdev->req_lock);
379 	list_splice_init(list, &work_list);
380 	spin_unlock_irq(&mdev->req_lock);
381 
382 	list_for_each_entry_safe(e, t, &work_list, w.list) {
383 		drbd_free_ee(mdev, e);
384 		count++;
385 	}
386 	return count;
387 }
388 
389 
390 /*
391  * This function is called from _asender only_
392  * but see also comments in _req_mod(,barrier_acked)
393  * and receive_Barrier.
394  *
395  * Move entries from net_ee to done_ee, if ready.
396  * Grab done_ee, call all callbacks, free the entries.
397  * The callbacks typically send out ACKs.
398  */
399 static int drbd_process_done_ee(struct drbd_conf *mdev)
400 {
401 	LIST_HEAD(work_list);
402 	LIST_HEAD(reclaimed);
403 	struct drbd_epoch_entry *e, *t;
404 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
405 
406 	spin_lock_irq(&mdev->req_lock);
407 	reclaim_net_ee(mdev, &reclaimed);
408 	list_splice_init(&mdev->done_ee, &work_list);
409 	spin_unlock_irq(&mdev->req_lock);
410 
411 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
412 		drbd_free_ee(mdev, e);
413 
414 	/* possible callbacks here:
415 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
416 	 * all ignore the last argument.
417 	 */
418 	list_for_each_entry_safe(e, t, &work_list, w.list) {
419 		/* list_del not necessary, next/prev members not touched */
420 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
421 		drbd_free_ee(mdev, e);
422 	}
423 	wake_up(&mdev->ee_wait);
424 
425 	return ok;
426 }
427 
428 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
429 {
430 	DEFINE_WAIT(wait);
431 
432 	/* avoids spin_lock/unlock
433 	 * and calling prepare_to_wait in the fast path */
434 	while (!list_empty(head)) {
435 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
436 		spin_unlock_irq(&mdev->req_lock);
437 		drbd_kick_lo(mdev);
438 		schedule();
439 		finish_wait(&mdev->ee_wait, &wait);
440 		spin_lock_irq(&mdev->req_lock);
441 	}
442 }
443 
444 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
445 {
446 	spin_lock_irq(&mdev->req_lock);
447 	_drbd_wait_ee_list_empty(mdev, head);
448 	spin_unlock_irq(&mdev->req_lock);
449 }
450 
451 /* see also kernel_accept; which is only present since 2.6.18.
452  * also we want to log which part of it failed, exactly */
453 static int drbd_accept(struct drbd_conf *mdev, const char **what,
454 		struct socket *sock, struct socket **newsock)
455 {
456 	struct sock *sk = sock->sk;
457 	int err = 0;
458 
459 	*what = "listen";
460 	err = sock->ops->listen(sock, 5);
461 	if (err < 0)
462 		goto out;
463 
464 	*what = "sock_create_lite";
465 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
466 			       newsock);
467 	if (err < 0)
468 		goto out;
469 
470 	*what = "accept";
471 	err = sock->ops->accept(sock, *newsock, 0);
472 	if (err < 0) {
473 		sock_release(*newsock);
474 		*newsock = NULL;
475 		goto out;
476 	}
477 	(*newsock)->ops  = sock->ops;
478 
479 out:
480 	return err;
481 }
482 
483 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
484 		    void *buf, size_t size, int flags)
485 {
486 	mm_segment_t oldfs;
487 	struct kvec iov = {
488 		.iov_base = buf,
489 		.iov_len = size,
490 	};
491 	struct msghdr msg = {
492 		.msg_iovlen = 1,
493 		.msg_iov = (struct iovec *)&iov,
494 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495 	};
496 	int rv;
497 
498 	oldfs = get_fs();
499 	set_fs(KERNEL_DS);
500 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501 	set_fs(oldfs);
502 
503 	return rv;
504 }
505 
506 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
507 {
508 	mm_segment_t oldfs;
509 	struct kvec iov = {
510 		.iov_base = buf,
511 		.iov_len = size,
512 	};
513 	struct msghdr msg = {
514 		.msg_iovlen = 1,
515 		.msg_iov = (struct iovec *)&iov,
516 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517 	};
518 	int rv;
519 
520 	oldfs = get_fs();
521 	set_fs(KERNEL_DS);
522 
523 	for (;;) {
524 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
525 		if (rv == size)
526 			break;
527 
528 		/* Note:
529 		 * ECONNRESET	other side closed the connection
530 		 * ERESTARTSYS	(on  sock) we got a signal
531 		 */
532 
533 		if (rv < 0) {
534 			if (rv == -ECONNRESET)
535 				dev_info(DEV, "sock was reset by peer\n");
536 			else if (rv != -ERESTARTSYS)
537 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
538 			break;
539 		} else if (rv == 0) {
540 			dev_info(DEV, "sock was shut down by peer\n");
541 			break;
542 		} else	{
543 			/* signal came in, or peer/link went down,
544 			 * after we read a partial message
545 			 */
546 			/* D_ASSERT(signal_pending(current)); */
547 			break;
548 		}
549 	};
550 
551 	set_fs(oldfs);
552 
553 	if (rv != size)
554 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
555 
556 	return rv;
557 }
558 
559 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
560 {
561 	const char *what;
562 	struct socket *sock;
563 	struct sockaddr_in6 src_in6;
564 	int err;
565 	int disconnect_on_error = 1;
566 
567 	if (!get_net_conf(mdev))
568 		return NULL;
569 
570 	what = "sock_create_kern";
571 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
572 		SOCK_STREAM, IPPROTO_TCP, &sock);
573 	if (err < 0) {
574 		sock = NULL;
575 		goto out;
576 	}
577 
578 	sock->sk->sk_rcvtimeo =
579 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
580 
581        /* explicitly bind to the configured IP as source IP
582 	*  for the outgoing connections.
583 	*  This is needed for multihomed hosts and to be
584 	*  able to use lo: interfaces for drbd.
585 	* Make sure to use 0 as port number, so linux selects
586 	*  a free one dynamically.
587 	*/
588 	memcpy(&src_in6, mdev->net_conf->my_addr,
589 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
590 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
591 		src_in6.sin6_port = 0;
592 	else
593 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
594 
595 	what = "bind before connect";
596 	err = sock->ops->bind(sock,
597 			      (struct sockaddr *) &src_in6,
598 			      mdev->net_conf->my_addr_len);
599 	if (err < 0)
600 		goto out;
601 
602 	/* connect may fail, peer not yet available.
603 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
604 	disconnect_on_error = 0;
605 	what = "connect";
606 	err = sock->ops->connect(sock,
607 				 (struct sockaddr *)mdev->net_conf->peer_addr,
608 				 mdev->net_conf->peer_addr_len, 0);
609 
610 out:
611 	if (err < 0) {
612 		if (sock) {
613 			sock_release(sock);
614 			sock = NULL;
615 		}
616 		switch (-err) {
617 			/* timeout, busy, signal pending */
618 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
619 		case EINTR: case ERESTARTSYS:
620 			/* peer not (yet) available, network problem */
621 		case ECONNREFUSED: case ENETUNREACH:
622 		case EHOSTDOWN:    case EHOSTUNREACH:
623 			disconnect_on_error = 0;
624 			break;
625 		default:
626 			dev_err(DEV, "%s failed, err = %d\n", what, err);
627 		}
628 		if (disconnect_on_error)
629 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
630 	}
631 	put_net_conf(mdev);
632 	return sock;
633 }
634 
635 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
636 {
637 	int timeo, err;
638 	struct socket *s_estab = NULL, *s_listen;
639 	const char *what;
640 
641 	if (!get_net_conf(mdev))
642 		return NULL;
643 
644 	what = "sock_create_kern";
645 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
646 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
647 	if (err) {
648 		s_listen = NULL;
649 		goto out;
650 	}
651 
652 	timeo = mdev->net_conf->try_connect_int * HZ;
653 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
654 
655 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
656 	s_listen->sk->sk_rcvtimeo = timeo;
657 	s_listen->sk->sk_sndtimeo = timeo;
658 
659 	what = "bind before listen";
660 	err = s_listen->ops->bind(s_listen,
661 			      (struct sockaddr *) mdev->net_conf->my_addr,
662 			      mdev->net_conf->my_addr_len);
663 	if (err < 0)
664 		goto out;
665 
666 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
667 
668 out:
669 	if (s_listen)
670 		sock_release(s_listen);
671 	if (err < 0) {
672 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
673 			dev_err(DEV, "%s failed, err = %d\n", what, err);
674 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
675 		}
676 	}
677 	put_net_conf(mdev);
678 
679 	return s_estab;
680 }
681 
682 static int drbd_send_fp(struct drbd_conf *mdev,
683 	struct socket *sock, enum drbd_packets cmd)
684 {
685 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
686 
687 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
688 }
689 
690 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
691 {
692 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
693 	int rr;
694 
695 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
696 
697 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
698 		return be16_to_cpu(h->command);
699 
700 	return 0xffff;
701 }
702 
703 /**
704  * drbd_socket_okay() - Free the socket if its connection is not okay
705  * @mdev:	DRBD device.
706  * @sock:	pointer to the pointer to the socket.
707  */
708 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
709 {
710 	int rr;
711 	char tb[4];
712 
713 	if (!*sock)
714 		return FALSE;
715 
716 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
717 
718 	if (rr > 0 || rr == -EAGAIN) {
719 		return TRUE;
720 	} else {
721 		sock_release(*sock);
722 		*sock = NULL;
723 		return FALSE;
724 	}
725 }
726 
727 /*
728  * return values:
729  *   1 yes, we have a valid connection
730  *   0 oops, did not work out, please try again
731  *  -1 peer talks different language,
732  *     no point in trying again, please go standalone.
733  *  -2 We do not have a network config...
734  */
735 static int drbd_connect(struct drbd_conf *mdev)
736 {
737 	struct socket *s, *sock, *msock;
738 	int try, h, ok;
739 
740 	D_ASSERT(!mdev->data.socket);
741 
742 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
743 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
744 
745 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
746 		return -2;
747 
748 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
749 
750 	sock  = NULL;
751 	msock = NULL;
752 
753 	do {
754 		for (try = 0;;) {
755 			/* 3 tries, this should take less than a second! */
756 			s = drbd_try_connect(mdev);
757 			if (s || ++try >= 3)
758 				break;
759 			/* give the other side time to call bind() & listen() */
760 			__set_current_state(TASK_INTERRUPTIBLE);
761 			schedule_timeout(HZ / 10);
762 		}
763 
764 		if (s) {
765 			if (!sock) {
766 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
767 				sock = s;
768 				s = NULL;
769 			} else if (!msock) {
770 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
771 				msock = s;
772 				s = NULL;
773 			} else {
774 				dev_err(DEV, "Logic error in drbd_connect()\n");
775 				goto out_release_sockets;
776 			}
777 		}
778 
779 		if (sock && msock) {
780 			__set_current_state(TASK_INTERRUPTIBLE);
781 			schedule_timeout(HZ / 10);
782 			ok = drbd_socket_okay(mdev, &sock);
783 			ok = drbd_socket_okay(mdev, &msock) && ok;
784 			if (ok)
785 				break;
786 		}
787 
788 retry:
789 		s = drbd_wait_for_connect(mdev);
790 		if (s) {
791 			try = drbd_recv_fp(mdev, s);
792 			drbd_socket_okay(mdev, &sock);
793 			drbd_socket_okay(mdev, &msock);
794 			switch (try) {
795 			case P_HAND_SHAKE_S:
796 				if (sock) {
797 					dev_warn(DEV, "initial packet S crossed\n");
798 					sock_release(sock);
799 				}
800 				sock = s;
801 				break;
802 			case P_HAND_SHAKE_M:
803 				if (msock) {
804 					dev_warn(DEV, "initial packet M crossed\n");
805 					sock_release(msock);
806 				}
807 				msock = s;
808 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
809 				break;
810 			default:
811 				dev_warn(DEV, "Error receiving initial packet\n");
812 				sock_release(s);
813 				if (random32() & 1)
814 					goto retry;
815 			}
816 		}
817 
818 		if (mdev->state.conn <= C_DISCONNECTING)
819 			goto out_release_sockets;
820 		if (signal_pending(current)) {
821 			flush_signals(current);
822 			smp_rmb();
823 			if (get_t_state(&mdev->receiver) == Exiting)
824 				goto out_release_sockets;
825 		}
826 
827 		if (sock && msock) {
828 			ok = drbd_socket_okay(mdev, &sock);
829 			ok = drbd_socket_okay(mdev, &msock) && ok;
830 			if (ok)
831 				break;
832 		}
833 	} while (1);
834 
835 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
837 
838 	sock->sk->sk_allocation = GFP_NOIO;
839 	msock->sk->sk_allocation = GFP_NOIO;
840 
841 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
842 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
843 
844 	if (mdev->net_conf->sndbuf_size) {
845 		sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
846 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
847 	}
848 
849 	if (mdev->net_conf->rcvbuf_size) {
850 		sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
851 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
852 	}
853 
854 	/* NOT YET ...
855 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
856 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
857 	 * first set it to the P_HAND_SHAKE timeout,
858 	 * which we set to 4x the configured ping_timeout. */
859 	sock->sk->sk_sndtimeo =
860 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
861 
862 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
863 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
864 
865 	/* we don't want delays.
866 	 * we use TCP_CORK where apropriate, though */
867 	drbd_tcp_nodelay(sock);
868 	drbd_tcp_nodelay(msock);
869 
870 	mdev->data.socket = sock;
871 	mdev->meta.socket = msock;
872 	mdev->last_received = jiffies;
873 
874 	D_ASSERT(mdev->asender.task == NULL);
875 
876 	h = drbd_do_handshake(mdev);
877 	if (h <= 0)
878 		return h;
879 
880 	if (mdev->cram_hmac_tfm) {
881 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
882 		if (!drbd_do_auth(mdev)) {
883 			dev_err(DEV, "Authentication of peer failed\n");
884 			return -1;
885 		}
886 	}
887 
888 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
889 		return 0;
890 
891 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
892 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
893 
894 	atomic_set(&mdev->packet_seq, 0);
895 	mdev->peer_seq = 0;
896 
897 	drbd_thread_start(&mdev->asender);
898 
899 	drbd_send_protocol(mdev);
900 	drbd_send_sync_param(mdev, &mdev->sync_conf);
901 	drbd_send_sizes(mdev, 0);
902 	drbd_send_uuids(mdev);
903 	drbd_send_state(mdev);
904 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
905 	clear_bit(RESIZE_PENDING, &mdev->flags);
906 
907 	return 1;
908 
909 out_release_sockets:
910 	if (sock)
911 		sock_release(sock);
912 	if (msock)
913 		sock_release(msock);
914 	return -1;
915 }
916 
917 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
918 {
919 	int r;
920 
921 	r = drbd_recv(mdev, h, sizeof(*h));
922 
923 	if (unlikely(r != sizeof(*h))) {
924 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
925 		return FALSE;
926 	};
927 	h->command = be16_to_cpu(h->command);
928 	h->length  = be16_to_cpu(h->length);
929 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
930 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
931 		    (long)be32_to_cpu(h->magic),
932 		    h->command, h->length);
933 		return FALSE;
934 	}
935 	mdev->last_received = jiffies;
936 
937 	return TRUE;
938 }
939 
940 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
941 {
942 	int rv;
943 
944 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
945 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
946 		if (rv) {
947 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
948 			/* would rather check on EOPNOTSUPP, but that is not reliable.
949 			 * don't try again for ANY return value != 0
950 			 * if (rv == -EOPNOTSUPP) */
951 			drbd_bump_write_ordering(mdev, WO_drain_io);
952 		}
953 		put_ldev(mdev);
954 	}
955 
956 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
957 }
958 
959 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
960 {
961 	struct flush_work *fw = (struct flush_work *)w;
962 	struct drbd_epoch *epoch = fw->epoch;
963 
964 	kfree(w);
965 
966 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
967 		drbd_flush_after_epoch(mdev, epoch);
968 
969 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
970 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
971 
972 	return 1;
973 }
974 
975 /**
976  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
977  * @mdev:	DRBD device.
978  * @epoch:	Epoch object.
979  * @ev:		Epoch event.
980  */
981 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
982 					       struct drbd_epoch *epoch,
983 					       enum epoch_event ev)
984 {
985 	int finish, epoch_size;
986 	struct drbd_epoch *next_epoch;
987 	int schedule_flush = 0;
988 	enum finish_epoch rv = FE_STILL_LIVE;
989 
990 	spin_lock(&mdev->epoch_lock);
991 	do {
992 		next_epoch = NULL;
993 		finish = 0;
994 
995 		epoch_size = atomic_read(&epoch->epoch_size);
996 
997 		switch (ev & ~EV_CLEANUP) {
998 		case EV_PUT:
999 			atomic_dec(&epoch->active);
1000 			break;
1001 		case EV_GOT_BARRIER_NR:
1002 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1003 
1004 			/* Special case: If we just switched from WO_bio_barrier to
1005 			   WO_bdev_flush we should not finish the current epoch */
1006 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1007 			    mdev->write_ordering != WO_bio_barrier &&
1008 			    epoch == mdev->current_epoch)
1009 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1010 			break;
1011 		case EV_BARRIER_DONE:
1012 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1013 			break;
1014 		case EV_BECAME_LAST:
1015 			/* nothing to do*/
1016 			break;
1017 		}
1018 
1019 		if (epoch_size != 0 &&
1020 		    atomic_read(&epoch->active) == 0 &&
1021 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1022 		    epoch->list.prev == &mdev->current_epoch->list &&
1023 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1024 			/* Nearly all conditions are met to finish that epoch... */
1025 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1026 			    mdev->write_ordering == WO_none ||
1027 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1028 			    ev & EV_CLEANUP) {
1029 				finish = 1;
1030 				set_bit(DE_IS_FINISHING, &epoch->flags);
1031 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1032 				 mdev->write_ordering == WO_bio_barrier) {
1033 				atomic_inc(&epoch->active);
1034 				schedule_flush = 1;
1035 			}
1036 		}
1037 		if (finish) {
1038 			if (!(ev & EV_CLEANUP)) {
1039 				spin_unlock(&mdev->epoch_lock);
1040 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1041 				spin_lock(&mdev->epoch_lock);
1042 			}
1043 			dec_unacked(mdev);
1044 
1045 			if (mdev->current_epoch != epoch) {
1046 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1047 				list_del(&epoch->list);
1048 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1049 				mdev->epochs--;
1050 				kfree(epoch);
1051 
1052 				if (rv == FE_STILL_LIVE)
1053 					rv = FE_DESTROYED;
1054 			} else {
1055 				epoch->flags = 0;
1056 				atomic_set(&epoch->epoch_size, 0);
1057 				/* atomic_set(&epoch->active, 0); is alrady zero */
1058 				if (rv == FE_STILL_LIVE)
1059 					rv = FE_RECYCLED;
1060 			}
1061 		}
1062 
1063 		if (!next_epoch)
1064 			break;
1065 
1066 		epoch = next_epoch;
1067 	} while (1);
1068 
1069 	spin_unlock(&mdev->epoch_lock);
1070 
1071 	if (schedule_flush) {
1072 		struct flush_work *fw;
1073 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1074 		if (fw) {
1075 			fw->w.cb = w_flush;
1076 			fw->epoch = epoch;
1077 			drbd_queue_work(&mdev->data.work, &fw->w);
1078 		} else {
1079 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1080 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1081 			/* That is not a recursion, only one level */
1082 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1083 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1084 		}
1085 	}
1086 
1087 	return rv;
1088 }
1089 
1090 /**
1091  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1092  * @mdev:	DRBD device.
1093  * @wo:		Write ordering method to try.
1094  */
1095 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1096 {
1097 	enum write_ordering_e pwo;
1098 	static char *write_ordering_str[] = {
1099 		[WO_none] = "none",
1100 		[WO_drain_io] = "drain",
1101 		[WO_bdev_flush] = "flush",
1102 		[WO_bio_barrier] = "barrier",
1103 	};
1104 
1105 	pwo = mdev->write_ordering;
1106 	wo = min(pwo, wo);
1107 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1108 		wo = WO_bdev_flush;
1109 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1110 		wo = WO_drain_io;
1111 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1112 		wo = WO_none;
1113 	mdev->write_ordering = wo;
1114 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1115 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1116 }
1117 
1118 /**
1119  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1120  * @mdev:	DRBD device.
1121  * @w:		work object.
1122  * @cancel:	The connection will be closed anyways (unused in this callback)
1123  */
1124 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1125 {
1126 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1127 	struct bio *bio = e->private_bio;
1128 
1129 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1130 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1131 	   so that we can finish that epoch in drbd_may_finish_epoch().
1132 	   That is necessary if we already have a long chain of Epochs, before
1133 	   we realize that BIO_RW_BARRIER is actually not supported */
1134 
1135 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1136 	   that will never trigger. If it is reported late, we will just
1137 	   print that warning and continue correctly for all future requests
1138 	   with WO_bdev_flush */
1139 	if (previous_epoch(mdev, e->epoch))
1140 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1141 
1142 	/* prepare bio for re-submit,
1143 	 * re-init volatile members */
1144 	/* we still have a local reference,
1145 	 * get_ldev was done in receive_Data. */
1146 	bio->bi_bdev = mdev->ldev->backing_bdev;
1147 	bio->bi_sector = e->sector;
1148 	bio->bi_size = e->size;
1149 	bio->bi_idx = 0;
1150 
1151 	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1152 	bio->bi_flags |= 1 << BIO_UPTODATE;
1153 
1154 	/* don't know whether this is necessary: */
1155 	bio->bi_phys_segments = 0;
1156 	bio->bi_next = NULL;
1157 
1158 	/* these should be unchanged: */
1159 	/* bio->bi_end_io = drbd_endio_write_sec; */
1160 	/* bio->bi_vcnt = whatever; */
1161 
1162 	e->w.cb = e_end_block;
1163 
1164 	/* This is no longer a barrier request. */
1165 	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1166 
1167 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1168 
1169 	return 1;
1170 }
1171 
1172 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1173 {
1174 	int rv, issue_flush;
1175 	struct p_barrier *p = (struct p_barrier *)h;
1176 	struct drbd_epoch *epoch;
1177 
1178 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1179 
1180 	rv = drbd_recv(mdev, h->payload, h->length);
1181 	ERR_IF(rv != h->length) return FALSE;
1182 
1183 	inc_unacked(mdev);
1184 
1185 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1186 		drbd_kick_lo(mdev);
1187 
1188 	mdev->current_epoch->barrier_nr = p->barrier;
1189 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1190 
1191 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1192 	 * the activity log, which means it would not be resynced in case the
1193 	 * R_PRIMARY crashes now.
1194 	 * Therefore we must send the barrier_ack after the barrier request was
1195 	 * completed. */
1196 	switch (mdev->write_ordering) {
1197 	case WO_bio_barrier:
1198 	case WO_none:
1199 		if (rv == FE_RECYCLED)
1200 			return TRUE;
1201 		break;
1202 
1203 	case WO_bdev_flush:
1204 	case WO_drain_io:
1205 		D_ASSERT(rv == FE_STILL_LIVE);
1206 		set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1207 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1208 		rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1209 		if (rv == FE_RECYCLED)
1210 			return TRUE;
1211 
1212 		/* The asender will send all the ACKs and barrier ACKs out, since
1213 		   all EEs moved from the active_ee to the done_ee. We need to
1214 		   provide a new epoch object for the EEs that come in soon */
1215 		break;
1216 	}
1217 
1218 	/* receiver context, in the writeout path of the other node.
1219 	 * avoid potential distributed deadlock */
1220 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1221 	if (!epoch) {
1222 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1223 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1224 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1225 		if (issue_flush) {
1226 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1227 			if (rv == FE_RECYCLED)
1228 				return TRUE;
1229 		}
1230 
1231 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1232 
1233 		return TRUE;
1234 	}
1235 
1236 	epoch->flags = 0;
1237 	atomic_set(&epoch->epoch_size, 0);
1238 	atomic_set(&epoch->active, 0);
1239 
1240 	spin_lock(&mdev->epoch_lock);
1241 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1242 		list_add(&epoch->list, &mdev->current_epoch->list);
1243 		mdev->current_epoch = epoch;
1244 		mdev->epochs++;
1245 	} else {
1246 		/* The current_epoch got recycled while we allocated this one... */
1247 		kfree(epoch);
1248 	}
1249 	spin_unlock(&mdev->epoch_lock);
1250 
1251 	return TRUE;
1252 }
1253 
1254 /* used from receive_RSDataReply (recv_resync_read)
1255  * and from receive_Data */
1256 static struct drbd_epoch_entry *
1257 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1258 {
1259 	struct drbd_epoch_entry *e;
1260 	struct bio_vec *bvec;
1261 	struct page *page;
1262 	struct bio *bio;
1263 	int dgs, ds, i, rr;
1264 	void *dig_in = mdev->int_dig_in;
1265 	void *dig_vv = mdev->int_dig_vv;
1266 
1267 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1268 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1269 
1270 	if (dgs) {
1271 		rr = drbd_recv(mdev, dig_in, dgs);
1272 		if (rr != dgs) {
1273 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1274 			     rr, dgs);
1275 			return NULL;
1276 		}
1277 	}
1278 
1279 	data_size -= dgs;
1280 
1281 	ERR_IF(data_size &  0x1ff) return NULL;
1282 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1283 
1284 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1285 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1286 	 * which in turn might block on the other node at this very place.  */
1287 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1288 	if (!e)
1289 		return NULL;
1290 	bio = e->private_bio;
1291 	ds = data_size;
1292 	bio_for_each_segment(bvec, bio, i) {
1293 		page = bvec->bv_page;
1294 		rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1295 		kunmap(page);
1296 		if (rr != min_t(int, ds, PAGE_SIZE)) {
1297 			drbd_free_ee(mdev, e);
1298 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1299 			     rr, min_t(int, ds, PAGE_SIZE));
1300 			return NULL;
1301 		}
1302 		ds -= rr;
1303 	}
1304 
1305 	if (dgs) {
1306 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1307 		if (memcmp(dig_in, dig_vv, dgs)) {
1308 			dev_err(DEV, "Digest integrity check FAILED.\n");
1309 			drbd_bcast_ee(mdev, "digest failed",
1310 					dgs, dig_in, dig_vv, e);
1311 			drbd_free_ee(mdev, e);
1312 			return NULL;
1313 		}
1314 	}
1315 	mdev->recv_cnt += data_size>>9;
1316 	return e;
1317 }
1318 
1319 /* drbd_drain_block() just takes a data block
1320  * out of the socket input buffer, and discards it.
1321  */
1322 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1323 {
1324 	struct page *page;
1325 	int rr, rv = 1;
1326 	void *data;
1327 
1328 	page = drbd_pp_alloc(mdev, 1);
1329 
1330 	data = kmap(page);
1331 	while (data_size) {
1332 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1333 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1334 			rv = 0;
1335 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1336 			     rr, min_t(int, data_size, PAGE_SIZE));
1337 			break;
1338 		}
1339 		data_size -= rr;
1340 	}
1341 	kunmap(page);
1342 	drbd_pp_free(mdev, page);
1343 	return rv;
1344 }
1345 
1346 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1347 			   sector_t sector, int data_size)
1348 {
1349 	struct bio_vec *bvec;
1350 	struct bio *bio;
1351 	int dgs, rr, i, expect;
1352 	void *dig_in = mdev->int_dig_in;
1353 	void *dig_vv = mdev->int_dig_vv;
1354 
1355 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1356 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1357 
1358 	if (dgs) {
1359 		rr = drbd_recv(mdev, dig_in, dgs);
1360 		if (rr != dgs) {
1361 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1362 			     rr, dgs);
1363 			return 0;
1364 		}
1365 	}
1366 
1367 	data_size -= dgs;
1368 
1369 	/* optimistically update recv_cnt.  if receiving fails below,
1370 	 * we disconnect anyways, and counters will be reset. */
1371 	mdev->recv_cnt += data_size>>9;
1372 
1373 	bio = req->master_bio;
1374 	D_ASSERT(sector == bio->bi_sector);
1375 
1376 	bio_for_each_segment(bvec, bio, i) {
1377 		expect = min_t(int, data_size, bvec->bv_len);
1378 		rr = drbd_recv(mdev,
1379 			     kmap(bvec->bv_page)+bvec->bv_offset,
1380 			     expect);
1381 		kunmap(bvec->bv_page);
1382 		if (rr != expect) {
1383 			dev_warn(DEV, "short read receiving data reply: "
1384 			     "read %d expected %d\n",
1385 			     rr, expect);
1386 			return 0;
1387 		}
1388 		data_size -= rr;
1389 	}
1390 
1391 	if (dgs) {
1392 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1393 		if (memcmp(dig_in, dig_vv, dgs)) {
1394 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1395 			return 0;
1396 		}
1397 	}
1398 
1399 	D_ASSERT(data_size == 0);
1400 	return 1;
1401 }
1402 
1403 /* e_end_resync_block() is called via
1404  * drbd_process_done_ee() by asender only */
1405 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1406 {
1407 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1408 	sector_t sector = e->sector;
1409 	int ok;
1410 
1411 	D_ASSERT(hlist_unhashed(&e->colision));
1412 
1413 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1414 		drbd_set_in_sync(mdev, sector, e->size);
1415 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1416 	} else {
1417 		/* Record failure to sync */
1418 		drbd_rs_failed_io(mdev, sector, e->size);
1419 
1420 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1421 	}
1422 	dec_unacked(mdev);
1423 
1424 	return ok;
1425 }
1426 
1427 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1428 {
1429 	struct drbd_epoch_entry *e;
1430 
1431 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1432 	if (!e) {
1433 		put_ldev(mdev);
1434 		return FALSE;
1435 	}
1436 
1437 	dec_rs_pending(mdev);
1438 
1439 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1440 	e->private_bio->bi_rw = WRITE;
1441 	e->w.cb = e_end_resync_block;
1442 
1443 	inc_unacked(mdev);
1444 	/* corresponding dec_unacked() in e_end_resync_block()
1445 	 * respective _drbd_clear_done_ee */
1446 
1447 	spin_lock_irq(&mdev->req_lock);
1448 	list_add(&e->w.list, &mdev->sync_ee);
1449 	spin_unlock_irq(&mdev->req_lock);
1450 
1451 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1452 	/* accounting done in endio */
1453 
1454 	maybe_kick_lo(mdev);
1455 	return TRUE;
1456 }
1457 
1458 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1459 {
1460 	struct drbd_request *req;
1461 	sector_t sector;
1462 	unsigned int header_size, data_size;
1463 	int ok;
1464 	struct p_data *p = (struct p_data *)h;
1465 
1466 	header_size = sizeof(*p) - sizeof(*h);
1467 	data_size   = h->length  - header_size;
1468 
1469 	ERR_IF(data_size == 0) return FALSE;
1470 
1471 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1472 		return FALSE;
1473 
1474 	sector = be64_to_cpu(p->sector);
1475 
1476 	spin_lock_irq(&mdev->req_lock);
1477 	req = _ar_id_to_req(mdev, p->block_id, sector);
1478 	spin_unlock_irq(&mdev->req_lock);
1479 	if (unlikely(!req)) {
1480 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1481 		return FALSE;
1482 	}
1483 
1484 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1485 	 * special casing it there for the various failure cases.
1486 	 * still no race with drbd_fail_pending_reads */
1487 	ok = recv_dless_read(mdev, req, sector, data_size);
1488 
1489 	if (ok)
1490 		req_mod(req, data_received);
1491 	/* else: nothing. handled from drbd_disconnect...
1492 	 * I don't think we may complete this just yet
1493 	 * in case we are "on-disconnect: freeze" */
1494 
1495 	return ok;
1496 }
1497 
1498 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1499 {
1500 	sector_t sector;
1501 	unsigned int header_size, data_size;
1502 	int ok;
1503 	struct p_data *p = (struct p_data *)h;
1504 
1505 	header_size = sizeof(*p) - sizeof(*h);
1506 	data_size   = h->length  - header_size;
1507 
1508 	ERR_IF(data_size == 0) return FALSE;
1509 
1510 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1511 		return FALSE;
1512 
1513 	sector = be64_to_cpu(p->sector);
1514 	D_ASSERT(p->block_id == ID_SYNCER);
1515 
1516 	if (get_ldev(mdev)) {
1517 		/* data is submitted to disk within recv_resync_read.
1518 		 * corresponding put_ldev done below on error,
1519 		 * or in drbd_endio_write_sec. */
1520 		ok = recv_resync_read(mdev, sector, data_size);
1521 	} else {
1522 		if (__ratelimit(&drbd_ratelimit_state))
1523 			dev_err(DEV, "Can not write resync data to local disk.\n");
1524 
1525 		ok = drbd_drain_block(mdev, data_size);
1526 
1527 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1528 	}
1529 
1530 	return ok;
1531 }
1532 
1533 /* e_end_block() is called via drbd_process_done_ee().
1534  * this means this function only runs in the asender thread
1535  */
1536 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1537 {
1538 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1539 	sector_t sector = e->sector;
1540 	struct drbd_epoch *epoch;
1541 	int ok = 1, pcmd;
1542 
1543 	if (e->flags & EE_IS_BARRIER) {
1544 		epoch = previous_epoch(mdev, e->epoch);
1545 		if (epoch)
1546 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1547 	}
1548 
1549 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1550 		if (likely(drbd_bio_uptodate(e->private_bio))) {
1551 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1552 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1553 				e->flags & EE_MAY_SET_IN_SYNC) ?
1554 				P_RS_WRITE_ACK : P_WRITE_ACK;
1555 			ok &= drbd_send_ack(mdev, pcmd, e);
1556 			if (pcmd == P_RS_WRITE_ACK)
1557 				drbd_set_in_sync(mdev, sector, e->size);
1558 		} else {
1559 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1560 			/* we expect it to be marked out of sync anyways...
1561 			 * maybe assert this?  */
1562 		}
1563 		dec_unacked(mdev);
1564 	}
1565 	/* we delete from the conflict detection hash _after_ we sent out the
1566 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1567 	if (mdev->net_conf->two_primaries) {
1568 		spin_lock_irq(&mdev->req_lock);
1569 		D_ASSERT(!hlist_unhashed(&e->colision));
1570 		hlist_del_init(&e->colision);
1571 		spin_unlock_irq(&mdev->req_lock);
1572 	} else {
1573 		D_ASSERT(hlist_unhashed(&e->colision));
1574 	}
1575 
1576 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1577 
1578 	return ok;
1579 }
1580 
1581 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1582 {
1583 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1584 	int ok = 1;
1585 
1586 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1587 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1588 
1589 	spin_lock_irq(&mdev->req_lock);
1590 	D_ASSERT(!hlist_unhashed(&e->colision));
1591 	hlist_del_init(&e->colision);
1592 	spin_unlock_irq(&mdev->req_lock);
1593 
1594 	dec_unacked(mdev);
1595 
1596 	return ok;
1597 }
1598 
1599 /* Called from receive_Data.
1600  * Synchronize packets on sock with packets on msock.
1601  *
1602  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1603  * packet traveling on msock, they are still processed in the order they have
1604  * been sent.
1605  *
1606  * Note: we don't care for Ack packets overtaking P_DATA packets.
1607  *
1608  * In case packet_seq is larger than mdev->peer_seq number, there are
1609  * outstanding packets on the msock. We wait for them to arrive.
1610  * In case we are the logically next packet, we update mdev->peer_seq
1611  * ourselves. Correctly handles 32bit wrap around.
1612  *
1613  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1614  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1615  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1616  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1617  *
1618  * returns 0 if we may process the packet,
1619  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1620 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1621 {
1622 	DEFINE_WAIT(wait);
1623 	unsigned int p_seq;
1624 	long timeout;
1625 	int ret = 0;
1626 	spin_lock(&mdev->peer_seq_lock);
1627 	for (;;) {
1628 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1629 		if (seq_le(packet_seq, mdev->peer_seq+1))
1630 			break;
1631 		if (signal_pending(current)) {
1632 			ret = -ERESTARTSYS;
1633 			break;
1634 		}
1635 		p_seq = mdev->peer_seq;
1636 		spin_unlock(&mdev->peer_seq_lock);
1637 		timeout = schedule_timeout(30*HZ);
1638 		spin_lock(&mdev->peer_seq_lock);
1639 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1640 			ret = -ETIMEDOUT;
1641 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1642 			break;
1643 		}
1644 	}
1645 	finish_wait(&mdev->seq_wait, &wait);
1646 	if (mdev->peer_seq+1 == packet_seq)
1647 		mdev->peer_seq++;
1648 	spin_unlock(&mdev->peer_seq_lock);
1649 	return ret;
1650 }
1651 
1652 /* mirrored write */
1653 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1654 {
1655 	sector_t sector;
1656 	struct drbd_epoch_entry *e;
1657 	struct p_data *p = (struct p_data *)h;
1658 	int header_size, data_size;
1659 	int rw = WRITE;
1660 	u32 dp_flags;
1661 
1662 	header_size = sizeof(*p) - sizeof(*h);
1663 	data_size   = h->length  - header_size;
1664 
1665 	ERR_IF(data_size == 0) return FALSE;
1666 
1667 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1668 		return FALSE;
1669 
1670 	if (!get_ldev(mdev)) {
1671 		if (__ratelimit(&drbd_ratelimit_state))
1672 			dev_err(DEV, "Can not write mirrored data block "
1673 			    "to local disk.\n");
1674 		spin_lock(&mdev->peer_seq_lock);
1675 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1676 			mdev->peer_seq++;
1677 		spin_unlock(&mdev->peer_seq_lock);
1678 
1679 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1680 		atomic_inc(&mdev->current_epoch->epoch_size);
1681 		return drbd_drain_block(mdev, data_size);
1682 	}
1683 
1684 	/* get_ldev(mdev) successful.
1685 	 * Corresponding put_ldev done either below (on various errors),
1686 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1687 	 * the end of this function. */
1688 
1689 	sector = be64_to_cpu(p->sector);
1690 	e = read_in_block(mdev, p->block_id, sector, data_size);
1691 	if (!e) {
1692 		put_ldev(mdev);
1693 		return FALSE;
1694 	}
1695 
1696 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1697 	e->w.cb = e_end_block;
1698 
1699 	spin_lock(&mdev->epoch_lock);
1700 	e->epoch = mdev->current_epoch;
1701 	atomic_inc(&e->epoch->epoch_size);
1702 	atomic_inc(&e->epoch->active);
1703 
1704 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1705 		struct drbd_epoch *epoch;
1706 		/* Issue a barrier if we start a new epoch, and the previous epoch
1707 		   was not a epoch containing a single request which already was
1708 		   a Barrier. */
1709 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1710 		if (epoch == e->epoch) {
1711 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1712 			rw |= (1<<BIO_RW_BARRIER);
1713 			e->flags |= EE_IS_BARRIER;
1714 		} else {
1715 			if (atomic_read(&epoch->epoch_size) > 1 ||
1716 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1717 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1718 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1719 				rw |= (1<<BIO_RW_BARRIER);
1720 				e->flags |= EE_IS_BARRIER;
1721 			}
1722 		}
1723 	}
1724 	spin_unlock(&mdev->epoch_lock);
1725 
1726 	dp_flags = be32_to_cpu(p->dp_flags);
1727 	if (dp_flags & DP_HARDBARRIER) {
1728 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1729 		/* rw |= (1<<BIO_RW_BARRIER); */
1730 	}
1731 	if (dp_flags & DP_RW_SYNC)
1732 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1733 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1734 		e->flags |= EE_MAY_SET_IN_SYNC;
1735 
1736 	/* I'm the receiver, I do hold a net_cnt reference. */
1737 	if (!mdev->net_conf->two_primaries) {
1738 		spin_lock_irq(&mdev->req_lock);
1739 	} else {
1740 		/* don't get the req_lock yet,
1741 		 * we may sleep in drbd_wait_peer_seq */
1742 		const int size = e->size;
1743 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1744 		DEFINE_WAIT(wait);
1745 		struct drbd_request *i;
1746 		struct hlist_node *n;
1747 		struct hlist_head *slot;
1748 		int first;
1749 
1750 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1751 		BUG_ON(mdev->ee_hash == NULL);
1752 		BUG_ON(mdev->tl_hash == NULL);
1753 
1754 		/* conflict detection and handling:
1755 		 * 1. wait on the sequence number,
1756 		 *    in case this data packet overtook ACK packets.
1757 		 * 2. check our hash tables for conflicting requests.
1758 		 *    we only need to walk the tl_hash, since an ee can not
1759 		 *    have a conflict with an other ee: on the submitting
1760 		 *    node, the corresponding req had already been conflicting,
1761 		 *    and a conflicting req is never sent.
1762 		 *
1763 		 * Note: for two_primaries, we are protocol C,
1764 		 * so there cannot be any request that is DONE
1765 		 * but still on the transfer log.
1766 		 *
1767 		 * unconditionally add to the ee_hash.
1768 		 *
1769 		 * if no conflicting request is found:
1770 		 *    submit.
1771 		 *
1772 		 * if any conflicting request is found
1773 		 * that has not yet been acked,
1774 		 * AND I have the "discard concurrent writes" flag:
1775 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1776 		 *
1777 		 * if any conflicting request is found:
1778 		 *	 block the receiver, waiting on misc_wait
1779 		 *	 until no more conflicting requests are there,
1780 		 *	 or we get interrupted (disconnect).
1781 		 *
1782 		 *	 we do not just write after local io completion of those
1783 		 *	 requests, but only after req is done completely, i.e.
1784 		 *	 we wait for the P_DISCARD_ACK to arrive!
1785 		 *
1786 		 *	 then proceed normally, i.e. submit.
1787 		 */
1788 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1789 			goto out_interrupted;
1790 
1791 		spin_lock_irq(&mdev->req_lock);
1792 
1793 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1794 
1795 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1796 		slot = tl_hash_slot(mdev, sector);
1797 		first = 1;
1798 		for (;;) {
1799 			int have_unacked = 0;
1800 			int have_conflict = 0;
1801 			prepare_to_wait(&mdev->misc_wait, &wait,
1802 				TASK_INTERRUPTIBLE);
1803 			hlist_for_each_entry(i, n, slot, colision) {
1804 				if (OVERLAPS) {
1805 					/* only ALERT on first iteration,
1806 					 * we may be woken up early... */
1807 					if (first)
1808 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1809 						      "	new: %llus +%u; pending: %llus +%u\n",
1810 						      current->comm, current->pid,
1811 						      (unsigned long long)sector, size,
1812 						      (unsigned long long)i->sector, i->size);
1813 					if (i->rq_state & RQ_NET_PENDING)
1814 						++have_unacked;
1815 					++have_conflict;
1816 				}
1817 			}
1818 #undef OVERLAPS
1819 			if (!have_conflict)
1820 				break;
1821 
1822 			/* Discard Ack only for the _first_ iteration */
1823 			if (first && discard && have_unacked) {
1824 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1825 				     (unsigned long long)sector);
1826 				inc_unacked(mdev);
1827 				e->w.cb = e_send_discard_ack;
1828 				list_add_tail(&e->w.list, &mdev->done_ee);
1829 
1830 				spin_unlock_irq(&mdev->req_lock);
1831 
1832 				/* we could probably send that P_DISCARD_ACK ourselves,
1833 				 * but I don't like the receiver using the msock */
1834 
1835 				put_ldev(mdev);
1836 				wake_asender(mdev);
1837 				finish_wait(&mdev->misc_wait, &wait);
1838 				return TRUE;
1839 			}
1840 
1841 			if (signal_pending(current)) {
1842 				hlist_del_init(&e->colision);
1843 
1844 				spin_unlock_irq(&mdev->req_lock);
1845 
1846 				finish_wait(&mdev->misc_wait, &wait);
1847 				goto out_interrupted;
1848 			}
1849 
1850 			spin_unlock_irq(&mdev->req_lock);
1851 			if (first) {
1852 				first = 0;
1853 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1854 				     "sec=%llus\n", (unsigned long long)sector);
1855 			} else if (discard) {
1856 				/* we had none on the first iteration.
1857 				 * there must be none now. */
1858 				D_ASSERT(have_unacked == 0);
1859 			}
1860 			schedule();
1861 			spin_lock_irq(&mdev->req_lock);
1862 		}
1863 		finish_wait(&mdev->misc_wait, &wait);
1864 	}
1865 
1866 	list_add(&e->w.list, &mdev->active_ee);
1867 	spin_unlock_irq(&mdev->req_lock);
1868 
1869 	switch (mdev->net_conf->wire_protocol) {
1870 	case DRBD_PROT_C:
1871 		inc_unacked(mdev);
1872 		/* corresponding dec_unacked() in e_end_block()
1873 		 * respective _drbd_clear_done_ee */
1874 		break;
1875 	case DRBD_PROT_B:
1876 		/* I really don't like it that the receiver thread
1877 		 * sends on the msock, but anyways */
1878 		drbd_send_ack(mdev, P_RECV_ACK, e);
1879 		break;
1880 	case DRBD_PROT_A:
1881 		/* nothing to do */
1882 		break;
1883 	}
1884 
1885 	if (mdev->state.pdsk == D_DISKLESS) {
1886 		/* In case we have the only disk of the cluster, */
1887 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1888 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1889 		drbd_al_begin_io(mdev, e->sector);
1890 	}
1891 
1892 	e->private_bio->bi_rw = rw;
1893 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1894 	/* accounting done in endio */
1895 
1896 	maybe_kick_lo(mdev);
1897 	return TRUE;
1898 
1899 out_interrupted:
1900 	/* yes, the epoch_size now is imbalanced.
1901 	 * but we drop the connection anyways, so we don't have a chance to
1902 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1903 	put_ldev(mdev);
1904 	drbd_free_ee(mdev, e);
1905 	return FALSE;
1906 }
1907 
1908 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1909 {
1910 	sector_t sector;
1911 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1912 	struct drbd_epoch_entry *e;
1913 	struct digest_info *di = NULL;
1914 	int size, digest_size;
1915 	unsigned int fault_type;
1916 	struct p_block_req *p =
1917 		(struct p_block_req *)h;
1918 	const int brps = sizeof(*p)-sizeof(*h);
1919 
1920 	if (drbd_recv(mdev, h->payload, brps) != brps)
1921 		return FALSE;
1922 
1923 	sector = be64_to_cpu(p->sector);
1924 	size   = be32_to_cpu(p->blksize);
1925 
1926 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1927 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1928 				(unsigned long long)sector, size);
1929 		return FALSE;
1930 	}
1931 	if (sector + (size>>9) > capacity) {
1932 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1933 				(unsigned long long)sector, size);
1934 		return FALSE;
1935 	}
1936 
1937 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1938 		if (__ratelimit(&drbd_ratelimit_state))
1939 			dev_err(DEV, "Can not satisfy peer's read request, "
1940 			    "no local data.\n");
1941 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1942 				 P_NEG_RS_DREPLY , p);
1943 		return TRUE;
1944 	}
1945 
1946 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1947 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1948 	 * which in turn might block on the other node at this very place.  */
1949 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1950 	if (!e) {
1951 		put_ldev(mdev);
1952 		return FALSE;
1953 	}
1954 
1955 	e->private_bio->bi_rw = READ;
1956 	e->private_bio->bi_end_io = drbd_endio_read_sec;
1957 
1958 	switch (h->command) {
1959 	case P_DATA_REQUEST:
1960 		e->w.cb = w_e_end_data_req;
1961 		fault_type = DRBD_FAULT_DT_RD;
1962 		break;
1963 	case P_RS_DATA_REQUEST:
1964 		e->w.cb = w_e_end_rsdata_req;
1965 		fault_type = DRBD_FAULT_RS_RD;
1966 		/* Eventually this should become asynchronously. Currently it
1967 		 * blocks the whole receiver just to delay the reading of a
1968 		 * resync data block.
1969 		 * the drbd_work_queue mechanism is made for this...
1970 		 */
1971 		if (!drbd_rs_begin_io(mdev, sector)) {
1972 			/* we have been interrupted,
1973 			 * probably connection lost! */
1974 			D_ASSERT(signal_pending(current));
1975 			goto out_free_e;
1976 		}
1977 		break;
1978 
1979 	case P_OV_REPLY:
1980 	case P_CSUM_RS_REQUEST:
1981 		fault_type = DRBD_FAULT_RS_RD;
1982 		digest_size = h->length - brps ;
1983 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1984 		if (!di)
1985 			goto out_free_e;
1986 
1987 		di->digest_size = digest_size;
1988 		di->digest = (((char *)di)+sizeof(struct digest_info));
1989 
1990 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
1991 			goto out_free_e;
1992 
1993 		e->block_id = (u64)(unsigned long)di;
1994 		if (h->command == P_CSUM_RS_REQUEST) {
1995 			D_ASSERT(mdev->agreed_pro_version >= 89);
1996 			e->w.cb = w_e_end_csum_rs_req;
1997 		} else if (h->command == P_OV_REPLY) {
1998 			e->w.cb = w_e_end_ov_reply;
1999 			dec_rs_pending(mdev);
2000 			break;
2001 		}
2002 
2003 		if (!drbd_rs_begin_io(mdev, sector)) {
2004 			/* we have been interrupted, probably connection lost! */
2005 			D_ASSERT(signal_pending(current));
2006 			goto out_free_e;
2007 		}
2008 		break;
2009 
2010 	case P_OV_REQUEST:
2011 		if (mdev->state.conn >= C_CONNECTED &&
2012 		    mdev->state.conn != C_VERIFY_T)
2013 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2014 				drbd_conn_str(mdev->state.conn));
2015 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2016 		    mdev->agreed_pro_version >= 90) {
2017 			mdev->ov_start_sector = sector;
2018 			mdev->ov_position = sector;
2019 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2020 			dev_info(DEV, "Online Verify start sector: %llu\n",
2021 					(unsigned long long)sector);
2022 		}
2023 		e->w.cb = w_e_end_ov_req;
2024 		fault_type = DRBD_FAULT_RS_RD;
2025 		/* Eventually this should become asynchronous. Currently it
2026 		 * blocks the whole receiver just to delay the reading of a
2027 		 * resync data block.
2028 		 * the drbd_work_queue mechanism is made for this...
2029 		 */
2030 		if (!drbd_rs_begin_io(mdev, sector)) {
2031 			/* we have been interrupted,
2032 			 * probably connection lost! */
2033 			D_ASSERT(signal_pending(current));
2034 			goto out_free_e;
2035 		}
2036 		break;
2037 
2038 
2039 	default:
2040 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2041 		    cmdname(h->command));
2042 		fault_type = DRBD_FAULT_MAX;
2043 	}
2044 
2045 	spin_lock_irq(&mdev->req_lock);
2046 	list_add(&e->w.list, &mdev->read_ee);
2047 	spin_unlock_irq(&mdev->req_lock);
2048 
2049 	inc_unacked(mdev);
2050 
2051 	drbd_generic_make_request(mdev, fault_type, e->private_bio);
2052 	maybe_kick_lo(mdev);
2053 
2054 	return TRUE;
2055 
2056 out_free_e:
2057 	kfree(di);
2058 	put_ldev(mdev);
2059 	drbd_free_ee(mdev, e);
2060 	return FALSE;
2061 }
2062 
2063 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2064 {
2065 	int self, peer, rv = -100;
2066 	unsigned long ch_self, ch_peer;
2067 
2068 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2069 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2070 
2071 	ch_peer = mdev->p_uuid[UI_SIZE];
2072 	ch_self = mdev->comm_bm_set;
2073 
2074 	switch (mdev->net_conf->after_sb_0p) {
2075 	case ASB_CONSENSUS:
2076 	case ASB_DISCARD_SECONDARY:
2077 	case ASB_CALL_HELPER:
2078 		dev_err(DEV, "Configuration error.\n");
2079 		break;
2080 	case ASB_DISCONNECT:
2081 		break;
2082 	case ASB_DISCARD_YOUNGER_PRI:
2083 		if (self == 0 && peer == 1) {
2084 			rv = -1;
2085 			break;
2086 		}
2087 		if (self == 1 && peer == 0) {
2088 			rv =  1;
2089 			break;
2090 		}
2091 		/* Else fall through to one of the other strategies... */
2092 	case ASB_DISCARD_OLDER_PRI:
2093 		if (self == 0 && peer == 1) {
2094 			rv = 1;
2095 			break;
2096 		}
2097 		if (self == 1 && peer == 0) {
2098 			rv = -1;
2099 			break;
2100 		}
2101 		/* Else fall through to one of the other strategies... */
2102 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2103 		     "Using discard-least-changes instead\n");
2104 	case ASB_DISCARD_ZERO_CHG:
2105 		if (ch_peer == 0 && ch_self == 0) {
2106 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2107 				? -1 : 1;
2108 			break;
2109 		} else {
2110 			if (ch_peer == 0) { rv =  1; break; }
2111 			if (ch_self == 0) { rv = -1; break; }
2112 		}
2113 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2114 			break;
2115 	case ASB_DISCARD_LEAST_CHG:
2116 		if	(ch_self < ch_peer)
2117 			rv = -1;
2118 		else if (ch_self > ch_peer)
2119 			rv =  1;
2120 		else /* ( ch_self == ch_peer ) */
2121 		     /* Well, then use something else. */
2122 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2123 				? -1 : 1;
2124 		break;
2125 	case ASB_DISCARD_LOCAL:
2126 		rv = -1;
2127 		break;
2128 	case ASB_DISCARD_REMOTE:
2129 		rv =  1;
2130 	}
2131 
2132 	return rv;
2133 }
2134 
2135 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2136 {
2137 	int self, peer, hg, rv = -100;
2138 
2139 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2140 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2141 
2142 	switch (mdev->net_conf->after_sb_1p) {
2143 	case ASB_DISCARD_YOUNGER_PRI:
2144 	case ASB_DISCARD_OLDER_PRI:
2145 	case ASB_DISCARD_LEAST_CHG:
2146 	case ASB_DISCARD_LOCAL:
2147 	case ASB_DISCARD_REMOTE:
2148 		dev_err(DEV, "Configuration error.\n");
2149 		break;
2150 	case ASB_DISCONNECT:
2151 		break;
2152 	case ASB_CONSENSUS:
2153 		hg = drbd_asb_recover_0p(mdev);
2154 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2155 			rv = hg;
2156 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2157 			rv = hg;
2158 		break;
2159 	case ASB_VIOLENTLY:
2160 		rv = drbd_asb_recover_0p(mdev);
2161 		break;
2162 	case ASB_DISCARD_SECONDARY:
2163 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2164 	case ASB_CALL_HELPER:
2165 		hg = drbd_asb_recover_0p(mdev);
2166 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2167 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2168 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2169 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2170 			  * we do not need to wait for the after state change work either. */
2171 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2172 			if (self != SS_SUCCESS) {
2173 				drbd_khelper(mdev, "pri-lost-after-sb");
2174 			} else {
2175 				dev_warn(DEV, "Successfully gave up primary role.\n");
2176 				rv = hg;
2177 			}
2178 		} else
2179 			rv = hg;
2180 	}
2181 
2182 	return rv;
2183 }
2184 
2185 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2186 {
2187 	int self, peer, hg, rv = -100;
2188 
2189 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2190 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2191 
2192 	switch (mdev->net_conf->after_sb_2p) {
2193 	case ASB_DISCARD_YOUNGER_PRI:
2194 	case ASB_DISCARD_OLDER_PRI:
2195 	case ASB_DISCARD_LEAST_CHG:
2196 	case ASB_DISCARD_LOCAL:
2197 	case ASB_DISCARD_REMOTE:
2198 	case ASB_CONSENSUS:
2199 	case ASB_DISCARD_SECONDARY:
2200 		dev_err(DEV, "Configuration error.\n");
2201 		break;
2202 	case ASB_VIOLENTLY:
2203 		rv = drbd_asb_recover_0p(mdev);
2204 		break;
2205 	case ASB_DISCONNECT:
2206 		break;
2207 	case ASB_CALL_HELPER:
2208 		hg = drbd_asb_recover_0p(mdev);
2209 		if (hg == -1) {
2210 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2211 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2212 			  * we do not need to wait for the after state change work either. */
2213 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2214 			if (self != SS_SUCCESS) {
2215 				drbd_khelper(mdev, "pri-lost-after-sb");
2216 			} else {
2217 				dev_warn(DEV, "Successfully gave up primary role.\n");
2218 				rv = hg;
2219 			}
2220 		} else
2221 			rv = hg;
2222 	}
2223 
2224 	return rv;
2225 }
2226 
2227 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2228 			   u64 bits, u64 flags)
2229 {
2230 	if (!uuid) {
2231 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2232 		return;
2233 	}
2234 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2235 	     text,
2236 	     (unsigned long long)uuid[UI_CURRENT],
2237 	     (unsigned long long)uuid[UI_BITMAP],
2238 	     (unsigned long long)uuid[UI_HISTORY_START],
2239 	     (unsigned long long)uuid[UI_HISTORY_END],
2240 	     (unsigned long long)bits,
2241 	     (unsigned long long)flags);
2242 }
2243 
2244 /*
2245   100	after split brain try auto recover
2246     2	C_SYNC_SOURCE set BitMap
2247     1	C_SYNC_SOURCE use BitMap
2248     0	no Sync
2249    -1	C_SYNC_TARGET use BitMap
2250    -2	C_SYNC_TARGET set BitMap
2251  -100	after split brain, disconnect
2252 -1000	unrelated data
2253  */
2254 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2255 {
2256 	u64 self, peer;
2257 	int i, j;
2258 
2259 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2260 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2261 
2262 	*rule_nr = 10;
2263 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2264 		return 0;
2265 
2266 	*rule_nr = 20;
2267 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2268 	     peer != UUID_JUST_CREATED)
2269 		return -2;
2270 
2271 	*rule_nr = 30;
2272 	if (self != UUID_JUST_CREATED &&
2273 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2274 		return 2;
2275 
2276 	if (self == peer) {
2277 		int rct, dc; /* roles at crash time */
2278 
2279 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2280 
2281 			if (mdev->agreed_pro_version < 91)
2282 				return -1001;
2283 
2284 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2285 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2286 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2287 				drbd_uuid_set_bm(mdev, 0UL);
2288 
2289 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2290 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2291 				*rule_nr = 34;
2292 			} else {
2293 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2294 				*rule_nr = 36;
2295 			}
2296 
2297 			return 1;
2298 		}
2299 
2300 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2301 
2302 			if (mdev->agreed_pro_version < 91)
2303 				return -1001;
2304 
2305 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2306 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2307 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2308 
2309 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2310 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2311 				mdev->p_uuid[UI_BITMAP] = 0UL;
2312 
2313 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2314 				*rule_nr = 35;
2315 			} else {
2316 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2317 				*rule_nr = 37;
2318 			}
2319 
2320 			return -1;
2321 		}
2322 
2323 		/* Common power [off|failure] */
2324 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2325 			(mdev->p_uuid[UI_FLAGS] & 2);
2326 		/* lowest bit is set when we were primary,
2327 		 * next bit (weight 2) is set when peer was primary */
2328 		*rule_nr = 40;
2329 
2330 		switch (rct) {
2331 		case 0: /* !self_pri && !peer_pri */ return 0;
2332 		case 1: /*  self_pri && !peer_pri */ return 1;
2333 		case 2: /* !self_pri &&  peer_pri */ return -1;
2334 		case 3: /*  self_pri &&  peer_pri */
2335 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2336 			return dc ? -1 : 1;
2337 		}
2338 	}
2339 
2340 	*rule_nr = 50;
2341 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2342 	if (self == peer)
2343 		return -1;
2344 
2345 	*rule_nr = 51;
2346 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2347 	if (self == peer) {
2348 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2349 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2350 		if (self == peer) {
2351 			/* The last P_SYNC_UUID did not get though. Undo the last start of
2352 			   resync as sync source modifications of the peer's UUIDs. */
2353 
2354 			if (mdev->agreed_pro_version < 91)
2355 				return -1001;
2356 
2357 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2358 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2359 			return -1;
2360 		}
2361 	}
2362 
2363 	*rule_nr = 60;
2364 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2365 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2366 		peer = mdev->p_uuid[i] & ~((u64)1);
2367 		if (self == peer)
2368 			return -2;
2369 	}
2370 
2371 	*rule_nr = 70;
2372 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2373 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2374 	if (self == peer)
2375 		return 1;
2376 
2377 	*rule_nr = 71;
2378 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2379 	if (self == peer) {
2380 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2381 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2382 		if (self == peer) {
2383 			/* The last P_SYNC_UUID did not get though. Undo the last start of
2384 			   resync as sync source modifications of our UUIDs. */
2385 
2386 			if (mdev->agreed_pro_version < 91)
2387 				return -1001;
2388 
2389 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2390 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2391 
2392 			dev_info(DEV, "Undid last start of resync:\n");
2393 
2394 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2395 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2396 
2397 			return 1;
2398 		}
2399 	}
2400 
2401 
2402 	*rule_nr = 80;
2403 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2404 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2405 		if (self == peer)
2406 			return 2;
2407 	}
2408 
2409 	*rule_nr = 90;
2410 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2411 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2412 	if (self == peer && self != ((u64)0))
2413 		return 100;
2414 
2415 	*rule_nr = 100;
2416 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2417 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2418 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2419 			peer = mdev->p_uuid[j] & ~((u64)1);
2420 			if (self == peer)
2421 				return -100;
2422 		}
2423 	}
2424 
2425 	return -1000;
2426 }
2427 
2428 /* drbd_sync_handshake() returns the new conn state on success, or
2429    CONN_MASK (-1) on failure.
2430  */
2431 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2432 					   enum drbd_disk_state peer_disk) __must_hold(local)
2433 {
2434 	int hg, rule_nr;
2435 	enum drbd_conns rv = C_MASK;
2436 	enum drbd_disk_state mydisk;
2437 
2438 	mydisk = mdev->state.disk;
2439 	if (mydisk == D_NEGOTIATING)
2440 		mydisk = mdev->new_state_tmp.disk;
2441 
2442 	dev_info(DEV, "drbd_sync_handshake:\n");
2443 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2444 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2445 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2446 
2447 	hg = drbd_uuid_compare(mdev, &rule_nr);
2448 
2449 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2450 
2451 	if (hg == -1000) {
2452 		dev_alert(DEV, "Unrelated data, aborting!\n");
2453 		return C_MASK;
2454 	}
2455 	if (hg == -1001) {
2456 		dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2457 		return C_MASK;
2458 	}
2459 
2460 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2461 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2462 		int f = (hg == -100) || abs(hg) == 2;
2463 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2464 		if (f)
2465 			hg = hg*2;
2466 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2467 		     hg > 0 ? "source" : "target");
2468 	}
2469 
2470 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2471 		int pcount = (mdev->state.role == R_PRIMARY)
2472 			   + (peer_role == R_PRIMARY);
2473 		int forced = (hg == -100);
2474 
2475 		switch (pcount) {
2476 		case 0:
2477 			hg = drbd_asb_recover_0p(mdev);
2478 			break;
2479 		case 1:
2480 			hg = drbd_asb_recover_1p(mdev);
2481 			break;
2482 		case 2:
2483 			hg = drbd_asb_recover_2p(mdev);
2484 			break;
2485 		}
2486 		if (abs(hg) < 100) {
2487 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2488 			     "automatically solved. Sync from %s node\n",
2489 			     pcount, (hg < 0) ? "peer" : "this");
2490 			if (forced) {
2491 				dev_warn(DEV, "Doing a full sync, since"
2492 				     " UUIDs where ambiguous.\n");
2493 				hg = hg*2;
2494 			}
2495 		}
2496 	}
2497 
2498 	if (hg == -100) {
2499 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2500 			hg = -1;
2501 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2502 			hg = 1;
2503 
2504 		if (abs(hg) < 100)
2505 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2506 			     "Sync from %s node\n",
2507 			     (hg < 0) ? "peer" : "this");
2508 	}
2509 
2510 	if (hg == -100) {
2511 		dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2512 		drbd_khelper(mdev, "split-brain");
2513 		return C_MASK;
2514 	}
2515 
2516 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2517 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2518 		return C_MASK;
2519 	}
2520 
2521 	if (hg < 0 && /* by intention we do not use mydisk here. */
2522 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2523 		switch (mdev->net_conf->rr_conflict) {
2524 		case ASB_CALL_HELPER:
2525 			drbd_khelper(mdev, "pri-lost");
2526 			/* fall through */
2527 		case ASB_DISCONNECT:
2528 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2529 			return C_MASK;
2530 		case ASB_VIOLENTLY:
2531 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2532 			     "assumption\n");
2533 		}
2534 	}
2535 
2536 	if (abs(hg) >= 2) {
2537 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2538 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2539 			return C_MASK;
2540 	}
2541 
2542 	if (hg > 0) { /* become sync source. */
2543 		rv = C_WF_BITMAP_S;
2544 	} else if (hg < 0) { /* become sync target */
2545 		rv = C_WF_BITMAP_T;
2546 	} else {
2547 		rv = C_CONNECTED;
2548 		if (drbd_bm_total_weight(mdev)) {
2549 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2550 			     drbd_bm_total_weight(mdev));
2551 		}
2552 	}
2553 
2554 	return rv;
2555 }
2556 
2557 /* returns 1 if invalid */
2558 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2559 {
2560 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2561 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2562 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2563 		return 0;
2564 
2565 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2566 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2567 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2568 		return 1;
2569 
2570 	/* everything else is valid if they are equal on both sides. */
2571 	if (peer == self)
2572 		return 0;
2573 
2574 	/* everything es is invalid. */
2575 	return 1;
2576 }
2577 
2578 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2579 {
2580 	struct p_protocol *p = (struct p_protocol *)h;
2581 	int header_size, data_size;
2582 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2583 	int p_want_lose, p_two_primaries;
2584 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2585 
2586 	header_size = sizeof(*p) - sizeof(*h);
2587 	data_size   = h->length  - header_size;
2588 
2589 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2590 		return FALSE;
2591 
2592 	p_proto		= be32_to_cpu(p->protocol);
2593 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2594 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2595 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2596 	p_want_lose	= be32_to_cpu(p->want_lose);
2597 	p_two_primaries = be32_to_cpu(p->two_primaries);
2598 
2599 	if (p_proto != mdev->net_conf->wire_protocol) {
2600 		dev_err(DEV, "incompatible communication protocols\n");
2601 		goto disconnect;
2602 	}
2603 
2604 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2605 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2606 		goto disconnect;
2607 	}
2608 
2609 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2610 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2611 		goto disconnect;
2612 	}
2613 
2614 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2615 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2616 		goto disconnect;
2617 	}
2618 
2619 	if (p_want_lose && mdev->net_conf->want_lose) {
2620 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2621 		goto disconnect;
2622 	}
2623 
2624 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2625 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2626 		goto disconnect;
2627 	}
2628 
2629 	if (mdev->agreed_pro_version >= 87) {
2630 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2631 
2632 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2633 			return FALSE;
2634 
2635 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2636 		if (strcmp(p_integrity_alg, my_alg)) {
2637 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2638 			goto disconnect;
2639 		}
2640 		dev_info(DEV, "data-integrity-alg: %s\n",
2641 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2642 	}
2643 
2644 	return TRUE;
2645 
2646 disconnect:
2647 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2648 	return FALSE;
2649 }
2650 
2651 /* helper function
2652  * input: alg name, feature name
2653  * return: NULL (alg name was "")
2654  *         ERR_PTR(error) if something goes wrong
2655  *         or the crypto hash ptr, if it worked out ok. */
2656 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2657 		const char *alg, const char *name)
2658 {
2659 	struct crypto_hash *tfm;
2660 
2661 	if (!alg[0])
2662 		return NULL;
2663 
2664 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2665 	if (IS_ERR(tfm)) {
2666 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2667 			alg, name, PTR_ERR(tfm));
2668 		return tfm;
2669 	}
2670 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2671 		crypto_free_hash(tfm);
2672 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2673 		return ERR_PTR(-EINVAL);
2674 	}
2675 	return tfm;
2676 }
2677 
2678 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2679 {
2680 	int ok = TRUE;
2681 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2682 	unsigned int header_size, data_size, exp_max_sz;
2683 	struct crypto_hash *verify_tfm = NULL;
2684 	struct crypto_hash *csums_tfm = NULL;
2685 	const int apv = mdev->agreed_pro_version;
2686 
2687 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2688 		    : apv == 88 ? sizeof(struct p_rs_param)
2689 					+ SHARED_SECRET_MAX
2690 		    : /* 89 */    sizeof(struct p_rs_param_89);
2691 
2692 	if (h->length > exp_max_sz) {
2693 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2694 		    h->length, exp_max_sz);
2695 		return FALSE;
2696 	}
2697 
2698 	if (apv <= 88) {
2699 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2700 		data_size   = h->length  - header_size;
2701 	} else /* apv >= 89 */ {
2702 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2703 		data_size   = h->length  - header_size;
2704 		D_ASSERT(data_size == 0);
2705 	}
2706 
2707 	/* initialize verify_alg and csums_alg */
2708 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2709 
2710 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2711 		return FALSE;
2712 
2713 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2714 
2715 	if (apv >= 88) {
2716 		if (apv == 88) {
2717 			if (data_size > SHARED_SECRET_MAX) {
2718 				dev_err(DEV, "verify-alg too long, "
2719 				    "peer wants %u, accepting only %u byte\n",
2720 						data_size, SHARED_SECRET_MAX);
2721 				return FALSE;
2722 			}
2723 
2724 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2725 				return FALSE;
2726 
2727 			/* we expect NUL terminated string */
2728 			/* but just in case someone tries to be evil */
2729 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2730 			p->verify_alg[data_size-1] = 0;
2731 
2732 		} else /* apv >= 89 */ {
2733 			/* we still expect NUL terminated strings */
2734 			/* but just in case someone tries to be evil */
2735 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2736 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2737 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2738 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2739 		}
2740 
2741 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2742 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2743 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2744 				    mdev->sync_conf.verify_alg, p->verify_alg);
2745 				goto disconnect;
2746 			}
2747 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2748 					p->verify_alg, "verify-alg");
2749 			if (IS_ERR(verify_tfm)) {
2750 				verify_tfm = NULL;
2751 				goto disconnect;
2752 			}
2753 		}
2754 
2755 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2756 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2757 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2758 				    mdev->sync_conf.csums_alg, p->csums_alg);
2759 				goto disconnect;
2760 			}
2761 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2762 					p->csums_alg, "csums-alg");
2763 			if (IS_ERR(csums_tfm)) {
2764 				csums_tfm = NULL;
2765 				goto disconnect;
2766 			}
2767 		}
2768 
2769 
2770 		spin_lock(&mdev->peer_seq_lock);
2771 		/* lock against drbd_nl_syncer_conf() */
2772 		if (verify_tfm) {
2773 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2774 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2775 			crypto_free_hash(mdev->verify_tfm);
2776 			mdev->verify_tfm = verify_tfm;
2777 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2778 		}
2779 		if (csums_tfm) {
2780 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2781 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2782 			crypto_free_hash(mdev->csums_tfm);
2783 			mdev->csums_tfm = csums_tfm;
2784 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2785 		}
2786 		spin_unlock(&mdev->peer_seq_lock);
2787 	}
2788 
2789 	return ok;
2790 disconnect:
2791 	/* just for completeness: actually not needed,
2792 	 * as this is not reached if csums_tfm was ok. */
2793 	crypto_free_hash(csums_tfm);
2794 	/* but free the verify_tfm again, if csums_tfm did not work out */
2795 	crypto_free_hash(verify_tfm);
2796 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2797 	return FALSE;
2798 }
2799 
2800 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2801 {
2802 	/* sorry, we currently have no working implementation
2803 	 * of distributed TCQ */
2804 }
2805 
2806 /* warn if the arguments differ by more than 12.5% */
2807 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2808 	const char *s, sector_t a, sector_t b)
2809 {
2810 	sector_t d;
2811 	if (a == 0 || b == 0)
2812 		return;
2813 	d = (a > b) ? (a - b) : (b - a);
2814 	if (d > (a>>3) || d > (b>>3))
2815 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2816 		     (unsigned long long)a, (unsigned long long)b);
2817 }
2818 
2819 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2820 {
2821 	struct p_sizes *p = (struct p_sizes *)h;
2822 	enum determine_dev_size dd = unchanged;
2823 	unsigned int max_seg_s;
2824 	sector_t p_size, p_usize, my_usize;
2825 	int ldsc = 0; /* local disk size changed */
2826 	enum drbd_conns nconn;
2827 
2828 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2829 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2830 		return FALSE;
2831 
2832 	p_size = be64_to_cpu(p->d_size);
2833 	p_usize = be64_to_cpu(p->u_size);
2834 
2835 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2836 		dev_err(DEV, "some backing storage is needed\n");
2837 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2838 		return FALSE;
2839 	}
2840 
2841 	/* just store the peer's disk size for now.
2842 	 * we still need to figure out whether we accept that. */
2843 	mdev->p_size = p_size;
2844 
2845 #define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2846 	if (get_ldev(mdev)) {
2847 		warn_if_differ_considerably(mdev, "lower level device sizes",
2848 			   p_size, drbd_get_max_capacity(mdev->ldev));
2849 		warn_if_differ_considerably(mdev, "user requested size",
2850 					    p_usize, mdev->ldev->dc.disk_size);
2851 
2852 		/* if this is the first connect, or an otherwise expected
2853 		 * param exchange, choose the minimum */
2854 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2855 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2856 					     p_usize);
2857 
2858 		my_usize = mdev->ldev->dc.disk_size;
2859 
2860 		if (mdev->ldev->dc.disk_size != p_usize) {
2861 			mdev->ldev->dc.disk_size = p_usize;
2862 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2863 			     (unsigned long)mdev->ldev->dc.disk_size);
2864 		}
2865 
2866 		/* Never shrink a device with usable data during connect.
2867 		   But allow online shrinking if we are connected. */
2868 		if (drbd_new_dev_size(mdev, mdev->ldev) <
2869 		   drbd_get_capacity(mdev->this_bdev) &&
2870 		   mdev->state.disk >= D_OUTDATED &&
2871 		   mdev->state.conn < C_CONNECTED) {
2872 			dev_err(DEV, "The peer's disk size is too small!\n");
2873 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2874 			mdev->ldev->dc.disk_size = my_usize;
2875 			put_ldev(mdev);
2876 			return FALSE;
2877 		}
2878 		put_ldev(mdev);
2879 	}
2880 #undef min_not_zero
2881 
2882 	if (get_ldev(mdev)) {
2883 		dd = drbd_determin_dev_size(mdev);
2884 		put_ldev(mdev);
2885 		if (dd == dev_size_error)
2886 			return FALSE;
2887 		drbd_md_sync(mdev);
2888 	} else {
2889 		/* I am diskless, need to accept the peer's size. */
2890 		drbd_set_my_capacity(mdev, p_size);
2891 	}
2892 
2893 	if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2894 		nconn = drbd_sync_handshake(mdev,
2895 				mdev->state.peer, mdev->state.pdsk);
2896 		put_ldev(mdev);
2897 
2898 		if (nconn == C_MASK) {
2899 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2900 			return FALSE;
2901 		}
2902 
2903 		if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2904 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2905 			return FALSE;
2906 		}
2907 	}
2908 
2909 	if (get_ldev(mdev)) {
2910 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2911 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2912 			ldsc = 1;
2913 		}
2914 
2915 		max_seg_s = be32_to_cpu(p->max_segment_size);
2916 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2917 			drbd_setup_queue_param(mdev, max_seg_s);
2918 
2919 		drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2920 		put_ldev(mdev);
2921 	}
2922 
2923 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2924 		if (be64_to_cpu(p->c_size) !=
2925 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2926 			/* we have different sizes, probably peer
2927 			 * needs to know my new size... */
2928 			drbd_send_sizes(mdev, 0);
2929 		}
2930 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2931 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2932 			if (mdev->state.pdsk >= D_INCONSISTENT &&
2933 			    mdev->state.disk >= D_INCONSISTENT)
2934 				resync_after_online_grow(mdev);
2935 			else
2936 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2937 		}
2938 	}
2939 
2940 	return TRUE;
2941 }
2942 
2943 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2944 {
2945 	struct p_uuids *p = (struct p_uuids *)h;
2946 	u64 *p_uuid;
2947 	int i;
2948 
2949 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2950 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2951 		return FALSE;
2952 
2953 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2954 
2955 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2956 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
2957 
2958 	kfree(mdev->p_uuid);
2959 	mdev->p_uuid = p_uuid;
2960 
2961 	if (mdev->state.conn < C_CONNECTED &&
2962 	    mdev->state.disk < D_INCONSISTENT &&
2963 	    mdev->state.role == R_PRIMARY &&
2964 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2965 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2966 		    (unsigned long long)mdev->ed_uuid);
2967 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968 		return FALSE;
2969 	}
2970 
2971 	if (get_ldev(mdev)) {
2972 		int skip_initial_sync =
2973 			mdev->state.conn == C_CONNECTED &&
2974 			mdev->agreed_pro_version >= 90 &&
2975 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2976 			(p_uuid[UI_FLAGS] & 8);
2977 		if (skip_initial_sync) {
2978 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2979 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
2980 					"clear_n_write from receive_uuids");
2981 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
2982 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
2983 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
2984 					CS_VERBOSE, NULL);
2985 			drbd_md_sync(mdev);
2986 		}
2987 		put_ldev(mdev);
2988 	}
2989 
2990 	/* Before we test for the disk state, we should wait until an eventually
2991 	   ongoing cluster wide state change is finished. That is important if
2992 	   we are primary and are detaching from our disk. We need to see the
2993 	   new disk state... */
2994 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
2995 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
2996 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
2997 
2998 	return TRUE;
2999 }
3000 
3001 /**
3002  * convert_state() - Converts the peer's view of the cluster state to our point of view
3003  * @ps:		The state as seen by the peer.
3004  */
3005 static union drbd_state convert_state(union drbd_state ps)
3006 {
3007 	union drbd_state ms;
3008 
3009 	static enum drbd_conns c_tab[] = {
3010 		[C_CONNECTED] = C_CONNECTED,
3011 
3012 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3013 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3014 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3015 		[C_VERIFY_S]       = C_VERIFY_T,
3016 		[C_MASK]   = C_MASK,
3017 	};
3018 
3019 	ms.i = ps.i;
3020 
3021 	ms.conn = c_tab[ps.conn];
3022 	ms.peer = ps.role;
3023 	ms.role = ps.peer;
3024 	ms.pdsk = ps.disk;
3025 	ms.disk = ps.pdsk;
3026 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3027 
3028 	return ms;
3029 }
3030 
3031 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3032 {
3033 	struct p_req_state *p = (struct p_req_state *)h;
3034 	union drbd_state mask, val;
3035 	int rv;
3036 
3037 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3038 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3039 		return FALSE;
3040 
3041 	mask.i = be32_to_cpu(p->mask);
3042 	val.i = be32_to_cpu(p->val);
3043 
3044 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3045 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3046 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3047 		return TRUE;
3048 	}
3049 
3050 	mask = convert_state(mask);
3051 	val = convert_state(val);
3052 
3053 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3054 
3055 	drbd_send_sr_reply(mdev, rv);
3056 	drbd_md_sync(mdev);
3057 
3058 	return TRUE;
3059 }
3060 
3061 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3062 {
3063 	struct p_state *p = (struct p_state *)h;
3064 	enum drbd_conns nconn, oconn;
3065 	union drbd_state ns, peer_state;
3066 	enum drbd_disk_state real_peer_disk;
3067 	int rv;
3068 
3069 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3070 		return FALSE;
3071 
3072 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073 		return FALSE;
3074 
3075 	peer_state.i = be32_to_cpu(p->state);
3076 
3077 	real_peer_disk = peer_state.disk;
3078 	if (peer_state.disk == D_NEGOTIATING) {
3079 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3080 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3081 	}
3082 
3083 	spin_lock_irq(&mdev->req_lock);
3084  retry:
3085 	oconn = nconn = mdev->state.conn;
3086 	spin_unlock_irq(&mdev->req_lock);
3087 
3088 	if (nconn == C_WF_REPORT_PARAMS)
3089 		nconn = C_CONNECTED;
3090 
3091 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3092 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3093 		int cr; /* consider resync */
3094 
3095 		/* if we established a new connection */
3096 		cr  = (oconn < C_CONNECTED);
3097 		/* if we had an established connection
3098 		 * and one of the nodes newly attaches a disk */
3099 		cr |= (oconn == C_CONNECTED &&
3100 		       (peer_state.disk == D_NEGOTIATING ||
3101 			mdev->state.disk == D_NEGOTIATING));
3102 		/* if we have both been inconsistent, and the peer has been
3103 		 * forced to be UpToDate with --overwrite-data */
3104 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3105 		/* if we had been plain connected, and the admin requested to
3106 		 * start a sync by "invalidate" or "invalidate-remote" */
3107 		cr |= (oconn == C_CONNECTED &&
3108 				(peer_state.conn >= C_STARTING_SYNC_S &&
3109 				 peer_state.conn <= C_WF_BITMAP_T));
3110 
3111 		if (cr)
3112 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3113 
3114 		put_ldev(mdev);
3115 		if (nconn == C_MASK) {
3116 			if (mdev->state.disk == D_NEGOTIATING) {
3117 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3118 				nconn = C_CONNECTED;
3119 			} else if (peer_state.disk == D_NEGOTIATING) {
3120 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3121 				peer_state.disk = D_DISKLESS;
3122 			} else {
3123 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3124 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3125 				return FALSE;
3126 			}
3127 		}
3128 	}
3129 
3130 	spin_lock_irq(&mdev->req_lock);
3131 	if (mdev->state.conn != oconn)
3132 		goto retry;
3133 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3134 	ns.i = mdev->state.i;
3135 	ns.conn = nconn;
3136 	ns.peer = peer_state.role;
3137 	ns.pdsk = real_peer_disk;
3138 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3139 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3140 		ns.disk = mdev->new_state_tmp.disk;
3141 
3142 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3143 	ns = mdev->state;
3144 	spin_unlock_irq(&mdev->req_lock);
3145 
3146 	if (rv < SS_SUCCESS) {
3147 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3148 		return FALSE;
3149 	}
3150 
3151 	if (oconn > C_WF_REPORT_PARAMS) {
3152 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3153 		    peer_state.disk != D_NEGOTIATING ) {
3154 			/* we want resync, peer has not yet decided to sync... */
3155 			/* Nowadays only used when forcing a node into primary role and
3156 			   setting its disk to UpToDate with that */
3157 			drbd_send_uuids(mdev);
3158 			drbd_send_state(mdev);
3159 		}
3160 	}
3161 
3162 	mdev->net_conf->want_lose = 0;
3163 
3164 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3165 
3166 	return TRUE;
3167 }
3168 
3169 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3170 {
3171 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3172 
3173 	wait_event(mdev->misc_wait,
3174 		   mdev->state.conn == C_WF_SYNC_UUID ||
3175 		   mdev->state.conn < C_CONNECTED ||
3176 		   mdev->state.disk < D_NEGOTIATING);
3177 
3178 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3179 
3180 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3181 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3182 		return FALSE;
3183 
3184 	/* Here the _drbd_uuid_ functions are right, current should
3185 	   _not_ be rotated into the history */
3186 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3187 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3188 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3189 
3190 		drbd_start_resync(mdev, C_SYNC_TARGET);
3191 
3192 		put_ldev(mdev);
3193 	} else
3194 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3195 
3196 	return TRUE;
3197 }
3198 
3199 enum receive_bitmap_ret { OK, DONE, FAILED };
3200 
3201 static enum receive_bitmap_ret
3202 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3203 	unsigned long *buffer, struct bm_xfer_ctx *c)
3204 {
3205 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3206 	unsigned want = num_words * sizeof(long);
3207 
3208 	if (want != h->length) {
3209 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3210 		return FAILED;
3211 	}
3212 	if (want == 0)
3213 		return DONE;
3214 	if (drbd_recv(mdev, buffer, want) != want)
3215 		return FAILED;
3216 
3217 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3218 
3219 	c->word_offset += num_words;
3220 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3221 	if (c->bit_offset > c->bm_bits)
3222 		c->bit_offset = c->bm_bits;
3223 
3224 	return OK;
3225 }
3226 
3227 static enum receive_bitmap_ret
3228 recv_bm_rle_bits(struct drbd_conf *mdev,
3229 		struct p_compressed_bm *p,
3230 		struct bm_xfer_ctx *c)
3231 {
3232 	struct bitstream bs;
3233 	u64 look_ahead;
3234 	u64 rl;
3235 	u64 tmp;
3236 	unsigned long s = c->bit_offset;
3237 	unsigned long e;
3238 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3239 	int toggle = DCBP_get_start(p);
3240 	int have;
3241 	int bits;
3242 
3243 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3244 
3245 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3246 	if (bits < 0)
3247 		return FAILED;
3248 
3249 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3250 		bits = vli_decode_bits(&rl, look_ahead);
3251 		if (bits <= 0)
3252 			return FAILED;
3253 
3254 		if (toggle) {
3255 			e = s + rl -1;
3256 			if (e >= c->bm_bits) {
3257 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3258 				return FAILED;
3259 			}
3260 			_drbd_bm_set_bits(mdev, s, e);
3261 		}
3262 
3263 		if (have < bits) {
3264 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3265 				have, bits, look_ahead,
3266 				(unsigned int)(bs.cur.b - p->code),
3267 				(unsigned int)bs.buf_len);
3268 			return FAILED;
3269 		}
3270 		look_ahead >>= bits;
3271 		have -= bits;
3272 
3273 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3274 		if (bits < 0)
3275 			return FAILED;
3276 		look_ahead |= tmp << have;
3277 		have += bits;
3278 	}
3279 
3280 	c->bit_offset = s;
3281 	bm_xfer_ctx_bit_to_word_offset(c);
3282 
3283 	return (s == c->bm_bits) ? DONE : OK;
3284 }
3285 
3286 static enum receive_bitmap_ret
3287 decode_bitmap_c(struct drbd_conf *mdev,
3288 		struct p_compressed_bm *p,
3289 		struct bm_xfer_ctx *c)
3290 {
3291 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3292 		return recv_bm_rle_bits(mdev, p, c);
3293 
3294 	/* other variants had been implemented for evaluation,
3295 	 * but have been dropped as this one turned out to be "best"
3296 	 * during all our tests. */
3297 
3298 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3299 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3300 	return FAILED;
3301 }
3302 
3303 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3304 		const char *direction, struct bm_xfer_ctx *c)
3305 {
3306 	/* what would it take to transfer it "plaintext" */
3307 	unsigned plain = sizeof(struct p_header) *
3308 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3309 		+ c->bm_words * sizeof(long);
3310 	unsigned total = c->bytes[0] + c->bytes[1];
3311 	unsigned r;
3312 
3313 	/* total can not be zero. but just in case: */
3314 	if (total == 0)
3315 		return;
3316 
3317 	/* don't report if not compressed */
3318 	if (total >= plain)
3319 		return;
3320 
3321 	/* total < plain. check for overflow, still */
3322 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3323 		                    : (1000 * total / plain);
3324 
3325 	if (r > 1000)
3326 		r = 1000;
3327 
3328 	r = 1000 - r;
3329 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3330 	     "total %u; compression: %u.%u%%\n",
3331 			direction,
3332 			c->bytes[1], c->packets[1],
3333 			c->bytes[0], c->packets[0],
3334 			total, r/10, r % 10);
3335 }
3336 
3337 /* Since we are processing the bitfield from lower addresses to higher,
3338    it does not matter if the process it in 32 bit chunks or 64 bit
3339    chunks as long as it is little endian. (Understand it as byte stream,
3340    beginning with the lowest byte...) If we would use big endian
3341    we would need to process it from the highest address to the lowest,
3342    in order to be agnostic to the 32 vs 64 bits issue.
3343 
3344    returns 0 on failure, 1 if we successfully received it. */
3345 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3346 {
3347 	struct bm_xfer_ctx c;
3348 	void *buffer;
3349 	enum receive_bitmap_ret ret;
3350 	int ok = FALSE;
3351 
3352 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3353 
3354 	drbd_bm_lock(mdev, "receive bitmap");
3355 
3356 	/* maybe we should use some per thread scratch page,
3357 	 * and allocate that during initial device creation? */
3358 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3359 	if (!buffer) {
3360 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3361 		goto out;
3362 	}
3363 
3364 	c = (struct bm_xfer_ctx) {
3365 		.bm_bits = drbd_bm_bits(mdev),
3366 		.bm_words = drbd_bm_words(mdev),
3367 	};
3368 
3369 	do {
3370 		if (h->command == P_BITMAP) {
3371 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3372 		} else if (h->command == P_COMPRESSED_BITMAP) {
3373 			/* MAYBE: sanity check that we speak proto >= 90,
3374 			 * and the feature is enabled! */
3375 			struct p_compressed_bm *p;
3376 
3377 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3378 				dev_err(DEV, "ReportCBitmap packet too large\n");
3379 				goto out;
3380 			}
3381 			/* use the page buff */
3382 			p = buffer;
3383 			memcpy(p, h, sizeof(*h));
3384 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3385 				goto out;
3386 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3387 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3388 				return FAILED;
3389 			}
3390 			ret = decode_bitmap_c(mdev, p, &c);
3391 		} else {
3392 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3393 			goto out;
3394 		}
3395 
3396 		c.packets[h->command == P_BITMAP]++;
3397 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3398 
3399 		if (ret != OK)
3400 			break;
3401 
3402 		if (!drbd_recv_header(mdev, h))
3403 			goto out;
3404 	} while (ret == OK);
3405 	if (ret == FAILED)
3406 		goto out;
3407 
3408 	INFO_bm_xfer_stats(mdev, "receive", &c);
3409 
3410 	if (mdev->state.conn == C_WF_BITMAP_T) {
3411 		ok = !drbd_send_bitmap(mdev);
3412 		if (!ok)
3413 			goto out;
3414 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3415 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3416 		D_ASSERT(ok == SS_SUCCESS);
3417 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3418 		/* admin may have requested C_DISCONNECTING,
3419 		 * other threads may have noticed network errors */
3420 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3421 		    drbd_conn_str(mdev->state.conn));
3422 	}
3423 
3424 	ok = TRUE;
3425  out:
3426 	drbd_bm_unlock(mdev);
3427 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3428 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3429 	free_page((unsigned long) buffer);
3430 	return ok;
3431 }
3432 
3433 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3434 {
3435 	/* TODO zero copy sink :) */
3436 	static char sink[128];
3437 	int size, want, r;
3438 
3439 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3440 	     h->command, h->length);
3441 
3442 	size = h->length;
3443 	while (size > 0) {
3444 		want = min_t(int, size, sizeof(sink));
3445 		r = drbd_recv(mdev, sink, want);
3446 		ERR_IF(r <= 0) break;
3447 		size -= r;
3448 	}
3449 	return size == 0;
3450 }
3451 
3452 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3453 {
3454 	if (mdev->state.disk >= D_INCONSISTENT)
3455 		drbd_kick_lo(mdev);
3456 
3457 	/* Make sure we've acked all the TCP data associated
3458 	 * with the data requests being unplugged */
3459 	drbd_tcp_quickack(mdev->data.socket);
3460 
3461 	return TRUE;
3462 }
3463 
3464 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3465 
3466 static drbd_cmd_handler_f drbd_default_handler[] = {
3467 	[P_DATA]	    = receive_Data,
3468 	[P_DATA_REPLY]	    = receive_DataReply,
3469 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3470 	[P_BARRIER]	    = receive_Barrier,
3471 	[P_BITMAP]	    = receive_bitmap,
3472 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3473 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3474 	[P_DATA_REQUEST]    = receive_DataRequest,
3475 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3476 	[P_SYNC_PARAM]	    = receive_SyncParam,
3477 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3478 	[P_PROTOCOL]        = receive_protocol,
3479 	[P_UUIDS]	    = receive_uuids,
3480 	[P_SIZES]	    = receive_sizes,
3481 	[P_STATE]	    = receive_state,
3482 	[P_STATE_CHG_REQ]   = receive_req_state,
3483 	[P_SYNC_UUID]       = receive_sync_uuid,
3484 	[P_OV_REQUEST]      = receive_DataRequest,
3485 	[P_OV_REPLY]        = receive_DataRequest,
3486 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3487 	/* anything missing from this table is in
3488 	 * the asender_tbl, see get_asender_cmd */
3489 	[P_MAX_CMD]	    = NULL,
3490 };
3491 
3492 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3493 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3494 
3495 static void drbdd(struct drbd_conf *mdev)
3496 {
3497 	drbd_cmd_handler_f handler;
3498 	struct p_header *header = &mdev->data.rbuf.header;
3499 
3500 	while (get_t_state(&mdev->receiver) == Running) {
3501 		drbd_thread_current_set_cpu(mdev);
3502 		if (!drbd_recv_header(mdev, header))
3503 			break;
3504 
3505 		if (header->command < P_MAX_CMD)
3506 			handler = drbd_cmd_handler[header->command];
3507 		else if (P_MAY_IGNORE < header->command
3508 		     && header->command < P_MAX_OPT_CMD)
3509 			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3510 		else if (header->command > P_MAX_OPT_CMD)
3511 			handler = receive_skip;
3512 		else
3513 			handler = NULL;
3514 
3515 		if (unlikely(!handler)) {
3516 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3517 			    header->command, header->length);
3518 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3519 			break;
3520 		}
3521 		if (unlikely(!handler(mdev, header))) {
3522 			dev_err(DEV, "error receiving %s, l: %d!\n",
3523 			    cmdname(header->command), header->length);
3524 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3525 			break;
3526 		}
3527 	}
3528 }
3529 
3530 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3531 {
3532 	struct hlist_head *slot;
3533 	struct hlist_node *pos;
3534 	struct hlist_node *tmp;
3535 	struct drbd_request *req;
3536 	int i;
3537 
3538 	/*
3539 	 * Application READ requests
3540 	 */
3541 	spin_lock_irq(&mdev->req_lock);
3542 	for (i = 0; i < APP_R_HSIZE; i++) {
3543 		slot = mdev->app_reads_hash+i;
3544 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3545 			/* it may (but should not any longer!)
3546 			 * be on the work queue; if that assert triggers,
3547 			 * we need to also grab the
3548 			 * spin_lock_irq(&mdev->data.work.q_lock);
3549 			 * and list_del_init here. */
3550 			D_ASSERT(list_empty(&req->w.list));
3551 			/* It would be nice to complete outside of spinlock.
3552 			 * But this is easier for now. */
3553 			_req_mod(req, connection_lost_while_pending);
3554 		}
3555 	}
3556 	for (i = 0; i < APP_R_HSIZE; i++)
3557 		if (!hlist_empty(mdev->app_reads_hash+i))
3558 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3559 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3560 
3561 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3562 	spin_unlock_irq(&mdev->req_lock);
3563 }
3564 
3565 void drbd_flush_workqueue(struct drbd_conf *mdev)
3566 {
3567 	struct drbd_wq_barrier barr;
3568 
3569 	barr.w.cb = w_prev_work_done;
3570 	init_completion(&barr.done);
3571 	drbd_queue_work(&mdev->data.work, &barr.w);
3572 	wait_for_completion(&barr.done);
3573 }
3574 
3575 static void drbd_disconnect(struct drbd_conf *mdev)
3576 {
3577 	enum drbd_fencing_p fp;
3578 	union drbd_state os, ns;
3579 	int rv = SS_UNKNOWN_ERROR;
3580 	unsigned int i;
3581 
3582 	if (mdev->state.conn == C_STANDALONE)
3583 		return;
3584 	if (mdev->state.conn >= C_WF_CONNECTION)
3585 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3586 				drbd_conn_str(mdev->state.conn));
3587 
3588 	/* asender does not clean up anything. it must not interfere, either */
3589 	drbd_thread_stop(&mdev->asender);
3590 
3591 	mutex_lock(&mdev->data.mutex);
3592 	drbd_free_sock(mdev);
3593 	mutex_unlock(&mdev->data.mutex);
3594 
3595 	spin_lock_irq(&mdev->req_lock);
3596 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3597 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3598 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3599 	spin_unlock_irq(&mdev->req_lock);
3600 
3601 	/* We do not have data structures that would allow us to
3602 	 * get the rs_pending_cnt down to 0 again.
3603 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3604 	 *    the pending RSDataRequest's we have sent.
3605 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3606 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3607 	 *  And no, it is not the sum of the reference counts in the
3608 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3609 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3610 	 *  on the fly. */
3611 	drbd_rs_cancel_all(mdev);
3612 	mdev->rs_total = 0;
3613 	mdev->rs_failed = 0;
3614 	atomic_set(&mdev->rs_pending_cnt, 0);
3615 	wake_up(&mdev->misc_wait);
3616 
3617 	/* make sure syncer is stopped and w_resume_next_sg queued */
3618 	del_timer_sync(&mdev->resync_timer);
3619 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3620 	resync_timer_fn((unsigned long)mdev);
3621 
3622 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3623 	 * w_make_resync_request etc. which may still be on the worker queue
3624 	 * to be "canceled" */
3625 	drbd_flush_workqueue(mdev);
3626 
3627 	/* This also does reclaim_net_ee().  If we do this too early, we might
3628 	 * miss some resync ee and pages.*/
3629 	drbd_process_done_ee(mdev);
3630 
3631 	kfree(mdev->p_uuid);
3632 	mdev->p_uuid = NULL;
3633 
3634 	if (!mdev->state.susp)
3635 		tl_clear(mdev);
3636 
3637 	drbd_fail_pending_reads(mdev);
3638 
3639 	dev_info(DEV, "Connection closed\n");
3640 
3641 	drbd_md_sync(mdev);
3642 
3643 	fp = FP_DONT_CARE;
3644 	if (get_ldev(mdev)) {
3645 		fp = mdev->ldev->dc.fencing;
3646 		put_ldev(mdev);
3647 	}
3648 
3649 	if (mdev->state.role == R_PRIMARY) {
3650 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3651 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3652 			drbd_request_state(mdev, NS(pdsk, nps));
3653 		}
3654 	}
3655 
3656 	spin_lock_irq(&mdev->req_lock);
3657 	os = mdev->state;
3658 	if (os.conn >= C_UNCONNECTED) {
3659 		/* Do not restart in case we are C_DISCONNECTING */
3660 		ns = os;
3661 		ns.conn = C_UNCONNECTED;
3662 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3663 	}
3664 	spin_unlock_irq(&mdev->req_lock);
3665 
3666 	if (os.conn == C_DISCONNECTING) {
3667 		struct hlist_head *h;
3668 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3669 
3670 		/* we must not free the tl_hash
3671 		 * while application io is still on the fly */
3672 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3673 
3674 		spin_lock_irq(&mdev->req_lock);
3675 		/* paranoia code */
3676 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3677 			if (h->first)
3678 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3679 						(int)(h - mdev->ee_hash), h->first);
3680 		kfree(mdev->ee_hash);
3681 		mdev->ee_hash = NULL;
3682 		mdev->ee_hash_s = 0;
3683 
3684 		/* paranoia code */
3685 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3686 			if (h->first)
3687 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3688 						(int)(h - mdev->tl_hash), h->first);
3689 		kfree(mdev->tl_hash);
3690 		mdev->tl_hash = NULL;
3691 		mdev->tl_hash_s = 0;
3692 		spin_unlock_irq(&mdev->req_lock);
3693 
3694 		crypto_free_hash(mdev->cram_hmac_tfm);
3695 		mdev->cram_hmac_tfm = NULL;
3696 
3697 		kfree(mdev->net_conf);
3698 		mdev->net_conf = NULL;
3699 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3700 	}
3701 
3702 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3703 	 * want to use SO_LINGER, because apparently it can be deferred for
3704 	 * more than 20 seconds (longest time I checked).
3705 	 *
3706 	 * Actually we don't care for exactly when the network stack does its
3707 	 * put_page(), but release our reference on these pages right here.
3708 	 */
3709 	i = drbd_release_ee(mdev, &mdev->net_ee);
3710 	if (i)
3711 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3712 	i = atomic_read(&mdev->pp_in_use);
3713 	if (i)
3714 		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3715 
3716 	D_ASSERT(list_empty(&mdev->read_ee));
3717 	D_ASSERT(list_empty(&mdev->active_ee));
3718 	D_ASSERT(list_empty(&mdev->sync_ee));
3719 	D_ASSERT(list_empty(&mdev->done_ee));
3720 
3721 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3722 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3723 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3724 }
3725 
3726 /*
3727  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3728  * we can agree on is stored in agreed_pro_version.
3729  *
3730  * feature flags and the reserved array should be enough room for future
3731  * enhancements of the handshake protocol, and possible plugins...
3732  *
3733  * for now, they are expected to be zero, but ignored.
3734  */
3735 static int drbd_send_handshake(struct drbd_conf *mdev)
3736 {
3737 	/* ASSERT current == mdev->receiver ... */
3738 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3739 	int ok;
3740 
3741 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3742 		dev_err(DEV, "interrupted during initial handshake\n");
3743 		return 0; /* interrupted. not ok. */
3744 	}
3745 
3746 	if (mdev->data.socket == NULL) {
3747 		mutex_unlock(&mdev->data.mutex);
3748 		return 0;
3749 	}
3750 
3751 	memset(p, 0, sizeof(*p));
3752 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3753 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3754 	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3755 			     (struct p_header *)p, sizeof(*p), 0 );
3756 	mutex_unlock(&mdev->data.mutex);
3757 	return ok;
3758 }
3759 
3760 /*
3761  * return values:
3762  *   1 yes, we have a valid connection
3763  *   0 oops, did not work out, please try again
3764  *  -1 peer talks different language,
3765  *     no point in trying again, please go standalone.
3766  */
3767 static int drbd_do_handshake(struct drbd_conf *mdev)
3768 {
3769 	/* ASSERT current == mdev->receiver ... */
3770 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3771 	const int expect = sizeof(struct p_handshake)
3772 			  -sizeof(struct p_header);
3773 	int rv;
3774 
3775 	rv = drbd_send_handshake(mdev);
3776 	if (!rv)
3777 		return 0;
3778 
3779 	rv = drbd_recv_header(mdev, &p->head);
3780 	if (!rv)
3781 		return 0;
3782 
3783 	if (p->head.command != P_HAND_SHAKE) {
3784 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3785 		     cmdname(p->head.command), p->head.command);
3786 		return -1;
3787 	}
3788 
3789 	if (p->head.length != expect) {
3790 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3791 		     expect, p->head.length);
3792 		return -1;
3793 	}
3794 
3795 	rv = drbd_recv(mdev, &p->head.payload, expect);
3796 
3797 	if (rv != expect) {
3798 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3799 		return 0;
3800 	}
3801 
3802 	p->protocol_min = be32_to_cpu(p->protocol_min);
3803 	p->protocol_max = be32_to_cpu(p->protocol_max);
3804 	if (p->protocol_max == 0)
3805 		p->protocol_max = p->protocol_min;
3806 
3807 	if (PRO_VERSION_MAX < p->protocol_min ||
3808 	    PRO_VERSION_MIN > p->protocol_max)
3809 		goto incompat;
3810 
3811 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3812 
3813 	dev_info(DEV, "Handshake successful: "
3814 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3815 
3816 	return 1;
3817 
3818  incompat:
3819 	dev_err(DEV, "incompatible DRBD dialects: "
3820 	    "I support %d-%d, peer supports %d-%d\n",
3821 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3822 	    p->protocol_min, p->protocol_max);
3823 	return -1;
3824 }
3825 
3826 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3827 static int drbd_do_auth(struct drbd_conf *mdev)
3828 {
3829 	dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3830 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3831 	return 0;
3832 }
3833 #else
3834 #define CHALLENGE_LEN 64
3835 static int drbd_do_auth(struct drbd_conf *mdev)
3836 {
3837 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3838 	struct scatterlist sg;
3839 	char *response = NULL;
3840 	char *right_response = NULL;
3841 	char *peers_ch = NULL;
3842 	struct p_header p;
3843 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3844 	unsigned int resp_size;
3845 	struct hash_desc desc;
3846 	int rv;
3847 
3848 	desc.tfm = mdev->cram_hmac_tfm;
3849 	desc.flags = 0;
3850 
3851 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3852 				(u8 *)mdev->net_conf->shared_secret, key_len);
3853 	if (rv) {
3854 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3855 		rv = 0;
3856 		goto fail;
3857 	}
3858 
3859 	get_random_bytes(my_challenge, CHALLENGE_LEN);
3860 
3861 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3862 	if (!rv)
3863 		goto fail;
3864 
3865 	rv = drbd_recv_header(mdev, &p);
3866 	if (!rv)
3867 		goto fail;
3868 
3869 	if (p.command != P_AUTH_CHALLENGE) {
3870 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3871 		    cmdname(p.command), p.command);
3872 		rv = 0;
3873 		goto fail;
3874 	}
3875 
3876 	if (p.length > CHALLENGE_LEN*2) {
3877 		dev_err(DEV, "expected AuthChallenge payload too big.\n");
3878 		rv = 0;
3879 		goto fail;
3880 	}
3881 
3882 	peers_ch = kmalloc(p.length, GFP_NOIO);
3883 	if (peers_ch == NULL) {
3884 		dev_err(DEV, "kmalloc of peers_ch failed\n");
3885 		rv = 0;
3886 		goto fail;
3887 	}
3888 
3889 	rv = drbd_recv(mdev, peers_ch, p.length);
3890 
3891 	if (rv != p.length) {
3892 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3893 		rv = 0;
3894 		goto fail;
3895 	}
3896 
3897 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3898 	response = kmalloc(resp_size, GFP_NOIO);
3899 	if (response == NULL) {
3900 		dev_err(DEV, "kmalloc of response failed\n");
3901 		rv = 0;
3902 		goto fail;
3903 	}
3904 
3905 	sg_init_table(&sg, 1);
3906 	sg_set_buf(&sg, peers_ch, p.length);
3907 
3908 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3909 	if (rv) {
3910 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3911 		rv = 0;
3912 		goto fail;
3913 	}
3914 
3915 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3916 	if (!rv)
3917 		goto fail;
3918 
3919 	rv = drbd_recv_header(mdev, &p);
3920 	if (!rv)
3921 		goto fail;
3922 
3923 	if (p.command != P_AUTH_RESPONSE) {
3924 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3925 		    cmdname(p.command), p.command);
3926 		rv = 0;
3927 		goto fail;
3928 	}
3929 
3930 	if (p.length != resp_size) {
3931 		dev_err(DEV, "expected AuthResponse payload of wrong size\n");
3932 		rv = 0;
3933 		goto fail;
3934 	}
3935 
3936 	rv = drbd_recv(mdev, response , resp_size);
3937 
3938 	if (rv != resp_size) {
3939 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3940 		rv = 0;
3941 		goto fail;
3942 	}
3943 
3944 	right_response = kmalloc(resp_size, GFP_NOIO);
3945 	if (response == NULL) {
3946 		dev_err(DEV, "kmalloc of right_response failed\n");
3947 		rv = 0;
3948 		goto fail;
3949 	}
3950 
3951 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3952 
3953 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3954 	if (rv) {
3955 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3956 		rv = 0;
3957 		goto fail;
3958 	}
3959 
3960 	rv = !memcmp(response, right_response, resp_size);
3961 
3962 	if (rv)
3963 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3964 		     resp_size, mdev->net_conf->cram_hmac_alg);
3965 
3966  fail:
3967 	kfree(peers_ch);
3968 	kfree(response);
3969 	kfree(right_response);
3970 
3971 	return rv;
3972 }
3973 #endif
3974 
3975 int drbdd_init(struct drbd_thread *thi)
3976 {
3977 	struct drbd_conf *mdev = thi->mdev;
3978 	unsigned int minor = mdev_to_minor(mdev);
3979 	int h;
3980 
3981 	sprintf(current->comm, "drbd%d_receiver", minor);
3982 
3983 	dev_info(DEV, "receiver (re)started\n");
3984 
3985 	do {
3986 		h = drbd_connect(mdev);
3987 		if (h == 0) {
3988 			drbd_disconnect(mdev);
3989 			__set_current_state(TASK_INTERRUPTIBLE);
3990 			schedule_timeout(HZ);
3991 		}
3992 		if (h == -1) {
3993 			dev_warn(DEV, "Discarding network configuration.\n");
3994 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3995 		}
3996 	} while (h == 0);
3997 
3998 	if (h > 0) {
3999 		if (get_net_conf(mdev)) {
4000 			drbdd(mdev);
4001 			put_net_conf(mdev);
4002 		}
4003 	}
4004 
4005 	drbd_disconnect(mdev);
4006 
4007 	dev_info(DEV, "receiver terminated\n");
4008 	return 0;
4009 }
4010 
4011 /* ********* acknowledge sender ******** */
4012 
4013 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4014 {
4015 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4016 
4017 	int retcode = be32_to_cpu(p->retcode);
4018 
4019 	if (retcode >= SS_SUCCESS) {
4020 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4021 	} else {
4022 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4023 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4024 		    drbd_set_st_err_str(retcode), retcode);
4025 	}
4026 	wake_up(&mdev->state_wait);
4027 
4028 	return TRUE;
4029 }
4030 
4031 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4032 {
4033 	return drbd_send_ping_ack(mdev);
4034 
4035 }
4036 
4037 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4038 {
4039 	/* restore idle timeout */
4040 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4041 
4042 	return TRUE;
4043 }
4044 
4045 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4046 {
4047 	struct p_block_ack *p = (struct p_block_ack *)h;
4048 	sector_t sector = be64_to_cpu(p->sector);
4049 	int blksize = be32_to_cpu(p->blksize);
4050 
4051 	D_ASSERT(mdev->agreed_pro_version >= 89);
4052 
4053 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4054 
4055 	drbd_rs_complete_io(mdev, sector);
4056 	drbd_set_in_sync(mdev, sector, blksize);
4057 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4058 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4059 	dec_rs_pending(mdev);
4060 
4061 	return TRUE;
4062 }
4063 
4064 /* when we receive the ACK for a write request,
4065  * verify that we actually know about it */
4066 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4067 	u64 id, sector_t sector)
4068 {
4069 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4070 	struct hlist_node *n;
4071 	struct drbd_request *req;
4072 
4073 	hlist_for_each_entry(req, n, slot, colision) {
4074 		if ((unsigned long)req == (unsigned long)id) {
4075 			if (req->sector != sector) {
4076 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4077 				    "wrong sector (%llus versus %llus)\n", req,
4078 				    (unsigned long long)req->sector,
4079 				    (unsigned long long)sector);
4080 				break;
4081 			}
4082 			return req;
4083 		}
4084 	}
4085 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4086 		(void *)(unsigned long)id, (unsigned long long)sector);
4087 	return NULL;
4088 }
4089 
4090 typedef struct drbd_request *(req_validator_fn)
4091 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4092 
4093 static int validate_req_change_req_state(struct drbd_conf *mdev,
4094 	u64 id, sector_t sector, req_validator_fn validator,
4095 	const char *func, enum drbd_req_event what)
4096 {
4097 	struct drbd_request *req;
4098 	struct bio_and_error m;
4099 
4100 	spin_lock_irq(&mdev->req_lock);
4101 	req = validator(mdev, id, sector);
4102 	if (unlikely(!req)) {
4103 		spin_unlock_irq(&mdev->req_lock);
4104 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4105 		return FALSE;
4106 	}
4107 	__req_mod(req, what, &m);
4108 	spin_unlock_irq(&mdev->req_lock);
4109 
4110 	if (m.bio)
4111 		complete_master_bio(mdev, &m);
4112 	return TRUE;
4113 }
4114 
4115 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4116 {
4117 	struct p_block_ack *p = (struct p_block_ack *)h;
4118 	sector_t sector = be64_to_cpu(p->sector);
4119 	int blksize = be32_to_cpu(p->blksize);
4120 	enum drbd_req_event what;
4121 
4122 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4123 
4124 	if (is_syncer_block_id(p->block_id)) {
4125 		drbd_set_in_sync(mdev, sector, blksize);
4126 		dec_rs_pending(mdev);
4127 		return TRUE;
4128 	}
4129 	switch (be16_to_cpu(h->command)) {
4130 	case P_RS_WRITE_ACK:
4131 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4132 		what = write_acked_by_peer_and_sis;
4133 		break;
4134 	case P_WRITE_ACK:
4135 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4136 		what = write_acked_by_peer;
4137 		break;
4138 	case P_RECV_ACK:
4139 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4140 		what = recv_acked_by_peer;
4141 		break;
4142 	case P_DISCARD_ACK:
4143 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4144 		what = conflict_discarded_by_peer;
4145 		break;
4146 	default:
4147 		D_ASSERT(0);
4148 		return FALSE;
4149 	}
4150 
4151 	return validate_req_change_req_state(mdev, p->block_id, sector,
4152 		_ack_id_to_req, __func__ , what);
4153 }
4154 
4155 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4156 {
4157 	struct p_block_ack *p = (struct p_block_ack *)h;
4158 	sector_t sector = be64_to_cpu(p->sector);
4159 
4160 	if (__ratelimit(&drbd_ratelimit_state))
4161 		dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4162 
4163 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4164 
4165 	if (is_syncer_block_id(p->block_id)) {
4166 		int size = be32_to_cpu(p->blksize);
4167 		dec_rs_pending(mdev);
4168 		drbd_rs_failed_io(mdev, sector, size);
4169 		return TRUE;
4170 	}
4171 	return validate_req_change_req_state(mdev, p->block_id, sector,
4172 		_ack_id_to_req, __func__ , neg_acked);
4173 }
4174 
4175 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4176 {
4177 	struct p_block_ack *p = (struct p_block_ack *)h;
4178 	sector_t sector = be64_to_cpu(p->sector);
4179 
4180 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4181 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4182 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4183 
4184 	return validate_req_change_req_state(mdev, p->block_id, sector,
4185 		_ar_id_to_req, __func__ , neg_acked);
4186 }
4187 
4188 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4189 {
4190 	sector_t sector;
4191 	int size;
4192 	struct p_block_ack *p = (struct p_block_ack *)h;
4193 
4194 	sector = be64_to_cpu(p->sector);
4195 	size = be32_to_cpu(p->blksize);
4196 	D_ASSERT(p->block_id == ID_SYNCER);
4197 
4198 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4199 
4200 	dec_rs_pending(mdev);
4201 
4202 	if (get_ldev_if_state(mdev, D_FAILED)) {
4203 		drbd_rs_complete_io(mdev, sector);
4204 		drbd_rs_failed_io(mdev, sector, size);
4205 		put_ldev(mdev);
4206 	}
4207 
4208 	return TRUE;
4209 }
4210 
4211 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4212 {
4213 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4214 
4215 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4216 
4217 	return TRUE;
4218 }
4219 
4220 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4221 {
4222 	struct p_block_ack *p = (struct p_block_ack *)h;
4223 	struct drbd_work *w;
4224 	sector_t sector;
4225 	int size;
4226 
4227 	sector = be64_to_cpu(p->sector);
4228 	size = be32_to_cpu(p->blksize);
4229 
4230 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4231 
4232 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4233 		drbd_ov_oos_found(mdev, sector, size);
4234 	else
4235 		ov_oos_print(mdev);
4236 
4237 	drbd_rs_complete_io(mdev, sector);
4238 	dec_rs_pending(mdev);
4239 
4240 	if (--mdev->ov_left == 0) {
4241 		w = kmalloc(sizeof(*w), GFP_NOIO);
4242 		if (w) {
4243 			w->cb = w_ov_finished;
4244 			drbd_queue_work_front(&mdev->data.work, w);
4245 		} else {
4246 			dev_err(DEV, "kmalloc(w) failed.");
4247 			ov_oos_print(mdev);
4248 			drbd_resync_finished(mdev);
4249 		}
4250 	}
4251 	return TRUE;
4252 }
4253 
4254 struct asender_cmd {
4255 	size_t pkt_size;
4256 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4257 };
4258 
4259 static struct asender_cmd *get_asender_cmd(int cmd)
4260 {
4261 	static struct asender_cmd asender_tbl[] = {
4262 		/* anything missing from this table is in
4263 		 * the drbd_cmd_handler (drbd_default_handler) table,
4264 		 * see the beginning of drbdd() */
4265 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4266 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4267 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4268 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4269 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4270 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4271 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4272 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4273 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4274 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4275 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4276 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4277 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4278 	[P_MAX_CMD]	    = { 0, NULL },
4279 	};
4280 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4281 		return NULL;
4282 	return &asender_tbl[cmd];
4283 }
4284 
4285 int drbd_asender(struct drbd_thread *thi)
4286 {
4287 	struct drbd_conf *mdev = thi->mdev;
4288 	struct p_header *h = &mdev->meta.rbuf.header;
4289 	struct asender_cmd *cmd = NULL;
4290 
4291 	int rv, len;
4292 	void *buf    = h;
4293 	int received = 0;
4294 	int expect   = sizeof(struct p_header);
4295 	int empty;
4296 
4297 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4298 
4299 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4300 	current->rt_priority = 2;    /* more important than all other tasks */
4301 
4302 	while (get_t_state(thi) == Running) {
4303 		drbd_thread_current_set_cpu(mdev);
4304 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4305 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4306 			mdev->meta.socket->sk->sk_rcvtimeo =
4307 				mdev->net_conf->ping_timeo*HZ/10;
4308 		}
4309 
4310 		/* conditionally cork;
4311 		 * it may hurt latency if we cork without much to send */
4312 		if (!mdev->net_conf->no_cork &&
4313 			3 < atomic_read(&mdev->unacked_cnt))
4314 			drbd_tcp_cork(mdev->meta.socket);
4315 		while (1) {
4316 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4317 			flush_signals(current);
4318 			if (!drbd_process_done_ee(mdev)) {
4319 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4320 				goto reconnect;
4321 			}
4322 			/* to avoid race with newly queued ACKs */
4323 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4324 			spin_lock_irq(&mdev->req_lock);
4325 			empty = list_empty(&mdev->done_ee);
4326 			spin_unlock_irq(&mdev->req_lock);
4327 			/* new ack may have been queued right here,
4328 			 * but then there is also a signal pending,
4329 			 * and we start over... */
4330 			if (empty)
4331 				break;
4332 		}
4333 		/* but unconditionally uncork unless disabled */
4334 		if (!mdev->net_conf->no_cork)
4335 			drbd_tcp_uncork(mdev->meta.socket);
4336 
4337 		/* short circuit, recv_msg would return EINTR anyways. */
4338 		if (signal_pending(current))
4339 			continue;
4340 
4341 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4342 				     buf, expect-received, 0);
4343 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4344 
4345 		flush_signals(current);
4346 
4347 		/* Note:
4348 		 * -EINTR	 (on meta) we got a signal
4349 		 * -EAGAIN	 (on meta) rcvtimeo expired
4350 		 * -ECONNRESET	 other side closed the connection
4351 		 * -ERESTARTSYS  (on data) we got a signal
4352 		 * rv <  0	 other than above: unexpected error!
4353 		 * rv == expected: full header or command
4354 		 * rv <  expected: "woken" by signal during receive
4355 		 * rv == 0	 : "connection shut down by peer"
4356 		 */
4357 		if (likely(rv > 0)) {
4358 			received += rv;
4359 			buf	 += rv;
4360 		} else if (rv == 0) {
4361 			dev_err(DEV, "meta connection shut down by peer.\n");
4362 			goto reconnect;
4363 		} else if (rv == -EAGAIN) {
4364 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4365 			    mdev->net_conf->ping_timeo*HZ/10) {
4366 				dev_err(DEV, "PingAck did not arrive in time.\n");
4367 				goto reconnect;
4368 			}
4369 			set_bit(SEND_PING, &mdev->flags);
4370 			continue;
4371 		} else if (rv == -EINTR) {
4372 			continue;
4373 		} else {
4374 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4375 			goto reconnect;
4376 		}
4377 
4378 		if (received == expect && cmd == NULL) {
4379 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4380 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4381 				    (long)be32_to_cpu(h->magic),
4382 				    h->command, h->length);
4383 				goto reconnect;
4384 			}
4385 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4386 			len = be16_to_cpu(h->length);
4387 			if (unlikely(cmd == NULL)) {
4388 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4389 				    (long)be32_to_cpu(h->magic),
4390 				    h->command, h->length);
4391 				goto disconnect;
4392 			}
4393 			expect = cmd->pkt_size;
4394 			ERR_IF(len != expect-sizeof(struct p_header))
4395 				goto reconnect;
4396 		}
4397 		if (received == expect) {
4398 			D_ASSERT(cmd != NULL);
4399 			if (!cmd->process(mdev, h))
4400 				goto reconnect;
4401 
4402 			buf	 = h;
4403 			received = 0;
4404 			expect	 = sizeof(struct p_header);
4405 			cmd	 = NULL;
4406 		}
4407 	}
4408 
4409 	if (0) {
4410 reconnect:
4411 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4412 	}
4413 	if (0) {
4414 disconnect:
4415 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4416 	}
4417 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4418 
4419 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4420 	dev_info(DEV, "asender terminated\n");
4421 
4422 	return 0;
4423 }
4424