xref: /linux/drivers/block/drbd/drbd_receiver.c (revision ab8fafc2e1ecc0090f2c78902d3b992eec8b11f8)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/version.h>
32 #include <linux/drbd.h>
33 #include <linux/fs.h>
34 #include <linux/file.h>
35 #include <linux/in.h>
36 #include <linux/mm.h>
37 #include <linux/memcontrol.h>
38 #include <linux/mm_inline.h>
39 #include <linux/slab.h>
40 #include <linux/smp_lock.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/mm.h>
47 #include <linux/string.h>
48 #include <linux/scatterlist.h>
49 #include "drbd_int.h"
50 #include "drbd_tracing.h"
51 #include "drbd_req.h"
52 
53 #include "drbd_vli.h"
54 
55 struct flush_work {
56 	struct drbd_work w;
57 	struct drbd_epoch *epoch;
58 };
59 
60 enum finish_epoch {
61 	FE_STILL_LIVE,
62 	FE_DESTROYED,
63 	FE_RECYCLED,
64 };
65 
66 static int drbd_do_handshake(struct drbd_conf *mdev);
67 static int drbd_do_auth(struct drbd_conf *mdev);
68 
69 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
70 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
71 
72 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
73 {
74 	struct drbd_epoch *prev;
75 	spin_lock(&mdev->epoch_lock);
76 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
77 	if (prev == epoch || prev == mdev->current_epoch)
78 		prev = NULL;
79 	spin_unlock(&mdev->epoch_lock);
80 	return prev;
81 }
82 
83 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
84 
85 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
86 {
87 	struct page *page = NULL;
88 
89 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
90 	 * So what. It saves a spin_lock. */
91 	if (drbd_pp_vacant > 0) {
92 		spin_lock(&drbd_pp_lock);
93 		page = drbd_pp_pool;
94 		if (page) {
95 			drbd_pp_pool = (struct page *)page_private(page);
96 			set_page_private(page, 0); /* just to be polite */
97 			drbd_pp_vacant--;
98 		}
99 		spin_unlock(&drbd_pp_lock);
100 	}
101 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
102 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
103 	 * which in turn might block on the other node at this very place.  */
104 	if (!page)
105 		page = alloc_page(GFP_TRY);
106 	if (page)
107 		atomic_inc(&mdev->pp_in_use);
108 	return page;
109 }
110 
111 /* Kick the lower level device if we have more than unplug_watermark
112  * references (local_cnt) on it; these are typically locally submitted
113  * IO requests.  We don't use unacked_cnt, so we speed up proto A and B, too. */
114 static void maybe_kick_lo(struct drbd_conf *mdev)
115 {
116 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
117 		drbd_kick_lo(mdev);
118 }
119 
120 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
121 {
122 	struct drbd_epoch_entry *e;
123 	struct list_head *le, *tle;
124 
125 	/* The EEs are always appended to the end of the list. Since
126 	   they are sent in order over the wire, they have to finish
127 	   in order. As soon as we see the first unfinished one, we can
128 	   stop examining the list... */
129 
130 	list_for_each_safe(le, tle, &mdev->net_ee) {
131 		e = list_entry(le, struct drbd_epoch_entry, w.list);
132 		if (drbd_bio_has_active_page(e->private_bio))
133 			break;
134 		list_move(le, to_be_freed);
135 	}
136 }
137 
138 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
139 {
140 	LIST_HEAD(reclaimed);
141 	struct drbd_epoch_entry *e, *t;
142 
143 	maybe_kick_lo(mdev);
144 	spin_lock_irq(&mdev->req_lock);
145 	reclaim_net_ee(mdev, &reclaimed);
146 	spin_unlock_irq(&mdev->req_lock);
147 
148 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
149 		drbd_free_ee(mdev, e);
150 }
151 
152 /**
153  * drbd_pp_alloc() - Returns a page; with @retry set it fails only if a signal comes in
154  * @mdev:	DRBD device.
155  * @retry:	whether or not to retry allocation forever (or until signalled)
156  *
157  * Tries to allocate a page, first from our own page pool, then from the
158  * kernel, unless this allocation would exceed the max_buffers setting.
159  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
160  */
161 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
162 {
163 	struct page *page = NULL;
164 	DEFINE_WAIT(wait);
165 
166 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
167 		page = drbd_pp_first_page_or_try_alloc(mdev);
168 		if (page)
169 			return page;
170 	}
171 
172 	for (;;) {
173 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
174 
175 		drbd_kick_lo_and_reclaim_net(mdev);
176 
177 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
178 			page = drbd_pp_first_page_or_try_alloc(mdev);
179 			if (page)
180 				break;
181 		}
182 
183 		if (!retry)
184 			break;
185 
186 		if (signal_pending(current)) {
187 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
188 			break;
189 		}
190 
191 		schedule();
192 	}
193 	finish_wait(&drbd_pp_wait, &wait);
194 
195 	return page;
196 }
197 
198 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
199  * It is also used from inside another spin_lock_irq(&mdev->req_lock) section. */
200 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
201 {
202 	int free_it;
203 
204 	spin_lock(&drbd_pp_lock);
205 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
206 		free_it = 1;
207 	} else {
208 		set_page_private(page, (unsigned long)drbd_pp_pool);
209 		drbd_pp_pool = page;
210 		drbd_pp_vacant++;
211 		free_it = 0;
212 	}
213 	spin_unlock(&drbd_pp_lock);
214 
215 	atomic_dec(&mdev->pp_in_use);
216 
217 	if (free_it)
218 		__free_page(page);
219 
220 	wake_up(&drbd_pp_wait);
221 }
222 
223 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
224 {
225 	struct page *p_to_be_freed = NULL;
226 	struct page *page;
227 	struct bio_vec *bvec;
228 	int i;
229 
230 	spin_lock(&drbd_pp_lock);
231 	__bio_for_each_segment(bvec, bio, i, 0) {
232 		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
233 			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
234 			p_to_be_freed = bvec->bv_page;
235 		} else {
236 			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
237 			drbd_pp_pool = bvec->bv_page;
238 			drbd_pp_vacant++;
239 		}
240 	}
241 	spin_unlock(&drbd_pp_lock);
242 	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
243 
244 	while (p_to_be_freed) {
245 		page = p_to_be_freed;
246 		p_to_be_freed = (struct page *)page_private(page);
247 		set_page_private(page, 0); /* just to be polite */
248 		put_page(page);
249 	}
250 
251 	wake_up(&drbd_pp_wait);
252 }
253 
254 /*
255 You need to hold the req_lock:
256  _drbd_wait_ee_list_empty()
257 
258 You must not have the req_lock:
259  drbd_free_ee()
260  drbd_alloc_ee()
261  drbd_init_ee()
262  drbd_release_ee()
263  drbd_ee_fix_bhs()
264  drbd_process_done_ee()
265  drbd_clear_done_ee()
266  drbd_wait_ee_list_empty()
267 */
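/*
 * Illustrative usage sketch only (condensed from receive_Data() and
 * drbd_process_done_ee(); not a verbatim call sequence):
 *
 *	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
 *	if (!e)
 *		return FALSE;
 *	spin_lock_irq(&mdev->req_lock);
 *	list_add(&e->w.list, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->req_lock);
 *	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
 *	...
 *	drbd_free_ee(mdev, e);	later, once the write has completed and been acked
 */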
268 
269 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
270 				     u64 id,
271 				     sector_t sector,
272 				     unsigned int data_size,
273 				     gfp_t gfp_mask) __must_hold(local)
274 {
275 	struct request_queue *q;
276 	struct drbd_epoch_entry *e;
277 	struct page *page;
278 	struct bio *bio;
279 	unsigned int ds;
280 
281 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
282 		return NULL;
283 
284 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
285 	if (!e) {
286 		if (!(gfp_mask & __GFP_NOWARN))
287 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
288 		return NULL;
289 	}
290 
291 	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
292 	if (!bio) {
293 		if (!(gfp_mask & __GFP_NOWARN))
294 			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
295 		goto fail1;
296 	}
297 
298 	bio->bi_bdev = mdev->ldev->backing_bdev;
299 	bio->bi_sector = sector;
300 
301 	ds = data_size;
302 	while (ds) {
303 		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
304 		if (!page) {
305 			if (!(gfp_mask & __GFP_NOWARN))
306 				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
307 			goto fail2;
308 		}
309 		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
310 			drbd_pp_free(mdev, page);
311 			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
312 			    "data_size=%u,ds=%u) failed\n",
313 			    (unsigned long long)sector, data_size, ds);
314 
315 			q = bdev_get_queue(bio->bi_bdev);
316 			if (q->merge_bvec_fn) {
317 				struct bvec_merge_data bvm = {
318 					.bi_bdev = bio->bi_bdev,
319 					.bi_sector = bio->bi_sector,
320 					.bi_size = bio->bi_size,
321 					.bi_rw = bio->bi_rw,
322 				};
323 				int l = q->merge_bvec_fn(q, &bvm,
324 						&bio->bi_io_vec[bio->bi_vcnt]);
325 				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
326 			}
327 
328 			/* dump more of the bio. */
329 			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
330 			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
331 			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
332 			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
333 
334 			goto fail2;
335 			break;
336 		}
337 		ds -= min_t(int, ds, PAGE_SIZE);
338 	}
339 
340 	D_ASSERT(data_size == bio->bi_size);
341 
342 	bio->bi_private = e;
343 	e->mdev = mdev;
344 	e->sector = sector;
345 	e->size = bio->bi_size;
346 
347 	e->private_bio = bio;
348 	e->block_id = id;
349 	INIT_HLIST_NODE(&e->colision);
350 	e->epoch = NULL;
351 	e->flags = 0;
352 
353 	trace_drbd_ee(mdev, e, "allocated");
354 
355 	return e;
356 
357  fail2:
358 	drbd_pp_free_bio_pages(mdev, bio);
359 	bio_put(bio);
360  fail1:
361 	mempool_free(e, drbd_ee_mempool);
362 
363 	return NULL;
364 }
365 
366 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
367 {
368 	struct bio *bio = e->private_bio;
369 	trace_drbd_ee(mdev, e, "freed");
370 	drbd_pp_free_bio_pages(mdev, bio);
371 	bio_put(bio);
372 	D_ASSERT(hlist_unhashed(&e->colision));
373 	mempool_free(e, drbd_ee_mempool);
374 }
375 
376 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377 {
378 	LIST_HEAD(work_list);
379 	struct drbd_epoch_entry *e, *t;
380 	int count = 0;
381 
382 	spin_lock_irq(&mdev->req_lock);
383 	list_splice_init(list, &work_list);
384 	spin_unlock_irq(&mdev->req_lock);
385 
386 	list_for_each_entry_safe(e, t, &work_list, w.list) {
387 		drbd_free_ee(mdev, e);
388 		count++;
389 	}
390 	return count;
391 }
392 
393 
394 /*
395  * This function is called from _asender only_
396  * but see also comments in _req_mod(,barrier_acked)
397  * and receive_Barrier.
398  *
399  * Move entries from net_ee to done_ee, if ready.
400  * Grab done_ee, call all callbacks, free the entries.
401  * The callbacks typically send out ACKs.
402  */
403 static int drbd_process_done_ee(struct drbd_conf *mdev)
404 {
405 	LIST_HEAD(work_list);
406 	LIST_HEAD(reclaimed);
407 	struct drbd_epoch_entry *e, *t;
408 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
409 
410 	spin_lock_irq(&mdev->req_lock);
411 	reclaim_net_ee(mdev, &reclaimed);
412 	list_splice_init(&mdev->done_ee, &work_list);
413 	spin_unlock_irq(&mdev->req_lock);
414 
415 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
416 		drbd_free_ee(mdev, e);
417 
418 	/* possible callbacks here:
419 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
420 	 * all ignore the last argument.
421 	 */
422 	list_for_each_entry_safe(e, t, &work_list, w.list) {
423 		trace_drbd_ee(mdev, e, "process_done_ee");
424 		/* list_del not necessary, next/prev members not touched */
425 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
426 		drbd_free_ee(mdev, e);
427 	}
428 	wake_up(&mdev->ee_wait);
429 
430 	return ok;
431 }
432 
433 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
434 {
435 	DEFINE_WAIT(wait);
436 
437 	/* avoids spin_lock/unlock
438 	 * and calling prepare_to_wait in the fast path */
439 	while (!list_empty(head)) {
440 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
441 		spin_unlock_irq(&mdev->req_lock);
442 		drbd_kick_lo(mdev);
443 		schedule();
444 		finish_wait(&mdev->ee_wait, &wait);
445 		spin_lock_irq(&mdev->req_lock);
446 	}
447 }
448 
449 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
450 {
451 	spin_lock_irq(&mdev->req_lock);
452 	_drbd_wait_ee_list_empty(mdev, head);
453 	spin_unlock_irq(&mdev->req_lock);
454 }
455 
456 /* see also kernel_accept, which is only present since 2.6.18.
457  * Also, we want to log exactly which part of it failed. */
458 static int drbd_accept(struct drbd_conf *mdev, const char **what,
459 		struct socket *sock, struct socket **newsock)
460 {
461 	struct sock *sk = sock->sk;
462 	int err = 0;
463 
464 	*what = "listen";
465 	err = sock->ops->listen(sock, 5);
466 	if (err < 0)
467 		goto out;
468 
469 	*what = "sock_create_lite";
470 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471 			       newsock);
472 	if (err < 0)
473 		goto out;
474 
475 	*what = "accept";
476 	err = sock->ops->accept(sock, *newsock, 0);
477 	if (err < 0) {
478 		sock_release(*newsock);
479 		*newsock = NULL;
480 		goto out;
481 	}
482 	(*newsock)->ops  = sock->ops;
483 
484 out:
485 	return err;
486 }
487 
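/* Single-shot kernel-space recvmsg on @sock: if @flags is 0, it defaults
 * to MSG_WAITALL | MSG_NOSIGNAL.  Returns the byte count from
 * sock_recvmsg(), or a negative error; connection state is not touched. */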
488 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
489 		    void *buf, size_t size, int flags)
490 {
491 	mm_segment_t oldfs;
492 	struct kvec iov = {
493 		.iov_base = buf,
494 		.iov_len = size,
495 	};
496 	struct msghdr msg = {
497 		.msg_iovlen = 1,
498 		.msg_iov = (struct iovec *)&iov,
499 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
500 	};
501 	int rv;
502 
503 	oldfs = get_fs();
504 	set_fs(KERNEL_DS);
505 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
506 	set_fs(oldfs);
507 
508 	return rv;
509 }
510 
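/* Receive exactly @size bytes on the data socket (MSG_WAITALL).
 * On error, short read, or shutdown by the peer, the connection is
 * forced to C_BROKEN_PIPE; returns whatever sock_recvmsg() delivered. */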
511 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
512 {
513 	mm_segment_t oldfs;
514 	struct kvec iov = {
515 		.iov_base = buf,
516 		.iov_len = size,
517 	};
518 	struct msghdr msg = {
519 		.msg_iovlen = 1,
520 		.msg_iov = (struct iovec *)&iov,
521 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
522 	};
523 	int rv;
524 
525 	oldfs = get_fs();
526 	set_fs(KERNEL_DS);
527 
528 	for (;;) {
529 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
530 		if (rv == size)
531 			break;
532 
533 		/* Note:
534 		 * ECONNRESET	other side closed the connection
535 		 * ERESTARTSYS	(on  sock) we got a signal
536 		 */
537 
538 		if (rv < 0) {
539 			if (rv == -ECONNRESET)
540 				dev_info(DEV, "sock was reset by peer\n");
541 			else if (rv != -ERESTARTSYS)
542 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
543 			break;
544 		} else if (rv == 0) {
545 			dev_info(DEV, "sock was shut down by peer\n");
546 			break;
547 		} else	{
548 			/* signal came in, or peer/link went down,
549 			 * after we read a partial message
550 			 */
551 			/* D_ASSERT(signal_pending(current)); */
552 			break;
553 		}
554 	}
555 
556 	set_fs(oldfs);
557 
558 	if (rv != size)
559 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
560 
561 	return rv;
562 }
563 
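/* Active connect attempt: create a TCP socket, bind it to the configured
 * local address (with port 0, so the kernel picks a free one) and try to
 * connect() to the peer.  Returns the connected socket or NULL; expected
 * transient errors (timeout, refused, unreachable, ...) do not force the
 * state to C_DISCONNECTING. */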
564 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
565 {
566 	const char *what;
567 	struct socket *sock;
568 	struct sockaddr_in6 src_in6;
569 	int err;
570 	int disconnect_on_error = 1;
571 
572 	if (!get_net_conf(mdev))
573 		return NULL;
574 
575 	what = "sock_create_kern";
576 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
577 		SOCK_STREAM, IPPROTO_TCP, &sock);
578 	if (err < 0) {
579 		sock = NULL;
580 		goto out;
581 	}
582 
583 	sock->sk->sk_rcvtimeo =
584 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
585 
586 	/* Explicitly bind to the configured IP as the source IP
587 	 * for outgoing connections.
588 	 * This is needed for multihomed hosts and to be
589 	 * able to use lo: interfaces for DRBD.
590 	 * Make sure to use 0 as the port number, so Linux
591 	 * selects a free one dynamically.
592 	 */
593 	memcpy(&src_in6, mdev->net_conf->my_addr,
594 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
595 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
596 		src_in6.sin6_port = 0;
597 	else
598 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
599 
600 	what = "bind before connect";
601 	err = sock->ops->bind(sock,
602 			      (struct sockaddr *) &src_in6,
603 			      mdev->net_conf->my_addr_len);
604 	if (err < 0)
605 		goto out;
606 
607 	/* connect may fail, peer not yet available.
608 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
609 	disconnect_on_error = 0;
610 	what = "connect";
611 	err = sock->ops->connect(sock,
612 				 (struct sockaddr *)mdev->net_conf->peer_addr,
613 				 mdev->net_conf->peer_addr_len, 0);
614 
615 out:
616 	if (err < 0) {
617 		if (sock) {
618 			sock_release(sock);
619 			sock = NULL;
620 		}
621 		switch (-err) {
622 			/* timeout, busy, signal pending */
623 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
624 		case EINTR: case ERESTARTSYS:
625 			/* peer not (yet) available, network problem */
626 		case ECONNREFUSED: case ENETUNREACH:
627 		case EHOSTDOWN:    case EHOSTUNREACH:
628 			disconnect_on_error = 0;
629 			break;
630 		default:
631 			dev_err(DEV, "%s failed, err = %d\n", what, err);
632 		}
633 		if (disconnect_on_error)
634 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
635 	}
636 	put_net_conf(mdev);
637 	return sock;
638 }
639 
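/* Passive side of connection setup: bind a listening socket to our
 * configured address and wait up to try_connect_int seconds (plus some
 * random jitter) for the peer to connect.  Returns the accepted socket,
 * or NULL on timeout or error. */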
640 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
641 {
642 	int timeo, err;
643 	struct socket *s_estab = NULL, *s_listen;
644 	const char *what;
645 
646 	if (!get_net_conf(mdev))
647 		return NULL;
648 
649 	what = "sock_create_kern";
650 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
651 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
652 	if (err) {
653 		s_listen = NULL;
654 		goto out;
655 	}
656 
657 	timeo = mdev->net_conf->try_connect_int * HZ;
658 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
659 
660 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
661 	s_listen->sk->sk_rcvtimeo = timeo;
662 	s_listen->sk->sk_sndtimeo = timeo;
663 
664 	what = "bind before listen";
665 	err = s_listen->ops->bind(s_listen,
666 			      (struct sockaddr *) mdev->net_conf->my_addr,
667 			      mdev->net_conf->my_addr_len);
668 	if (err < 0)
669 		goto out;
670 
671 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
672 
673 out:
674 	if (s_listen)
675 		sock_release(s_listen);
676 	if (err < 0) {
677 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
678 			dev_err(DEV, "%s failed, err = %d\n", what, err);
679 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
680 		}
681 	}
682 	put_net_conf(mdev);
683 
684 	return s_estab;
685 }
686 
687 static int drbd_send_fp(struct drbd_conf *mdev,
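/* "First packet" exchange: immediately after a TCP connection is
 * established, each side announces whether this connection shall become
 * the data socket (P_HAND_SHAKE_S) or the meta/ack socket (P_HAND_SHAKE_M).
 * drbd_recv_fp() returns the announced command, or 0xffff if the packet
 * did not look like a DRBD header. */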
688 	struct socket *sock, enum drbd_packets cmd)
689 {
690 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
691 
692 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
693 }
694 
695 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
696 {
697 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
698 	int rr;
699 
700 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
701 
702 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
703 		return be16_to_cpu(h->command);
704 
705 	return 0xffff;
706 }
707 
708 /**
709  * drbd_socket_okay() - Free the socket if its connection is not okay
710  * @mdev:	DRBD device.
711  * @sock:	pointer to the pointer to the socket.
712  */
713 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
714 {
715 	int rr;
716 	char tb[4];
717 
718 	if (!*sock)
719 		return FALSE;
720 
721 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
722 
723 	if (rr > 0 || rr == -EAGAIN) {
724 		return TRUE;
725 	} else {
726 		sock_release(*sock);
727 		*sock = NULL;
728 		return FALSE;
729 	}
730 }
731 
732 /*
733  * return values:
734  *   1 yes, we have a valid connection
735  *   0 oops, did not work out, please try again
736  *  -1 peer talks a different language,
737  *     no point in trying again, please go standalone.
738  *  -2 We do not have a network config...
739  */
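/*
 * Note on the connection strategy: both nodes run this code concurrently.
 * Each node alternates between actively connecting to the peer and
 * accepting a connection from it, until two TCP connections exist: the
 * first becomes the data socket, the second the meta (ack) socket.
 * Crossed attempts are sorted out via the first-packet type above; on an
 * unexpected packet a coin flip (random32()) decides whether to listen
 * again.  Receiving a crossed P_HAND_SHAKE_M also sets DISCARD_CONCURRENT,
 * which later breaks ties between concurrent writes (see receive_Data()).
 */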
740 static int drbd_connect(struct drbd_conf *mdev)
741 {
742 	struct socket *s, *sock, *msock;
743 	int try, h, ok;
744 
745 	D_ASSERT(!mdev->data.socket);
746 
747 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
748 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
749 
750 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
751 		return -2;
752 
753 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
754 
755 	sock  = NULL;
756 	msock = NULL;
757 
758 	do {
759 		for (try = 0;;) {
760 			/* 3 tries, this should take less than a second! */
761 			s = drbd_try_connect(mdev);
762 			if (s || ++try >= 3)
763 				break;
764 			/* give the other side time to call bind() & listen() */
765 			__set_current_state(TASK_INTERRUPTIBLE);
766 			schedule_timeout(HZ / 10);
767 		}
768 
769 		if (s) {
770 			if (!sock) {
771 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
772 				sock = s;
773 				s = NULL;
774 			} else if (!msock) {
775 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
776 				msock = s;
777 				s = NULL;
778 			} else {
779 				dev_err(DEV, "Logic error in drbd_connect()\n");
780 				goto out_release_sockets;
781 			}
782 		}
783 
784 		if (sock && msock) {
785 			__set_current_state(TASK_INTERRUPTIBLE);
786 			schedule_timeout(HZ / 10);
787 			ok = drbd_socket_okay(mdev, &sock);
788 			ok = drbd_socket_okay(mdev, &msock) && ok;
789 			if (ok)
790 				break;
791 		}
792 
793 retry:
794 		s = drbd_wait_for_connect(mdev);
795 		if (s) {
796 			try = drbd_recv_fp(mdev, s);
797 			drbd_socket_okay(mdev, &sock);
798 			drbd_socket_okay(mdev, &msock);
799 			switch (try) {
800 			case P_HAND_SHAKE_S:
801 				if (sock) {
802 					dev_warn(DEV, "initial packet S crossed\n");
803 					sock_release(sock);
804 				}
805 				sock = s;
806 				break;
807 			case P_HAND_SHAKE_M:
808 				if (msock) {
809 					dev_warn(DEV, "initial packet M crossed\n");
810 					sock_release(msock);
811 				}
812 				msock = s;
813 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
814 				break;
815 			default:
816 				dev_warn(DEV, "Error receiving initial packet\n");
817 				sock_release(s);
818 				if (random32() & 1)
819 					goto retry;
820 			}
821 		}
822 
823 		if (mdev->state.conn <= C_DISCONNECTING)
824 			goto out_release_sockets;
825 		if (signal_pending(current)) {
826 			flush_signals(current);
827 			smp_rmb();
828 			if (get_t_state(&mdev->receiver) == Exiting)
829 				goto out_release_sockets;
830 		}
831 
832 		if (sock && msock) {
833 			ok = drbd_socket_okay(mdev, &sock);
834 			ok = drbd_socket_okay(mdev, &msock) && ok;
835 			if (ok)
836 				break;
837 		}
838 	} while (1);
839 
840 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
841 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
842 
843 	sock->sk->sk_allocation = GFP_NOIO;
844 	msock->sk->sk_allocation = GFP_NOIO;
845 
846 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
847 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
848 
849 	if (mdev->net_conf->sndbuf_size) {
850 		sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
851 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
852 	}
853 
854 	if (mdev->net_conf->rcvbuf_size) {
855 		sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
856 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
857 	}
858 
859 	/* NOT YET ...
860 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
861 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
862 	 * first set it to the P_HAND_SHAKE timeout,
863 	 * which we set to 4x the configured ping_timeout. */
864 	sock->sk->sk_sndtimeo =
865 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
866 
867 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
868 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
869 
870 	/* we don't want delays.
871 	 * we use TCP_CORK where appropriate, though */
872 	drbd_tcp_nodelay(sock);
873 	drbd_tcp_nodelay(msock);
874 
875 	mdev->data.socket = sock;
876 	mdev->meta.socket = msock;
877 	mdev->last_received = jiffies;
878 
879 	D_ASSERT(mdev->asender.task == NULL);
880 
881 	h = drbd_do_handshake(mdev);
882 	if (h <= 0)
883 		return h;
884 
885 	if (mdev->cram_hmac_tfm) {
886 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
887 		if (!drbd_do_auth(mdev)) {
888 			dev_err(DEV, "Authentication of peer failed\n");
889 			return -1;
890 		}
891 	}
892 
893 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
894 		return 0;
895 
896 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
897 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
898 
899 	atomic_set(&mdev->packet_seq, 0);
900 	mdev->peer_seq = 0;
901 
902 	drbd_thread_start(&mdev->asender);
903 
904 	drbd_send_protocol(mdev);
905 	drbd_send_sync_param(mdev, &mdev->sync_conf);
906 	drbd_send_sizes(mdev, 0);
907 	drbd_send_uuids(mdev);
908 	drbd_send_state(mdev);
909 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
910 	clear_bit(RESIZE_PENDING, &mdev->flags);
911 
912 	return 1;
913 
914 out_release_sockets:
915 	if (sock)
916 		sock_release(sock);
917 	if (msock)
918 		sock_release(msock);
919 	return -1;
920 }
921 
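/* Receive and validate one packet header: checks the DRBD magic and
 * converts command and length to host byte order.  Returns TRUE/FALSE. */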
922 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
923 {
924 	int r;
925 
926 	r = drbd_recv(mdev, h, sizeof(*h));
927 
928 	if (unlikely(r != sizeof(*h))) {
929 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
930 		return FALSE;
931 	}
932 	h->command = be16_to_cpu(h->command);
933 	h->length  = be16_to_cpu(h->length);
934 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
935 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
936 		    (long)be32_to_cpu(h->magic),
937 		    h->command, h->length);
938 		return FALSE;
939 	}
940 	mdev->last_received = jiffies;
941 
942 	return TRUE;
943 }
944 
945 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
946 {
947 	int rv;
948 
949 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
950 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
951 		if (rv) {
952 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
953 			/* would rather check on EOPNOTSUPP, but that is not reliable.
954 			 * don't try again for ANY return value != 0
955 			 * if (rv == -EOPNOTSUPP) */
956 			drbd_bump_write_ordering(mdev, WO_drain_io);
957 		}
958 		put_ldev(mdev);
959 	}
960 
961 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
962 }
963 
964 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965 {
966 	struct flush_work *fw = (struct flush_work *)w;
967 	struct drbd_epoch *epoch = fw->epoch;
968 
969 	kfree(w);
970 
971 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
972 		drbd_flush_after_epoch(mdev, epoch);
973 
974 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
975 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
976 
977 	return 1;
978 }
979 
980 /**
981  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state and possibly finishes it.
982  * @mdev:	DRBD device.
983  * @epoch:	Epoch object.
984  * @ev:		Epoch event.
985  */
986 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
987 					       struct drbd_epoch *epoch,
988 					       enum epoch_event ev)
989 {
990 	int finish, epoch_size;
991 	struct drbd_epoch *next_epoch;
992 	int schedule_flush = 0;
993 	enum finish_epoch rv = FE_STILL_LIVE;
994 
995 	spin_lock(&mdev->epoch_lock);
996 	do {
997 		next_epoch = NULL;
998 		finish = 0;
999 
1000 		epoch_size = atomic_read(&epoch->epoch_size);
1001 
1002 		switch (ev & ~EV_CLEANUP) {
1003 		case EV_PUT:
1004 			atomic_dec(&epoch->active);
1005 			break;
1006 		case EV_GOT_BARRIER_NR:
1007 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1008 
1009 			/* Special case: If we just switched from WO_bio_barrier to
1010 			   WO_bdev_flush we should not finish the current epoch */
1011 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1012 			    mdev->write_ordering != WO_bio_barrier &&
1013 			    epoch == mdev->current_epoch)
1014 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1015 			break;
1016 		case EV_BARRIER_DONE:
1017 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1018 			break;
1019 		case EV_BECAME_LAST:
1020 			/* nothing to do*/
1021 			break;
1022 		}
1023 
1024 		trace_drbd_epoch(mdev, epoch, ev);
1025 
1026 		if (epoch_size != 0 &&
1027 		    atomic_read(&epoch->active) == 0 &&
1028 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1029 		    epoch->list.prev == &mdev->current_epoch->list &&
1030 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1031 			/* Nearly all conditions are met to finish that epoch... */
1032 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1033 			    mdev->write_ordering == WO_none ||
1034 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1035 			    ev & EV_CLEANUP) {
1036 				finish = 1;
1037 				set_bit(DE_IS_FINISHING, &epoch->flags);
1038 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1039 				 mdev->write_ordering == WO_bio_barrier) {
1040 				atomic_inc(&epoch->active);
1041 				schedule_flush = 1;
1042 			}
1043 		}
1044 		if (finish) {
1045 			if (!(ev & EV_CLEANUP)) {
1046 				spin_unlock(&mdev->epoch_lock);
1047 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1048 				spin_lock(&mdev->epoch_lock);
1049 			}
1050 			dec_unacked(mdev);
1051 
1052 			if (mdev->current_epoch != epoch) {
1053 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1054 				list_del(&epoch->list);
1055 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1056 				mdev->epochs--;
1057 				trace_drbd_epoch(mdev, epoch, EV_TRACE_FREE);
1058 				kfree(epoch);
1059 
1060 				if (rv == FE_STILL_LIVE)
1061 					rv = FE_DESTROYED;
1062 			} else {
1063 				epoch->flags = 0;
1064 				atomic_set(&epoch->epoch_size, 0);
1065 				/* atomic_set(&epoch->active, 0); is already zero */
1066 				if (rv == FE_STILL_LIVE)
1067 					rv = FE_RECYCLED;
1068 			}
1069 		}
1070 
1071 		if (!next_epoch)
1072 			break;
1073 
1074 		epoch = next_epoch;
1075 	} while (1);
1076 
1077 	spin_unlock(&mdev->epoch_lock);
1078 
1079 	if (schedule_flush) {
1080 		struct flush_work *fw;
1081 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1082 		if (fw) {
1083 			trace_drbd_epoch(mdev, epoch, EV_TRACE_FLUSH);
1084 			fw->w.cb = w_flush;
1085 			fw->epoch = epoch;
1086 			drbd_queue_work(&mdev->data.work, &fw->w);
1087 		} else {
1088 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1089 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1090 			/* That is not a recursion, only one level */
1091 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1092 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1093 		}
1094 	}
1095 
1096 	return rv;
1097 }
1098 
1099 /**
1100  * drbd_bump_write_ordering() - Fall back to another write ordering method
1101  * @mdev:	DRBD device.
1102  * @wo:		Write ordering method to try.
1103  */
1104 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1105 {
1106 	enum write_ordering_e pwo;
1107 	static char *write_ordering_str[] = {
1108 		[WO_none] = "none",
1109 		[WO_drain_io] = "drain",
1110 		[WO_bdev_flush] = "flush",
1111 		[WO_bio_barrier] = "barrier",
1112 	};
1113 
1114 	pwo = mdev->write_ordering;
1115 	wo = min(pwo, wo);
1116 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1117 		wo = WO_bdev_flush;
1118 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1119 		wo = WO_drain_io;
1120 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1121 		wo = WO_none;
1122 	mdev->write_ordering = wo;
1123 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1124 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1125 }
1126 
1127 /**
1128  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1129  * @mdev:	DRBD device.
1130  * @w:		work object.
1131  * @cancel:	The connection will be closed anyway (unused in this callback)
1132  */
1133 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1134 {
1135 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1136 	struct bio *bio = e->private_bio;
1137 
1138 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1139 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1140 	   so that we can finish that epoch in drbd_may_finish_epoch().
1141 	   That is necessary if we already have a long chain of Epochs, before
1142 	   we realize that BIO_RW_BARRIER is actually not supported */
1143 
1144 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1145 	   that will never trigger. If it is reported late, we will just
1146 	   print that warning and continue correctly for all future requests
1147 	   with WO_bdev_flush */
1148 	if (previous_epoch(mdev, e->epoch))
1149 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1150 
1151 	/* prepare bio for re-submit,
1152 	 * re-init volatile members */
1153 	/* we still have a local reference,
1154 	 * get_ldev was done in receive_Data. */
1155 	bio->bi_bdev = mdev->ldev->backing_bdev;
1156 	bio->bi_sector = e->sector;
1157 	bio->bi_size = e->size;
1158 	bio->bi_idx = 0;
1159 
1160 	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1161 	bio->bi_flags |= 1 << BIO_UPTODATE;
1162 
1163 	/* don't know whether this is necessary: */
1164 	bio->bi_phys_segments = 0;
1165 	bio->bi_next = NULL;
1166 
1167 	/* these should be unchanged: */
1168 	/* bio->bi_end_io = drbd_endio_write_sec; */
1169 	/* bio->bi_vcnt = whatever; */
1170 
1171 	e->w.cb = e_end_block;
1172 
1173 	/* This is no longer a barrier request. */
1174 	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1175 
1176 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1177 
1178 	return 1;
1179 }
1180 
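/* P_BARRIER from the peer: the current write epoch is closed.  Depending
 * on the write ordering method this either relies on a BIO_RW_BARRIER
 * request already in flight (WO_bio_barrier), or drains/flushes the
 * backing device before the P_BARRIER_ACK goes out, and a new epoch
 * object is prepared for the writes that follow. */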
1181 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1182 {
1183 	int rv, issue_flush;
1184 	struct p_barrier *p = (struct p_barrier *)h;
1185 	struct drbd_epoch *epoch;
1186 
1187 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1188 
1189 	rv = drbd_recv(mdev, h->payload, h->length);
1190 	ERR_IF(rv != h->length) return FALSE;
1191 
1192 	inc_unacked(mdev);
1193 
1194 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1195 		drbd_kick_lo(mdev);
1196 
1197 	mdev->current_epoch->barrier_nr = p->barrier;
1198 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1199 
1200 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1201 	 * the activity log, which means it would not be resynced in case the
1202 	 * R_PRIMARY crashes now.
1203 	 * Therefore we must send the barrier_ack after the barrier request was
1204 	 * completed. */
1205 	switch (mdev->write_ordering) {
1206 	case WO_bio_barrier:
1207 	case WO_none:
1208 		if (rv == FE_RECYCLED)
1209 			return TRUE;
1210 		break;
1211 
1212 	case WO_bdev_flush:
1213 	case WO_drain_io:
1214 		D_ASSERT(rv == FE_STILL_LIVE);
1215 		set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1216 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1217 		rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1218 		if (rv == FE_RECYCLED)
1219 			return TRUE;
1220 
1221 		/* The asender will send all the ACKs and barrier ACKs out, since
1222 		   all EEs moved from the active_ee to the done_ee. We need to
1223 		   provide a new epoch object for the EEs that come in soon */
1224 		break;
1225 	}
1226 
1227 	/* receiver context, in the writeout path of the other node.
1228 	 * avoid potential distributed deadlock */
1229 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1230 	if (!epoch) {
1231 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1232 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1233 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1234 		if (issue_flush) {
1235 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1236 			if (rv == FE_RECYCLED)
1237 				return TRUE;
1238 		}
1239 
1240 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1241 
1242 		return TRUE;
1243 	}
1244 
1245 	epoch->flags = 0;
1246 	atomic_set(&epoch->epoch_size, 0);
1247 	atomic_set(&epoch->active, 0);
1248 
1249 	spin_lock(&mdev->epoch_lock);
1250 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1251 		list_add(&epoch->list, &mdev->current_epoch->list);
1252 		mdev->current_epoch = epoch;
1253 		mdev->epochs++;
1254 		trace_drbd_epoch(mdev, epoch, EV_TRACE_ALLOC);
1255 	} else {
1256 		/* The current_epoch got recycled while we allocated this one... */
1257 		kfree(epoch);
1258 	}
1259 	spin_unlock(&mdev->epoch_lock);
1260 
1261 	return TRUE;
1262 }
1263 
1264 /* used from receive_RSDataReply (recv_resync_read)
1265  * and from receive_Data */
1266 static struct drbd_epoch_entry *
1267 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1268 {
1269 	struct drbd_epoch_entry *e;
1270 	struct bio_vec *bvec;
1271 	struct page *page;
1272 	struct bio *bio;
1273 	int dgs, ds, i, rr;
1274 	void *dig_in = mdev->int_dig_in;
1275 	void *dig_vv = mdev->int_dig_vv;
1276 
1277 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1278 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1279 
1280 	if (dgs) {
1281 		rr = drbd_recv(mdev, dig_in, dgs);
1282 		if (rr != dgs) {
1283 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1284 			     rr, dgs);
1285 			return NULL;
1286 		}
1287 	}
1288 
1289 	data_size -= dgs;
1290 
1291 	ERR_IF(data_size &  0x1ff) return NULL;
1292 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1293 
1294 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1295 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1296 	 * which in turn might block on the other node at this very place.  */
1297 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1298 	if (!e)
1299 		return NULL;
1300 	bio = e->private_bio;
1301 	ds = data_size;
1302 	bio_for_each_segment(bvec, bio, i) {
1303 		page = bvec->bv_page;
1304 		rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1305 		kunmap(page);
1306 		if (rr != min_t(int, ds, PAGE_SIZE)) {
1307 			drbd_free_ee(mdev, e);
1308 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1309 			     rr, min_t(int, ds, PAGE_SIZE));
1310 			return NULL;
1311 		}
1312 		ds -= rr;
1313 	}
1314 
1315 	if (dgs) {
1316 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1317 		if (memcmp(dig_in, dig_vv, dgs)) {
1318 			dev_err(DEV, "Digest integrity check FAILED.\n");
1319 			drbd_bcast_ee(mdev, "digest failed",
1320 					dgs, dig_in, dig_vv, e);
1321 			drbd_free_ee(mdev, e);
1322 			return NULL;
1323 		}
1324 	}
1325 	mdev->recv_cnt += data_size>>9;
1326 	return e;
1327 }
1328 
1329 /* drbd_drain_block() just takes a data block
1330  * out of the socket input buffer, and discards it.
1331  */
1332 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1333 {
1334 	struct page *page;
1335 	int rr, rv = 1;
1336 	void *data;
1337 
1338 	page = drbd_pp_alloc(mdev, 1);
1339 
1340 	data = kmap(page);
1341 	while (data_size) {
1342 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1343 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1344 			rv = 0;
1345 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1346 			     rr, min_t(int, data_size, PAGE_SIZE));
1347 			break;
1348 		}
1349 		data_size -= rr;
1350 	}
1351 	kunmap(page);
1352 	drbd_pp_free(mdev, page);
1353 	return rv;
1354 }
1355 
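/* Read a P_DATA_REPLY payload directly into the pages of the original
 * request's master bio ("diskless" read served by the peer), verifying
 * the optional data digest.  Returns 1 on success, 0 otherwise. */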
1356 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1357 			   sector_t sector, int data_size)
1358 {
1359 	struct bio_vec *bvec;
1360 	struct bio *bio;
1361 	int dgs, rr, i, expect;
1362 	void *dig_in = mdev->int_dig_in;
1363 	void *dig_vv = mdev->int_dig_vv;
1364 
1365 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1366 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1367 
1368 	if (dgs) {
1369 		rr = drbd_recv(mdev, dig_in, dgs);
1370 		if (rr != dgs) {
1371 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1372 			     rr, dgs);
1373 			return 0;
1374 		}
1375 	}
1376 
1377 	data_size -= dgs;
1378 
1379 	/* optimistically update recv_cnt.  if receiving fails below,
1380 	 * we disconnect anyways, and counters will be reset. */
1381 	mdev->recv_cnt += data_size>>9;
1382 
1383 	bio = req->master_bio;
1384 	D_ASSERT(sector == bio->bi_sector);
1385 
1386 	bio_for_each_segment(bvec, bio, i) {
1387 		expect = min_t(int, data_size, bvec->bv_len);
1388 		rr = drbd_recv(mdev,
1389 			     kmap(bvec->bv_page)+bvec->bv_offset,
1390 			     expect);
1391 		kunmap(bvec->bv_page);
1392 		if (rr != expect) {
1393 			dev_warn(DEV, "short read receiving data reply: "
1394 			     "read %d expected %d\n",
1395 			     rr, expect);
1396 			return 0;
1397 		}
1398 		data_size -= rr;
1399 	}
1400 
1401 	if (dgs) {
1402 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1403 		if (memcmp(dig_in, dig_vv, dgs)) {
1404 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1405 			return 0;
1406 		}
1407 	}
1408 
1409 	D_ASSERT(data_size == 0);
1410 	return 1;
1411 }
1412 
1413 /* e_end_resync_block() is called via
1414  * drbd_process_done_ee() by asender only */
1415 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1416 {
1417 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1418 	sector_t sector = e->sector;
1419 	int ok;
1420 
1421 	D_ASSERT(hlist_unhashed(&e->colision));
1422 
1423 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1424 		drbd_set_in_sync(mdev, sector, e->size);
1425 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1426 	} else {
1427 		/* Record failure to sync */
1428 		drbd_rs_failed_io(mdev, sector, e->size);
1429 
1430 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1431 	}
1432 	dec_unacked(mdev);
1433 
1434 	return ok;
1435 }
1436 
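/* Resync write path: read the incoming block into a new epoch entry,
 * queue it on sync_ee and submit it to the local disk.  The ack is sent
 * from e_end_resync_block() once the write has completed. */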
1437 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1438 {
1439 	struct drbd_epoch_entry *e;
1440 
1441 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1442 	if (!e) {
1443 		put_ldev(mdev);
1444 		return FALSE;
1445 	}
1446 
1447 	dec_rs_pending(mdev);
1448 
1449 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1450 	e->private_bio->bi_rw = WRITE;
1451 	e->w.cb = e_end_resync_block;
1452 
1453 	inc_unacked(mdev);
1454 	/* corresponding dec_unacked() in e_end_resync_block()
1455 	 * respective _drbd_clear_done_ee */
1456 
1457 	spin_lock_irq(&mdev->req_lock);
1458 	list_add(&e->w.list, &mdev->sync_ee);
1459 	spin_unlock_irq(&mdev->req_lock);
1460 
1461 	trace_drbd_ee(mdev, e, "submitting for (rs)write");
1462 	trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
1463 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1464 	/* accounting done in endio */
1465 
1466 	maybe_kick_lo(mdev);
1467 	return TRUE;
1468 }
1469 
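/* P_DATA_REPLY: the peer answers a read request we sent earlier; the
 * payload completes the corresponding application read (req_mod with
 * data_received). */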
1470 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1471 {
1472 	struct drbd_request *req;
1473 	sector_t sector;
1474 	unsigned int header_size, data_size;
1475 	int ok;
1476 	struct p_data *p = (struct p_data *)h;
1477 
1478 	header_size = sizeof(*p) - sizeof(*h);
1479 	data_size   = h->length  - header_size;
1480 
1481 	ERR_IF(data_size == 0) return FALSE;
1482 
1483 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1484 		return FALSE;
1485 
1486 	sector = be64_to_cpu(p->sector);
1487 
1488 	spin_lock_irq(&mdev->req_lock);
1489 	req = _ar_id_to_req(mdev, p->block_id, sector);
1490 	spin_unlock_irq(&mdev->req_lock);
1491 	if (unlikely(!req)) {
1492 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1493 		return FALSE;
1494 	}
1495 
1496 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1497 	 * special casing it there for the various failure cases.
1498 	 * still no race with drbd_fail_pending_reads */
1499 	ok = recv_dless_read(mdev, req, sector, data_size);
1500 
1501 	if (ok)
1502 		req_mod(req, data_received);
1503 	/* else: nothing. handled from drbd_disconnect...
1504 	 * I don't think we may complete this just yet
1505 	 * in case we are "on-disconnect: freeze" */
1506 
1507 	return ok;
1508 }
1509 
1510 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1511 {
1512 	sector_t sector;
1513 	unsigned int header_size, data_size;
1514 	int ok;
1515 	struct p_data *p = (struct p_data *)h;
1516 
1517 	header_size = sizeof(*p) - sizeof(*h);
1518 	data_size   = h->length  - header_size;
1519 
1520 	ERR_IF(data_size == 0) return FALSE;
1521 
1522 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1523 		return FALSE;
1524 
1525 	sector = be64_to_cpu(p->sector);
1526 	D_ASSERT(p->block_id == ID_SYNCER);
1527 
1528 	if (get_ldev(mdev)) {
1529 		/* data is submitted to disk within recv_resync_read.
1530 		 * corresponding put_ldev done below on error,
1531 		 * or in drbd_endio_write_sec. */
1532 		ok = recv_resync_read(mdev, sector, data_size);
1533 	} else {
1534 		if (__ratelimit(&drbd_ratelimit_state))
1535 			dev_err(DEV, "Can not write resync data to local disk.\n");
1536 
1537 		ok = drbd_drain_block(mdev, data_size);
1538 
1539 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1540 	}
1541 
1542 	return ok;
1543 }
1544 
1545 /* e_end_block() is called via drbd_process_done_ee().
1546  * this means this function only runs in the asender thread
1547  */
1548 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1549 {
1550 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1551 	sector_t sector = e->sector;
1552 	struct drbd_epoch *epoch;
1553 	int ok = 1, pcmd;
1554 
1555 	if (e->flags & EE_IS_BARRIER) {
1556 		epoch = previous_epoch(mdev, e->epoch);
1557 		if (epoch)
1558 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1559 	}
1560 
1561 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1562 		if (likely(drbd_bio_uptodate(e->private_bio))) {
1563 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1564 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1565 				e->flags & EE_MAY_SET_IN_SYNC) ?
1566 				P_RS_WRITE_ACK : P_WRITE_ACK;
1567 			ok &= drbd_send_ack(mdev, pcmd, e);
1568 			if (pcmd == P_RS_WRITE_ACK)
1569 				drbd_set_in_sync(mdev, sector, e->size);
1570 		} else {
1571 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1572 			/* we expect it to be marked out of sync anyway...
1573 			 * maybe assert this?  */
1574 		}
1575 		dec_unacked(mdev);
1576 	}
1577 	/* we delete from the conflict detection hash _after_ we sent out the
1578 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1579 	if (mdev->net_conf->two_primaries) {
1580 		spin_lock_irq(&mdev->req_lock);
1581 		D_ASSERT(!hlist_unhashed(&e->colision));
1582 		hlist_del_init(&e->colision);
1583 		spin_unlock_irq(&mdev->req_lock);
1584 	} else {
1585 		D_ASSERT(hlist_unhashed(&e->colision));
1586 	}
1587 
1588 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1589 
1590 	return ok;
1591 }
1592 
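/* Used with two primaries only: tell the peer that its write was
 * discarded in favour of a concurrent local write (see the conflict
 * handling in receive_Data()). */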
1593 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1594 {
1595 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1596 	int ok = 1;
1597 
1598 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1599 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1600 
1601 	spin_lock_irq(&mdev->req_lock);
1602 	D_ASSERT(!hlist_unhashed(&e->colision));
1603 	hlist_del_init(&e->colision);
1604 	spin_unlock_irq(&mdev->req_lock);
1605 
1606 	dec_unacked(mdev);
1607 
1608 	return ok;
1609 }
1610 
1611 /* Called from receive_Data.
1612  * Synchronize packets on sock with packets on msock.
1613  *
1614  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1615  * packet traveling on msock, they are still processed in the order they have
1616  * been sent.
1617  *
1618  * Note: we don't care for Ack packets overtaking P_DATA packets.
1619  *
1620  * In case packet_seq is larger than mdev->peer_seq number, there are
1621  * outstanding packets on the msock. We wait for them to arrive.
1622  * In case we are the logically next packet, we update mdev->peer_seq
1623  * ourselves. Correctly handles 32bit wrap around.
1624  *
1625  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1626  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1627  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1628  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1629  *
1630  * returns 0 if we may process the packet,
1631  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1632 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1633 {
1634 	DEFINE_WAIT(wait);
1635 	unsigned int p_seq;
1636 	long timeout;
1637 	int ret = 0;
1638 	spin_lock(&mdev->peer_seq_lock);
1639 	for (;;) {
1640 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1641 		if (seq_le(packet_seq, mdev->peer_seq+1))
1642 			break;
1643 		if (signal_pending(current)) {
1644 			ret = -ERESTARTSYS;
1645 			break;
1646 		}
1647 		p_seq = mdev->peer_seq;
1648 		spin_unlock(&mdev->peer_seq_lock);
1649 		timeout = schedule_timeout(30*HZ);
1650 		spin_lock(&mdev->peer_seq_lock);
1651 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1652 			ret = -ETIMEDOUT;
1653 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1654 			break;
1655 		}
1656 	}
1657 	finish_wait(&mdev->seq_wait, &wait);
1658 	if (mdev->peer_seq+1 == packet_seq)
1659 		mdev->peer_seq++;
1660 	spin_unlock(&mdev->peer_seq_lock);
1661 	return ret;
1662 }
1663 
1664 /* mirrored write */
1665 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1666 {
1667 	sector_t sector;
1668 	struct drbd_epoch_entry *e;
1669 	struct p_data *p = (struct p_data *)h;
1670 	int header_size, data_size;
1671 	int rw = WRITE;
1672 	u32 dp_flags;
1673 
1674 	header_size = sizeof(*p) - sizeof(*h);
1675 	data_size   = h->length  - header_size;
1676 
1677 	ERR_IF(data_size == 0) return FALSE;
1678 
1679 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1680 		return FALSE;
1681 
1682 	if (!get_ldev(mdev)) {
1683 		if (__ratelimit(&drbd_ratelimit_state))
1684 			dev_err(DEV, "Can not write mirrored data block "
1685 			    "to local disk.\n");
1686 		spin_lock(&mdev->peer_seq_lock);
1687 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1688 			mdev->peer_seq++;
1689 		spin_unlock(&mdev->peer_seq_lock);
1690 
1691 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1692 		atomic_inc(&mdev->current_epoch->epoch_size);
1693 		return drbd_drain_block(mdev, data_size);
1694 	}
1695 
1696 	/* get_ldev(mdev) successful.
1697 	 * Corresponding put_ldev done either below (on various errors),
1698 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1699 	 * the end of this function. */
1700 
1701 	sector = be64_to_cpu(p->sector);
1702 	e = read_in_block(mdev, p->block_id, sector, data_size);
1703 	if (!e) {
1704 		put_ldev(mdev);
1705 		return FALSE;
1706 	}
1707 
1708 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1709 	e->w.cb = e_end_block;
1710 
1711 	spin_lock(&mdev->epoch_lock);
1712 	e->epoch = mdev->current_epoch;
1713 	atomic_inc(&e->epoch->epoch_size);
1714 	atomic_inc(&e->epoch->active);
1715 
1716 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1717 		struct drbd_epoch *epoch;
1718 		/* Issue a barrier if we start a new epoch, and the previous epoch
1719 		   was not an epoch containing a single request that already was
1720 		   a Barrier. */
1721 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1722 		if (epoch == e->epoch) {
1723 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1724 			trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER);
1725 			rw |= (1<<BIO_RW_BARRIER);
1726 			e->flags |= EE_IS_BARRIER;
1727 		} else {
1728 			if (atomic_read(&epoch->epoch_size) > 1 ||
1729 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1730 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1731 				trace_drbd_epoch(mdev, epoch, EV_TRACE_SETTING_BI);
1732 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1733 				trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER);
1734 				rw |= (1<<BIO_RW_BARRIER);
1735 				e->flags |= EE_IS_BARRIER;
1736 			}
1737 		}
1738 	}
1739 	spin_unlock(&mdev->epoch_lock);
1740 
1741 	dp_flags = be32_to_cpu(p->dp_flags);
1742 	if (dp_flags & DP_HARDBARRIER) {
1743 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1744 		/* rw |= (1<<BIO_RW_BARRIER); */
1745 	}
1746 	if (dp_flags & DP_RW_SYNC)
1747 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1748 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1749 		e->flags |= EE_MAY_SET_IN_SYNC;
1750 
1751 	/* I'm the receiver, I do hold a net_cnt reference. */
1752 	if (!mdev->net_conf->two_primaries) {
1753 		spin_lock_irq(&mdev->req_lock);
1754 	} else {
1755 		/* don't get the req_lock yet,
1756 		 * we may sleep in drbd_wait_peer_seq */
1757 		const int size = e->size;
1758 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1759 		DEFINE_WAIT(wait);
1760 		struct drbd_request *i;
1761 		struct hlist_node *n;
1762 		struct hlist_head *slot;
1763 		int first;
1764 
1765 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1766 		BUG_ON(mdev->ee_hash == NULL);
1767 		BUG_ON(mdev->tl_hash == NULL);
1768 
1769 		/* conflict detection and handling:
1770 		 * 1. wait on the sequence number,
1771 		 *    in case this data packet overtook ACK packets.
1772 		 * 2. check our hash tables for conflicting requests.
1773 		 *    we only need to walk the tl_hash, since an ee can not
1774 		 *    have a conflict with another ee: on the submitting
1775 		 *    node, the corresponding req had already been conflicting,
1776 		 *    and a conflicting req is never sent.
1777 		 *
1778 		 * Note: for two_primaries, we are protocol C,
1779 		 * so there cannot be any request that is DONE
1780 		 * but still on the transfer log.
1781 		 *
1782 		 * unconditionally add to the ee_hash.
1783 		 *
1784 		 * if no conflicting request is found:
1785 		 *    submit.
1786 		 *
1787 		 * if any conflicting request is found
1788 		 * that has not yet been acked,
1789 		 * AND I have the "discard concurrent writes" flag:
1790 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1791 		 *
1792 		 * if any conflicting request is found:
1793 		 *	 block the receiver, waiting on misc_wait
1794 		 *	 until no more conflicting requests are there,
1795 		 *	 or we get interrupted (disconnect).
1796 		 *
1797 		 *	 we do not just write after local io completion of those
1798 		 *	 requests, but only after req is done completely, i.e.
1799 		 *	 we wait for the P_DISCARD_ACK to arrive!
1800 		 *
1801 		 *	 then proceed normally, i.e. submit.
1802 		 */
1803 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1804 			goto out_interrupted;
1805 
1806 		spin_lock_irq(&mdev->req_lock);
1807 
1808 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1809 
1810 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1811 		slot = tl_hash_slot(mdev, sector);
1812 		first = 1;
1813 		for (;;) {
1814 			int have_unacked = 0;
1815 			int have_conflict = 0;
1816 			prepare_to_wait(&mdev->misc_wait, &wait,
1817 				TASK_INTERRUPTIBLE);
1818 			hlist_for_each_entry(i, n, slot, colision) {
1819 				if (OVERLAPS) {
1820 					/* only ALERT on first iteration,
1821 					 * we may be woken up early... */
1822 					if (first)
1823 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1824 						      "	new: %llus +%u; pending: %llus +%u\n",
1825 						      current->comm, current->pid,
1826 						      (unsigned long long)sector, size,
1827 						      (unsigned long long)i->sector, i->size);
1828 					if (i->rq_state & RQ_NET_PENDING)
1829 						++have_unacked;
1830 					++have_conflict;
1831 				}
1832 			}
1833 #undef OVERLAPS
1834 			if (!have_conflict)
1835 				break;
1836 
1837 			/* Discard Ack only for the _first_ iteration */
1838 			if (first && discard && have_unacked) {
1839 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1840 				     (unsigned long long)sector);
1841 				inc_unacked(mdev);
1842 				e->w.cb = e_send_discard_ack;
1843 				list_add_tail(&e->w.list, &mdev->done_ee);
1844 
1845 				spin_unlock_irq(&mdev->req_lock);
1846 
1847 				/* we could probably send that P_DISCARD_ACK ourselves,
1848 				 * but I don't like the receiver using the msock */
1849 
1850 				put_ldev(mdev);
1851 				wake_asender(mdev);
1852 				finish_wait(&mdev->misc_wait, &wait);
1853 				return TRUE;
1854 			}
1855 
1856 			if (signal_pending(current)) {
1857 				hlist_del_init(&e->colision);
1858 
1859 				spin_unlock_irq(&mdev->req_lock);
1860 
1861 				finish_wait(&mdev->misc_wait, &wait);
1862 				goto out_interrupted;
1863 			}
1864 
1865 			spin_unlock_irq(&mdev->req_lock);
1866 			if (first) {
1867 				first = 0;
1868 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1869 				     "sec=%llus\n", (unsigned long long)sector);
1870 			} else if (discard) {
1871 				/* we had none on the first iteration.
1872 				 * there must be none now. */
1873 				D_ASSERT(have_unacked == 0);
1874 			}
1875 			schedule();
1876 			spin_lock_irq(&mdev->req_lock);
1877 		}
1878 		finish_wait(&mdev->misc_wait, &wait);
1879 	}
1880 
1881 	list_add(&e->w.list, &mdev->active_ee);
1882 	spin_unlock_irq(&mdev->req_lock);
1883 
1884 	switch (mdev->net_conf->wire_protocol) {
1885 	case DRBD_PROT_C:
1886 		inc_unacked(mdev);
1887 		/* corresponding dec_unacked() in e_end_block()
1888 		 * respective _drbd_clear_done_ee */
1889 		break;
1890 	case DRBD_PROT_B:
1891 		/* I really don't like it that the receiver thread
1892 		 * sends on the msock, but anyways */
1893 		drbd_send_ack(mdev, P_RECV_ACK, e);
1894 		break;
1895 	case DRBD_PROT_A:
1896 		/* nothing to do */
1897 		break;
1898 	}
1899 
1900 	if (mdev->state.pdsk == D_DISKLESS) {
1901 		/* In case we have the only disk of the cluster, mark the area out of sync and cover it in the activity log. */
1902 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1903 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1904 		drbd_al_begin_io(mdev, e->sector);
1905 	}
1906 
1907 	e->private_bio->bi_rw = rw;
1908 	trace_drbd_ee(mdev, e, "submitting for (data)write");
1909 	trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
1910 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1911 	/* accounting done in endio */
1912 
1913 	maybe_kick_lo(mdev);
1914 	return TRUE;
1915 
1916 out_interrupted:
1917 	/* yes, the epoch_size is now imbalanced.
1918 	 * but we drop the connection anyway, so we don't have a chance to
1919 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1920 	put_ldev(mdev);
1921 	drbd_free_ee(mdev, e);
1922 	return FALSE;
1923 }
1924 
1925 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1926 {
1927 	sector_t sector;
1928 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1929 	struct drbd_epoch_entry *e;
1930 	struct digest_info *di = NULL;
1931 	int size, digest_size;
1932 	unsigned int fault_type;
1933 	struct p_block_req *p =
1934 		(struct p_block_req *)h;
1935 	const int brps = sizeof(*p)-sizeof(*h);
1936 
1937 	if (drbd_recv(mdev, h->payload, brps) != brps)
1938 		return FALSE;
1939 
1940 	sector = be64_to_cpu(p->sector);
1941 	size   = be32_to_cpu(p->blksize);
1942 
1943 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1944 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1945 				(unsigned long long)sector, size);
1946 		return FALSE;
1947 	}
1948 	if (sector + (size>>9) > capacity) {
1949 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1950 				(unsigned long long)sector, size);
1951 		return FALSE;
1952 	}
1953 
1954 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1955 		if (__ratelimit(&drbd_ratelimit_state))
1956 			dev_err(DEV, "Can not satisfy peer's read request, "
1957 			    "no local data.\n");
1958 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1959 				 P_NEG_RS_DREPLY , p);
1960 		return TRUE;
1961 	}
1962 
1963 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1964 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1965 	 * which in turn might block on the other node at this very place.  */
1966 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1967 	if (!e) {
1968 		put_ldev(mdev);
1969 		return FALSE;
1970 	}
1971 
1972 	e->private_bio->bi_rw = READ;
1973 	e->private_bio->bi_end_io = drbd_endio_read_sec;
1974 
1975 	switch (h->command) {
1976 	case P_DATA_REQUEST:
1977 		e->w.cb = w_e_end_data_req;
1978 		fault_type = DRBD_FAULT_DT_RD;
1979 		break;
1980 	case P_RS_DATA_REQUEST:
1981 		e->w.cb = w_e_end_rsdata_req;
1982 		fault_type = DRBD_FAULT_RS_RD;
1983 		/* Eventually this should become asynchronous. Currently it
1984 		 * blocks the whole receiver just to delay the reading of a
1985 		 * resync data block.
1986 		 * The drbd_work_queue mechanism is made for this...
1987 		 */
1988 		if (!drbd_rs_begin_io(mdev, sector)) {
1989 			/* we have been interrupted,
1990 			 * probably connection lost! */
1991 			D_ASSERT(signal_pending(current));
1992 			goto out_free_e;
1993 		}
1994 		break;
1995 
1996 	case P_OV_REPLY:
1997 	case P_CSUM_RS_REQUEST:
1998 		fault_type = DRBD_FAULT_RS_RD;
1999 		digest_size = h->length - brps;
2000 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2001 		if (!di)
2002 			goto out_free_e;
2003 
2004 		di->digest_size = digest_size;
2005 		di->digest = (((char *)di)+sizeof(struct digest_info));
2006 
2007 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2008 			goto out_free_e;
2009 
2010 		e->block_id = (u64)(unsigned long)di;
2011 		if (h->command == P_CSUM_RS_REQUEST) {
2012 			D_ASSERT(mdev->agreed_pro_version >= 89);
2013 			e->w.cb = w_e_end_csum_rs_req;
2014 		} else if (h->command == P_OV_REPLY) {
2015 			e->w.cb = w_e_end_ov_reply;
2016 			dec_rs_pending(mdev);
2017 			break;
2018 		}
2019 
2020 		if (!drbd_rs_begin_io(mdev, sector)) {
2021 			/* we have been interrupted, probably connection lost! */
2022 			D_ASSERT(signal_pending(current));
2023 			goto out_free_e;
2024 		}
2025 		break;
2026 
2027 	case P_OV_REQUEST:
2028 		if (mdev->state.conn >= C_CONNECTED &&
2029 		    mdev->state.conn != C_VERIFY_T)
2030 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2031 				drbd_conn_str(mdev->state.conn));
2032 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2033 		    mdev->agreed_pro_version >= 90) {
2034 			mdev->ov_start_sector = sector;
2035 			mdev->ov_position = sector;
2036 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2037 			dev_info(DEV, "Online Verify start sector: %llu\n",
2038 					(unsigned long long)sector);
2039 		}
2040 		e->w.cb = w_e_end_ov_req;
2041 		fault_type = DRBD_FAULT_RS_RD;
2042 		/* Eventually this should become asynchronous. Currently it
2043 		 * blocks the whole receiver just to delay the reading of a
2044 		 * resync data block.
2045 		 * The drbd_work_queue mechanism is made for this...
2046 		 */
2047 		if (!drbd_rs_begin_io(mdev, sector)) {
2048 			/* we have been interrupted,
2049 			 * probably connection lost! */
2050 			D_ASSERT(signal_pending(current));
2051 			goto out_free_e;
2052 		}
2053 		break;
2054 
2055 
2056 	default:
2057 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2058 		    cmdname(h->command));
2059 		fault_type = DRBD_FAULT_MAX;
2060 	}
2061 
2062 	spin_lock_irq(&mdev->req_lock);
2063 	list_add(&e->w.list, &mdev->read_ee);
2064 	spin_unlock_irq(&mdev->req_lock);
2065 
2066 	inc_unacked(mdev);
2067 
2068 	trace_drbd_ee(mdev, e, "submitting for read");
2069 	trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
2070 	drbd_generic_make_request(mdev, fault_type, e->private_bio);
2071 	maybe_kick_lo(mdev);
2072 
2073 	return TRUE;
2074 
2075 out_free_e:
2076 	kfree(di);
2077 	put_ldev(mdev);
2078 	drbd_free_ee(mdev, e);
2079 	return FALSE;
2080 }
2081 
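/* After-split-brain auto-recovery policy when zero nodes were Primary.
 * Returns  1: resync from this node (the peer's modifications are discarded),
 *         -1: resync from the peer (our modifications are discarded),
 *       -100: no automatic decision possible. */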
2082 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2083 {
2084 	int self, peer, rv = -100;
2085 	unsigned long ch_self, ch_peer;
2086 
2087 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2088 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2089 
2090 	ch_peer = mdev->p_uuid[UI_SIZE];
2091 	ch_self = mdev->comm_bm_set;
2092 
2093 	switch (mdev->net_conf->after_sb_0p) {
2094 	case ASB_CONSENSUS:
2095 	case ASB_DISCARD_SECONDARY:
2096 	case ASB_CALL_HELPER:
2097 		dev_err(DEV, "Configuration error.\n");
2098 		break;
2099 	case ASB_DISCONNECT:
2100 		break;
2101 	case ASB_DISCARD_YOUNGER_PRI:
2102 		if (self == 0 && peer == 1) {
2103 			rv = -1;
2104 			break;
2105 		}
2106 		if (self == 1 && peer == 0) {
2107 			rv =  1;
2108 			break;
2109 		}
2110 		/* Else fall through to one of the other strategies... */
2111 	case ASB_DISCARD_OLDER_PRI:
2112 		if (self == 0 && peer == 1) {
2113 			rv = 1;
2114 			break;
2115 		}
2116 		if (self == 1 && peer == 0) {
2117 			rv = -1;
2118 			break;
2119 		}
2120 		/* Else fall through to one of the other strategies... */
2121 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2122 		     "Using discard-least-changes instead\n");
2123 	case ASB_DISCARD_ZERO_CHG:
2124 		if (ch_peer == 0 && ch_self == 0) {
2125 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2126 				? -1 : 1;
2127 			break;
2128 		} else {
2129 			if (ch_peer == 0) { rv =  1; break; }
2130 			if (ch_self == 0) { rv = -1; break; }
2131 		}
2132 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2133 			break;
2134 	case ASB_DISCARD_LEAST_CHG:
2135 		if	(ch_self < ch_peer)
2136 			rv = -1;
2137 		else if (ch_self > ch_peer)
2138 			rv =  1;
2139 		else /* ( ch_self == ch_peer ) */
2140 		     /* Well, then use something else. */
2141 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2142 				? -1 : 1;
2143 		break;
2144 	case ASB_DISCARD_LOCAL:
2145 		rv = -1;
2146 		break;
2147 	case ASB_DISCARD_REMOTE:
2148 		rv =  1;
2149 	}
2150 
2151 	return rv;
2152 }
2153 
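/* After-split-brain auto-recovery policy when exactly one node was Primary.
 * The consensus, violently and call-helper settings defer to the 0-primaries
 * policy above; same return convention as drbd_asb_recover_0p(). */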
2154 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2155 {
2156 	int self, peer, hg, rv = -100;
2157 
2158 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2159 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2160 
2161 	switch (mdev->net_conf->after_sb_1p) {
2162 	case ASB_DISCARD_YOUNGER_PRI:
2163 	case ASB_DISCARD_OLDER_PRI:
2164 	case ASB_DISCARD_LEAST_CHG:
2165 	case ASB_DISCARD_LOCAL:
2166 	case ASB_DISCARD_REMOTE:
2167 		dev_err(DEV, "Configuration error.\n");
2168 		break;
2169 	case ASB_DISCONNECT:
2170 		break;
2171 	case ASB_CONSENSUS:
2172 		hg = drbd_asb_recover_0p(mdev);
2173 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2174 			rv = hg;
2175 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2176 			rv = hg;
2177 		break;
2178 	case ASB_VIOLENTLY:
2179 		rv = drbd_asb_recover_0p(mdev);
2180 		break;
2181 	case ASB_DISCARD_SECONDARY:
2182 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2183 	case ASB_CALL_HELPER:
2184 		hg = drbd_asb_recover_0p(mdev);
2185 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2186 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2187 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2188 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2189 			  * we do not need to wait for the after state change work either. */
2190 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2191 			if (self != SS_SUCCESS) {
2192 				drbd_khelper(mdev, "pri-lost-after-sb");
2193 			} else {
2194 				dev_warn(DEV, "Successfully gave up primary role.\n");
2195 				rv = hg;
2196 			}
2197 		} else
2198 			rv = hg;
2199 	}
2200 
2201 	return rv;
2202 }
2203 
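/* After-split-brain auto-recovery policy when both nodes were Primary.
 * Only the violently and call-helper settings can resolve this automatically;
 * same return convention as drbd_asb_recover_0p(). */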
2204 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2205 {
2206 	int self, peer, hg, rv = -100;
2207 
2208 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2209 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2210 
2211 	switch (mdev->net_conf->after_sb_2p) {
2212 	case ASB_DISCARD_YOUNGER_PRI:
2213 	case ASB_DISCARD_OLDER_PRI:
2214 	case ASB_DISCARD_LEAST_CHG:
2215 	case ASB_DISCARD_LOCAL:
2216 	case ASB_DISCARD_REMOTE:
2217 	case ASB_CONSENSUS:
2218 	case ASB_DISCARD_SECONDARY:
2219 		dev_err(DEV, "Configuration error.\n");
2220 		break;
2221 	case ASB_VIOLENTLY:
2222 		rv = drbd_asb_recover_0p(mdev);
2223 		break;
2224 	case ASB_DISCONNECT:
2225 		break;
2226 	case ASB_CALL_HELPER:
2227 		hg = drbd_asb_recover_0p(mdev);
2228 		if (hg == -1) {
2229 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2230 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2231 			  * we do not need to wait for the after state change work either. */
2232 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2233 			if (self != SS_SUCCESS) {
2234 				drbd_khelper(mdev, "pri-lost-after-sb");
2235 			} else {
2236 				dev_warn(DEV, "Successfully gave up primary role.\n");
2237 				rv = hg;
2238 			}
2239 		} else
2240 			rv = hg;
2241 	}
2242 
2243 	return rv;
2244 }
2245 
2246 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2247 			   u64 bits, u64 flags)
2248 {
2249 	if (!uuid) {
2250 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2251 		return;
2252 	}
2253 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2254 	     text,
2255 	     (unsigned long long)uuid[UI_CURRENT],
2256 	     (unsigned long long)uuid[UI_BITMAP],
2257 	     (unsigned long long)uuid[UI_HISTORY_START],
2258 	     (unsigned long long)uuid[UI_HISTORY_END],
2259 	     (unsigned long long)bits,
2260 	     (unsigned long long)flags);
2261 }
2262 
2263 /*
2264   100	after split brain try auto recover
2265     2	C_SYNC_SOURCE set BitMap
2266     1	C_SYNC_SOURCE use BitMap
2267     0	no Sync
2268    -1	C_SYNC_TARGET use BitMap
2269    -2	C_SYNC_TARGET set BitMap
2270  -100	after split brain, disconnect
2271 -1000	unrelated data
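-1001	requires a peer speaking at least protocol 91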
2272  */
2273 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2274 {
2275 	u64 self, peer;
2276 	int i, j;
2277 
2278 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2279 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2280 
2281 	*rule_nr = 10;
2282 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2283 		return 0;
2284 
2285 	*rule_nr = 20;
2286 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2287 	     peer != UUID_JUST_CREATED)
2288 		return -2;
2289 
2290 	*rule_nr = 30;
2291 	if (self != UUID_JUST_CREATED &&
2292 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2293 		return 2;
2294 
2295 	if (self == peer) {
2296 		int rct, dc; /* roles at crash time */
2297 
2298 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2299 
2300 			if (mdev->agreed_pro_version < 91)
2301 				return -1001;
2302 
2303 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2304 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2305 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2306 				drbd_uuid_set_bm(mdev, 0UL);
2307 
2308 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2309 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2310 				*rule_nr = 34;
2311 			} else {
2312 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2313 				*rule_nr = 36;
2314 			}
2315 
2316 			return 1;
2317 		}
2318 
2319 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2320 
2321 			if (mdev->agreed_pro_version < 91)
2322 				return -1001;
2323 
2324 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2325 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2326 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2327 
2328 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2329 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2330 				mdev->p_uuid[UI_BITMAP] = 0UL;
2331 
2332 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2333 				*rule_nr = 35;
2334 			} else {
2335 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2336 				*rule_nr = 37;
2337 			}
2338 
2339 			return -1;
2340 		}
2341 
2342 		/* Common power [off|failure] */
2343 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2344 			(mdev->p_uuid[UI_FLAGS] & 2);
2345 		/* lowest bit is set when we were primary,
2346 		 * next bit (weight 2) is set when peer was primary */
2347 		*rule_nr = 40;
2348 
2349 		switch (rct) {
2350 		case 0: /* !self_pri && !peer_pri */ return 0;
2351 		case 1: /*  self_pri && !peer_pri */ return 1;
2352 		case 2: /* !self_pri &&  peer_pri */ return -1;
2353 		case 3: /*  self_pri &&  peer_pri */
2354 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2355 			return dc ? -1 : 1;
2356 		}
2357 	}
2358 
2359 	*rule_nr = 50;
2360 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2361 	if (self == peer)
2362 		return -1;
2363 
2364 	*rule_nr = 51;
2365 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2366 	if (self == peer) {
2367 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2368 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2369 		if (self == peer) {
2370 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2371 			   the peer made to its UUIDs when it last started a resync as sync source. */
2372 
2373 			if (mdev->agreed_pro_version < 91)
2374 				return -1001;
2375 
2376 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2377 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2378 			return -1;
2379 		}
2380 	}
2381 
2382 	*rule_nr = 60;
2383 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2384 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2385 		peer = mdev->p_uuid[i] & ~((u64)1);
2386 		if (self == peer)
2387 			return -2;
2388 	}
2389 
2390 	*rule_nr = 70;
2391 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2392 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2393 	if (self == peer)
2394 		return 1;
2395 
2396 	*rule_nr = 71;
2397 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2398 	if (self == peer) {
2399 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2400 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2401 		if (self == peer) {
2402 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2403 			   we made to our UUIDs when we last started a resync as sync source. */
2404 
2405 			if (mdev->agreed_pro_version < 91)
2406 				return -1001;
2407 
2408 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2409 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2410 
2411 			dev_info(DEV, "Undid last start of resync:\n");
2412 
2413 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2414 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2415 
2416 			return 1;
2417 		}
2418 	}
2419 
2420 
2421 	*rule_nr = 80;
2422 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2423 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2424 		if (self == peer)
2425 			return 2;
2426 	}
2427 
2428 	*rule_nr = 90;
2429 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2430 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2431 	if (self == peer && self != ((u64)0))
2432 		return 100;
2433 
2434 	*rule_nr = 100;
2435 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2436 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2437 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2438 			peer = mdev->p_uuid[j] & ~((u64)1);
2439 			if (self == peer)
2440 				return -100;
2441 		}
2442 	}
2443 
2444 	return -1000;
2445 }
2446 
2447 /* drbd_sync_handshake() returns the new conn state on success, or
2448    C_MASK on failure.
2449  */
2450 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2451 					   enum drbd_disk_state peer_disk) __must_hold(local)
2452 {
2453 	int hg, rule_nr;
2454 	enum drbd_conns rv = C_MASK;
2455 	enum drbd_disk_state mydisk;
2456 
2457 	mydisk = mdev->state.disk;
2458 	if (mydisk == D_NEGOTIATING)
2459 		mydisk = mdev->new_state_tmp.disk;
2460 
2461 	dev_info(DEV, "drbd_sync_handshake:\n");
2462 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2463 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2464 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2465 
2466 	hg = drbd_uuid_compare(mdev, &rule_nr);
2467 
2468 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2469 
2470 	if (hg == -1000) {
2471 		dev_alert(DEV, "Unrelated data, aborting!\n");
2472 		return C_MASK;
2473 	}
2474 	if (hg == -1001) {
2475 		dev_alert(DEV, "To resolve this both sides have to support at least protocol version 91\n");
2476 		return C_MASK;
2477 	}
2478 
2479 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2480 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2481 		int f = (hg == -100) || abs(hg) == 2;
2482 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2483 		if (f)
2484 			hg = hg*2;
2485 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2486 		     hg > 0 ? "source" : "target");
2487 	}
2488 
2489 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2490 		int pcount = (mdev->state.role == R_PRIMARY)
2491 			   + (peer_role == R_PRIMARY);
2492 		int forced = (hg == -100);
2493 
2494 		switch (pcount) {
2495 		case 0:
2496 			hg = drbd_asb_recover_0p(mdev);
2497 			break;
2498 		case 1:
2499 			hg = drbd_asb_recover_1p(mdev);
2500 			break;
2501 		case 2:
2502 			hg = drbd_asb_recover_2p(mdev);
2503 			break;
2504 		}
2505 		if (abs(hg) < 100) {
2506 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2507 			     "automatically solved. Sync from %s node\n",
2508 			     pcount, (hg < 0) ? "peer" : "this");
2509 			if (forced) {
2510 				dev_warn(DEV, "Doing a full sync, since"
2511 				     " UUIDs were ambiguous.\n");
2512 				hg = hg*2;
2513 			}
2514 		}
2515 	}
2516 
2517 	if (hg == -100) {
2518 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2519 			hg = -1;
2520 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2521 			hg = 1;
2522 
2523 		if (abs(hg) < 100)
2524 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2525 			     "Sync from %s node\n",
2526 			     (hg < 0) ? "peer" : "this");
2527 	}
2528 
2529 	if (hg == -100) {
2530 		dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2531 		drbd_khelper(mdev, "split-brain");
2532 		return C_MASK;
2533 	}
2534 
2535 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2536 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2537 		return C_MASK;
2538 	}
2539 
2540 	if (hg < 0 && /* by intention we do not use mydisk here. */
2541 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2542 		switch (mdev->net_conf->rr_conflict) {
2543 		case ASB_CALL_HELPER:
2544 			drbd_khelper(mdev, "pri-lost");
2545 			/* fall through */
2546 		case ASB_DISCONNECT:
2547 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2548 			return C_MASK;
2549 		case ASB_VIOLENTLY:
2550 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2551 			     " assumption\n");
2552 		}
2553 	}
2554 
2555 	if (abs(hg) >= 2) {
2556 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2557 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2558 			return C_MASK;
2559 	}
2560 
2561 	if (hg > 0) { /* become sync source. */
2562 		rv = C_WF_BITMAP_S;
2563 	} else if (hg < 0) { /* become sync target */
2564 		rv = C_WF_BITMAP_T;
2565 	} else {
2566 		rv = C_CONNECTED;
2567 		if (drbd_bm_total_weight(mdev)) {
2568 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2569 			     drbd_bm_total_weight(mdev));
2570 		}
2571 	}
2572 
2573 	return rv;
2574 }
2575 
2576 /* returns 1 if invalid */
2577 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2578 {
2579 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2580 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2581 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2582 		return 0;
2583 
2584 	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
2585 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2586 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2587 		return 1;
2588 
2589 	/* everything else is valid if they are equal on both sides. */
2590 	if (peer == self)
2591 		return 0;
2592 
2593 	/* everything else is invalid. */
2594 	return 1;
2595 }
2596 
2597 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2598 {
2599 	struct p_protocol *p = (struct p_protocol *)h;
2600 	int header_size, data_size;
2601 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2602 	int p_want_lose, p_two_primaries;
2603 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2604 
2605 	header_size = sizeof(*p) - sizeof(*h);
2606 	data_size   = h->length  - header_size;
2607 
2608 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2609 		return FALSE;
2610 
2611 	p_proto		= be32_to_cpu(p->protocol);
2612 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2613 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2614 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2615 	p_want_lose	= be32_to_cpu(p->want_lose);
2616 	p_two_primaries = be32_to_cpu(p->two_primaries);
2617 
2618 	if (p_proto != mdev->net_conf->wire_protocol) {
2619 		dev_err(DEV, "incompatible communication protocols\n");
2620 		goto disconnect;
2621 	}
2622 
2623 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2624 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2625 		goto disconnect;
2626 	}
2627 
2628 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2629 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2630 		goto disconnect;
2631 	}
2632 
2633 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2634 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2635 		goto disconnect;
2636 	}
2637 
2638 	if (p_want_lose && mdev->net_conf->want_lose) {
2639 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2640 		goto disconnect;
2641 	}
2642 
2643 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2644 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2645 		goto disconnect;
2646 	}
2647 
2648 	if (mdev->agreed_pro_version >= 87) {
2649 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2650 
2651 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2652 			return FALSE;
2653 
2654 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2655 		if (strcmp(p_integrity_alg, my_alg)) {
2656 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2657 			goto disconnect;
2658 		}
2659 		dev_info(DEV, "data-integrity-alg: %s\n",
2660 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2661 	}
2662 
2663 	return TRUE;
2664 
2665 disconnect:
2666 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2667 	return FALSE;
2668 }
2669 
2670 /* helper function
2671  * input: alg name, feature name
2672  * return: NULL (alg name was "")
2673  *         ERR_PTR(error) if something goes wrong
2674  *         or the crypto hash ptr, if it worked out ok. */
2675 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2676 		const char *alg, const char *name)
2677 {
2678 	struct crypto_hash *tfm;
2679 
2680 	if (!alg[0])
2681 		return NULL;
2682 
2683 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2684 	if (IS_ERR(tfm)) {
2685 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2686 			alg, name, PTR_ERR(tfm));
2687 		return tfm;
2688 	}
2689 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2690 		crypto_free_hash(tfm);
2691 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2692 		return ERR_PTR(-EINVAL);
2693 	}
2694 	return tfm;
2695 }
2696 
2697 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2698 {
2699 	int ok = TRUE;
2700 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2701 	unsigned int header_size, data_size, exp_max_sz;
2702 	struct crypto_hash *verify_tfm = NULL;
2703 	struct crypto_hash *csums_tfm = NULL;
2704 	const int apv = mdev->agreed_pro_version;
2705 
2706 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2707 		    : apv == 88 ? sizeof(struct p_rs_param)
2708 					+ SHARED_SECRET_MAX
2709 		    : /* 89 */    sizeof(struct p_rs_param_89);
2710 
2711 	if (h->length > exp_max_sz) {
2712 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2713 		    h->length, exp_max_sz);
2714 		return FALSE;
2715 	}
2716 
2717 	if (apv <= 88) {
2718 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2719 		data_size   = h->length  - header_size;
2720 	} else /* apv >= 89 */ {
2721 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2722 		data_size   = h->length  - header_size;
2723 		D_ASSERT(data_size == 0);
2724 	}
2725 
2726 	/* initialize verify_alg and csums_alg */
2727 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2728 
2729 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2730 		return FALSE;
2731 
2732 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2733 
2734 	if (apv >= 88) {
2735 		if (apv == 88) {
2736 			if (data_size > SHARED_SECRET_MAX) {
2737 				dev_err(DEV, "verify-alg too long, "
2738 				    "peer wants %u, accepting only %u byte\n",
2739 						data_size, SHARED_SECRET_MAX);
2740 				return FALSE;
2741 			}
2742 
2743 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2744 				return FALSE;
2745 
2746 			/* we expect NUL terminated string */
2747 			/* but just in case someone tries to be evil */
2748 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2749 			p->verify_alg[data_size-1] = 0;
2750 
2751 		} else /* apv >= 89 */ {
2752 			/* we still expect NUL terminated strings */
2753 			/* but just in case someone tries to be evil */
2754 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2755 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2756 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2757 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2758 		}
2759 
2760 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2761 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2762 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2763 				    mdev->sync_conf.verify_alg, p->verify_alg);
2764 				goto disconnect;
2765 			}
2766 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2767 					p->verify_alg, "verify-alg");
2768 			if (IS_ERR(verify_tfm)) {
2769 				verify_tfm = NULL;
2770 				goto disconnect;
2771 			}
2772 		}
2773 
2774 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2775 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2776 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2777 				    mdev->sync_conf.csums_alg, p->csums_alg);
2778 				goto disconnect;
2779 			}
2780 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2781 					p->csums_alg, "csums-alg");
2782 			if (IS_ERR(csums_tfm)) {
2783 				csums_tfm = NULL;
2784 				goto disconnect;
2785 			}
2786 		}
2787 
2788 
2789 		spin_lock(&mdev->peer_seq_lock);
2790 		/* lock against drbd_nl_syncer_conf() */
2791 		if (verify_tfm) {
2792 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2793 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2794 			crypto_free_hash(mdev->verify_tfm);
2795 			mdev->verify_tfm = verify_tfm;
2796 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2797 		}
2798 		if (csums_tfm) {
2799 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2800 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2801 			crypto_free_hash(mdev->csums_tfm);
2802 			mdev->csums_tfm = csums_tfm;
2803 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2804 		}
2805 		spin_unlock(&mdev->peer_seq_lock);
2806 	}
2807 
2808 	return ok;
2809 disconnect:
2810 	/* just for completeness: actually not needed,
2811 	 * as this is not reached if csums_tfm was ok. */
2812 	crypto_free_hash(csums_tfm);
2813 	/* but free the verify_tfm again, if csums_tfm did not work out */
2814 	crypto_free_hash(verify_tfm);
2815 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2816 	return FALSE;
2817 }
2818 
2819 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2820 {
2821 	/* sorry, we currently have no working implementation
2822 	 * of distributed TCQ */
2823 }
2824 
2825 /* warn if the arguments differ by more than 12.5% */
2826 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2827 	const char *s, sector_t a, sector_t b)
2828 {
2829 	sector_t d;
2830 	if (a == 0 || b == 0)
2831 		return;
2832 	d = (a > b) ? (a - b) : (b - a);
2833 	if (d > (a>>3) || d > (b>>3))
2834 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2835 		     (unsigned long long)a, (unsigned long long)b);
2836 }
2837 
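/* Handle a P_SIZES packet: sanity-check the peer's backing device and
 * requested sizes against our own, (re)determine our device size, and
 * adapt the request queue parameters. */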
2838 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2839 {
2840 	struct p_sizes *p = (struct p_sizes *)h;
2841 	enum determine_dev_size dd = unchanged;
2842 	unsigned int max_seg_s;
2843 	sector_t p_size, p_usize, my_usize;
2844 	int ldsc = 0; /* local disk size changed */
2845 	enum drbd_conns nconn;
2846 
2847 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2848 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2849 		return FALSE;
2850 
2851 	p_size = be64_to_cpu(p->d_size);
2852 	p_usize = be64_to_cpu(p->u_size);
2853 
2854 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2855 		dev_err(DEV, "some backing storage is needed\n");
2856 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2857 		return FALSE;
2858 	}
2859 
2860 	/* just store the peer's disk size for now.
2861 	 * we still need to figure out whether we accept that. */
2862 	mdev->p_size = p_size;
2863 
2864 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
2865 	if (get_ldev(mdev)) {
2866 		warn_if_differ_considerably(mdev, "lower level device sizes",
2867 			   p_size, drbd_get_max_capacity(mdev->ldev));
2868 		warn_if_differ_considerably(mdev, "user requested size",
2869 					    p_usize, mdev->ldev->dc.disk_size);
2870 
2871 		/* if this is the first connect, or an otherwise expected
2872 		 * param exchange, choose the minimum */
2873 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2874 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2875 					     p_usize);
2876 
2877 		my_usize = mdev->ldev->dc.disk_size;
2878 
2879 		if (mdev->ldev->dc.disk_size != p_usize) {
2880 			mdev->ldev->dc.disk_size = p_usize;
2881 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2882 			     (unsigned long)mdev->ldev->dc.disk_size);
2883 		}
2884 
2885 		/* Never shrink a device with usable data during connect.
2886 		   But allow online shrinking if we are connected. */
2887 		if (drbd_new_dev_size(mdev, mdev->ldev) <
2888 		   drbd_get_capacity(mdev->this_bdev) &&
2889 		   mdev->state.disk >= D_OUTDATED &&
2890 		   mdev->state.conn < C_CONNECTED) {
2891 			dev_err(DEV, "The peer's disk size is too small!\n");
2892 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2893 			mdev->ldev->dc.disk_size = my_usize;
2894 			put_ldev(mdev);
2895 			return FALSE;
2896 		}
2897 		put_ldev(mdev);
2898 	}
2899 #undef min_not_zero
2900 
2901 	if (get_ldev(mdev)) {
2902 		dd = drbd_determin_dev_size(mdev);
2903 		put_ldev(mdev);
2904 		if (dd == dev_size_error)
2905 			return FALSE;
2906 		drbd_md_sync(mdev);
2907 	} else {
2908 		/* I am diskless, need to accept the peer's size. */
2909 		drbd_set_my_capacity(mdev, p_size);
2910 	}
2911 
2912 	if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2913 		nconn = drbd_sync_handshake(mdev,
2914 				mdev->state.peer, mdev->state.pdsk);
2915 		put_ldev(mdev);
2916 
2917 		if (nconn == C_MASK) {
2918 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2919 			return FALSE;
2920 		}
2921 
2922 		if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2923 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2924 			return FALSE;
2925 		}
2926 	}
2927 
2928 	if (get_ldev(mdev)) {
2929 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2930 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2931 			ldsc = 1;
2932 		}
2933 
2934 		max_seg_s = be32_to_cpu(p->max_segment_size);
2935 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2936 			drbd_setup_queue_param(mdev, max_seg_s);
2937 
2938 		drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2939 		put_ldev(mdev);
2940 	}
2941 
2942 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2943 		if (be64_to_cpu(p->c_size) !=
2944 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2945 			/* we have different sizes, probably peer
2946 			 * needs to know my new size... */
2947 			drbd_send_sizes(mdev, 0);
2948 		}
2949 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2950 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2951 			if (mdev->state.pdsk >= D_INCONSISTENT &&
2952 			    mdev->state.disk >= D_INCONSISTENT)
2953 				resync_after_online_grow(mdev);
2954 			else
2955 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2956 		}
2957 	}
2958 
2959 	return TRUE;
2960 }
2961 
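/* Handle a P_UUIDS packet: take over the peer's UUIDs and, for a freshly
 * created pair of disks, possibly skip the initial sync. */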
2962 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2963 {
2964 	struct p_uuids *p = (struct p_uuids *)h;
2965 	u64 *p_uuid;
2966 	int i;
2967 
2968 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2969 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2970 		return FALSE;
2971 
2972 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return FALSE;
	}
2973 
2974 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2975 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
2976 
2977 	kfree(mdev->p_uuid);
2978 	mdev->p_uuid = p_uuid;
2979 
2980 	if (mdev->state.conn < C_CONNECTED &&
2981 	    mdev->state.disk < D_INCONSISTENT &&
2982 	    mdev->state.role == R_PRIMARY &&
2983 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2984 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2985 		    (unsigned long long)mdev->ed_uuid);
2986 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2987 		return FALSE;
2988 	}
2989 
2990 	if (get_ldev(mdev)) {
2991 		int skip_initial_sync =
2992 			mdev->state.conn == C_CONNECTED &&
2993 			mdev->agreed_pro_version >= 90 &&
2994 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2995 			(p_uuid[UI_FLAGS] & 8);
2996 		if (skip_initial_sync) {
2997 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2998 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
2999 					"clear_n_write from receive_uuids");
3000 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3001 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3002 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3003 					CS_VERBOSE, NULL);
3004 			drbd_md_sync(mdev);
3005 		}
3006 		put_ldev(mdev);
3007 	}
3008 
3009 	/* Before we test for the disk state, we should wait until a possibly
3010 	   ongoing cluster-wide state change is finished. That is important if
3011 	   we are primary and are detaching from our disk. We need to see the
3012 	   new disk state... */
3013 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3014 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3015 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3016 
3017 	return TRUE;
3018 }
3019 
3020 /**
3021  * convert_state() - Converts the peer's view of the cluster state to our point of view
3022  * @ps:		The state as seen by the peer.
3023  */
3024 static union drbd_state convert_state(union drbd_state ps)
3025 {
3026 	union drbd_state ms;
3027 
3028 	static enum drbd_conns c_tab[] = {
3029 		[C_CONNECTED] = C_CONNECTED,
3030 
3031 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3032 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3033 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3034 		[C_VERIFY_S]       = C_VERIFY_T,
3035 		[C_MASK]   = C_MASK,
3036 	};
3037 
3038 	ms.i = ps.i;
3039 
3040 	ms.conn = c_tab[ps.conn];
3041 	ms.peer = ps.role;
3042 	ms.role = ps.peer;
3043 	ms.pdsk = ps.disk;
3044 	ms.disk = ps.pdsk;
3045 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3046 
3047 	return ms;
3048 }
3049 
3050 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3051 {
3052 	struct p_req_state *p = (struct p_req_state *)h;
3053 	union drbd_state mask, val;
3054 	int rv;
3055 
3056 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3057 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3058 		return FALSE;
3059 
3060 	mask.i = be32_to_cpu(p->mask);
3061 	val.i = be32_to_cpu(p->val);
3062 
3063 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3064 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3065 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3066 		return TRUE;
3067 	}
3068 
3069 	mask = convert_state(mask);
3070 	val = convert_state(val);
3071 
3072 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3073 
3074 	drbd_send_sr_reply(mdev, rv);
3075 	drbd_md_sync(mdev);
3076 
3077 	return TRUE;
3078 }
3079 
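/* Handle a P_STATE packet: translate the peer's view of the cluster state
 * into our own, run the sync handshake where a resync has to be considered,
 * and apply the resulting state. */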
3080 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3081 {
3082 	struct p_state *p = (struct p_state *)h;
3083 	enum drbd_conns nconn, oconn;
3084 	union drbd_state ns, peer_state;
3085 	enum drbd_disk_state real_peer_disk;
3086 	int rv;
3087 
3088 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3089 		return FALSE;
3090 
3091 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3092 		return FALSE;
3093 
3094 	peer_state.i = be32_to_cpu(p->state);
3095 
3096 	real_peer_disk = peer_state.disk;
3097 	if (peer_state.disk == D_NEGOTIATING) {
3098 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3099 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3100 	}
3101 
3102 	spin_lock_irq(&mdev->req_lock);
3103  retry:
3104 	oconn = nconn = mdev->state.conn;
3105 	spin_unlock_irq(&mdev->req_lock);
3106 
3107 	if (nconn == C_WF_REPORT_PARAMS)
3108 		nconn = C_CONNECTED;
3109 
3110 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3111 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3112 		int cr; /* consider resync */
3113 
3114 		/* if we established a new connection */
3115 		cr  = (oconn < C_CONNECTED);
3116 		/* if we had an established connection
3117 		 * and one of the nodes newly attaches a disk */
3118 		cr |= (oconn == C_CONNECTED &&
3119 		       (peer_state.disk == D_NEGOTIATING ||
3120 			mdev->state.disk == D_NEGOTIATING));
3121 		/* if we have both been inconsistent, and the peer has been
3122 		 * forced to be UpToDate with --overwrite-data */
3123 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3124 		/* if we had been plain connected, and the admin requested to
3125 		 * start a sync by "invalidate" or "invalidate-remote" */
3126 		cr |= (oconn == C_CONNECTED &&
3127 				(peer_state.conn >= C_STARTING_SYNC_S &&
3128 				 peer_state.conn <= C_WF_BITMAP_T));
3129 
3130 		if (cr)
3131 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3132 
3133 		put_ldev(mdev);
3134 		if (nconn == C_MASK) {
3135 			if (mdev->state.disk == D_NEGOTIATING) {
3136 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3137 				nconn = C_CONNECTED;
3138 			} else if (peer_state.disk == D_NEGOTIATING) {
3139 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3140 				peer_state.disk = D_DISKLESS;
3141 			} else {
3142 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3143 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3144 				return FALSE;
3145 			}
3146 		}
3147 	}
3148 
3149 	spin_lock_irq(&mdev->req_lock);
3150 	if (mdev->state.conn != oconn)
3151 		goto retry;
3152 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3153 	ns.i = mdev->state.i;
3154 	ns.conn = nconn;
3155 	ns.peer = peer_state.role;
3156 	ns.pdsk = real_peer_disk;
3157 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3158 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3159 		ns.disk = mdev->new_state_tmp.disk;
3160 
3161 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3162 	ns = mdev->state;
3163 	spin_unlock_irq(&mdev->req_lock);
3164 
3165 	if (rv < SS_SUCCESS) {
3166 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3167 		return FALSE;
3168 	}
3169 
3170 	if (oconn > C_WF_REPORT_PARAMS) {
3171 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3172 		    peer_state.disk != D_NEGOTIATING) {
3173 			/* we want resync, peer has not yet decided to sync... */
3174 			/* Nowadays only used when forcing a node into primary role and
3175 			   setting its disk to UpToDate with that */
3176 			drbd_send_uuids(mdev);
3177 			drbd_send_state(mdev);
3178 		}
3179 	}
3180 
3181 	mdev->net_conf->want_lose = 0;
3182 
3183 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3184 
3185 	return TRUE;
3186 }
3187 
3188 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3189 {
3190 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3191 
3192 	wait_event(mdev->misc_wait,
3193 		   mdev->state.conn == C_WF_SYNC_UUID ||
3194 		   mdev->state.conn < C_CONNECTED ||
3195 		   mdev->state.disk < D_NEGOTIATING);
3196 
3197 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3198 
3199 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3200 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3201 		return FALSE;
3202 
3203 	/* Here the _drbd_uuid_ functions are right, current should
3204 	   _not_ be rotated into the history */
3205 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3206 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3207 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3208 
3209 		drbd_start_resync(mdev, C_SYNC_TARGET);
3210 
3211 		put_ldev(mdev);
3212 	} else
3213 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3214 
3215 	return TRUE;
3216 }
3217 
3218 enum receive_bitmap_ret { OK, DONE, FAILED };
3219 
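/* Receive one uncompressed chunk of the peer's bitmap and merge it into our
 * own bitmap at the current transfer offset. */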
3220 static enum receive_bitmap_ret
3221 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3222 	unsigned long *buffer, struct bm_xfer_ctx *c)
3223 {
3224 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3225 	unsigned want = num_words * sizeof(long);
3226 
3227 	if (want != h->length) {
3228 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3229 		return FAILED;
3230 	}
3231 	if (want == 0)
3232 		return DONE;
3233 	if (drbd_recv(mdev, buffer, want) != want)
3234 		return FAILED;
3235 
3236 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3237 
3238 	c->word_offset += num_words;
3239 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3240 	if (c->bit_offset > c->bm_bits)
3241 		c->bit_offset = c->bm_bits;
3242 
3243 	return OK;
3244 }
3245 
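/* Decode one RLE/VLI compressed bitmap chunk: the stream encodes alternating
 * runs of clear and set bits; the set runs are applied to our bitmap. */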
3246 static enum receive_bitmap_ret
3247 recv_bm_rle_bits(struct drbd_conf *mdev,
3248 		struct p_compressed_bm *p,
3249 		struct bm_xfer_ctx *c)
3250 {
3251 	struct bitstream bs;
3252 	u64 look_ahead;
3253 	u64 rl;
3254 	u64 tmp;
3255 	unsigned long s = c->bit_offset;
3256 	unsigned long e;
3257 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3258 	int toggle = DCBP_get_start(p);
3259 	int have;
3260 	int bits;
3261 
3262 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3263 
3264 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3265 	if (bits < 0)
3266 		return FAILED;
3267 
3268 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3269 		bits = vli_decode_bits(&rl, look_ahead);
3270 		if (bits <= 0)
3271 			return FAILED;
3272 
3273 		if (toggle) {
3274 			e = s + rl -1;
3275 			if (e >= c->bm_bits) {
3276 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3277 				return FAILED;
3278 			}
3279 			_drbd_bm_set_bits(mdev, s, e);
3280 		}
3281 
3282 		if (have < bits) {
3283 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3284 				have, bits, look_ahead,
3285 				(unsigned int)(bs.cur.b - p->code),
3286 				(unsigned int)bs.buf_len);
3287 			return FAILED;
3288 		}
3289 		look_ahead >>= bits;
3290 		have -= bits;
3291 
3292 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3293 		if (bits < 0)
3294 			return FAILED;
3295 		look_ahead |= tmp << have;
3296 		have += bits;
3297 	}
3298 
3299 	c->bit_offset = s;
3300 	bm_xfer_ctx_bit_to_word_offset(c);
3301 
3302 	return (s == c->bm_bits) ? DONE : OK;
3303 }
3304 
3305 static enum receive_bitmap_ret
3306 decode_bitmap_c(struct drbd_conf *mdev,
3307 		struct p_compressed_bm *p,
3308 		struct bm_xfer_ctx *c)
3309 {
3310 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3311 		return recv_bm_rle_bits(mdev, p, c);
3312 
3313 	/* other variants had been implemented for evaluation,
3314 	 * but have been dropped as this one turned out to be "best"
3315 	 * during all our tests. */
3316 
3317 	dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3318 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3319 	return FAILED;
3320 }
3321 
3322 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3323 		const char *direction, struct bm_xfer_ctx *c)
3324 {
3325 	/* what would it take to transfer it "plaintext" */
3326 	unsigned plain = sizeof(struct p_header) *
3327 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3328 		+ c->bm_words * sizeof(long);
3329 	unsigned total = c->bytes[0] + c->bytes[1];
3330 	unsigned r;
3331 
3332 	/* total can not be zero. but just in case: */
3333 	if (total == 0)
3334 		return;
3335 
3336 	/* don't report if not compressed */
3337 	if (total >= plain)
3338 		return;
3339 
3340 	/* total < plain. check for overflow, still */
3341 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3342 		                    : (1000 * total / plain);
3343 
3344 	if (r > 1000)
3345 		r = 1000;
3346 
3347 	r = 1000 - r;
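	/* r is now the space saving in per mille; e.g. plain=4096, total=512
	 * yields r=875, reported below as "compression: 87.5%". */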
3348 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3349 	     "total %u; compression: %u.%u%%\n",
3350 			direction,
3351 			c->bytes[1], c->packets[1],
3352 			c->bytes[0], c->packets[0],
3353 			total, r/10, r % 10);
3354 }
3355 
3356 /* Since we are processing the bitfield from lower addresses to higher,
3357    it does not matter whether we process it in 32 bit or 64 bit
3358    chunks, as long as it is little endian. (Understand it as a byte stream,
3359    beginning with the lowest byte...) If we used big endian,
3360    we would need to process it from the highest address to the lowest
3361    in order to be agnostic to the 32 vs 64 bit issue.
3362 
3363    returns 0 on failure, 1 if we successfully received it. */
3364 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3365 {
3366 	struct bm_xfer_ctx c;
3367 	void *buffer;
3368 	enum receive_bitmap_ret ret;
3369 	int ok = FALSE;
3370 
3371 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3372 
3373 	drbd_bm_lock(mdev, "receive bitmap");
3374 
3375 	/* maybe we should use some per thread scratch page,
3376 	 * and allocate that during initial device creation? */
3377 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3378 	if (!buffer) {
3379 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3380 		goto out;
3381 	}
3382 
3383 	c = (struct bm_xfer_ctx) {
3384 		.bm_bits = drbd_bm_bits(mdev),
3385 		.bm_words = drbd_bm_words(mdev),
3386 	};
3387 
3388 	do {
3389 		if (h->command == P_BITMAP) {
3390 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3391 		} else if (h->command == P_COMPRESSED_BITMAP) {
3392 			/* MAYBE: sanity check that we speak proto >= 90,
3393 			 * and the feature is enabled! */
3394 			struct p_compressed_bm *p;
3395 
3396 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3397 				dev_err(DEV, "ReportCBitmap packet too large\n");
3398 				goto out;
3399 			}
3400 			/* use the page buffer */
3401 			p = buffer;
3402 			memcpy(p, h, sizeof(*h));
3403 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3404 				goto out;
3405 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3406 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3407 				goto out;
3408 			}
3409 			ret = decode_bitmap_c(mdev, p, &c);
3410 		} else {
3411 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3412 			goto out;
3413 		}
3414 
3415 		c.packets[h->command == P_BITMAP]++;
3416 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3417 
3418 		if (ret != OK)
3419 			break;
3420 
3421 		if (!drbd_recv_header(mdev, h))
3422 			goto out;
3423 	} while (ret == OK);
3424 	if (ret == FAILED)
3425 		goto out;
3426 
3427 	INFO_bm_xfer_stats(mdev, "receive", &c);
3428 
3429 	if (mdev->state.conn == C_WF_BITMAP_T) {
3430 		ok = !drbd_send_bitmap(mdev);
3431 		if (!ok)
3432 			goto out;
3433 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3434 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3435 		D_ASSERT(ok == SS_SUCCESS);
3436 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3437 		/* admin may have requested C_DISCONNECTING,
3438 		 * other threads may have noticed network errors */
3439 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3440 		    drbd_conn_str(mdev->state.conn));
3441 	}
3442 
3443 	ok = TRUE;
3444  out:
3445 	drbd_bm_unlock(mdev);
3446 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3447 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3448 	free_page((unsigned long) buffer);
3449 	return ok;
3450 }
3451 
3452 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3453 {
3454 	/* TODO zero copy sink :) */
3455 	static char sink[128];
3456 	int size, want, r;
3457 
3458 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3459 	     h->command, h->length);
3460 
3461 	size = h->length;
3462 	while (size > 0) {
3463 		want = min_t(int, size, sizeof(sink));
3464 		r = drbd_recv(mdev, sink, want);
3465 		ERR_IF(r <= 0) break;
3466 		size -= r;
3467 	}
3468 	return size == 0;
3469 }
3470 
3471 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3472 {
3473 	if (mdev->state.disk >= D_INCONSISTENT)
3474 		drbd_kick_lo(mdev);
3475 
3476 	/* Make sure we've acked all the TCP data associated
3477 	 * with the data requests being unplugged */
3478 	drbd_tcp_quickack(mdev->data.socket);
3479 
3480 	return TRUE;
3481 }
3482 
3483 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3484 
3485 static drbd_cmd_handler_f drbd_default_handler[] = {
3486 	[P_DATA]	    = receive_Data,
3487 	[P_DATA_REPLY]	    = receive_DataReply,
3488 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3489 	[P_BARRIER]	    = receive_Barrier,
3490 	[P_BITMAP]	    = receive_bitmap,
3491 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3492 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3493 	[P_DATA_REQUEST]    = receive_DataRequest,
3494 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3495 	[P_SYNC_PARAM]	    = receive_SyncParam,
3496 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3497 	[P_PROTOCOL]        = receive_protocol,
3498 	[P_UUIDS]	    = receive_uuids,
3499 	[P_SIZES]	    = receive_sizes,
3500 	[P_STATE]	    = receive_state,
3501 	[P_STATE_CHG_REQ]   = receive_req_state,
3502 	[P_SYNC_UUID]       = receive_sync_uuid,
3503 	[P_OV_REQUEST]      = receive_DataRequest,
3504 	[P_OV_REPLY]        = receive_DataRequest,
3505 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3506 	/* anything missing from this table is in
3507 	 * the asender_tbl, see get_asender_cmd */
3508 	[P_MAX_CMD]	    = NULL,
3509 };
3510 
3511 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3512 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3513 
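/* Main loop of the receiver thread: read one packet header at a time, look
 * up the handler for the command, and let the handler consume the payload.
 * A bad header, an unknown command, or a failing handler forces the
 * connection into C_PROTOCOL_ERROR and terminates the loop. */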
3514 static void drbdd(struct drbd_conf *mdev)
3515 {
3516 	drbd_cmd_handler_f handler;
3517 	struct p_header *header = &mdev->data.rbuf.header;
3518 
3519 	while (get_t_state(&mdev->receiver) == Running) {
3520 		drbd_thread_current_set_cpu(mdev);
3521 		if (!drbd_recv_header(mdev, header))
3522 			break;
3523 
3524 		if (header->command < P_MAX_CMD)
3525 			handler = drbd_cmd_handler[header->command];
3526 		else if (P_MAY_IGNORE < header->command
3527 		     && header->command < P_MAX_OPT_CMD)
3528 			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3529 		else if (header->command > P_MAX_OPT_CMD)
3530 			handler = receive_skip;
3531 		else
3532 			handler = NULL;
3533 
3534 		if (unlikely(!handler)) {
3535 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3536 			    header->command, header->length);
3537 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3538 			break;
3539 		}
3540 		if (unlikely(!handler(mdev, header))) {
3541 			dev_err(DEV, "error receiving %s, l: %d!\n",
3542 			    cmdname(header->command), header->length);
3543 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3544 			break;
3545 		}
3546 
3547 		trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
3548 				__FILE__, __LINE__);
3549 	}
3550 }
3551 
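/* Once the connection is gone, complete every application READ request
 * that is still waiting for a reply from the peer with
 * connection_lost_while_pending, and leave the app_reads hash empty. */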
3552 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3553 {
3554 	struct hlist_head *slot;
3555 	struct hlist_node *pos;
3556 	struct hlist_node *tmp;
3557 	struct drbd_request *req;
3558 	int i;
3559 
3560 	/*
3561 	 * Application READ requests
3562 	 */
3563 	spin_lock_irq(&mdev->req_lock);
3564 	for (i = 0; i < APP_R_HSIZE; i++) {
3565 		slot = mdev->app_reads_hash+i;
3566 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3567 			/* it may (but should not any longer!)
3568 			 * be on the work queue; if that assert triggers,
3569 			 * we need to also grab the
3570 			 * spin_lock_irq(&mdev->data.work.q_lock);
3571 			 * and list_del_init here. */
3572 			D_ASSERT(list_empty(&req->w.list));
3573 			/* It would be nice to complete outside of spinlock.
3574 			 * But this is easier for now. */
3575 			_req_mod(req, connection_lost_while_pending);
3576 		}
3577 	}
3578 	for (i = 0; i < APP_R_HSIZE; i++)
3579 		if (!hlist_empty(mdev->app_reads_hash+i))
3580 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3581 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3582 
3583 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3584 	spin_unlock_irq(&mdev->req_lock);
3585 }
3586 
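/* Queue a barrier work item and wait for it; w_prev_work_done is expected
 * to complete barr.done once the worker reaches it, which guarantees that
 * all work queued before this call has been processed. */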
3587 void drbd_flush_workqueue(struct drbd_conf *mdev)
3588 {
3589 	struct drbd_wq_barrier barr;
3590 
3591 	barr.w.cb = w_prev_work_done;
3592 	init_completion(&barr.done);
3593 	drbd_queue_work(&mdev->data.work, &barr.w);
3594 	wait_for_completion(&barr.done);
3595 }
3596 
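/* Tear down the network side of the device: stop the asender, close the
 * sockets, cancel resync bookkeeping, drain the worker and the ee lists,
 * and finally move to C_UNCONNECTED, or all the way to C_STANDALONE if the
 * admin requested C_DISCONNECTING, in which case the per-connection
 * resources are freed as well. */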
3597 static void drbd_disconnect(struct drbd_conf *mdev)
3598 {
3599 	enum drbd_fencing_p fp;
3600 	union drbd_state os, ns;
3601 	int rv = SS_UNKNOWN_ERROR;
3602 	unsigned int i;
3603 
3604 	if (mdev->state.conn == C_STANDALONE)
3605 		return;
3606 	if (mdev->state.conn >= C_WF_CONNECTION)
3607 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3608 				drbd_conn_str(mdev->state.conn));
3609 
3610 	/* asender does not clean up anything. it must not interfere, either */
3611 	drbd_thread_stop(&mdev->asender);
3612 
3613 	mutex_lock(&mdev->data.mutex);
3614 	drbd_free_sock(mdev);
3615 	mutex_unlock(&mdev->data.mutex);
3616 
3617 	spin_lock_irq(&mdev->req_lock);
3618 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3619 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3620 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3621 	spin_unlock_irq(&mdev->req_lock);
3622 
3623 	/* We do not have data structures that would allow us to
3624 	 * get the rs_pending_cnt down to 0 again.
3625 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3626 	 *    the pending RSDataRequest's we have sent.
3627 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3628 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3629 	 *  And no, it is not the sum of the reference counts in the
3630 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3631 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3632 	 *  on the fly. */
3633 	drbd_rs_cancel_all(mdev);
3634 	mdev->rs_total = 0;
3635 	mdev->rs_failed = 0;
3636 	atomic_set(&mdev->rs_pending_cnt, 0);
3637 	wake_up(&mdev->misc_wait);
3638 
3639 	/* make sure syncer is stopped and w_resume_next_sg queued */
3640 	del_timer_sync(&mdev->resync_timer);
3641 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3642 	resync_timer_fn((unsigned long)mdev);
3643 
3644 	/* so we can be sure that all remote or resync reads
3645 	 * made it at least to net_ee */
3646 	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
3647 
3648 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3649 	 * w_make_resync_request etc. which may still be on the worker queue
3650 	 * to be "canceled" */
3651 	drbd_flush_workqueue(mdev);
3652 
3653 	/* This also does reclaim_net_ee().  If we do this too early, we might
3654 	 * miss some resync ee and pages.*/
3655 	drbd_process_done_ee(mdev);
3656 
3657 	kfree(mdev->p_uuid);
3658 	mdev->p_uuid = NULL;
3659 
3660 	if (!mdev->state.susp)
3661 		tl_clear(mdev);
3662 
3663 	drbd_fail_pending_reads(mdev);
3664 
3665 	dev_info(DEV, "Connection closed\n");
3666 
3667 	drbd_md_sync(mdev);
3668 
3669 	fp = FP_DONT_CARE;
3670 	if (get_ldev(mdev)) {
3671 		fp = mdev->ldev->dc.fencing;
3672 		put_ldev(mdev);
3673 	}
3674 
3675 	if (mdev->state.role == R_PRIMARY) {
3676 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3677 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3678 			drbd_request_state(mdev, NS(pdsk, nps));
3679 		}
3680 	}
3681 
3682 	spin_lock_irq(&mdev->req_lock);
3683 	os = mdev->state;
3684 	if (os.conn >= C_UNCONNECTED) {
3685 		/* Do not restart in case we are C_DISCONNECTING */
3686 		ns = os;
3687 		ns.conn = C_UNCONNECTED;
3688 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3689 	}
3690 	spin_unlock_irq(&mdev->req_lock);
3691 
3692 	if (os.conn == C_DISCONNECTING) {
3693 		struct hlist_head *h;
3694 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3695 
3696 		/* we must not free the tl_hash
3697 		 * while application io is still on the fly */
3698 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3699 
3700 		spin_lock_irq(&mdev->req_lock);
3701 		/* paranoia code */
3702 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3703 			if (h->first)
3704 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3705 						(int)(h - mdev->ee_hash), h->first);
3706 		kfree(mdev->ee_hash);
3707 		mdev->ee_hash = NULL;
3708 		mdev->ee_hash_s = 0;
3709 
3710 		/* paranoia code */
3711 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3712 			if (h->first)
3713 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3714 						(int)(h - mdev->tl_hash), h->first);
3715 		kfree(mdev->tl_hash);
3716 		mdev->tl_hash = NULL;
3717 		mdev->tl_hash_s = 0;
3718 		spin_unlock_irq(&mdev->req_lock);
3719 
3720 		crypto_free_hash(mdev->cram_hmac_tfm);
3721 		mdev->cram_hmac_tfm = NULL;
3722 
3723 		kfree(mdev->net_conf);
3724 		mdev->net_conf = NULL;
3725 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3726 	}
3727 
3728 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3729 	 * want to use SO_LINGER, because apparently it can be deferred for
3730 	 * more than 20 seconds (longest time I checked).
3731 	 *
3732 	 * Actually we don't care for exactly when the network stack does its
3733 	 * put_page(), but release our reference on these pages right here.
3734 	 */
3735 	i = drbd_release_ee(mdev, &mdev->net_ee);
3736 	if (i)
3737 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3738 	i = atomic_read(&mdev->pp_in_use);
3739 	if (i)
3740 		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3741 
3742 	D_ASSERT(list_empty(&mdev->read_ee));
3743 	D_ASSERT(list_empty(&mdev->active_ee));
3744 	D_ASSERT(list_empty(&mdev->sync_ee));
3745 	D_ASSERT(list_empty(&mdev->done_ee));
3746 
3747 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3748 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3749 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3750 }
3751 
3752 /*
3753  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3754  * we can agree on is stored in agreed_pro_version.
3755  *
3756  * feature flags and the reserved array should be enough room for future
3757  * enhancements of the handshake protocol, and possible plugins...
3758  *
3759  * for now, they are expected to be zero, but ignored.
3760  */
3761 static int drbd_send_handshake(struct drbd_conf *mdev)
3762 {
3763 	/* ASSERT current == mdev->receiver ... */
3764 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3765 	int ok;
3766 
3767 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3768 		dev_err(DEV, "interrupted during initial handshake\n");
3769 		return 0; /* interrupted. not ok. */
3770 	}
3771 
3772 	if (mdev->data.socket == NULL) {
3773 		mutex_unlock(&mdev->data.mutex);
3774 		return 0;
3775 	}
3776 
3777 	memset(p, 0, sizeof(*p));
3778 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3779 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3780 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3781 			    (struct p_header *)p, sizeof(*p), 0);
3782 	mutex_unlock(&mdev->data.mutex);
3783 	return ok;
3784 }
3785 
3786 /*
3787  * return values:
3788  *   1 yes, we have a valid connection
3789  *   0 oops, did not work out, please try again
3790  *  -1 peer talks different language,
3791  *     no point in trying again, please go standalone.
3792  */
3793 static int drbd_do_handshake(struct drbd_conf *mdev)
3794 {
3795 	/* ASSERT current == mdev->receiver ... */
3796 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3797 	const int expect = sizeof(struct p_handshake)
3798 			  -sizeof(struct p_header);
3799 	int rv;
3800 
3801 	rv = drbd_send_handshake(mdev);
3802 	if (!rv)
3803 		return 0;
3804 
3805 	rv = drbd_recv_header(mdev, &p->head);
3806 	if (!rv)
3807 		return 0;
3808 
3809 	if (p->head.command != P_HAND_SHAKE) {
3810 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3811 		     cmdname(p->head.command), p->head.command);
3812 		return -1;
3813 	}
3814 
3815 	if (p->head.length != expect) {
3816 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3817 		     expect, p->head.length);
3818 		return -1;
3819 	}
3820 
3821 	rv = drbd_recv(mdev, &p->head.payload, expect);
3822 
3823 	if (rv != expect) {
3824 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3825 		return 0;
3826 	}
3827 
3828 	trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
3829 			__FILE__, __LINE__);
3830 
3831 	p->protocol_min = be32_to_cpu(p->protocol_min);
3832 	p->protocol_max = be32_to_cpu(p->protocol_max);
3833 	if (p->protocol_max == 0)
3834 		p->protocol_max = p->protocol_min;
3835 
3836 	if (PRO_VERSION_MAX < p->protocol_min ||
3837 	    PRO_VERSION_MIN > p->protocol_max)
3838 		goto incompat;
3839 
3840 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3841 
3842 	dev_info(DEV, "Handshake successful: "
3843 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3844 
3845 	return 1;
3846 
3847  incompat:
3848 	dev_err(DEV, "incompatible DRBD dialects: "
3849 	    "I support %d-%d, peer supports %d-%d\n",
3850 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3851 	    p->protocol_min, p->protocol_max);
3852 	return -1;
3853 }
3854 
3855 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3856 static int drbd_do_auth(struct drbd_conf *mdev)
3857 {
3858 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3859 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3860 	return 0;
3861 }
3862 #else
3863 #define CHALLENGE_LEN 64
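/* Challenge-response authentication using the configured cram-hmac-alg:
 *  1. send our random challenge (P_AUTH_CHALLENGE),
 *  2. receive the peer's challenge and HMAC it with the shared secret,
 *  3. send that digest as P_AUTH_RESPONSE,
 *  4. receive the peer's response to our challenge and compare it against
 *     the locally computed HMAC of our own challenge.
 * Returns 1 if the peer authenticated successfully, 0 otherwise. */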
3864 static int drbd_do_auth(struct drbd_conf *mdev)
3865 {
3866 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3867 	struct scatterlist sg;
3868 	char *response = NULL;
3869 	char *right_response = NULL;
3870 	char *peers_ch = NULL;
3871 	struct p_header p;
3872 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3873 	unsigned int resp_size;
3874 	struct hash_desc desc;
3875 	int rv;
3876 
3877 	desc.tfm = mdev->cram_hmac_tfm;
3878 	desc.flags = 0;
3879 
3880 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3881 				(u8 *)mdev->net_conf->shared_secret, key_len);
3882 	if (rv) {
3883 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3884 		rv = 0;
3885 		goto fail;
3886 	}
3887 
3888 	get_random_bytes(my_challenge, CHALLENGE_LEN);
3889 
3890 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3891 	if (!rv)
3892 		goto fail;
3893 
3894 	rv = drbd_recv_header(mdev, &p);
3895 	if (!rv)
3896 		goto fail;
3897 
3898 	if (p.command != P_AUTH_CHALLENGE) {
3899 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3900 		    cmdname(p.command), p.command);
3901 		rv = 0;
3902 		goto fail;
3903 	}
3904 
3905 	if (p.length > CHALLENGE_LEN*2) {
3906 		dev_err(DEV, "AuthChallenge payload too big.\n");
3907 		rv = 0;
3908 		goto fail;
3909 	}
3910 
3911 	peers_ch = kmalloc(p.length, GFP_NOIO);
3912 	if (peers_ch == NULL) {
3913 		dev_err(DEV, "kmalloc of peers_ch failed\n");
3914 		rv = 0;
3915 		goto fail;
3916 	}
3917 
3918 	rv = drbd_recv(mdev, peers_ch, p.length);
3919 
3920 	if (rv != p.length) {
3921 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3922 		rv = 0;
3923 		goto fail;
3924 	}
3925 
3926 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3927 	response = kmalloc(resp_size, GFP_NOIO);
3928 	if (response == NULL) {
3929 		dev_err(DEV, "kmalloc of response failed\n");
3930 		rv = 0;
3931 		goto fail;
3932 	}
3933 
3934 	sg_init_table(&sg, 1);
3935 	sg_set_buf(&sg, peers_ch, p.length);
3936 
3937 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3938 	if (rv) {
3939 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3940 		rv = 0;
3941 		goto fail;
3942 	}
3943 
3944 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3945 	if (!rv)
3946 		goto fail;
3947 
3948 	rv = drbd_recv_header(mdev, &p);
3949 	if (!rv)
3950 		goto fail;
3951 
3952 	if (p.command != P_AUTH_RESPONSE) {
3953 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3954 		    cmdname(p.command), p.command);
3955 		rv = 0;
3956 		goto fail;
3957 	}
3958 
3959 	if (p.length != resp_size) {
3960 		dev_err(DEV, "AuthResponse payload of wrong size\n");
3961 		rv = 0;
3962 		goto fail;
3963 	}
3964 
3965 	rv = drbd_recv(mdev, response, resp_size);
3966 
3967 	if (rv != resp_size) {
3968 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3969 		rv = 0;
3970 		goto fail;
3971 	}
3972 
3973 	right_response = kmalloc(resp_size, GFP_NOIO);
3974 	if (right_response == NULL) {
3975 		dev_err(DEV, "kmalloc of right_response failed\n");
3976 		rv = 0;
3977 		goto fail;
3978 	}
3979 
3980 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3981 
3982 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3983 	if (rv) {
3984 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3985 		rv = 0;
3986 		goto fail;
3987 	}
3988 
3989 	rv = !memcmp(response, right_response, resp_size);
3990 
3991 	if (rv)
3992 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3993 		     resp_size, mdev->net_conf->cram_hmac_alg);
3994 
3995  fail:
3996 	kfree(peers_ch);
3997 	kfree(response);
3998 	kfree(right_response);
3999 
4000 	return rv;
4001 }
4002 #endif
4003 
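/* Receiver thread entry point: retry drbd_connect() until we either get a
 * working connection (h > 0) or are told to drop the network configuration
 * (h == -1), then run drbdd() until the connection fails, and clean up via
 * drbd_disconnect(). */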
4004 int drbdd_init(struct drbd_thread *thi)
4005 {
4006 	struct drbd_conf *mdev = thi->mdev;
4007 	unsigned int minor = mdev_to_minor(mdev);
4008 	int h;
4009 
4010 	sprintf(current->comm, "drbd%d_receiver", minor);
4011 
4012 	dev_info(DEV, "receiver (re)started\n");
4013 
4014 	do {
4015 		h = drbd_connect(mdev);
4016 		if (h == 0) {
4017 			drbd_disconnect(mdev);
4018 			__set_current_state(TASK_INTERRUPTIBLE);
4019 			schedule_timeout(HZ);
4020 		}
4021 		if (h == -1) {
4022 			dev_warn(DEV, "Discarding network configuration.\n");
4023 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4024 		}
4025 	} while (h == 0);
4026 
4027 	if (h > 0) {
4028 		if (get_net_conf(mdev)) {
4029 			drbdd(mdev);
4030 			put_net_conf(mdev);
4031 		}
4032 	}
4033 
4034 	drbd_disconnect(mdev);
4035 
4036 	dev_info(DEV, "receiver terminated\n");
4037 	return 0;
4038 }
4039 
4040 /* ********* acknowledge sender ******** */
4041 
4042 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4043 {
4044 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4045 
4046 	int retcode = be32_to_cpu(p->retcode);
4047 
4048 	if (retcode >= SS_SUCCESS) {
4049 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4050 	} else {
4051 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4052 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4053 		    drbd_set_st_err_str(retcode), retcode);
4054 	}
4055 	wake_up(&mdev->state_wait);
4056 
4057 	return TRUE;
4058 }
4059 
4060 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4061 {
4062 	return drbd_send_ping_ack(mdev);
4063 
4064 }
4065 
4066 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4067 {
4068 	/* restore idle timeout */
4069 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4070 
4071 	return TRUE;
4072 }
4073 
4074 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4075 {
4076 	struct p_block_ack *p = (struct p_block_ack *)h;
4077 	sector_t sector = be64_to_cpu(p->sector);
4078 	int blksize = be32_to_cpu(p->blksize);
4079 
4080 	D_ASSERT(mdev->agreed_pro_version >= 89);
4081 
4082 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4083 
4084 	drbd_rs_complete_io(mdev, sector);
4085 	drbd_set_in_sync(mdev, sector, blksize);
4086 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4087 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4088 	dec_rs_pending(mdev);
4089 
4090 	return TRUE;
4091 }
4092 
4093 /* when we receive the ACK for a write request,
4094  * verify that we actually know about it */
4095 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4096 	u64 id, sector_t sector)
4097 {
4098 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4099 	struct hlist_node *n;
4100 	struct drbd_request *req;
4101 
4102 	hlist_for_each_entry(req, n, slot, colision) {
4103 		if ((unsigned long)req == (unsigned long)id) {
4104 			if (req->sector != sector) {
4105 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4106 				    "wrong sector (%llus versus %llus)\n", req,
4107 				    (unsigned long long)req->sector,
4108 				    (unsigned long long)sector);
4109 				break;
4110 			}
4111 			return req;
4112 		}
4113 	}
4114 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4115 		(void *)(unsigned long)id, (unsigned long long)sector);
4116 	return NULL;
4117 }
4118 
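/* Common ACK bookkeeping: look the request up under req_lock via the given
 * validator callback, feed the event "what" into the request state machine,
 * and complete the master bio outside the spinlock if the request is done. */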
4119 typedef struct drbd_request *(req_validator_fn)
4120 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4121 
4122 static int validate_req_change_req_state(struct drbd_conf *mdev,
4123 	u64 id, sector_t sector, req_validator_fn validator,
4124 	const char *func, enum drbd_req_event what)
4125 {
4126 	struct drbd_request *req;
4127 	struct bio_and_error m;
4128 
4129 	spin_lock_irq(&mdev->req_lock);
4130 	req = validator(mdev, id, sector);
4131 	if (unlikely(!req)) {
4132 		spin_unlock_irq(&mdev->req_lock);
4133 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4134 		return FALSE;
4135 	}
4136 	__req_mod(req, what, &m);
4137 	spin_unlock_irq(&mdev->req_lock);
4138 
4139 	if (m.bio)
4140 		complete_master_bio(mdev, &m);
4141 	return TRUE;
4142 }
4143 
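/* Positive acknowledgements.  Resync blocks (recognized by their syncer
 * block_id) just mark the area in sync and drop rs_pending_cnt; for normal
 * writes, the particular ACK flavor is translated into the corresponding
 * request state machine event, depending on the wire protocol in use. */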
4144 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4145 {
4146 	struct p_block_ack *p = (struct p_block_ack *)h;
4147 	sector_t sector = be64_to_cpu(p->sector);
4148 	int blksize = be32_to_cpu(p->blksize);
4149 	enum drbd_req_event what;
4150 
4151 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4152 
4153 	if (is_syncer_block_id(p->block_id)) {
4154 		drbd_set_in_sync(mdev, sector, blksize);
4155 		dec_rs_pending(mdev);
4156 		return TRUE;
4157 	}
4158 	switch (be16_to_cpu(h->command)) {
4159 	case P_RS_WRITE_ACK:
4160 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4161 		what = write_acked_by_peer_and_sis;
4162 		break;
4163 	case P_WRITE_ACK:
4164 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4165 		what = write_acked_by_peer;
4166 		break;
4167 	case P_RECV_ACK:
4168 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4169 		what = recv_acked_by_peer;
4170 		break;
4171 	case P_DISCARD_ACK:
4172 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4173 		what = conflict_discarded_by_peer;
4174 		break;
4175 	default:
4176 		D_ASSERT(0);
4177 		return FALSE;
4178 	}
4179 
4180 	return validate_req_change_req_state(mdev, p->block_id, sector,
4181 		_ack_id_to_req, __func__ , what);
4182 }
4183 
4184 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4185 {
4186 	struct p_block_ack *p = (struct p_block_ack *)h;
4187 	sector_t sector = be64_to_cpu(p->sector);
4188 
4189 	if (__ratelimit(&drbd_ratelimit_state))
4190 		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4191 
4192 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4193 
4194 	if (is_syncer_block_id(p->block_id)) {
4195 		int size = be32_to_cpu(p->blksize);
4196 		dec_rs_pending(mdev);
4197 		drbd_rs_failed_io(mdev, sector, size);
4198 		return TRUE;
4199 	}
4200 	return validate_req_change_req_state(mdev, p->block_id, sector,
4201 		_ack_id_to_req, __func__ , neg_acked);
4202 }
4203 
4204 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4205 {
4206 	struct p_block_ack *p = (struct p_block_ack *)h;
4207 	sector_t sector = be64_to_cpu(p->sector);
4208 
4209 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4210 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4211 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4212 
4213 	return validate_req_change_req_state(mdev, p->block_id, sector,
4214 		_ar_id_to_req, __func__ , neg_acked);
4215 }
4216 
4217 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4218 {
4219 	sector_t sector;
4220 	int size;
4221 	struct p_block_ack *p = (struct p_block_ack *)h;
4222 
4223 	sector = be64_to_cpu(p->sector);
4224 	size = be32_to_cpu(p->blksize);
4225 	D_ASSERT(p->block_id == ID_SYNCER);
4226 
4227 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4228 
4229 	dec_rs_pending(mdev);
4230 
4231 	if (get_ldev_if_state(mdev, D_FAILED)) {
4232 		drbd_rs_complete_io(mdev, sector);
4233 		drbd_rs_failed_io(mdev, sector, size);
4234 		put_ldev(mdev);
4235 	}
4236 
4237 	return TRUE;
4238 }
4239 
4240 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4241 {
4242 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4243 
4244 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4245 
4246 	return TRUE;
4247 }
4248 
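/* Online verify result for one block: a mismatch is recorded as out of
 * sync, otherwise ov_oos_print() presumably flushes a pending mismatch
 * report.  When the last outstanding block arrives, w_ov_finished is
 * queued so the worker can conclude the verify run. */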
4249 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4250 {
4251 	struct p_block_ack *p = (struct p_block_ack *)h;
4252 	struct drbd_work *w;
4253 	sector_t sector;
4254 	int size;
4255 
4256 	sector = be64_to_cpu(p->sector);
4257 	size = be32_to_cpu(p->blksize);
4258 
4259 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4260 
4261 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4262 		drbd_ov_oos_found(mdev, sector, size);
4263 	else
4264 		ov_oos_print(mdev);
4265 
4266 	drbd_rs_complete_io(mdev, sector);
4267 	dec_rs_pending(mdev);
4268 
4269 	if (--mdev->ov_left == 0) {
4270 		w = kmalloc(sizeof(*w), GFP_NOIO);
4271 		if (w) {
4272 			w->cb = w_ov_finished;
4273 			drbd_queue_work_front(&mdev->data.work, w);
4274 		} else {
4275 			dev_err(DEV, "kmalloc(w) failed.\n");
4276 			ov_oos_print(mdev);
4277 			drbd_resync_finished(mdev);
4278 		}
4279 	}
4280 	return TRUE;
4281 }
4282 
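/* The asender only deals with fixed size packets, so each command is
 * described by its expected packet size plus a handler.  Anything not in
 * this table is received by the receiver thread through
 * drbd_default_handler above. */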
4283 struct asender_cmd {
4284 	size_t pkt_size;
4285 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4286 };
4287 
4288 static struct asender_cmd *get_asender_cmd(int cmd)
4289 {
4290 	static struct asender_cmd asender_tbl[] = {
4291 		/* anything missing from this table is in
4292 		 * the drbd_cmd_handler (drbd_default_handler) table,
4293 		 * see the beginning of drbdd() */
4294 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4295 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4296 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4297 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4298 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4299 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4300 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4301 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4302 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4303 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4304 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4305 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4306 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4307 	[P_MAX_CMD]	    = { 0, NULL },
4308 	};
4309 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4310 		return NULL;
4311 	return &asender_tbl[cmd];
4312 }
4313 
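/* The asender thread: it sends pings and the ACKs generated by
 * drbd_process_done_ee(), and receives the fixed size ACK-like packets on
 * the meta socket.  The socket receive timeout doubles as the ping timeout
 * while a ping is outstanding. */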
4314 int drbd_asender(struct drbd_thread *thi)
4315 {
4316 	struct drbd_conf *mdev = thi->mdev;
4317 	struct p_header *h = &mdev->meta.rbuf.header;
4318 	struct asender_cmd *cmd = NULL;
4319 
4320 	int rv, len;
4321 	void *buf    = h;
4322 	int received = 0;
4323 	int expect   = sizeof(struct p_header);
4324 	int empty;
4325 
4326 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4327 
4328 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4329 	current->rt_priority = 2;    /* more important than all other tasks */
4330 
4331 	while (get_t_state(thi) == Running) {
4332 		drbd_thread_current_set_cpu(mdev);
4333 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4334 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4335 			mdev->meta.socket->sk->sk_rcvtimeo =
4336 				mdev->net_conf->ping_timeo*HZ/10;
4337 		}
4338 
4339 		/* conditionally cork;
4340 		 * it may hurt latency if we cork without much to send */
4341 		if (!mdev->net_conf->no_cork &&
4342 			3 < atomic_read(&mdev->unacked_cnt))
4343 			drbd_tcp_cork(mdev->meta.socket);
4344 		while (1) {
4345 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4346 			flush_signals(current);
4347 			if (!drbd_process_done_ee(mdev)) {
4348 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4349 				goto reconnect;
4350 			}
4351 			/* to avoid race with newly queued ACKs */
4352 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4353 			spin_lock_irq(&mdev->req_lock);
4354 			empty = list_empty(&mdev->done_ee);
4355 			spin_unlock_irq(&mdev->req_lock);
4356 			/* new ack may have been queued right here,
4357 			 * but then there is also a signal pending,
4358 			 * and we start over... */
4359 			if (empty)
4360 				break;
4361 		}
4362 		/* but unconditionally uncork unless disabled */
4363 		if (!mdev->net_conf->no_cork)
4364 			drbd_tcp_uncork(mdev->meta.socket);
4365 
4366 		/* short circuit, recv_msg would return EINTR anyways. */
4367 		if (signal_pending(current))
4368 			continue;
4369 
4370 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4371 				     buf, expect-received, 0);
4372 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4373 
4374 		flush_signals(current);
4375 
4376 		/* Note:
4377 		 * -EINTR	 (on meta) we got a signal
4378 		 * -EAGAIN	 (on meta) rcvtimeo expired
4379 		 * -ECONNRESET	 other side closed the connection
4380 		 * -ERESTARTSYS  (on data) we got a signal
4381 		 * rv <  0	 other than above: unexpected error!
4382 		 * rv == expected: full header or command
4383 		 * rv <  expected: "woken" by signal during receive
4384 		 * rv == 0	 : "connection shut down by peer"
4385 		 */
4386 		if (likely(rv > 0)) {
4387 			received += rv;
4388 			buf	 += rv;
4389 		} else if (rv == 0) {
4390 			dev_err(DEV, "meta connection shut down by peer.\n");
4391 			goto reconnect;
4392 		} else if (rv == -EAGAIN) {
4393 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4394 			    mdev->net_conf->ping_timeo*HZ/10) {
4395 				dev_err(DEV, "PingAck did not arrive in time.\n");
4396 				goto reconnect;
4397 			}
4398 			set_bit(SEND_PING, &mdev->flags);
4399 			continue;
4400 		} else if (rv == -EINTR) {
4401 			continue;
4402 		} else {
4403 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4404 			goto reconnect;
4405 		}
4406 
4407 		if (received == expect && cmd == NULL) {
4408 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4409 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4410 				    (long)be32_to_cpu(h->magic),
4411 				    h->command, h->length);
4412 				goto reconnect;
4413 			}
4414 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4415 			len = be16_to_cpu(h->length);
4416 			if (unlikely(cmd == NULL)) {
4417 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4418 				    (long)be32_to_cpu(h->magic),
4419 				    h->command, h->length);
4420 				goto disconnect;
4421 			}
4422 			expect = cmd->pkt_size;
4423 			ERR_IF(len != expect-sizeof(struct p_header)) {
4424 				trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
4425 				goto reconnect;
4426 			}
4427 		}
4428 		if (received == expect) {
4429 			D_ASSERT(cmd != NULL);
4430 			trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
4431 			if (!cmd->process(mdev, h))
4432 				goto reconnect;
4433 
4434 			buf	 = h;
4435 			received = 0;
4436 			expect	 = sizeof(struct p_header);
4437 			cmd	 = NULL;
4438 		}
4439 	}
4440 
4441 	if (0) {
4442 reconnect:
4443 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4444 	}
4445 	if (0) {
4446 disconnect:
4447 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4448 	}
4449 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4450 
4451 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4452 	dev_info(DEV, "asender terminated\n");
4453 
4454 	return 0;
4455 }
4456