xref: /linux/drivers/block/drbd/drbd_receiver.c (revision b89371621e5bedc84498ced2c5c33976bd1b2f64)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/version.h>
32 #include <linux/drbd.h>
33 #include <linux/fs.h>
34 #include <linux/file.h>
35 #include <linux/in.h>
36 #include <linux/mm.h>
37 #include <linux/memcontrol.h>
38 #include <linux/mm_inline.h>
39 #include <linux/slab.h>
40 #include <linux/smp_lock.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/mm.h>
47 #include <linux/string.h>
48 #include <linux/scatterlist.h>
49 #include "drbd_int.h"
50 #include "drbd_req.h"
51 
52 #include "drbd_vli.h"
53 
54 struct flush_work {
55 	struct drbd_work w;
56 	struct drbd_epoch *epoch;
57 };
58 
59 enum finish_epoch {
60 	FE_STILL_LIVE,
61 	FE_DESTROYED,
62 	FE_RECYCLED,
63 };
64 
65 static int drbd_do_handshake(struct drbd_conf *mdev);
66 static int drbd_do_auth(struct drbd_conf *mdev);
67 
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
70 
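/* Return the epoch created just before @epoch, or NULL if @epoch has no
 * predecessor (it is the only epoch, or its predecessor in the list is the
 * current epoch).  Takes and releases the epoch_lock internally. */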
71 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
72 {
73 	struct drbd_epoch *prev;
74 	spin_lock(&mdev->epoch_lock);
75 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
76 	if (prev == epoch || prev == mdev->current_epoch)
77 		prev = NULL;
78 	spin_unlock(&mdev->epoch_lock);
79 	return prev;
80 }
81 
82 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
83 
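/* Take a page from the pre-allocated drbd_pp_pool if one is vacant, else
 * fall back to alloc_page(GFP_TRY).  A successfully obtained page is
 * accounted in mdev->pp_in_use; returns NULL if nothing is available. */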
84 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
85 {
86 	struct page *page = NULL;
87 
88 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
89 	 * So what. It saves a spin_lock. */
90 	if (drbd_pp_vacant > 0) {
91 		spin_lock(&drbd_pp_lock);
92 		page = drbd_pp_pool;
93 		if (page) {
94 			drbd_pp_pool = (struct page *)page_private(page);
95 			set_page_private(page, 0); /* just to be polite */
96 			drbd_pp_vacant--;
97 		}
98 		spin_unlock(&drbd_pp_lock);
99 	}
100 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
101 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
102 	 * which in turn might block on the other node at this very place.  */
103 	if (!page)
104 		page = alloc_page(GFP_TRY);
105 	if (page)
106 		atomic_inc(&mdev->pp_in_use);
107 	return page;
108 }
109 
110 /* Kick the lower level device once the number of references we hold on it
111  * (typically locally submitted io requests) reaches the configured
112  * unplug_watermark.  Not using unacked_cnt also speeds up protocols A and B. */
113 static void maybe_kick_lo(struct drbd_conf *mdev)
114 {
115 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
116 		drbd_kick_lo(mdev);
117 }
118 
119 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
120 {
121 	struct drbd_epoch_entry *e;
122 	struct list_head *le, *tle;
123 
124 	/* The EEs are always appended to the end of the list. Since
125 	   they are sent in order over the wire, they have to finish
126 	   in order. As soon as we see the first unfinished one, we can
127 	   stop examining the list... */
128 
129 	list_for_each_safe(le, tle, &mdev->net_ee) {
130 		e = list_entry(le, struct drbd_epoch_entry, w.list);
131 		if (drbd_bio_has_active_page(e->private_bio))
132 			break;
133 		list_move(le, to_be_freed);
134 	}
135 }
136 
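/* Unplug the lower level device if needed, then free those net_ee entries
 * whose pages are no longer in flight on the network. */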
137 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
138 {
139 	LIST_HEAD(reclaimed);
140 	struct drbd_epoch_entry *e, *t;
141 
142 	maybe_kick_lo(mdev);
143 	spin_lock_irq(&mdev->req_lock);
144 	reclaim_net_ee(mdev, &reclaimed);
145 	spin_unlock_irq(&mdev->req_lock);
146 
147 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
148 		drbd_free_ee(mdev, e);
149 }
150 
151 /**
152  * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
153  * @mdev:	DRBD device.
154  * @retry:	whether or not to retry allocation forever (or until signalled)
155  *
156  * Tries to allocate a page, first from our own page pool, then from the
157  * kernel, unless this allocation would exceed the max_buffers setting.
158  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
159  */
160 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
161 {
162 	struct page *page = NULL;
163 	DEFINE_WAIT(wait);
164 
165 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
166 		page = drbd_pp_first_page_or_try_alloc(mdev);
167 		if (page)
168 			return page;
169 	}
170 
171 	for (;;) {
172 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
173 
174 		drbd_kick_lo_and_reclaim_net(mdev);
175 
176 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
177 			page = drbd_pp_first_page_or_try_alloc(mdev);
178 			if (page)
179 				break;
180 		}
181 
182 		if (!retry)
183 			break;
184 
185 		if (signal_pending(current)) {
186 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
187 			break;
188 		}
189 
190 		schedule();
191 	}
192 	finish_wait(&drbd_pp_wait, &wait);
193 
194 	return page;
195 }
196 
197 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
198  * It is also used from inside another spin_lock_irq(&mdev->req_lock) section. */
199 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
200 {
201 	int free_it;
202 
203 	spin_lock(&drbd_pp_lock);
204 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
205 		free_it = 1;
206 	} else {
207 		set_page_private(page, (unsigned long)drbd_pp_pool);
208 		drbd_pp_pool = page;
209 		drbd_pp_vacant++;
210 		free_it = 0;
211 	}
212 	spin_unlock(&drbd_pp_lock);
213 
214 	atomic_dec(&mdev->pp_in_use);
215 
216 	if (free_it)
217 		__free_page(page);
218 
219 	wake_up(&drbd_pp_wait);
220 }
221 
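/* Give all pages of @bio back to the drbd_pp_pool, or release them to the
 * page allocator once the pool already holds enough vacant pages.
 * Counterpart of the per-page drbd_pp_alloc() done in drbd_alloc_ee(). */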
222 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
223 {
224 	struct page *p_to_be_freed = NULL;
225 	struct page *page;
226 	struct bio_vec *bvec;
227 	int i;
228 
229 	spin_lock(&drbd_pp_lock);
230 	__bio_for_each_segment(bvec, bio, i, 0) {
231 		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
232 			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
233 			p_to_be_freed = bvec->bv_page;
234 		} else {
235 			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
236 			drbd_pp_pool = bvec->bv_page;
237 			drbd_pp_vacant++;
238 		}
239 	}
240 	spin_unlock(&drbd_pp_lock);
241 	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
242 
243 	while (p_to_be_freed) {
244 		page = p_to_be_freed;
245 		p_to_be_freed = (struct page *)page_private(page);
246 		set_page_private(page, 0); /* just to be polite */
247 		put_page(page);
248 	}
249 
250 	wake_up(&drbd_pp_wait);
251 }
252 
253 /*
254 You need to hold the req_lock:
255  _drbd_wait_ee_list_empty()
256 
257 You must not have the req_lock:
258  drbd_free_ee()
259  drbd_alloc_ee()
260  drbd_init_ee()
261  drbd_release_ee()
262  drbd_ee_fix_bhs()
263  drbd_process_done_ee()
264  drbd_clear_done_ee()
265  drbd_wait_ee_list_empty()
266 */
267 
268 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
269 				     u64 id,
270 				     sector_t sector,
271 				     unsigned int data_size,
272 				     gfp_t gfp_mask) __must_hold(local)
273 {
274 	struct request_queue *q;
275 	struct drbd_epoch_entry *e;
276 	struct page *page;
277 	struct bio *bio;
278 	unsigned int ds;
279 
280 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
281 		return NULL;
282 
283 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
284 	if (!e) {
285 		if (!(gfp_mask & __GFP_NOWARN))
286 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
287 		return NULL;
288 	}
289 
290 	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
291 	if (!bio) {
292 		if (!(gfp_mask & __GFP_NOWARN))
293 			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
294 		goto fail1;
295 	}
296 
297 	bio->bi_bdev = mdev->ldev->backing_bdev;
298 	bio->bi_sector = sector;
299 
300 	ds = data_size;
301 	while (ds) {
302 		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
303 		if (!page) {
304 			if (!(gfp_mask & __GFP_NOWARN))
305 				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
306 			goto fail2;
307 		}
308 		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
309 			drbd_pp_free(mdev, page);
310 			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
311 			    "data_size=%u,ds=%u) failed\n",
312 			    (unsigned long long)sector, data_size, ds);
313 
314 			q = bdev_get_queue(bio->bi_bdev);
315 			if (q->merge_bvec_fn) {
316 				struct bvec_merge_data bvm = {
317 					.bi_bdev = bio->bi_bdev,
318 					.bi_sector = bio->bi_sector,
319 					.bi_size = bio->bi_size,
320 					.bi_rw = bio->bi_rw,
321 				};
322 				int l = q->merge_bvec_fn(q, &bvm,
323 						&bio->bi_io_vec[bio->bi_vcnt]);
324 				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
325 			}
326 
327 			/* dump more of the bio. */
328 			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
329 			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
330 			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
331 			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
332 
333 			goto fail2;
335 		}
336 		ds -= min_t(int, ds, PAGE_SIZE);
337 	}
338 
339 	D_ASSERT(data_size == bio->bi_size);
340 
341 	bio->bi_private = e;
342 	e->mdev = mdev;
343 	e->sector = sector;
344 	e->size = bio->bi_size;
345 
346 	e->private_bio = bio;
347 	e->block_id = id;
348 	INIT_HLIST_NODE(&e->colision);
349 	e->epoch = NULL;
350 	e->flags = 0;
351 
352 	return e;
353 
354  fail2:
355 	drbd_pp_free_bio_pages(mdev, bio);
356 	bio_put(bio);
357  fail1:
358 	mempool_free(e, drbd_ee_mempool);
359 
360 	return NULL;
361 }
362 
363 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
364 {
365 	struct bio *bio = e->private_bio;
366 	drbd_pp_free_bio_pages(mdev, bio);
367 	bio_put(bio);
368 	D_ASSERT(hlist_unhashed(&e->colision));
369 	mempool_free(e, drbd_ee_mempool);
370 }
371 
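/* Free every epoch entry on @list and return the number of entries freed. */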
372 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
373 {
374 	LIST_HEAD(work_list);
375 	struct drbd_epoch_entry *e, *t;
376 	int count = 0;
377 
378 	spin_lock_irq(&mdev->req_lock);
379 	list_splice_init(list, &work_list);
380 	spin_unlock_irq(&mdev->req_lock);
381 
382 	list_for_each_entry_safe(e, t, &work_list, w.list) {
383 		drbd_free_ee(mdev, e);
384 		count++;
385 	}
386 	return count;
387 }
388 
389 
390 /*
391  * This function is called from _asender only_
392  * but see also comments in _req_mod(,barrier_acked)
393  * and receive_Barrier.
394  *
395  * Move entries from net_ee to done_ee, if ready.
396  * Grab done_ee, call all callbacks, free the entries.
397  * The callbacks typically send out ACKs.
398  */
399 static int drbd_process_done_ee(struct drbd_conf *mdev)
400 {
401 	LIST_HEAD(work_list);
402 	LIST_HEAD(reclaimed);
403 	struct drbd_epoch_entry *e, *t;
404 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
405 
406 	spin_lock_irq(&mdev->req_lock);
407 	reclaim_net_ee(mdev, &reclaimed);
408 	list_splice_init(&mdev->done_ee, &work_list);
409 	spin_unlock_irq(&mdev->req_lock);
410 
411 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
412 		drbd_free_ee(mdev, e);
413 
414 	/* possible callbacks here:
415 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
416 	 * all ignore the last argument.
417 	 */
418 	list_for_each_entry_safe(e, t, &work_list, w.list) {
419 		/* list_del not necessary, next/prev members not touched */
420 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
421 		drbd_free_ee(mdev, e);
422 	}
423 	wake_up(&mdev->ee_wait);
424 
425 	return ok;
426 }
427 
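/* Wait until @head becomes empty.  The caller must hold the req_lock;
 * it is dropped while sleeping and re-acquired before returning. */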
428 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
429 {
430 	DEFINE_WAIT(wait);
431 
432 	/* avoids spin_lock/unlock
433 	 * and calling prepare_to_wait in the fast path */
434 	while (!list_empty(head)) {
435 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
436 		spin_unlock_irq(&mdev->req_lock);
437 		drbd_kick_lo(mdev);
438 		schedule();
439 		finish_wait(&mdev->ee_wait, &wait);
440 		spin_lock_irq(&mdev->req_lock);
441 	}
442 }
443 
444 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
445 {
446 	spin_lock_irq(&mdev->req_lock);
447 	_drbd_wait_ee_list_empty(mdev, head);
448 	spin_unlock_irq(&mdev->req_lock);
449 }
450 
451 /* see also kernel_accept(), which is only present since 2.6.18.
452  * We also want to log exactly which part of it failed. */
453 static int drbd_accept(struct drbd_conf *mdev, const char **what,
454 		struct socket *sock, struct socket **newsock)
455 {
456 	struct sock *sk = sock->sk;
457 	int err = 0;
458 
459 	*what = "listen";
460 	err = sock->ops->listen(sock, 5);
461 	if (err < 0)
462 		goto out;
463 
464 	*what = "sock_create_lite";
465 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
466 			       newsock);
467 	if (err < 0)
468 		goto out;
469 
470 	*what = "accept";
471 	err = sock->ops->accept(sock, *newsock, 0);
472 	if (err < 0) {
473 		sock_release(*newsock);
474 		*newsock = NULL;
475 		goto out;
476 	}
477 	(*newsock)->ops  = sock->ops;
478 
479 out:
480 	return err;
481 }
482 
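/* Receive into @buf with a single sock_recvmsg() call; returns the number
 * of bytes received or a negative error code. */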
483 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
484 		    void *buf, size_t size, int flags)
485 {
486 	mm_segment_t oldfs;
487 	struct kvec iov = {
488 		.iov_base = buf,
489 		.iov_len = size,
490 	};
491 	struct msghdr msg = {
492 		.msg_iovlen = 1,
493 		.msg_iov = (struct iovec *)&iov,
494 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495 	};
496 	int rv;
497 
498 	oldfs = get_fs();
499 	set_fs(KERNEL_DS);
500 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501 	set_fs(oldfs);
502 
503 	return rv;
504 }
505 
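/* Receive exactly @size bytes from the data socket into @buf.  On a short
 * read or error the connection is forced into C_BROKEN_PIPE. */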
506 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
507 {
508 	mm_segment_t oldfs;
509 	struct kvec iov = {
510 		.iov_base = buf,
511 		.iov_len = size,
512 	};
513 	struct msghdr msg = {
514 		.msg_iovlen = 1,
515 		.msg_iov = (struct iovec *)&iov,
516 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517 	};
518 	int rv;
519 
520 	oldfs = get_fs();
521 	set_fs(KERNEL_DS);
522 
523 	for (;;) {
524 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
525 		if (rv == size)
526 			break;
527 
528 		/* Note:
529 		 * ECONNRESET	other side closed the connection
530 		 * ERESTARTSYS	(on  sock) we got a signal
531 		 */
532 
533 		if (rv < 0) {
534 			if (rv == -ECONNRESET)
535 				dev_info(DEV, "sock was reset by peer\n");
536 			else if (rv != -ERESTARTSYS)
537 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
538 			break;
539 		} else if (rv == 0) {
540 			dev_info(DEV, "sock was shut down by peer\n");
541 			break;
542 		} else	{
543 			/* signal came in, or peer/link went down,
544 			 * after we read a partial message
545 			 */
546 			/* D_ASSERT(signal_pending(current)); */
547 			break;
548 		}
549 	}
550 
551 	set_fs(oldfs);
552 
553 	if (rv != size)
554 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
555 
556 	return rv;
557 }
558 
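/* Actively establish a TCP connection to the peer.  Returns the connected
 * socket, or NULL on failure; "peer not (yet) reachable" type errors do
 * not force the connection state to C_DISCONNECTING. */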
559 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
560 {
561 	const char *what;
562 	struct socket *sock;
563 	struct sockaddr_in6 src_in6;
564 	int err;
565 	int disconnect_on_error = 1;
566 
567 	if (!get_net_conf(mdev))
568 		return NULL;
569 
570 	what = "sock_create_kern";
571 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
572 		SOCK_STREAM, IPPROTO_TCP, &sock);
573 	if (err < 0) {
574 		sock = NULL;
575 		goto out;
576 	}
577 
578 	sock->sk->sk_rcvtimeo =
579 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
580 
581        /* explicitly bind to the configured IP as source IP
582 	*  for the outgoing connections.
583 	*  This is needed for multihomed hosts and to be
584 	*  able to use lo: interfaces for drbd.
585 	* Make sure to use 0 as port number, so linux selects
586 	*  a free one dynamically.
587 	*/
588 	memcpy(&src_in6, mdev->net_conf->my_addr,
589 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
590 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
591 		src_in6.sin6_port = 0;
592 	else
593 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
594 
595 	what = "bind before connect";
596 	err = sock->ops->bind(sock,
597 			      (struct sockaddr *) &src_in6,
598 			      mdev->net_conf->my_addr_len);
599 	if (err < 0)
600 		goto out;
601 
602 	/* connect may fail, peer not yet available.
603 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
604 	disconnect_on_error = 0;
605 	what = "connect";
606 	err = sock->ops->connect(sock,
607 				 (struct sockaddr *)mdev->net_conf->peer_addr,
608 				 mdev->net_conf->peer_addr_len, 0);
609 
610 out:
611 	if (err < 0) {
612 		if (sock) {
613 			sock_release(sock);
614 			sock = NULL;
615 		}
616 		switch (-err) {
617 			/* timeout, busy, signal pending */
618 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
619 		case EINTR: case ERESTARTSYS:
620 			/* peer not (yet) available, network problem */
621 		case ECONNREFUSED: case ENETUNREACH:
622 		case EHOSTDOWN:    case EHOSTUNREACH:
623 			disconnect_on_error = 0;
624 			break;
625 		default:
626 			dev_err(DEV, "%s failed, err = %d\n", what, err);
627 		}
628 		if (disconnect_on_error)
629 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
630 	}
631 	put_net_conf(mdev);
632 	return sock;
633 }
634 
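/* Passive side of connection setup: bind, listen, and accept a single
 * connection from the peer.  Returns the accepted socket or NULL. */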
635 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
636 {
637 	int timeo, err;
638 	struct socket *s_estab = NULL, *s_listen;
639 	const char *what;
640 
641 	if (!get_net_conf(mdev))
642 		return NULL;
643 
644 	what = "sock_create_kern";
645 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
646 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
647 	if (err) {
648 		s_listen = NULL;
649 		goto out;
650 	}
651 
652 	timeo = mdev->net_conf->try_connect_int * HZ;
653 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
654 
655 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
656 	s_listen->sk->sk_rcvtimeo = timeo;
657 	s_listen->sk->sk_sndtimeo = timeo;
658 
659 	what = "bind before listen";
660 	err = s_listen->ops->bind(s_listen,
661 			      (struct sockaddr *) mdev->net_conf->my_addr,
662 			      mdev->net_conf->my_addr_len);
663 	if (err < 0)
664 		goto out;
665 
666 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
667 
668 out:
669 	if (s_listen)
670 		sock_release(s_listen);
671 	if (err < 0) {
672 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
673 			dev_err(DEV, "%s failed, err = %d\n", what, err);
674 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
675 		}
676 	}
677 	put_net_conf(mdev);
678 
679 	return s_estab;
680 }
681 
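/* drbd_send_fp()/drbd_recv_fp() exchange the "first packet" that is used
 * during connection setup to tell the data and meta sockets apart. */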
682 static int drbd_send_fp(struct drbd_conf *mdev,
683 	struct socket *sock, enum drbd_packets cmd)
684 {
685 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
686 
687 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
688 }
689 
690 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
691 {
692 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
693 	int rr;
694 
695 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
696 
697 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
698 		return be16_to_cpu(h->command);
699 
700 	return 0xffff;
701 }
702 
703 /**
704  * drbd_socket_okay() - Free the socket if its connection is not okay
705  * @mdev:	DRBD device.
706  * @sock:	pointer to the pointer to the socket.
707  */
708 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
709 {
710 	int rr;
711 	char tb[4];
712 
713 	if (!*sock)
714 		return FALSE;
715 
716 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
717 
718 	if (rr > 0 || rr == -EAGAIN) {
719 		return TRUE;
720 	} else {
721 		sock_release(*sock);
722 		*sock = NULL;
723 		return FALSE;
724 	}
725 }
726 
727 /*
728  * return values:
729  *   1 yes, we have a valid connection
730  *   0 oops, did not work out, please try again
731  *  -1 peer talks different language,
732  *     no point in trying again, please go standalone.
733  *  -2 We do not have a network config...
734  */
735 static int drbd_connect(struct drbd_conf *mdev)
736 {
737 	struct socket *s, *sock, *msock;
738 	int try, h, ok;
739 
740 	D_ASSERT(!mdev->data.socket);
741 
742 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
743 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
744 
745 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
746 		return -2;
747 
748 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
749 
750 	sock  = NULL;
751 	msock = NULL;
752 
753 	do {
754 		for (try = 0;;) {
755 			/* 3 tries, this should take less than a second! */
756 			s = drbd_try_connect(mdev);
757 			if (s || ++try >= 3)
758 				break;
759 			/* give the other side time to call bind() & listen() */
760 			__set_current_state(TASK_INTERRUPTIBLE);
761 			schedule_timeout(HZ / 10);
762 		}
763 
764 		if (s) {
765 			if (!sock) {
766 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
767 				sock = s;
768 				s = NULL;
769 			} else if (!msock) {
770 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
771 				msock = s;
772 				s = NULL;
773 			} else {
774 				dev_err(DEV, "Logic error in drbd_connect()\n");
775 				goto out_release_sockets;
776 			}
777 		}
778 
779 		if (sock && msock) {
780 			__set_current_state(TASK_INTERRUPTIBLE);
781 			schedule_timeout(HZ / 10);
782 			ok = drbd_socket_okay(mdev, &sock);
783 			ok = drbd_socket_okay(mdev, &msock) && ok;
784 			if (ok)
785 				break;
786 		}
787 
788 retry:
789 		s = drbd_wait_for_connect(mdev);
790 		if (s) {
791 			try = drbd_recv_fp(mdev, s);
792 			drbd_socket_okay(mdev, &sock);
793 			drbd_socket_okay(mdev, &msock);
794 			switch (try) {
795 			case P_HAND_SHAKE_S:
796 				if (sock) {
797 					dev_warn(DEV, "initial packet S crossed\n");
798 					sock_release(sock);
799 				}
800 				sock = s;
801 				break;
802 			case P_HAND_SHAKE_M:
803 				if (msock) {
804 					dev_warn(DEV, "initial packet M crossed\n");
805 					sock_release(msock);
806 				}
807 				msock = s;
808 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
809 				break;
810 			default:
811 				dev_warn(DEV, "Error receiving initial packet\n");
812 				sock_release(s);
813 				if (random32() & 1)
814 					goto retry;
815 			}
816 		}
817 
818 		if (mdev->state.conn <= C_DISCONNECTING)
819 			goto out_release_sockets;
820 		if (signal_pending(current)) {
821 			flush_signals(current);
822 			smp_rmb();
823 			if (get_t_state(&mdev->receiver) == Exiting)
824 				goto out_release_sockets;
825 		}
826 
827 		if (sock && msock) {
828 			ok = drbd_socket_okay(mdev, &sock);
829 			ok = drbd_socket_okay(mdev, &msock) && ok;
830 			if (ok)
831 				break;
832 		}
833 	} while (1);
834 
835 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
837 
838 	sock->sk->sk_allocation = GFP_NOIO;
839 	msock->sk->sk_allocation = GFP_NOIO;
840 
841 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
842 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
843 
844 	if (mdev->net_conf->sndbuf_size) {
845 		sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
846 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
847 	}
848 
849 	if (mdev->net_conf->rcvbuf_size) {
850 		sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
851 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
852 	}
853 
854 	/* NOT YET ...
855 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
856 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
857 	 * first set it to the P_HAND_SHAKE timeout,
858 	 * which we set to 4x the configured ping_timeout. */
859 	sock->sk->sk_sndtimeo =
860 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
861 
862 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
863 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
864 
865 	/* we don't want delays.
866 	 * we use TCP_CORK where appropriate, though */
867 	drbd_tcp_nodelay(sock);
868 	drbd_tcp_nodelay(msock);
869 
870 	mdev->data.socket = sock;
871 	mdev->meta.socket = msock;
872 	mdev->last_received = jiffies;
873 
874 	D_ASSERT(mdev->asender.task == NULL);
875 
876 	h = drbd_do_handshake(mdev);
877 	if (h <= 0)
878 		return h;
879 
880 	if (mdev->cram_hmac_tfm) {
881 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
882 		if (!drbd_do_auth(mdev)) {
883 			dev_err(DEV, "Authentication of peer failed\n");
884 			return -1;
885 		}
886 	}
887 
888 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
889 		return 0;
890 
891 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
892 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
893 
894 	atomic_set(&mdev->packet_seq, 0);
895 	mdev->peer_seq = 0;
896 
897 	drbd_thread_start(&mdev->asender);
898 
899 	drbd_send_protocol(mdev);
900 	drbd_send_sync_param(mdev, &mdev->sync_conf);
901 	drbd_send_sizes(mdev, 0);
902 	drbd_send_uuids(mdev);
903 	drbd_send_state(mdev);
904 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
905 	clear_bit(RESIZE_PENDING, &mdev->flags);
906 
907 	return 1;
908 
909 out_release_sockets:
910 	if (sock)
911 		sock_release(sock);
912 	if (msock)
913 		sock_release(msock);
914 	return -1;
915 }
916 
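/* Receive a packet header and convert command and length to host byte
 * order.  Returns TRUE on success, FALSE on a short read or bad magic. */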
917 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
918 {
919 	int r;
920 
921 	r = drbd_recv(mdev, h, sizeof(*h));
922 
923 	if (unlikely(r != sizeof(*h))) {
924 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
925 		return FALSE;
926 	}
927 	h->command = be16_to_cpu(h->command);
928 	h->length  = be16_to_cpu(h->length);
929 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
930 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
931 		    (long)be32_to_cpu(h->magic),
932 		    h->command, h->length);
933 		return FALSE;
934 	}
935 	mdev->last_received = jiffies;
936 
937 	return TRUE;
938 }
939 
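/* Flush the local backing device if the current write ordering method
 * calls for it, then feed EV_BARRIER_DONE into the epoch state machine. */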
940 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
941 {
942 	int rv;
943 
944 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
945 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
946 		if (rv) {
947 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
948 			/* would rather check on EOPNOTSUPP, but that is not reliable.
949 			 * don't try again for ANY return value != 0
950 			 * if (rv == -EOPNOTSUPP) */
951 			drbd_bump_write_ordering(mdev, WO_drain_io);
952 		}
953 		put_ldev(mdev);
954 	}
955 
956 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
957 }
958 
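/* Worker callback queued by drbd_may_finish_epoch(): issue the flush for
 * @epoch unless it was already issued, then drop the reference (EV_PUT)
 * taken when this work was scheduled. */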
959 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
960 {
961 	struct flush_work *fw = (struct flush_work *)w;
962 	struct drbd_epoch *epoch = fw->epoch;
963 
964 	kfree(w);
965 
966 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
967 		drbd_flush_after_epoch(mdev, epoch);
968 
969 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
970 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
971 
972 	return 1;
973 }
974 
975 /**
976  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
977  * @mdev:	DRBD device.
978  * @epoch:	Epoch object.
979  * @ev:		Epoch event.
980  */
981 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
982 					       struct drbd_epoch *epoch,
983 					       enum epoch_event ev)
984 {
985 	int finish, epoch_size;
986 	struct drbd_epoch *next_epoch;
987 	int schedule_flush = 0;
988 	enum finish_epoch rv = FE_STILL_LIVE;
989 
990 	spin_lock(&mdev->epoch_lock);
991 	do {
992 		next_epoch = NULL;
993 		finish = 0;
994 
995 		epoch_size = atomic_read(&epoch->epoch_size);
996 
997 		switch (ev & ~EV_CLEANUP) {
998 		case EV_PUT:
999 			atomic_dec(&epoch->active);
1000 			break;
1001 		case EV_GOT_BARRIER_NR:
1002 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1003 
1004 			/* Special case: If we just switched from WO_bio_barrier to
1005 			   WO_bdev_flush we should not finish the current epoch */
1006 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1007 			    mdev->write_ordering != WO_bio_barrier &&
1008 			    epoch == mdev->current_epoch)
1009 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1010 			break;
1011 		case EV_BARRIER_DONE:
1012 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1013 			break;
1014 		case EV_BECAME_LAST:
1015 			/* nothing to do */
1016 			break;
1017 		}
1018 
1019 		if (epoch_size != 0 &&
1020 		    atomic_read(&epoch->active) == 0 &&
1021 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1022 		    epoch->list.prev == &mdev->current_epoch->list &&
1023 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1024 			/* Nearly all conditions are met to finish that epoch... */
1025 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1026 			    mdev->write_ordering == WO_none ||
1027 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1028 			    ev & EV_CLEANUP) {
1029 				finish = 1;
1030 				set_bit(DE_IS_FINISHING, &epoch->flags);
1031 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1032 				 mdev->write_ordering == WO_bio_barrier) {
1033 				atomic_inc(&epoch->active);
1034 				schedule_flush = 1;
1035 			}
1036 		}
1037 		if (finish) {
1038 			if (!(ev & EV_CLEANUP)) {
1039 				spin_unlock(&mdev->epoch_lock);
1040 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1041 				spin_lock(&mdev->epoch_lock);
1042 			}
1043 			dec_unacked(mdev);
1044 
1045 			if (mdev->current_epoch != epoch) {
1046 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1047 				list_del(&epoch->list);
1048 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1049 				mdev->epochs--;
1050 				kfree(epoch);
1051 
1052 				if (rv == FE_STILL_LIVE)
1053 					rv = FE_DESTROYED;
1054 			} else {
1055 				epoch->flags = 0;
1056 				atomic_set(&epoch->epoch_size, 0);
1057 				/* atomic_set(&epoch->active, 0); is already zero */
1058 				if (rv == FE_STILL_LIVE)
1059 					rv = FE_RECYCLED;
1060 			}
1061 		}
1062 
1063 		if (!next_epoch)
1064 			break;
1065 
1066 		epoch = next_epoch;
1067 	} while (1);
1068 
1069 	spin_unlock(&mdev->epoch_lock);
1070 
1071 	if (schedule_flush) {
1072 		struct flush_work *fw;
1073 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1074 		if (fw) {
1075 			fw->w.cb = w_flush;
1076 			fw->epoch = epoch;
1077 			drbd_queue_work(&mdev->data.work, &fw->w);
1078 		} else {
1079 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1080 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1081 			/* This recursion is bounded: it goes only one level deep */
1082 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1083 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1084 		}
1085 	}
1086 
1087 	return rv;
1088 }
1089 
1090 /**
1091  * drbd_bump_write_ordering() - Fall back to another write ordering method
1092  * @mdev:	DRBD device.
1093  * @wo:		Write ordering method to try.
1094  */
1095 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1096 {
1097 	enum write_ordering_e pwo;
1098 	static char *write_ordering_str[] = {
1099 		[WO_none] = "none",
1100 		[WO_drain_io] = "drain",
1101 		[WO_bdev_flush] = "flush",
1102 		[WO_bio_barrier] = "barrier",
1103 	};
1104 
1105 	pwo = mdev->write_ordering;
1106 	wo = min(pwo, wo);
1107 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1108 		wo = WO_bdev_flush;
1109 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1110 		wo = WO_drain_io;
1111 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1112 		wo = WO_none;
1113 	mdev->write_ordering = wo;
1114 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1115 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1116 }
1117 
1118 /**
1119  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1120  * @mdev:	DRBD device.
1121  * @w:		work object.
1122  * @cancel:	The connection will be closed anyways (unused in this callback)
1123  */
1124 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1125 {
1126 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1127 	struct bio *bio = e->private_bio;
1128 
1129 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1130 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1131 	   so that we can finish that epoch in drbd_may_finish_epoch().
1132 	   That is necessary if we already have a long chain of Epochs, before
1133 	   we realize that BIO_RW_BARRIER is actually not supported */
1134 
1135 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1136 	   that will never trigger. If it is reported late, we will just
1137 	   print that warning and continue correctly for all future requests
1138 	   with WO_bdev_flush */
1139 	if (previous_epoch(mdev, e->epoch))
1140 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1141 
1142 	/* prepare bio for re-submit,
1143 	 * re-init volatile members */
1144 	/* we still have a local reference,
1145 	 * get_ldev was done in receive_Data. */
1146 	bio->bi_bdev = mdev->ldev->backing_bdev;
1147 	bio->bi_sector = e->sector;
1148 	bio->bi_size = e->size;
1149 	bio->bi_idx = 0;
1150 
1151 	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1152 	bio->bi_flags |= 1 << BIO_UPTODATE;
1153 
1154 	/* don't know whether this is necessary: */
1155 	bio->bi_phys_segments = 0;
1156 	bio->bi_next = NULL;
1157 
1158 	/* these should be unchanged: */
1159 	/* bio->bi_end_io = drbd_endio_write_sec; */
1160 	/* bio->bi_vcnt = whatever; */
1161 
1162 	e->w.cb = e_end_block;
1163 
1164 	/* This is no longer a barrier request. */
1165 	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1166 
1167 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1168 
1169 	return 1;
1170 }
1171 
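/* Handle a P_BARRIER packet: close the current epoch and, depending on
 * the write ordering method in use, flush or drain before installing a
 * new epoch object. */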
1172 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1173 {
1174 	int rv, issue_flush;
1175 	struct p_barrier *p = (struct p_barrier *)h;
1176 	struct drbd_epoch *epoch;
1177 
1178 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1179 
1180 	rv = drbd_recv(mdev, h->payload, h->length);
1181 	ERR_IF(rv != h->length) return FALSE;
1182 
1183 	inc_unacked(mdev);
1184 
1185 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1186 		drbd_kick_lo(mdev);
1187 
1188 	mdev->current_epoch->barrier_nr = p->barrier;
1189 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1190 
1191 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1192 	 * the activity log, which means it would not be resynced in case the
1193 	 * R_PRIMARY crashes now.
1194 	 * Therefore we must send the barrier_ack after the barrier request was
1195 	 * completed. */
1196 	switch (mdev->write_ordering) {
1197 	case WO_bio_barrier:
1198 	case WO_none:
1199 		if (rv == FE_RECYCLED)
1200 			return TRUE;
1201 		break;
1202 
1203 	case WO_bdev_flush:
1204 	case WO_drain_io:
1205 		D_ASSERT(rv == FE_STILL_LIVE);
1206 		set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1207 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1208 		rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1209 		if (rv == FE_RECYCLED)
1210 			return TRUE;
1211 
1212 		/* The asender will send all the ACKs and barrier ACKs out, since
1213 		   all EEs moved from the active_ee to the done_ee. We need to
1214 		   provide a new epoch object for the EEs that come in soon */
1215 		break;
1216 	}
1217 
1218 	/* receiver context, in the writeout path of the other node.
1219 	 * avoid potential distributed deadlock */
1220 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1221 	if (!epoch) {
1222 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1223 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1224 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1225 		if (issue_flush) {
1226 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1227 			if (rv == FE_RECYCLED)
1228 				return TRUE;
1229 		}
1230 
1231 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1232 
1233 		return TRUE;
1234 	}
1235 
1236 	epoch->flags = 0;
1237 	atomic_set(&epoch->epoch_size, 0);
1238 	atomic_set(&epoch->active, 0);
1239 
1240 	spin_lock(&mdev->epoch_lock);
1241 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1242 		list_add(&epoch->list, &mdev->current_epoch->list);
1243 		mdev->current_epoch = epoch;
1244 		mdev->epochs++;
1245 	} else {
1246 		/* The current_epoch got recycled while we allocated this one... */
1247 		kfree(epoch);
1248 	}
1249 	spin_unlock(&mdev->epoch_lock);
1250 
1251 	return TRUE;
1252 }
1253 
1254 /* used from receive_RSDataReply (recv_resync_read)
1255  * and from receive_Data */
1256 static struct drbd_epoch_entry *
1257 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1258 {
1259 	struct drbd_epoch_entry *e;
1260 	struct bio_vec *bvec;
1261 	struct page *page;
1262 	struct bio *bio;
1263 	int dgs, ds, i, rr;
1264 	void *dig_in = mdev->int_dig_in;
1265 	void *dig_vv = mdev->int_dig_vv;
1266 
1267 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1268 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1269 
1270 	if (dgs) {
1271 		rr = drbd_recv(mdev, dig_in, dgs);
1272 		if (rr != dgs) {
1273 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1274 			     rr, dgs);
1275 			return NULL;
1276 		}
1277 	}
1278 
1279 	data_size -= dgs;
1280 
1281 	ERR_IF(data_size &  0x1ff) return NULL;
1282 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1283 
1284 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1285 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1286 	 * which in turn might block on the other node at this very place.  */
1287 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1288 	if (!e)
1289 		return NULL;
1290 	bio = e->private_bio;
1291 	ds = data_size;
1292 	bio_for_each_segment(bvec, bio, i) {
1293 		page = bvec->bv_page;
1294 		rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1295 		kunmap(page);
1296 		if (rr != min_t(int, ds, PAGE_SIZE)) {
1297 			drbd_free_ee(mdev, e);
1298 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1299 			     rr, min_t(int, ds, PAGE_SIZE));
1300 			return NULL;
1301 		}
1302 		ds -= rr;
1303 	}
1304 
1305 	if (dgs) {
1306 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1307 		if (memcmp(dig_in, dig_vv, dgs)) {
1308 			dev_err(DEV, "Digest integrity check FAILED.\n");
1309 			drbd_bcast_ee(mdev, "digest failed",
1310 					dgs, dig_in, dig_vv, e);
1311 			drbd_free_ee(mdev, e);
1312 			return NULL;
1313 		}
1314 	}
1315 	mdev->recv_cnt += data_size>>9;
1316 	return e;
1317 }
1318 
1319 /* drbd_drain_block() just takes a data block
1320  * out of the socket input buffer, and discards it.
1321  */
1322 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1323 {
1324 	struct page *page;
1325 	int rr, rv = 1;
1326 	void *data;
1327 
1328 	page = drbd_pp_alloc(mdev, 1);
1329 
1330 	data = kmap(page);
1331 	while (data_size) {
1332 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1333 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1334 			rv = 0;
1335 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1336 			     rr, min_t(int, data_size, PAGE_SIZE));
1337 			break;
1338 		}
1339 		data_size -= rr;
1340 	}
1341 	kunmap(page);
1342 	drbd_pp_free(mdev, page);
1343 	return rv;
1344 }
1345 
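/* Receive the payload of a data reply directly into the pages of the
 * original request's master bio, verifying the optional data digest. */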
1346 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1347 			   sector_t sector, int data_size)
1348 {
1349 	struct bio_vec *bvec;
1350 	struct bio *bio;
1351 	int dgs, rr, i, expect;
1352 	void *dig_in = mdev->int_dig_in;
1353 	void *dig_vv = mdev->int_dig_vv;
1354 
1355 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1356 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1357 
1358 	if (dgs) {
1359 		rr = drbd_recv(mdev, dig_in, dgs);
1360 		if (rr != dgs) {
1361 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1362 			     rr, dgs);
1363 			return 0;
1364 		}
1365 	}
1366 
1367 	data_size -= dgs;
1368 
1369 	/* optimistically update recv_cnt.  if receiving fails below,
1370 	 * we disconnect anyways, and counters will be reset. */
1371 	mdev->recv_cnt += data_size>>9;
1372 
1373 	bio = req->master_bio;
1374 	D_ASSERT(sector == bio->bi_sector);
1375 
1376 	bio_for_each_segment(bvec, bio, i) {
1377 		expect = min_t(int, data_size, bvec->bv_len);
1378 		rr = drbd_recv(mdev,
1379 			     kmap(bvec->bv_page)+bvec->bv_offset,
1380 			     expect);
1381 		kunmap(bvec->bv_page);
1382 		if (rr != expect) {
1383 			dev_warn(DEV, "short read receiving data reply: "
1384 			     "read %d expected %d\n",
1385 			     rr, expect);
1386 			return 0;
1387 		}
1388 		data_size -= rr;
1389 	}
1390 
1391 	if (dgs) {
1392 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1393 		if (memcmp(dig_in, dig_vv, dgs)) {
1394 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1395 			return 0;
1396 		}
1397 	}
1398 
1399 	D_ASSERT(data_size == 0);
1400 	return 1;
1401 }
1402 
1403 /* e_end_resync_block() is called via
1404  * drbd_process_done_ee() by asender only */
1405 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1406 {
1407 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1408 	sector_t sector = e->sector;
1409 	int ok;
1410 
1411 	D_ASSERT(hlist_unhashed(&e->colision));
1412 
1413 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1414 		drbd_set_in_sync(mdev, sector, e->size);
1415 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1416 	} else {
1417 		/* Record failure to sync */
1418 		drbd_rs_failed_io(mdev, sector, e->size);
1419 
1420 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1421 	}
1422 	dec_unacked(mdev);
1423 
1424 	return ok;
1425 }
1426 
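/* Receive one resync data block and submit it as a local write; the ack
 * is sent from e_end_resync_block() once the write has completed. */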
1427 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1428 {
1429 	struct drbd_epoch_entry *e;
1430 
1431 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1432 	if (!e) {
1433 		put_ldev(mdev);
1434 		return FALSE;
1435 	}
1436 
1437 	dec_rs_pending(mdev);
1438 
1439 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1440 	e->private_bio->bi_rw = WRITE;
1441 	e->w.cb = e_end_resync_block;
1442 
1443 	inc_unacked(mdev);
1444 	/* corresponding dec_unacked() in e_end_resync_block()
1445 	 * respective _drbd_clear_done_ee */
1446 
1447 	spin_lock_irq(&mdev->req_lock);
1448 	list_add(&e->w.list, &mdev->sync_ee);
1449 	spin_unlock_irq(&mdev->req_lock);
1450 
1451 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1452 	/* accounting done in endio */
1453 
1454 	maybe_kick_lo(mdev);
1455 	return TRUE;
1456 }
1457 
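/* P_DATA_REPLY: look up the pending read request this reply answers and
 * copy the received data into its bio. */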
1458 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1459 {
1460 	struct drbd_request *req;
1461 	sector_t sector;
1462 	unsigned int header_size, data_size;
1463 	int ok;
1464 	struct p_data *p = (struct p_data *)h;
1465 
1466 	header_size = sizeof(*p) - sizeof(*h);
1467 	data_size   = h->length  - header_size;
1468 
1469 	ERR_IF(data_size == 0) return FALSE;
1470 
1471 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1472 		return FALSE;
1473 
1474 	sector = be64_to_cpu(p->sector);
1475 
1476 	spin_lock_irq(&mdev->req_lock);
1477 	req = _ar_id_to_req(mdev, p->block_id, sector);
1478 	spin_unlock_irq(&mdev->req_lock);
1479 	if (unlikely(!req)) {
1480 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1481 		return FALSE;
1482 	}
1483 
1484 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1485 	 * special casing it there for the various failure cases.
1486 	 * still no race with drbd_fail_pending_reads */
1487 	ok = recv_dless_read(mdev, req, sector, data_size);
1488 
1489 	if (ok)
1490 		req_mod(req, data_received);
1491 	/* else: nothing. handled from drbd_disconnect...
1492 	 * I don't think we may complete this just yet
1493 	 * in case we are "on-disconnect: freeze" */
1494 
1495 	return ok;
1496 }
1497 
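/* P_RS_DATA_REPLY: write the received resync block to the local disk, or
 * drain it and send P_NEG_ACK if we currently have no local disk. */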
1498 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1499 {
1500 	sector_t sector;
1501 	unsigned int header_size, data_size;
1502 	int ok;
1503 	struct p_data *p = (struct p_data *)h;
1504 
1505 	header_size = sizeof(*p) - sizeof(*h);
1506 	data_size   = h->length  - header_size;
1507 
1508 	ERR_IF(data_size == 0) return FALSE;
1509 
1510 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1511 		return FALSE;
1512 
1513 	sector = be64_to_cpu(p->sector);
1514 	D_ASSERT(p->block_id == ID_SYNCER);
1515 
1516 	if (get_ldev(mdev)) {
1517 		/* data is submitted to disk within recv_resync_read.
1518 		 * corresponding put_ldev done below on error,
1519 		 * or in drbd_endio_write_sec. */
1520 		ok = recv_resync_read(mdev, sector, data_size);
1521 	} else {
1522 		if (__ratelimit(&drbd_ratelimit_state))
1523 			dev_err(DEV, "Can not write resync data to local disk.\n");
1524 
1525 		ok = drbd_drain_block(mdev, data_size);
1526 
1527 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1528 	}
1529 
1530 	return ok;
1531 }
1532 
1533 /* e_end_block() is called via drbd_process_done_ee().
1534  * this means this function only runs in the asender thread
1535  */
1536 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1537 {
1538 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1539 	sector_t sector = e->sector;
1540 	struct drbd_epoch *epoch;
1541 	int ok = 1, pcmd;
1542 
1543 	if (e->flags & EE_IS_BARRIER) {
1544 		epoch = previous_epoch(mdev, e->epoch);
1545 		if (epoch)
1546 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1547 	}
1548 
1549 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1550 		if (likely(drbd_bio_uptodate(e->private_bio))) {
1551 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1552 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1553 				e->flags & EE_MAY_SET_IN_SYNC) ?
1554 				P_RS_WRITE_ACK : P_WRITE_ACK;
1555 			ok &= drbd_send_ack(mdev, pcmd, e);
1556 			if (pcmd == P_RS_WRITE_ACK)
1557 				drbd_set_in_sync(mdev, sector, e->size);
1558 		} else {
1559 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1560 			/* we expect it to be marked out of sync anyways...
1561 			 * maybe assert this?  */
1562 		}
1563 		dec_unacked(mdev);
1564 	}
1565 	/* we delete from the conflict detection hash _after_ we sent out the
1566 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1567 	if (mdev->net_conf->two_primaries) {
1568 		spin_lock_irq(&mdev->req_lock);
1569 		D_ASSERT(!hlist_unhashed(&e->colision));
1570 		hlist_del_init(&e->colision);
1571 		spin_unlock_irq(&mdev->req_lock);
1572 	} else {
1573 		D_ASSERT(hlist_unhashed(&e->colision));
1574 	}
1575 
1576 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1577 
1578 	return ok;
1579 }
1580 
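/* Acknowledge a conflicting concurrent write with P_DISCARD_ACK instead
 * of writing it (two-primaries conflict handling); runs from the asender
 * via the done_ee list. */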
1581 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1582 {
1583 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1584 	int ok = 1;
1585 
1586 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1587 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1588 
1589 	spin_lock_irq(&mdev->req_lock);
1590 	D_ASSERT(!hlist_unhashed(&e->colision));
1591 	hlist_del_init(&e->colision);
1592 	spin_unlock_irq(&mdev->req_lock);
1593 
1594 	dec_unacked(mdev);
1595 
1596 	return ok;
1597 }
1598 
1599 /* Called from receive_Data.
1600  * Synchronize packets on sock with packets on msock.
1601  *
1602  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1603  * packet traveling on msock, they are still processed in the order they have
1604  * been sent.
1605  *
1606  * Note: we don't care for Ack packets overtaking P_DATA packets.
1607  *
1608  * In case packet_seq is larger than mdev->peer_seq number, there are
1609  * outstanding packets on the msock. We wait for them to arrive.
1610  * In case we are the logically next packet, we update mdev->peer_seq
1611  * ourselves. Correctly handles 32bit wrap around.
1612  *
1613  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1614  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1615  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1616  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1617  *
1618  * returns 0 if we may process the packet,
1619  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1620 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1621 {
1622 	DEFINE_WAIT(wait);
1623 	unsigned int p_seq;
1624 	long timeout;
1625 	int ret = 0;
1626 	spin_lock(&mdev->peer_seq_lock);
1627 	for (;;) {
1628 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1629 		if (seq_le(packet_seq, mdev->peer_seq+1))
1630 			break;
1631 		if (signal_pending(current)) {
1632 			ret = -ERESTARTSYS;
1633 			break;
1634 		}
1635 		p_seq = mdev->peer_seq;
1636 		spin_unlock(&mdev->peer_seq_lock);
1637 		timeout = schedule_timeout(30*HZ);
1638 		spin_lock(&mdev->peer_seq_lock);
1639 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1640 			ret = -ETIMEDOUT;
1641 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1642 			break;
1643 		}
1644 	}
1645 	finish_wait(&mdev->seq_wait, &wait);
1646 	if (mdev->peer_seq+1 == packet_seq)
1647 		mdev->peer_seq++;
1648 	spin_unlock(&mdev->peer_seq_lock);
1649 	return ret;
1650 }
1651 
1652 /* mirrored write */
1653 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1654 {
1655 	sector_t sector;
1656 	struct drbd_epoch_entry *e;
1657 	struct p_data *p = (struct p_data *)h;
1658 	int header_size, data_size;
1659 	int rw = WRITE;
1660 	u32 dp_flags;
1661 
1662 	header_size = sizeof(*p) - sizeof(*h);
1663 	data_size   = h->length  - header_size;
1664 
1665 	ERR_IF(data_size == 0) return FALSE;
1666 
1667 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1668 		return FALSE;
1669 
1670 	if (!get_ldev(mdev)) {
1671 		if (__ratelimit(&drbd_ratelimit_state))
1672 			dev_err(DEV, "Can not write mirrored data block "
1673 			    "to local disk.\n");
1674 		spin_lock(&mdev->peer_seq_lock);
1675 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1676 			mdev->peer_seq++;
1677 		spin_unlock(&mdev->peer_seq_lock);
1678 
1679 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1680 		atomic_inc(&mdev->current_epoch->epoch_size);
1681 		return drbd_drain_block(mdev, data_size);
1682 	}
1683 
1684 	/* get_ldev(mdev) successful.
1685 	 * Corresponding put_ldev done either below (on various errors),
1686 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1687 	 * the end of this function. */
1688 
1689 	sector = be64_to_cpu(p->sector);
1690 	e = read_in_block(mdev, p->block_id, sector, data_size);
1691 	if (!e) {
1692 		put_ldev(mdev);
1693 		return FALSE;
1694 	}
1695 
1696 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1697 	e->w.cb = e_end_block;
1698 
1699 	spin_lock(&mdev->epoch_lock);
1700 	e->epoch = mdev->current_epoch;
1701 	atomic_inc(&e->epoch->epoch_size);
1702 	atomic_inc(&e->epoch->active);
1703 
1704 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1705 		struct drbd_epoch *epoch;
1706 		/* Issue a barrier if we start a new epoch, and the previous epoch
1707 		   was not an epoch containing a single request which already was
1708 		   a Barrier. */
1709 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1710 		if (epoch == e->epoch) {
1711 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1712 			rw |= (1<<BIO_RW_BARRIER);
1713 			e->flags |= EE_IS_BARRIER;
1714 		} else {
1715 			if (atomic_read(&epoch->epoch_size) > 1 ||
1716 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1717 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1718 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1719 				rw |= (1<<BIO_RW_BARRIER);
1720 				e->flags |= EE_IS_BARRIER;
1721 			}
1722 		}
1723 	}
1724 	spin_unlock(&mdev->epoch_lock);
1725 
1726 	dp_flags = be32_to_cpu(p->dp_flags);
1727 	if (dp_flags & DP_HARDBARRIER) {
1728 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1729 		/* rw |= (1<<BIO_RW_BARRIER); */
1730 	}
1731 	if (dp_flags & DP_RW_SYNC)
1732 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1733 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1734 		e->flags |= EE_MAY_SET_IN_SYNC;
1735 
1736 	/* I'm the receiver, I do hold a net_cnt reference. */
1737 	if (!mdev->net_conf->two_primaries) {
1738 		spin_lock_irq(&mdev->req_lock);
1739 	} else {
1740 		/* don't get the req_lock yet,
1741 		 * we may sleep in drbd_wait_peer_seq */
1742 		const int size = e->size;
1743 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1744 		DEFINE_WAIT(wait);
1745 		struct drbd_request *i;
1746 		struct hlist_node *n;
1747 		struct hlist_head *slot;
1748 		int first;
1749 
1750 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1751 		BUG_ON(mdev->ee_hash == NULL);
1752 		BUG_ON(mdev->tl_hash == NULL);
1753 
1754 		/* conflict detection and handling:
1755 		 * 1. wait on the sequence number,
1756 		 *    in case this data packet overtook ACK packets.
1757 		 * 2. check our hash tables for conflicting requests.
1758 		 *    we only need to walk the tl_hash, since an ee cannot
1759 		 *    have a conflict with another ee: on the submitting
1760 		 *    node, the corresponding req had already been conflicting,
1761 		 *    and a conflicting req is never sent.
1762 		 *
1763 		 * Note: for two_primaries, we are protocol C,
1764 		 * so there cannot be any request that is DONE
1765 		 * but still on the transfer log.
1766 		 *
1767 		 * unconditionally add to the ee_hash.
1768 		 *
1769 		 * if no conflicting request is found:
1770 		 *    submit.
1771 		 *
1772 		 * if any conflicting request is found
1773 		 * that has not yet been acked,
1774 		 * AND I have the "discard concurrent writes" flag:
1775 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1776 		 *
1777 		 * if any conflicting request is found:
1778 		 *	 block the receiver, waiting on misc_wait
1779 		 *	 until no more conflicting requests are there,
1780 		 *	 or we get interrupted (disconnect).
1781 		 *
1782 		 *	 we do not just write after local io completion of those
1783 		 *	 requests, but only after req is done completely, i.e.
1784 		 *	 we wait for the P_DISCARD_ACK to arrive!
1785 		 *
1786 		 *	 then proceed normally, i.e. submit.
1787 		 */
1788 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1789 			goto out_interrupted;
1790 
1791 		spin_lock_irq(&mdev->req_lock);
1792 
1793 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1794 
1795 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1796 		slot = tl_hash_slot(mdev, sector);
1797 		first = 1;
1798 		for (;;) {
1799 			int have_unacked = 0;
1800 			int have_conflict = 0;
1801 			prepare_to_wait(&mdev->misc_wait, &wait,
1802 				TASK_INTERRUPTIBLE);
1803 			hlist_for_each_entry(i, n, slot, colision) {
1804 				if (OVERLAPS) {
1805 					/* only ALERT on first iteration,
1806 					 * we may be woken up early... */
1807 					if (first)
1808 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1809 						      "	new: %llus +%u; pending: %llus +%u\n",
1810 						      current->comm, current->pid,
1811 						      (unsigned long long)sector, size,
1812 						      (unsigned long long)i->sector, i->size);
1813 					if (i->rq_state & RQ_NET_PENDING)
1814 						++have_unacked;
1815 					++have_conflict;
1816 				}
1817 			}
1818 #undef OVERLAPS
1819 			if (!have_conflict)
1820 				break;
1821 
1822 			/* Discard Ack only for the _first_ iteration */
1823 			if (first && discard && have_unacked) {
1824 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1825 				     (unsigned long long)sector);
1826 				inc_unacked(mdev);
1827 				e->w.cb = e_send_discard_ack;
1828 				list_add_tail(&e->w.list, &mdev->done_ee);
1829 
1830 				spin_unlock_irq(&mdev->req_lock);
1831 
1832 				/* we could probably send that P_DISCARD_ACK ourselves,
1833 				 * but I don't like the receiver using the msock */
1834 
1835 				put_ldev(mdev);
1836 				wake_asender(mdev);
1837 				finish_wait(&mdev->misc_wait, &wait);
1838 				return TRUE;
1839 			}
1840 
1841 			if (signal_pending(current)) {
1842 				hlist_del_init(&e->colision);
1843 
1844 				spin_unlock_irq(&mdev->req_lock);
1845 
1846 				finish_wait(&mdev->misc_wait, &wait);
1847 				goto out_interrupted;
1848 			}
1849 
1850 			spin_unlock_irq(&mdev->req_lock);
1851 			if (first) {
1852 				first = 0;
1853 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1854 				     "sec=%llus\n", (unsigned long long)sector);
1855 			} else if (discard) {
1856 				/* we had none on the first iteration.
1857 				 * there must be none now. */
1858 				D_ASSERT(have_unacked == 0);
1859 			}
1860 			schedule();
1861 			spin_lock_irq(&mdev->req_lock);
1862 		}
1863 		finish_wait(&mdev->misc_wait, &wait);
1864 	}
1865 
1866 	list_add(&e->w.list, &mdev->active_ee);
1867 	spin_unlock_irq(&mdev->req_lock);
1868 
1869 	switch (mdev->net_conf->wire_protocol) {
1870 	case DRBD_PROT_C:
1871 		inc_unacked(mdev);
1872 		/* corresponding dec_unacked() in e_end_block()
1873 		 * respective _drbd_clear_done_ee */
1874 		break;
1875 	case DRBD_PROT_B:
1876 		/* I really don't like it that the receiver thread
1877 		 * sends on the msock, but anyways */
1878 		 * sends on the msock, but anyway */
1879 		break;
1880 	case DRBD_PROT_A:
1881 		/* nothing to do */
1882 		break;
1883 	}
1884 
1885 	if (mdev->state.pdsk == D_DISKLESS) {
1886 		/* In case we have the only disk of the cluster, */
1887 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1888 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1889 		drbd_al_begin_io(mdev, e->sector);
1890 	}
1891 
1892 	e->private_bio->bi_rw = rw;
1893 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1894 	/* accounting done in endio */
1895 
1896 	maybe_kick_lo(mdev);
1897 	return TRUE;
1898 
1899 out_interrupted:
1900 	/* yes, the epoch_size now is imbalanced.
1901 	 * but we drop the connection anyways, so we don't have a chance to
1902 	 * but we drop the connection anyway, so we don't have a chance to
1903 	put_ldev(mdev);
1904 	drbd_free_ee(mdev, e);
1905 	return FALSE;
1906 }
1907 
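/* Serve a read request from the peer: application reads (P_DATA_REQUEST),
 * resync reads (P_RS_DATA_REQUEST), checksum based resync (P_CSUM_RS_REQUEST)
 * and online verify (P_OV_REQUEST, P_OV_REPLY).  Allocates an epoch entry,
 * submits the local READ and lets the corresponding w_e_end_* worker
 * callback send the reply once the read completes. */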
1908 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1909 {
1910 	sector_t sector;
1911 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1912 	struct drbd_epoch_entry *e;
1913 	struct digest_info *di = NULL;
1914 	int size, digest_size;
1915 	unsigned int fault_type;
1916 	struct p_block_req *p =
1917 		(struct p_block_req *)h;
1918 	const int brps = sizeof(*p)-sizeof(*h);
1919 
1920 	if (drbd_recv(mdev, h->payload, brps) != brps)
1921 		return FALSE;
1922 
1923 	sector = be64_to_cpu(p->sector);
1924 	size   = be32_to_cpu(p->blksize);
1925 
1926 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1927 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1928 				(unsigned long long)sector, size);
1929 		return FALSE;
1930 	}
1931 	if (sector + (size>>9) > capacity) {
1932 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1933 				(unsigned long long)sector, size);
1934 		return FALSE;
1935 	}
1936 
1937 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1938 		if (__ratelimit(&drbd_ratelimit_state))
1939 			dev_err(DEV, "Can not satisfy peer's read request, "
1940 			    "no local data.\n");
1941 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1942 				 P_NEG_RS_DREPLY , p);
1943 		return TRUE;
1944 	}
1945 
1946 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1947 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1948 	 * which in turn might block on the other node at this very place.  */
1949 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1950 	if (!e) {
1951 		put_ldev(mdev);
1952 		return FALSE;
1953 	}
1954 
1955 	e->private_bio->bi_rw = READ;
1956 	e->private_bio->bi_end_io = drbd_endio_read_sec;
1957 
1958 	switch (h->command) {
1959 	case P_DATA_REQUEST:
1960 		e->w.cb = w_e_end_data_req;
1961 		fault_type = DRBD_FAULT_DT_RD;
1962 		break;
1963 	case P_RS_DATA_REQUEST:
1964 		e->w.cb = w_e_end_rsdata_req;
1965 		fault_type = DRBD_FAULT_RS_RD;
1966 		/* Eventually this should become asynchronous. Currently it
1967 		 * blocks the whole receiver just to delay the reading of a
1968 		 * resync data block.
1969 		 * the drbd_work_queue mechanism is made for this...
1970 		 */
1971 		if (!drbd_rs_begin_io(mdev, sector)) {
1972 			/* we have been interrupted,
1973 			 * probably connection lost! */
1974 			D_ASSERT(signal_pending(current));
1975 			goto out_free_e;
1976 		}
1977 		break;
1978 
1979 	case P_OV_REPLY:
1980 	case P_CSUM_RS_REQUEST:
1981 		fault_type = DRBD_FAULT_RS_RD;
1982 		digest_size = h->length - brps;
1983 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1984 		if (!di)
1985 			goto out_free_e;
1986 
1987 		di->digest_size = digest_size;
1988 		di->digest = (((char *)di)+sizeof(struct digest_info));
1989 
1990 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
1991 			goto out_free_e;
1992 
1993 		e->block_id = (u64)(unsigned long)di;
1994 		if (h->command == P_CSUM_RS_REQUEST) {
1995 			D_ASSERT(mdev->agreed_pro_version >= 89);
1996 			e->w.cb = w_e_end_csum_rs_req;
1997 		} else if (h->command == P_OV_REPLY) {
1998 			e->w.cb = w_e_end_ov_reply;
1999 			dec_rs_pending(mdev);
2000 			break;
2001 		}
2002 
2003 		if (!drbd_rs_begin_io(mdev, sector)) {
2004 			/* we have been interrupted, probably connection lost! */
2005 			D_ASSERT(signal_pending(current));
2006 			goto out_free_e;
2007 		}
2008 		break;
2009 
2010 	case P_OV_REQUEST:
2011 		if (mdev->state.conn >= C_CONNECTED &&
2012 		    mdev->state.conn != C_VERIFY_T)
2013 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2014 				drbd_conn_str(mdev->state.conn));
2015 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2016 		    mdev->agreed_pro_version >= 90) {
2017 			mdev->ov_start_sector = sector;
2018 			mdev->ov_position = sector;
2019 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2020 			dev_info(DEV, "Online Verify start sector: %llu\n",
2021 					(unsigned long long)sector);
2022 		}
2023 		e->w.cb = w_e_end_ov_req;
2024 		fault_type = DRBD_FAULT_RS_RD;
2025 		/* Eventually this should become asynchronous. Currently it
2026 		 * blocks the whole receiver just to delay the reading of a
2027 		 * resync data block.
2028 		 * the drbd_work_queue mechanism is made for this...
2029 		 */
2030 		if (!drbd_rs_begin_io(mdev, sector)) {
2031 			/* we have been interrupted,
2032 			 * probably connection lost! */
2033 			D_ASSERT(signal_pending(current));
2034 			goto out_free_e;
2035 		}
2036 		break;
2037 
2038 
2039 	default:
2040 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2041 		    cmdname(h->command));
2042 		fault_type = DRBD_FAULT_MAX;
2043 	}
2044 
2045 	spin_lock_irq(&mdev->req_lock);
2046 	list_add(&e->w.list, &mdev->read_ee);
2047 	spin_unlock_irq(&mdev->req_lock);
2048 
2049 	inc_unacked(mdev);
2050 
2051 	drbd_generic_make_request(mdev, fault_type, e->private_bio);
2052 	maybe_kick_lo(mdev);
2053 
2054 	return TRUE;
2055 
2056 out_free_e:
2057 	kfree(di);
2058 	put_ldev(mdev);
2059 	drbd_free_ee(mdev, e);
2060 	return FALSE;
2061 }
2062 
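/* Auto recovery strategy after split brain while zero nodes are primary
 * (after-sb-0pri).  Returns 1 if the peer's modifications shall be discarded
 * (we become sync source), -1 if our own modifications shall be discarded
 * (we become sync target), -100 if no automatic decision could be made. */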
2063 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2064 {
2065 	int self, peer, rv = -100;
2066 	unsigned long ch_self, ch_peer;
2067 
2068 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2069 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2070 
2071 	ch_peer = mdev->p_uuid[UI_SIZE];
2072 	ch_self = mdev->comm_bm_set;
2073 
2074 	switch (mdev->net_conf->after_sb_0p) {
2075 	case ASB_CONSENSUS:
2076 	case ASB_DISCARD_SECONDARY:
2077 	case ASB_CALL_HELPER:
2078 		dev_err(DEV, "Configuration error.\n");
2079 		break;
2080 	case ASB_DISCONNECT:
2081 		break;
2082 	case ASB_DISCARD_YOUNGER_PRI:
2083 		if (self == 0 && peer == 1) {
2084 			rv = -1;
2085 			break;
2086 		}
2087 		if (self == 1 && peer == 0) {
2088 			rv =  1;
2089 			break;
2090 		}
2091 		/* Else fall through to one of the other strategies... */
2092 	case ASB_DISCARD_OLDER_PRI:
2093 		if (self == 0 && peer == 1) {
2094 			rv = 1;
2095 			break;
2096 		}
2097 		if (self == 1 && peer == 0) {
2098 			rv = -1;
2099 			break;
2100 		}
2101 		/* Else fall through to one of the other strategies... */
2102 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2103 		     "Using discard-least-changes instead\n");
2104 	case ASB_DISCARD_ZERO_CHG:
2105 		if (ch_peer == 0 && ch_self == 0) {
2106 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2107 				? -1 : 1;
2108 			break;
2109 		} else {
2110 			if (ch_peer == 0) { rv =  1; break; }
2111 			if (ch_self == 0) { rv = -1; break; }
2112 		}
2113 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2114 			break;
2115 	case ASB_DISCARD_LEAST_CHG:
2116 		if	(ch_self < ch_peer)
2117 			rv = -1;
2118 		else if (ch_self > ch_peer)
2119 			rv =  1;
2120 		else /* ( ch_self == ch_peer ) */
2121 		     /* Well, then use something else. */
2122 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2123 				? -1 : 1;
2124 		break;
2125 	case ASB_DISCARD_LOCAL:
2126 		rv = -1;
2127 		break;
2128 	case ASB_DISCARD_REMOTE:
2129 		rv =  1;
2130 	}
2131 
2132 	return rv;
2133 }
2134 
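/* Auto recovery strategy after split brain while one node is primary
 * (after-sb-1pri).  Same return convention as drbd_asb_recover_0p(). */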
2135 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2136 {
2137 	int self, peer, hg, rv = -100;
2138 
2139 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2140 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2141 
2142 	switch (mdev->net_conf->after_sb_1p) {
2143 	case ASB_DISCARD_YOUNGER_PRI:
2144 	case ASB_DISCARD_OLDER_PRI:
2145 	case ASB_DISCARD_LEAST_CHG:
2146 	case ASB_DISCARD_LOCAL:
2147 	case ASB_DISCARD_REMOTE:
2148 		dev_err(DEV, "Configuration error.\n");
2149 		break;
2150 	case ASB_DISCONNECT:
2151 		break;
2152 	case ASB_CONSENSUS:
2153 		hg = drbd_asb_recover_0p(mdev);
2154 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2155 			rv = hg;
2156 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2157 			rv = hg;
2158 		break;
2159 	case ASB_VIOLENTLY:
2160 		rv = drbd_asb_recover_0p(mdev);
2161 		break;
2162 	case ASB_DISCARD_SECONDARY:
2163 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2164 	case ASB_CALL_HELPER:
2165 		hg = drbd_asb_recover_0p(mdev);
2166 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2167 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2168 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2169 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2170 			  * we do not need to wait for the after state change work either. */
2171 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2172 			if (self != SS_SUCCESS) {
2173 				drbd_khelper(mdev, "pri-lost-after-sb");
2174 			} else {
2175 				dev_warn(DEV, "Successfully gave up primary role.\n");
2176 				rv = hg;
2177 			}
2178 		} else
2179 			rv = hg;
2180 	}
2181 
2182 	return rv;
2183 }
2184 
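/* Auto recovery strategy after split brain while both nodes are primary
 * (after-sb-2pri).  Same return convention as drbd_asb_recover_0p(). */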
2185 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2186 {
2187 	int self, peer, hg, rv = -100;
2188 
2189 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2190 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2191 
2192 	switch (mdev->net_conf->after_sb_2p) {
2193 	case ASB_DISCARD_YOUNGER_PRI:
2194 	case ASB_DISCARD_OLDER_PRI:
2195 	case ASB_DISCARD_LEAST_CHG:
2196 	case ASB_DISCARD_LOCAL:
2197 	case ASB_DISCARD_REMOTE:
2198 	case ASB_CONSENSUS:
2199 	case ASB_DISCARD_SECONDARY:
2200 		dev_err(DEV, "Configuration error.\n");
2201 		break;
2202 	case ASB_VIOLENTLY:
2203 		rv = drbd_asb_recover_0p(mdev);
2204 		break;
2205 	case ASB_DISCONNECT:
2206 		break;
2207 	case ASB_CALL_HELPER:
2208 		hg = drbd_asb_recover_0p(mdev);
2209 		if (hg == -1) {
2210 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2211 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2212 			  * we do not need to wait for the after state change work either. */
2213 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2214 			if (self != SS_SUCCESS) {
2215 				drbd_khelper(mdev, "pri-lost-after-sb");
2216 			} else {
2217 				dev_warn(DEV, "Successfully gave up primary role.\n");
2218 				rv = hg;
2219 			}
2220 		} else
2221 			rv = hg;
2222 	}
2223 
2224 	return rv;
2225 }
2226 
2227 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2228 			   u64 bits, u64 flags)
2229 {
2230 	if (!uuid) {
2231 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2232 		return;
2233 	}
2234 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2235 	     text,
2236 	     (unsigned long long)uuid[UI_CURRENT],
2237 	     (unsigned long long)uuid[UI_BITMAP],
2238 	     (unsigned long long)uuid[UI_HISTORY_START],
2239 	     (unsigned long long)uuid[UI_HISTORY_END],
2240 	     (unsigned long long)bits,
2241 	     (unsigned long long)flags);
2242 }
2243 
2244 /*
2245   100	after split brain try auto recover
2246     2	C_SYNC_SOURCE set BitMap
2247     1	C_SYNC_SOURCE use BitMap
2248     0	no Sync
2249    -1	C_SYNC_TARGET use BitMap
2250    -2	C_SYNC_TARGET set BitMap
2251  -100	after split brain, disconnect
2252 -1000	unrelated data
-1001	both sides have to support at least protocol version 91
2253  */
2254 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2255 {
2256 	u64 self, peer;
2257 	int i, j;
2258 
2259 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2260 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2261 
2262 	*rule_nr = 10;
2263 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2264 		return 0;
2265 
2266 	*rule_nr = 20;
2267 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2268 	     peer != UUID_JUST_CREATED)
2269 		return -2;
2270 
2271 	*rule_nr = 30;
2272 	if (self != UUID_JUST_CREATED &&
2273 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2274 		return 2;
2275 
2276 	if (self == peer) {
2277 		int rct, dc; /* roles at crash time */
2278 
2279 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2280 
2281 			if (mdev->agreed_pro_version < 91)
2282 				return -1001;
2283 
2284 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2285 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2286 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2287 				drbd_uuid_set_bm(mdev, 0UL);
2288 
2289 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2290 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2291 				*rule_nr = 34;
2292 			} else {
2293 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2294 				*rule_nr = 36;
2295 			}
2296 
2297 			return 1;
2298 		}
2299 
2300 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2301 
2302 			if (mdev->agreed_pro_version < 91)
2303 				return -1001;
2304 
2305 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2306 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2307 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2308 
2309 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2310 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2311 				mdev->p_uuid[UI_BITMAP] = 0UL;
2312 
2313 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2314 				*rule_nr = 35;
2315 			} else {
2316 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2317 				*rule_nr = 37;
2318 			}
2319 
2320 			return -1;
2321 		}
2322 
2323 		/* Common power [off|failure] */
2324 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2325 			(mdev->p_uuid[UI_FLAGS] & 2);
2326 		/* lowest bit is set when we were primary,
2327 		 * next bit (weight 2) is set when peer was primary */
2328 		*rule_nr = 40;
2329 
2330 		switch (rct) {
2331 		case 0: /* !self_pri && !peer_pri */ return 0;
2332 		case 1: /*  self_pri && !peer_pri */ return 1;
2333 		case 2: /* !self_pri &&  peer_pri */ return -1;
2334 		case 3: /*  self_pri &&  peer_pri */
2335 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2336 			return dc ? -1 : 1;
2337 		}
2338 	}
2339 
2340 	*rule_nr = 50;
2341 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2342 	if (self == peer)
2343 		return -1;
2344 
2345 	*rule_nr = 51;
2346 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2347 	if (self == peer) {
2348 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2349 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2350 		if (self == peer) {
2351 			/* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2352 			   modifications from its last start of a resync as sync source. */
2353 
2354 			if (mdev->agreed_pro_version < 91)
2355 				return -1001;
2356 
2357 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2358 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2359 			return -1;
2360 		}
2361 	}
2362 
2363 	*rule_nr = 60;
2364 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2365 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2366 		peer = mdev->p_uuid[i] & ~((u64)1);
2367 		if (self == peer)
2368 			return -2;
2369 	}
2370 
2371 	*rule_nr = 70;
2372 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2373 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2374 	if (self == peer)
2375 		return 1;
2376 
2377 	*rule_nr = 71;
2378 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2379 	if (self == peer) {
2380 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2381 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2382 		if (self == peer) {
2383 			/* The last P_SYNC_UUID did not get through. Undo our own UUID
2384 			   modifications from our last start of a resync as sync source. */
2385 
2386 			if (mdev->agreed_pro_version < 91)
2387 				return -1001;
2388 
2389 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2390 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2391 
2392 			dev_info(DEV, "Undid last start of resync:\n");
2393 
2394 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2395 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2396 
2397 			return 1;
2398 		}
2399 	}
2400 
2401 
2402 	*rule_nr = 80;
2403 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2404 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2405 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2406 		if (self == peer)
2407 			return 2;
2408 	}
2409 
2410 	*rule_nr = 90;
2411 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2412 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2413 	if (self == peer && self != ((u64)0))
2414 		return 100;
2415 
2416 	*rule_nr = 100;
2417 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2418 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2419 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2420 			peer = mdev->p_uuid[j] & ~((u64)1);
2421 			if (self == peer)
2422 				return -100;
2423 		}
2424 	}
2425 
2426 	return -1000;
2427 }
2428 
2429 /* drbd_sync_handshake() returns the new conn state on success, or
2430    C_MASK on failure.
2431  */
2432 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2433 					   enum drbd_disk_state peer_disk) __must_hold(local)
2434 {
2435 	int hg, rule_nr;
2436 	enum drbd_conns rv = C_MASK;
2437 	enum drbd_disk_state mydisk;
2438 
2439 	mydisk = mdev->state.disk;
2440 	if (mydisk == D_NEGOTIATING)
2441 		mydisk = mdev->new_state_tmp.disk;
2442 
2443 	dev_info(DEV, "drbd_sync_handshake:\n");
2444 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2445 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2446 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2447 
2448 	hg = drbd_uuid_compare(mdev, &rule_nr);
2449 
2450 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2451 
2452 	if (hg == -1000) {
2453 		dev_alert(DEV, "Unrelated data, aborting!\n");
2454 		return C_MASK;
2455 	}
2456 	if (hg == -1001) {
2457 		dev_alert(DEV, "To resolve this both sides have to support at least protocol version 91\n");
2458 		return C_MASK;
2459 	}
2460 
2461 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2462 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2463 		int f = (hg == -100) || abs(hg) == 2;
2464 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2465 		if (f)
2466 			hg = hg*2;
2467 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2468 		     hg > 0 ? "source" : "target");
2469 	}
2470 
2471 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2472 		int pcount = (mdev->state.role == R_PRIMARY)
2473 			   + (peer_role == R_PRIMARY);
2474 		int forced = (hg == -100);
2475 
2476 		switch (pcount) {
2477 		case 0:
2478 			hg = drbd_asb_recover_0p(mdev);
2479 			break;
2480 		case 1:
2481 			hg = drbd_asb_recover_1p(mdev);
2482 			break;
2483 		case 2:
2484 			hg = drbd_asb_recover_2p(mdev);
2485 			break;
2486 		}
2487 		if (abs(hg) < 100) {
2488 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2489 			     "automatically solved. Sync from %s node\n",
2490 			     pcount, (hg < 0) ? "peer" : "this");
2491 			if (forced) {
2492 				dev_warn(DEV, "Doing a full sync, since"
2493 				     " UUIDs were ambiguous.\n");
2494 				hg = hg*2;
2495 			}
2496 		}
2497 	}
2498 
2499 	if (hg == -100) {
2500 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2501 			hg = -1;
2502 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2503 			hg = 1;
2504 
2505 		if (abs(hg) < 100)
2506 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2507 			     "Sync from %s node\n",
2508 			     (hg < 0) ? "peer" : "this");
2509 	}
2510 
2511 	if (hg == -100) {
2512 		dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2513 		drbd_khelper(mdev, "split-brain");
2514 		return C_MASK;
2515 	}
2516 
2517 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2518 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2519 		return C_MASK;
2520 	}
2521 
2522 	if (hg < 0 && /* by intention we do not use mydisk here. */
2523 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2524 		switch (mdev->net_conf->rr_conflict) {
2525 		case ASB_CALL_HELPER:
2526 			drbd_khelper(mdev, "pri-lost");
2527 			/* fall through */
2528 		case ASB_DISCONNECT:
2529 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2530 			return C_MASK;
2531 		case ASB_VIOLENTLY:
2532 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2533 			     " assumption\n");
2534 		}
2535 	}
2536 
2537 	if (abs(hg) >= 2) {
2538 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2539 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2540 			return C_MASK;
2541 	}
2542 
2543 	if (hg > 0) { /* become sync source. */
2544 		rv = C_WF_BITMAP_S;
2545 	} else if (hg < 0) { /* become sync target */
2546 		rv = C_WF_BITMAP_T;
2547 	} else {
2548 		rv = C_CONNECTED;
2549 		if (drbd_bm_total_weight(mdev)) {
2550 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2551 			     drbd_bm_total_weight(mdev));
2552 		}
2553 	}
2554 
2555 	return rv;
2556 }
2557 
2558 /* returns 1 if invalid */
2559 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2560 {
2561 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2562 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2563 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2564 		return 0;
2565 
2566 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2567 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2568 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2569 		return 1;
2570 
2571 	/* everything else is valid if they are equal on both sides. */
2572 	if (peer == self)
2573 		return 0;
2574 
2575 	/* everything else is invalid. */
2576 	return 1;
2577 }
2578 
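/* Check that the peer's protocol related settings (wire protocol,
 * after-sb-*pri policies, two-primaries, want_lose, data-integrity-alg)
 * are compatible with our own configuration; disconnect on mismatch. */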
2579 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2580 {
2581 	struct p_protocol *p = (struct p_protocol *)h;
2582 	int header_size, data_size;
2583 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2584 	int p_want_lose, p_two_primaries;
2585 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2586 
2587 	header_size = sizeof(*p) - sizeof(*h);
2588 	data_size   = h->length  - header_size;
2589 
2590 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2591 		return FALSE;
2592 
2593 	p_proto		= be32_to_cpu(p->protocol);
2594 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2595 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2596 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2597 	p_want_lose	= be32_to_cpu(p->want_lose);
2598 	p_two_primaries = be32_to_cpu(p->two_primaries);
2599 
2600 	if (p_proto != mdev->net_conf->wire_protocol) {
2601 		dev_err(DEV, "incompatible communication protocols\n");
2602 		goto disconnect;
2603 	}
2604 
2605 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2606 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2607 		goto disconnect;
2608 	}
2609 
2610 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2611 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2612 		goto disconnect;
2613 	}
2614 
2615 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2616 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2617 		goto disconnect;
2618 	}
2619 
2620 	if (p_want_lose && mdev->net_conf->want_lose) {
2621 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2622 		goto disconnect;
2623 	}
2624 
2625 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2626 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2627 		goto disconnect;
2628 	}
2629 
2630 	if (mdev->agreed_pro_version >= 87) {
2631 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2632 
2633 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2634 			return FALSE;
2635 
2636 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2637 		if (strcmp(p_integrity_alg, my_alg)) {
2638 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2639 			goto disconnect;
2640 		}
2641 		dev_info(DEV, "data-integrity-alg: %s\n",
2642 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2643 	}
2644 
2645 	return TRUE;
2646 
2647 disconnect:
2648 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2649 	return FALSE;
2650 }
2651 
2652 /* helper function
2653  * input: alg name, feature name
2654  * return: NULL (alg name was "")
2655  *         ERR_PTR(error) if something goes wrong
2656  *         or the crypto hash ptr, if it worked out ok. */
2657 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2658 		const char *alg, const char *name)
2659 {
2660 	struct crypto_hash *tfm;
2661 
2662 	if (!alg[0])
2663 		return NULL;
2664 
2665 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2666 	if (IS_ERR(tfm)) {
2667 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2668 			alg, name, PTR_ERR(tfm));
2669 		return tfm;
2670 	}
2671 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2672 		crypto_free_hash(tfm);
2673 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2674 		return ERR_PTR(-EINVAL);
2675 	}
2676 	return tfm;
2677 }
2678 
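/* Receive the peer's syncer parameters: the resync rate and, with newer
 * protocol versions, verify-alg and csums-alg.  Allocates the digest
 * transforms as needed and disconnects on incompatible settings. */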
2679 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2680 {
2681 	int ok = TRUE;
2682 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2683 	unsigned int header_size, data_size, exp_max_sz;
2684 	struct crypto_hash *verify_tfm = NULL;
2685 	struct crypto_hash *csums_tfm = NULL;
2686 	const int apv = mdev->agreed_pro_version;
2687 
2688 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2689 		    : apv == 88 ? sizeof(struct p_rs_param)
2690 					+ SHARED_SECRET_MAX
2691 		    : /* 89 */    sizeof(struct p_rs_param_89);
2692 
2693 	if (h->length > exp_max_sz) {
2694 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2695 		    h->length, exp_max_sz);
2696 		return FALSE;
2697 	}
2698 
2699 	if (apv <= 88) {
2700 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2701 		data_size   = h->length  - header_size;
2702 	} else /* apv >= 89 */ {
2703 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2704 		data_size   = h->length  - header_size;
2705 		D_ASSERT(data_size == 0);
2706 	}
2707 
2708 	/* initialize verify_alg and csums_alg */
2709 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2710 
2711 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2712 		return FALSE;
2713 
2714 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2715 
2716 	if (apv >= 88) {
2717 		if (apv == 88) {
2718 			if (data_size > SHARED_SECRET_MAX) {
2719 				dev_err(DEV, "verify-alg too long, "
2720 				    "peer wants %u, accepting only %u bytes\n",
2721 						data_size, SHARED_SECRET_MAX);
2722 				return FALSE;
2723 			}
2724 
2725 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2726 				return FALSE;
2727 
2728 			/* we expect NUL terminated string */
2729 			/* but just in case someone tries to be evil */
2730 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2731 			p->verify_alg[data_size-1] = 0;
2732 
2733 		} else /* apv >= 89 */ {
2734 			/* we still expect NUL terminated strings */
2735 			/* but just in case someone tries to be evil */
2736 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2737 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2738 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2739 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2740 		}
2741 
2742 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2743 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2744 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2745 				    mdev->sync_conf.verify_alg, p->verify_alg);
2746 				goto disconnect;
2747 			}
2748 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2749 					p->verify_alg, "verify-alg");
2750 			if (IS_ERR(verify_tfm)) {
2751 				verify_tfm = NULL;
2752 				goto disconnect;
2753 			}
2754 		}
2755 
2756 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2757 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2758 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2759 				    mdev->sync_conf.csums_alg, p->csums_alg);
2760 				goto disconnect;
2761 			}
2762 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2763 					p->csums_alg, "csums-alg");
2764 			if (IS_ERR(csums_tfm)) {
2765 				csums_tfm = NULL;
2766 				goto disconnect;
2767 			}
2768 		}
2769 
2770 
2771 		spin_lock(&mdev->peer_seq_lock);
2772 		/* lock against drbd_nl_syncer_conf() */
2773 		if (verify_tfm) {
2774 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2775 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2776 			crypto_free_hash(mdev->verify_tfm);
2777 			mdev->verify_tfm = verify_tfm;
2778 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2779 		}
2780 		if (csums_tfm) {
2781 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2782 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2783 			crypto_free_hash(mdev->csums_tfm);
2784 			mdev->csums_tfm = csums_tfm;
2785 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2786 		}
2787 		spin_unlock(&mdev->peer_seq_lock);
2788 	}
2789 
2790 	return ok;
2791 disconnect:
2792 	/* just for completeness: actually not needed,
2793 	 * as this is not reached if csums_tfm was ok. */
2794 	crypto_free_hash(csums_tfm);
2795 	/* but free the verify_tfm again, if csums_tfm did not work out */
2796 	crypto_free_hash(verify_tfm);
2797 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2798 	return FALSE;
2799 }
2800 
2801 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2802 {
2803 	/* sorry, we currently have no working implementation
2804 	 * of distributed TCQ */
2805 }
2806 
2807 /* warn if the arguments differ by more than 12.5% */
2808 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2809 	const char *s, sector_t a, sector_t b)
2810 {
2811 	sector_t d;
2812 	if (a == 0 || b == 0)
2813 		return;
2814 	d = (a > b) ? (a - b) : (b - a);
2815 	if (d > (a>>3) || d > (b>>3))
2816 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2817 		     (unsigned long long)a, (unsigned long long)b);
2818 }
2819 
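/* Process the peer's size information: remember its disk size, possibly
 * adjust our own device size (or simply adopt the peer's size if we are
 * diskless), redo the sync handshake where needed and trigger a resync
 * after an online grow. */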
2820 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2821 {
2822 	struct p_sizes *p = (struct p_sizes *)h;
2823 	enum determine_dev_size dd = unchanged;
2824 	unsigned int max_seg_s;
2825 	sector_t p_size, p_usize, my_usize;
2826 	int ldsc = 0; /* local disk size changed */
2827 	enum drbd_conns nconn;
2828 
2829 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2830 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2831 		return FALSE;
2832 
2833 	p_size = be64_to_cpu(p->d_size);
2834 	p_usize = be64_to_cpu(p->u_size);
2835 
2836 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2837 		dev_err(DEV, "some backing storage is needed\n");
2838 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2839 		return FALSE;
2840 	}
2841 
2842 	/* just store the peer's disk size for now.
2843 	 * we still need to figure out whether we accept that. */
2844 	mdev->p_size = p_size;
2845 
2846 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
2847 	if (get_ldev(mdev)) {
2848 		warn_if_differ_considerably(mdev, "lower level device sizes",
2849 			   p_size, drbd_get_max_capacity(mdev->ldev));
2850 		warn_if_differ_considerably(mdev, "user requested size",
2851 					    p_usize, mdev->ldev->dc.disk_size);
2852 
2853 		/* if this is the first connect, or an otherwise expected
2854 		 * param exchange, choose the minimum */
2855 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2856 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2857 					     p_usize);
2858 
2859 		my_usize = mdev->ldev->dc.disk_size;
2860 
2861 		if (mdev->ldev->dc.disk_size != p_usize) {
2862 			mdev->ldev->dc.disk_size = p_usize;
2863 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2864 			     (unsigned long)mdev->ldev->dc.disk_size);
2865 		}
2866 
2867 		/* Never shrink a device with usable data during connect.
2868 		   But allow online shrinking if we are connected. */
2869 		if (drbd_new_dev_size(mdev, mdev->ldev) <
2870 		   drbd_get_capacity(mdev->this_bdev) &&
2871 		   mdev->state.disk >= D_OUTDATED &&
2872 		   mdev->state.conn < C_CONNECTED) {
2873 			dev_err(DEV, "The peer's disk size is too small!\n");
2874 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2875 			mdev->ldev->dc.disk_size = my_usize;
2876 			put_ldev(mdev);
2877 			return FALSE;
2878 		}
2879 		put_ldev(mdev);
2880 	}
2881 #undef min_not_zero
2882 
2883 	if (get_ldev(mdev)) {
2884 		dd = drbd_determin_dev_size(mdev);
2885 		put_ldev(mdev);
2886 		if (dd == dev_size_error)
2887 			return FALSE;
2888 		drbd_md_sync(mdev);
2889 	} else {
2890 		/* I am diskless, need to accept the peer's size. */
2891 		drbd_set_my_capacity(mdev, p_size);
2892 	}
2893 
2894 	if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2895 		nconn = drbd_sync_handshake(mdev,
2896 				mdev->state.peer, mdev->state.pdsk);
2897 		put_ldev(mdev);
2898 
2899 		if (nconn == C_MASK) {
2900 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2901 			return FALSE;
2902 		}
2903 
2904 		if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2905 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2906 			return FALSE;
2907 		}
2908 	}
2909 
2910 	if (get_ldev(mdev)) {
2911 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2912 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2913 			ldsc = 1;
2914 		}
2915 
2916 		max_seg_s = be32_to_cpu(p->max_segment_size);
2917 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2918 			drbd_setup_queue_param(mdev, max_seg_s);
2919 
2920 		drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2921 		put_ldev(mdev);
2922 	}
2923 
2924 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2925 		if (be64_to_cpu(p->c_size) !=
2926 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2927 			/* we have different sizes, probably peer
2928 			 * needs to know my new size... */
2929 			drbd_send_sizes(mdev, 0);
2930 		}
2931 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2932 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2933 			if (mdev->state.pdsk >= D_INCONSISTENT &&
2934 			    mdev->state.disk >= D_INCONSISTENT)
2935 				resync_after_online_grow(mdev);
2936 			else
2937 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2938 		}
2939 	}
2940 
2941 	return TRUE;
2942 }
2943 
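/* Store the UUIDs announced by the peer.  If our current UUID is still
 * "just created" and the peer's UUID flags indicate that the initial sync
 * may be skipped, clear the bitmap and go UpToDate/UpToDate instead of
 * doing a full sync. */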
2944 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2945 {
2946 	struct p_uuids *p = (struct p_uuids *)h;
2947 	u64 *p_uuid;
2948 	int i;
2949 
2950 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2951 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2952 		return FALSE;
2953 
2954 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2955 
2956 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2957 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
2958 
2959 	kfree(mdev->p_uuid);
2960 	mdev->p_uuid = p_uuid;
2961 
2962 	if (mdev->state.conn < C_CONNECTED &&
2963 	    mdev->state.disk < D_INCONSISTENT &&
2964 	    mdev->state.role == R_PRIMARY &&
2965 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2966 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2967 		    (unsigned long long)mdev->ed_uuid);
2968 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2969 		return FALSE;
2970 	}
2971 
2972 	if (get_ldev(mdev)) {
2973 		int skip_initial_sync =
2974 			mdev->state.conn == C_CONNECTED &&
2975 			mdev->agreed_pro_version >= 90 &&
2976 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2977 			(p_uuid[UI_FLAGS] & 8);
2978 		if (skip_initial_sync) {
2979 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2980 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
2981 					"clear_n_write from receive_uuids");
2982 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
2983 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
2984 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
2985 					CS_VERBOSE, NULL);
2986 			drbd_md_sync(mdev);
2987 		}
2988 		put_ldev(mdev);
2989 	}
2990 
2991 	/* Before we test for the disk state, we should wait until a possibly
2992 	   ongoing cluster-wide state change has finished. That is important if
2993 	   we are primary and are detaching from our disk. We need to see the
2994 	   new disk state... */
2995 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
2996 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
2997 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
2998 
2999 	return TRUE;
3000 }
3001 
3002 /**
3003  * convert_state() - Converts the peer's view of the cluster state to our point of view
3004  * @ps:		The state as seen by the peer.
3005  */
3006 static union drbd_state convert_state(union drbd_state ps)
3007 {
3008 	union drbd_state ms;
3009 
3010 	static enum drbd_conns c_tab[] = {
3011 		[C_CONNECTED] = C_CONNECTED,
3012 
3013 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3014 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3015 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3016 		[C_VERIFY_S]       = C_VERIFY_T,
3017 		[C_MASK]   = C_MASK,
3018 	};
3019 
3020 	ms.i = ps.i;
3021 
3022 	ms.conn = c_tab[ps.conn];
3023 	ms.peer = ps.role;
3024 	ms.role = ps.peer;
3025 	ms.pdsk = ps.disk;
3026 	ms.disk = ps.pdsk;
3027 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3028 
3029 	return ms;
3030 }
3031 
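/* The peer requests a cluster wide state change.  Convert it to our point
 * of view, try to apply it and send the result back; reject it right away
 * if a concurrent state change of our own is already in progress. */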
3032 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3033 {
3034 	struct p_req_state *p = (struct p_req_state *)h;
3035 	union drbd_state mask, val;
3036 	int rv;
3037 
3038 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3039 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3040 		return FALSE;
3041 
3042 	mask.i = be32_to_cpu(p->mask);
3043 	val.i = be32_to_cpu(p->val);
3044 
3045 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3046 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3047 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3048 		return TRUE;
3049 	}
3050 
3051 	mask = convert_state(mask);
3052 	val = convert_state(val);
3053 
3054 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3055 
3056 	drbd_send_sr_reply(mdev, rv);
3057 	drbd_md_sync(mdev);
3058 
3059 	return TRUE;
3060 }
3061 
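/* Process a state packet from the peer: update our view of its role and
 * disk state, run the sync handshake where appropriate and apply the
 * resulting connection state. */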
3062 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3063 {
3064 	struct p_state *p = (struct p_state *)h;
3065 	enum drbd_conns nconn, oconn;
3066 	union drbd_state ns, peer_state;
3067 	enum drbd_disk_state real_peer_disk;
3068 	int rv;
3069 
3070 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3071 		return FALSE;
3072 
3073 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3074 		return FALSE;
3075 
3076 	peer_state.i = be32_to_cpu(p->state);
3077 
3078 	real_peer_disk = peer_state.disk;
3079 	if (peer_state.disk == D_NEGOTIATING) {
3080 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3081 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3082 	}
3083 
3084 	spin_lock_irq(&mdev->req_lock);
3085  retry:
3086 	oconn = nconn = mdev->state.conn;
3087 	spin_unlock_irq(&mdev->req_lock);
3088 
3089 	if (nconn == C_WF_REPORT_PARAMS)
3090 		nconn = C_CONNECTED;
3091 
3092 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3093 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3094 		int cr; /* consider resync */
3095 
3096 		/* if we established a new connection */
3097 		cr  = (oconn < C_CONNECTED);
3098 		/* if we had an established connection
3099 		 * and one of the nodes newly attaches a disk */
3100 		cr |= (oconn == C_CONNECTED &&
3101 		       (peer_state.disk == D_NEGOTIATING ||
3102 			mdev->state.disk == D_NEGOTIATING));
3103 		/* if we have both been inconsistent, and the peer has been
3104 		 * forced to be UpToDate with --overwrite-data */
3105 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3106 		/* if we had been plain connected, and the admin requested to
3107 		 * start a sync by "invalidate" or "invalidate-remote" */
3108 		cr |= (oconn == C_CONNECTED &&
3109 				(peer_state.conn >= C_STARTING_SYNC_S &&
3110 				 peer_state.conn <= C_WF_BITMAP_T));
3111 
3112 		if (cr)
3113 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3114 
3115 		put_ldev(mdev);
3116 		if (nconn == C_MASK) {
3117 			if (mdev->state.disk == D_NEGOTIATING) {
3118 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3119 				nconn = C_CONNECTED;
3120 			} else if (peer_state.disk == D_NEGOTIATING) {
3121 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3122 				peer_state.disk = D_DISKLESS;
3123 			} else {
3124 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3125 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3126 				return FALSE;
3127 			}
3128 		}
3129 	}
3130 
3131 	spin_lock_irq(&mdev->req_lock);
3132 	if (mdev->state.conn != oconn)
3133 		goto retry;
3134 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3135 	ns.i = mdev->state.i;
3136 	ns.conn = nconn;
3137 	ns.peer = peer_state.role;
3138 	ns.pdsk = real_peer_disk;
3139 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3140 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3141 		ns.disk = mdev->new_state_tmp.disk;
3142 
3143 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3144 	ns = mdev->state;
3145 	spin_unlock_irq(&mdev->req_lock);
3146 
3147 	if (rv < SS_SUCCESS) {
3148 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3149 		return FALSE;
3150 	}
3151 
3152 	if (oconn > C_WF_REPORT_PARAMS) {
3153 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3154 		    peer_state.disk != D_NEGOTIATING) {
3155 			/* we want resync, peer has not yet decided to sync... */
3156 			/* Nowadays only used when forcing a node into primary role and
3157 			   setting its disk to UpToDate with that */
3158 			drbd_send_uuids(mdev);
3159 			drbd_send_state(mdev);
3160 		}
3161 	}
3162 
3163 	mdev->net_conf->want_lose = 0;
3164 
3165 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3166 
3167 	return TRUE;
3168 }
3169 
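/* The sync source sent the new sync UUID: adopt it as our current UUID,
 * clear the bitmap UUID and start the resync as sync target. */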
3170 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3171 {
3172 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3173 
3174 	wait_event(mdev->misc_wait,
3175 		   mdev->state.conn == C_WF_SYNC_UUID ||
3176 		   mdev->state.conn < C_CONNECTED ||
3177 		   mdev->state.disk < D_NEGOTIATING);
3178 
3179 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3180 
3181 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3182 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3183 		return FALSE;
3184 
3185 	/* Here the _drbd_uuid_ functions are right, current should
3186 	   _not_ be rotated into the history */
3187 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3188 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3189 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3190 
3191 		drbd_start_resync(mdev, C_SYNC_TARGET);
3192 
3193 		put_ldev(mdev);
3194 	} else
3195 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3196 
3197 	return TRUE;
3198 }
3199 
3200 enum receive_bitmap_ret { OK, DONE, FAILED };
3201 
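/* Receive one chunk of a plain (uncompressed) bitmap transfer and merge it
 * into our bitmap.  Returns DONE once there is nothing left to receive. */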
3202 static enum receive_bitmap_ret
3203 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3204 	unsigned long *buffer, struct bm_xfer_ctx *c)
3205 {
3206 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3207 	unsigned want = num_words * sizeof(long);
3208 
3209 	if (want != h->length) {
3210 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3211 		return FAILED;
3212 	}
3213 	if (want == 0)
3214 		return DONE;
3215 	if (drbd_recv(mdev, buffer, want) != want)
3216 		return FAILED;
3217 
3218 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3219 
3220 	c->word_offset += num_words;
3221 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3222 	if (c->bit_offset > c->bm_bits)
3223 		c->bit_offset = c->bm_bits;
3224 
3225 	return OK;
3226 }
3227 
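/* Decode one compressed bitmap packet: variable length encoded run lengths
 * of alternating runs of cleared and set bits.  Only runs of set bits
 * actually touch our bitmap. */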
3228 static enum receive_bitmap_ret
3229 recv_bm_rle_bits(struct drbd_conf *mdev,
3230 		struct p_compressed_bm *p,
3231 		struct bm_xfer_ctx *c)
3232 {
3233 	struct bitstream bs;
3234 	u64 look_ahead;
3235 	u64 rl;
3236 	u64 tmp;
3237 	unsigned long s = c->bit_offset;
3238 	unsigned long e;
3239 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3240 	int toggle = DCBP_get_start(p);
3241 	int have;
3242 	int bits;
3243 
3244 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3245 
3246 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3247 	if (bits < 0)
3248 		return FAILED;
3249 
3250 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3251 		bits = vli_decode_bits(&rl, look_ahead);
3252 		if (bits <= 0)
3253 			return FAILED;
3254 
3255 		if (toggle) {
3256 			e = s + rl -1;
3257 			if (e >= c->bm_bits) {
3258 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3259 				return FAILED;
3260 			}
3261 			_drbd_bm_set_bits(mdev, s, e);
3262 		}
3263 
3264 		if (have < bits) {
3265 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3266 				have, bits, look_ahead,
3267 				(unsigned int)(bs.cur.b - p->code),
3268 				(unsigned int)bs.buf_len);
3269 			return FAILED;
3270 		}
3271 		look_ahead >>= bits;
3272 		have -= bits;
3273 
3274 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3275 		if (bits < 0)
3276 			return FAILED;
3277 		look_ahead |= tmp << have;
3278 		have += bits;
3279 	}
3280 
3281 	c->bit_offset = s;
3282 	bm_xfer_ctx_bit_to_word_offset(c);
3283 
3284 	return (s == c->bm_bits) ? DONE : OK;
3285 }
3286 
3287 static enum receive_bitmap_ret
3288 decode_bitmap_c(struct drbd_conf *mdev,
3289 		struct p_compressed_bm *p,
3290 		struct bm_xfer_ctx *c)
3291 {
3292 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3293 		return recv_bm_rle_bits(mdev, p, c);
3294 
3295 	/* other variants had been implemented for evaluation,
3296 	 * but have been dropped as this one turned out to be "best"
3297 	 * during all our tests. */
3298 
3299 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3300 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3301 	return FAILED;
3302 }
3303 
3304 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3305 		const char *direction, struct bm_xfer_ctx *c)
3306 {
3307 	/* what would it take to transfer it "plaintext" */
3308 	unsigned plain = sizeof(struct p_header) *
3309 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3310 		+ c->bm_words * sizeof(long);
3311 	unsigned total = c->bytes[0] + c->bytes[1];
3312 	unsigned r;
3313 
3314 	/* total can not be zero. but just in case: */
3315 	if (total == 0)
3316 		return;
3317 
3318 	/* don't report if not compressed */
3319 	if (total >= plain)
3320 		return;
3321 
3322 	/* total < plain. check for overflow, still */
3323 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3324 		                    : (1000 * total / plain);
3325 
3326 	if (r > 1000)
3327 		r = 1000;
3328 
3329 	r = 1000 - r;
3330 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3331 	     "total %u; compression: %u.%u%%\n",
3332 			direction,
3333 			c->bytes[1], c->packets[1],
3334 			c->bytes[0], c->packets[0],
3335 			total, r/10, r % 10);
3336 }
3337 
3338 /* Since we are processing the bitfield from lower addresses to higher,
3339    it does not matter whether we process it in 32 bit or 64 bit chunks,
3340    as long as it is little endian. (Think of it as a byte stream,
3341    beginning with the lowest byte...) If we used big endian,
3342    we would need to process it from the highest address to the lowest,
3343    in order to stay agnostic to the 32 vs 64 bit issue.
3344 
3345    returns 0 on failure, 1 if we successfully received it. */
3346 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3347 {
3348 	struct bm_xfer_ctx c;
3349 	void *buffer;
3350 	enum receive_bitmap_ret ret;
3351 	int ok = FALSE;
3352 
3353 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3354 
3355 	drbd_bm_lock(mdev, "receive bitmap");
3356 
3357 	/* maybe we should use some per thread scratch page,
3358 	 * and allocate that during initial device creation? */
3359 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3360 	if (!buffer) {
3361 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3362 		goto out;
3363 	}
3364 
3365 	c = (struct bm_xfer_ctx) {
3366 		.bm_bits = drbd_bm_bits(mdev),
3367 		.bm_words = drbd_bm_words(mdev),
3368 	};
3369 
3370 	do {
3371 		if (h->command == P_BITMAP) {
3372 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3373 		} else if (h->command == P_COMPRESSED_BITMAP) {
3374 			/* MAYBE: sanity check that we speak proto >= 90,
3375 			 * and the feature is enabled! */
3376 			struct p_compressed_bm *p;
3377 
3378 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3379 				dev_err(DEV, "ReportCBitmap packet too large\n");
3380 				goto out;
3381 			}
3382 			/* use the page buff */
3383 			p = buffer;
3384 			memcpy(p, h, sizeof(*h));
3385 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3386 				goto out;
3387 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3388 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3389 				goto out; /* FAILED would leak the page and be taken as success by our caller */
3390 			}
3391 			ret = decode_bitmap_c(mdev, p, &c);
3392 		} else {
3393 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3394 			goto out;
3395 		}
3396 
3397 		c.packets[h->command == P_BITMAP]++;
3398 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3399 
3400 		if (ret != OK)
3401 			break;
3402 
3403 		if (!drbd_recv_header(mdev, h))
3404 			goto out;
3405 	} while (ret == OK);
3406 	if (ret == FAILED)
3407 		goto out;
3408 
3409 	INFO_bm_xfer_stats(mdev, "receive", &c);
3410 
3411 	if (mdev->state.conn == C_WF_BITMAP_T) {
3412 		ok = !drbd_send_bitmap(mdev);
3413 		if (!ok)
3414 			goto out;
3415 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3416 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3417 		D_ASSERT(ok == SS_SUCCESS);
3418 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3419 		/* admin may have requested C_DISCONNECTING,
3420 		 * other threads may have noticed network errors */
3421 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3422 		    drbd_conn_str(mdev->state.conn));
3423 	}
3424 
3425 	ok = TRUE;
3426  out:
3427 	drbd_bm_unlock(mdev);
3428 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3429 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3430 	free_page((unsigned long) buffer);
3431 	return ok;
3432 }
3433 
3434 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3435 {
3436 	/* TODO zero copy sink :) */
3437 	static char sink[128];
3438 	int size, want, r;
3439 
3440 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3441 	     h->command, h->length);
3442 
3443 	size = h->length;
3444 	while (size > 0) {
3445 		want = min_t(int, size, sizeof(sink));
3446 		r = drbd_recv(mdev, sink, want);
3447 		ERR_IF(r <= 0) break;
3448 		size -= r;
3449 	}
3450 	return size == 0;
3451 }
3452 
3453 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3454 {
3455 	if (mdev->state.disk >= D_INCONSISTENT)
3456 		drbd_kick_lo(mdev);
3457 
3458 	/* Make sure we've acked all the TCP data associated
3459 	 * with the data requests being unplugged */
3460 	drbd_tcp_quickack(mdev->data.socket);
3461 
3462 	return TRUE;
3463 }
3464 
3465 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3466 
3467 static drbd_cmd_handler_f drbd_default_handler[] = {
3468 	[P_DATA]	    = receive_Data,
3469 	[P_DATA_REPLY]	    = receive_DataReply,
3470 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3471 	[P_BARRIER]	    = receive_Barrier,
3472 	[P_BITMAP]	    = receive_bitmap,
3473 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3474 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3475 	[P_DATA_REQUEST]    = receive_DataRequest,
3476 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3477 	[P_SYNC_PARAM]	    = receive_SyncParam,
3478 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3479 	[P_PROTOCOL]        = receive_protocol,
3480 	[P_UUIDS]	    = receive_uuids,
3481 	[P_SIZES]	    = receive_sizes,
3482 	[P_STATE]	    = receive_state,
3483 	[P_STATE_CHG_REQ]   = receive_req_state,
3484 	[P_SYNC_UUID]       = receive_sync_uuid,
3485 	[P_OV_REQUEST]      = receive_DataRequest,
3486 	[P_OV_REPLY]        = receive_DataRequest,
3487 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3488 	/* anything missing from this table is in
3489 	 * the asender_tbl, see get_asender_cmd */
3490 	[P_MAX_CMD]	    = NULL,
3491 };
3492 
3493 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3494 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3495 
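/* Main loop of the receiver thread: read packet headers and dispatch them
 * via the handler table until the thread is asked to stop or a protocol
 * error is detected. */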
3496 static void drbdd(struct drbd_conf *mdev)
3497 {
3498 	drbd_cmd_handler_f handler;
3499 	struct p_header *header = &mdev->data.rbuf.header;
3500 
3501 	while (get_t_state(&mdev->receiver) == Running) {
3502 		drbd_thread_current_set_cpu(mdev);
3503 		if (!drbd_recv_header(mdev, header)) {
3504 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3505 			break;
3506 		}
3507 
3508 		if (header->command < P_MAX_CMD)
3509 			handler = drbd_cmd_handler[header->command];
3510 		else if (P_MAY_IGNORE < header->command
3511 		     && header->command < P_MAX_OPT_CMD)
3512 			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3513 		else if (header->command > P_MAX_OPT_CMD)
3514 			handler = receive_skip;
3515 		else
3516 			handler = NULL;
3517 
3518 		if (unlikely(!handler)) {
3519 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3520 			    header->command, header->length);
3521 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3522 			break;
3523 		}
3524 		if (unlikely(!handler(mdev, header))) {
3525 			dev_err(DEV, "error receiving %s, l: %d!\n",
3526 			    cmdname(header->command), header->length);
3527 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3528 			break;
3529 		}
3530 	}
3531 }
3532 
3533 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3534 {
3535 	struct hlist_head *slot;
3536 	struct hlist_node *pos;
3537 	struct hlist_node *tmp;
3538 	struct drbd_request *req;
3539 	int i;
3540 
3541 	/*
3542 	 * Application READ requests
3543 	 */
3544 	spin_lock_irq(&mdev->req_lock);
3545 	for (i = 0; i < APP_R_HSIZE; i++) {
3546 		slot = mdev->app_reads_hash+i;
3547 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3548 			/* it may (but should not any longer!)
3549 			 * be on the work queue; if that assert triggers,
3550 			 * we need to also grab the
3551 			 * spin_lock_irq(&mdev->data.work.q_lock);
3552 			 * and list_del_init here. */
3553 			D_ASSERT(list_empty(&req->w.list));
3554 			/* It would be nice to complete outside of spinlock.
3555 			 * But this is easier for now. */
3556 			_req_mod(req, connection_lost_while_pending);
3557 		}
3558 	}
3559 	for (i = 0; i < APP_R_HSIZE; i++)
3560 		if (!hlist_empty(mdev->app_reads_hash+i))
3561 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3562 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3563 
3564 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3565 	spin_unlock_irq(&mdev->req_lock);
3566 }
3567 
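/*
 * Queue a barrier work item and wait until the worker has processed it
 * (w_prev_work_done completes barr.done); at that point every item that
 * was queued before the barrier has been handled.
 */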
3568 void drbd_flush_workqueue(struct drbd_conf *mdev)
3569 {
3570 	struct drbd_wq_barrier barr;
3571 
3572 	barr.w.cb = w_prev_work_done;
3573 	init_completion(&barr.done);
3574 	drbd_queue_work(&mdev->data.work, &barr.w);
3575 	wait_for_completion(&barr.done);
3576 }
3577 
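/*
 * Tear down an established connection: stop the asender, free the data
 * socket, wait for the active/sync/read ee lists to drain, cancel any
 * resync in flight, flush the worker queue, fail pending application
 * reads, possibly outdate the peer's disk (fencing), and finally move to
 * C_UNCONNECTED (or all the way to C_STANDALONE when the admin asked to
 * disconnect).
 */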
3578 static void drbd_disconnect(struct drbd_conf *mdev)
3579 {
3580 	enum drbd_fencing_p fp;
3581 	union drbd_state os, ns;
3582 	int rv = SS_UNKNOWN_ERROR;
3583 	unsigned int i;
3584 
3585 	if (mdev->state.conn == C_STANDALONE)
3586 		return;
3587 	if (mdev->state.conn >= C_WF_CONNECTION)
3588 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3589 				drbd_conn_str(mdev->state.conn));
3590 
3591 	/* asender does not clean up anything. it must not interfere, either */
3592 	drbd_thread_stop(&mdev->asender);
3593 
3594 	mutex_lock(&mdev->data.mutex);
3595 	drbd_free_sock(mdev);
3596 	mutex_unlock(&mdev->data.mutex);
3597 
3598 	spin_lock_irq(&mdev->req_lock);
3599 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3600 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3601 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3602 	spin_unlock_irq(&mdev->req_lock);
3603 
3604 	/* We do not have data structures that would allow us to
3605 	 * get the rs_pending_cnt down to 0 again.
3606 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3607 	 *    the pending RSDataRequest's we have sent.
3608 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3609 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3610 	 *  And no, it is not the sum of the reference counts in the
3611 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3612 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3613 	 *  on the fly. */
3614 	drbd_rs_cancel_all(mdev);
3615 	mdev->rs_total = 0;
3616 	mdev->rs_failed = 0;
3617 	atomic_set(&mdev->rs_pending_cnt, 0);
3618 	wake_up(&mdev->misc_wait);
3619 
3620 	/* make sure syncer is stopped and w_resume_next_sg queued */
3621 	del_timer_sync(&mdev->resync_timer);
3622 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3623 	resync_timer_fn((unsigned long)mdev);
3624 
3625 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3626 	 * w_make_resync_request etc. which may still be on the worker queue
3627 	 * to be "canceled" */
3628 	drbd_flush_workqueue(mdev);
3629 
3630 	/* This also does reclaim_net_ee().  If we do this too early, we might
3631 	 * miss some resync ee and pages. */
3632 	drbd_process_done_ee(mdev);
3633 
3634 	kfree(mdev->p_uuid);
3635 	mdev->p_uuid = NULL;
3636 
3637 	if (!mdev->state.susp)
3638 		tl_clear(mdev);
3639 
3640 	drbd_fail_pending_reads(mdev);
3641 
3642 	dev_info(DEV, "Connection closed\n");
3643 
3644 	drbd_md_sync(mdev);
3645 
3646 	fp = FP_DONT_CARE;
3647 	if (get_ldev(mdev)) {
3648 		fp = mdev->ldev->dc.fencing;
3649 		put_ldev(mdev);
3650 	}
3651 
3652 	if (mdev->state.role == R_PRIMARY) {
3653 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3654 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3655 			drbd_request_state(mdev, NS(pdsk, nps));
3656 		}
3657 	}
3658 
3659 	spin_lock_irq(&mdev->req_lock);
3660 	os = mdev->state;
3661 	if (os.conn >= C_UNCONNECTED) {
3662 		/* Do not restart in case we are C_DISCONNECTING */
3663 		ns = os;
3664 		ns.conn = C_UNCONNECTED;
3665 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3666 	}
3667 	spin_unlock_irq(&mdev->req_lock);
3668 
3669 	if (os.conn == C_DISCONNECTING) {
3670 		struct hlist_head *h;
3671 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3672 
3673 		/* we must not free the tl_hash
3674 		 * while application io is still on the fly */
3675 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3676 
3677 		spin_lock_irq(&mdev->req_lock);
3678 		/* paranoia code */
3679 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3680 			if (h->first)
3681 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3682 						(int)(h - mdev->ee_hash), h->first);
3683 		kfree(mdev->ee_hash);
3684 		mdev->ee_hash = NULL;
3685 		mdev->ee_hash_s = 0;
3686 
3687 		/* paranoia code */
3688 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3689 			if (h->first)
3690 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3691 						(int)(h - mdev->tl_hash), h->first);
3692 		kfree(mdev->tl_hash);
3693 		mdev->tl_hash = NULL;
3694 		mdev->tl_hash_s = 0;
3695 		spin_unlock_irq(&mdev->req_lock);
3696 
3697 		crypto_free_hash(mdev->cram_hmac_tfm);
3698 		mdev->cram_hmac_tfm = NULL;
3699 
3700 		kfree(mdev->net_conf);
3701 		mdev->net_conf = NULL;
3702 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3703 	}
3704 
3705 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3706 	 * want to use SO_LINGER, because apparently it can be deferred for
3707 	 * more than 20 seconds (longest time I checked).
3708 	 *
3709 	 * Actually we don't care for exactly when the network stack does its
3710 	 * put_page(), but release our reference on these pages right here.
3711 	 */
3712 	i = drbd_release_ee(mdev, &mdev->net_ee);
3713 	if (i)
3714 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3715 	i = atomic_read(&mdev->pp_in_use);
3716 	if (i)
3717 		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3718 
3719 	D_ASSERT(list_empty(&mdev->read_ee));
3720 	D_ASSERT(list_empty(&mdev->active_ee));
3721 	D_ASSERT(list_empty(&mdev->sync_ee));
3722 	D_ASSERT(list_empty(&mdev->done_ee));
3723 
3724 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3725 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3726 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3727 }
3728 
3729 /*
3730  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3731  * we can agree on is stored in agreed_pro_version.
3732  *
3733  * feature flags and the reserved array should be enough room for future
3734  * enhancements of the handshake protocol, and possible plugins...
3735  *
3736  * for now, they are expected to be zero, but ignored.
3737  */
3738 static int drbd_send_handshake(struct drbd_conf *mdev)
3739 {
3740 	/* ASSERT current == mdev->receiver ... */
3741 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3742 	int ok;
3743 
3744 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3745 		dev_err(DEV, "interrupted during initial handshake\n");
3746 		return 0; /* interrupted. not ok. */
3747 	}
3748 
3749 	if (mdev->data.socket == NULL) {
3750 		mutex_unlock(&mdev->data.mutex);
3751 		return 0;
3752 	}
3753 
3754 	memset(p, 0, sizeof(*p));
3755 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3756 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3757 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3758 			    (struct p_header *)p, sizeof(*p), 0);
3759 	mutex_unlock(&mdev->data.mutex);
3760 	return ok;
3761 }
3762 
3763 /*
3764  * return values:
3765  *   1 yes, we have a valid connection
3766  *   0 oops, did not work out, please try again
3767  *  -1 peer talks different language,
3768  *     no point in trying again, please go standalone.
3769  */
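/*
 * Version negotiation example (illustrative numbers): if we support
 * protocol versions 86..91 and the peer advertises 87..94, the ranges
 * overlap and we agree on min(91, 94) = 91.  A peer that does not fill
 * in protocol_max sends 0 there, which is treated as max == min.
 */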
3770 static int drbd_do_handshake(struct drbd_conf *mdev)
3771 {
3772 	/* ASSERT current == mdev->receiver ... */
3773 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3774 	const int expect = sizeof(struct p_handshake)
3775 			  -sizeof(struct p_header);
3776 	int rv;
3777 
3778 	rv = drbd_send_handshake(mdev);
3779 	if (!rv)
3780 		return 0;
3781 
3782 	rv = drbd_recv_header(mdev, &p->head);
3783 	if (!rv)
3784 		return 0;
3785 
3786 	if (p->head.command != P_HAND_SHAKE) {
3787 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3788 		     cmdname(p->head.command), p->head.command);
3789 		return -1;
3790 	}
3791 
3792 	if (p->head.length != expect) {
3793 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3794 		     expect, p->head.length);
3795 		return -1;
3796 	}
3797 
3798 	rv = drbd_recv(mdev, &p->head.payload, expect);
3799 
3800 	if (rv != expect) {
3801 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3802 		return 0;
3803 	}
3804 
3805 	p->protocol_min = be32_to_cpu(p->protocol_min);
3806 	p->protocol_max = be32_to_cpu(p->protocol_max);
3807 	if (p->protocol_max == 0)
3808 		p->protocol_max = p->protocol_min;
3809 
3810 	if (PRO_VERSION_MAX < p->protocol_min ||
3811 	    PRO_VERSION_MIN > p->protocol_max)
3812 		goto incompat;
3813 
3814 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3815 
3816 	dev_info(DEV, "Handshake successful: "
3817 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3818 
3819 	return 1;
3820 
3821  incompat:
3822 	dev_err(DEV, "incompatible DRBD dialects: "
3823 	    "I support %d-%d, peer supports %d-%d\n",
3824 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3825 	    p->protocol_min, p->protocol_max);
3826 	return -1;
3827 }
3828 
3829 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3830 static int drbd_do_auth(struct drbd_conf *mdev)
3831 {
3832 	dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3833 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3834 	return 0;
3835 }
3836 #else
3837 #define CHALLENGE_LEN 64
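/*
 * Challenge-response authentication over the data socket, keyed with the
 * configured shared secret: send our random challenge, answer the peer's
 * challenge with HMAC(secret, peer_challenge), and accept the peer only
 * if its response equals HMAC(secret, my_challenge).  Returns 1 on
 * success, 0 on any failure.
 */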
3838 static int drbd_do_auth(struct drbd_conf *mdev)
3839 {
3840 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3841 	struct scatterlist sg;
3842 	char *response = NULL;
3843 	char *right_response = NULL;
3844 	char *peers_ch = NULL;
3845 	struct p_header p;
3846 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3847 	unsigned int resp_size;
3848 	struct hash_desc desc;
3849 	int rv;
3850 
3851 	desc.tfm = mdev->cram_hmac_tfm;
3852 	desc.flags = 0;
3853 
3854 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3855 				(u8 *)mdev->net_conf->shared_secret, key_len);
3856 	if (rv) {
3857 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3858 		rv = 0;
3859 		goto fail;
3860 	}
3861 
3862 	get_random_bytes(my_challenge, CHALLENGE_LEN);
3863 
3864 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3865 	if (!rv)
3866 		goto fail;
3867 
3868 	rv = drbd_recv_header(mdev, &p);
3869 	if (!rv)
3870 		goto fail;
3871 
3872 	if (p.command != P_AUTH_CHALLENGE) {
3873 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3874 		    cmdname(p.command), p.command);
3875 		rv = 0;
3876 		goto fail;
3877 	}
3878 
3879 	if (p.length > CHALLENGE_LEN*2) {
3880 		dev_err(DEV, "expected AuthChallenge payload too big.\n");
3881 		rv = 0;
3882 		goto fail;
3883 	}
3884 
3885 	peers_ch = kmalloc(p.length, GFP_NOIO);
3886 	if (peers_ch == NULL) {
3887 		dev_err(DEV, "kmalloc of peers_ch failed\n");
3888 		rv = 0;
3889 		goto fail;
3890 	}
3891 
3892 	rv = drbd_recv(mdev, peers_ch, p.length);
3893 
3894 	if (rv != p.length) {
3895 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3896 		rv = 0;
3897 		goto fail;
3898 	}
3899 
3900 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3901 	response = kmalloc(resp_size, GFP_NOIO);
3902 	if (response == NULL) {
3903 		dev_err(DEV, "kmalloc of response failed\n");
3904 		rv = 0;
3905 		goto fail;
3906 	}
3907 
3908 	sg_init_table(&sg, 1);
3909 	sg_set_buf(&sg, peers_ch, p.length);
3910 
3911 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3912 	if (rv) {
3913 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3914 		rv = 0;
3915 		goto fail;
3916 	}
3917 
3918 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3919 	if (!rv)
3920 		goto fail;
3921 
3922 	rv = drbd_recv_header(mdev, &p);
3923 	if (!rv)
3924 		goto fail;
3925 
3926 	if (p.command != P_AUTH_RESPONSE) {
3927 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3928 		    cmdname(p.command), p.command);
3929 		rv = 0;
3930 		goto fail;
3931 	}
3932 
3933 	if (p.length != resp_size) {
3934 		dev_err(DEV, "expected AuthResponse payload of wrong size\n");
3935 		rv = 0;
3936 		goto fail;
3937 	}
3938 
3939 	rv = drbd_recv(mdev, response, resp_size);
3940 
3941 	if (rv != resp_size) {
3942 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3943 		rv = 0;
3944 		goto fail;
3945 	}
3946 
3947 	right_response = kmalloc(resp_size, GFP_NOIO);
3948 	if (right_response == NULL) {
3949 		dev_err(DEV, "kmalloc of right_response failed\n");
3950 		rv = 0;
3951 		goto fail;
3952 	}
3953 
3954 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3955 
3956 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3957 	if (rv) {
3958 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3959 		rv = 0;
3960 		goto fail;
3961 	}
3962 
3963 	rv = !memcmp(response, right_response, resp_size);
3964 
3965 	if (rv)
3966 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3967 		     resp_size, mdev->net_conf->cram_hmac_alg);
3968 
3969  fail:
3970 	kfree(peers_ch);
3971 	kfree(response);
3972 	kfree(right_response);
3973 
3974 	return rv;
3975 }
3976 #endif
3977 
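/*
 * Receiver thread entry point.  drbd_connect() returning 0 means "retry
 * after a short delay", -1 means "give up and discard the network
 * configuration", and a positive value means the connection is up and we
 * enter the main receive loop drbdd().
 */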
3978 int drbdd_init(struct drbd_thread *thi)
3979 {
3980 	struct drbd_conf *mdev = thi->mdev;
3981 	unsigned int minor = mdev_to_minor(mdev);
3982 	int h;
3983 
3984 	sprintf(current->comm, "drbd%d_receiver", minor);
3985 
3986 	dev_info(DEV, "receiver (re)started\n");
3987 
3988 	do {
3989 		h = drbd_connect(mdev);
3990 		if (h == 0) {
3991 			drbd_disconnect(mdev);
3992 			__set_current_state(TASK_INTERRUPTIBLE);
3993 			schedule_timeout(HZ);
3994 		}
3995 		if (h == -1) {
3996 			dev_warn(DEV, "Discarding network configuration.\n");
3997 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3998 		}
3999 	} while (h == 0);
4000 
4001 	if (h > 0) {
4002 		if (get_net_conf(mdev)) {
4003 			drbdd(mdev);
4004 			put_net_conf(mdev);
4005 		}
4006 	}
4007 
4008 	drbd_disconnect(mdev);
4009 
4010 	dev_info(DEV, "receiver terminated\n");
4011 	return 0;
4012 }
4013 
4014 /* ********* acknowledge sender ******** */
4015 
4016 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4017 {
4018 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4019 
4020 	int retcode = be32_to_cpu(p->retcode);
4021 
4022 	if (retcode >= SS_SUCCESS) {
4023 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4024 	} else {
4025 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4026 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4027 		    drbd_set_st_err_str(retcode), retcode);
4028 	}
4029 	wake_up(&mdev->state_wait);
4030 
4031 	return TRUE;
4032 }
4033 
4034 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4035 {
4036 	return drbd_send_ping_ack(mdev);
4038 }
4039 
4040 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4041 {
4042 	/* restore idle timeout */
4043 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4044 
4045 	return TRUE;
4046 }
4047 
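/*
 * P_RS_IS_IN_SYNC (protocol >= 89, checksum based resync): the peer found
 * the block identical, so mark it in sync without transferring data.
 * rs_same_csum counts in bitmap-block units; assuming the usual 4 KiB
 * bitmap granularity (BM_BLOCK_SHIFT == 12), a 32 KiB block adds 8.
 */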
4048 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4049 {
4050 	struct p_block_ack *p = (struct p_block_ack *)h;
4051 	sector_t sector = be64_to_cpu(p->sector);
4052 	int blksize = be32_to_cpu(p->blksize);
4053 
4054 	D_ASSERT(mdev->agreed_pro_version >= 89);
4055 
4056 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4057 
4058 	drbd_rs_complete_io(mdev, sector);
4059 	drbd_set_in_sync(mdev, sector, blksize);
4060 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4061 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4062 	dec_rs_pending(mdev);
4063 
4064 	return TRUE;
4065 }
4066 
4067 /* when we receive the ACK for a write request,
4068  * verify that we actually know about it */
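/* The block_id carried in the ACK is simply the kernel address of the
 * original request; a match in the tl_hash slot for this sector must
 * therefore also agree on the sector itself. */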
4069 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4070 	u64 id, sector_t sector)
4071 {
4072 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4073 	struct hlist_node *n;
4074 	struct drbd_request *req;
4075 
4076 	hlist_for_each_entry(req, n, slot, colision) {
4077 		if ((unsigned long)req == (unsigned long)id) {
4078 			if (req->sector != sector) {
4079 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4080 				    "wrong sector (%llus versus %llus)\n", req,
4081 				    (unsigned long long)req->sector,
4082 				    (unsigned long long)sector);
4083 				break;
4084 			}
4085 			return req;
4086 		}
4087 	}
4088 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4089 		(void *)(unsigned long)id, (unsigned long long)sector);
4090 	return NULL;
4091 }
4092 
4093 typedef struct drbd_request *(req_validator_fn)
4094 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4095 
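/*
 * Common helper for the ACK handlers: look up the request under req_lock,
 * feed the event into the request state machine, and complete the master
 * bio (if the transition produced one) after dropping the lock.
 */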
4096 static int validate_req_change_req_state(struct drbd_conf *mdev,
4097 	u64 id, sector_t sector, req_validator_fn validator,
4098 	const char *func, enum drbd_req_event what)
4099 {
4100 	struct drbd_request *req;
4101 	struct bio_and_error m;
4102 
4103 	spin_lock_irq(&mdev->req_lock);
4104 	req = validator(mdev, id, sector);
4105 	if (unlikely(!req)) {
4106 		spin_unlock_irq(&mdev->req_lock);
4107 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4108 		return FALSE;
4109 	}
4110 	__req_mod(req, what, &m);
4111 	spin_unlock_irq(&mdev->req_lock);
4112 
4113 	if (m.bio)
4114 		complete_master_bio(mdev, &m);
4115 	return TRUE;
4116 }
4117 
4118 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4119 {
4120 	struct p_block_ack *p = (struct p_block_ack *)h;
4121 	sector_t sector = be64_to_cpu(p->sector);
4122 	int blksize = be32_to_cpu(p->blksize);
4123 	enum drbd_req_event what;
4124 
4125 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4126 
4127 	if (is_syncer_block_id(p->block_id)) {
4128 		drbd_set_in_sync(mdev, sector, blksize);
4129 		dec_rs_pending(mdev);
4130 		return TRUE;
4131 	}
4132 	switch (be16_to_cpu(h->command)) {
4133 	case P_RS_WRITE_ACK:
4134 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4135 		what = write_acked_by_peer_and_sis;
4136 		break;
4137 	case P_WRITE_ACK:
4138 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4139 		what = write_acked_by_peer;
4140 		break;
4141 	case P_RECV_ACK:
4142 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4143 		what = recv_acked_by_peer;
4144 		break;
4145 	case P_DISCARD_ACK:
4146 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4147 		what = conflict_discarded_by_peer;
4148 		break;
4149 	default:
4150 		D_ASSERT(0);
4151 		return FALSE;
4152 	}
4153 
4154 	return validate_req_change_req_state(mdev, p->block_id, sector,
4155 		_ack_id_to_req, __func__, what);
4156 }
4157 
4158 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4159 {
4160 	struct p_block_ack *p = (struct p_block_ack *)h;
4161 	sector_t sector = be64_to_cpu(p->sector);
4162 
4163 	if (__ratelimit(&drbd_ratelimit_state))
4164 		dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4165 
4166 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4167 
4168 	if (is_syncer_block_id(p->block_id)) {
4169 		int size = be32_to_cpu(p->blksize);
4170 		dec_rs_pending(mdev);
4171 		drbd_rs_failed_io(mdev, sector, size);
4172 		return TRUE;
4173 	}
4174 	return validate_req_change_req_state(mdev, p->block_id, sector,
4175 		_ack_id_to_req, __func__, neg_acked);
4176 }
4177 
4178 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4179 {
4180 	struct p_block_ack *p = (struct p_block_ack *)h;
4181 	sector_t sector = be64_to_cpu(p->sector);
4182 
4183 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4184 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4185 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4186 
4187 	return validate_req_change_req_state(mdev, p->block_id, sector,
4188 		_ar_id_to_req, __func__, neg_acked);
4189 }
4190 
4191 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4192 {
4193 	sector_t sector;
4194 	int size;
4195 	struct p_block_ack *p = (struct p_block_ack *)h;
4196 
4197 	sector = be64_to_cpu(p->sector);
4198 	size = be32_to_cpu(p->blksize);
4199 	D_ASSERT(p->block_id == ID_SYNCER);
4200 
4201 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4202 
4203 	dec_rs_pending(mdev);
4204 
4205 	if (get_ldev_if_state(mdev, D_FAILED)) {
4206 		drbd_rs_complete_io(mdev, sector);
4207 		drbd_rs_failed_io(mdev, sector, size);
4208 		put_ldev(mdev);
4209 	}
4210 
4211 	return TRUE;
4212 }
4213 
4214 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4215 {
4216 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4217 
4218 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4219 
4220 	return TRUE;
4221 }
4222 
4223 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4224 {
4225 	struct p_block_ack *p = (struct p_block_ack *)h;
4226 	struct drbd_work *w;
4227 	sector_t sector;
4228 	int size;
4229 
4230 	sector = be64_to_cpu(p->sector);
4231 	size = be32_to_cpu(p->blksize);
4232 
4233 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4234 
4235 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4236 		drbd_ov_oos_found(mdev, sector, size);
4237 	else
4238 		ov_oos_print(mdev);
4239 
4240 	drbd_rs_complete_io(mdev, sector);
4241 	dec_rs_pending(mdev);
4242 
4243 	if (--mdev->ov_left == 0) {
4244 		w = kmalloc(sizeof(*w), GFP_NOIO);
4245 		if (w) {
4246 			w->cb = w_ov_finished;
4247 			drbd_queue_work_front(&mdev->data.work, w);
4248 		} else {
4249 			dev_err(DEV, "kmalloc(w) failed.");
4250 			ov_oos_print(mdev);
4251 			drbd_resync_finished(mdev);
4252 		}
4253 	}
4254 	return TRUE;
4255 }
4256 
4257 struct asender_cmd {
4258 	size_t pkt_size;
4259 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4260 };
4261 
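/*
 * Packets on the meta socket have a fixed size per command; the asender
 * uses pkt_size to know how many bytes to collect before it calls the
 * process() callback (see the receive loop in drbd_asender() below).
 */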
4262 static struct asender_cmd *get_asender_cmd(int cmd)
4263 {
4264 	static struct asender_cmd asender_tbl[] = {
4265 		/* anything missing from this table is in
4266 		 * the drbd_cmd_handler (drbd_default_handler) table,
4267 		 * see the beginning of drbdd() */
4268 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4269 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4270 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4271 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4272 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4273 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4274 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4275 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4276 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4277 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4278 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4279 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4280 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4281 	[P_MAX_CMD]	    = { 0, NULL },
4282 	};
4283 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4284 		return NULL;
4285 	return &asender_tbl[cmd];
4286 }
4287 
4288 int drbd_asender(struct drbd_thread *thi)
4289 {
4290 	struct drbd_conf *mdev = thi->mdev;
4291 	struct p_header *h = &mdev->meta.rbuf.header;
4292 	struct asender_cmd *cmd = NULL;
4293 
4294 	int rv, len;
4295 	void *buf    = h;
4296 	int received = 0;
4297 	int expect   = sizeof(struct p_header);
4298 	int empty;
4299 
4300 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4301 
4302 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4303 	current->rt_priority = 2;    /* more important than all other tasks */
4304 
4305 	while (get_t_state(thi) == Running) {
4306 		drbd_thread_current_set_cpu(mdev);
4307 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4308 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4309 			mdev->meta.socket->sk->sk_rcvtimeo =
4310 				mdev->net_conf->ping_timeo*HZ/10;
4311 		}
4312 
4313 		/* conditionally cork;
4314 		 * it may hurt latency if we cork without much to send */
4315 		if (!mdev->net_conf->no_cork &&
4316 			3 < atomic_read(&mdev->unacked_cnt))
4317 			drbd_tcp_cork(mdev->meta.socket);
4318 		while (1) {
4319 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4320 			flush_signals(current);
4321 			if (!drbd_process_done_ee(mdev)) {
4322 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4323 				goto reconnect;
4324 			}
4325 			/* to avoid race with newly queued ACKs */
4326 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4327 			spin_lock_irq(&mdev->req_lock);
4328 			empty = list_empty(&mdev->done_ee);
4329 			spin_unlock_irq(&mdev->req_lock);
4330 			/* new ack may have been queued right here,
4331 			 * but then there is also a signal pending,
4332 			 * and we start over... */
4333 			if (empty)
4334 				break;
4335 		}
4336 		/* but unconditionally uncork unless disabled */
4337 		if (!mdev->net_conf->no_cork)
4338 			drbd_tcp_uncork(mdev->meta.socket);
4339 
4340 		/* short circuit, recv_msg would return EINTR anyways. */
4341 		if (signal_pending(current))
4342 			continue;
4343 
4344 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4345 				     buf, expect-received, 0);
4346 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4347 
4348 		flush_signals(current);
4349 
4350 		/* Note:
4351 		 * -EINTR	 (on meta) we got a signal
4352 		 * -EAGAIN	 (on meta) rcvtimeo expired
4353 		 * -ECONNRESET	 other side closed the connection
4354 		 * -ERESTARTSYS  (on data) we got a signal
4355 		 * rv <  0	 other than above: unexpected error!
4356 		 * rv == expected: full header or command
4357 		 * rv <  expected: "woken" by signal during receive
4358 		 * rv == 0	 : "connection shut down by peer"
4359 		 */
4360 		if (likely(rv > 0)) {
4361 			received += rv;
4362 			buf	 += rv;
4363 		} else if (rv == 0) {
4364 			dev_err(DEV, "meta connection shut down by peer.\n");
4365 			goto reconnect;
4366 		} else if (rv == -EAGAIN) {
4367 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4368 			    mdev->net_conf->ping_timeo*HZ/10) {
4369 				dev_err(DEV, "PingAck did not arrive in time.\n");
4370 				goto reconnect;
4371 			}
4372 			set_bit(SEND_PING, &mdev->flags);
4373 			continue;
4374 		} else if (rv == -EINTR) {
4375 			continue;
4376 		} else {
4377 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4378 			goto reconnect;
4379 		}
4380 
4381 		if (received == expect && cmd == NULL) {
4382 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4383 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4384 				    (long)be32_to_cpu(h->magic),
4385 				    be16_to_cpu(h->command), be16_to_cpu(h->length));
4386 				goto reconnect;
4387 			}
4388 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4389 			len = be16_to_cpu(h->length);
4390 			if (unlikely(cmd == NULL)) {
4391 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4392 				    (long)be32_to_cpu(h->magic),
4393 				    be16_to_cpu(h->command), be16_to_cpu(h->length));
4394 				goto disconnect;
4395 			}
4396 			expect = cmd->pkt_size;
4397 			ERR_IF(len != expect-sizeof(struct p_header))
4398 				goto reconnect;
4399 		}
4400 		if (received == expect) {
4401 			D_ASSERT(cmd != NULL);
4402 			if (!cmd->process(mdev, h))
4403 				goto reconnect;
4404 
4405 			buf	 = h;
4406 			received = 0;
4407 			expect	 = sizeof(struct p_header);
4408 			cmd	 = NULL;
4409 		}
4410 	}
4411 
4412 	if (0) {
4413 reconnect:
4414 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4415 	}
4416 	if (0) {
4417 disconnect:
4418 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4419 	}
4420 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4421 
4422 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4423 	dev_info(DEV, "asender terminated\n");
4424 
4425 	return 0;
4426 }
4427