/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/autoconf.h>
#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/version.h>
#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/smp_lock.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_tracing.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct flush_work {
	struct drbd_work w;
	struct drbd_epoch *epoch;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);

static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
{
	struct drbd_epoch *prev;

	spin_lock(&mdev->epoch_lock);
	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
	if (prev == epoch || prev == mdev->current_epoch)
		prev = NULL;
	spin_unlock(&mdev->epoch_lock);
	return prev;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
{
	struct page *page = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant > 0) {
		spin_lock(&drbd_pp_lock);
		page = drbd_pp_pool;
		if (page) {
			drbd_pp_pool = (struct page *)page_private(page);
			set_page_private(page, 0); /* just to be polite */
			drbd_pp_vacant--;
		}
		spin_unlock(&drbd_pp_lock);
	}
	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.
*/ 105 if (!page) 106 page = alloc_page(GFP_TRY); 107 if (page) 108 atomic_inc(&mdev->pp_in_use); 109 return page; 110 } 111 112 /* kick lower level device, if we have more than (arbitrary number) 113 * reference counts on it, which typically are locally submitted io 114 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */ 115 static void maybe_kick_lo(struct drbd_conf *mdev) 116 { 117 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark) 118 drbd_kick_lo(mdev); 119 } 120 121 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed) 122 { 123 struct drbd_epoch_entry *e; 124 struct list_head *le, *tle; 125 126 /* The EEs are always appended to the end of the list. Since 127 they are sent in order over the wire, they have to finish 128 in order. As soon as we see the first not finished we can 129 stop to examine the list... */ 130 131 list_for_each_safe(le, tle, &mdev->net_ee) { 132 e = list_entry(le, struct drbd_epoch_entry, w.list); 133 if (drbd_bio_has_active_page(e->private_bio)) 134 break; 135 list_move(le, to_be_freed); 136 } 137 } 138 139 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) 140 { 141 LIST_HEAD(reclaimed); 142 struct drbd_epoch_entry *e, *t; 143 144 maybe_kick_lo(mdev); 145 spin_lock_irq(&mdev->req_lock); 146 reclaim_net_ee(mdev, &reclaimed); 147 spin_unlock_irq(&mdev->req_lock); 148 149 list_for_each_entry_safe(e, t, &reclaimed, w.list) 150 drbd_free_ee(mdev, e); 151 } 152 153 /** 154 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in 155 * @mdev: DRBD device. 156 * @retry: whether or not to retry allocation forever (or until signalled) 157 * 158 * Tries to allocate a page, first from our own page pool, then from the 159 * kernel, unless this allocation would exceed the max_buffers setting. 160 * If @retry is non-zero, retry until DRBD frees a page somewhere else. 161 */ 162 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) 163 { 164 struct page *page = NULL; 165 DEFINE_WAIT(wait); 166 167 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { 168 page = drbd_pp_first_page_or_try_alloc(mdev); 169 if (page) 170 return page; 171 } 172 173 for (;;) { 174 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); 175 176 drbd_kick_lo_and_reclaim_net(mdev); 177 178 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { 179 page = drbd_pp_first_page_or_try_alloc(mdev); 180 if (page) 181 break; 182 } 183 184 if (!retry) 185 break; 186 187 if (signal_pending(current)) { 188 dev_warn(DEV, "drbd_pp_alloc interrupted!\n"); 189 break; 190 } 191 192 schedule(); 193 } 194 finish_wait(&drbd_pp_wait, &wait); 195 196 return page; 197 } 198 199 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. 
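 *
 * Rough sketch of how the two sides pair up on drbd_pp_wait (simplified,
 * using only the names that appear above):
 *
 *	drbd_pp_alloc(mdev, retry):
 *		page = drbd_pp_first_page_or_try_alloc(mdev);
 *		while (!page && retry && !signal_pending(current))
 *			wait on drbd_pp_wait, reclaim net_ee, try again;
 *
 *	drbd_pp_free(mdev, page):
 *		put the page back into drbd_pp_pool (or __free_page() it,
 *		if we are above the vacancy watermark),
 *		atomic_dec(&mdev->pp_in_use), wake_up(&drbd_pp_wait);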
200 * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ 201 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) 202 { 203 int free_it; 204 205 spin_lock(&drbd_pp_lock); 206 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { 207 free_it = 1; 208 } else { 209 set_page_private(page, (unsigned long)drbd_pp_pool); 210 drbd_pp_pool = page; 211 drbd_pp_vacant++; 212 free_it = 0; 213 } 214 spin_unlock(&drbd_pp_lock); 215 216 atomic_dec(&mdev->pp_in_use); 217 218 if (free_it) 219 __free_page(page); 220 221 wake_up(&drbd_pp_wait); 222 } 223 224 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio) 225 { 226 struct page *p_to_be_freed = NULL; 227 struct page *page; 228 struct bio_vec *bvec; 229 int i; 230 231 spin_lock(&drbd_pp_lock); 232 __bio_for_each_segment(bvec, bio, i, 0) { 233 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { 234 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); 235 p_to_be_freed = bvec->bv_page; 236 } else { 237 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); 238 drbd_pp_pool = bvec->bv_page; 239 drbd_pp_vacant++; 240 } 241 } 242 spin_unlock(&drbd_pp_lock); 243 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use); 244 245 while (p_to_be_freed) { 246 page = p_to_be_freed; 247 p_to_be_freed = (struct page *)page_private(page); 248 set_page_private(page, 0); /* just to be polite */ 249 put_page(page); 250 } 251 252 wake_up(&drbd_pp_wait); 253 } 254 255 /* 256 You need to hold the req_lock: 257 _drbd_wait_ee_list_empty() 258 259 You must not have the req_lock: 260 drbd_free_ee() 261 drbd_alloc_ee() 262 drbd_init_ee() 263 drbd_release_ee() 264 drbd_ee_fix_bhs() 265 drbd_process_done_ee() 266 drbd_clear_done_ee() 267 drbd_wait_ee_list_empty() 268 */ 269 270 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, 271 u64 id, 272 sector_t sector, 273 unsigned int data_size, 274 gfp_t gfp_mask) __must_hold(local) 275 { 276 struct request_queue *q; 277 struct drbd_epoch_entry *e; 278 struct page *page; 279 struct bio *bio; 280 unsigned int ds; 281 282 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) 283 return NULL; 284 285 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); 286 if (!e) { 287 if (!(gfp_mask & __GFP_NOWARN)) 288 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n"); 289 return NULL; 290 } 291 292 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); 293 if (!bio) { 294 if (!(gfp_mask & __GFP_NOWARN)) 295 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n"); 296 goto fail1; 297 } 298 299 bio->bi_bdev = mdev->ldev->backing_bdev; 300 bio->bi_sector = sector; 301 302 ds = data_size; 303 while (ds) { 304 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT)); 305 if (!page) { 306 if (!(gfp_mask & __GFP_NOWARN)) 307 dev_err(DEV, "alloc_ee: Allocation of a page failed\n"); 308 goto fail2; 309 } 310 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) { 311 drbd_pp_free(mdev, page); 312 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu," 313 "data_size=%u,ds=%u) failed\n", 314 (unsigned long long)sector, data_size, ds); 315 316 q = bdev_get_queue(bio->bi_bdev); 317 if (q->merge_bvec_fn) { 318 struct bvec_merge_data bvm = { 319 .bi_bdev = bio->bi_bdev, 320 .bi_sector = bio->bi_sector, 321 .bi_size = bio->bi_size, 322 .bi_rw = bio->bi_rw, 323 }; 324 int l = q->merge_bvec_fn(q, &bvm, 325 &bio->bi_io_vec[bio->bi_vcnt]); 326 dev_err(DEV, "merge_bvec_fn() = %d\n", l); 327 } 328 329 /* dump more of the bio. 
*/ 330 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs); 331 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt); 332 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size); 333 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments); 334 335 goto fail2; 336 break; 337 } 338 ds -= min_t(int, ds, PAGE_SIZE); 339 } 340 341 D_ASSERT(data_size == bio->bi_size); 342 343 bio->bi_private = e; 344 e->mdev = mdev; 345 e->sector = sector; 346 e->size = bio->bi_size; 347 348 e->private_bio = bio; 349 e->block_id = id; 350 INIT_HLIST_NODE(&e->colision); 351 e->epoch = NULL; 352 e->flags = 0; 353 354 trace_drbd_ee(mdev, e, "allocated"); 355 356 return e; 357 358 fail2: 359 drbd_pp_free_bio_pages(mdev, bio); 360 bio_put(bio); 361 fail1: 362 mempool_free(e, drbd_ee_mempool); 363 364 return NULL; 365 } 366 367 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 368 { 369 struct bio *bio = e->private_bio; 370 trace_drbd_ee(mdev, e, "freed"); 371 drbd_pp_free_bio_pages(mdev, bio); 372 bio_put(bio); 373 D_ASSERT(hlist_unhashed(&e->colision)); 374 mempool_free(e, drbd_ee_mempool); 375 } 376 377 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list) 378 { 379 LIST_HEAD(work_list); 380 struct drbd_epoch_entry *e, *t; 381 int count = 0; 382 383 spin_lock_irq(&mdev->req_lock); 384 list_splice_init(list, &work_list); 385 spin_unlock_irq(&mdev->req_lock); 386 387 list_for_each_entry_safe(e, t, &work_list, w.list) { 388 drbd_free_ee(mdev, e); 389 count++; 390 } 391 return count; 392 } 393 394 395 /* 396 * This function is called from _asender only_ 397 * but see also comments in _req_mod(,barrier_acked) 398 * and receive_Barrier. 399 * 400 * Move entries from net_ee to done_ee, if ready. 401 * Grab done_ee, call all callbacks, free the entries. 402 * The callbacks typically send out ACKs. 403 */ 404 static int drbd_process_done_ee(struct drbd_conf *mdev) 405 { 406 LIST_HEAD(work_list); 407 LIST_HEAD(reclaimed); 408 struct drbd_epoch_entry *e, *t; 409 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS); 410 411 spin_lock_irq(&mdev->req_lock); 412 reclaim_net_ee(mdev, &reclaimed); 413 list_splice_init(&mdev->done_ee, &work_list); 414 spin_unlock_irq(&mdev->req_lock); 415 416 list_for_each_entry_safe(e, t, &reclaimed, w.list) 417 drbd_free_ee(mdev, e); 418 419 /* possible callbacks here: 420 * e_end_block, and e_end_resync_block, e_send_discard_ack. 421 * all ignore the last argument. 422 */ 423 list_for_each_entry_safe(e, t, &work_list, w.list) { 424 trace_drbd_ee(mdev, e, "process_done_ee"); 425 /* list_del not necessary, next/prev members not touched */ 426 ok = e->w.cb(mdev, &e->w, !ok) && ok; 427 drbd_free_ee(mdev, e); 428 } 429 wake_up(&mdev->ee_wait); 430 431 return ok; 432 } 433 434 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head) 435 { 436 DEFINE_WAIT(wait); 437 438 /* avoids spin_lock/unlock 439 * and calling prepare_to_wait in the fast path */ 440 while (!list_empty(head)) { 441 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE); 442 spin_unlock_irq(&mdev->req_lock); 443 drbd_kick_lo(mdev); 444 schedule(); 445 finish_wait(&mdev->ee_wait, &wait); 446 spin_lock_irq(&mdev->req_lock); 447 } 448 } 449 450 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head) 451 { 452 spin_lock_irq(&mdev->req_lock); 453 _drbd_wait_ee_list_empty(mdev, head); 454 spin_unlock_irq(&mdev->req_lock); 455 } 456 457 /* see also kernel_accept; which is only present since 2.6.18. 
458 * also we want to log which part of it failed, exactly */ 459 static int drbd_accept(struct drbd_conf *mdev, const char **what, 460 struct socket *sock, struct socket **newsock) 461 { 462 struct sock *sk = sock->sk; 463 int err = 0; 464 465 *what = "listen"; 466 err = sock->ops->listen(sock, 5); 467 if (err < 0) 468 goto out; 469 470 *what = "sock_create_lite"; 471 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol, 472 newsock); 473 if (err < 0) 474 goto out; 475 476 *what = "accept"; 477 err = sock->ops->accept(sock, *newsock, 0); 478 if (err < 0) { 479 sock_release(*newsock); 480 *newsock = NULL; 481 goto out; 482 } 483 (*newsock)->ops = sock->ops; 484 485 out: 486 return err; 487 } 488 489 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock, 490 void *buf, size_t size, int flags) 491 { 492 mm_segment_t oldfs; 493 struct kvec iov = { 494 .iov_base = buf, 495 .iov_len = size, 496 }; 497 struct msghdr msg = { 498 .msg_iovlen = 1, 499 .msg_iov = (struct iovec *)&iov, 500 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL) 501 }; 502 int rv; 503 504 oldfs = get_fs(); 505 set_fs(KERNEL_DS); 506 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags); 507 set_fs(oldfs); 508 509 return rv; 510 } 511 512 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size) 513 { 514 mm_segment_t oldfs; 515 struct kvec iov = { 516 .iov_base = buf, 517 .iov_len = size, 518 }; 519 struct msghdr msg = { 520 .msg_iovlen = 1, 521 .msg_iov = (struct iovec *)&iov, 522 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL 523 }; 524 int rv; 525 526 oldfs = get_fs(); 527 set_fs(KERNEL_DS); 528 529 for (;;) { 530 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags); 531 if (rv == size) 532 break; 533 534 /* Note: 535 * ECONNRESET other side closed the connection 536 * ERESTARTSYS (on sock) we got a signal 537 */ 538 539 if (rv < 0) { 540 if (rv == -ECONNRESET) 541 dev_info(DEV, "sock was reset by peer\n"); 542 else if (rv != -ERESTARTSYS) 543 dev_err(DEV, "sock_recvmsg returned %d\n", rv); 544 break; 545 } else if (rv == 0) { 546 dev_info(DEV, "sock was shut down by peer\n"); 547 break; 548 } else { 549 /* signal came in, or peer/link went down, 550 * after we read a partial message 551 */ 552 /* D_ASSERT(signal_pending(current)); */ 553 break; 554 } 555 }; 556 557 set_fs(oldfs); 558 559 if (rv != size) 560 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE)); 561 562 return rv; 563 } 564 565 static struct socket *drbd_try_connect(struct drbd_conf *mdev) 566 { 567 const char *what; 568 struct socket *sock; 569 struct sockaddr_in6 src_in6; 570 int err; 571 int disconnect_on_error = 1; 572 573 if (!get_net_conf(mdev)) 574 return NULL; 575 576 what = "sock_create_kern"; 577 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family, 578 SOCK_STREAM, IPPROTO_TCP, &sock); 579 if (err < 0) { 580 sock = NULL; 581 goto out; 582 } 583 584 sock->sk->sk_rcvtimeo = 585 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ; 586 587 /* explicitly bind to the configured IP as source IP 588 * for the outgoing connections. 589 * This is needed for multihomed hosts and to be 590 * able to use lo: interfaces for drbd. 591 * Make sure to use 0 as port number, so linux selects 592 * a free one dynamically. 
593 */ 594 memcpy(&src_in6, mdev->net_conf->my_addr, 595 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6))); 596 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6) 597 src_in6.sin6_port = 0; 598 else 599 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */ 600 601 what = "bind before connect"; 602 err = sock->ops->bind(sock, 603 (struct sockaddr *) &src_in6, 604 mdev->net_conf->my_addr_len); 605 if (err < 0) 606 goto out; 607 608 /* connect may fail, peer not yet available. 609 * stay C_WF_CONNECTION, don't go Disconnecting! */ 610 disconnect_on_error = 0; 611 what = "connect"; 612 err = sock->ops->connect(sock, 613 (struct sockaddr *)mdev->net_conf->peer_addr, 614 mdev->net_conf->peer_addr_len, 0); 615 616 out: 617 if (err < 0) { 618 if (sock) { 619 sock_release(sock); 620 sock = NULL; 621 } 622 switch (-err) { 623 /* timeout, busy, signal pending */ 624 case ETIMEDOUT: case EAGAIN: case EINPROGRESS: 625 case EINTR: case ERESTARTSYS: 626 /* peer not (yet) available, network problem */ 627 case ECONNREFUSED: case ENETUNREACH: 628 case EHOSTDOWN: case EHOSTUNREACH: 629 disconnect_on_error = 0; 630 break; 631 default: 632 dev_err(DEV, "%s failed, err = %d\n", what, err); 633 } 634 if (disconnect_on_error) 635 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 636 } 637 put_net_conf(mdev); 638 return sock; 639 } 640 641 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev) 642 { 643 int timeo, err; 644 struct socket *s_estab = NULL, *s_listen; 645 const char *what; 646 647 if (!get_net_conf(mdev)) 648 return NULL; 649 650 what = "sock_create_kern"; 651 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family, 652 SOCK_STREAM, IPPROTO_TCP, &s_listen); 653 if (err) { 654 s_listen = NULL; 655 goto out; 656 } 657 658 timeo = mdev->net_conf->try_connect_int * HZ; 659 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */ 660 661 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */ 662 s_listen->sk->sk_rcvtimeo = timeo; 663 s_listen->sk->sk_sndtimeo = timeo; 664 665 what = "bind before listen"; 666 err = s_listen->ops->bind(s_listen, 667 (struct sockaddr *) mdev->net_conf->my_addr, 668 mdev->net_conf->my_addr_len); 669 if (err < 0) 670 goto out; 671 672 err = drbd_accept(mdev, &what, s_listen, &s_estab); 673 674 out: 675 if (s_listen) 676 sock_release(s_listen); 677 if (err < 0) { 678 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 679 dev_err(DEV, "%s failed, err = %d\n", what, err); 680 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 681 } 682 } 683 put_net_conf(mdev); 684 685 return s_estab; 686 } 687 688 static int drbd_send_fp(struct drbd_conf *mdev, 689 struct socket *sock, enum drbd_packets cmd) 690 { 691 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; 692 693 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0); 694 } 695 696 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock) 697 { 698 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; 699 int rr; 700 701 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0); 702 703 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC) 704 return be16_to_cpu(h->command); 705 706 return 0xffff; 707 } 708 709 /** 710 * drbd_socket_okay() - Free the socket if its connection is not okay 711 * @mdev: DRBD device. 712 * @sock: pointer to the pointer to the socket. 
713 */ 714 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock) 715 { 716 int rr; 717 char tb[4]; 718 719 if (!*sock) 720 return FALSE; 721 722 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK); 723 724 if (rr > 0 || rr == -EAGAIN) { 725 return TRUE; 726 } else { 727 sock_release(*sock); 728 *sock = NULL; 729 return FALSE; 730 } 731 } 732 733 /* 734 * return values: 735 * 1 yes, we have a valid connection 736 * 0 oops, did not work out, please try again 737 * -1 peer talks different language, 738 * no point in trying again, please go standalone. 739 * -2 We do not have a network config... 740 */ 741 static int drbd_connect(struct drbd_conf *mdev) 742 { 743 struct socket *s, *sock, *msock; 744 int try, h, ok; 745 746 D_ASSERT(!mdev->data.socket); 747 748 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) 749 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n"); 750 751 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS) 752 return -2; 753 754 clear_bit(DISCARD_CONCURRENT, &mdev->flags); 755 756 sock = NULL; 757 msock = NULL; 758 759 do { 760 for (try = 0;;) { 761 /* 3 tries, this should take less than a second! */ 762 s = drbd_try_connect(mdev); 763 if (s || ++try >= 3) 764 break; 765 /* give the other side time to call bind() & listen() */ 766 __set_current_state(TASK_INTERRUPTIBLE); 767 schedule_timeout(HZ / 10); 768 } 769 770 if (s) { 771 if (!sock) { 772 drbd_send_fp(mdev, s, P_HAND_SHAKE_S); 773 sock = s; 774 s = NULL; 775 } else if (!msock) { 776 drbd_send_fp(mdev, s, P_HAND_SHAKE_M); 777 msock = s; 778 s = NULL; 779 } else { 780 dev_err(DEV, "Logic error in drbd_connect()\n"); 781 goto out_release_sockets; 782 } 783 } 784 785 if (sock && msock) { 786 __set_current_state(TASK_INTERRUPTIBLE); 787 schedule_timeout(HZ / 10); 788 ok = drbd_socket_okay(mdev, &sock); 789 ok = drbd_socket_okay(mdev, &msock) && ok; 790 if (ok) 791 break; 792 } 793 794 retry: 795 s = drbd_wait_for_connect(mdev); 796 if (s) { 797 try = drbd_recv_fp(mdev, s); 798 drbd_socket_okay(mdev, &sock); 799 drbd_socket_okay(mdev, &msock); 800 switch (try) { 801 case P_HAND_SHAKE_S: 802 if (sock) { 803 dev_warn(DEV, "initial packet S crossed\n"); 804 sock_release(sock); 805 } 806 sock = s; 807 break; 808 case P_HAND_SHAKE_M: 809 if (msock) { 810 dev_warn(DEV, "initial packet M crossed\n"); 811 sock_release(msock); 812 } 813 msock = s; 814 set_bit(DISCARD_CONCURRENT, &mdev->flags); 815 break; 816 default: 817 dev_warn(DEV, "Error receiving initial packet\n"); 818 sock_release(s); 819 if (random32() & 1) 820 goto retry; 821 } 822 } 823 824 if (mdev->state.conn <= C_DISCONNECTING) 825 goto out_release_sockets; 826 if (signal_pending(current)) { 827 flush_signals(current); 828 smp_rmb(); 829 if (get_t_state(&mdev->receiver) == Exiting) 830 goto out_release_sockets; 831 } 832 833 if (sock && msock) { 834 ok = drbd_socket_okay(mdev, &sock); 835 ok = drbd_socket_okay(mdev, &msock) && ok; 836 if (ok) 837 break; 838 } 839 } while (1); 840 841 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */ 842 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */ 843 844 sock->sk->sk_allocation = GFP_NOIO; 845 msock->sk->sk_allocation = GFP_NOIO; 846 847 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; 848 msock->sk->sk_priority = TC_PRIO_INTERACTIVE; 849 850 if (mdev->net_conf->sndbuf_size) { 851 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size; 852 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 853 } 854 855 if (mdev->net_conf->rcvbuf_size) { 856 sock->sk->sk_rcvbuf = 
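/*
 * Rough sketch of how drbd_connect() above untangles crossed connection
 * attempts: both peers dial out (drbd_try_connect) and listen
 * (drbd_wait_for_connect) at the same time.  The first socket a node
 * establishes carries P_HAND_SHAKE_S and becomes the data socket, the
 * second carries P_HAND_SHAKE_M and becomes the meta socket.  An accepted
 * socket is classified by drbd_recv_fp():
 *
 *	P_HAND_SHAKE_S:	it is the peer's data connection; an already
 *			established data socket is dropped as "crossed"
 *	P_HAND_SHAKE_M:	it is the peer's meta connection; additionally
 *			DISCARD_CONCURRENT is set on the accepting side,
 *			which later arbitrates concurrent writes
 *	anything else:	drop it, and retry the accept after a coin flip
 *			(random32()) to break fully symmetric races
 *
 * Only when both sockets test healthy in drbd_socket_okay() does the
 * handshake and authentication below proceed.
 */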
mdev->net_conf->rcvbuf_size; 857 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 858 } 859 860 /* NOT YET ... 861 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; 862 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 863 * first set it to the P_HAND_SHAKE timeout, 864 * which we set to 4x the configured ping_timeout. */ 865 sock->sk->sk_sndtimeo = 866 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10; 867 868 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; 869 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ; 870 871 /* we don't want delays. 872 * we use TCP_CORK where apropriate, though */ 873 drbd_tcp_nodelay(sock); 874 drbd_tcp_nodelay(msock); 875 876 mdev->data.socket = sock; 877 mdev->meta.socket = msock; 878 mdev->last_received = jiffies; 879 880 D_ASSERT(mdev->asender.task == NULL); 881 882 h = drbd_do_handshake(mdev); 883 if (h <= 0) 884 return h; 885 886 if (mdev->cram_hmac_tfm) { 887 /* drbd_request_state(mdev, NS(conn, WFAuth)); */ 888 if (!drbd_do_auth(mdev)) { 889 dev_err(DEV, "Authentication of peer failed\n"); 890 return -1; 891 } 892 } 893 894 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS) 895 return 0; 896 897 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; 898 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 899 900 atomic_set(&mdev->packet_seq, 0); 901 mdev->peer_seq = 0; 902 903 drbd_thread_start(&mdev->asender); 904 905 drbd_send_protocol(mdev); 906 drbd_send_sync_param(mdev, &mdev->sync_conf); 907 drbd_send_sizes(mdev, 0); 908 drbd_send_uuids(mdev); 909 drbd_send_state(mdev); 910 clear_bit(USE_DEGR_WFC_T, &mdev->flags); 911 clear_bit(RESIZE_PENDING, &mdev->flags); 912 913 return 1; 914 915 out_release_sockets: 916 if (sock) 917 sock_release(sock); 918 if (msock) 919 sock_release(msock); 920 return -1; 921 } 922 923 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h) 924 { 925 int r; 926 927 r = drbd_recv(mdev, h, sizeof(*h)); 928 929 if (unlikely(r != sizeof(*h))) { 930 dev_err(DEV, "short read expecting header on sock: r=%d\n", r); 931 return FALSE; 932 }; 933 h->command = be16_to_cpu(h->command); 934 h->length = be16_to_cpu(h->length); 935 if (unlikely(h->magic != BE_DRBD_MAGIC)) { 936 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n", 937 (long)be32_to_cpu(h->magic), 938 h->command, h->length); 939 return FALSE; 940 } 941 mdev->last_received = jiffies; 942 943 return TRUE; 944 } 945 946 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch) 947 { 948 int rv; 949 950 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) { 951 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL); 952 if (rv) { 953 dev_err(DEV, "local disk flush failed with status %d\n", rv); 954 /* would rather check on EOPNOTSUPP, but that is not reliable. 955 * don't try again for ANY return value != 0 956 * if (rv == -EOPNOTSUPP) */ 957 drbd_bump_write_ordering(mdev, WO_drain_io); 958 } 959 put_ldev(mdev); 960 } 961 962 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE); 963 } 964 965 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 966 { 967 struct flush_work *fw = (struct flush_work *)w; 968 struct drbd_epoch *epoch = fw->epoch; 969 970 kfree(w); 971 972 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags)) 973 drbd_flush_after_epoch(mdev, epoch); 974 975 drbd_may_finish_epoch(mdev, epoch, EV_PUT | 976 (mdev->state.conn < C_CONNECTED ? 
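/*
 * For orientation: the fixed header that drbd_recv_header() above checks
 * is declared in drbd_int.h; its layout, as implied by the byte-order
 * conversions used there, is roughly
 *
 *	struct p_header {
 *		u32 magic;	(big endian on the wire, BE_DRBD_MAGIC)
 *		u16 command;	(be16_to_cpu()ed into host order)
 *		u16 length;	(payload bytes that follow the header)
 *		u8  payload[0];
 *	};
 *
 * so every receive_*() handler first pulls in sizeof(*h) bytes, validates
 * the magic, and then reads h->length more bytes of payload.
 */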
EV_CLEANUP : 0));

	return 1;
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int finish, epoch_size;
	struct drbd_epoch *next_epoch;
	int schedule_flush = 0;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;
		finish = 0;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);

			/* Special case: If we just switched from WO_bio_barrier to
			   WO_bdev_flush we should not finish the current epoch */
			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
			    mdev->write_ordering != WO_bio_barrier &&
			    epoch == mdev->current_epoch)
				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
			break;
		case EV_BARRIER_DONE:
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		trace_drbd_epoch(mdev, epoch, ev);

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
		    epoch->list.prev == &mdev->current_epoch->list &&
		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
			/* Nearly all conditions are met to finish that epoch... */
			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
			    mdev->write_ordering == WO_none ||
			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
			    ev & EV_CLEANUP) {
				finish = 1;
				set_bit(DE_IS_FINISHING, &epoch->flags);
			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
				   mdev->write_ordering == WO_bio_barrier) {
				atomic_inc(&epoch->active);
				schedule_flush = 1;
			}
		}
		if (finish) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				trace_drbd_epoch(mdev, epoch, EV_TRACE_FREE);
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	if (schedule_flush) {
		struct flush_work *fw;
		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
		if (fw) {
			trace_drbd_epoch(mdev, epoch, EV_TRACE_FLUSH);
			fw->w.cb = w_flush;
			fw->epoch = epoch;
			drbd_queue_work(&mdev->data.work, &fw->w);
		} else {
			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
			/* That is not a recursion, only one level */
			drbd_may_finish_epoch(mdev, epoch,
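/*
 * Summary of the conditions evaluated above: an epoch can only be finished
 * once it saw at least one write (epoch_size != 0), all of those writes
 * completed locally (active == 0), its P_BARRIER number arrived
 * (DE_HAVE_BARRIER_NUMBER), it is the oldest epoch still on the list, and
 * it is not already being finished.  Roughly:
 *
 *	may_finish = epoch_size && !active && have_barrier_nr &&
 *		     is_oldest && !is_finishing;
 *	finish_now = may_finish &&
 *		     (barrier_in_next_epoch_done || wo == WO_none ||
 *		      (epoch_size == 1 && contains_a_barrier) ||
 *		      (ev & EV_CLEANUP));
 *
 * If only may_finish holds and we are in WO_bio_barrier mode, a w_flush
 * work item is queued instead; the fallback right here signals
 * EV_BARRIER_DONE immediately when that work item cannot be allocated.
 */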
EV_BARRIER_DONE); 1093 drbd_may_finish_epoch(mdev, epoch, EV_PUT); 1094 } 1095 } 1096 1097 return rv; 1098 } 1099 1100 /** 1101 * drbd_bump_write_ordering() - Fall back to an other write ordering method 1102 * @mdev: DRBD device. 1103 * @wo: Write ordering method to try. 1104 */ 1105 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local) 1106 { 1107 enum write_ordering_e pwo; 1108 static char *write_ordering_str[] = { 1109 [WO_none] = "none", 1110 [WO_drain_io] = "drain", 1111 [WO_bdev_flush] = "flush", 1112 [WO_bio_barrier] = "barrier", 1113 }; 1114 1115 pwo = mdev->write_ordering; 1116 wo = min(pwo, wo); 1117 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier) 1118 wo = WO_bdev_flush; 1119 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush) 1120 wo = WO_drain_io; 1121 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain) 1122 wo = WO_none; 1123 mdev->write_ordering = wo; 1124 if (pwo != mdev->write_ordering || wo == WO_bio_barrier) 1125 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]); 1126 } 1127 1128 /** 1129 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set 1130 * @mdev: DRBD device. 1131 * @w: work object. 1132 * @cancel: The connection will be closed anyways (unused in this callback) 1133 */ 1134 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) 1135 { 1136 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; 1137 struct bio *bio = e->private_bio; 1138 1139 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, 1140 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) 1141 so that we can finish that epoch in drbd_may_finish_epoch(). 1142 That is necessary if we already have a long chain of Epochs, before 1143 we realize that BIO_RW_BARRIER is actually not supported */ 1144 1145 /* As long as the -ENOTSUPP on the barrier is reported immediately 1146 that will never trigger. If it is reported late, we will just 1147 print that warning and continue correctly for all future requests 1148 with WO_bdev_flush */ 1149 if (previous_epoch(mdev, e->epoch)) 1150 dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); 1151 1152 /* prepare bio for re-submit, 1153 * re-init volatile members */ 1154 /* we still have a local reference, 1155 * get_ldev was done in receive_Data. */ 1156 bio->bi_bdev = mdev->ldev->backing_bdev; 1157 bio->bi_sector = e->sector; 1158 bio->bi_size = e->size; 1159 bio->bi_idx = 0; 1160 1161 bio->bi_flags &= ~(BIO_POOL_MASK - 1); 1162 bio->bi_flags |= 1 << BIO_UPTODATE; 1163 1164 /* don't know whether this is necessary: */ 1165 bio->bi_phys_segments = 0; 1166 bio->bi_next = NULL; 1167 1168 /* these should be unchanged: */ 1169 /* bio->bi_end_io = drbd_endio_write_sec; */ 1170 /* bio->bi_vcnt = whatever; */ 1171 1172 e->w.cb = e_end_block; 1173 1174 /* This is no longer a barrier request. 
*/
	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);

	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);

	return 1;
}

static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
{
	int rv, issue_flush;
	struct p_barrier *p = (struct p_barrier *)h;
	struct drbd_epoch *epoch;

	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;

	rv = drbd_recv(mdev, h->payload, h->length);
	ERR_IF(rv != h->length) return FALSE;

	inc_unacked(mdev);

	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
		drbd_kick_lo(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_bio_barrier:
	case WO_none:
		if (rv == FE_RECYCLED)
			return TRUE;
		break;

	case WO_bdev_flush:
	case WO_drain_io:
		D_ASSERT(rv == FE_STILL_LIVE);
		set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
		if (rv == FE_RECYCLED)
			return TRUE;

		/* The asender will send all the ACKs and barrier ACKs out, since
		   all EEs moved from the active_ee to the done_ee. We need to
		   provide a new epoch object for the EEs that come in soon */
		break;
	}

	/* receiver context, in the writeout path of the other node.
	 * avoid potential distributed deadlock */
	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
	if (!epoch) {
		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
		/* epoch is NULL here; the flag must be set on the still valid
		 * current_epoch, not on the failed allocation. */
		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		if (issue_flush) {
			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
			if (rv == FE_RECYCLED)
				return TRUE;
		}

		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);

		return TRUE;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
		trace_drbd_epoch(mdev, epoch, EV_TRACE_ALLOC);
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return TRUE;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
{
	struct drbd_epoch_entry *e;
	struct bio_vec *bvec;
	struct page *page;
	struct bio *bio;
	int dgs, ds, i, rr;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
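/*
 * Note on the P_BARRIER handling above, per write ordering mode:
 *
 *	WO_bio_barrier, WO_none:    the epoch state machine alone decides;
 *				    the current epoch may simply be recycled.
 *	WO_bdev_flush, WO_drain_io: drain active_ee first; with WO_bdev_flush
 *				    drbd_flush_after_epoch() also flushes the
 *				    backing device; then the current epoch is
 *				    recycled or a fresh struct drbd_epoch is
 *				    allocated for the writes that follow.
 *
 * In all cases P_BARRIER_ACK is only sent once the epoch is finished,
 * because the primary may drop the corresponding activity log extent as
 * soon as it sees that ack.
 */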
1279 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0; 1280 1281 if (dgs) { 1282 rr = drbd_recv(mdev, dig_in, dgs); 1283 if (rr != dgs) { 1284 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n", 1285 rr, dgs); 1286 return NULL; 1287 } 1288 } 1289 1290 data_size -= dgs; 1291 1292 ERR_IF(data_size & 0x1ff) return NULL; 1293 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL; 1294 1295 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 1296 * "criss-cross" setup, that might cause write-out on some other DRBD, 1297 * which in turn might block on the other node at this very place. */ 1298 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO); 1299 if (!e) 1300 return NULL; 1301 bio = e->private_bio; 1302 ds = data_size; 1303 bio_for_each_segment(bvec, bio, i) { 1304 page = bvec->bv_page; 1305 rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE)); 1306 kunmap(page); 1307 if (rr != min_t(int, ds, PAGE_SIZE)) { 1308 drbd_free_ee(mdev, e); 1309 dev_warn(DEV, "short read receiving data: read %d expected %d\n", 1310 rr, min_t(int, ds, PAGE_SIZE)); 1311 return NULL; 1312 } 1313 ds -= rr; 1314 } 1315 1316 if (dgs) { 1317 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); 1318 if (memcmp(dig_in, dig_vv, dgs)) { 1319 dev_err(DEV, "Digest integrity check FAILED.\n"); 1320 drbd_bcast_ee(mdev, "digest failed", 1321 dgs, dig_in, dig_vv, e); 1322 drbd_free_ee(mdev, e); 1323 return NULL; 1324 } 1325 } 1326 mdev->recv_cnt += data_size>>9; 1327 return e; 1328 } 1329 1330 /* drbd_drain_block() just takes a data block 1331 * out of the socket input buffer, and discards it. 1332 */ 1333 static int drbd_drain_block(struct drbd_conf *mdev, int data_size) 1334 { 1335 struct page *page; 1336 int rr, rv = 1; 1337 void *data; 1338 1339 page = drbd_pp_alloc(mdev, 1); 1340 1341 data = kmap(page); 1342 while (data_size) { 1343 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE)); 1344 if (rr != min_t(int, data_size, PAGE_SIZE)) { 1345 rv = 0; 1346 dev_warn(DEV, "short read receiving data: read %d expected %d\n", 1347 rr, min_t(int, data_size, PAGE_SIZE)); 1348 break; 1349 } 1350 data_size -= rr; 1351 } 1352 kunmap(page); 1353 drbd_pp_free(mdev, page); 1354 return rv; 1355 } 1356 1357 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req, 1358 sector_t sector, int data_size) 1359 { 1360 struct bio_vec *bvec; 1361 struct bio *bio; 1362 int dgs, rr, i, expect; 1363 void *dig_in = mdev->int_dig_in; 1364 void *dig_vv = mdev->int_dig_vv; 1365 1366 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ? 1367 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0; 1368 1369 if (dgs) { 1370 rr = drbd_recv(mdev, dig_in, dgs); 1371 if (rr != dgs) { 1372 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n", 1373 rr, dgs); 1374 return 0; 1375 } 1376 } 1377 1378 data_size -= dgs; 1379 1380 /* optimistically update recv_cnt. if receiving fails below, 1381 * we disconnect anyways, and counters will be reset. 
*/ 1382 mdev->recv_cnt += data_size>>9; 1383 1384 bio = req->master_bio; 1385 D_ASSERT(sector == bio->bi_sector); 1386 1387 bio_for_each_segment(bvec, bio, i) { 1388 expect = min_t(int, data_size, bvec->bv_len); 1389 rr = drbd_recv(mdev, 1390 kmap(bvec->bv_page)+bvec->bv_offset, 1391 expect); 1392 kunmap(bvec->bv_page); 1393 if (rr != expect) { 1394 dev_warn(DEV, "short read receiving data reply: " 1395 "read %d expected %d\n", 1396 rr, expect); 1397 return 0; 1398 } 1399 data_size -= rr; 1400 } 1401 1402 if (dgs) { 1403 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); 1404 if (memcmp(dig_in, dig_vv, dgs)) { 1405 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n"); 1406 return 0; 1407 } 1408 } 1409 1410 D_ASSERT(data_size == 0); 1411 return 1; 1412 } 1413 1414 /* e_end_resync_block() is called via 1415 * drbd_process_done_ee() by asender only */ 1416 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused) 1417 { 1418 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; 1419 sector_t sector = e->sector; 1420 int ok; 1421 1422 D_ASSERT(hlist_unhashed(&e->colision)); 1423 1424 if (likely(drbd_bio_uptodate(e->private_bio))) { 1425 drbd_set_in_sync(mdev, sector, e->size); 1426 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e); 1427 } else { 1428 /* Record failure to sync */ 1429 drbd_rs_failed_io(mdev, sector, e->size); 1430 1431 ok = drbd_send_ack(mdev, P_NEG_ACK, e); 1432 } 1433 dec_unacked(mdev); 1434 1435 return ok; 1436 } 1437 1438 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local) 1439 { 1440 struct drbd_epoch_entry *e; 1441 1442 e = read_in_block(mdev, ID_SYNCER, sector, data_size); 1443 if (!e) { 1444 put_ldev(mdev); 1445 return FALSE; 1446 } 1447 1448 dec_rs_pending(mdev); 1449 1450 e->private_bio->bi_end_io = drbd_endio_write_sec; 1451 e->private_bio->bi_rw = WRITE; 1452 e->w.cb = e_end_resync_block; 1453 1454 inc_unacked(mdev); 1455 /* corresponding dec_unacked() in e_end_resync_block() 1456 * respective _drbd_clear_done_ee */ 1457 1458 spin_lock_irq(&mdev->req_lock); 1459 list_add(&e->w.list, &mdev->sync_ee); 1460 spin_unlock_irq(&mdev->req_lock); 1461 1462 trace_drbd_ee(mdev, e, "submitting for (rs)write"); 1463 trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL); 1464 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio); 1465 /* accounting done in endio */ 1466 1467 maybe_kick_lo(mdev); 1468 return TRUE; 1469 } 1470 1471 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) 1472 { 1473 struct drbd_request *req; 1474 sector_t sector; 1475 unsigned int header_size, data_size; 1476 int ok; 1477 struct p_data *p = (struct p_data *)h; 1478 1479 header_size = sizeof(*p) - sizeof(*h); 1480 data_size = h->length - header_size; 1481 1482 ERR_IF(data_size == 0) return FALSE; 1483 1484 if (drbd_recv(mdev, h->payload, header_size) != header_size) 1485 return FALSE; 1486 1487 sector = be64_to_cpu(p->sector); 1488 1489 spin_lock_irq(&mdev->req_lock); 1490 req = _ar_id_to_req(mdev, p->block_id, sector); 1491 spin_unlock_irq(&mdev->req_lock); 1492 if (unlikely(!req)) { 1493 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n"); 1494 return FALSE; 1495 } 1496 1497 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid 1498 * special casing it there for the various failure cases. 
1499 * still no race with drbd_fail_pending_reads */ 1500 ok = recv_dless_read(mdev, req, sector, data_size); 1501 1502 if (ok) 1503 req_mod(req, data_received); 1504 /* else: nothing. handled from drbd_disconnect... 1505 * I don't think we may complete this just yet 1506 * in case we are "on-disconnect: freeze" */ 1507 1508 return ok; 1509 } 1510 1511 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h) 1512 { 1513 sector_t sector; 1514 unsigned int header_size, data_size; 1515 int ok; 1516 struct p_data *p = (struct p_data *)h; 1517 1518 header_size = sizeof(*p) - sizeof(*h); 1519 data_size = h->length - header_size; 1520 1521 ERR_IF(data_size == 0) return FALSE; 1522 1523 if (drbd_recv(mdev, h->payload, header_size) != header_size) 1524 return FALSE; 1525 1526 sector = be64_to_cpu(p->sector); 1527 D_ASSERT(p->block_id == ID_SYNCER); 1528 1529 if (get_ldev(mdev)) { 1530 /* data is submitted to disk within recv_resync_read. 1531 * corresponding put_ldev done below on error, 1532 * or in drbd_endio_write_sec. */ 1533 ok = recv_resync_read(mdev, sector, data_size); 1534 } else { 1535 if (__ratelimit(&drbd_ratelimit_state)) 1536 dev_err(DEV, "Can not write resync data to local disk.\n"); 1537 1538 ok = drbd_drain_block(mdev, data_size); 1539 1540 drbd_send_ack_dp(mdev, P_NEG_ACK, p); 1541 } 1542 1543 return ok; 1544 } 1545 1546 /* e_end_block() is called via drbd_process_done_ee(). 1547 * this means this function only runs in the asender thread 1548 */ 1549 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1550 { 1551 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; 1552 sector_t sector = e->sector; 1553 struct drbd_epoch *epoch; 1554 int ok = 1, pcmd; 1555 1556 if (e->flags & EE_IS_BARRIER) { 1557 epoch = previous_epoch(mdev, e->epoch); 1558 if (epoch) 1559 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0)); 1560 } 1561 1562 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { 1563 if (likely(drbd_bio_uptodate(e->private_bio))) { 1564 pcmd = (mdev->state.conn >= C_SYNC_SOURCE && 1565 mdev->state.conn <= C_PAUSED_SYNC_T && 1566 e->flags & EE_MAY_SET_IN_SYNC) ? 1567 P_RS_WRITE_ACK : P_WRITE_ACK; 1568 ok &= drbd_send_ack(mdev, pcmd, e); 1569 if (pcmd == P_RS_WRITE_ACK) 1570 drbd_set_in_sync(mdev, sector, e->size); 1571 } else { 1572 ok = drbd_send_ack(mdev, P_NEG_ACK, e); 1573 /* we expect it to be marked out of sync anyways... 1574 * maybe assert this? */ 1575 } 1576 dec_unacked(mdev); 1577 } 1578 /* we delete from the conflict detection hash _after_ we sent out the 1579 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */ 1580 if (mdev->net_conf->two_primaries) { 1581 spin_lock_irq(&mdev->req_lock); 1582 D_ASSERT(!hlist_unhashed(&e->colision)); 1583 hlist_del_init(&e->colision); 1584 spin_unlock_irq(&mdev->req_lock); 1585 } else { 1586 D_ASSERT(hlist_unhashed(&e->colision)); 1587 } 1588 1589 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? 
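/*
 * Reminder of which ack a mirrored write generates (see also the protocol
 * switch in receive_Data() further down):
 *
 *	DRBD_PROT_A: no ack at all; the primary treats the write as done
 *		     once it has been handed to the sender.
 *	DRBD_PROT_B: P_RECV_ACK is sent as soon as the data was received.
 *	DRBD_PROT_C: e_end_block() above sends P_WRITE_ACK, P_RS_WRITE_ACK
 *		     or P_NEG_ACK only after the local write completed,
 *		     via done_ee and the asender thread.
 */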
EV_CLEANUP : 0)); 1590 1591 return ok; 1592 } 1593 1594 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused) 1595 { 1596 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; 1597 int ok = 1; 1598 1599 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C); 1600 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e); 1601 1602 spin_lock_irq(&mdev->req_lock); 1603 D_ASSERT(!hlist_unhashed(&e->colision)); 1604 hlist_del_init(&e->colision); 1605 spin_unlock_irq(&mdev->req_lock); 1606 1607 dec_unacked(mdev); 1608 1609 return ok; 1610 } 1611 1612 /* Called from receive_Data. 1613 * Synchronize packets on sock with packets on msock. 1614 * 1615 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 1616 * packet traveling on msock, they are still processed in the order they have 1617 * been sent. 1618 * 1619 * Note: we don't care for Ack packets overtaking P_DATA packets. 1620 * 1621 * In case packet_seq is larger than mdev->peer_seq number, there are 1622 * outstanding packets on the msock. We wait for them to arrive. 1623 * In case we are the logically next packet, we update mdev->peer_seq 1624 * ourselves. Correctly handles 32bit wrap around. 1625 * 1626 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, 1627 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 1628 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 1629 * 1<<9 == 512 seconds aka ages for the 32bit wrap around... 1630 * 1631 * returns 0 if we may process the packet, 1632 * -ERESTARTSYS if we were interrupted (by disconnect signal). */ 1633 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq) 1634 { 1635 DEFINE_WAIT(wait); 1636 unsigned int p_seq; 1637 long timeout; 1638 int ret = 0; 1639 spin_lock(&mdev->peer_seq_lock); 1640 for (;;) { 1641 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE); 1642 if (seq_le(packet_seq, mdev->peer_seq+1)) 1643 break; 1644 if (signal_pending(current)) { 1645 ret = -ERESTARTSYS; 1646 break; 1647 } 1648 p_seq = mdev->peer_seq; 1649 spin_unlock(&mdev->peer_seq_lock); 1650 timeout = schedule_timeout(30*HZ); 1651 spin_lock(&mdev->peer_seq_lock); 1652 if (timeout == 0 && p_seq == mdev->peer_seq) { 1653 ret = -ETIMEDOUT; 1654 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n"); 1655 break; 1656 } 1657 } 1658 finish_wait(&mdev->seq_wait, &wait); 1659 if (mdev->peer_seq+1 == packet_seq) 1660 mdev->peer_seq++; 1661 spin_unlock(&mdev->peer_seq_lock); 1662 return ret; 1663 } 1664 1665 /* mirrored write */ 1666 static int receive_Data(struct drbd_conf *mdev, struct p_header *h) 1667 { 1668 sector_t sector; 1669 struct drbd_epoch_entry *e; 1670 struct p_data *p = (struct p_data *)h; 1671 int header_size, data_size; 1672 int rw = WRITE; 1673 u32 dp_flags; 1674 1675 header_size = sizeof(*p) - sizeof(*h); 1676 data_size = h->length - header_size; 1677 1678 ERR_IF(data_size == 0) return FALSE; 1679 1680 if (drbd_recv(mdev, h->payload, header_size) != header_size) 1681 return FALSE; 1682 1683 if (!get_ldev(mdev)) { 1684 if (__ratelimit(&drbd_ratelimit_state)) 1685 dev_err(DEV, "Can not write mirrored data block " 1686 "to local disk.\n"); 1687 spin_lock(&mdev->peer_seq_lock); 1688 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num)) 1689 mdev->peer_seq++; 1690 spin_unlock(&mdev->peer_seq_lock); 1691 1692 drbd_send_ack_dp(mdev, P_NEG_ACK, p); 1693 atomic_inc(&mdev->current_epoch->epoch_size); 1694 return 
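/*
 * Note on the sequence number comparison above: seq_le() has to be
 * wrap-around safe.  Its real definition lives in drbd_int.h; the usual
 * way to express it (shown here only as a sketch) is a signed difference,
 * e.g.
 *
 *	seq_le(a, b)  ==  ((s32)((u32)(a) - (u32)(b)) <= 0)
 *
 * which, together with the 30 second timeout in drbd_wait_peer_seq(),
 * distinguishes "the peer stopped sending" from an ordinary wrap-around
 * of the 32 bit counter.
 */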
drbd_drain_block(mdev, data_size); 1695 } 1696 1697 /* get_ldev(mdev) successful. 1698 * Corresponding put_ldev done either below (on various errors), 1699 * or in drbd_endio_write_sec, if we successfully submit the data at 1700 * the end of this function. */ 1701 1702 sector = be64_to_cpu(p->sector); 1703 e = read_in_block(mdev, p->block_id, sector, data_size); 1704 if (!e) { 1705 put_ldev(mdev); 1706 return FALSE; 1707 } 1708 1709 e->private_bio->bi_end_io = drbd_endio_write_sec; 1710 e->w.cb = e_end_block; 1711 1712 spin_lock(&mdev->epoch_lock); 1713 e->epoch = mdev->current_epoch; 1714 atomic_inc(&e->epoch->epoch_size); 1715 atomic_inc(&e->epoch->active); 1716 1717 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) { 1718 struct drbd_epoch *epoch; 1719 /* Issue a barrier if we start a new epoch, and the previous epoch 1720 was not a epoch containing a single request which already was 1721 a Barrier. */ 1722 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list); 1723 if (epoch == e->epoch) { 1724 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags); 1725 trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER); 1726 rw |= (1<<BIO_RW_BARRIER); 1727 e->flags |= EE_IS_BARRIER; 1728 } else { 1729 if (atomic_read(&epoch->epoch_size) > 1 || 1730 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) { 1731 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags); 1732 trace_drbd_epoch(mdev, epoch, EV_TRACE_SETTING_BI); 1733 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags); 1734 trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER); 1735 rw |= (1<<BIO_RW_BARRIER); 1736 e->flags |= EE_IS_BARRIER; 1737 } 1738 } 1739 } 1740 spin_unlock(&mdev->epoch_lock); 1741 1742 dp_flags = be32_to_cpu(p->dp_flags); 1743 if (dp_flags & DP_HARDBARRIER) { 1744 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n"); 1745 /* rw |= (1<<BIO_RW_BARRIER); */ 1746 } 1747 if (dp_flags & DP_RW_SYNC) 1748 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG); 1749 if (dp_flags & DP_MAY_SET_IN_SYNC) 1750 e->flags |= EE_MAY_SET_IN_SYNC; 1751 1752 /* I'm the receiver, I do hold a net_cnt reference. */ 1753 if (!mdev->net_conf->two_primaries) { 1754 spin_lock_irq(&mdev->req_lock); 1755 } else { 1756 /* don't get the req_lock yet, 1757 * we may sleep in drbd_wait_peer_seq */ 1758 const int size = e->size; 1759 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags); 1760 DEFINE_WAIT(wait); 1761 struct drbd_request *i; 1762 struct hlist_node *n; 1763 struct hlist_head *slot; 1764 int first; 1765 1766 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C); 1767 BUG_ON(mdev->ee_hash == NULL); 1768 BUG_ON(mdev->tl_hash == NULL); 1769 1770 /* conflict detection and handling: 1771 * 1. wait on the sequence number, 1772 * in case this data packet overtook ACK packets. 1773 * 2. check our hash tables for conflicting requests. 1774 * we only need to walk the tl_hash, since an ee can not 1775 * have a conflict with an other ee: on the submitting 1776 * node, the corresponding req had already been conflicting, 1777 * and a conflicting req is never sent. 1778 * 1779 * Note: for two_primaries, we are protocol C, 1780 * so there cannot be any request that is DONE 1781 * but still on the transfer log. 1782 * 1783 * unconditionally add to the ee_hash. 1784 * 1785 * if no conflicting request is found: 1786 * submit. 
1787 * 1788 * if any conflicting request is found 1789 * that has not yet been acked, 1790 * AND I have the "discard concurrent writes" flag: 1791 * queue (via done_ee) the P_DISCARD_ACK; OUT. 1792 * 1793 * if any conflicting request is found: 1794 * block the receiver, waiting on misc_wait 1795 * until no more conflicting requests are there, 1796 * or we get interrupted (disconnect). 1797 * 1798 * we do not just write after local io completion of those 1799 * requests, but only after req is done completely, i.e. 1800 * we wait for the P_DISCARD_ACK to arrive! 1801 * 1802 * then proceed normally, i.e. submit. 1803 */ 1804 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num))) 1805 goto out_interrupted; 1806 1807 spin_lock_irq(&mdev->req_lock); 1808 1809 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector)); 1810 1811 #define OVERLAPS overlaps(i->sector, i->size, sector, size) 1812 slot = tl_hash_slot(mdev, sector); 1813 first = 1; 1814 for (;;) { 1815 int have_unacked = 0; 1816 int have_conflict = 0; 1817 prepare_to_wait(&mdev->misc_wait, &wait, 1818 TASK_INTERRUPTIBLE); 1819 hlist_for_each_entry(i, n, slot, colision) { 1820 if (OVERLAPS) { 1821 /* only ALERT on first iteration, 1822 * we may be woken up early... */ 1823 if (first) 1824 dev_alert(DEV, "%s[%u] Concurrent local write detected!" 1825 " new: %llus +%u; pending: %llus +%u\n", 1826 current->comm, current->pid, 1827 (unsigned long long)sector, size, 1828 (unsigned long long)i->sector, i->size); 1829 if (i->rq_state & RQ_NET_PENDING) 1830 ++have_unacked; 1831 ++have_conflict; 1832 } 1833 } 1834 #undef OVERLAPS 1835 if (!have_conflict) 1836 break; 1837 1838 /* Discard Ack only for the _first_ iteration */ 1839 if (first && discard && have_unacked) { 1840 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n", 1841 (unsigned long long)sector); 1842 inc_unacked(mdev); 1843 e->w.cb = e_send_discard_ack; 1844 list_add_tail(&e->w.list, &mdev->done_ee); 1845 1846 spin_unlock_irq(&mdev->req_lock); 1847 1848 /* we could probably send that P_DISCARD_ACK ourselves, 1849 * but I don't like the receiver using the msock */ 1850 1851 put_ldev(mdev); 1852 wake_asender(mdev); 1853 finish_wait(&mdev->misc_wait, &wait); 1854 return TRUE; 1855 } 1856 1857 if (signal_pending(current)) { 1858 hlist_del_init(&e->colision); 1859 1860 spin_unlock_irq(&mdev->req_lock); 1861 1862 finish_wait(&mdev->misc_wait, &wait); 1863 goto out_interrupted; 1864 } 1865 1866 spin_unlock_irq(&mdev->req_lock); 1867 if (first) { 1868 first = 0; 1869 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] " 1870 "sec=%llus\n", (unsigned long long)sector); 1871 } else if (discard) { 1872 /* we had none on the first iteration. 1873 * there must be none now. 
*/ 1874 D_ASSERT(have_unacked == 0); 1875 } 1876 schedule(); 1877 spin_lock_irq(&mdev->req_lock); 1878 } 1879 finish_wait(&mdev->misc_wait, &wait); 1880 } 1881 1882 list_add(&e->w.list, &mdev->active_ee); 1883 spin_unlock_irq(&mdev->req_lock); 1884 1885 switch (mdev->net_conf->wire_protocol) { 1886 case DRBD_PROT_C: 1887 inc_unacked(mdev); 1888 /* corresponding dec_unacked() in e_end_block() 1889 * respective _drbd_clear_done_ee */ 1890 break; 1891 case DRBD_PROT_B: 1892 /* I really don't like it that the receiver thread 1893 * sends on the msock, but anyways */ 1894 drbd_send_ack(mdev, P_RECV_ACK, e); 1895 break; 1896 case DRBD_PROT_A: 1897 /* nothing to do */ 1898 break; 1899 } 1900 1901 if (mdev->state.pdsk == D_DISKLESS) { 1902 /* In case we have the only disk of the cluster, */ 1903 drbd_set_out_of_sync(mdev, e->sector, e->size); 1904 e->flags |= EE_CALL_AL_COMPLETE_IO; 1905 drbd_al_begin_io(mdev, e->sector); 1906 } 1907 1908 e->private_bio->bi_rw = rw; 1909 trace_drbd_ee(mdev, e, "submitting for (data)write"); 1910 trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL); 1911 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio); 1912 /* accounting done in endio */ 1913 1914 maybe_kick_lo(mdev); 1915 return TRUE; 1916 1917 out_interrupted: 1918 /* yes, the epoch_size now is imbalanced. 1919 * but we drop the connection anyways, so we don't have a chance to 1920 * receive a barrier... atomic_inc(&mdev->epoch_size); */ 1921 put_ldev(mdev); 1922 drbd_free_ee(mdev, e); 1923 return FALSE; 1924 } 1925 1926 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) 1927 { 1928 sector_t sector; 1929 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 1930 struct drbd_epoch_entry *e; 1931 struct digest_info *di = NULL; 1932 int size, digest_size; 1933 unsigned int fault_type; 1934 struct p_block_req *p = 1935 (struct p_block_req *)h; 1936 const int brps = sizeof(*p)-sizeof(*h); 1937 1938 if (drbd_recv(mdev, h->payload, brps) != brps) 1939 return FALSE; 1940 1941 sector = be64_to_cpu(p->sector); 1942 size = be32_to_cpu(p->blksize); 1943 1944 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) { 1945 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 1946 (unsigned long long)sector, size); 1947 return FALSE; 1948 } 1949 if (sector + (size>>9) > capacity) { 1950 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 1951 (unsigned long long)sector, size); 1952 return FALSE; 1953 } 1954 1955 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) { 1956 if (__ratelimit(&drbd_ratelimit_state)) 1957 dev_err(DEV, "Can not satisfy peer's read request, " 1958 "no local data.\n"); 1959 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY : 1960 P_NEG_RS_DREPLY , p); 1961 return TRUE; 1962 } 1963 1964 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 1965 * "criss-cross" setup, that might cause write-out on some other DRBD, 1966 * which in turn might block on the other node at this very place. 
*/ 1967 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO); 1968 if (!e) { 1969 put_ldev(mdev); 1970 return FALSE; 1971 } 1972 1973 e->private_bio->bi_rw = READ; 1974 e->private_bio->bi_end_io = drbd_endio_read_sec; 1975 1976 switch (h->command) { 1977 case P_DATA_REQUEST: 1978 e->w.cb = w_e_end_data_req; 1979 fault_type = DRBD_FAULT_DT_RD; 1980 break; 1981 case P_RS_DATA_REQUEST: 1982 e->w.cb = w_e_end_rsdata_req; 1983 fault_type = DRBD_FAULT_RS_RD; 1984 /* Eventually this should become asynchronously. Currently it 1985 * blocks the whole receiver just to delay the reading of a 1986 * resync data block. 1987 * the drbd_work_queue mechanism is made for this... 1988 */ 1989 if (!drbd_rs_begin_io(mdev, sector)) { 1990 /* we have been interrupted, 1991 * probably connection lost! */ 1992 D_ASSERT(signal_pending(current)); 1993 goto out_free_e; 1994 } 1995 break; 1996 1997 case P_OV_REPLY: 1998 case P_CSUM_RS_REQUEST: 1999 fault_type = DRBD_FAULT_RS_RD; 2000 digest_size = h->length - brps ; 2001 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO); 2002 if (!di) 2003 goto out_free_e; 2004 2005 di->digest_size = digest_size; 2006 di->digest = (((char *)di)+sizeof(struct digest_info)); 2007 2008 if (drbd_recv(mdev, di->digest, digest_size) != digest_size) 2009 goto out_free_e; 2010 2011 e->block_id = (u64)(unsigned long)di; 2012 if (h->command == P_CSUM_RS_REQUEST) { 2013 D_ASSERT(mdev->agreed_pro_version >= 89); 2014 e->w.cb = w_e_end_csum_rs_req; 2015 } else if (h->command == P_OV_REPLY) { 2016 e->w.cb = w_e_end_ov_reply; 2017 dec_rs_pending(mdev); 2018 break; 2019 } 2020 2021 if (!drbd_rs_begin_io(mdev, sector)) { 2022 /* we have been interrupted, probably connection lost! */ 2023 D_ASSERT(signal_pending(current)); 2024 goto out_free_e; 2025 } 2026 break; 2027 2028 case P_OV_REQUEST: 2029 if (mdev->state.conn >= C_CONNECTED && 2030 mdev->state.conn != C_VERIFY_T) 2031 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n", 2032 drbd_conn_str(mdev->state.conn)); 2033 if (mdev->ov_start_sector == ~(sector_t)0 && 2034 mdev->agreed_pro_version >= 90) { 2035 mdev->ov_start_sector = sector; 2036 mdev->ov_position = sector; 2037 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector); 2038 dev_info(DEV, "Online Verify start sector: %llu\n", 2039 (unsigned long long)sector); 2040 } 2041 e->w.cb = w_e_end_ov_req; 2042 fault_type = DRBD_FAULT_RS_RD; 2043 /* Eventually this should become asynchronous. Currently it 2044 * blocks the whole receiver just to delay the reading of a 2045 * resync data block. 2046 * the drbd_work_queue mechanism is made for this... 2047 */ 2048 if (!drbd_rs_begin_io(mdev, sector)) { 2049 /* we have been interrupted, 2050 * probably connection lost! 
*/ 2051 D_ASSERT(signal_pending(current)); 2052 goto out_free_e; 2053 } 2054 break; 2055 2056 2057 default: 2058 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n", 2059 cmdname(h->command)); 2060 fault_type = DRBD_FAULT_MAX; 2061 } 2062 2063 spin_lock_irq(&mdev->req_lock); 2064 list_add(&e->w.list, &mdev->read_ee); 2065 spin_unlock_irq(&mdev->req_lock); 2066 2067 inc_unacked(mdev); 2068 2069 trace_drbd_ee(mdev, e, "submitting for read"); 2070 trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL); 2071 drbd_generic_make_request(mdev, fault_type, e->private_bio); 2072 maybe_kick_lo(mdev); 2073 2074 return TRUE; 2075 2076 out_free_e: 2077 kfree(di); 2078 put_ldev(mdev); 2079 drbd_free_ee(mdev, e); 2080 return FALSE; 2081 } 2082 2083 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local) 2084 { 2085 int self, peer, rv = -100; 2086 unsigned long ch_self, ch_peer; 2087 2088 self = mdev->ldev->md.uuid[UI_BITMAP] & 1; 2089 peer = mdev->p_uuid[UI_BITMAP] & 1; 2090 2091 ch_peer = mdev->p_uuid[UI_SIZE]; 2092 ch_self = mdev->comm_bm_set; 2093 2094 switch (mdev->net_conf->after_sb_0p) { 2095 case ASB_CONSENSUS: 2096 case ASB_DISCARD_SECONDARY: 2097 case ASB_CALL_HELPER: 2098 dev_err(DEV, "Configuration error.\n"); 2099 break; 2100 case ASB_DISCONNECT: 2101 break; 2102 case ASB_DISCARD_YOUNGER_PRI: 2103 if (self == 0 && peer == 1) { 2104 rv = -1; 2105 break; 2106 } 2107 if (self == 1 && peer == 0) { 2108 rv = 1; 2109 break; 2110 } 2111 /* Else fall through to one of the other strategies... */ 2112 case ASB_DISCARD_OLDER_PRI: 2113 if (self == 0 && peer == 1) { 2114 rv = 1; 2115 break; 2116 } 2117 if (self == 1 && peer == 0) { 2118 rv = -1; 2119 break; 2120 } 2121 /* Else fall through to one of the other strategies... */ 2122 dev_warn(DEV, "Discard younger/older primary did not found a decision\n" 2123 "Using discard-least-changes instead\n"); 2124 case ASB_DISCARD_ZERO_CHG: 2125 if (ch_peer == 0 && ch_self == 0) { 2126 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags) 2127 ? -1 : 1; 2128 break; 2129 } else { 2130 if (ch_peer == 0) { rv = 1; break; } 2131 if (ch_self == 0) { rv = -1; break; } 2132 } 2133 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG) 2134 break; 2135 case ASB_DISCARD_LEAST_CHG: 2136 if (ch_self < ch_peer) 2137 rv = -1; 2138 else if (ch_self > ch_peer) 2139 rv = 1; 2140 else /* ( ch_self == ch_peer ) */ 2141 /* Well, then use something else. */ 2142 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags) 2143 ? -1 : 1; 2144 break; 2145 case ASB_DISCARD_LOCAL: 2146 rv = -1; 2147 break; 2148 case ASB_DISCARD_REMOTE: 2149 rv = 1; 2150 } 2151 2152 return rv; 2153 } 2154 2155 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local) 2156 { 2157 int self, peer, hg, rv = -100; 2158 2159 self = mdev->ldev->md.uuid[UI_BITMAP] & 1; 2160 peer = mdev->p_uuid[UI_BITMAP] & 1; 2161 2162 switch (mdev->net_conf->after_sb_1p) { 2163 case ASB_DISCARD_YOUNGER_PRI: 2164 case ASB_DISCARD_OLDER_PRI: 2165 case ASB_DISCARD_LEAST_CHG: 2166 case ASB_DISCARD_LOCAL: 2167 case ASB_DISCARD_REMOTE: 2168 dev_err(DEV, "Configuration error.\n"); 2169 break; 2170 case ASB_DISCONNECT: 2171 break; 2172 case ASB_CONSENSUS: 2173 hg = drbd_asb_recover_0p(mdev); 2174 if (hg == -1 && mdev->state.role == R_SECONDARY) 2175 rv = hg; 2176 if (hg == 1 && mdev->state.role == R_PRIMARY) 2177 rv = hg; 2178 break; 2179 case ASB_VIOLENTLY: 2180 rv = drbd_asb_recover_0p(mdev); 2181 break; 2182 case ASB_DISCARD_SECONDARY: 2183 return mdev->state.role == R_PRIMARY ? 
1 : -1; 2184 case ASB_CALL_HELPER: 2185 hg = drbd_asb_recover_0p(mdev); 2186 if (hg == -1 && mdev->state.role == R_PRIMARY) { 2187 self = drbd_set_role(mdev, R_SECONDARY, 0); 2188 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2189 * we might be here in C_WF_REPORT_PARAMS which is transient. 2190 * we do not need to wait for the after state change work either. */ 2191 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY)); 2192 if (self != SS_SUCCESS) { 2193 drbd_khelper(mdev, "pri-lost-after-sb"); 2194 } else { 2195 dev_warn(DEV, "Successfully gave up primary role.\n"); 2196 rv = hg; 2197 } 2198 } else 2199 rv = hg; 2200 } 2201 2202 return rv; 2203 } 2204 2205 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local) 2206 { 2207 int self, peer, hg, rv = -100; 2208 2209 self = mdev->ldev->md.uuid[UI_BITMAP] & 1; 2210 peer = mdev->p_uuid[UI_BITMAP] & 1; 2211 2212 switch (mdev->net_conf->after_sb_2p) { 2213 case ASB_DISCARD_YOUNGER_PRI: 2214 case ASB_DISCARD_OLDER_PRI: 2215 case ASB_DISCARD_LEAST_CHG: 2216 case ASB_DISCARD_LOCAL: 2217 case ASB_DISCARD_REMOTE: 2218 case ASB_CONSENSUS: 2219 case ASB_DISCARD_SECONDARY: 2220 dev_err(DEV, "Configuration error.\n"); 2221 break; 2222 case ASB_VIOLENTLY: 2223 rv = drbd_asb_recover_0p(mdev); 2224 break; 2225 case ASB_DISCONNECT: 2226 break; 2227 case ASB_CALL_HELPER: 2228 hg = drbd_asb_recover_0p(mdev); 2229 if (hg == -1) { 2230 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2231 * we might be here in C_WF_REPORT_PARAMS which is transient. 2232 * we do not need to wait for the after state change work either. */ 2233 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY)); 2234 if (self != SS_SUCCESS) { 2235 drbd_khelper(mdev, "pri-lost-after-sb"); 2236 } else { 2237 dev_warn(DEV, "Successfully gave up primary role.\n"); 2238 rv = hg; 2239 } 2240 } else 2241 rv = hg; 2242 } 2243 2244 return rv; 2245 } 2246 2247 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid, 2248 u64 bits, u64 flags) 2249 { 2250 if (!uuid) { 2251 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text); 2252 return; 2253 } 2254 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 2255 text, 2256 (unsigned long long)uuid[UI_CURRENT], 2257 (unsigned long long)uuid[UI_BITMAP], 2258 (unsigned long long)uuid[UI_HISTORY_START], 2259 (unsigned long long)uuid[UI_HISTORY_END], 2260 (unsigned long long)bits, 2261 (unsigned long long)flags); 2262 } 2263 2264 /* 2265 100 after split brain try auto recover 2266 2 C_SYNC_SOURCE set BitMap 2267 1 C_SYNC_SOURCE use BitMap 2268 0 no Sync 2269 -1 C_SYNC_TARGET use BitMap 2270 -2 C_SYNC_TARGET set BitMap 2271 -100 after split brain, disconnect 2272 -1000 unrelated data 2273 */ 2274 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local) 2275 { 2276 u64 self, peer; 2277 int i, j; 2278 2279 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2280 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1); 2281 2282 *rule_nr = 10; 2283 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 2284 return 0; 2285 2286 *rule_nr = 20; 2287 if ((self == UUID_JUST_CREATED || self == (u64)0) && 2288 peer != UUID_JUST_CREATED) 2289 return -2; 2290 2291 *rule_nr = 30; 2292 if (self != UUID_JUST_CREATED && 2293 (peer == UUID_JUST_CREATED || peer == (u64)0)) 2294 return 2; 2295 2296 if (self == peer) { 2297 int rct, dc; /* roles at crash time */ 2298 2299 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && 
mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) { 2300 2301 if (mdev->agreed_pro_version < 91) 2302 return -1001; 2303 2304 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 2305 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 2306 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n"); 2307 drbd_uuid_set_bm(mdev, 0UL); 2308 2309 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, 2310 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0); 2311 *rule_nr = 34; 2312 } else { 2313 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n"); 2314 *rule_nr = 36; 2315 } 2316 2317 return 1; 2318 } 2319 2320 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) { 2321 2322 if (mdev->agreed_pro_version < 91) 2323 return -1001; 2324 2325 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) && 2326 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 2327 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 2328 2329 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START]; 2330 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP]; 2331 mdev->p_uuid[UI_BITMAP] = 0UL; 2332 2333 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]); 2334 *rule_nr = 35; 2335 } else { 2336 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n"); 2337 *rule_nr = 37; 2338 } 2339 2340 return -1; 2341 } 2342 2343 /* Common power [off|failure] */ 2344 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) + 2345 (mdev->p_uuid[UI_FLAGS] & 2); 2346 /* lowest bit is set when we were primary, 2347 * next bit (weight 2) is set when peer was primary */ 2348 *rule_nr = 40; 2349 2350 switch (rct) { 2351 case 0: /* !self_pri && !peer_pri */ return 0; 2352 case 1: /* self_pri && !peer_pri */ return 1; 2353 case 2: /* !self_pri && peer_pri */ return -1; 2354 case 3: /* self_pri && peer_pri */ 2355 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags); 2356 return dc ? -1 : 1; 2357 } 2358 } 2359 2360 *rule_nr = 50; 2361 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1); 2362 if (self == peer) 2363 return -1; 2364 2365 *rule_nr = 51; 2366 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1); 2367 if (self == peer) { 2368 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 2369 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1); 2370 if (self == peer) { 2371 /* The last P_SYNC_UUID did not get though. Undo the last start of 2372 resync as sync source modifications of the peer's UUIDs. 
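 * (below we shift the peer's bitmap/history UUIDs back up one slot, undoing the
 * rotation the peer performed when it started that resync, and become sync target again)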
*/
2373
2374 if (mdev->agreed_pro_version < 91)
2375 return -1001;
2376
2377 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2378 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2379 return -1;
2380 }
2381 }
2382
2383 *rule_nr = 60;
2384 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2385 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2386 peer = mdev->p_uuid[i] & ~((u64)1);
2387 if (self == peer)
2388 return -2;
2389 }
2390
2391 *rule_nr = 70;
2392 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2393 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2394 if (self == peer)
2395 return 1;
2396
2397 *rule_nr = 71;
2398 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2399 if (self == peer) {
2400 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2401 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2402 if (self == peer) {
2403 /* The last P_SYNC_UUID did not get through. Undo the last start of
2404 resync as sync source modifications of our UUIDs. */
2405
2406 if (mdev->agreed_pro_version < 91)
2407 return -1001;
2408
2409 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2410 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2411
2412 dev_info(DEV, "Undid last start of resync:\n");
2413
2414 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2415 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2416
2417 return 1;
2418 }
2419 }
2420
2421
2422 *rule_nr = 80;
2423 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2424 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2425 if (self == peer)
2426 return 2;
2427 }
2428
2429 *rule_nr = 90;
2430 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2431 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2432 if (self == peer && self != ((u64)0))
2433 return 100;
2434
2435 *rule_nr = 100;
2436 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2437 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2438 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2439 peer = mdev->p_uuid[j] & ~((u64)1);
2440 if (self == peer)
2441 return -100;
2442 }
2443 }
2444
2445 return -1000;
2446 }
2447
2448 /* drbd_sync_handshake() returns the new conn state on success, or
2449 C_MASK (-1) on failure.
2450 */
2451 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2452 enum drbd_disk_state peer_disk) __must_hold(local)
2453 {
2454 int hg, rule_nr;
2455 enum drbd_conns rv = C_MASK;
2456 enum drbd_disk_state mydisk;
2457
2458 mydisk = mdev->state.disk;
2459 if (mydisk == D_NEGOTIATING)
2460 mydisk = mdev->new_state_tmp.disk;
2461
2462 dev_info(DEV, "drbd_sync_handshake:\n");
2463 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2464 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2465 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2466
2467 hg = drbd_uuid_compare(mdev, &rule_nr);
2468
2469 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2470
2471 if (hg == -1000) {
2472 dev_alert(DEV, "Unrelated data, aborting!\n");
2473 return C_MASK;
2474 }
2475 if (hg == -1001) {
2476 dev_alert(DEV, "To resolve this both sides have to support at least protocol version 91\n");
2477 return C_MASK;
2478 }
2479
2480 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2481 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2482 int f = (hg == -100) || abs(hg) == 2;
2483 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2484 if (f)
2485 hg = hg*2;
2486 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2487 hg > 0 ? "source" : "target");
2488 }
2489
2490 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2491 int pcount = (mdev->state.role == R_PRIMARY)
2492 + (peer_role == R_PRIMARY);
2493 int forced = (hg == -100);
2494
2495 switch (pcount) {
2496 case 0:
2497 hg = drbd_asb_recover_0p(mdev);
2498 break;
2499 case 1:
2500 hg = drbd_asb_recover_1p(mdev);
2501 break;
2502 case 2:
2503 hg = drbd_asb_recover_2p(mdev);
2504 break;
2505 }
2506 if (abs(hg) < 100) {
2507 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2508 "automatically solved. Sync from %s node\n",
2509 pcount, (hg < 0) ? "peer" : "this");
2510 if (forced) {
2511 dev_warn(DEV, "Doing a full sync, since"
2512 " UUIDs were ambiguous.\n");
2513 hg = hg*2;
2514 }
2515 }
2516 }
2517
2518 if (hg == -100) {
2519 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2520 hg = -1;
2521 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2522 hg = 1;
2523
2524 if (abs(hg) < 100)
2525 dev_warn(DEV, "Split-Brain detected, manually solved. "
2526 "Sync from %s node\n",
2527 (hg < 0) ? "peer" : "this");
2528 }
2529
2530 if (hg == -100) {
2531 dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2532 drbd_khelper(mdev, "split-brain");
2533 return C_MASK;
2534 }
2535
2536 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2537 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2538 return C_MASK;
2539 }
2540
2541 if (hg < 0 && /* by intention we do not use mydisk here. */
2542 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2543 switch (mdev->net_conf->rr_conflict) {
2544 case ASB_CALL_HELPER:
2545 drbd_khelper(mdev, "pri-lost");
2546 /* fall through */
2547 case ASB_DISCONNECT:
2548 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2549 return C_MASK;
2550 case ASB_VIOLENTLY:
2551 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2552 " assumption\n");
2553 }
2554 }
2555
2556 if (abs(hg) >= 2) {
2557 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2558 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2559 return C_MASK;
2560 }
2561
2562 if (hg > 0) { /* become sync source. */
2563 rv = C_WF_BITMAP_S;
2564 } else if (hg < 0) { /* become sync target */
2565 rv = C_WF_BITMAP_T;
2566 } else {
2567 rv = C_CONNECTED;
2568 if (drbd_bm_total_weight(mdev)) {
2569 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2570 drbd_bm_total_weight(mdev));
2571 }
2572 }
2573
2574 return rv;
2575 }
2576
2577 /* returns 1 if invalid */
2578 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2579 {
2580 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2581 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2582 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2583 return 0;
2584
2585 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2586 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2587 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2588 return 1;
2589
2590 /* everything else is valid if they are equal on both sides. */
2591 if (peer == self)
2592 return 0;
2593
2594 /* everything else is invalid.
*/ 2595 return 1; 2596 } 2597 2598 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h) 2599 { 2600 struct p_protocol *p = (struct p_protocol *)h; 2601 int header_size, data_size; 2602 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 2603 int p_want_lose, p_two_primaries; 2604 char p_integrity_alg[SHARED_SECRET_MAX] = ""; 2605 2606 header_size = sizeof(*p) - sizeof(*h); 2607 data_size = h->length - header_size; 2608 2609 if (drbd_recv(mdev, h->payload, header_size) != header_size) 2610 return FALSE; 2611 2612 p_proto = be32_to_cpu(p->protocol); 2613 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 2614 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 2615 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 2616 p_want_lose = be32_to_cpu(p->want_lose); 2617 p_two_primaries = be32_to_cpu(p->two_primaries); 2618 2619 if (p_proto != mdev->net_conf->wire_protocol) { 2620 dev_err(DEV, "incompatible communication protocols\n"); 2621 goto disconnect; 2622 } 2623 2624 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) { 2625 dev_err(DEV, "incompatible after-sb-0pri settings\n"); 2626 goto disconnect; 2627 } 2628 2629 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) { 2630 dev_err(DEV, "incompatible after-sb-1pri settings\n"); 2631 goto disconnect; 2632 } 2633 2634 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) { 2635 dev_err(DEV, "incompatible after-sb-2pri settings\n"); 2636 goto disconnect; 2637 } 2638 2639 if (p_want_lose && mdev->net_conf->want_lose) { 2640 dev_err(DEV, "both sides have the 'want_lose' flag set\n"); 2641 goto disconnect; 2642 } 2643 2644 if (p_two_primaries != mdev->net_conf->two_primaries) { 2645 dev_err(DEV, "incompatible setting of the two-primaries options\n"); 2646 goto disconnect; 2647 } 2648 2649 if (mdev->agreed_pro_version >= 87) { 2650 unsigned char *my_alg = mdev->net_conf->integrity_alg; 2651 2652 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size) 2653 return FALSE; 2654 2655 p_integrity_alg[SHARED_SECRET_MAX-1] = 0; 2656 if (strcmp(p_integrity_alg, my_alg)) { 2657 dev_err(DEV, "incompatible setting of the data-integrity-alg\n"); 2658 goto disconnect; 2659 } 2660 dev_info(DEV, "data-integrity-alg: %s\n", 2661 my_alg[0] ? my_alg : (unsigned char *)"<not-used>"); 2662 } 2663 2664 return TRUE; 2665 2666 disconnect: 2667 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2668 return FALSE; 2669 } 2670 2671 /* helper function 2672 * input: alg name, feature name 2673 * return: NULL (alg name was "") 2674 * ERR_PTR(error) if something goes wrong 2675 * or the crypto hash ptr, if it worked out ok. 
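 * Note: if crypto_alloc_hash() itself fails, its ERR_PTR is logged and passed
 * straight back to the caller.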
*/ 2676 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev, 2677 const char *alg, const char *name) 2678 { 2679 struct crypto_hash *tfm; 2680 2681 if (!alg[0]) 2682 return NULL; 2683 2684 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC); 2685 if (IS_ERR(tfm)) { 2686 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n", 2687 alg, name, PTR_ERR(tfm)); 2688 return tfm; 2689 } 2690 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) { 2691 crypto_free_hash(tfm); 2692 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name); 2693 return ERR_PTR(-EINVAL); 2694 } 2695 return tfm; 2696 } 2697 2698 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) 2699 { 2700 int ok = TRUE; 2701 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h; 2702 unsigned int header_size, data_size, exp_max_sz; 2703 struct crypto_hash *verify_tfm = NULL; 2704 struct crypto_hash *csums_tfm = NULL; 2705 const int apv = mdev->agreed_pro_version; 2706 2707 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 2708 : apv == 88 ? sizeof(struct p_rs_param) 2709 + SHARED_SECRET_MAX 2710 : /* 89 */ sizeof(struct p_rs_param_89); 2711 2712 if (h->length > exp_max_sz) { 2713 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n", 2714 h->length, exp_max_sz); 2715 return FALSE; 2716 } 2717 2718 if (apv <= 88) { 2719 header_size = sizeof(struct p_rs_param) - sizeof(*h); 2720 data_size = h->length - header_size; 2721 } else /* apv >= 89 */ { 2722 header_size = sizeof(struct p_rs_param_89) - sizeof(*h); 2723 data_size = h->length - header_size; 2724 D_ASSERT(data_size == 0); 2725 } 2726 2727 /* initialize verify_alg and csums_alg */ 2728 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); 2729 2730 if (drbd_recv(mdev, h->payload, header_size) != header_size) 2731 return FALSE; 2732 2733 mdev->sync_conf.rate = be32_to_cpu(p->rate); 2734 2735 if (apv >= 88) { 2736 if (apv == 88) { 2737 if (data_size > SHARED_SECRET_MAX) { 2738 dev_err(DEV, "verify-alg too long, " 2739 "peer wants %u, accepting only %u byte\n", 2740 data_size, SHARED_SECRET_MAX); 2741 return FALSE; 2742 } 2743 2744 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size) 2745 return FALSE; 2746 2747 /* we expect NUL terminated string */ 2748 /* but just in case someone tries to be evil */ 2749 D_ASSERT(p->verify_alg[data_size-1] == 0); 2750 p->verify_alg[data_size-1] = 0; 2751 2752 } else /* apv >= 89 */ { 2753 /* we still expect NUL terminated strings */ 2754 /* but just in case someone tries to be evil */ 2755 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0); 2756 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0); 2757 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 2758 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 2759 } 2760 2761 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) { 2762 if (mdev->state.conn == C_WF_REPORT_PARAMS) { 2763 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 2764 mdev->sync_conf.verify_alg, p->verify_alg); 2765 goto disconnect; 2766 } 2767 verify_tfm = drbd_crypto_alloc_digest_safe(mdev, 2768 p->verify_alg, "verify-alg"); 2769 if (IS_ERR(verify_tfm)) { 2770 verify_tfm = NULL; 2771 goto disconnect; 2772 } 2773 } 2774 2775 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) { 2776 if (mdev->state.conn == C_WF_REPORT_PARAMS) { 2777 dev_err(DEV, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n", 2778 mdev->sync_conf.csums_alg, p->csums_alg); 2779 goto disconnect; 2780 } 2781 csums_tfm = drbd_crypto_alloc_digest_safe(mdev, 2782 p->csums_alg, "csums-alg"); 2783 if (IS_ERR(csums_tfm)) { 2784 csums_tfm = NULL; 2785 goto disconnect; 2786 } 2787 } 2788 2789 2790 spin_lock(&mdev->peer_seq_lock); 2791 /* lock against drbd_nl_syncer_conf() */ 2792 if (verify_tfm) { 2793 strcpy(mdev->sync_conf.verify_alg, p->verify_alg); 2794 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1; 2795 crypto_free_hash(mdev->verify_tfm); 2796 mdev->verify_tfm = verify_tfm; 2797 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg); 2798 } 2799 if (csums_tfm) { 2800 strcpy(mdev->sync_conf.csums_alg, p->csums_alg); 2801 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1; 2802 crypto_free_hash(mdev->csums_tfm); 2803 mdev->csums_tfm = csums_tfm; 2804 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg); 2805 } 2806 spin_unlock(&mdev->peer_seq_lock); 2807 } 2808 2809 return ok; 2810 disconnect: 2811 /* just for completeness: actually not needed, 2812 * as this is not reached if csums_tfm was ok. */ 2813 crypto_free_hash(csums_tfm); 2814 /* but free the verify_tfm again, if csums_tfm did not work out */ 2815 crypto_free_hash(verify_tfm); 2816 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2817 return FALSE; 2818 } 2819 2820 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer) 2821 { 2822 /* sorry, we currently have no working implementation 2823 * of distributed TCQ */ 2824 } 2825 2826 /* warn if the arguments differ by more than 12.5% */ 2827 static void warn_if_differ_considerably(struct drbd_conf *mdev, 2828 const char *s, sector_t a, sector_t b) 2829 { 2830 sector_t d; 2831 if (a == 0 || b == 0) 2832 return; 2833 d = (a > b) ? (a - b) : (b - a); 2834 if (d > (a>>3) || d > (b>>3)) 2835 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s, 2836 (unsigned long long)a, (unsigned long long)b); 2837 } 2838 2839 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) 2840 { 2841 struct p_sizes *p = (struct p_sizes *)h; 2842 enum determine_dev_size dd = unchanged; 2843 unsigned int max_seg_s; 2844 sector_t p_size, p_usize, my_usize; 2845 int ldsc = 0; /* local disk size changed */ 2846 enum drbd_conns nconn; 2847 2848 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; 2849 if (drbd_recv(mdev, h->payload, h->length) != h->length) 2850 return FALSE; 2851 2852 p_size = be64_to_cpu(p->d_size); 2853 p_usize = be64_to_cpu(p->u_size); 2854 2855 if (p_size == 0 && mdev->state.disk == D_DISKLESS) { 2856 dev_err(DEV, "some backing storage is needed\n"); 2857 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2858 return FALSE; 2859 } 2860 2861 /* just store the peer's disk size for now. 2862 * we still need to figure out whether we accept that. */ 2863 mdev->p_size = p_size; 2864 2865 #define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? 
l : min(l, r)) 2866 if (get_ldev(mdev)) { 2867 warn_if_differ_considerably(mdev, "lower level device sizes", 2868 p_size, drbd_get_max_capacity(mdev->ldev)); 2869 warn_if_differ_considerably(mdev, "user requested size", 2870 p_usize, mdev->ldev->dc.disk_size); 2871 2872 /* if this is the first connect, or an otherwise expected 2873 * param exchange, choose the minimum */ 2874 if (mdev->state.conn == C_WF_REPORT_PARAMS) 2875 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size, 2876 p_usize); 2877 2878 my_usize = mdev->ldev->dc.disk_size; 2879 2880 if (mdev->ldev->dc.disk_size != p_usize) { 2881 mdev->ldev->dc.disk_size = p_usize; 2882 dev_info(DEV, "Peer sets u_size to %lu sectors\n", 2883 (unsigned long)mdev->ldev->dc.disk_size); 2884 } 2885 2886 /* Never shrink a device with usable data during connect. 2887 But allow online shrinking if we are connected. */ 2888 if (drbd_new_dev_size(mdev, mdev->ldev) < 2889 drbd_get_capacity(mdev->this_bdev) && 2890 mdev->state.disk >= D_OUTDATED && 2891 mdev->state.conn < C_CONNECTED) { 2892 dev_err(DEV, "The peer's disk size is too small!\n"); 2893 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2894 mdev->ldev->dc.disk_size = my_usize; 2895 put_ldev(mdev); 2896 return FALSE; 2897 } 2898 put_ldev(mdev); 2899 } 2900 #undef min_not_zero 2901 2902 if (get_ldev(mdev)) { 2903 dd = drbd_determin_dev_size(mdev); 2904 put_ldev(mdev); 2905 if (dd == dev_size_error) 2906 return FALSE; 2907 drbd_md_sync(mdev); 2908 } else { 2909 /* I am diskless, need to accept the peer's size. */ 2910 drbd_set_my_capacity(mdev, p_size); 2911 } 2912 2913 if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) { 2914 nconn = drbd_sync_handshake(mdev, 2915 mdev->state.peer, mdev->state.pdsk); 2916 put_ldev(mdev); 2917 2918 if (nconn == C_MASK) { 2919 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2920 return FALSE; 2921 } 2922 2923 if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) { 2924 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2925 return FALSE; 2926 } 2927 } 2928 2929 if (get_ldev(mdev)) { 2930 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) { 2931 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev); 2932 ldsc = 1; 2933 } 2934 2935 max_seg_s = be32_to_cpu(p->max_segment_size); 2936 if (max_seg_s != queue_max_segment_size(mdev->rq_queue)) 2937 drbd_setup_queue_param(mdev, max_seg_s); 2938 2939 drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type)); 2940 put_ldev(mdev); 2941 } 2942 2943 if (mdev->state.conn > C_WF_REPORT_PARAMS) { 2944 if (be64_to_cpu(p->c_size) != 2945 drbd_get_capacity(mdev->this_bdev) || ldsc) { 2946 /* we have different sizes, probably peer 2947 * needs to know my new size... 
*/ 2948 drbd_send_sizes(mdev, 0); 2949 } 2950 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) || 2951 (dd == grew && mdev->state.conn == C_CONNECTED)) { 2952 if (mdev->state.pdsk >= D_INCONSISTENT && 2953 mdev->state.disk >= D_INCONSISTENT) 2954 resync_after_online_grow(mdev); 2955 else 2956 set_bit(RESYNC_AFTER_NEG, &mdev->flags); 2957 } 2958 } 2959 2960 return TRUE; 2961 } 2962 2963 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h) 2964 { 2965 struct p_uuids *p = (struct p_uuids *)h; 2966 u64 *p_uuid; 2967 int i; 2968 2969 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; 2970 if (drbd_recv(mdev, h->payload, h->length) != h->length) 2971 return FALSE; 2972 2973 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 2974 2975 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 2976 p_uuid[i] = be64_to_cpu(p->uuid[i]); 2977 2978 kfree(mdev->p_uuid); 2979 mdev->p_uuid = p_uuid; 2980 2981 if (mdev->state.conn < C_CONNECTED && 2982 mdev->state.disk < D_INCONSISTENT && 2983 mdev->state.role == R_PRIMARY && 2984 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 2985 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n", 2986 (unsigned long long)mdev->ed_uuid); 2987 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2988 return FALSE; 2989 } 2990 2991 if (get_ldev(mdev)) { 2992 int skip_initial_sync = 2993 mdev->state.conn == C_CONNECTED && 2994 mdev->agreed_pro_version >= 90 && 2995 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 2996 (p_uuid[UI_FLAGS] & 8); 2997 if (skip_initial_sync) { 2998 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n"); 2999 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write, 3000 "clear_n_write from receive_uuids"); 3001 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]); 3002 _drbd_uuid_set(mdev, UI_BITMAP, 0); 3003 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 3004 CS_VERBOSE, NULL); 3005 drbd_md_sync(mdev); 3006 } 3007 put_ldev(mdev); 3008 } 3009 3010 /* Before we test for the disk state, we should wait until an eventually 3011 ongoing cluster wide state change is finished. That is important if 3012 we are primary and are detaching from our disk. We need to see the 3013 new disk state... */ 3014 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags)); 3015 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT) 3016 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]); 3017 3018 return TRUE; 3019 } 3020 3021 /** 3022 * convert_state() - Converts the peer's view of the cluster state to our point of view 3023 * @ps: The state as seen by the peer. 
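 * Swaps role/peer and disk/pdsk, and maps asymmetric connection states onto
 * their local counterparts (e.g. C_STARTING_SYNC_S becomes C_STARTING_SYNC_T),
 * so that the peer's report can be applied to our own view of the cluster.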
3024 */ 3025 static union drbd_state convert_state(union drbd_state ps) 3026 { 3027 union drbd_state ms; 3028 3029 static enum drbd_conns c_tab[] = { 3030 [C_CONNECTED] = C_CONNECTED, 3031 3032 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 3033 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 3034 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 3035 [C_VERIFY_S] = C_VERIFY_T, 3036 [C_MASK] = C_MASK, 3037 }; 3038 3039 ms.i = ps.i; 3040 3041 ms.conn = c_tab[ps.conn]; 3042 ms.peer = ps.role; 3043 ms.role = ps.peer; 3044 ms.pdsk = ps.disk; 3045 ms.disk = ps.pdsk; 3046 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 3047 3048 return ms; 3049 } 3050 3051 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h) 3052 { 3053 struct p_req_state *p = (struct p_req_state *)h; 3054 union drbd_state mask, val; 3055 int rv; 3056 3057 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; 3058 if (drbd_recv(mdev, h->payload, h->length) != h->length) 3059 return FALSE; 3060 3061 mask.i = be32_to_cpu(p->mask); 3062 val.i = be32_to_cpu(p->val); 3063 3064 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) && 3065 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) { 3066 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG); 3067 return TRUE; 3068 } 3069 3070 mask = convert_state(mask); 3071 val = convert_state(val); 3072 3073 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val); 3074 3075 drbd_send_sr_reply(mdev, rv); 3076 drbd_md_sync(mdev); 3077 3078 return TRUE; 3079 } 3080 3081 static int receive_state(struct drbd_conf *mdev, struct p_header *h) 3082 { 3083 struct p_state *p = (struct p_state *)h; 3084 enum drbd_conns nconn, oconn; 3085 union drbd_state ns, peer_state; 3086 enum drbd_disk_state real_peer_disk; 3087 int rv; 3088 3089 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) 3090 return FALSE; 3091 3092 if (drbd_recv(mdev, h->payload, h->length) != h->length) 3093 return FALSE; 3094 3095 peer_state.i = be32_to_cpu(p->state); 3096 3097 real_peer_disk = peer_state.disk; 3098 if (peer_state.disk == D_NEGOTIATING) { 3099 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? 
D_INCONSISTENT : D_CONSISTENT; 3100 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 3101 } 3102 3103 spin_lock_irq(&mdev->req_lock); 3104 retry: 3105 oconn = nconn = mdev->state.conn; 3106 spin_unlock_irq(&mdev->req_lock); 3107 3108 if (nconn == C_WF_REPORT_PARAMS) 3109 nconn = C_CONNECTED; 3110 3111 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING && 3112 get_ldev_if_state(mdev, D_NEGOTIATING)) { 3113 int cr; /* consider resync */ 3114 3115 /* if we established a new connection */ 3116 cr = (oconn < C_CONNECTED); 3117 /* if we had an established connection 3118 * and one of the nodes newly attaches a disk */ 3119 cr |= (oconn == C_CONNECTED && 3120 (peer_state.disk == D_NEGOTIATING || 3121 mdev->state.disk == D_NEGOTIATING)); 3122 /* if we have both been inconsistent, and the peer has been 3123 * forced to be UpToDate with --overwrite-data */ 3124 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags); 3125 /* if we had been plain connected, and the admin requested to 3126 * start a sync by "invalidate" or "invalidate-remote" */ 3127 cr |= (oconn == C_CONNECTED && 3128 (peer_state.conn >= C_STARTING_SYNC_S && 3129 peer_state.conn <= C_WF_BITMAP_T)); 3130 3131 if (cr) 3132 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk); 3133 3134 put_ldev(mdev); 3135 if (nconn == C_MASK) { 3136 if (mdev->state.disk == D_NEGOTIATING) { 3137 drbd_force_state(mdev, NS(disk, D_DISKLESS)); 3138 nconn = C_CONNECTED; 3139 } else if (peer_state.disk == D_NEGOTIATING) { 3140 dev_err(DEV, "Disk attach process on the peer node was aborted.\n"); 3141 peer_state.disk = D_DISKLESS; 3142 } else { 3143 D_ASSERT(oconn == C_WF_REPORT_PARAMS); 3144 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 3145 return FALSE; 3146 } 3147 } 3148 } 3149 3150 spin_lock_irq(&mdev->req_lock); 3151 if (mdev->state.conn != oconn) 3152 goto retry; 3153 clear_bit(CONSIDER_RESYNC, &mdev->flags); 3154 ns.i = mdev->state.i; 3155 ns.conn = nconn; 3156 ns.peer = peer_state.role; 3157 ns.pdsk = real_peer_disk; 3158 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 3159 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 3160 ns.disk = mdev->new_state_tmp.disk; 3161 3162 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL); 3163 ns = mdev->state; 3164 spin_unlock_irq(&mdev->req_lock); 3165 3166 if (rv < SS_SUCCESS) { 3167 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 3168 return FALSE; 3169 } 3170 3171 if (oconn > C_WF_REPORT_PARAMS) { 3172 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 3173 peer_state.disk != D_NEGOTIATING ) { 3174 /* we want resync, peer has not yet decided to sync... */ 3175 /* Nowadays only used when forcing a node into primary role and 3176 setting its disk to UpToDate with that */ 3177 drbd_send_uuids(mdev); 3178 drbd_send_state(mdev); 3179 } 3180 } 3181 3182 mdev->net_conf->want_lose = 0; 3183 3184 drbd_md_sync(mdev); /* update connected indicator, la_size, ... 
*/ 3185 3186 return TRUE; 3187 } 3188 3189 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) 3190 { 3191 struct p_rs_uuid *p = (struct p_rs_uuid *)h; 3192 3193 wait_event(mdev->misc_wait, 3194 mdev->state.conn == C_WF_SYNC_UUID || 3195 mdev->state.conn < C_CONNECTED || 3196 mdev->state.disk < D_NEGOTIATING); 3197 3198 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */ 3199 3200 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; 3201 if (drbd_recv(mdev, h->payload, h->length) != h->length) 3202 return FALSE; 3203 3204 /* Here the _drbd_uuid_ functions are right, current should 3205 _not_ be rotated into the history */ 3206 if (get_ldev_if_state(mdev, D_NEGOTIATING)) { 3207 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid)); 3208 _drbd_uuid_set(mdev, UI_BITMAP, 0UL); 3209 3210 drbd_start_resync(mdev, C_SYNC_TARGET); 3211 3212 put_ldev(mdev); 3213 } else 3214 dev_err(DEV, "Ignoring SyncUUID packet!\n"); 3215 3216 return TRUE; 3217 } 3218 3219 enum receive_bitmap_ret { OK, DONE, FAILED }; 3220 3221 static enum receive_bitmap_ret 3222 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h, 3223 unsigned long *buffer, struct bm_xfer_ctx *c) 3224 { 3225 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset); 3226 unsigned want = num_words * sizeof(long); 3227 3228 if (want != h->length) { 3229 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length); 3230 return FAILED; 3231 } 3232 if (want == 0) 3233 return DONE; 3234 if (drbd_recv(mdev, buffer, want) != want) 3235 return FAILED; 3236 3237 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer); 3238 3239 c->word_offset += num_words; 3240 c->bit_offset = c->word_offset * BITS_PER_LONG; 3241 if (c->bit_offset > c->bm_bits) 3242 c->bit_offset = c->bm_bits; 3243 3244 return OK; 3245 } 3246 3247 static enum receive_bitmap_ret 3248 recv_bm_rle_bits(struct drbd_conf *mdev, 3249 struct p_compressed_bm *p, 3250 struct bm_xfer_ctx *c) 3251 { 3252 struct bitstream bs; 3253 u64 look_ahead; 3254 u64 rl; 3255 u64 tmp; 3256 unsigned long s = c->bit_offset; 3257 unsigned long e; 3258 int len = p->head.length - (sizeof(*p) - sizeof(p->head)); 3259 int toggle = DCBP_get_start(p); 3260 int have; 3261 int bits; 3262 3263 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p)); 3264 3265 bits = bitstream_get_bits(&bs, &look_ahead, 64); 3266 if (bits < 0) 3267 return FAILED; 3268 3269 for (have = bits; have > 0; s += rl, toggle = !toggle) { 3270 bits = vli_decode_bits(&rl, look_ahead); 3271 if (bits <= 0) 3272 return FAILED; 3273 3274 if (toggle) { 3275 e = s + rl -1; 3276 if (e >= c->bm_bits) { 3277 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 3278 return FAILED; 3279 } 3280 _drbd_bm_set_bits(mdev, s, e); 3281 } 3282 3283 if (have < bits) { 3284 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 3285 have, bits, look_ahead, 3286 (unsigned int)(bs.cur.b - p->code), 3287 (unsigned int)bs.buf_len); 3288 return FAILED; 3289 } 3290 look_ahead >>= bits; 3291 have -= bits; 3292 3293 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 3294 if (bits < 0) 3295 return FAILED; 3296 look_ahead |= tmp << have; 3297 have += bits; 3298 } 3299 3300 c->bit_offset = s; 3301 bm_xfer_ctx_bit_to_word_offset(c); 3302 3303 return (s == c->bm_bits) ? 
DONE : OK; 3304 } 3305 3306 static enum receive_bitmap_ret 3307 decode_bitmap_c(struct drbd_conf *mdev, 3308 struct p_compressed_bm *p, 3309 struct bm_xfer_ctx *c) 3310 { 3311 if (DCBP_get_code(p) == RLE_VLI_Bits) 3312 return recv_bm_rle_bits(mdev, p, c); 3313 3314 /* other variants had been implemented for evaluation, 3315 * but have been dropped as this one turned out to be "best" 3316 * during all our tests. */ 3317 3318 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 3319 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); 3320 return FAILED; 3321 } 3322 3323 void INFO_bm_xfer_stats(struct drbd_conf *mdev, 3324 const char *direction, struct bm_xfer_ctx *c) 3325 { 3326 /* what would it take to transfer it "plaintext" */ 3327 unsigned plain = sizeof(struct p_header) * 3328 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1) 3329 + c->bm_words * sizeof(long); 3330 unsigned total = c->bytes[0] + c->bytes[1]; 3331 unsigned r; 3332 3333 /* total can not be zero. but just in case: */ 3334 if (total == 0) 3335 return; 3336 3337 /* don't report if not compressed */ 3338 if (total >= plain) 3339 return; 3340 3341 /* total < plain. check for overflow, still */ 3342 r = (total > UINT_MAX/1000) ? (total / (plain/1000)) 3343 : (1000 * total / plain); 3344 3345 if (r > 1000) 3346 r = 1000; 3347 3348 r = 1000 - r; 3349 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), " 3350 "total %u; compression: %u.%u%%\n", 3351 direction, 3352 c->bytes[1], c->packets[1], 3353 c->bytes[0], c->packets[0], 3354 total, r/10, r % 10); 3355 } 3356 3357 /* Since we are processing the bitfield from lower addresses to higher, 3358 it does not matter if the process it in 32 bit chunks or 64 bit 3359 chunks as long as it is little endian. (Understand it as byte stream, 3360 beginning with the lowest byte...) If we would use big endian 3361 we would need to process it from the highest address to the lowest, 3362 in order to be agnostic to the 32 vs 64 bits issue. 3363 3364 returns 0 on failure, 1 if we successfully received it. */ 3365 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) 3366 { 3367 struct bm_xfer_ctx c; 3368 void *buffer; 3369 enum receive_bitmap_ret ret; 3370 int ok = FALSE; 3371 3372 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt)); 3373 3374 drbd_bm_lock(mdev, "receive bitmap"); 3375 3376 /* maybe we should use some per thread scratch page, 3377 * and allocate that during initial device creation? */ 3378 buffer = (unsigned long *) __get_free_page(GFP_NOIO); 3379 if (!buffer) { 3380 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__); 3381 goto out; 3382 } 3383 3384 c = (struct bm_xfer_ctx) { 3385 .bm_bits = drbd_bm_bits(mdev), 3386 .bm_words = drbd_bm_words(mdev), 3387 }; 3388 3389 do { 3390 if (h->command == P_BITMAP) { 3391 ret = receive_bitmap_plain(mdev, h, buffer, &c); 3392 } else if (h->command == P_COMPRESSED_BITMAP) { 3393 /* MAYBE: sanity check that we speak proto >= 90, 3394 * and the feature is enabled! 
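 * (the compressed variant is expected only from peers that agreed on
 * protocol version 90 or newer)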
*/ 3395 struct p_compressed_bm *p; 3396 3397 if (h->length > BM_PACKET_PAYLOAD_BYTES) { 3398 dev_err(DEV, "ReportCBitmap packet too large\n"); 3399 goto out; 3400 } 3401 /* use the page buff */ 3402 p = buffer; 3403 memcpy(p, h, sizeof(*h)); 3404 if (drbd_recv(mdev, p->head.payload, h->length) != h->length) 3405 goto out; 3406 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) { 3407 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length); 3408 return FAILED; 3409 } 3410 ret = decode_bitmap_c(mdev, p, &c); 3411 } else { 3412 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command); 3413 goto out; 3414 } 3415 3416 c.packets[h->command == P_BITMAP]++; 3417 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length; 3418 3419 if (ret != OK) 3420 break; 3421 3422 if (!drbd_recv_header(mdev, h)) 3423 goto out; 3424 } while (ret == OK); 3425 if (ret == FAILED) 3426 goto out; 3427 3428 INFO_bm_xfer_stats(mdev, "receive", &c); 3429 3430 if (mdev->state.conn == C_WF_BITMAP_T) { 3431 ok = !drbd_send_bitmap(mdev); 3432 if (!ok) 3433 goto out; 3434 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */ 3435 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 3436 D_ASSERT(ok == SS_SUCCESS); 3437 } else if (mdev->state.conn != C_WF_BITMAP_S) { 3438 /* admin may have requested C_DISCONNECTING, 3439 * other threads may have noticed network errors */ 3440 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n", 3441 drbd_conn_str(mdev->state.conn)); 3442 } 3443 3444 ok = TRUE; 3445 out: 3446 drbd_bm_unlock(mdev); 3447 if (ok && mdev->state.conn == C_WF_BITMAP_S) 3448 drbd_start_resync(mdev, C_SYNC_SOURCE); 3449 free_page((unsigned long) buffer); 3450 return ok; 3451 } 3452 3453 static int receive_skip(struct drbd_conf *mdev, struct p_header *h) 3454 { 3455 /* TODO zero copy sink :) */ 3456 static char sink[128]; 3457 int size, want, r; 3458 3459 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n", 3460 h->command, h->length); 3461 3462 size = h->length; 3463 while (size > 0) { 3464 want = min_t(int, size, sizeof(sink)); 3465 r = drbd_recv(mdev, sink, want); 3466 ERR_IF(r <= 0) break; 3467 size -= r; 3468 } 3469 return size == 0; 3470 } 3471 3472 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h) 3473 { 3474 if (mdev->state.disk >= D_INCONSISTENT) 3475 drbd_kick_lo(mdev); 3476 3477 /* Make sure we've acked all the TCP data associated 3478 * with the data requests being unplugged */ 3479 drbd_tcp_quickack(mdev->data.socket); 3480 3481 return TRUE; 3482 } 3483 3484 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *); 3485 3486 static drbd_cmd_handler_f drbd_default_handler[] = { 3487 [P_DATA] = receive_Data, 3488 [P_DATA_REPLY] = receive_DataReply, 3489 [P_RS_DATA_REPLY] = receive_RSDataReply, 3490 [P_BARRIER] = receive_Barrier, 3491 [P_BITMAP] = receive_bitmap, 3492 [P_COMPRESSED_BITMAP] = receive_bitmap, 3493 [P_UNPLUG_REMOTE] = receive_UnplugRemote, 3494 [P_DATA_REQUEST] = receive_DataRequest, 3495 [P_RS_DATA_REQUEST] = receive_DataRequest, 3496 [P_SYNC_PARAM] = receive_SyncParam, 3497 [P_SYNC_PARAM89] = receive_SyncParam, 3498 [P_PROTOCOL] = receive_protocol, 3499 [P_UUIDS] = receive_uuids, 3500 [P_SIZES] = receive_sizes, 3501 [P_STATE] = receive_state, 3502 [P_STATE_CHG_REQ] = receive_req_state, 3503 [P_SYNC_UUID] = receive_sync_uuid, 3504 [P_OV_REQUEST] = receive_DataRequest, 3505 [P_OV_REPLY] = receive_DataRequest, 3506 
[P_CSUM_RS_REQUEST] = receive_DataRequest, 3507 /* anything missing from this table is in 3508 * the asender_tbl, see get_asender_cmd */ 3509 [P_MAX_CMD] = NULL, 3510 }; 3511 3512 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler; 3513 static drbd_cmd_handler_f *drbd_opt_cmd_handler; 3514 3515 static void drbdd(struct drbd_conf *mdev) 3516 { 3517 drbd_cmd_handler_f handler; 3518 struct p_header *header = &mdev->data.rbuf.header; 3519 3520 while (get_t_state(&mdev->receiver) == Running) { 3521 drbd_thread_current_set_cpu(mdev); 3522 if (!drbd_recv_header(mdev, header)) 3523 break; 3524 3525 if (header->command < P_MAX_CMD) 3526 handler = drbd_cmd_handler[header->command]; 3527 else if (P_MAY_IGNORE < header->command 3528 && header->command < P_MAX_OPT_CMD) 3529 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE]; 3530 else if (header->command > P_MAX_OPT_CMD) 3531 handler = receive_skip; 3532 else 3533 handler = NULL; 3534 3535 if (unlikely(!handler)) { 3536 dev_err(DEV, "unknown packet type %d, l: %d!\n", 3537 header->command, header->length); 3538 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); 3539 break; 3540 } 3541 if (unlikely(!handler(mdev, header))) { 3542 dev_err(DEV, "error receiving %s, l: %d!\n", 3543 cmdname(header->command), header->length); 3544 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); 3545 break; 3546 } 3547 3548 trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf, 3549 __FILE__, __LINE__); 3550 } 3551 } 3552 3553 static void drbd_fail_pending_reads(struct drbd_conf *mdev) 3554 { 3555 struct hlist_head *slot; 3556 struct hlist_node *pos; 3557 struct hlist_node *tmp; 3558 struct drbd_request *req; 3559 int i; 3560 3561 /* 3562 * Application READ requests 3563 */ 3564 spin_lock_irq(&mdev->req_lock); 3565 for (i = 0; i < APP_R_HSIZE; i++) { 3566 slot = mdev->app_reads_hash+i; 3567 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) { 3568 /* it may (but should not any longer!) 3569 * be on the work queue; if that assert triggers, 3570 * we need to also grab the 3571 * spin_lock_irq(&mdev->data.work.q_lock); 3572 * and list_del_init here. */ 3573 D_ASSERT(list_empty(&req->w.list)); 3574 /* It would be nice to complete outside of spinlock. 3575 * But this is easier for now. */ 3576 _req_mod(req, connection_lost_while_pending); 3577 } 3578 } 3579 for (i = 0; i < APP_R_HSIZE; i++) 3580 if (!hlist_empty(mdev->app_reads_hash+i)) 3581 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: " 3582 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first); 3583 3584 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *)); 3585 spin_unlock_irq(&mdev->req_lock); 3586 } 3587 3588 void drbd_flush_workqueue(struct drbd_conf *mdev) 3589 { 3590 struct drbd_wq_barrier barr; 3591 3592 barr.w.cb = w_prev_work_done; 3593 init_completion(&barr.done); 3594 drbd_queue_work(&mdev->data.work, &barr.w); 3595 wait_for_completion(&barr.done); 3596 } 3597 3598 static void drbd_disconnect(struct drbd_conf *mdev) 3599 { 3600 enum drbd_fencing_p fp; 3601 union drbd_state os, ns; 3602 int rv = SS_UNKNOWN_ERROR; 3603 unsigned int i; 3604 3605 if (mdev->state.conn == C_STANDALONE) 3606 return; 3607 if (mdev->state.conn >= C_WF_CONNECTION) 3608 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n", 3609 drbd_conn_str(mdev->state.conn)); 3610 3611 /* asender does not clean up anything. 
it must not interfere, either */ 3612 drbd_thread_stop(&mdev->asender); 3613 3614 mutex_lock(&mdev->data.mutex); 3615 drbd_free_sock(mdev); 3616 mutex_unlock(&mdev->data.mutex); 3617 3618 spin_lock_irq(&mdev->req_lock); 3619 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee); 3620 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee); 3621 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee); 3622 spin_unlock_irq(&mdev->req_lock); 3623 3624 /* We do not have data structures that would allow us to 3625 * get the rs_pending_cnt down to 0 again. 3626 * * On C_SYNC_TARGET we do not have any data structures describing 3627 * the pending RSDataRequest's we have sent. 3628 * * On C_SYNC_SOURCE there is no data structure that tracks 3629 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 3630 * And no, it is not the sum of the reference counts in the 3631 * resync_LRU. The resync_LRU tracks the whole operation including 3632 * the disk-IO, while the rs_pending_cnt only tracks the blocks 3633 * on the fly. */ 3634 drbd_rs_cancel_all(mdev); 3635 mdev->rs_total = 0; 3636 mdev->rs_failed = 0; 3637 atomic_set(&mdev->rs_pending_cnt, 0); 3638 wake_up(&mdev->misc_wait); 3639 3640 /* make sure syncer is stopped and w_resume_next_sg queued */ 3641 del_timer_sync(&mdev->resync_timer); 3642 set_bit(STOP_SYNC_TIMER, &mdev->flags); 3643 resync_timer_fn((unsigned long)mdev); 3644 3645 /* so we can be sure that all remote or resync reads 3646 * made it at least to net_ee */ 3647 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt)); 3648 3649 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 3650 * w_make_resync_request etc. which may still be on the worker queue 3651 * to be "canceled" */ 3652 drbd_flush_workqueue(mdev); 3653 3654 /* This also does reclaim_net_ee(). 
If we do this too early, we might 3655 * miss some resync ee and pages.*/ 3656 drbd_process_done_ee(mdev); 3657 3658 kfree(mdev->p_uuid); 3659 mdev->p_uuid = NULL; 3660 3661 if (!mdev->state.susp) 3662 tl_clear(mdev); 3663 3664 drbd_fail_pending_reads(mdev); 3665 3666 dev_info(DEV, "Connection closed\n"); 3667 3668 drbd_md_sync(mdev); 3669 3670 fp = FP_DONT_CARE; 3671 if (get_ldev(mdev)) { 3672 fp = mdev->ldev->dc.fencing; 3673 put_ldev(mdev); 3674 } 3675 3676 if (mdev->state.role == R_PRIMARY) { 3677 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) { 3678 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev); 3679 drbd_request_state(mdev, NS(pdsk, nps)); 3680 } 3681 } 3682 3683 spin_lock_irq(&mdev->req_lock); 3684 os = mdev->state; 3685 if (os.conn >= C_UNCONNECTED) { 3686 /* Do not restart in case we are C_DISCONNECTING */ 3687 ns = os; 3688 ns.conn = C_UNCONNECTED; 3689 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 3690 } 3691 spin_unlock_irq(&mdev->req_lock); 3692 3693 if (os.conn == C_DISCONNECTING) { 3694 struct hlist_head *h; 3695 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0); 3696 3697 /* we must not free the tl_hash 3698 * while application io is still on the fly */ 3699 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0); 3700 3701 spin_lock_irq(&mdev->req_lock); 3702 /* paranoia code */ 3703 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++) 3704 if (h->first) 3705 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n", 3706 (int)(h - mdev->ee_hash), h->first); 3707 kfree(mdev->ee_hash); 3708 mdev->ee_hash = NULL; 3709 mdev->ee_hash_s = 0; 3710 3711 /* paranoia code */ 3712 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) 3713 if (h->first) 3714 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n", 3715 (int)(h - mdev->tl_hash), h->first); 3716 kfree(mdev->tl_hash); 3717 mdev->tl_hash = NULL; 3718 mdev->tl_hash_s = 0; 3719 spin_unlock_irq(&mdev->req_lock); 3720 3721 crypto_free_hash(mdev->cram_hmac_tfm); 3722 mdev->cram_hmac_tfm = NULL; 3723 3724 kfree(mdev->net_conf); 3725 mdev->net_conf = NULL; 3726 drbd_request_state(mdev, NS(conn, C_STANDALONE)); 3727 } 3728 3729 /* tcp_close and release of sendpage pages can be deferred. I don't 3730 * want to use SO_LINGER, because apparently it can be deferred for 3731 * more than 20 seconds (longest time I checked). 3732 * 3733 * Actually we don't care for exactly when the network stack does its 3734 * put_page(), but release our reference on these pages right here. 3735 */ 3736 i = drbd_release_ee(mdev, &mdev->net_ee); 3737 if (i) 3738 dev_info(DEV, "net_ee not empty, killed %u entries\n", i); 3739 i = atomic_read(&mdev->pp_in_use); 3740 if (i) 3741 dev_info(DEV, "pp_in_use = %u, expected 0\n", i); 3742 3743 D_ASSERT(list_empty(&mdev->read_ee)); 3744 D_ASSERT(list_empty(&mdev->active_ee)); 3745 D_ASSERT(list_empty(&mdev->sync_ee)); 3746 D_ASSERT(list_empty(&mdev->done_ee)); 3747 3748 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 3749 atomic_set(&mdev->current_epoch->epoch_size, 0); 3750 D_ASSERT(list_empty(&mdev->current_epoch->list)); 3751 } 3752 3753 /* 3754 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 3755 * we can agree on is stored in agreed_pro_version. 3756 * 3757 * feature flags and the reserved array should be enough room for future 3758 * enhancements of the handshake protocol, and possible plugins... 3759 * 3760 * for now, they are expected to be zero, but ignored. 
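 * Both sides send their [PRO_VERSION_MIN, PRO_VERSION_MAX] range; drbd_do_handshake()
 * below then settles on min(PRO_VERSION_MAX, peer's protocol_max), or refuses the
 * connection if the ranges do not overlap.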
static int drbd_send_handshake(struct drbd_conf *mdev)
{
	/* ASSERT current == mdev->receiver ... */
	struct p_handshake *p = &mdev->data.sbuf.handshake;
	int ok;

	if (mutex_lock_interruptible(&mdev->data.mutex)) {
		dev_err(DEV, "interrupted during initial handshake\n");
		return 0; /* interrupted. not ok. */
	}

	if (mdev->data.socket == NULL) {
		mutex_unlock(&mdev->data.mutex);
		return 0;
	}

	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
			    (struct p_header *)p, sizeof(*p), 0);
	mutex_unlock(&mdev->data.mutex);
	return ok;
}

/*
 * return values:
 *   1  yes, we have a valid connection
 *   0  oops, did not work out, please try again
 *  -1  peer talks a different language,
 *      no point in trying again; please go standalone.
 */
static int drbd_do_handshake(struct drbd_conf *mdev)
{
	/* ASSERT current == mdev->receiver ... */
	struct p_handshake *p = &mdev->data.rbuf.handshake;
	const int expect = sizeof(struct p_handshake)
			 - sizeof(struct p_header);
	int rv;

	rv = drbd_send_handshake(mdev);
	if (!rv)
		return 0;

	rv = drbd_recv_header(mdev, &p->head);
	if (!rv)
		return 0;

	if (p->head.command != P_HAND_SHAKE) {
		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
		     cmdname(p->head.command), p->head.command);
		return -1;
	}

	if (p->head.length != expect) {
		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
		     expect, p->head.length);
		return -1;
	}

	rv = drbd_recv(mdev, &p->head.payload, expect);

	if (rv != expect) {
		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
		return 0;
	}

	trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
			  __FILE__, __LINE__);

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	dev_info(DEV, "Handshake successful: "
	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);

	return 1;

 incompat:
	dev_err(DEV, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}

#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_conf *mdev)
{
	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return 0;
}
#else
#define CHALLENGE_LEN 64
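
/*
 * For clarity, a sketch of the challenge/response exchange implemented by
 * drbd_do_auth() below:
 *
 *   1. send P_AUTH_CHALLENGE carrying CHALLENGE_LEN freshly random bytes
 *   2. receive the peer's P_AUTH_CHALLENGE
 *   3. send P_AUTH_RESPONSE = HMAC(shared_secret, peer's challenge)
 *   4. receive the peer's P_AUTH_RESPONSE and compare it against the
 *      locally computed HMAC(shared_secret, our own challenge)
 *
 * Only a peer that knows the same shared secret can produce a matching
 * response, and the fresh random challenge is what keeps a previously
 * captured response from simply being replayed.
 */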
static int drbd_do_auth(struct drbd_conf *mdev)
{
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	struct p_header p;
	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
	unsigned int resp_size;
	struct hash_desc desc;
	int rv;

	desc.tfm = mdev->cram_hmac_tfm;
	desc.flags = 0;

	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
				(u8 *)mdev->net_conf->shared_secret, key_len);
	if (rv) {
		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
		rv = 0;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(mdev, &p);
	if (!rv)
		goto fail;

	if (p.command != P_AUTH_CHALLENGE) {
		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
		    cmdname(p.command), p.command);
		rv = 0;
		goto fail;
	}

	if (p.length > CHALLENGE_LEN*2) {
		dev_err(DEV, "AuthChallenge payload too big.\n");
		rv = 0;
		goto fail;
	}

	peers_ch = kmalloc(p.length, GFP_NOIO);
	if (peers_ch == NULL) {
		dev_err(DEV, "kmalloc of peers_ch failed\n");
		rv = 0;
		goto fail;
	}

	rv = drbd_recv(mdev, peers_ch, p.length);

	if (rv != p.length) {
		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		dev_err(DEV, "kmalloc of response failed\n");
		rv = 0;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, p.length);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
		rv = 0;
		goto fail;
	}

	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(mdev, &p);
	if (!rv)
		goto fail;

	if (p.command != P_AUTH_RESPONSE) {
		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
		    cmdname(p.command), p.command);
		rv = 0;
		goto fail;
	}

	if (p.length != resp_size) {
		dev_err(DEV, "AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	rv = drbd_recv(mdev, response, resp_size);

	if (rv != resp_size) {
		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		dev_err(DEV, "kmalloc of right_response failed\n");
		rv = 0;
		goto fail;
	}

	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
		rv = 0;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
		     resp_size, mdev->net_conf->cram_hmac_alg);

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif
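
/*
 * Receiver thread entry point (summary added for clarity): keep calling
 * drbd_connect() until it either succeeds (h > 0, then run the main receive
 * loop drbdd() for as long as the net_conf is around) or reports an
 * incompatible peer (h == -1, discard the network configuration).  A plain
 * failure (h == 0) tears the half-built connection down and retries after
 * roughly a second.
 */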
int drbdd_init(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	unsigned int minor = mdev_to_minor(mdev);
	int h;

	sprintf(current->comm, "drbd%d_receiver", minor);

	dev_info(DEV, "receiver (re)started\n");

	do {
		h = drbd_connect(mdev);
		if (h == 0) {
			drbd_disconnect(mdev);
			__set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(HZ);
		}
		if (h == -1) {
			dev_warn(DEV, "Discarding network configuration.\n");
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		}
	} while (h == 0);

	if (h > 0) {
		if (get_net_conf(mdev)) {
			drbdd(mdev);
			put_net_conf(mdev);
		}
	}

	drbd_disconnect(mdev);

	dev_info(DEV, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_req_state_reply *p = (struct p_req_state_reply *)h;

	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&mdev->state_wait);

	return TRUE;
}

static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
{
	return drbd_send_ping_ack(mdev);
}

static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
{
	/* restore idle timeout */
	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

	return TRUE;
}

static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	D_ASSERT(mdev->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	drbd_rs_complete_io(mdev, sector);
	drbd_set_in_sync(mdev, sector, blksize);
	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
	dec_rs_pending(mdev);

	return TRUE;
}

/* when we receive the ACK for a write request,
 * verify that we actually know about it */
static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
	u64 id, sector_t sector)
{
	struct hlist_head *slot = tl_hash_slot(mdev, sector);
	struct hlist_node *n;
	struct drbd_request *req;

	hlist_for_each_entry(req, n, slot, colision) {
		if ((unsigned long)req == (unsigned long)id) {
			if (req->sector != sector) {
				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
				    "wrong sector (%llus versus %llus)\n", req,
				    (unsigned long long)req->sector,
				    (unsigned long long)sector);
				break;
			}
			return req;
		}
	}
	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
		(void *)(unsigned long)id, (unsigned long long)sector);
	return NULL;
}

typedef struct drbd_request *(req_validator_fn)
	(struct drbd_conf *mdev, u64 id, sector_t sector);
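
/*
 * Common ACK bookkeeping (comment added for clarity): the peer echoes back
 * the block_id we sent with the data packet, which for application writes is
 * in effect the address of the drbd_request itself.  The validator looks
 * that id up in the transfer-log hash and double-checks the sector before
 * the corresponding event is fed into __req_mod(); an id/sector pair that
 * matches nothing we sent is reported as a corrupt block_id/sector pair.
 */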
static int validate_req_change_req_state(struct drbd_conf *mdev,
	u64 id, sector_t sector, req_validator_fn validator,
	const char *func, enum drbd_req_event what)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->req_lock);
	req = validator(mdev, id, sector);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->req_lock);
		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
		return FALSE;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return TRUE;
}

static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (is_syncer_block_id(p->block_id)) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return TRUE;
	}
	switch (be16_to_cpu(h->command)) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		what = write_acked_by_peer_and_sis;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		what = write_acked_by_peer;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
		what = recv_acked_by_peer;
		break;
	case P_DISCARD_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		what = conflict_discarded_by_peer;
		break;
	default:
		D_ASSERT(0);
		return FALSE;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
		_ack_id_to_req, __func__, what);
}
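
/*
 * Negative acknowledgements: the peer could not commit the block.  For
 * syncer traffic we only account the range as failed resync I/O; for
 * application writes the neg_acked event is handed to the request state
 * machine, which decides how to complete the original bio.
 */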
static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);

	if (__ratelimit(&drbd_ratelimit_state))
		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (is_syncer_block_id(p->block_id)) {
		int size = be32_to_cpu(p->blksize);
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return TRUE;
	}
	return validate_req_change_req_state(mdev, p->block_id, sector,
		_ack_id_to_req, __func__, neg_acked);
}

static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
	    (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
		_ar_id_to_req, __func__, neg_acked);
}

static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = (struct p_block_ack *)h;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);
	D_ASSERT(p->block_id == ID_SYNCER);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_rs_failed_io(mdev, sector, size);
		put_ldev(mdev);
	}

	return TRUE;
}

static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_barrier_ack *p = (struct p_barrier_ack *)h;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	return TRUE;
}

static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	if (--mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			drbd_queue_work_front(&mdev->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.\n");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	return TRUE;
}

struct asender_cmd {
	size_t pkt_size;
	int (*process)(struct drbd_conf *mdev, struct p_header *h);
};
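
/*
 * Dispatch table for the meta socket (sketch of how it is consumed below):
 * drbd_asender() first reads a bare struct p_header, looks the command up
 * here, and then keeps receiving until cmd->pkt_size bytes have arrived
 * before calling cmd->process().  pkt_size therefore includes the header,
 * and the on-wire length field must equal pkt_size - sizeof(struct p_header).
 */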
static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_MAX_CMD]	    = { 0, NULL },
	};
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}

int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct p_header *h = &mdev->meta.rbuf.header;
	struct asender_cmd *cmd = NULL;

	int rv, len;
	void *buf = h;
	int received = 0;
	int expect = sizeof(struct p_header);
	int empty;

	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);
		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
			mdev->meta.socket->sk->sk_rcvtimeo =
				mdev->net_conf->ping_timeo*HZ/10;
		}

		/* conditionally cork;
		 * it may hurt latency if we cork without much to send */
		if (!mdev->net_conf->no_cork &&
			3 < atomic_read(&mdev->unacked_cnt))
			drbd_tcp_cork(mdev->meta.socket);
		while (1) {
			clear_bit(SIGNAL_ASENDER, &mdev->flags);
			flush_signals(current);
			if (!drbd_process_done_ee(mdev)) {
				dev_err(DEV, "process_done_ee() = NOT_OK\n");
				goto reconnect;
			}
			/* to avoid race with newly queued ACKs */
			set_bit(SIGNAL_ASENDER, &mdev->flags);
			spin_lock_irq(&mdev->req_lock);
			empty = list_empty(&mdev->done_ee);
			spin_unlock_irq(&mdev->req_lock);
			/* new ack may have been queued right here,
			 * but then there is also a signal pending,
			 * and we start over... */
			if (empty)
				break;
		}
		/* but unconditionally uncork unless disabled */
		if (!mdev->net_conf->no_cork)
			drbd_tcp_uncork(mdev->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(mdev, mdev->meta.socket,
				     buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &mdev->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS	 (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv  < expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			dev_err(DEV, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			if (mdev->meta.socket->sk->sk_rcvtimeo ==
			    mdev->net_conf->ping_timeo*HZ/10) {
				dev_err(DEV, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &mdev->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
				    (long)be32_to_cpu(h->magic),
				    h->command, h->length);
				goto reconnect;
			}
			cmd = get_asender_cmd(be16_to_cpu(h->command));
			len = be16_to_cpu(h->length);
			if (unlikely(cmd == NULL)) {
				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
				    (long)be32_to_cpu(h->magic),
				    h->command, h->length);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			ERR_IF(len != expect-sizeof(struct p_header)) {
				trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
				goto reconnect;
			}
		}
		if (received == expect) {
			D_ASSERT(cmd != NULL);
			trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
			if (!cmd->process(mdev, h))
				goto reconnect;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header);
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	if (0) {
disconnect:
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	}
	clear_bit(SIGNAL_ASENDER, &mdev->flags);

	D_ASSERT(mdev->state.conn < C_CONNECTED);
	dev_info(DEV, "asender terminated\n");

	return 0;
}