xref: /linux/drivers/block/drbd/drbd_receiver.c (revision b411b3637fa71fce9cf2acf0639009500f5892fe)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/autoconf.h>
27 #include <linux/module.h>
28 
29 #include <asm/uaccess.h>
30 #include <net/sock.h>
31 
32 #include <linux/version.h>
33 #include <linux/drbd.h>
34 #include <linux/fs.h>
35 #include <linux/file.h>
36 #include <linux/in.h>
37 #include <linux/mm.h>
38 #include <linux/memcontrol.h>
39 #include <linux/mm_inline.h>
40 #include <linux/slab.h>
41 #include <linux/smp_lock.h>
42 #include <linux/pkt_sched.h>
43 #define __KERNEL_SYSCALLS__
44 #include <linux/unistd.h>
45 #include <linux/vmalloc.h>
46 #include <linux/random.h>
48 #include <linux/string.h>
49 #include <linux/scatterlist.h>
50 #include "drbd_int.h"
51 #include "drbd_tracing.h"
52 #include "drbd_req.h"
53 
54 #include "drbd_vli.h"
55 
56 struct flush_work {
57 	struct drbd_work w;
58 	struct drbd_epoch *epoch;
59 };
60 
61 enum finish_epoch {
62 	FE_STILL_LIVE,
63 	FE_DESTROYED,
64 	FE_RECYCLED,
65 };
66 
67 static int drbd_do_handshake(struct drbd_conf *mdev);
68 static int drbd_do_auth(struct drbd_conf *mdev);
69 
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
72 
73 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
74 {
75 	struct drbd_epoch *prev;
76 	spin_lock(&mdev->epoch_lock);
77 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
78 	if (prev == epoch || prev == mdev->current_epoch)
79 		prev = NULL;
80 	spin_unlock(&mdev->epoch_lock);
81 	return prev;
82 }
83 
84 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
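/* Note: GFP_TRY intentionally lacks __GFP_WAIT, so the page allocator will
 * neither sleep nor enter direct reclaim here; it therefore cannot trigger
 * the arbitrary write-out described in the "criss-cross" comment below. */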
85 
86 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
87 {
88 	struct page *page = NULL;
89 
90 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
91 	 * So what. It saves a spin_lock. */
92 	if (drbd_pp_vacant > 0) {
93 		spin_lock(&drbd_pp_lock);
94 		page = drbd_pp_pool;
95 		if (page) {
96 			drbd_pp_pool = (struct page *)page_private(page);
97 			set_page_private(page, 0); /* just to be polite */
98 			drbd_pp_vacant--;
99 		}
100 		spin_unlock(&drbd_pp_lock);
101 	}
102 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
103 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
104 	 * which in turn might block on the other node at this very place.  */
105 	if (!page)
106 		page = alloc_page(GFP_TRY);
107 	if (page)
108 		atomic_inc(&mdev->pp_in_use);
109 	return page;
110 }
111 
112 /* kick lower level device, if we have more than (arbitrary number)
113  * reference counts on it, which typically are locally submitted io
114  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
115 static void maybe_kick_lo(struct drbd_conf *mdev)
116 {
117 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
118 		drbd_kick_lo(mdev);
119 }
120 
121 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
122 {
123 	struct drbd_epoch_entry *e;
124 	struct list_head *le, *tle;
125 
126 	/* The EEs are always appended to the end of the list. Since
127 	   they are sent in order over the wire, they have to finish
128 	   in order. As soon as we see the first unfinished one, we can
129 	   stop examining the list... */
130 
131 	list_for_each_safe(le, tle, &mdev->net_ee) {
132 		e = list_entry(le, struct drbd_epoch_entry, w.list);
133 		if (drbd_bio_has_active_page(e->private_bio))
134 			break;
135 		list_move(le, to_be_freed);
136 	}
137 }
138 
139 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
140 {
141 	LIST_HEAD(reclaimed);
142 	struct drbd_epoch_entry *e, *t;
143 
144 	maybe_kick_lo(mdev);
145 	spin_lock_irq(&mdev->req_lock);
146 	reclaim_net_ee(mdev, &reclaimed);
147 	spin_unlock_irq(&mdev->req_lock);
148 
149 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
150 		drbd_free_ee(mdev, e);
151 }
152 
153 /**
154  * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
155  * @mdev:	DRBD device.
156  * @retry:	whether or not to retry allocation forever (or until signalled)
157  *
158  * Tries to allocate a page, first from our own page pool, then from the
159  * kernel, unless this allocation would exceed the max_buffers setting.
160  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
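 * Callers that are allowed to sleep pass a non-zero @retry (drbd_alloc_ee()
 * passes gfp_mask & __GFP_WAIT, drbd_drain_block() passes 1).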
161  */
162 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
163 {
164 	struct page *page = NULL;
165 	DEFINE_WAIT(wait);
166 
167 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
168 		page = drbd_pp_first_page_or_try_alloc(mdev);
169 		if (page)
170 			return page;
171 	}
172 
173 	for (;;) {
174 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
175 
176 		drbd_kick_lo_and_reclaim_net(mdev);
177 
178 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
179 			page = drbd_pp_first_page_or_try_alloc(mdev);
180 			if (page)
181 				break;
182 		}
183 
184 		if (!retry)
185 			break;
186 
187 		if (signal_pending(current)) {
188 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
189 			break;
190 		}
191 
192 		schedule();
193 	}
194 	finish_wait(&drbd_pp_wait, &wait);
195 
196 	return page;
197 }
198 
199 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
200  * It is also used from inside another spin_lock_irq(&mdev->req_lock) */
201 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
202 {
203 	int free_it;
204 
205 	spin_lock(&drbd_pp_lock);
206 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
207 		free_it = 1;
208 	} else {
209 		set_page_private(page, (unsigned long)drbd_pp_pool);
210 		drbd_pp_pool = page;
211 		drbd_pp_vacant++;
212 		free_it = 0;
213 	}
214 	spin_unlock(&drbd_pp_lock);
215 
216 	atomic_dec(&mdev->pp_in_use);
217 
218 	if (free_it)
219 		__free_page(page);
220 
221 	wake_up(&drbd_pp_wait);
222 }
223 
224 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
225 {
226 	struct page *p_to_be_freed = NULL;
227 	struct page *page;
228 	struct bio_vec *bvec;
229 	int i;
230 
231 	spin_lock(&drbd_pp_lock);
232 	__bio_for_each_segment(bvec, bio, i, 0) {
233 		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
234 			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
235 			p_to_be_freed = bvec->bv_page;
236 		} else {
237 			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
238 			drbd_pp_pool = bvec->bv_page;
239 			drbd_pp_vacant++;
240 		}
241 	}
242 	spin_unlock(&drbd_pp_lock);
243 	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
244 
245 	while (p_to_be_freed) {
246 		page = p_to_be_freed;
247 		p_to_be_freed = (struct page *)page_private(page);
248 		set_page_private(page, 0); /* just to be polite */
249 		put_page(page);
250 	}
251 
252 	wake_up(&drbd_pp_wait);
253 }
254 
255 /*
256 You need to hold the req_lock:
257  _drbd_wait_ee_list_empty()
258 
259 You must not have the req_lock:
260  drbd_free_ee()
261  drbd_alloc_ee()
262  drbd_init_ee()
263  drbd_release_ee()
264  drbd_ee_fix_bhs()
265  drbd_process_done_ee()
266  drbd_clear_done_ee()
267  drbd_wait_ee_list_empty()
268 */
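/* A usage sketch: from a context that does not yet hold the req_lock, use the
 * locked wrapper
 *
 *	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *
 * which simply brackets _drbd_wait_ee_list_empty() with
 * spin_lock_irq(&mdev->req_lock) / spin_unlock_irq(&mdev->req_lock);
 * see drbd_wait_ee_list_empty() further down. */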
269 
270 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
271 				     u64 id,
272 				     sector_t sector,
273 				     unsigned int data_size,
274 				     gfp_t gfp_mask) __must_hold(local)
275 {
276 	struct request_queue *q;
277 	struct drbd_epoch_entry *e;
278 	struct page *page;
279 	struct bio *bio;
280 	unsigned int ds;
281 
282 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
283 		return NULL;
284 
285 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
286 	if (!e) {
287 		if (!(gfp_mask & __GFP_NOWARN))
288 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
289 		return NULL;
290 	}
291 
292 	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
293 	if (!bio) {
294 		if (!(gfp_mask & __GFP_NOWARN))
295 			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
296 		goto fail1;
297 	}
298 
299 	bio->bi_bdev = mdev->ldev->backing_bdev;
300 	bio->bi_sector = sector;
301 
302 	ds = data_size;
303 	while (ds) {
304 		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
305 		if (!page) {
306 			if (!(gfp_mask & __GFP_NOWARN))
307 				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
308 			goto fail2;
309 		}
310 		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
311 			drbd_pp_free(mdev, page);
312 			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
313 			    "data_size=%u,ds=%u) failed\n",
314 			    (unsigned long long)sector, data_size, ds);
315 
316 			q = bdev_get_queue(bio->bi_bdev);
317 			if (q->merge_bvec_fn) {
318 				struct bvec_merge_data bvm = {
319 					.bi_bdev = bio->bi_bdev,
320 					.bi_sector = bio->bi_sector,
321 					.bi_size = bio->bi_size,
322 					.bi_rw = bio->bi_rw,
323 				};
324 				int l = q->merge_bvec_fn(q, &bvm,
325 						&bio->bi_io_vec[bio->bi_vcnt]);
326 				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
327 			}
328 
329 			/* dump more of the bio. */
330 			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
331 			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
332 			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
333 			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
334 
335 			goto fail2;
336 			break;
337 		}
338 		ds -= min_t(int, ds, PAGE_SIZE);
339 	}
340 
341 	D_ASSERT(data_size == bio->bi_size);
342 
343 	bio->bi_private = e;
344 	e->mdev = mdev;
345 	e->sector = sector;
346 	e->size = bio->bi_size;
347 
348 	e->private_bio = bio;
349 	e->block_id = id;
350 	INIT_HLIST_NODE(&e->colision);
351 	e->epoch = NULL;
352 	e->flags = 0;
353 
354 	trace_drbd_ee(mdev, e, "allocated");
355 
356 	return e;
357 
358  fail2:
359 	drbd_pp_free_bio_pages(mdev, bio);
360 	bio_put(bio);
361  fail1:
362 	mempool_free(e, drbd_ee_mempool);
363 
364 	return NULL;
365 }
366 
367 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
368 {
369 	struct bio *bio = e->private_bio;
370 	trace_drbd_ee(mdev, e, "freed");
371 	drbd_pp_free_bio_pages(mdev, bio);
372 	bio_put(bio);
373 	D_ASSERT(hlist_unhashed(&e->colision));
374 	mempool_free(e, drbd_ee_mempool);
375 }
376 
377 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
378 {
379 	LIST_HEAD(work_list);
380 	struct drbd_epoch_entry *e, *t;
381 	int count = 0;
382 
383 	spin_lock_irq(&mdev->req_lock);
384 	list_splice_init(list, &work_list);
385 	spin_unlock_irq(&mdev->req_lock);
386 
387 	list_for_each_entry_safe(e, t, &work_list, w.list) {
388 		drbd_free_ee(mdev, e);
389 		count++;
390 	}
391 	return count;
392 }
393 
394 
395 /*
396  * This function is called from _asender only_
397  * but see also comments in _req_mod(,barrier_acked)
398  * and receive_Barrier.
399  *
400  * Move entries from net_ee to done_ee, if ready.
401  * Grab done_ee, call all callbacks, free the entries.
402  * The callbacks typically send out ACKs.
403  */
404 static int drbd_process_done_ee(struct drbd_conf *mdev)
405 {
406 	LIST_HEAD(work_list);
407 	LIST_HEAD(reclaimed);
408 	struct drbd_epoch_entry *e, *t;
409 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
410 
411 	spin_lock_irq(&mdev->req_lock);
412 	reclaim_net_ee(mdev, &reclaimed);
413 	list_splice_init(&mdev->done_ee, &work_list);
414 	spin_unlock_irq(&mdev->req_lock);
415 
416 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
417 		drbd_free_ee(mdev, e);
418 
419 	/* possible callbacks here:
420 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
421 	 * all ignore the last argument.
422 	 */
423 	list_for_each_entry_safe(e, t, &work_list, w.list) {
424 		trace_drbd_ee(mdev, e, "process_done_ee");
425 		/* list_del not necessary, next/prev members not touched */
426 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
427 		drbd_free_ee(mdev, e);
428 	}
429 	wake_up(&mdev->ee_wait);
430 
431 	return ok;
432 }
433 
434 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
435 {
436 	DEFINE_WAIT(wait);
437 
438 	/* avoids spin_lock/unlock
439 	 * and calling prepare_to_wait in the fast path */
440 	while (!list_empty(head)) {
441 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
442 		spin_unlock_irq(&mdev->req_lock);
443 		drbd_kick_lo(mdev);
444 		schedule();
445 		finish_wait(&mdev->ee_wait, &wait);
446 		spin_lock_irq(&mdev->req_lock);
447 	}
448 }
449 
450 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
451 {
452 	spin_lock_irq(&mdev->req_lock);
453 	_drbd_wait_ee_list_empty(mdev, head);
454 	spin_unlock_irq(&mdev->req_lock);
455 }
456 
457 /* see also kernel_accept, which is only present since 2.6.18.
458  * We also want to log exactly which part of it failed. */
459 static int drbd_accept(struct drbd_conf *mdev, const char **what,
460 		struct socket *sock, struct socket **newsock)
461 {
462 	struct sock *sk = sock->sk;
463 	int err = 0;
464 
465 	*what = "listen";
466 	err = sock->ops->listen(sock, 5);
467 	if (err < 0)
468 		goto out;
469 
470 	*what = "sock_create_lite";
471 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
472 			       newsock);
473 	if (err < 0)
474 		goto out;
475 
476 	*what = "accept";
477 	err = sock->ops->accept(sock, *newsock, 0);
478 	if (err < 0) {
479 		sock_release(*newsock);
480 		*newsock = NULL;
481 		goto out;
482 	}
483 	(*newsock)->ops  = sock->ops;
484 
485 out:
486 	return err;
487 }
488 
489 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
490 		    void *buf, size_t size, int flags)
491 {
492 	mm_segment_t oldfs;
493 	struct kvec iov = {
494 		.iov_base = buf,
495 		.iov_len = size,
496 	};
497 	struct msghdr msg = {
498 		.msg_iovlen = 1,
499 		.msg_iov = (struct iovec *)&iov,
500 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
501 	};
502 	int rv;
503 
504 	oldfs = get_fs();
505 	set_fs(KERNEL_DS);
506 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
507 	set_fs(oldfs);
508 
509 	return rv;
510 }
511 
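/* Receive exactly @size bytes from the data socket (MSG_WAITALL).  Anything
 * short of that (error, peer shutdown, pending signal) makes us give up and
 * force the connection into C_BROKEN_PIPE, see the check at the end. */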
512 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
513 {
514 	mm_segment_t oldfs;
515 	struct kvec iov = {
516 		.iov_base = buf,
517 		.iov_len = size,
518 	};
519 	struct msghdr msg = {
520 		.msg_iovlen = 1,
521 		.msg_iov = (struct iovec *)&iov,
522 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
523 	};
524 	int rv;
525 
526 	oldfs = get_fs();
527 	set_fs(KERNEL_DS);
528 
529 	for (;;) {
530 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
531 		if (rv == size)
532 			break;
533 
534 		/* Note:
535 		 * ECONNRESET	other side closed the connection
536 		 * ERESTARTSYS	(on  sock) we got a signal
537 		 */
538 
539 		if (rv < 0) {
540 			if (rv == -ECONNRESET)
541 				dev_info(DEV, "sock was reset by peer\n");
542 			else if (rv != -ERESTARTSYS)
543 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
544 			break;
545 		} else if (rv == 0) {
546 			dev_info(DEV, "sock was shut down by peer\n");
547 			break;
548 		} else	{
549 			/* signal came in, or peer/link went down,
550 			 * after we read a partial message
551 			 */
552 			/* D_ASSERT(signal_pending(current)); */
553 			break;
554 		}
555 	};
556 
557 	set_fs(oldfs);
558 
559 	if (rv != size)
560 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
561 
562 	return rv;
563 }
564 
565 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
566 {
567 	const char *what;
568 	struct socket *sock;
569 	struct sockaddr_in6 src_in6;
570 	int err;
571 	int disconnect_on_error = 1;
572 
573 	if (!get_net_conf(mdev))
574 		return NULL;
575 
576 	what = "sock_create_kern";
577 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
578 		SOCK_STREAM, IPPROTO_TCP, &sock);
579 	if (err < 0) {
580 		sock = NULL;
581 		goto out;
582 	}
583 
584 	sock->sk->sk_rcvtimeo =
585 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
586 
587        /* explicitly bind to the configured IP as source IP
588 	*  for the outgoing connections.
589 	*  This is needed for multihomed hosts and to be
590 	*  able to use lo: interfaces for drbd.
591 	* Make sure to use 0 as port number, so linux selects
592 	*  a free one dynamically.
593 	*/
594 	memcpy(&src_in6, mdev->net_conf->my_addr,
595 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
596 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
597 		src_in6.sin6_port = 0;
598 	else
599 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
600 
601 	what = "bind before connect";
602 	err = sock->ops->bind(sock,
603 			      (struct sockaddr *) &src_in6,
604 			      mdev->net_conf->my_addr_len);
605 	if (err < 0)
606 		goto out;
607 
608 	/* connect may fail, peer not yet available.
609 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
610 	disconnect_on_error = 0;
611 	what = "connect";
612 	err = sock->ops->connect(sock,
613 				 (struct sockaddr *)mdev->net_conf->peer_addr,
614 				 mdev->net_conf->peer_addr_len, 0);
615 
616 out:
617 	if (err < 0) {
618 		if (sock) {
619 			sock_release(sock);
620 			sock = NULL;
621 		}
622 		switch (-err) {
623 			/* timeout, busy, signal pending */
624 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
625 		case EINTR: case ERESTARTSYS:
626 			/* peer not (yet) available, network problem */
627 		case ECONNREFUSED: case ENETUNREACH:
628 		case EHOSTDOWN:    case EHOSTUNREACH:
629 			disconnect_on_error = 0;
630 			break;
631 		default:
632 			dev_err(DEV, "%s failed, err = %d\n", what, err);
633 		}
634 		if (disconnect_on_error)
635 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
636 	}
637 	put_net_conf(mdev);
638 	return sock;
639 }
640 
641 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
642 {
643 	int timeo, err;
644 	struct socket *s_estab = NULL, *s_listen;
645 	const char *what;
646 
647 	if (!get_net_conf(mdev))
648 		return NULL;
649 
650 	what = "sock_create_kern";
651 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
652 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
653 	if (err) {
654 		s_listen = NULL;
655 		goto out;
656 	}
657 
658 	timeo = mdev->net_conf->try_connect_int * HZ;
659 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
660 
661 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
662 	s_listen->sk->sk_rcvtimeo = timeo;
663 	s_listen->sk->sk_sndtimeo = timeo;
664 
665 	what = "bind before listen";
666 	err = s_listen->ops->bind(s_listen,
667 			      (struct sockaddr *) mdev->net_conf->my_addr,
668 			      mdev->net_conf->my_addr_len);
669 	if (err < 0)
670 		goto out;
671 
672 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
673 
674 out:
675 	if (s_listen)
676 		sock_release(s_listen);
677 	if (err < 0) {
678 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
679 			dev_err(DEV, "%s failed, err = %d\n", what, err);
680 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
681 		}
682 	}
683 	put_net_conf(mdev);
684 
685 	return s_estab;
686 }
687 
688 static int drbd_send_fp(struct drbd_conf *mdev,
689 	struct socket *sock, enum drbd_packets cmd)
690 {
691 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692 
693 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
694 }
695 
696 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
697 {
698 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
699 	int rr;
700 
701 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
702 
703 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
704 		return be16_to_cpu(h->command);
705 
706 	return 0xffff;
707 }
708 
709 /**
710  * drbd_socket_okay() - Free the socket if its connection is not okay
711  * @mdev:	DRBD device.
712  * @sock:	pointer to the pointer to the socket.
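 *
 * Returns TRUE if the connection still looks alive (the non-blocking
 * MSG_PEEK read returns data or -EAGAIN); otherwise the socket is released,
 * *sock is set to NULL, and FALSE is returned.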
713  */
714 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
715 {
716 	int rr;
717 	char tb[4];
718 
719 	if (!*sock)
720 		return FALSE;
721 
722 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
723 
724 	if (rr > 0 || rr == -EAGAIN) {
725 		return TRUE;
726 	} else {
727 		sock_release(*sock);
728 		*sock = NULL;
729 		return FALSE;
730 	}
731 }
732 
733 /*
734  * return values:
735  *   1 yes, we have a valid connection
736  *   0 oops, did not work out, please try again
737  *  -1 peer talks different language,
738  *     no point in trying again, please go standalone.
739  *  -2 We do not have a network config...
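 *
 * Two TCP connections are set up here: "sock" becomes mdev->data.socket
 * (bulk data, announced to the peer with P_HAND_SHAKE_S) and "msock" becomes
 * mdev->meta.socket (ACKs and pings, announced with P_HAND_SHAKE_M).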
740  */
741 static int drbd_connect(struct drbd_conf *mdev)
742 {
743 	struct socket *s, *sock, *msock;
744 	int try, h, ok;
745 
746 	D_ASSERT(!mdev->data.socket);
747 
748 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
749 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
750 
751 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
752 		return -2;
753 
754 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
755 
756 	sock  = NULL;
757 	msock = NULL;
758 
759 	do {
760 		for (try = 0;;) {
761 			/* 3 tries, this should take less than a second! */
762 			s = drbd_try_connect(mdev);
763 			if (s || ++try >= 3)
764 				break;
765 			/* give the other side time to call bind() & listen() */
766 			__set_current_state(TASK_INTERRUPTIBLE);
767 			schedule_timeout(HZ / 10);
768 		}
769 
770 		if (s) {
771 			if (!sock) {
772 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
773 				sock = s;
774 				s = NULL;
775 			} else if (!msock) {
776 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
777 				msock = s;
778 				s = NULL;
779 			} else {
780 				dev_err(DEV, "Logic error in drbd_connect()\n");
781 				goto out_release_sockets;
782 			}
783 		}
784 
785 		if (sock && msock) {
786 			__set_current_state(TASK_INTERRUPTIBLE);
787 			schedule_timeout(HZ / 10);
788 			ok = drbd_socket_okay(mdev, &sock);
789 			ok = drbd_socket_okay(mdev, &msock) && ok;
790 			if (ok)
791 				break;
792 		}
793 
794 retry:
795 		s = drbd_wait_for_connect(mdev);
796 		if (s) {
797 			try = drbd_recv_fp(mdev, s);
798 			drbd_socket_okay(mdev, &sock);
799 			drbd_socket_okay(mdev, &msock);
800 			switch (try) {
801 			case P_HAND_SHAKE_S:
802 				if (sock) {
803 					dev_warn(DEV, "initial packet S crossed\n");
804 					sock_release(sock);
805 				}
806 				sock = s;
807 				break;
808 			case P_HAND_SHAKE_M:
809 				if (msock) {
810 					dev_warn(DEV, "initial packet M crossed\n");
811 					sock_release(msock);
812 				}
813 				msock = s;
814 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
815 				break;
816 			default:
817 				dev_warn(DEV, "Error receiving initial packet\n");
818 				sock_release(s);
819 				if (random32() & 1)
820 					goto retry;
821 			}
822 		}
823 
824 		if (mdev->state.conn <= C_DISCONNECTING)
825 			goto out_release_sockets;
826 		if (signal_pending(current)) {
827 			flush_signals(current);
828 			smp_rmb();
829 			if (get_t_state(&mdev->receiver) == Exiting)
830 				goto out_release_sockets;
831 		}
832 
833 		if (sock && msock) {
834 			ok = drbd_socket_okay(mdev, &sock);
835 			ok = drbd_socket_okay(mdev, &msock) && ok;
836 			if (ok)
837 				break;
838 		}
839 	} while (1);
840 
841 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
842 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
843 
844 	sock->sk->sk_allocation = GFP_NOIO;
845 	msock->sk->sk_allocation = GFP_NOIO;
846 
847 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
848 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
849 
850 	if (mdev->net_conf->sndbuf_size) {
851 		sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
852 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
853 	}
854 
855 	if (mdev->net_conf->rcvbuf_size) {
856 		sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
857 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
858 	}
859 
860 	/* NOT YET ...
861 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
863 	 * first set it to the P_HAND_SHAKE timeout,
864 	 * which we set to 4x the configured ping_timeout. */
865 	sock->sk->sk_sndtimeo =
866 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
867 
868 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
869 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
870 
871 	/* we don't want delays.
872 	 * we use TCP_CORK where appropriate, though */
873 	drbd_tcp_nodelay(sock);
874 	drbd_tcp_nodelay(msock);
875 
876 	mdev->data.socket = sock;
877 	mdev->meta.socket = msock;
878 	mdev->last_received = jiffies;
879 
880 	D_ASSERT(mdev->asender.task == NULL);
881 
882 	h = drbd_do_handshake(mdev);
883 	if (h <= 0)
884 		return h;
885 
886 	if (mdev->cram_hmac_tfm) {
887 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
888 		if (!drbd_do_auth(mdev)) {
889 			dev_err(DEV, "Authentication of peer failed\n");
890 			return -1;
891 		}
892 	}
893 
894 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
895 		return 0;
896 
897 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
898 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
899 
900 	atomic_set(&mdev->packet_seq, 0);
901 	mdev->peer_seq = 0;
902 
903 	drbd_thread_start(&mdev->asender);
904 
905 	drbd_send_protocol(mdev);
906 	drbd_send_sync_param(mdev, &mdev->sync_conf);
907 	drbd_send_sizes(mdev, 0);
908 	drbd_send_uuids(mdev);
909 	drbd_send_state(mdev);
910 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
911 	clear_bit(RESIZE_PENDING, &mdev->flags);
912 
913 	return 1;
914 
915 out_release_sockets:
916 	if (sock)
917 		sock_release(sock);
918 	if (msock)
919 		sock_release(msock);
920 	return -1;
921 }
922 
923 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
924 {
925 	int r;
926 
927 	r = drbd_recv(mdev, h, sizeof(*h));
928 
929 	if (unlikely(r != sizeof(*h))) {
930 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
931 		return FALSE;
932 	};
933 	h->command = be16_to_cpu(h->command);
934 	h->length  = be16_to_cpu(h->length);
935 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
936 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
937 		    (long)be32_to_cpu(h->magic),
938 		    h->command, h->length);
939 		return FALSE;
940 	}
941 	mdev->last_received = jiffies;
942 
943 	return TRUE;
944 }
945 
946 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
947 {
948 	int rv;
949 
950 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
951 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
952 		if (rv) {
953 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
954 			/* would rather check on EOPNOTSUPP, but that is not reliable.
955 			 * don't try again for ANY return value != 0
956 			 * if (rv == -EOPNOTSUPP) */
957 			drbd_bump_write_ordering(mdev, WO_drain_io);
958 		}
959 		put_ldev(mdev);
960 	}
961 
962 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
963 }
964 
965 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
966 {
967 	struct flush_work *fw = (struct flush_work *)w;
968 	struct drbd_epoch *epoch = fw->epoch;
969 
970 	kfree(w);
971 
972 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
973 		drbd_flush_after_epoch(mdev, epoch);
974 
975 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
976 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
977 
978 	return 1;
979 }
980 
981 /**
982  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
983  * @mdev:	DRBD device.
984  * @epoch:	Epoch object.
985  * @ev:		Epoch event.
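 *
 * Returns FE_DESTROYED if the epoch (or a follow-up epoch) was freed,
 * FE_RECYCLED if the current epoch was reset for reuse, and FE_STILL_LIVE
 * otherwise.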
986  */
987 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
988 					       struct drbd_epoch *epoch,
989 					       enum epoch_event ev)
990 {
991 	int finish, epoch_size;
992 	struct drbd_epoch *next_epoch;
993 	int schedule_flush = 0;
994 	enum finish_epoch rv = FE_STILL_LIVE;
995 
996 	spin_lock(&mdev->epoch_lock);
997 	do {
998 		next_epoch = NULL;
999 		finish = 0;
1000 
1001 		epoch_size = atomic_read(&epoch->epoch_size);
1002 
1003 		switch (ev & ~EV_CLEANUP) {
1004 		case EV_PUT:
1005 			atomic_dec(&epoch->active);
1006 			break;
1007 		case EV_GOT_BARRIER_NR:
1008 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1009 
1010 			/* Special case: If we just switched from WO_bio_barrier to
1011 			   WO_bdev_flush we should not finish the current epoch */
1012 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1013 			    mdev->write_ordering != WO_bio_barrier &&
1014 			    epoch == mdev->current_epoch)
1015 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1016 			break;
1017 		case EV_BARRIER_DONE:
1018 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1019 			break;
1020 		case EV_BECAME_LAST:
1021 			/* nothing to do*/
1022 			break;
1023 		}
1024 
1025 		trace_drbd_epoch(mdev, epoch, ev);
1026 
1027 		if (epoch_size != 0 &&
1028 		    atomic_read(&epoch->active) == 0 &&
1029 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1030 		    epoch->list.prev == &mdev->current_epoch->list &&
1031 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1032 			/* Nearly all conditions are met to finish that epoch... */
1033 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1034 			    mdev->write_ordering == WO_none ||
1035 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1036 			    ev & EV_CLEANUP) {
1037 				finish = 1;
1038 				set_bit(DE_IS_FINISHING, &epoch->flags);
1039 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1040 				 mdev->write_ordering == WO_bio_barrier) {
1041 				atomic_inc(&epoch->active);
1042 				schedule_flush = 1;
1043 			}
1044 		}
1045 		if (finish) {
1046 			if (!(ev & EV_CLEANUP)) {
1047 				spin_unlock(&mdev->epoch_lock);
1048 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1049 				spin_lock(&mdev->epoch_lock);
1050 			}
1051 			dec_unacked(mdev);
1052 
1053 			if (mdev->current_epoch != epoch) {
1054 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1055 				list_del(&epoch->list);
1056 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1057 				mdev->epochs--;
1058 				trace_drbd_epoch(mdev, epoch, EV_TRACE_FREE);
1059 				kfree(epoch);
1060 
1061 				if (rv == FE_STILL_LIVE)
1062 					rv = FE_DESTROYED;
1063 			} else {
1064 				epoch->flags = 0;
1065 				atomic_set(&epoch->epoch_size, 0);
1066 				/* atomic_set(&epoch->active, 0); is already zero */
1067 				if (rv == FE_STILL_LIVE)
1068 					rv = FE_RECYCLED;
1069 			}
1070 		}
1071 
1072 		if (!next_epoch)
1073 			break;
1074 
1075 		epoch = next_epoch;
1076 	} while (1);
1077 
1078 	spin_unlock(&mdev->epoch_lock);
1079 
1080 	if (schedule_flush) {
1081 		struct flush_work *fw;
1082 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1083 		if (fw) {
1084 			trace_drbd_epoch(mdev, epoch, EV_TRACE_FLUSH);
1085 			fw->w.cb = w_flush;
1086 			fw->epoch = epoch;
1087 			drbd_queue_work(&mdev->data.work, &fw->w);
1088 		} else {
1089 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1090 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1091 			/* That is not a recursion, only one level */
1092 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1093 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1094 		}
1095 	}
1096 
1097 	return rv;
1098 }
1099 
1100 /**
1101  * drbd_bump_write_ordering() - Fall back to another write ordering method
1102  * @mdev:	DRBD device.
1103  * @wo:		Write ordering method to try.
1104  */
1105 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1106 {
1107 	enum write_ordering_e pwo;
1108 	static char *write_ordering_str[] = {
1109 		[WO_none] = "none",
1110 		[WO_drain_io] = "drain",
1111 		[WO_bdev_flush] = "flush",
1112 		[WO_bio_barrier] = "barrier",
1113 	};
1114 
1115 	pwo = mdev->write_ordering;
1116 	wo = min(pwo, wo);
1117 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1118 		wo = WO_bdev_flush;
1119 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1120 		wo = WO_drain_io;
1121 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1122 		wo = WO_none;
1123 	mdev->write_ordering = wo;
1124 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1125 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1126 }
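
/* Example: drbd_flush_after_epoch() calls this with WO_drain_io after a
 * failed disk flush; because of the min(pwo, wo) above, this function can
 * only ever degrade the method (e.g. flush -> drain), never upgrade it. */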
1127 
1128 /**
1129  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1130  * @mdev:	DRBD device.
1131  * @w:		work object.
1132  * @cancel:	The connection will be closed anyways (unused in this callback)
1133  */
1134 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1135 {
1136 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1137 	struct bio *bio = e->private_bio;
1138 
1139 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1140 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1141 	   so that we can finish that epoch in drbd_may_finish_epoch().
1142 	   That is necessary if we already have a long chain of Epochs, before
1143 	   we realize that BIO_RW_BARRIER is actually not supported */
1144 
1145 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1146 	   that will never trigger. If it is reported late, we will just
1147 	   print that warning and continue correctly for all future requests
1148 	   with WO_bdev_flush */
1149 	if (previous_epoch(mdev, e->epoch))
1150 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1151 
1152 	/* prepare bio for re-submit,
1153 	 * re-init volatile members */
1154 	/* we still have a local reference,
1155 	 * get_ldev was done in receive_Data. */
1156 	bio->bi_bdev = mdev->ldev->backing_bdev;
1157 	bio->bi_sector = e->sector;
1158 	bio->bi_size = e->size;
1159 	bio->bi_idx = 0;
1160 
1161 	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1162 	bio->bi_flags |= 1 << BIO_UPTODATE;
1163 
1164 	/* don't know whether this is necessary: */
1165 	bio->bi_phys_segments = 0;
1166 	bio->bi_next = NULL;
1167 
1168 	/* these should be unchanged: */
1169 	/* bio->bi_end_io = drbd_endio_write_sec; */
1170 	/* bio->bi_vcnt = whatever; */
1171 
1172 	e->w.cb = e_end_block;
1173 
1174 	/* This is no longer a barrier request. */
1175 	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1176 
1177 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1178 
1179 	return 1;
1180 }
1181 
1182 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1183 {
1184 	int rv, issue_flush;
1185 	struct p_barrier *p = (struct p_barrier *)h;
1186 	struct drbd_epoch *epoch;
1187 
1188 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1189 
1190 	rv = drbd_recv(mdev, h->payload, h->length);
1191 	ERR_IF(rv != h->length) return FALSE;
1192 
1193 	inc_unacked(mdev);
1194 
1195 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1196 		drbd_kick_lo(mdev);
1197 
1198 	mdev->current_epoch->barrier_nr = p->barrier;
1199 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1200 
1201 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1202 	 * the activity log, which means it would not be resynced in case the
1203 	 * R_PRIMARY crashes now.
1204 	 * Therefore we must send the barrier_ack after the barrier request was
1205 	 * completed. */
1206 	switch (mdev->write_ordering) {
1207 	case WO_bio_barrier:
1208 	case WO_none:
1209 		if (rv == FE_RECYCLED)
1210 			return TRUE;
1211 		break;
1212 
1213 	case WO_bdev_flush:
1214 	case WO_drain_io:
1215 		D_ASSERT(rv == FE_STILL_LIVE);
1216 		set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1217 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1218 		rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1219 		if (rv == FE_RECYCLED)
1220 			return TRUE;
1221 
1222 		/* The asender will send all the ACKs and barrier ACKs out, since
1223 		   all EEs moved from the active_ee to the done_ee. We need to
1224 		   provide a new epoch object for the EEs that come in soon */
1225 		break;
1226 	}
1227 
1228 	/* receiver context, in the writeout path of the other node.
1229 	 * avoid potential distributed deadlock */
1230 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1231 	if (!epoch) {
1232 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1233 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1234 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1235 		if (issue_flush) {
1236 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1237 			if (rv == FE_RECYCLED)
1238 				return TRUE;
1239 		}
1240 
1241 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1242 
1243 		return TRUE;
1244 	}
1245 
1246 	epoch->flags = 0;
1247 	atomic_set(&epoch->epoch_size, 0);
1248 	atomic_set(&epoch->active, 0);
1249 
1250 	spin_lock(&mdev->epoch_lock);
1251 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1252 		list_add(&epoch->list, &mdev->current_epoch->list);
1253 		mdev->current_epoch = epoch;
1254 		mdev->epochs++;
1255 		trace_drbd_epoch(mdev, epoch, EV_TRACE_ALLOC);
1256 	} else {
1257 		/* The current_epoch got recycled while we allocated this one... */
1258 		kfree(epoch);
1259 	}
1260 	spin_unlock(&mdev->epoch_lock);
1261 
1262 	return TRUE;
1263 }
1264 
1265 /* used from receive_RSDataReply (recv_resync_read)
1266  * and from receive_Data */
1267 static struct drbd_epoch_entry *
1268 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1269 {
1270 	struct drbd_epoch_entry *e;
1271 	struct bio_vec *bvec;
1272 	struct page *page;
1273 	struct bio *bio;
1274 	int dgs, ds, i, rr;
1275 	void *dig_in = mdev->int_dig_in;
1276 	void *dig_vv = mdev->int_dig_vv;
1277 
1278 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1279 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1280 
1281 	if (dgs) {
1282 		rr = drbd_recv(mdev, dig_in, dgs);
1283 		if (rr != dgs) {
1284 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1285 			     rr, dgs);
1286 			return NULL;
1287 		}
1288 	}
1289 
1290 	data_size -= dgs;
1291 
1292 	ERR_IF(data_size &  0x1ff) return NULL;
1293 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1294 
1295 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1296 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1297 	 * which in turn might block on the other node at this very place.  */
1298 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1299 	if (!e)
1300 		return NULL;
1301 	bio = e->private_bio;
1302 	ds = data_size;
1303 	bio_for_each_segment(bvec, bio, i) {
1304 		page = bvec->bv_page;
1305 		rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1306 		kunmap(page);
1307 		if (rr != min_t(int, ds, PAGE_SIZE)) {
1308 			drbd_free_ee(mdev, e);
1309 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1310 			     rr, min_t(int, ds, PAGE_SIZE));
1311 			return NULL;
1312 		}
1313 		ds -= rr;
1314 	}
1315 
1316 	if (dgs) {
1317 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1318 		if (memcmp(dig_in, dig_vv, dgs)) {
1319 			dev_err(DEV, "Digest integrity check FAILED.\n");
1320 			drbd_bcast_ee(mdev, "digest failed",
1321 					dgs, dig_in, dig_vv, e);
1322 			drbd_free_ee(mdev, e);
1323 			return NULL;
1324 		}
1325 	}
1326 	mdev->recv_cnt += data_size>>9;
1327 	return e;
1328 }
1329 
1330 /* drbd_drain_block() just takes a data block
1331  * out of the socket input buffer, and discards it.
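 * This keeps the receive stream in sync with the peer when we cannot write
 * the payload locally; see the diskless fallbacks in receive_RSDataReply()
 * and receive_Data().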
1332  */
1333 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1334 {
1335 	struct page *page;
1336 	int rr, rv = 1;
1337 	void *data;
1338 
1339 	page = drbd_pp_alloc(mdev, 1);
1340 
1341 	data = kmap(page);
1342 	while (data_size) {
1343 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1344 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1345 			rv = 0;
1346 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1347 			     rr, min_t(int, data_size, PAGE_SIZE));
1348 			break;
1349 		}
1350 		data_size -= rr;
1351 	}
1352 	kunmap(page);
1353 	drbd_pp_free(mdev, page);
1354 	return rv;
1355 }
1356 
1357 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1358 			   sector_t sector, int data_size)
1359 {
1360 	struct bio_vec *bvec;
1361 	struct bio *bio;
1362 	int dgs, rr, i, expect;
1363 	void *dig_in = mdev->int_dig_in;
1364 	void *dig_vv = mdev->int_dig_vv;
1365 
1366 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1367 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1368 
1369 	if (dgs) {
1370 		rr = drbd_recv(mdev, dig_in, dgs);
1371 		if (rr != dgs) {
1372 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1373 			     rr, dgs);
1374 			return 0;
1375 		}
1376 	}
1377 
1378 	data_size -= dgs;
1379 
1380 	/* optimistically update recv_cnt.  if receiving fails below,
1381 	 * we disconnect anyways, and counters will be reset. */
1382 	mdev->recv_cnt += data_size>>9;
1383 
1384 	bio = req->master_bio;
1385 	D_ASSERT(sector == bio->bi_sector);
1386 
1387 	bio_for_each_segment(bvec, bio, i) {
1388 		expect = min_t(int, data_size, bvec->bv_len);
1389 		rr = drbd_recv(mdev,
1390 			     kmap(bvec->bv_page)+bvec->bv_offset,
1391 			     expect);
1392 		kunmap(bvec->bv_page);
1393 		if (rr != expect) {
1394 			dev_warn(DEV, "short read receiving data reply: "
1395 			     "read %d expected %d\n",
1396 			     rr, expect);
1397 			return 0;
1398 		}
1399 		data_size -= rr;
1400 	}
1401 
1402 	if (dgs) {
1403 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1404 		if (memcmp(dig_in, dig_vv, dgs)) {
1405 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1406 			return 0;
1407 		}
1408 	}
1409 
1410 	D_ASSERT(data_size == 0);
1411 	return 1;
1412 }
1413 
1414 /* e_end_resync_block() is called via
1415  * drbd_process_done_ee() by asender only */
1416 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1417 {
1418 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1419 	sector_t sector = e->sector;
1420 	int ok;
1421 
1422 	D_ASSERT(hlist_unhashed(&e->colision));
1423 
1424 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1425 		drbd_set_in_sync(mdev, sector, e->size);
1426 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1427 	} else {
1428 		/* Record failure to sync */
1429 		drbd_rs_failed_io(mdev, sector, e->size);
1430 
1431 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1432 	}
1433 	dec_unacked(mdev);
1434 
1435 	return ok;
1436 }
1437 
1438 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1439 {
1440 	struct drbd_epoch_entry *e;
1441 
1442 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1443 	if (!e) {
1444 		put_ldev(mdev);
1445 		return FALSE;
1446 	}
1447 
1448 	dec_rs_pending(mdev);
1449 
1450 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1451 	e->private_bio->bi_rw = WRITE;
1452 	e->w.cb = e_end_resync_block;
1453 
1454 	inc_unacked(mdev);
1455 	/* corresponding dec_unacked() in e_end_resync_block()
1456 	 * respective _drbd_clear_done_ee */
1457 
1458 	spin_lock_irq(&mdev->req_lock);
1459 	list_add(&e->w.list, &mdev->sync_ee);
1460 	spin_unlock_irq(&mdev->req_lock);
1461 
1462 	trace_drbd_ee(mdev, e, "submitting for (rs)write");
1463 	trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
1464 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1465 	/* accounting done in endio */
1466 
1467 	maybe_kick_lo(mdev);
1468 	return TRUE;
1469 }
1470 
1471 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1472 {
1473 	struct drbd_request *req;
1474 	sector_t sector;
1475 	unsigned int header_size, data_size;
1476 	int ok;
1477 	struct p_data *p = (struct p_data *)h;
1478 
1479 	header_size = sizeof(*p) - sizeof(*h);
1480 	data_size   = h->length  - header_size;
1481 
1482 	ERR_IF(data_size == 0) return FALSE;
1483 
1484 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1485 		return FALSE;
1486 
1487 	sector = be64_to_cpu(p->sector);
1488 
1489 	spin_lock_irq(&mdev->req_lock);
1490 	req = _ar_id_to_req(mdev, p->block_id, sector);
1491 	spin_unlock_irq(&mdev->req_lock);
1492 	if (unlikely(!req)) {
1493 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1494 		return FALSE;
1495 	}
1496 
1497 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1498 	 * special casing it there for the various failure cases.
1499 	 * still no race with drbd_fail_pending_reads */
1500 	ok = recv_dless_read(mdev, req, sector, data_size);
1501 
1502 	if (ok)
1503 		req_mod(req, data_received);
1504 	/* else: nothing. handled from drbd_disconnect...
1505 	 * I don't think we may complete this just yet
1506 	 * in case we are "on-disconnect: freeze" */
1507 
1508 	return ok;
1509 }
1510 
1511 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1512 {
1513 	sector_t sector;
1514 	unsigned int header_size, data_size;
1515 	int ok;
1516 	struct p_data *p = (struct p_data *)h;
1517 
1518 	header_size = sizeof(*p) - sizeof(*h);
1519 	data_size   = h->length  - header_size;
1520 
1521 	ERR_IF(data_size == 0) return FALSE;
1522 
1523 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1524 		return FALSE;
1525 
1526 	sector = be64_to_cpu(p->sector);
1527 	D_ASSERT(p->block_id == ID_SYNCER);
1528 
1529 	if (get_ldev(mdev)) {
1530 		/* data is submitted to disk within recv_resync_read.
1531 		 * corresponding put_ldev done below on error,
1532 		 * or in drbd_endio_write_sec. */
1533 		ok = recv_resync_read(mdev, sector, data_size);
1534 	} else {
1535 		if (__ratelimit(&drbd_ratelimit_state))
1536 			dev_err(DEV, "Can not write resync data to local disk.\n");
1537 
1538 		ok = drbd_drain_block(mdev, data_size);
1539 
1540 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1541 	}
1542 
1543 	return ok;
1544 }
1545 
1546 /* e_end_block() is called via drbd_process_done_ee().
1547  * this means this function only runs in the asender thread
1548  */
1549 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1550 {
1551 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1552 	sector_t sector = e->sector;
1553 	struct drbd_epoch *epoch;
1554 	int ok = 1, pcmd;
1555 
1556 	if (e->flags & EE_IS_BARRIER) {
1557 		epoch = previous_epoch(mdev, e->epoch);
1558 		if (epoch)
1559 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1560 	}
1561 
1562 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1563 		if (likely(drbd_bio_uptodate(e->private_bio))) {
1564 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1565 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1566 				e->flags & EE_MAY_SET_IN_SYNC) ?
1567 				P_RS_WRITE_ACK : P_WRITE_ACK;
1568 			ok &= drbd_send_ack(mdev, pcmd, e);
1569 			if (pcmd == P_RS_WRITE_ACK)
1570 				drbd_set_in_sync(mdev, sector, e->size);
1571 		} else {
1572 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1573 			/* we expect it to be marked out of sync anyways...
1574 			 * maybe assert this?  */
1575 		}
1576 		dec_unacked(mdev);
1577 	}
1578 	/* we delete from the conflict detection hash _after_ we sent out the
1579 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1580 	if (mdev->net_conf->two_primaries) {
1581 		spin_lock_irq(&mdev->req_lock);
1582 		D_ASSERT(!hlist_unhashed(&e->colision));
1583 		hlist_del_init(&e->colision);
1584 		spin_unlock_irq(&mdev->req_lock);
1585 	} else {
1586 		D_ASSERT(hlist_unhashed(&e->colision));
1587 	}
1588 
1589 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1590 
1591 	return ok;
1592 }
1593 
1594 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1595 {
1596 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1597 	int ok = 1;
1598 
1599 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1600 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1601 
1602 	spin_lock_irq(&mdev->req_lock);
1603 	D_ASSERT(!hlist_unhashed(&e->colision));
1604 	hlist_del_init(&e->colision);
1605 	spin_unlock_irq(&mdev->req_lock);
1606 
1607 	dec_unacked(mdev);
1608 
1609 	return ok;
1610 }
1611 
1612 /* Called from receive_Data.
1613  * Synchronize packets on sock with packets on msock.
1614  *
1615  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1616  * packet traveling on msock, they are still processed in the order they have
1617  * been sent.
1618  *
1619  * Note: we don't care for Ack packets overtaking P_DATA packets.
1620  *
1621  * In case packet_seq is larger than mdev->peer_seq, there are
1622  * outstanding packets on the msock. We wait for them to arrive.
1623  * In case we are the logically next packet, we update mdev->peer_seq
1624  * ourselves. Correctly handles 32bit wrap around.
1625  *
1626  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1627  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1628  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1629  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1630  *
1631  * returns 0 if we may process the packet,
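 * Wrap-around example: if mdev->peer_seq is 0xffffffff, the logically next
 * packet_seq is 0 (peer_seq+1 wraps), so we fall through and bump peer_seq
 * to 0; a packet_seq further ahead makes us wait until the packets in
 * between have updated peer_seq.
 *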
1632  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1633 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1634 {
1635 	DEFINE_WAIT(wait);
1636 	unsigned int p_seq;
1637 	long timeout;
1638 	int ret = 0;
1639 	spin_lock(&mdev->peer_seq_lock);
1640 	for (;;) {
1641 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1642 		if (seq_le(packet_seq, mdev->peer_seq+1))
1643 			break;
1644 		if (signal_pending(current)) {
1645 			ret = -ERESTARTSYS;
1646 			break;
1647 		}
1648 		p_seq = mdev->peer_seq;
1649 		spin_unlock(&mdev->peer_seq_lock);
1650 		timeout = schedule_timeout(30*HZ);
1651 		spin_lock(&mdev->peer_seq_lock);
1652 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1653 			ret = -ETIMEDOUT;
1654 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1655 			break;
1656 		}
1657 	}
1658 	finish_wait(&mdev->seq_wait, &wait);
1659 	if (mdev->peer_seq+1 == packet_seq)
1660 		mdev->peer_seq++;
1661 	spin_unlock(&mdev->peer_seq_lock);
1662 	return ret;
1663 }
1664 
1665 /* mirrored write */
1666 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1667 {
1668 	sector_t sector;
1669 	struct drbd_epoch_entry *e;
1670 	struct p_data *p = (struct p_data *)h;
1671 	int header_size, data_size;
1672 	int rw = WRITE;
1673 	u32 dp_flags;
1674 
1675 	header_size = sizeof(*p) - sizeof(*h);
1676 	data_size   = h->length  - header_size;
1677 
1678 	ERR_IF(data_size == 0) return FALSE;
1679 
1680 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1681 		return FALSE;
1682 
1683 	if (!get_ldev(mdev)) {
1684 		if (__ratelimit(&drbd_ratelimit_state))
1685 			dev_err(DEV, "Can not write mirrored data block "
1686 			    "to local disk.\n");
1687 		spin_lock(&mdev->peer_seq_lock);
1688 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1689 			mdev->peer_seq++;
1690 		spin_unlock(&mdev->peer_seq_lock);
1691 
1692 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1693 		atomic_inc(&mdev->current_epoch->epoch_size);
1694 		return drbd_drain_block(mdev, data_size);
1695 	}
1696 
1697 	/* get_ldev(mdev) successful.
1698 	 * Corresponding put_ldev done either below (on various errors),
1699 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1700 	 * the end of this function. */
1701 
1702 	sector = be64_to_cpu(p->sector);
1703 	e = read_in_block(mdev, p->block_id, sector, data_size);
1704 	if (!e) {
1705 		put_ldev(mdev);
1706 		return FALSE;
1707 	}
1708 
1709 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1710 	e->w.cb = e_end_block;
1711 
1712 	spin_lock(&mdev->epoch_lock);
1713 	e->epoch = mdev->current_epoch;
1714 	atomic_inc(&e->epoch->epoch_size);
1715 	atomic_inc(&e->epoch->active);
1716 
1717 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1718 		struct drbd_epoch *epoch;
1719 		/* Issue a barrier if we start a new epoch, and the previous epoch
1720 		   was not an epoch containing a single request which already was
1721 		   a Barrier. */
1722 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1723 		if (epoch == e->epoch) {
1724 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1725 			trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER);
1726 			rw |= (1<<BIO_RW_BARRIER);
1727 			e->flags |= EE_IS_BARRIER;
1728 		} else {
1729 			if (atomic_read(&epoch->epoch_size) > 1 ||
1730 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1731 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1732 				trace_drbd_epoch(mdev, epoch, EV_TRACE_SETTING_BI);
1733 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1734 				trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER);
1735 				rw |= (1<<BIO_RW_BARRIER);
1736 				e->flags |= EE_IS_BARRIER;
1737 			}
1738 		}
1739 	}
1740 	spin_unlock(&mdev->epoch_lock);
1741 
1742 	dp_flags = be32_to_cpu(p->dp_flags);
1743 	if (dp_flags & DP_HARDBARRIER) {
1744 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1745 		/* rw |= (1<<BIO_RW_BARRIER); */
1746 	}
1747 	if (dp_flags & DP_RW_SYNC)
1748 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1749 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1750 		e->flags |= EE_MAY_SET_IN_SYNC;
1751 
1752 	/* I'm the receiver, I do hold a net_cnt reference. */
1753 	if (!mdev->net_conf->two_primaries) {
1754 		spin_lock_irq(&mdev->req_lock);
1755 	} else {
1756 		/* don't get the req_lock yet,
1757 		 * we may sleep in drbd_wait_peer_seq */
1758 		const int size = e->size;
1759 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1760 		DEFINE_WAIT(wait);
1761 		struct drbd_request *i;
1762 		struct hlist_node *n;
1763 		struct hlist_head *slot;
1764 		int first;
1765 
1766 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1767 		BUG_ON(mdev->ee_hash == NULL);
1768 		BUG_ON(mdev->tl_hash == NULL);
1769 
1770 		/* conflict detection and handling:
1771 		 * 1. wait on the sequence number,
1772 		 *    in case this data packet overtook ACK packets.
1773 		 * 2. check our hash tables for conflicting requests.
1774 		 *    we only need to walk the tl_hash, since an ee can not
1775 		 *    have a conflict with an other ee: on the submitting
1776 		 *    node, the corresponding req had already been conflicting,
1777 		 *    and a conflicting req is never sent.
1778 		 *
1779 		 * Note: for two_primaries, we are protocol C,
1780 		 * so there cannot be any request that is DONE
1781 		 * but still on the transfer log.
1782 		 *
1783 		 * unconditionally add to the ee_hash.
1784 		 *
1785 		 * if no conflicting request is found:
1786 		 *    submit.
1787 		 *
1788 		 * if any conflicting request is found
1789 		 * that has not yet been acked,
1790 		 * AND I have the "discard concurrent writes" flag:
1791 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1792 		 *
1793 		 * if any conflicting request is found:
1794 		 *	 block the receiver, waiting on misc_wait
1795 		 *	 until no more conflicting requests are there,
1796 		 *	 or we get interrupted (disconnect).
1797 		 *
1798 		 *	 we do not just write after local io completion of those
1799 		 *	 requests, but only after req is done completely, i.e.
1800 		 *	 we wait for the P_DISCARD_ACK to arrive!
1801 		 *
1802 		 *	 then proceed normally, i.e. submit.
1803 		 */
1804 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1805 			goto out_interrupted;
1806 
1807 		spin_lock_irq(&mdev->req_lock);
1808 
1809 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1810 
1811 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1812 		slot = tl_hash_slot(mdev, sector);
1813 		first = 1;
1814 		for (;;) {
1815 			int have_unacked = 0;
1816 			int have_conflict = 0;
1817 			prepare_to_wait(&mdev->misc_wait, &wait,
1818 				TASK_INTERRUPTIBLE);
1819 			hlist_for_each_entry(i, n, slot, colision) {
1820 				if (OVERLAPS) {
1821 					/* only ALERT on first iteration,
1822 					 * we may be woken up early... */
1823 					if (first)
1824 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1825 						      "	new: %llus +%u; pending: %llus +%u\n",
1826 						      current->comm, current->pid,
1827 						      (unsigned long long)sector, size,
1828 						      (unsigned long long)i->sector, i->size);
1829 					if (i->rq_state & RQ_NET_PENDING)
1830 						++have_unacked;
1831 					++have_conflict;
1832 				}
1833 			}
1834 #undef OVERLAPS
1835 			if (!have_conflict)
1836 				break;
1837 
1838 			/* Discard Ack only for the _first_ iteration */
1839 			if (first && discard && have_unacked) {
1840 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1841 				     (unsigned long long)sector);
1842 				inc_unacked(mdev);
1843 				e->w.cb = e_send_discard_ack;
1844 				list_add_tail(&e->w.list, &mdev->done_ee);
1845 
1846 				spin_unlock_irq(&mdev->req_lock);
1847 
1848 				/* we could probably send that P_DISCARD_ACK ourselves,
1849 				 * but I don't like the receiver using the msock */
1850 
1851 				put_ldev(mdev);
1852 				wake_asender(mdev);
1853 				finish_wait(&mdev->misc_wait, &wait);
1854 				return TRUE;
1855 			}
1856 
1857 			if (signal_pending(current)) {
1858 				hlist_del_init(&e->colision);
1859 
1860 				spin_unlock_irq(&mdev->req_lock);
1861 
1862 				finish_wait(&mdev->misc_wait, &wait);
1863 				goto out_interrupted;
1864 			}
1865 
1866 			spin_unlock_irq(&mdev->req_lock);
1867 			if (first) {
1868 				first = 0;
1869 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1870 				     "sec=%llus\n", (unsigned long long)sector);
1871 			} else if (discard) {
1872 				/* we had none on the first iteration.
1873 				 * there must be none now. */
1874 				D_ASSERT(have_unacked == 0);
1875 			}
1876 			schedule();
1877 			spin_lock_irq(&mdev->req_lock);
1878 		}
1879 		finish_wait(&mdev->misc_wait, &wait);
1880 	}
1881 
1882 	list_add(&e->w.list, &mdev->active_ee);
1883 	spin_unlock_irq(&mdev->req_lock);
1884 
1885 	switch (mdev->net_conf->wire_protocol) {
1886 	case DRBD_PROT_C:
1887 		inc_unacked(mdev);
1888 		/* corresponding dec_unacked() in e_end_block()
1889 		 * respective _drbd_clear_done_ee */
1890 		break;
1891 	case DRBD_PROT_B:
1892 		/* I really don't like it that the receiver thread
1893 		 * sends on the msock, but anyways */
1894 		drbd_send_ack(mdev, P_RECV_ACK, e);
1895 		break;
1896 	case DRBD_PROT_A:
1897 		/* nothing to do */
1898 		break;
1899 	}
1900 
1901 	if (mdev->state.pdsk == D_DISKLESS) {
1902 		/* In case we have the only disk of the cluster,
		 * mark the area out of sync and cover this write by the activity log. */
1903 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1904 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1905 		drbd_al_begin_io(mdev, e->sector);
1906 	}
1907 
1908 	e->private_bio->bi_rw = rw;
1909 	trace_drbd_ee(mdev, e, "submitting for (data)write");
1910 	trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
1911 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1912 	/* accounting done in endio */
1913 
1914 	maybe_kick_lo(mdev);
1915 	return TRUE;
1916 
1917 out_interrupted:
1918 	/* yes, the epoch_size now is imbalanced.
1919 	 * but we drop the connection anyways, so we don't have a chance to
1920 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1921 	put_ldev(mdev);
1922 	drbd_free_ee(mdev, e);
1923 	return FALSE;
1924 }
1925 
1926 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1927 {
1928 	sector_t sector;
1929 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1930 	struct drbd_epoch_entry *e;
1931 	struct digest_info *di = NULL;
1932 	int size, digest_size;
1933 	unsigned int fault_type;
1934 	struct p_block_req *p =
1935 		(struct p_block_req *)h;
1936 	const int brps = sizeof(*p)-sizeof(*h);
1937 
1938 	if (drbd_recv(mdev, h->payload, brps) != brps)
1939 		return FALSE;
1940 
1941 	sector = be64_to_cpu(p->sector);
1942 	size   = be32_to_cpu(p->blksize);
1943 
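	/* sanity check: the requested size must be positive, a multiple of
	 * 512 bytes, no larger than one segment, and must not reach beyond
	 * the end of our local device. */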
1944 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1945 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1946 				(unsigned long long)sector, size);
1947 		return FALSE;
1948 	}
1949 	if (sector + (size>>9) > capacity) {
1950 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1951 				(unsigned long long)sector, size);
1952 		return FALSE;
1953 	}
1954 
1955 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1956 		if (__ratelimit(&drbd_ratelimit_state))
1957 			dev_err(DEV, "Can not satisfy peer's read request, "
1958 			    "no local data.\n");
1959 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1960 				 P_NEG_RS_DREPLY , p);
1961 		return TRUE;
1962 	}
1963 
1964 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1965 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1966 	 * which in turn might block on the other node at this very place.  */
1967 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1968 	if (!e) {
1969 		put_ldev(mdev);
1970 		return FALSE;
1971 	}
1972 
1973 	e->private_bio->bi_rw = READ;
1974 	e->private_bio->bi_end_io = drbd_endio_read_sec;
1975 
1976 	switch (h->command) {
1977 	case P_DATA_REQUEST:
1978 		e->w.cb = w_e_end_data_req;
1979 		fault_type = DRBD_FAULT_DT_RD;
1980 		break;
1981 	case P_RS_DATA_REQUEST:
1982 		e->w.cb = w_e_end_rsdata_req;
1983 		fault_type = DRBD_FAULT_RS_RD;
1984 		/* Eventually this should become asynchronous. Currently it
1985 		 * blocks the whole receiver just to delay the reading of a
1986 		 * resync data block.
1987 		 * the drbd_work_queue mechanism is made for this...
1988 		 */
1989 		if (!drbd_rs_begin_io(mdev, sector)) {
1990 			/* we have been interrupted,
1991 			 * probably connection lost! */
1992 			D_ASSERT(signal_pending(current));
1993 			goto out_free_e;
1994 		}
1995 		break;
1996 
1997 	case P_OV_REPLY:
1998 	case P_CSUM_RS_REQUEST:
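		/* These requests carry a digest of the peer's data for this
		 * block; keep it so the worker callback can compare it against
		 * a digest computed over our local data. */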
1999 		fault_type = DRBD_FAULT_RS_RD;
2000 		digest_size = h->length - brps;
2001 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2002 		if (!di)
2003 			goto out_free_e;
2004 
2005 		di->digest_size = digest_size;
2006 		di->digest = (((char *)di)+sizeof(struct digest_info));
2007 
2008 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2009 			goto out_free_e;
2010 
2011 		e->block_id = (u64)(unsigned long)di;
2012 		if (h->command == P_CSUM_RS_REQUEST) {
2013 			D_ASSERT(mdev->agreed_pro_version >= 89);
2014 			e->w.cb = w_e_end_csum_rs_req;
2015 		} else if (h->command == P_OV_REPLY) {
2016 			e->w.cb = w_e_end_ov_reply;
2017 			dec_rs_pending(mdev);
2018 			break;
2019 		}
2020 
2021 		if (!drbd_rs_begin_io(mdev, sector)) {
2022 			/* we have been interrupted, probably connection lost! */
2023 			D_ASSERT(signal_pending(current));
2024 			goto out_free_e;
2025 		}
2026 		break;
2027 
2028 	case P_OV_REQUEST:
2029 		if (mdev->state.conn >= C_CONNECTED &&
2030 		    mdev->state.conn != C_VERIFY_T)
2031 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2032 				drbd_conn_str(mdev->state.conn));
2033 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2034 		    mdev->agreed_pro_version >= 90) {
2035 			mdev->ov_start_sector = sector;
2036 			mdev->ov_position = sector;
2037 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2038 			dev_info(DEV, "Online Verify start sector: %llu\n",
2039 					(unsigned long long)sector);
2040 		}
2041 		e->w.cb = w_e_end_ov_req;
2042 		fault_type = DRBD_FAULT_RS_RD;
2043 		/* Eventually this should become asynchronous. Currently it
2044 		 * blocks the whole receiver just to delay the reading of a
2045 		 * resync data block.
2046 		 * the drbd_work_queue mechanism is made for this...
2047 		 */
2048 		if (!drbd_rs_begin_io(mdev, sector)) {
2049 			/* we have been interrupted,
2050 			 * probably connection lost! */
2051 			D_ASSERT(signal_pending(current));
2052 			goto out_free_e;
2053 		}
2054 		break;
2055 
2056 
2057 	default:
2058 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2059 		    cmdname(h->command));
2060 		fault_type = DRBD_FAULT_MAX;
2061 	}
2062 
2063 	spin_lock_irq(&mdev->req_lock);
2064 	list_add(&e->w.list, &mdev->read_ee);
2065 	spin_unlock_irq(&mdev->req_lock);
2066 
2067 	inc_unacked(mdev);
2068 
2069 	trace_drbd_ee(mdev, e, "submitting for read");
2070 	trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
2071 	drbd_generic_make_request(mdev, fault_type, e->private_bio);
2072 	maybe_kick_lo(mdev);
2073 
2074 	return TRUE;
2075 
2076 out_free_e:
2077 	kfree(di);
2078 	put_ldev(mdev);
2079 	drbd_free_ee(mdev, e);
2080 	return FALSE;
2081 }
2082 
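/* After-split-brain recovery policy for "zero primaries".
 * Return convention, as consumed by drbd_sync_handshake(): a positive value
 * means the local data wins (we become sync source), a negative value means
 * the peer's data wins (we become sync target), -100 means no automatic
 * decision could be reached. */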
2083 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2084 {
2085 	int self, peer, rv = -100;
2086 	unsigned long ch_self, ch_peer;
2087 
2088 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2089 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2090 
2091 	ch_peer = mdev->p_uuid[UI_SIZE];
2092 	ch_self = mdev->comm_bm_set;
2093 
2094 	switch (mdev->net_conf->after_sb_0p) {
2095 	case ASB_CONSENSUS:
2096 	case ASB_DISCARD_SECONDARY:
2097 	case ASB_CALL_HELPER:
2098 		dev_err(DEV, "Configuration error.\n");
2099 		break;
2100 	case ASB_DISCONNECT:
2101 		break;
2102 	case ASB_DISCARD_YOUNGER_PRI:
2103 		if (self == 0 && peer == 1) {
2104 			rv = -1;
2105 			break;
2106 		}
2107 		if (self == 1 && peer == 0) {
2108 			rv =  1;
2109 			break;
2110 		}
2111 		/* Else fall through to one of the other strategies... */
2112 	case ASB_DISCARD_OLDER_PRI:
2113 		if (self == 0 && peer == 1) {
2114 			rv = 1;
2115 			break;
2116 		}
2117 		if (self == 1 && peer == 0) {
2118 			rv = -1;
2119 			break;
2120 		}
2121 		/* Else fall through to one of the other strategies... */
2122 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2123 		     "Using discard-least-changes instead\n");
2124 	case ASB_DISCARD_ZERO_CHG:
2125 		if (ch_peer == 0 && ch_self == 0) {
2126 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2127 				? -1 : 1;
2128 			break;
2129 		} else {
2130 			if (ch_peer == 0) { rv =  1; break; }
2131 			if (ch_self == 0) { rv = -1; break; }
2132 		}
2133 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2134 			break;
2135 	case ASB_DISCARD_LEAST_CHG:
2136 		if	(ch_self < ch_peer)
2137 			rv = -1;
2138 		else if (ch_self > ch_peer)
2139 			rv =  1;
2140 		else /* ( ch_self == ch_peer ) */
2141 		     /* Well, then use something else. */
2142 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2143 				? -1 : 1;
2144 		break;
2145 	case ASB_DISCARD_LOCAL:
2146 		rv = -1;
2147 		break;
2148 	case ASB_DISCARD_REMOTE:
2149 		rv =  1;
2150 	}
2151 
2152 	return rv;
2153 }
2154 
2155 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2156 {
2157 	int self, peer, hg, rv = -100;
2158 
2159 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2160 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2161 
2162 	switch (mdev->net_conf->after_sb_1p) {
2163 	case ASB_DISCARD_YOUNGER_PRI:
2164 	case ASB_DISCARD_OLDER_PRI:
2165 	case ASB_DISCARD_LEAST_CHG:
2166 	case ASB_DISCARD_LOCAL:
2167 	case ASB_DISCARD_REMOTE:
2168 		dev_err(DEV, "Configuration error.\n");
2169 		break;
2170 	case ASB_DISCONNECT:
2171 		break;
2172 	case ASB_CONSENSUS:
2173 		hg = drbd_asb_recover_0p(mdev);
2174 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2175 			rv = hg;
2176 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2177 			rv = hg;
2178 		break;
2179 	case ASB_VIOLENTLY:
2180 		rv = drbd_asb_recover_0p(mdev);
2181 		break;
2182 	case ASB_DISCARD_SECONDARY:
2183 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2184 	case ASB_CALL_HELPER:
2185 		hg = drbd_asb_recover_0p(mdev);
2186 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2187 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2188 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2189 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2190 			  * we do not need to wait for the after state change work either. */
2191 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2192 			if (self != SS_SUCCESS) {
2193 				drbd_khelper(mdev, "pri-lost-after-sb");
2194 			} else {
2195 				dev_warn(DEV, "Successfully gave up primary role.\n");
2196 				rv = hg;
2197 			}
2198 		} else
2199 			rv = hg;
2200 	}
2201 
2202 	return rv;
2203 }
2204 
2205 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2206 {
2207 	int self, peer, hg, rv = -100;
2208 
2209 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2210 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2211 
2212 	switch (mdev->net_conf->after_sb_2p) {
2213 	case ASB_DISCARD_YOUNGER_PRI:
2214 	case ASB_DISCARD_OLDER_PRI:
2215 	case ASB_DISCARD_LEAST_CHG:
2216 	case ASB_DISCARD_LOCAL:
2217 	case ASB_DISCARD_REMOTE:
2218 	case ASB_CONSENSUS:
2219 	case ASB_DISCARD_SECONDARY:
2220 		dev_err(DEV, "Configuration error.\n");
2221 		break;
2222 	case ASB_VIOLENTLY:
2223 		rv = drbd_asb_recover_0p(mdev);
2224 		break;
2225 	case ASB_DISCONNECT:
2226 		break;
2227 	case ASB_CALL_HELPER:
2228 		hg = drbd_asb_recover_0p(mdev);
2229 		if (hg == -1) {
2230 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2231 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2232 			  * we do not need to wait for the after state change work either. */
2233 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2234 			if (self != SS_SUCCESS) {
2235 				drbd_khelper(mdev, "pri-lost-after-sb");
2236 			} else {
2237 				dev_warn(DEV, "Successfully gave up primary role.\n");
2238 				rv = hg;
2239 			}
2240 		} else
2241 			rv = hg;
2242 	}
2243 
2244 	return rv;
2245 }
2246 
2247 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2248 			   u64 bits, u64 flags)
2249 {
2250 	if (!uuid) {
2251 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2252 		return;
2253 	}
2254 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2255 	     text,
2256 	     (unsigned long long)uuid[UI_CURRENT],
2257 	     (unsigned long long)uuid[UI_BITMAP],
2258 	     (unsigned long long)uuid[UI_HISTORY_START],
2259 	     (unsigned long long)uuid[UI_HISTORY_END],
2260 	     (unsigned long long)bits,
2261 	     (unsigned long long)flags);
2262 }
2263 
2264 /*
2265   100	after split brain try auto recover
2266     2	C_SYNC_SOURCE set BitMap
2267     1	C_SYNC_SOURCE use BitMap
2268     0	no Sync
2269    -1	C_SYNC_TARGET use BitMap
2270    -2	C_SYNC_TARGET set BitMap
2271  -100	after split brain, disconnect
2272 -1000	unrelated data
-1001	need at least protocol version 91 on both sides to resolve this
2273  */
2274 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2275 {
2276 	u64 self, peer;
2277 	int i, j;
2278 
2279 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2280 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2281 
2282 	*rule_nr = 10;
2283 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2284 		return 0;
2285 
2286 	*rule_nr = 20;
2287 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2288 	     peer != UUID_JUST_CREATED)
2289 		return -2;
2290 
2291 	*rule_nr = 30;
2292 	if (self != UUID_JUST_CREATED &&
2293 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2294 		return 2;
2295 
2296 	if (self == peer) {
2297 		int rct, dc; /* roles at crash time */
2298 
2299 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2300 
2301 			if (mdev->agreed_pro_version < 91)
2302 				return -1001;
2303 
2304 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2305 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2306 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2307 				drbd_uuid_set_bm(mdev, 0UL);
2308 
2309 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2310 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2311 				*rule_nr = 34;
2312 			} else {
2313 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2314 				*rule_nr = 36;
2315 			}
2316 
2317 			return 1;
2318 		}
2319 
2320 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2321 
2322 			if (mdev->agreed_pro_version < 91)
2323 				return -1001;
2324 
2325 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2326 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2327 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2328 
2329 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2330 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2331 				mdev->p_uuid[UI_BITMAP] = 0UL;
2332 
2333 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2334 				*rule_nr = 35;
2335 			} else {
2336 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2337 				*rule_nr = 37;
2338 			}
2339 
2340 			return -1;
2341 		}
2342 
2343 		/* Common power [off|failure] */
2344 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2345 			(mdev->p_uuid[UI_FLAGS] & 2);
2346 		/* lowest bit is set when we were primary,
2347 		 * next bit (weight 2) is set when peer was primary */
2348 		*rule_nr = 40;
2349 
2350 		switch (rct) {
2351 		case 0: /* !self_pri && !peer_pri */ return 0;
2352 		case 1: /*  self_pri && !peer_pri */ return 1;
2353 		case 2: /* !self_pri &&  peer_pri */ return -1;
2354 		case 3: /*  self_pri &&  peer_pri */
2355 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2356 			return dc ? -1 : 1;
2357 		}
2358 	}
2359 
2360 	*rule_nr = 50;
2361 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2362 	if (self == peer)
2363 		return -1;
2364 
2365 	*rule_nr = 51;
2366 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2367 	if (self == peer) {
2368 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2369 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2370 		if (self == peer) {
2371 			/* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2372 			   modifications from its last start of resync as sync source. */
2373 
2374 			if (mdev->agreed_pro_version < 91)
2375 				return -1001;
2376 
2377 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2378 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2379 			return -1;
2380 		}
2381 	}
2382 
2383 	*rule_nr = 60;
2384 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2385 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2386 		peer = mdev->p_uuid[i] & ~((u64)1);
2387 		if (self == peer)
2388 			return -2;
2389 	}
2390 
2391 	*rule_nr = 70;
2392 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2393 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2394 	if (self == peer)
2395 		return 1;
2396 
2397 	*rule_nr = 71;
2398 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2399 	if (self == peer) {
2400 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2401 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2402 		if (self == peer) {
2403 			/* The last P_SYNC_UUID did not get through. Undo our own UUID
2404 			   modifications from our last start of resync as sync source. */
2405 
2406 			if (mdev->agreed_pro_version < 91)
2407 				return -1001;
2408 
2409 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2410 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2411 
2412 			dev_info(DEV, "Undid last start of resync:\n");
2413 
2414 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2415 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2416 
2417 			return 1;
2418 		}
2419 	}
2420 
2421 
2422 	*rule_nr = 80;
2423 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2424 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2425 		if (self == peer)
2426 			return 2;
2427 	}
2428 
2429 	*rule_nr = 90;
2430 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2431 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2432 	if (self == peer && self != ((u64)0))
2433 		return 100;
2434 
2435 	*rule_nr = 100;
2436 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2437 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2438 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2439 			peer = mdev->p_uuid[j] & ~((u64)1);
2440 			if (self == peer)
2441 				return -100;
2442 		}
2443 	}
2444 
2445 	return -1000;
2446 }
2447 
2448 /* drbd_sync_handshake() returns the new conn state on success, or
2449    C_MASK on failure.
2450  */
2451 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2452 					   enum drbd_disk_state peer_disk) __must_hold(local)
2453 {
2454 	int hg, rule_nr;
2455 	enum drbd_conns rv = C_MASK;
2456 	enum drbd_disk_state mydisk;
2457 
2458 	mydisk = mdev->state.disk;
2459 	if (mydisk == D_NEGOTIATING)
2460 		mydisk = mdev->new_state_tmp.disk;
2461 
2462 	dev_info(DEV, "drbd_sync_handshake:\n");
2463 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2464 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2465 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2466 
2467 	hg = drbd_uuid_compare(mdev, &rule_nr);
2468 
2469 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2470 
2471 	if (hg == -1000) {
2472 		dev_alert(DEV, "Unrelated data, aborting!\n");
2473 		return C_MASK;
2474 	}
2475 	if (hg == -1001) {
2476 		dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2477 		return C_MASK;
2478 	}
2479 
2480 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2481 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2482 		int f = (hg == -100) || abs(hg) == 2;
2483 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2484 		if (f)
2485 			hg = hg*2;
2486 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2487 		     hg > 0 ? "source" : "target");
2488 	}
2489 
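	/* Split brain detected (|hg| == 100). If auto recovery is allowed,
	 * pick the after-split-brain policy matching the number of nodes that
	 * are currently in the Primary role. */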
2490 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2491 		int pcount = (mdev->state.role == R_PRIMARY)
2492 			   + (peer_role == R_PRIMARY);
2493 		int forced = (hg == -100);
2494 
2495 		switch (pcount) {
2496 		case 0:
2497 			hg = drbd_asb_recover_0p(mdev);
2498 			break;
2499 		case 1:
2500 			hg = drbd_asb_recover_1p(mdev);
2501 			break;
2502 		case 2:
2503 			hg = drbd_asb_recover_2p(mdev);
2504 			break;
2505 		}
2506 		if (abs(hg) < 100) {
2507 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2508 			     "automatically solved. Sync from %s node\n",
2509 			     pcount, (hg < 0) ? "peer" : "this");
2510 			if (forced) {
2511 				dev_warn(DEV, "Doing a full sync, since"
2512 				     " UUIDs were ambiguous.\n");
2513 				hg = hg*2;
2514 			}
2515 		}
2516 	}
2517 
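	/* Still undecided: fall back to the configured "discard my data"
	 * settings; the split brain can only be resolved here if exactly one
	 * side volunteers to lose its data. */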
2518 	if (hg == -100) {
2519 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2520 			hg = -1;
2521 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2522 			hg = 1;
2523 
2524 		if (abs(hg) < 100)
2525 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2526 			     "Sync from %s node\n",
2527 			     (hg < 0) ? "peer" : "this");
2528 	}
2529 
2530 	if (hg == -100) {
2531 		dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2532 		drbd_khelper(mdev, "split-brain");
2533 		return C_MASK;
2534 	}
2535 
2536 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2537 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2538 		return C_MASK;
2539 	}
2540 
2541 	if (hg < 0 && /* by intention we do not use mydisk here. */
2542 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2543 		switch (mdev->net_conf->rr_conflict) {
2544 		case ASB_CALL_HELPER:
2545 			drbd_khelper(mdev, "pri-lost");
2546 			/* fall through */
2547 		case ASB_DISCONNECT:
2548 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2549 			return C_MASK;
2550 		case ASB_VIOLENTLY:
2551 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2552 			     "assumption\n");
2553 		}
2554 	}
2555 
2556 	if (abs(hg) >= 2) {
2557 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2558 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2559 			return C_MASK;
2560 	}
2561 
2562 	if (hg > 0) { /* become sync source. */
2563 		rv = C_WF_BITMAP_S;
2564 	} else if (hg < 0) { /* become sync target */
2565 		rv = C_WF_BITMAP_T;
2566 	} else {
2567 		rv = C_CONNECTED;
2568 		if (drbd_bm_total_weight(mdev)) {
2569 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2570 			     drbd_bm_total_weight(mdev));
2571 		}
2572 	}
2573 
2574 	return rv;
2575 }
2576 
2577 /* returns 1 if invalid */
2578 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2579 {
2580 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2581 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2582 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2583 		return 0;
2584 
2585 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2586 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2587 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2588 		return 1;
2589 
2590 	/* everything else is valid if they are equal on both sides. */
2591 	if (peer == self)
2592 		return 0;
2593 
2594 	/* everything else is invalid. */
2595 	return 1;
2596 }
2597 
2598 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2599 {
2600 	struct p_protocol *p = (struct p_protocol *)h;
2601 	int header_size, data_size;
2602 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2603 	int p_want_lose, p_two_primaries;
2604 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2605 
2606 	header_size = sizeof(*p) - sizeof(*h);
2607 	data_size   = h->length  - header_size;
2608 
2609 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2610 		return FALSE;
2611 
2612 	p_proto		= be32_to_cpu(p->protocol);
2613 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2614 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2615 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2616 	p_want_lose	= be32_to_cpu(p->want_lose);
2617 	p_two_primaries = be32_to_cpu(p->two_primaries);
2618 
2619 	if (p_proto != mdev->net_conf->wire_protocol) {
2620 		dev_err(DEV, "incompatible communication protocols\n");
2621 		goto disconnect;
2622 	}
2623 
2624 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2625 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2626 		goto disconnect;
2627 	}
2628 
2629 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2630 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2631 		goto disconnect;
2632 	}
2633 
2634 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2635 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2636 		goto disconnect;
2637 	}
2638 
2639 	if (p_want_lose && mdev->net_conf->want_lose) {
2640 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2641 		goto disconnect;
2642 	}
2643 
2644 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2645 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2646 		goto disconnect;
2647 	}
2648 
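	/* Since protocol version 87 the packet also carries the peer's
	 * data-integrity-alg as a trailing string; both sides must use the
	 * same algorithm. */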
2649 	if (mdev->agreed_pro_version >= 87) {
2650 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2651 
2652 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2653 			return FALSE;
2654 
2655 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2656 		if (strcmp(p_integrity_alg, my_alg)) {
2657 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2658 			goto disconnect;
2659 		}
2660 		dev_info(DEV, "data-integrity-alg: %s\n",
2661 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2662 	}
2663 
2664 	return TRUE;
2665 
2666 disconnect:
2667 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2668 	return FALSE;
2669 }
2670 
2671 /* helper function
2672  * input: alg name, feature name
2673  * return: NULL (alg name was "")
2674  *         ERR_PTR(error) if something goes wrong
2675  *         or the crypto hash ptr, if it worked out ok. */
2676 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2677 		const char *alg, const char *name)
2678 {
2679 	struct crypto_hash *tfm;
2680 
2681 	if (!alg[0])
2682 		return NULL;
2683 
2684 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2685 	if (IS_ERR(tfm)) {
2686 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2687 			alg, name, PTR_ERR(tfm));
2688 		return tfm;
2689 	}
2690 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2691 		crypto_free_hash(tfm);
2692 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2693 		return ERR_PTR(-EINVAL);
2694 	}
2695 	return tfm;
2696 }
2697 
2698 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2699 {
2700 	int ok = TRUE;
2701 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2702 	unsigned int header_size, data_size, exp_max_sz;
2703 	struct crypto_hash *verify_tfm = NULL;
2704 	struct crypto_hash *csums_tfm = NULL;
2705 	const int apv = mdev->agreed_pro_version;
2706 
2707 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2708 		    : apv == 88 ? sizeof(struct p_rs_param)
2709 					+ SHARED_SECRET_MAX
2710 		    : /* 89 */    sizeof(struct p_rs_param_89);
2711 
2712 	if (h->length > exp_max_sz) {
2713 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2714 		    h->length, exp_max_sz);
2715 		return FALSE;
2716 	}
2717 
2718 	if (apv <= 88) {
2719 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2720 		data_size   = h->length  - header_size;
2721 	} else /* apv >= 89 */ {
2722 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2723 		data_size   = h->length  - header_size;
2724 		D_ASSERT(data_size == 0);
2725 	}
2726 
2727 	/* initialize verify_alg and csums_alg */
2728 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2729 
2730 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2731 		return FALSE;
2732 
2733 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2734 
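	/* Protocol 88 transmits the verify-alg as a variable length trailing
	 * string; protocol 89 and later carry verify_alg and csums_alg as
	 * fixed size, NUL terminated fields within the packet itself. */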
2735 	if (apv >= 88) {
2736 		if (apv == 88) {
2737 			if (data_size > SHARED_SECRET_MAX) {
2738 				dev_err(DEV, "verify-alg too long, "
2739 				    "peer wants %u, accepting only %u bytes\n",
2740 						data_size, SHARED_SECRET_MAX);
2741 				return FALSE;
2742 			}
2743 
2744 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2745 				return FALSE;
2746 
2747 			/* we expect NUL terminated string */
2748 			/* but just in case someone tries to be evil */
2749 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2750 			p->verify_alg[data_size-1] = 0;
2751 
2752 		} else /* apv >= 89 */ {
2753 			/* we still expect NUL terminated strings */
2754 			/* but just in case someone tries to be evil */
2755 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2756 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2757 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2758 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2759 		}
2760 
2761 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2762 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2763 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2764 				    mdev->sync_conf.verify_alg, p->verify_alg);
2765 				goto disconnect;
2766 			}
2767 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2768 					p->verify_alg, "verify-alg");
2769 			if (IS_ERR(verify_tfm)) {
2770 				verify_tfm = NULL;
2771 				goto disconnect;
2772 			}
2773 		}
2774 
2775 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2776 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2777 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2778 				    mdev->sync_conf.csums_alg, p->csums_alg);
2779 				goto disconnect;
2780 			}
2781 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2782 					p->csums_alg, "csums-alg");
2783 			if (IS_ERR(csums_tfm)) {
2784 				csums_tfm = NULL;
2785 				goto disconnect;
2786 			}
2787 		}
2788 
2789 
2790 		spin_lock(&mdev->peer_seq_lock);
2791 		/* lock against drbd_nl_syncer_conf() */
2792 		if (verify_tfm) {
2793 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2794 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2795 			crypto_free_hash(mdev->verify_tfm);
2796 			mdev->verify_tfm = verify_tfm;
2797 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2798 		}
2799 		if (csums_tfm) {
2800 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2801 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2802 			crypto_free_hash(mdev->csums_tfm);
2803 			mdev->csums_tfm = csums_tfm;
2804 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2805 		}
2806 		spin_unlock(&mdev->peer_seq_lock);
2807 	}
2808 
2809 	return ok;
2810 disconnect:
2811 	/* just for completeness: actually not needed,
2812 	 * as this is not reached if csums_tfm was ok. */
2813 	crypto_free_hash(csums_tfm);
2814 	/* but free the verify_tfm again, if csums_tfm did not work out */
2815 	crypto_free_hash(verify_tfm);
2816 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2817 	return FALSE;
2818 }
2819 
2820 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2821 {
2822 	/* sorry, we currently have no working implementation
2823 	 * of distributed TCQ */
2824 }
2825 
2826 /* warn if the arguments differ by more than 12.5% */
2827 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2828 	const char *s, sector_t a, sector_t b)
2829 {
2830 	sector_t d;
2831 	if (a == 0 || b == 0)
2832 		return;
2833 	d = (a > b) ? (a - b) : (b - a);
2834 	if (d > (a>>3) || d > (b>>3))
2835 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2836 		     (unsigned long long)a, (unsigned long long)b);
2837 }
2838 
2839 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2840 {
2841 	struct p_sizes *p = (struct p_sizes *)h;
2842 	enum determine_dev_size dd = unchanged;
2843 	unsigned int max_seg_s;
2844 	sector_t p_size, p_usize, my_usize;
2845 	int ldsc = 0; /* local disk size changed */
2846 	enum drbd_conns nconn;
2847 
2848 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2849 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2850 		return FALSE;
2851 
2852 	p_size = be64_to_cpu(p->d_size);
2853 	p_usize = be64_to_cpu(p->u_size);
2854 
2855 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2856 		dev_err(DEV, "some backing storage is needed\n");
2857 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2858 		return FALSE;
2859 	}
2860 
2861 	/* just store the peer's disk size for now.
2862 	 * we still need to figure out whether we accept that. */
2863 	mdev->p_size = p_size;
2864 
2865 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
2866 	if (get_ldev(mdev)) {
2867 		warn_if_differ_considerably(mdev, "lower level device sizes",
2868 			   p_size, drbd_get_max_capacity(mdev->ldev));
2869 		warn_if_differ_considerably(mdev, "user requested size",
2870 					    p_usize, mdev->ldev->dc.disk_size);
2871 
2872 		/* if this is the first connect, or an otherwise expected
2873 		 * param exchange, choose the minimum */
2874 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2875 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2876 					     p_usize);
2877 
2878 		my_usize = mdev->ldev->dc.disk_size;
2879 
2880 		if (mdev->ldev->dc.disk_size != p_usize) {
2881 			mdev->ldev->dc.disk_size = p_usize;
2882 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2883 			     (unsigned long)mdev->ldev->dc.disk_size);
2884 		}
2885 
2886 		/* Never shrink a device with usable data during connect.
2887 		   But allow online shrinking if we are connected. */
2888 		if (drbd_new_dev_size(mdev, mdev->ldev) <
2889 		   drbd_get_capacity(mdev->this_bdev) &&
2890 		   mdev->state.disk >= D_OUTDATED &&
2891 		   mdev->state.conn < C_CONNECTED) {
2892 			dev_err(DEV, "The peer's disk size is too small!\n");
2893 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2894 			mdev->ldev->dc.disk_size = my_usize;
2895 			put_ldev(mdev);
2896 			return FALSE;
2897 		}
2898 		put_ldev(mdev);
2899 	}
2900 #undef min_not_zero
2901 
2902 	if (get_ldev(mdev)) {
2903 		dd = drbd_determin_dev_size(mdev);
2904 		put_ldev(mdev);
2905 		if (dd == dev_size_error)
2906 			return FALSE;
2907 		drbd_md_sync(mdev);
2908 	} else {
2909 		/* I am diskless, need to accept the peer's size. */
2910 		drbd_set_my_capacity(mdev, p_size);
2911 	}
2912 
2913 	if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2914 		nconn = drbd_sync_handshake(mdev,
2915 				mdev->state.peer, mdev->state.pdsk);
2916 		put_ldev(mdev);
2917 
2918 		if (nconn == C_MASK) {
2919 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2920 			return FALSE;
2921 		}
2922 
2923 		if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2924 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2925 			return FALSE;
2926 		}
2927 	}
2928 
2929 	if (get_ldev(mdev)) {
2930 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2931 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2932 			ldsc = 1;
2933 		}
2934 
2935 		max_seg_s = be32_to_cpu(p->max_segment_size);
2936 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2937 			drbd_setup_queue_param(mdev, max_seg_s);
2938 
2939 		drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2940 		put_ldev(mdev);
2941 	}
2942 
2943 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2944 		if (be64_to_cpu(p->c_size) !=
2945 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2946 			/* we have different sizes, probably peer
2947 			 * needs to know my new size... */
2948 			drbd_send_sizes(mdev, 0);
2949 		}
2950 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2951 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2952 			if (mdev->state.pdsk >= D_INCONSISTENT &&
2953 			    mdev->state.disk >= D_INCONSISTENT)
2954 				resync_after_online_grow(mdev);
2955 			else
2956 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2957 		}
2958 	}
2959 
2960 	return TRUE;
2961 }
2962 
2963 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2964 {
2965 	struct p_uuids *p = (struct p_uuids *)h;
2966 	u64 *p_uuid;
2967 	int i;
2968 
2969 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2970 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2971 		return FALSE;
2972 
2973 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return FALSE;
	}
2974 
2975 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2976 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
2977 
2978 	kfree(mdev->p_uuid);
2979 	mdev->p_uuid = p_uuid;
2980 
2981 	if (mdev->state.conn < C_CONNECTED &&
2982 	    mdev->state.disk < D_INCONSISTENT &&
2983 	    mdev->state.role == R_PRIMARY &&
2984 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2985 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2986 		    (unsigned long long)mdev->ed_uuid);
2987 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2988 		return FALSE;
2989 	}
2990 
2991 	if (get_ldev(mdev)) {
2992 		int skip_initial_sync =
2993 			mdev->state.conn == C_CONNECTED &&
2994 			mdev->agreed_pro_version >= 90 &&
2995 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2996 			(p_uuid[UI_FLAGS] & 8);
2997 		if (skip_initial_sync) {
2998 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2999 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3000 					"clear_n_write from receive_uuids");
3001 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3002 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3003 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3004 					CS_VERBOSE, NULL);
3005 			drbd_md_sync(mdev);
3006 		}
3007 		put_ldev(mdev);
3008 	}
3009 
3010 	/* Before we test the disk state, we should wait until any possibly
3011 	   ongoing cluster-wide state change has finished. That is important if
3012 	   we are primary and are detaching from our disk: we need to see the
3013 	   new disk state... */
3014 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3015 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3016 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3017 
3018 	return TRUE;
3019 }
3020 
3021 /**
3022  * convert_state() - Converts the peer's view of the cluster state to our point of view
3023  * @ps:		The state as seen by the peer.
3024  */
3025 static union drbd_state convert_state(union drbd_state ps)
3026 {
3027 	union drbd_state ms;
3028 
3029 	static enum drbd_conns c_tab[] = {
3030 		[C_CONNECTED] = C_CONNECTED,
3031 
3032 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3033 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3034 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3035 		[C_VERIFY_S]       = C_VERIFY_T,
3036 		[C_MASK]   = C_MASK,
3037 	};
3038 
3039 	ms.i = ps.i;
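	/* mirror the peer's perspective: its role becomes our peer role,
	 * its disk state becomes our peer disk state, and vice versa */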
3040 
3041 	ms.conn = c_tab[ps.conn];
3042 	ms.peer = ps.role;
3043 	ms.role = ps.peer;
3044 	ms.pdsk = ps.disk;
3045 	ms.disk = ps.pdsk;
3046 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3047 
3048 	return ms;
3049 }
3050 
3051 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3052 {
3053 	struct p_req_state *p = (struct p_req_state *)h;
3054 	union drbd_state mask, val;
3055 	int rv;
3056 
3057 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3058 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3059 		return FALSE;
3060 
3061 	mask.i = be32_to_cpu(p->mask);
3062 	val.i = be32_to_cpu(p->val);
3063 
3064 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3065 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3066 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3067 		return TRUE;
3068 	}
3069 
3070 	mask = convert_state(mask);
3071 	val = convert_state(val);
3072 
3073 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3074 
3075 	drbd_send_sr_reply(mdev, rv);
3076 	drbd_md_sync(mdev);
3077 
3078 	return TRUE;
3079 }
3080 
3081 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3082 {
3083 	struct p_state *p = (struct p_state *)h;
3084 	enum drbd_conns nconn, oconn;
3085 	union drbd_state ns, peer_state;
3086 	enum drbd_disk_state real_peer_disk;
3087 	int rv;
3088 
3089 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3090 		return FALSE;
3091 
3092 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3093 		return FALSE;
3094 
3095 	peer_state.i = be32_to_cpu(p->state);
3096 
3097 	real_peer_disk = peer_state.disk;
3098 	if (peer_state.disk == D_NEGOTIATING) {
3099 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3100 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3101 	}
3102 
3103 	spin_lock_irq(&mdev->req_lock);
3104  retry:
3105 	oconn = nconn = mdev->state.conn;
3106 	spin_unlock_irq(&mdev->req_lock);
3107 
3108 	if (nconn == C_WF_REPORT_PARAMS)
3109 		nconn = C_CONNECTED;
3110 
3111 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3112 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3113 		int cr; /* consider resync */
3114 
3115 		/* if we established a new connection */
3116 		cr  = (oconn < C_CONNECTED);
3117 		/* if we had an established connection
3118 		 * and one of the nodes newly attaches a disk */
3119 		cr |= (oconn == C_CONNECTED &&
3120 		       (peer_state.disk == D_NEGOTIATING ||
3121 			mdev->state.disk == D_NEGOTIATING));
3122 		/* if we have both been inconsistent, and the peer has been
3123 		 * forced to be UpToDate with --overwrite-data */
3124 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3125 		/* if we had been plain connected, and the admin requested to
3126 		 * start a sync by "invalidate" or "invalidate-remote" */
3127 		cr |= (oconn == C_CONNECTED &&
3128 				(peer_state.conn >= C_STARTING_SYNC_S &&
3129 				 peer_state.conn <= C_WF_BITMAP_T));
3130 
3131 		if (cr)
3132 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3133 
3134 		put_ldev(mdev);
3135 		if (nconn == C_MASK) {
3136 			if (mdev->state.disk == D_NEGOTIATING) {
3137 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3138 				nconn = C_CONNECTED;
3139 			} else if (peer_state.disk == D_NEGOTIATING) {
3140 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3141 				peer_state.disk = D_DISKLESS;
3142 			} else {
3143 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3144 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3145 				return FALSE;
3146 			}
3147 		}
3148 	}
3149 
3150 	spin_lock_irq(&mdev->req_lock);
3151 	if (mdev->state.conn != oconn)
3152 		goto retry;
3153 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3154 	ns.i = mdev->state.i;
3155 	ns.conn = nconn;
3156 	ns.peer = peer_state.role;
3157 	ns.pdsk = real_peer_disk;
3158 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3159 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3160 		ns.disk = mdev->new_state_tmp.disk;
3161 
3162 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3163 	ns = mdev->state;
3164 	spin_unlock_irq(&mdev->req_lock);
3165 
3166 	if (rv < SS_SUCCESS) {
3167 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3168 		return FALSE;
3169 	}
3170 
3171 	if (oconn > C_WF_REPORT_PARAMS) {
3172 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3173 		    peer_state.disk != D_NEGOTIATING ) {
3174 			/* we want resync, peer has not yet decided to sync... */
3175 			/* Nowadays only used when forcing a node into primary role and
3176 			   setting its disk to UpToDate with that */
3177 			drbd_send_uuids(mdev);
3178 			drbd_send_state(mdev);
3179 		}
3180 	}
3181 
3182 	mdev->net_conf->want_lose = 0;
3183 
3184 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3185 
3186 	return TRUE;
3187 }
3188 
3189 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3190 {
3191 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3192 
3193 	wait_event(mdev->misc_wait,
3194 		   mdev->state.conn == C_WF_SYNC_UUID ||
3195 		   mdev->state.conn < C_CONNECTED ||
3196 		   mdev->state.disk < D_NEGOTIATING);
3197 
3198 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3199 
3200 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3201 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3202 		return FALSE;
3203 
3204 	/* Here the _drbd_uuid_ functions are right, current should
3205 	   _not_ be rotated into the history */
3206 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3207 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3208 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3209 
3210 		drbd_start_resync(mdev, C_SYNC_TARGET);
3211 
3212 		put_ldev(mdev);
3213 	} else
3214 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3215 
3216 	return TRUE;
3217 }
3218 
3219 enum receive_bitmap_ret { OK, DONE, FAILED };
3220 
3221 static enum receive_bitmap_ret
3222 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3223 	unsigned long *buffer, struct bm_xfer_ctx *c)
3224 {
3225 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3226 	unsigned want = num_words * sizeof(long);
3227 
3228 	if (want != h->length) {
3229 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3230 		return FAILED;
3231 	}
3232 	if (want == 0)
3233 		return DONE;
3234 	if (drbd_recv(mdev, buffer, want) != want)
3235 		return FAILED;
3236 
3237 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3238 
3239 	c->word_offset += num_words;
3240 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3241 	if (c->bit_offset > c->bm_bits)
3242 		c->bit_offset = c->bm_bits;
3243 
3244 	return OK;
3245 }
3246 
3247 static enum receive_bitmap_ret
3248 recv_bm_rle_bits(struct drbd_conf *mdev,
3249 		struct p_compressed_bm *p,
3250 		struct bm_xfer_ctx *c)
3251 {
3252 	struct bitstream bs;
3253 	u64 look_ahead;
3254 	u64 rl;
3255 	u64 tmp;
3256 	unsigned long s = c->bit_offset;
3257 	unsigned long e;
3258 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3259 	int toggle = DCBP_get_start(p);
3260 	int have;
3261 	int bits;
3262 
3263 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3264 
3265 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3266 	if (bits < 0)
3267 		return FAILED;
3268 
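	/* look_ahead buffers up to 64 not yet consumed bits of the compressed
	 * stream. Each iteration decodes one variable-length-coded run length;
	 * runs alternate between cleared and set bits, and only the runs of
	 * set bits are applied to the bitmap. */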
3269 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3270 		bits = vli_decode_bits(&rl, look_ahead);
3271 		if (bits <= 0)
3272 			return FAILED;
3273 
3274 		if (toggle) {
3275 			e = s + rl -1;
3276 			if (e >= c->bm_bits) {
3277 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3278 				return FAILED;
3279 			}
3280 			_drbd_bm_set_bits(mdev, s, e);
3281 		}
3282 
3283 		if (have < bits) {
3284 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3285 				have, bits, look_ahead,
3286 				(unsigned int)(bs.cur.b - p->code),
3287 				(unsigned int)bs.buf_len);
3288 			return FAILED;
3289 		}
3290 		look_ahead >>= bits;
3291 		have -= bits;
3292 
3293 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3294 		if (bits < 0)
3295 			return FAILED;
3296 		look_ahead |= tmp << have;
3297 		have += bits;
3298 	}
3299 
3300 	c->bit_offset = s;
3301 	bm_xfer_ctx_bit_to_word_offset(c);
3302 
3303 	return (s == c->bm_bits) ? DONE : OK;
3304 }
3305 
3306 static enum receive_bitmap_ret
3307 decode_bitmap_c(struct drbd_conf *mdev,
3308 		struct p_compressed_bm *p,
3309 		struct bm_xfer_ctx *c)
3310 {
3311 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3312 		return recv_bm_rle_bits(mdev, p, c);
3313 
3314 	/* other variants had been implemented for evaluation,
3315 	 * but have been dropped as this one turned out to be "best"
3316 	 * during all our tests. */
3317 
3318 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3319 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3320 	return FAILED;
3321 }
3322 
3323 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3324 		const char *direction, struct bm_xfer_ctx *c)
3325 {
3326 	/* what would it take to transfer it "plaintext" */
3327 	unsigned plain = sizeof(struct p_header) *
3328 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3329 		+ c->bm_words * sizeof(long);
3330 	unsigned total = c->bytes[0] + c->bytes[1];
3331 	unsigned r;
3332 
3333 	/* total can not be zero. but just in case: */
3334 	if (total == 0)
3335 		return;
3336 
3337 	/* don't report if not compressed */
3338 	if (total >= plain)
3339 		return;
3340 
3341 	/* total < plain. check for overflow, still */
3342 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3343 		                    : (1000 * total / plain);
3344 
3345 	if (r > 1000)
3346 		r = 1000;
3347 
3348 	r = 1000 - r;
3349 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3350 	     "total %u; compression: %u.%u%%\n",
3351 			direction,
3352 			c->bytes[1], c->packets[1],
3353 			c->bytes[0], c->packets[0],
3354 			total, r/10, r % 10);
3355 }
3356 
3357 /* Since we are processing the bitfield from lower addresses to higher,
3358    it does not matter whether we process it in 32 bit chunks or 64 bit
3359    chunks, as long as it is little endian. (Understand it as a byte stream,
3360    beginning with the lowest byte...) If we used big endian,
3361    we would need to process it from the highest address to the lowest,
3362    in order to be agnostic to the 32 vs 64 bits issue.
3363 
3364    returns 0 on failure, 1 if we successfully received it. */
3365 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3366 {
3367 	struct bm_xfer_ctx c;
3368 	void *buffer;
3369 	enum receive_bitmap_ret ret;
3370 	int ok = FALSE;
3371 
3372 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3373 
3374 	drbd_bm_lock(mdev, "receive bitmap");
3375 
3376 	/* maybe we should use some per thread scratch page,
3377 	 * and allocate that during initial device creation? */
3378 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3379 	if (!buffer) {
3380 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3381 		goto out;
3382 	}
3383 
3384 	c = (struct bm_xfer_ctx) {
3385 		.bm_bits = drbd_bm_bits(mdev),
3386 		.bm_words = drbd_bm_words(mdev),
3387 	};
3388 
3389 	do {
3390 		if (h->command == P_BITMAP) {
3391 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3392 		} else if (h->command == P_COMPRESSED_BITMAP) {
3393 			/* MAYBE: sanity check that we speak proto >= 90,
3394 			 * and the feature is enabled! */
3395 			struct p_compressed_bm *p;
3396 
3397 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3398 				dev_err(DEV, "ReportCBitmap packet too large\n");
3399 				goto out;
3400 			}
3401 			/* use the page buff */
3402 			p = buffer;
3403 			memcpy(p, h, sizeof(*h));
3404 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3405 				goto out;
3406 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3407 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3408 				goto out; /* returning FAILED here would leak the page and leave the bitmap locked */
3409 			}
3410 			ret = decode_bitmap_c(mdev, p, &c);
3411 		} else {
3412 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3413 			goto out;
3414 		}
3415 
3416 		c.packets[h->command == P_BITMAP]++;
3417 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3418 
3419 		if (ret != OK)
3420 			break;
3421 
3422 		if (!drbd_recv_header(mdev, h))
3423 			goto out;
3424 	} while (ret == OK);
3425 	if (ret == FAILED)
3426 		goto out;
3427 
3428 	INFO_bm_xfer_stats(mdev, "receive", &c);
3429 
3430 	if (mdev->state.conn == C_WF_BITMAP_T) {
3431 		ok = !drbd_send_bitmap(mdev);
3432 		if (!ok)
3433 			goto out;
3434 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3435 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3436 		D_ASSERT(ok == SS_SUCCESS);
3437 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3438 		/* admin may have requested C_DISCONNECTING,
3439 		 * other threads may have noticed network errors */
3440 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3441 		    drbd_conn_str(mdev->state.conn));
3442 	}
3443 
3444 	ok = TRUE;
3445  out:
3446 	drbd_bm_unlock(mdev);
3447 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3448 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3449 	free_page((unsigned long) buffer);
3450 	return ok;
3451 }
3452 
3453 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3454 {
3455 	/* TODO zero copy sink :) */
3456 	static char sink[128];
3457 	int size, want, r;
3458 
3459 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3460 	     h->command, h->length);
3461 
3462 	size = h->length;
3463 	while (size > 0) {
3464 		want = min_t(int, size, sizeof(sink));
3465 		r = drbd_recv(mdev, sink, want);
3466 		ERR_IF(r <= 0) break;
3467 		size -= r;
3468 	}
3469 	return size == 0;
3470 }
3471 
3472 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3473 {
3474 	if (mdev->state.disk >= D_INCONSISTENT)
3475 		drbd_kick_lo(mdev);
3476 
3477 	/* Make sure we've acked all the TCP data associated
3478 	 * with the data requests being unplugged */
3479 	drbd_tcp_quickack(mdev->data.socket);
3480 
3481 	return TRUE;
3482 }
3483 
3484 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3485 
3486 static drbd_cmd_handler_f drbd_default_handler[] = {
3487 	[P_DATA]	    = receive_Data,
3488 	[P_DATA_REPLY]	    = receive_DataReply,
3489 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3490 	[P_BARRIER]	    = receive_Barrier,
3491 	[P_BITMAP]	    = receive_bitmap,
3492 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3493 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3494 	[P_DATA_REQUEST]    = receive_DataRequest,
3495 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3496 	[P_SYNC_PARAM]	    = receive_SyncParam,
3497 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3498 	[P_PROTOCOL]        = receive_protocol,
3499 	[P_UUIDS]	    = receive_uuids,
3500 	[P_SIZES]	    = receive_sizes,
3501 	[P_STATE]	    = receive_state,
3502 	[P_STATE_CHG_REQ]   = receive_req_state,
3503 	[P_SYNC_UUID]       = receive_sync_uuid,
3504 	[P_OV_REQUEST]      = receive_DataRequest,
3505 	[P_OV_REPLY]        = receive_DataRequest,
3506 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3507 	/* anything missing from this table is in
3508 	 * the asender_tbl, see get_asender_cmd */
3509 	[P_MAX_CMD]	    = NULL,
3510 };
3511 
3512 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3513 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3514 
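/*
 * Main loop of the receiver thread: read one packet header at a time, look
 * up the matching handler and let it consume the payload.  An unknown
 * mandatory packet or a handler failure forces the connection into
 * C_PROTOCOL_ERROR and ends the loop.
 */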
3515 static void drbdd(struct drbd_conf *mdev)
3516 {
3517 	drbd_cmd_handler_f handler;
3518 	struct p_header *header = &mdev->data.rbuf.header;
3519 
3520 	while (get_t_state(&mdev->receiver) == Running) {
3521 		drbd_thread_current_set_cpu(mdev);
3522 		if (!drbd_recv_header(mdev, header))
3523 			break;
3524 
3525 		if (header->command < P_MAX_CMD)
3526 			handler = drbd_cmd_handler[header->command];
3527 		else if (P_MAY_IGNORE < header->command
3528 		     && header->command < P_MAX_OPT_CMD)
3529 			handler = drbd_opt_cmd_handler ? drbd_opt_cmd_handler[header->command-P_MAY_IGNORE] : NULL;
3530 		else if (header->command > P_MAX_OPT_CMD)
3531 			handler = receive_skip;
3532 		else
3533 			handler = NULL;
3534 
3535 		if (unlikely(!handler)) {
3536 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3537 			    header->command, header->length);
3538 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3539 			break;
3540 		}
3541 		if (unlikely(!handler(mdev, header))) {
3542 			dev_err(DEV, "error receiving %s, l: %d!\n",
3543 			    cmdname(header->command), header->length);
3544 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3545 			break;
3546 		}
3547 
3548 		trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
3549 				__FILE__, __LINE__);
3550 	}
3551 }
3552 
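/*
 * Complete all application READ requests that were shipped to the peer but
 * never answered: walk app_reads_hash under req_lock and mark each request
 * as lost with the connection (connection_lost_while_pending).
 */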
3553 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3554 {
3555 	struct hlist_head *slot;
3556 	struct hlist_node *pos;
3557 	struct hlist_node *tmp;
3558 	struct drbd_request *req;
3559 	int i;
3560 
3561 	/*
3562 	 * Application READ requests
3563 	 */
3564 	spin_lock_irq(&mdev->req_lock);
3565 	for (i = 0; i < APP_R_HSIZE; i++) {
3566 		slot = mdev->app_reads_hash+i;
3567 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3568 			/* it may (but should not any longer!)
3569 			 * be on the work queue; if that assert triggers,
3570 			 * we need to also grab the
3571 			 * spin_lock_irq(&mdev->data.work.q_lock);
3572 			 * and list_del_init here. */
3573 			D_ASSERT(list_empty(&req->w.list));
3574 			/* It would be nice to complete outside of spinlock.
3575 			 * But this is easier for now. */
3576 			_req_mod(req, connection_lost_while_pending);
3577 		}
3578 	}
3579 	for (i = 0; i < APP_R_HSIZE; i++)
3580 		if (!hlist_empty(mdev->app_reads_hash+i))
3581 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3582 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3583 
3584 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3585 	spin_unlock_irq(&mdev->req_lock);
3586 }
3587 
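/*
 * Wait until the worker has processed everything queued before this call:
 * queue a barrier work item and sleep until its completion is signalled by
 * w_prev_work_done.  Since the work queue is processed in order, everything
 * queued earlier should have been handled once the barrier runs.
 */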
3588 void drbd_flush_workqueue(struct drbd_conf *mdev)
3589 {
3590 	struct drbd_wq_barrier barr;
3591 
3592 	barr.w.cb = w_prev_work_done;
3593 	init_completion(&barr.done);
3594 	drbd_queue_work(&mdev->data.work, &barr.w);
3595 	wait_for_completion(&barr.done);
3596 }
3597 
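/*
 * Tear down an established connection (or a failed connection attempt):
 * stop the asender, close the sockets, wait for peer requests and resync
 * activity to drain, cancel what cannot finish any more, and finally go
 * back to C_UNCONNECTED so the connection can be retried, or all the way
 * down to C_STANDALONE if the admin requested C_DISCONNECTING.
 */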
3598 static void drbd_disconnect(struct drbd_conf *mdev)
3599 {
3600 	enum drbd_fencing_p fp;
3601 	union drbd_state os, ns;
3602 	int rv = SS_UNKNOWN_ERROR;
3603 	unsigned int i;
3604 
3605 	if (mdev->state.conn == C_STANDALONE)
3606 		return;
3607 	if (mdev->state.conn >= C_WF_CONNECTION)
3608 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3609 				drbd_conn_str(mdev->state.conn));
3610 
3611 	/* asender does not clean up anything. it must not interfere, either */
3612 	drbd_thread_stop(&mdev->asender);
3613 
3614 	mutex_lock(&mdev->data.mutex);
3615 	drbd_free_sock(mdev);
3616 	mutex_unlock(&mdev->data.mutex);
3617 
3618 	spin_lock_irq(&mdev->req_lock);
3619 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3620 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3621 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3622 	spin_unlock_irq(&mdev->req_lock);
3623 
3624 	/* We do not have data structures that would allow us to
3625 	 * get the rs_pending_cnt down to 0 again.
3626 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3627 	 *    the pending RSDataRequest's we have sent.
3628 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3629 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3630 	 *  And no, it is not the sum of the reference counts in the
3631 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3632 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3633 	 *  on the fly. */
3634 	drbd_rs_cancel_all(mdev);
3635 	mdev->rs_total = 0;
3636 	mdev->rs_failed = 0;
3637 	atomic_set(&mdev->rs_pending_cnt, 0);
3638 	wake_up(&mdev->misc_wait);
3639 
3640 	/* make sure syncer is stopped and w_resume_next_sg queued */
3641 	del_timer_sync(&mdev->resync_timer);
3642 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3643 	resync_timer_fn((unsigned long)mdev);
3644 
3645 	/* so we can be sure that all remote or resync reads
3646 	 * made it at least to net_ee */
3647 	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
3648 
3649 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3650 	 * w_make_resync_request etc. which may still be on the worker queue
3651 	 * to be "canceled" */
3652 	drbd_flush_workqueue(mdev);
3653 
3654 	/* This also does reclaim_net_ee().  If we do this too early, we might
3655 	 * miss some resync ee and pages.*/
3656 	drbd_process_done_ee(mdev);
3657 
3658 	kfree(mdev->p_uuid);
3659 	mdev->p_uuid = NULL;
3660 
3661 	if (!mdev->state.susp)
3662 		tl_clear(mdev);
3663 
3664 	drbd_fail_pending_reads(mdev);
3665 
3666 	dev_info(DEV, "Connection closed\n");
3667 
3668 	drbd_md_sync(mdev);
3669 
3670 	fp = FP_DONT_CARE;
3671 	if (get_ldev(mdev)) {
3672 		fp = mdev->ldev->dc.fencing;
3673 		put_ldev(mdev);
3674 	}
3675 
3676 	if (mdev->state.role == R_PRIMARY) {
3677 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3678 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3679 			drbd_request_state(mdev, NS(pdsk, nps));
3680 		}
3681 	}
3682 
3683 	spin_lock_irq(&mdev->req_lock);
3684 	os = mdev->state;
3685 	if (os.conn >= C_UNCONNECTED) {
3686 		/* Do not restart in case we are C_DISCONNECTING */
3687 		ns = os;
3688 		ns.conn = C_UNCONNECTED;
3689 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3690 	}
3691 	spin_unlock_irq(&mdev->req_lock);
3692 
3693 	if (os.conn == C_DISCONNECTING) {
3694 		struct hlist_head *h;
3695 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3696 
3697 		/* we must not free the tl_hash
3698 		 * while application io is still on the fly */
3699 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3700 
3701 		spin_lock_irq(&mdev->req_lock);
3702 		/* paranoia code */
3703 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3704 			if (h->first)
3705 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3706 						(int)(h - mdev->ee_hash), h->first);
3707 		kfree(mdev->ee_hash);
3708 		mdev->ee_hash = NULL;
3709 		mdev->ee_hash_s = 0;
3710 
3711 		/* paranoia code */
3712 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3713 			if (h->first)
3714 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3715 						(int)(h - mdev->tl_hash), h->first);
3716 		kfree(mdev->tl_hash);
3717 		mdev->tl_hash = NULL;
3718 		mdev->tl_hash_s = 0;
3719 		spin_unlock_irq(&mdev->req_lock);
3720 
3721 		crypto_free_hash(mdev->cram_hmac_tfm);
3722 		mdev->cram_hmac_tfm = NULL;
3723 
3724 		kfree(mdev->net_conf);
3725 		mdev->net_conf = NULL;
3726 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3727 	}
3728 
3729 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3730 	 * want to use SO_LINGER, because apparently it can be deferred for
3731 	 * more than 20 seconds (longest time I checked).
3732 	 *
3733 	 * Actually we don't care for exactly when the network stack does its
3734 	 * put_page(), but release our reference on these pages right here.
3735 	 */
3736 	i = drbd_release_ee(mdev, &mdev->net_ee);
3737 	if (i)
3738 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3739 	i = atomic_read(&mdev->pp_in_use);
3740 	if (i)
3741 		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3742 
3743 	D_ASSERT(list_empty(&mdev->read_ee));
3744 	D_ASSERT(list_empty(&mdev->active_ee));
3745 	D_ASSERT(list_empty(&mdev->sync_ee));
3746 	D_ASSERT(list_empty(&mdev->done_ee));
3747 
3748 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3749 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3750 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3751 }
3752 
3753 /*
3754  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3755  * we can agree on is stored in agreed_pro_version.
3756  *
3757  * feature flags and the reserved array should be enough room for future
3758  * enhancements of the handshake protocol, and possible plugins...
3759  *
3760  * for now, they are expected to be zero, but ignored.
3761  */
3762 static int drbd_send_handshake(struct drbd_conf *mdev)
3763 {
3764 	/* ASSERT current == mdev->receiver ... */
3765 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3766 	int ok;
3767 
3768 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3769 		dev_err(DEV, "interrupted during initial handshake\n");
3770 		return 0; /* interrupted. not ok. */
3771 	}
3772 
3773 	if (mdev->data.socket == NULL) {
3774 		mutex_unlock(&mdev->data.mutex);
3775 		return 0;
3776 	}
3777 
3778 	memset(p, 0, sizeof(*p));
3779 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3780 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3781 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3782 			    (struct p_header *)p, sizeof(*p), 0);
3783 	mutex_unlock(&mdev->data.mutex);
3784 	return ok;
3785 }
3786 
3787 /*
3788  * return values:
3789  *   1 yes, we have a valid connection
3790  *   0 oops, did not work out, please try again
3791  *  -1 peer talks different language,
3792  *     no point in trying again, please go standalone.
3793  */
3794 static int drbd_do_handshake(struct drbd_conf *mdev)
3795 {
3796 	/* ASSERT current == mdev->receiver ... */
3797 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3798 	const int expect = sizeof(struct p_handshake)
3799 			  -sizeof(struct p_header);
3800 	int rv;
3801 
3802 	rv = drbd_send_handshake(mdev);
3803 	if (!rv)
3804 		return 0;
3805 
3806 	rv = drbd_recv_header(mdev, &p->head);
3807 	if (!rv)
3808 		return 0;
3809 
3810 	if (p->head.command != P_HAND_SHAKE) {
3811 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3812 		     cmdname(p->head.command), p->head.command);
3813 		return -1;
3814 	}
3815 
3816 	if (p->head.length != expect) {
3817 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3818 		     expect, p->head.length);
3819 		return -1;
3820 	}
3821 
3822 	rv = drbd_recv(mdev, &p->head.payload, expect);
3823 
3824 	if (rv != expect) {
3825 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3826 		return 0;
3827 	}
3828 
3829 	trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
3830 			__FILE__, __LINE__);
3831 
3832 	p->protocol_min = be32_to_cpu(p->protocol_min);
3833 	p->protocol_max = be32_to_cpu(p->protocol_max);
3834 	if (p->protocol_max == 0)
3835 		p->protocol_max = p->protocol_min;
3836 
3837 	if (PRO_VERSION_MAX < p->protocol_min ||
3838 	    PRO_VERSION_MIN > p->protocol_max)
3839 		goto incompat;
3840 
3841 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3842 
3843 	dev_info(DEV, "Handshake successful: "
3844 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3845 
3846 	return 1;
3847 
3848  incompat:
3849 	dev_err(DEV, "incompatible DRBD dialects: "
3850 	    "I support %d-%d, peer supports %d-%d\n",
3851 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3852 	    p->protocol_min, p->protocol_max);
3853 	return -1;
3854 }
3855 
3856 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3857 static int drbd_do_auth(struct drbd_conf *mdev)
3858 {
3859 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3860 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3861 	return 0;
3862 }
3863 #else
3864 #define CHALLENGE_LEN 64
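/*
 * Challenge/response authentication using the configured cram-hmac-alg with
 * the shared secret as HMAC key.  Both sides run the same exchange
 * symmetrically; as seen from this node the flow is roughly:
 *
 *   send  P_AUTH_CHALLENGE  (our random challenge)
 *   recv  P_AUTH_CHALLENGE  (peer's random challenge)
 *   send  P_AUTH_RESPONSE   (HMAC over the peer's challenge)
 *   recv  P_AUTH_RESPONSE   (peer's HMAC over our challenge)
 *   compare it against the HMAC we compute locally over our own challenge
 *
 * Returns 1 on success, 0 on any failure or mismatch.
 */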
3865 static int drbd_do_auth(struct drbd_conf *mdev)
3866 {
3867 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3868 	struct scatterlist sg;
3869 	char *response = NULL;
3870 	char *right_response = NULL;
3871 	char *peers_ch = NULL;
3872 	struct p_header p;
3873 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3874 	unsigned int resp_size;
3875 	struct hash_desc desc;
3876 	int rv;
3877 
3878 	desc.tfm = mdev->cram_hmac_tfm;
3879 	desc.flags = 0;
3880 
3881 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3882 				(u8 *)mdev->net_conf->shared_secret, key_len);
3883 	if (rv) {
3884 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3885 		rv = 0;
3886 		goto fail;
3887 	}
3888 
3889 	get_random_bytes(my_challenge, CHALLENGE_LEN);
3890 
3891 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3892 	if (!rv)
3893 		goto fail;
3894 
3895 	rv = drbd_recv_header(mdev, &p);
3896 	if (!rv)
3897 		goto fail;
3898 
3899 	if (p.command != P_AUTH_CHALLENGE) {
3900 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3901 		    cmdname(p.command), p.command);
3902 		rv = 0;
3903 		goto fail;
3904 	}
3905 
3906 	if (p.length > CHALLENGE_LEN*2) {
3907 		dev_err(DEV, "AuthChallenge payload too big.\n");
3908 		rv = 0;
3909 		goto fail;
3910 	}
3911 
3912 	peers_ch = kmalloc(p.length, GFP_NOIO);
3913 	if (peers_ch == NULL) {
3914 		dev_err(DEV, "kmalloc of peers_ch failed\n");
3915 		rv = 0;
3916 		goto fail;
3917 	}
3918 
3919 	rv = drbd_recv(mdev, peers_ch, p.length);
3920 
3921 	if (rv != p.length) {
3922 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3923 		rv = 0;
3924 		goto fail;
3925 	}
3926 
3927 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3928 	response = kmalloc(resp_size, GFP_NOIO);
3929 	if (response == NULL) {
3930 		dev_err(DEV, "kmalloc of response failed\n");
3931 		rv = 0;
3932 		goto fail;
3933 	}
3934 
3935 	sg_init_table(&sg, 1);
3936 	sg_set_buf(&sg, peers_ch, p.length);
3937 
3938 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3939 	if (rv) {
3940 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3941 		rv = 0;
3942 		goto fail;
3943 	}
3944 
3945 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3946 	if (!rv)
3947 		goto fail;
3948 
3949 	rv = drbd_recv_header(mdev, &p);
3950 	if (!rv)
3951 		goto fail;
3952 
3953 	if (p.command != P_AUTH_RESPONSE) {
3954 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3955 		    cmdname(p.command), p.command);
3956 		rv = 0;
3957 		goto fail;
3958 	}
3959 
3960 	if (p.length != resp_size) {
3961 		dev_err(DEV, "AuthResponse payload has unexpected size\n");
3962 		rv = 0;
3963 		goto fail;
3964 	}
3965 
3966 	rv = drbd_recv(mdev, response, resp_size);
3967 
3968 	if (rv != resp_size) {
3969 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3970 		rv = 0;
3971 		goto fail;
3972 	}
3973 
3974 	right_response = kmalloc(resp_size, GFP_NOIO);
3975 	if (right_response == NULL) {
3976 		dev_err(DEV, "kmalloc of right_response failed\n");
3977 		rv = 0;
3978 		goto fail;
3979 	}
3980 
3981 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3982 
3983 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3984 	if (rv) {
3985 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3986 		rv = 0;
3987 		goto fail;
3988 	}
3989 
3990 	rv = !memcmp(response, right_response, resp_size);
3991 
3992 	if (rv)
3993 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3994 		     resp_size, mdev->net_conf->cram_hmac_alg);
3995 
3996  fail:
3997 	kfree(peers_ch);
3998 	kfree(response);
3999 	kfree(right_response);
4000 
4001 	return rv;
4002 }
4003 #endif
4004 
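/*
 * Entry point of the receiver thread.  Retry drbd_connect() until we either
 * have a connection (then run the drbdd() receive loop) or the handshake
 * tells us to give up, in which case the network configuration is discarded
 * via C_DISCONNECTING.
 */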
4005 int drbdd_init(struct drbd_thread *thi)
4006 {
4007 	struct drbd_conf *mdev = thi->mdev;
4008 	unsigned int minor = mdev_to_minor(mdev);
4009 	int h;
4010 
4011 	sprintf(current->comm, "drbd%d_receiver", minor);
4012 
4013 	dev_info(DEV, "receiver (re)started\n");
4014 
4015 	do {
4016 		h = drbd_connect(mdev);
4017 		if (h == 0) {
4018 			drbd_disconnect(mdev);
4019 			__set_current_state(TASK_INTERRUPTIBLE);
4020 			schedule_timeout(HZ);
4021 		}
4022 		if (h == -1) {
4023 			dev_warn(DEV, "Discarding network configuration.\n");
4024 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4025 		}
4026 	} while (h == 0);
4027 
4028 	if (h > 0) {
4029 		if (get_net_conf(mdev)) {
4030 			drbdd(mdev);
4031 			put_net_conf(mdev);
4032 		}
4033 	}
4034 
4035 	drbd_disconnect(mdev);
4036 
4037 	dev_info(DEV, "receiver terminated\n");
4038 	return 0;
4039 }
4040 
4041 /* ********* acknowledge sender ******** */
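/*
 * The handlers below run in the asender thread.  They return TRUE to keep
 * the meta connection going; returning FALSE makes drbd_asender() force
 * C_NETWORK_FAILURE and reconnect.
 */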
4042 
4043 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4044 {
4045 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4046 
4047 	int retcode = be32_to_cpu(p->retcode);
4048 
4049 	if (retcode >= SS_SUCCESS) {
4050 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4051 	} else {
4052 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4053 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4054 		    drbd_set_st_err_str(retcode), retcode);
4055 	}
4056 	wake_up(&mdev->state_wait);
4057 
4058 	return TRUE;
4059 }
4060 
4061 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4062 {
4063 	return drbd_send_ping_ack(mdev);
4064 
4065 }
4066 
4067 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4068 {
4069 	/* restore idle timeout */
4070 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4071 
4072 	return TRUE;
4073 }
4074 
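/*
 * Peer answered a checksum-based resync request (protocol >= 89) with
 * "block is already in sync": no data was transferred, so just mark the
 * block in sync, account it as a same-checksum hit and drop the pending
 * counter.
 */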
4075 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4076 {
4077 	struct p_block_ack *p = (struct p_block_ack *)h;
4078 	sector_t sector = be64_to_cpu(p->sector);
4079 	int blksize = be32_to_cpu(p->blksize);
4080 
4081 	D_ASSERT(mdev->agreed_pro_version >= 89);
4082 
4083 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4084 
4085 	drbd_rs_complete_io(mdev, sector);
4086 	drbd_set_in_sync(mdev, sector, blksize);
4087 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4088 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4089 	dec_rs_pending(mdev);
4090 
4091 	return TRUE;
4092 }
4093 
4094 /* when we receive the ACK for a write request,
4095  * verify that we actually know about it */
4096 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4097 	u64 id, sector_t sector)
4098 {
4099 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4100 	struct hlist_node *n;
4101 	struct drbd_request *req;
4102 
4103 	hlist_for_each_entry(req, n, slot, colision) {
4104 		if ((unsigned long)req == (unsigned long)id) {
4105 			if (req->sector != sector) {
4106 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4107 				    "wrong sector (%llus versus %llus)\n", req,
4108 				    (unsigned long long)req->sector,
4109 				    (unsigned long long)sector);
4110 				break;
4111 			}
4112 			return req;
4113 		}
4114 	}
4115 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4116 		(void *)(unsigned long)id, (unsigned long long)sector);
4117 	return NULL;
4118 }
4119 
4120 typedef struct drbd_request *(req_validator_fn)
4121 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4122 
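/*
 * Common pattern for ACK processing: look up the request under req_lock via
 * the given validator, apply the request state transition "what", and
 * complete the master bio outside the lock if that transition finished it.
 * Returns FALSE if the block_id/sector pair is bogus.
 */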
4123 static int validate_req_change_req_state(struct drbd_conf *mdev,
4124 	u64 id, sector_t sector, req_validator_fn validator,
4125 	const char *func, enum drbd_req_event what)
4126 {
4127 	struct drbd_request *req;
4128 	struct bio_and_error m;
4129 
4130 	spin_lock_irq(&mdev->req_lock);
4131 	req = validator(mdev, id, sector);
4132 	if (unlikely(!req)) {
4133 		spin_unlock_irq(&mdev->req_lock);
4134 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4135 		return FALSE;
4136 	}
4137 	__req_mod(req, what, &m);
4138 	spin_unlock_irq(&mdev->req_lock);
4139 
4140 	if (m.bio)
4141 		complete_master_bio(mdev, &m);
4142 	return TRUE;
4143 }
4144 
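/*
 * Positive acknowledgements for write requests.  Syncer block ids (resync
 * traffic) only update the bitmap and the resync-pending count; everything
 * else goes through the request state machine:
 *   P_RECV_ACK     - protocol B, peer received the data
 *   P_WRITE_ACK    - protocol C, peer wrote the data
 *   P_RS_WRITE_ACK - protocol C, peer wrote the data and marks it in sync
 *   P_DISCARD_ACK  - protocol C, peer discarded a conflicting concurrent write
 */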
4145 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4146 {
4147 	struct p_block_ack *p = (struct p_block_ack *)h;
4148 	sector_t sector = be64_to_cpu(p->sector);
4149 	int blksize = be32_to_cpu(p->blksize);
4150 	enum drbd_req_event what;
4151 
4152 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4153 
4154 	if (is_syncer_block_id(p->block_id)) {
4155 		drbd_set_in_sync(mdev, sector, blksize);
4156 		dec_rs_pending(mdev);
4157 		return TRUE;
4158 	}
4159 	switch (be16_to_cpu(h->command)) {
4160 	case P_RS_WRITE_ACK:
4161 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4162 		what = write_acked_by_peer_and_sis;
4163 		break;
4164 	case P_WRITE_ACK:
4165 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4166 		what = write_acked_by_peer;
4167 		break;
4168 	case P_RECV_ACK:
4169 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4170 		what = recv_acked_by_peer;
4171 		break;
4172 	case P_DISCARD_ACK:
4173 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4174 		what = conflict_discarded_by_peer;
4175 		break;
4176 	default:
4177 		D_ASSERT(0);
4178 		return FALSE;
4179 	}
4180 
4181 	return validate_req_change_req_state(mdev, p->block_id, sector,
4182 		_ack_id_to_req, __func__ , what);
4183 }
4184 
4185 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4186 {
4187 	struct p_block_ack *p = (struct p_block_ack *)h;
4188 	sector_t sector = be64_to_cpu(p->sector);
4189 
4190 	if (__ratelimit(&drbd_ratelimit_state))
4191 		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4192 
4193 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4194 
4195 	if (is_syncer_block_id(p->block_id)) {
4196 		int size = be32_to_cpu(p->blksize);
4197 		dec_rs_pending(mdev);
4198 		drbd_rs_failed_io(mdev, sector, size);
4199 		return TRUE;
4200 	}
4201 	return validate_req_change_req_state(mdev, p->block_id, sector,
4202 		_ack_id_to_req, __func__ , neg_acked);
4203 }
4204 
4205 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4206 {
4207 	struct p_block_ack *p = (struct p_block_ack *)h;
4208 	sector_t sector = be64_to_cpu(p->sector);
4209 
4210 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4211 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4212 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4213 
4214 	return validate_req_change_req_state(mdev, p->block_id, sector,
4215 		_ar_id_to_req, __func__ , neg_acked);
4216 }
4217 
4218 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4219 {
4220 	sector_t sector;
4221 	int size;
4222 	struct p_block_ack *p = (struct p_block_ack *)h;
4223 
4224 	sector = be64_to_cpu(p->sector);
4225 	size = be32_to_cpu(p->blksize);
4226 	D_ASSERT(p->block_id == ID_SYNCER);
4227 
4228 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4229 
4230 	dec_rs_pending(mdev);
4231 
4232 	if (get_ldev_if_state(mdev, D_FAILED)) {
4233 		drbd_rs_complete_io(mdev, sector);
4234 		drbd_rs_failed_io(mdev, sector, size);
4235 		put_ldev(mdev);
4236 	}
4237 
4238 	return TRUE;
4239 }
4240 
4241 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4242 {
4243 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4244 
4245 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4246 
4247 	return TRUE;
4248 }
4249 
4250 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4251 {
4252 	struct p_block_ack *p = (struct p_block_ack *)h;
4253 	struct drbd_work *w;
4254 	sector_t sector;
4255 	int size;
4256 
4257 	sector = be64_to_cpu(p->sector);
4258 	size = be32_to_cpu(p->blksize);
4259 
4260 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4261 
4262 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4263 		drbd_ov_oos_found(mdev, sector, size);
4264 	else
4265 		ov_oos_print(mdev);
4266 
4267 	drbd_rs_complete_io(mdev, sector);
4268 	dec_rs_pending(mdev);
4269 
4270 	if (--mdev->ov_left == 0) {
4271 		w = kmalloc(sizeof(*w), GFP_NOIO);
4272 		if (w) {
4273 			w->cb = w_ov_finished;
4274 			drbd_queue_work_front(&mdev->data.work, w);
4275 		} else {
4276 			dev_err(DEV, "kmalloc(w) failed.\n");
4277 			ov_oos_print(mdev);
4278 			drbd_resync_finished(mdev);
4279 		}
4280 	}
4281 	return TRUE;
4282 }
4283 
4284 struct asender_cmd {
4285 	size_t pkt_size;
4286 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4287 };
4288 
4289 static struct asender_cmd *get_asender_cmd(int cmd)
4290 {
4291 	static struct asender_cmd asender_tbl[] = {
4292 		/* anything missing from this table is in
4293 		 * the drbd_cmd_handler (drbd_default_handler) table,
4294 		 * see the beginning of drbdd() */
4295 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4296 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4297 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4298 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4299 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4300 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4301 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4302 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4303 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4304 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4305 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4306 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4307 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4308 	[P_MAX_CMD]	    = { 0, NULL },
4309 	};
4310 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4311 		return NULL;
4312 	return &asender_tbl[cmd];
4313 }
4314 
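/*
 * Main loop of the acknowledgement sender thread: send pings when asked to,
 * drain the done_ee list (completed receive/write work, which is what
 * generates most acks), and reassemble fixed-size packets from the meta
 * socket, dispatching them through get_asender_cmd().  The socket receive
 * timeout doubles as the ping timeout while a P_PING is outstanding.
 */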
4315 int drbd_asender(struct drbd_thread *thi)
4316 {
4317 	struct drbd_conf *mdev = thi->mdev;
4318 	struct p_header *h = &mdev->meta.rbuf.header;
4319 	struct asender_cmd *cmd = NULL;
4320 
4321 	int rv, len;
4322 	void *buf    = h;
4323 	int received = 0;
4324 	int expect   = sizeof(struct p_header);
4325 	int empty;
4326 
4327 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4328 
4329 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4330 	current->rt_priority = 2;    /* more important than all other tasks */
4331 
4332 	while (get_t_state(thi) == Running) {
4333 		drbd_thread_current_set_cpu(mdev);
4334 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4335 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4336 			mdev->meta.socket->sk->sk_rcvtimeo =
4337 				mdev->net_conf->ping_timeo*HZ/10;
4338 		}
4339 
4340 		/* conditionally cork;
4341 		 * it may hurt latency if we cork without much to send */
4342 		if (!mdev->net_conf->no_cork &&
4343 			3 < atomic_read(&mdev->unacked_cnt))
4344 			drbd_tcp_cork(mdev->meta.socket);
4345 		while (1) {
4346 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4347 			flush_signals(current);
4348 			if (!drbd_process_done_ee(mdev)) {
4349 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4350 				goto reconnect;
4351 			}
4352 			/* to avoid race with newly queued ACKs */
4353 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4354 			spin_lock_irq(&mdev->req_lock);
4355 			empty = list_empty(&mdev->done_ee);
4356 			spin_unlock_irq(&mdev->req_lock);
4357 			/* new ack may have been queued right here,
4358 			 * but then there is also a signal pending,
4359 			 * and we start over... */
4360 			if (empty)
4361 				break;
4362 		}
4363 		/* but unconditionally uncork unless disabled */
4364 		if (!mdev->net_conf->no_cork)
4365 			drbd_tcp_uncork(mdev->meta.socket);
4366 
4367 		/* short circuit, recv_msg would return EINTR anyways. */
4368 		if (signal_pending(current))
4369 			continue;
4370 
4371 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4372 				     buf, expect-received, 0);
4373 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4374 
4375 		flush_signals(current);
4376 
4377 		/* Note:
4378 		 * -EINTR	 (on meta) we got a signal
4379 		 * -EAGAIN	 (on meta) rcvtimeo expired
4380 		 * -ECONNRESET	 other side closed the connection
4381 		 * -ERESTARTSYS  (on data) we got a signal
4382 		 * rv <  0	 other than above: unexpected error!
4383 		 * rv == expected: full header or command
4384 		 * rv <  expected: "woken" by signal during receive
4385 		 * rv == 0	 : "connection shut down by peer"
4386 		 */
4387 		if (likely(rv > 0)) {
4388 			received += rv;
4389 			buf	 += rv;
4390 		} else if (rv == 0) {
4391 			dev_err(DEV, "meta connection shut down by peer.\n");
4392 			goto reconnect;
4393 		} else if (rv == -EAGAIN) {
4394 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4395 			    mdev->net_conf->ping_timeo*HZ/10) {
4396 				dev_err(DEV, "PingAck did not arrive in time.\n");
4397 				goto reconnect;
4398 			}
4399 			set_bit(SEND_PING, &mdev->flags);
4400 			continue;
4401 		} else if (rv == -EINTR) {
4402 			continue;
4403 		} else {
4404 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4405 			goto reconnect;
4406 		}
4407 
4408 		if (received == expect && cmd == NULL) {
4409 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4410 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4411 				    (long)be32_to_cpu(h->magic),
4412 				    h->command, h->length);
4413 				goto reconnect;
4414 			}
4415 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4416 			len = be16_to_cpu(h->length);
4417 			if (unlikely(cmd == NULL)) {
4418 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4419 				    (long)be32_to_cpu(h->magic),
4420 				    h->command, h->length);
4421 				goto disconnect;
4422 			}
4423 			expect = cmd->pkt_size;
4424 			ERR_IF(len != expect-sizeof(struct p_header)) {
4425 				trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
4426 				goto reconnect;
4427 			}
4428 		}
4429 		if (received == expect) {
4430 			D_ASSERT(cmd != NULL);
4431 			trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
4432 			if (!cmd->process(mdev, h))
4433 				goto reconnect;
4434 
4435 			buf	 = h;
4436 			received = 0;
4437 			expect	 = sizeof(struct p_header);
4438 			cmd	 = NULL;
4439 		}
4440 	}
4441 
4442 	if (0) {
4443 reconnect:
4444 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4445 	}
4446 	if (0) {
4447 disconnect:
4448 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4449 	}
4450 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4451 
4452 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4453 	dev_info(DEV, "asender terminated\n");
4454 
4455 	return 0;
4456 }
4457