1 /* 2 * Vhost User Bridge 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * Authors: 7 * Victor Kaplansky <victork@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 /* 14 * TODO: 15 * - main should get parameters from the command line. 16 * - implement all request handlers. Still not implemented: 17 * vubr_get_queue_num_exec() 18 * vubr_send_rarp_exec() 19 * - test for broken requests and virtqueue. 20 * - implement features defined by Virtio 1.0 spec. 21 * - support mergeable buffers and indirect descriptors. 22 * - implement clean shutdown. 23 * - implement non-blocking writes to UDP backend. 24 * - implement polling strategy. 25 * - implement clean starting/stopping of vq processing 26 * - implement clean starting/stopping of used and buffers 27 * dirty page logging. 28 */ 29 30 #define _FILE_OFFSET_BITS 64 31 32 #include "qemu/osdep.h" 33 #include <sys/socket.h> 34 #include <sys/un.h> 35 #include <sys/unistd.h> 36 #include <sys/eventfd.h> 37 #include <arpa/inet.h> 38 #include <netdb.h> 39 #include <linux/vhost.h> 40 41 #include "qemu/atomic.h" 42 #include "standard-headers/linux/virtio_net.h" 43 #include "standard-headers/linux/virtio_ring.h" 44 45 #define VHOST_USER_BRIDGE_DEBUG 1 46 47 #define DPRINT(...) \ 48 do { \ 49 if (VHOST_USER_BRIDGE_DEBUG) { \ 50 printf(__VA_ARGS__); \ 51 } \ 52 } while (0) 53 54 typedef void (*CallbackFunc)(int sock, void *ctx); 55 56 typedef struct Event { 57 void *ctx; 58 CallbackFunc callback; 59 } Event; 60 61 typedef struct Dispatcher { 62 int max_sock; 63 fd_set fdset; 64 Event events[FD_SETSIZE]; 65 } Dispatcher; 66 67 static void 68 vubr_die(const char *s) 69 { 70 perror(s); 71 exit(1); 72 } 73 74 static int 75 dispatcher_init(Dispatcher *dispr) 76 { 77 FD_ZERO(&dispr->fdset); 78 dispr->max_sock = -1; 79 return 0; 80 } 81 82 static int 83 dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb) 84 { 85 if (sock >= FD_SETSIZE) { 86 fprintf(stderr, 87 "Error: Failed to add new event. sock %d should be less than %d\n", 88 sock, FD_SETSIZE); 89 return -1; 90 } 91 92 dispr->events[sock].ctx = ctx; 93 dispr->events[sock].callback = cb; 94 95 FD_SET(sock, &dispr->fdset); 96 if (sock > dispr->max_sock) { 97 dispr->max_sock = sock; 98 } 99 DPRINT("Added sock %d for watching. max_sock: %d\n", 100 sock, dispr->max_sock); 101 return 0; 102 } 103 104 static int 105 dispatcher_remove(Dispatcher *dispr, int sock) 106 { 107 if (sock >= FD_SETSIZE) { 108 fprintf(stderr, 109 "Error: Failed to remove event. sock %d should be less than %d\n", 110 sock, FD_SETSIZE); 111 return -1; 112 } 113 114 FD_CLR(sock, &dispr->fdset); 115 DPRINT("Sock %d removed from dispatcher watch.\n", sock); 116 return 0; 117 } 118 119 /* timeout in us */ 120 static int 121 dispatcher_wait(Dispatcher *dispr, uint32_t timeout) 122 { 123 struct timeval tv; 124 tv.tv_sec = timeout / 1000000; 125 tv.tv_usec = timeout % 1000000; 126 127 fd_set fdset = dispr->fdset; 128 129 /* wait until some of sockets become readable. */ 130 int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv); 131 132 if (rc == -1) { 133 vubr_die("select"); 134 } 135 136 /* Timeout */ 137 if (rc == 0) { 138 return 0; 139 } 140 141 /* Now call callback for every ready socket. */ 142 143 int sock; 144 for (sock = 0; sock < dispr->max_sock + 1; sock++) { 145 /* The callback on a socket can remove other sockets from the 146 * dispatcher, thus we have to check that the socket is 147 * still not removed from dispatcher's list 148 */ 149 if (FD_ISSET(sock, &fdset) && FD_ISSET(sock, &dispr->fdset)) { 150 Event *e = &dispr->events[sock]; 151 e->callback(sock, e->ctx); 152 } 153 } 154 155 return 0; 156 } 157 158 typedef struct VubrVirtq { 159 int call_fd; 160 int kick_fd; 161 uint32_t size; 162 uint16_t last_avail_index; 163 uint16_t last_used_index; 164 struct vring_desc *desc; 165 struct vring_avail *avail; 166 struct vring_used *used; 167 uint64_t log_guest_addr; 168 int enable; 169 } VubrVirtq; 170 171 /* Based on qemu/hw/virtio/vhost-user.c */ 172 173 #define VHOST_MEMORY_MAX_NREGIONS 8 174 #define VHOST_USER_F_PROTOCOL_FEATURES 30 175 /* v1.0 compliant. */ 176 #define VIRTIO_F_VERSION_1 32 177 178 #define VHOST_LOG_PAGE 4096 179 180 enum VhostUserProtocolFeature { 181 VHOST_USER_PROTOCOL_F_MQ = 0, 182 VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1, 183 VHOST_USER_PROTOCOL_F_RARP = 2, 184 185 VHOST_USER_PROTOCOL_F_MAX 186 }; 187 188 #define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1) 189 190 typedef enum VhostUserRequest { 191 VHOST_USER_NONE = 0, 192 VHOST_USER_GET_FEATURES = 1, 193 VHOST_USER_SET_FEATURES = 2, 194 VHOST_USER_SET_OWNER = 3, 195 VHOST_USER_RESET_OWNER = 4, 196 VHOST_USER_SET_MEM_TABLE = 5, 197 VHOST_USER_SET_LOG_BASE = 6, 198 VHOST_USER_SET_LOG_FD = 7, 199 VHOST_USER_SET_VRING_NUM = 8, 200 VHOST_USER_SET_VRING_ADDR = 9, 201 VHOST_USER_SET_VRING_BASE = 10, 202 VHOST_USER_GET_VRING_BASE = 11, 203 VHOST_USER_SET_VRING_KICK = 12, 204 VHOST_USER_SET_VRING_CALL = 13, 205 VHOST_USER_SET_VRING_ERR = 14, 206 VHOST_USER_GET_PROTOCOL_FEATURES = 15, 207 VHOST_USER_SET_PROTOCOL_FEATURES = 16, 208 VHOST_USER_GET_QUEUE_NUM = 17, 209 VHOST_USER_SET_VRING_ENABLE = 18, 210 VHOST_USER_SEND_RARP = 19, 211 VHOST_USER_MAX 212 } VhostUserRequest; 213 214 typedef struct VhostUserMemoryRegion { 215 uint64_t guest_phys_addr; 216 uint64_t memory_size; 217 uint64_t userspace_addr; 218 uint64_t mmap_offset; 219 } VhostUserMemoryRegion; 220 221 typedef struct VhostUserMemory { 222 uint32_t nregions; 223 uint32_t padding; 224 VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS]; 225 } VhostUserMemory; 226 227 typedef struct VhostUserLog { 228 uint64_t mmap_size; 229 uint64_t mmap_offset; 230 } VhostUserLog; 231 232 typedef struct VhostUserMsg { 233 VhostUserRequest request; 234 235 #define VHOST_USER_VERSION_MASK (0x3) 236 #define VHOST_USER_REPLY_MASK (0x1<<2) 237 uint32_t flags; 238 uint32_t size; /* the following payload size */ 239 union { 240 #define VHOST_USER_VRING_IDX_MASK (0xff) 241 #define VHOST_USER_VRING_NOFD_MASK (0x1<<8) 242 uint64_t u64; 243 struct vhost_vring_state state; 244 struct vhost_vring_addr addr; 245 VhostUserMemory memory; 246 VhostUserLog log; 247 } payload; 248 int fds[VHOST_MEMORY_MAX_NREGIONS]; 249 int fd_num; 250 } QEMU_PACKED VhostUserMsg; 251 252 #define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64) 253 254 /* The version of the protocol we support */ 255 #define VHOST_USER_VERSION (0x1) 256 257 #define MAX_NR_VIRTQUEUE (8) 258 259 typedef struct VubrDevRegion { 260 /* Guest Physical address. */ 261 uint64_t gpa; 262 /* Memory region size. */ 263 uint64_t size; 264 /* QEMU virtual address (userspace). */ 265 uint64_t qva; 266 /* Starting offset in our mmaped space. */ 267 uint64_t mmap_offset; 268 /* Start address of mmaped space. */ 269 uint64_t mmap_addr; 270 } VubrDevRegion; 271 272 typedef struct VubrDev { 273 int sock; 274 Dispatcher dispatcher; 275 uint32_t nregions; 276 VubrDevRegion regions[VHOST_MEMORY_MAX_NREGIONS]; 277 VubrVirtq vq[MAX_NR_VIRTQUEUE]; 278 int log_call_fd; 279 uint64_t log_size; 280 uint8_t *log_table; 281 int backend_udp_sock; 282 struct sockaddr_in backend_udp_dest; 283 int ready; 284 uint64_t features; 285 int hdrlen; 286 } VubrDev; 287 288 static const char *vubr_request_str[] = { 289 [VHOST_USER_NONE] = "VHOST_USER_NONE", 290 [VHOST_USER_GET_FEATURES] = "VHOST_USER_GET_FEATURES", 291 [VHOST_USER_SET_FEATURES] = "VHOST_USER_SET_FEATURES", 292 [VHOST_USER_SET_OWNER] = "VHOST_USER_SET_OWNER", 293 [VHOST_USER_RESET_OWNER] = "VHOST_USER_RESET_OWNER", 294 [VHOST_USER_SET_MEM_TABLE] = "VHOST_USER_SET_MEM_TABLE", 295 [VHOST_USER_SET_LOG_BASE] = "VHOST_USER_SET_LOG_BASE", 296 [VHOST_USER_SET_LOG_FD] = "VHOST_USER_SET_LOG_FD", 297 [VHOST_USER_SET_VRING_NUM] = "VHOST_USER_SET_VRING_NUM", 298 [VHOST_USER_SET_VRING_ADDR] = "VHOST_USER_SET_VRING_ADDR", 299 [VHOST_USER_SET_VRING_BASE] = "VHOST_USER_SET_VRING_BASE", 300 [VHOST_USER_GET_VRING_BASE] = "VHOST_USER_GET_VRING_BASE", 301 [VHOST_USER_SET_VRING_KICK] = "VHOST_USER_SET_VRING_KICK", 302 [VHOST_USER_SET_VRING_CALL] = "VHOST_USER_SET_VRING_CALL", 303 [VHOST_USER_SET_VRING_ERR] = "VHOST_USER_SET_VRING_ERR", 304 [VHOST_USER_GET_PROTOCOL_FEATURES] = "VHOST_USER_GET_PROTOCOL_FEATURES", 305 [VHOST_USER_SET_PROTOCOL_FEATURES] = "VHOST_USER_SET_PROTOCOL_FEATURES", 306 [VHOST_USER_GET_QUEUE_NUM] = "VHOST_USER_GET_QUEUE_NUM", 307 [VHOST_USER_SET_VRING_ENABLE] = "VHOST_USER_SET_VRING_ENABLE", 308 [VHOST_USER_SEND_RARP] = "VHOST_USER_SEND_RARP", 309 [VHOST_USER_MAX] = "VHOST_USER_MAX", 310 }; 311 312 static void 313 print_buffer(uint8_t *buf, size_t len) 314 { 315 int i; 316 printf("Raw buffer:\n"); 317 for (i = 0; i < len; i++) { 318 if (i % 16 == 0) { 319 printf("\n"); 320 } 321 if (i % 4 == 0) { 322 printf(" "); 323 } 324 printf("%02x ", buf[i]); 325 } 326 printf("\n............................................................\n"); 327 } 328 329 /* Translate guest physical address to our virtual address. */ 330 static uint64_t 331 gpa_to_va(VubrDev *dev, uint64_t guest_addr) 332 { 333 int i; 334 335 /* Find matching memory region. */ 336 for (i = 0; i < dev->nregions; i++) { 337 VubrDevRegion *r = &dev->regions[i]; 338 339 if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) { 340 return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset; 341 } 342 } 343 344 assert(!"address not found in regions"); 345 return 0; 346 } 347 348 /* Translate qemu virtual address to our virtual address. */ 349 static uint64_t 350 qva_to_va(VubrDev *dev, uint64_t qemu_addr) 351 { 352 int i; 353 354 /* Find matching memory region. */ 355 for (i = 0; i < dev->nregions; i++) { 356 VubrDevRegion *r = &dev->regions[i]; 357 358 if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) { 359 return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset; 360 } 361 } 362 363 assert(!"address not found in regions"); 364 return 0; 365 } 366 367 static void 368 vubr_message_read(int conn_fd, VhostUserMsg *vmsg) 369 { 370 char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { }; 371 struct iovec iov = { 372 .iov_base = (char *)vmsg, 373 .iov_len = VHOST_USER_HDR_SIZE, 374 }; 375 struct msghdr msg = { 376 .msg_iov = &iov, 377 .msg_iovlen = 1, 378 .msg_control = control, 379 .msg_controllen = sizeof(control), 380 }; 381 size_t fd_size; 382 struct cmsghdr *cmsg; 383 int rc; 384 385 rc = recvmsg(conn_fd, &msg, 0); 386 387 if (rc == 0) { 388 vubr_die("recvmsg"); 389 fprintf(stderr, "Peer disconnected.\n"); 390 exit(1); 391 } 392 if (rc < 0) { 393 vubr_die("recvmsg"); 394 } 395 396 vmsg->fd_num = 0; 397 for (cmsg = CMSG_FIRSTHDR(&msg); 398 cmsg != NULL; 399 cmsg = CMSG_NXTHDR(&msg, cmsg)) 400 { 401 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { 402 fd_size = cmsg->cmsg_len - CMSG_LEN(0); 403 vmsg->fd_num = fd_size / sizeof(int); 404 memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size); 405 break; 406 } 407 } 408 409 if (vmsg->size > sizeof(vmsg->payload)) { 410 fprintf(stderr, 411 "Error: too big message request: %d, size: vmsg->size: %u, " 412 "while sizeof(vmsg->payload) = %zu\n", 413 vmsg->request, vmsg->size, sizeof(vmsg->payload)); 414 exit(1); 415 } 416 417 if (vmsg->size) { 418 rc = read(conn_fd, &vmsg->payload, vmsg->size); 419 if (rc == 0) { 420 vubr_die("recvmsg"); 421 fprintf(stderr, "Peer disconnected.\n"); 422 exit(1); 423 } 424 if (rc < 0) { 425 vubr_die("recvmsg"); 426 } 427 428 assert(rc == vmsg->size); 429 } 430 } 431 432 static void 433 vubr_message_write(int conn_fd, VhostUserMsg *vmsg) 434 { 435 int rc; 436 437 do { 438 rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size); 439 } while (rc < 0 && errno == EINTR); 440 441 if (rc < 0) { 442 vubr_die("write"); 443 } 444 } 445 446 static void 447 vubr_backend_udp_sendbuf(VubrDev *dev, uint8_t *buf, size_t len) 448 { 449 int slen = sizeof(struct sockaddr_in); 450 451 if (sendto(dev->backend_udp_sock, buf, len, 0, 452 (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) { 453 vubr_die("sendto()"); 454 } 455 } 456 457 static int 458 vubr_backend_udp_recvbuf(VubrDev *dev, uint8_t *buf, size_t buflen) 459 { 460 int slen = sizeof(struct sockaddr_in); 461 int rc; 462 463 rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0, 464 (struct sockaddr *) &dev->backend_udp_dest, 465 (socklen_t *)&slen); 466 if (rc == -1) { 467 vubr_die("recvfrom()"); 468 } 469 470 return rc; 471 } 472 473 static void 474 vubr_consume_raw_packet(VubrDev *dev, uint8_t *buf, uint32_t len) 475 { 476 int hdrlen = dev->hdrlen; 477 DPRINT(" hdrlen = %d\n", dev->hdrlen); 478 479 if (VHOST_USER_BRIDGE_DEBUG) { 480 print_buffer(buf, len); 481 } 482 vubr_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen); 483 } 484 485 /* Kick the log_call_fd if required. */ 486 static void 487 vubr_log_kick(VubrDev *dev) 488 { 489 if (dev->log_call_fd != -1) { 490 DPRINT("Kicking the QEMU's log...\n"); 491 eventfd_write(dev->log_call_fd, 1); 492 } 493 } 494 495 /* Kick the guest if necessary. */ 496 static void 497 vubr_virtqueue_kick(VubrVirtq *vq) 498 { 499 if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { 500 DPRINT("Kicking the guest...\n"); 501 eventfd_write(vq->call_fd, 1); 502 } 503 } 504 505 static void 506 vubr_log_page(uint8_t *log_table, uint64_t page) 507 { 508 DPRINT("Logged dirty guest page: %"PRId64"\n", page); 509 atomic_or(&log_table[page / 8], 1 << (page % 8)); 510 } 511 512 static void 513 vubr_log_write(VubrDev *dev, uint64_t address, uint64_t length) 514 { 515 uint64_t page; 516 517 if (!(dev->features & (1ULL << VHOST_F_LOG_ALL)) || 518 !dev->log_table || !length) { 519 return; 520 } 521 522 assert(dev->log_size > ((address + length - 1) / VHOST_LOG_PAGE / 8)); 523 524 page = address / VHOST_LOG_PAGE; 525 while (page * VHOST_LOG_PAGE < address + length) { 526 vubr_log_page(dev->log_table, page); 527 page += VHOST_LOG_PAGE; 528 } 529 vubr_log_kick(dev); 530 } 531 532 static void 533 vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t *buf, int32_t len) 534 { 535 struct vring_desc *desc = vq->desc; 536 struct vring_avail *avail = vq->avail; 537 struct vring_used *used = vq->used; 538 uint64_t log_guest_addr = vq->log_guest_addr; 539 int32_t remaining_len = len; 540 541 unsigned int size = vq->size; 542 543 uint16_t avail_index = atomic_mb_read(&avail->idx); 544 545 /* We check the available descriptors before posting the 546 * buffer, so here we assume that enough available 547 * descriptors. */ 548 assert(vq->last_avail_index != avail_index); 549 uint16_t a_index = vq->last_avail_index % size; 550 uint16_t u_index = vq->last_used_index % size; 551 uint16_t d_index = avail->ring[a_index]; 552 553 int i = d_index; 554 uint32_t written_len = 0; 555 556 do { 557 DPRINT("Post packet to guest on vq:\n"); 558 DPRINT(" size = %d\n", vq->size); 559 DPRINT(" last_avail_index = %d\n", vq->last_avail_index); 560 DPRINT(" last_used_index = %d\n", vq->last_used_index); 561 DPRINT(" a_index = %d\n", a_index); 562 DPRINT(" u_index = %d\n", u_index); 563 DPRINT(" d_index = %d\n", d_index); 564 DPRINT(" desc[%d].addr = 0x%016"PRIx64"\n", i, desc[i].addr); 565 DPRINT(" desc[%d].len = %d\n", i, desc[i].len); 566 DPRINT(" desc[%d].flags = %d\n", i, desc[i].flags); 567 DPRINT(" avail->idx = %d\n", avail_index); 568 DPRINT(" used->idx = %d\n", used->idx); 569 570 if (!(desc[i].flags & VRING_DESC_F_WRITE)) { 571 /* FIXME: we should find writable descriptor. */ 572 fprintf(stderr, "Error: descriptor is not writable. Exiting.\n"); 573 exit(1); 574 } 575 576 void *chunk_start = (void *)(uintptr_t)gpa_to_va(dev, desc[i].addr); 577 uint32_t chunk_len = desc[i].len; 578 uint32_t chunk_write_len = MIN(remaining_len, chunk_len); 579 580 memcpy(chunk_start, buf + written_len, chunk_write_len); 581 vubr_log_write(dev, desc[i].addr, chunk_write_len); 582 remaining_len -= chunk_write_len; 583 written_len += chunk_write_len; 584 585 if ((remaining_len == 0) || !(desc[i].flags & VRING_DESC_F_NEXT)) { 586 break; 587 } 588 589 i = desc[i].next; 590 } while (1); 591 592 if (remaining_len > 0) { 593 fprintf(stderr, 594 "Too long packet for RX, remaining_len = %d, Dropping...\n", 595 remaining_len); 596 return; 597 } 598 599 /* Add descriptor to the used ring. */ 600 used->ring[u_index].id = d_index; 601 used->ring[u_index].len = len; 602 vubr_log_write(dev, 603 log_guest_addr + offsetof(struct vring_used, ring[u_index]), 604 sizeof(used->ring[u_index])); 605 606 vq->last_avail_index++; 607 vq->last_used_index++; 608 609 atomic_mb_set(&used->idx, vq->last_used_index); 610 vubr_log_write(dev, 611 log_guest_addr + offsetof(struct vring_used, idx), 612 sizeof(used->idx)); 613 614 /* Kick the guest if necessary. */ 615 vubr_virtqueue_kick(vq); 616 } 617 618 static int 619 vubr_process_desc(VubrDev *dev, VubrVirtq *vq) 620 { 621 struct vring_desc *desc = vq->desc; 622 struct vring_avail *avail = vq->avail; 623 struct vring_used *used = vq->used; 624 uint64_t log_guest_addr = vq->log_guest_addr; 625 626 unsigned int size = vq->size; 627 628 uint16_t a_index = vq->last_avail_index % size; 629 uint16_t u_index = vq->last_used_index % size; 630 uint16_t d_index = avail->ring[a_index]; 631 632 uint32_t i, len = 0; 633 size_t buf_size = 4096; 634 uint8_t buf[4096]; 635 636 DPRINT("Chunks: "); 637 i = d_index; 638 do { 639 void *chunk_start = (void *)(uintptr_t)gpa_to_va(dev, desc[i].addr); 640 uint32_t chunk_len = desc[i].len; 641 642 assert(!(desc[i].flags & VRING_DESC_F_WRITE)); 643 644 if (len + chunk_len < buf_size) { 645 memcpy(buf + len, chunk_start, chunk_len); 646 DPRINT("%d ", chunk_len); 647 } else { 648 fprintf(stderr, "Error: too long packet. Dropping...\n"); 649 break; 650 } 651 652 len += chunk_len; 653 654 if (!(desc[i].flags & VRING_DESC_F_NEXT)) { 655 break; 656 } 657 658 i = desc[i].next; 659 } while (1); 660 DPRINT("\n"); 661 662 if (!len) { 663 return -1; 664 } 665 666 /* Add descriptor to the used ring. */ 667 used->ring[u_index].id = d_index; 668 used->ring[u_index].len = len; 669 vubr_log_write(dev, 670 log_guest_addr + offsetof(struct vring_used, ring[u_index]), 671 sizeof(used->ring[u_index])); 672 673 vubr_consume_raw_packet(dev, buf, len); 674 675 return 0; 676 } 677 678 static void 679 vubr_process_avail(VubrDev *dev, VubrVirtq *vq) 680 { 681 struct vring_avail *avail = vq->avail; 682 struct vring_used *used = vq->used; 683 uint64_t log_guest_addr = vq->log_guest_addr; 684 685 while (vq->last_avail_index != atomic_mb_read(&avail->idx)) { 686 vubr_process_desc(dev, vq); 687 vq->last_avail_index++; 688 vq->last_used_index++; 689 } 690 691 atomic_mb_set(&used->idx, vq->last_used_index); 692 vubr_log_write(dev, 693 log_guest_addr + offsetof(struct vring_used, idx), 694 sizeof(used->idx)); 695 } 696 697 static void 698 vubr_backend_recv_cb(int sock, void *ctx) 699 { 700 VubrDev *dev = (VubrDev *) ctx; 701 VubrVirtq *rx_vq = &dev->vq[0]; 702 uint8_t buf[4096]; 703 struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf; 704 int hdrlen = dev->hdrlen; 705 int buflen = sizeof(buf); 706 int len; 707 708 if (!dev->ready) { 709 return; 710 } 711 712 DPRINT("\n\n *** IN UDP RECEIVE CALLBACK ***\n\n"); 713 DPRINT(" hdrlen = %d\n", hdrlen); 714 715 uint16_t avail_index = atomic_mb_read(&rx_vq->avail->idx); 716 717 /* If there is no available descriptors, just do nothing. 718 * The buffer will be handled by next arrived UDP packet, 719 * or next kick on receive virtq. */ 720 if (rx_vq->last_avail_index == avail_index) { 721 DPRINT("Got UDP packet, but no available descriptors on RX virtq.\n"); 722 return; 723 } 724 725 memset(buf, 0, hdrlen); 726 /* TODO: support mergeable buffers. */ 727 if (hdrlen == 12) 728 hdr->num_buffers = 1; 729 len = vubr_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen); 730 731 vubr_post_buffer(dev, rx_vq, buf, len + hdrlen); 732 } 733 734 static void 735 vubr_kick_cb(int sock, void *ctx) 736 { 737 VubrDev *dev = (VubrDev *) ctx; 738 eventfd_t kick_data; 739 ssize_t rc; 740 741 rc = eventfd_read(sock, &kick_data); 742 if (rc == -1) { 743 vubr_die("eventfd_read()"); 744 } else { 745 DPRINT("Got kick_data: %016"PRIx64"\n", kick_data); 746 vubr_process_avail(dev, &dev->vq[1]); 747 } 748 } 749 750 static int 751 vubr_none_exec(VubrDev *dev, VhostUserMsg *vmsg) 752 { 753 DPRINT("Function %s() not implemented yet.\n", __func__); 754 return 0; 755 } 756 757 static int 758 vubr_get_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 759 { 760 vmsg->payload.u64 = 761 ((1ULL << VIRTIO_NET_F_MRG_RXBUF) | 762 (1ULL << VHOST_F_LOG_ALL) | 763 (1ULL << VIRTIO_NET_F_GUEST_ANNOUNCE) | 764 (1ULL << VHOST_USER_F_PROTOCOL_FEATURES)); 765 766 vmsg->size = sizeof(vmsg->payload.u64); 767 768 DPRINT("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 769 770 /* Reply */ 771 return 1; 772 } 773 774 static int 775 vubr_set_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 776 { 777 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 778 779 dev->features = vmsg->payload.u64; 780 if ((dev->features & (1ULL << VIRTIO_F_VERSION_1)) || 781 (dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF))) { 782 dev->hdrlen = 12; 783 } else { 784 dev->hdrlen = 10; 785 } 786 787 return 0; 788 } 789 790 static int 791 vubr_set_owner_exec(VubrDev *dev, VhostUserMsg *vmsg) 792 { 793 return 0; 794 } 795 796 static void 797 vubr_close_log(VubrDev *dev) 798 { 799 if (dev->log_table) { 800 if (munmap(dev->log_table, dev->log_size) != 0) { 801 vubr_die("munmap()"); 802 } 803 804 dev->log_table = 0; 805 } 806 if (dev->log_call_fd != -1) { 807 close(dev->log_call_fd); 808 dev->log_call_fd = -1; 809 } 810 } 811 812 static int 813 vubr_reset_device_exec(VubrDev *dev, VhostUserMsg *vmsg) 814 { 815 vubr_close_log(dev); 816 dev->ready = 0; 817 dev->features = 0; 818 return 0; 819 } 820 821 static int 822 vubr_set_mem_table_exec(VubrDev *dev, VhostUserMsg *vmsg) 823 { 824 int i; 825 VhostUserMemory *memory = &vmsg->payload.memory; 826 dev->nregions = memory->nregions; 827 828 DPRINT("Nregions: %d\n", memory->nregions); 829 for (i = 0; i < dev->nregions; i++) { 830 void *mmap_addr; 831 VhostUserMemoryRegion *msg_region = &memory->regions[i]; 832 VubrDevRegion *dev_region = &dev->regions[i]; 833 834 DPRINT("Region %d\n", i); 835 DPRINT(" guest_phys_addr: 0x%016"PRIx64"\n", 836 msg_region->guest_phys_addr); 837 DPRINT(" memory_size: 0x%016"PRIx64"\n", 838 msg_region->memory_size); 839 DPRINT(" userspace_addr 0x%016"PRIx64"\n", 840 msg_region->userspace_addr); 841 DPRINT(" mmap_offset 0x%016"PRIx64"\n", 842 msg_region->mmap_offset); 843 844 dev_region->gpa = msg_region->guest_phys_addr; 845 dev_region->size = msg_region->memory_size; 846 dev_region->qva = msg_region->userspace_addr; 847 dev_region->mmap_offset = msg_region->mmap_offset; 848 849 /* We don't use offset argument of mmap() since the 850 * mapped address has to be page aligned, and we use huge 851 * pages. */ 852 mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset, 853 PROT_READ | PROT_WRITE, MAP_SHARED, 854 vmsg->fds[i], 0); 855 856 if (mmap_addr == MAP_FAILED) { 857 vubr_die("mmap"); 858 } 859 dev_region->mmap_addr = (uint64_t)(uintptr_t)mmap_addr; 860 DPRINT(" mmap_addr: 0x%016"PRIx64"\n", dev_region->mmap_addr); 861 862 close(vmsg->fds[i]); 863 } 864 865 return 0; 866 } 867 868 static int 869 vubr_set_log_base_exec(VubrDev *dev, VhostUserMsg *vmsg) 870 { 871 int fd; 872 uint64_t log_mmap_size, log_mmap_offset; 873 void *rc; 874 875 assert(vmsg->fd_num == 1); 876 fd = vmsg->fds[0]; 877 878 assert(vmsg->size == sizeof(vmsg->payload.log)); 879 log_mmap_offset = vmsg->payload.log.mmap_offset; 880 log_mmap_size = vmsg->payload.log.mmap_size; 881 DPRINT("Log mmap_offset: %"PRId64"\n", log_mmap_offset); 882 DPRINT("Log mmap_size: %"PRId64"\n", log_mmap_size); 883 884 rc = mmap(0, log_mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 885 log_mmap_offset); 886 if (rc == MAP_FAILED) { 887 vubr_die("mmap"); 888 } 889 dev->log_table = rc; 890 dev->log_size = log_mmap_size; 891 892 vmsg->size = sizeof(vmsg->payload.u64); 893 /* Reply */ 894 return 1; 895 } 896 897 static int 898 vubr_set_log_fd_exec(VubrDev *dev, VhostUserMsg *vmsg) 899 { 900 assert(vmsg->fd_num == 1); 901 dev->log_call_fd = vmsg->fds[0]; 902 DPRINT("Got log_call_fd: %d\n", vmsg->fds[0]); 903 return 0; 904 } 905 906 static int 907 vubr_set_vring_num_exec(VubrDev *dev, VhostUserMsg *vmsg) 908 { 909 unsigned int index = vmsg->payload.state.index; 910 unsigned int num = vmsg->payload.state.num; 911 912 DPRINT("State.index: %d\n", index); 913 DPRINT("State.num: %d\n", num); 914 dev->vq[index].size = num; 915 return 0; 916 } 917 918 static int 919 vubr_set_vring_addr_exec(VubrDev *dev, VhostUserMsg *vmsg) 920 { 921 struct vhost_vring_addr *vra = &vmsg->payload.addr; 922 unsigned int index = vra->index; 923 VubrVirtq *vq = &dev->vq[index]; 924 925 DPRINT("vhost_vring_addr:\n"); 926 DPRINT(" index: %d\n", vra->index); 927 DPRINT(" flags: %d\n", vra->flags); 928 DPRINT(" desc_user_addr: 0x%016llx\n", vra->desc_user_addr); 929 DPRINT(" used_user_addr: 0x%016llx\n", vra->used_user_addr); 930 DPRINT(" avail_user_addr: 0x%016llx\n", vra->avail_user_addr); 931 DPRINT(" log_guest_addr: 0x%016llx\n", vra->log_guest_addr); 932 933 vq->desc = (struct vring_desc *)(uintptr_t)qva_to_va(dev, vra->desc_user_addr); 934 vq->used = (struct vring_used *)(uintptr_t)qva_to_va(dev, vra->used_user_addr); 935 vq->avail = (struct vring_avail *)(uintptr_t)qva_to_va(dev, vra->avail_user_addr); 936 vq->log_guest_addr = vra->log_guest_addr; 937 938 DPRINT("Setting virtq addresses:\n"); 939 DPRINT(" vring_desc at %p\n", vq->desc); 940 DPRINT(" vring_used at %p\n", vq->used); 941 DPRINT(" vring_avail at %p\n", vq->avail); 942 943 vq->last_used_index = vq->used->idx; 944 945 if (vq->last_avail_index != vq->used->idx) { 946 DPRINT("Last avail index != used index: %d != %d, resuming", 947 vq->last_avail_index, vq->used->idx); 948 vq->last_avail_index = vq->used->idx; 949 } 950 951 return 0; 952 } 953 954 static int 955 vubr_set_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg) 956 { 957 unsigned int index = vmsg->payload.state.index; 958 unsigned int num = vmsg->payload.state.num; 959 960 DPRINT("State.index: %d\n", index); 961 DPRINT("State.num: %d\n", num); 962 dev->vq[index].last_avail_index = num; 963 964 return 0; 965 } 966 967 static int 968 vubr_get_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg) 969 { 970 unsigned int index = vmsg->payload.state.index; 971 972 DPRINT("State.index: %d\n", index); 973 vmsg->payload.state.num = dev->vq[index].last_avail_index; 974 vmsg->size = sizeof(vmsg->payload.state); 975 /* FIXME: this is a work-around for a bug in QEMU enabling 976 * too early vrings. When protocol features are enabled, 977 * we have to respect * VHOST_USER_SET_VRING_ENABLE request. */ 978 dev->ready = 0; 979 980 if (dev->vq[index].call_fd != -1) { 981 close(dev->vq[index].call_fd); 982 dispatcher_remove(&dev->dispatcher, dev->vq[index].call_fd); 983 dev->vq[index].call_fd = -1; 984 } 985 if (dev->vq[index].kick_fd != -1) { 986 close(dev->vq[index].kick_fd); 987 dispatcher_remove(&dev->dispatcher, dev->vq[index].kick_fd); 988 dev->vq[index].kick_fd = -1; 989 } 990 991 /* Reply */ 992 return 1; 993 } 994 995 static int 996 vubr_set_vring_kick_exec(VubrDev *dev, VhostUserMsg *vmsg) 997 { 998 uint64_t u64_arg = vmsg->payload.u64; 999 int index = u64_arg & VHOST_USER_VRING_IDX_MASK; 1000 1001 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1002 1003 assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0); 1004 assert(vmsg->fd_num == 1); 1005 1006 if (dev->vq[index].kick_fd != -1) { 1007 close(dev->vq[index].kick_fd); 1008 dispatcher_remove(&dev->dispatcher, dev->vq[index].kick_fd); 1009 } 1010 dev->vq[index].kick_fd = vmsg->fds[0]; 1011 DPRINT("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index); 1012 1013 if (index % 2 == 1) { 1014 /* TX queue. */ 1015 dispatcher_add(&dev->dispatcher, dev->vq[index].kick_fd, 1016 dev, vubr_kick_cb); 1017 1018 DPRINT("Waiting for kicks on fd: %d for vq: %d\n", 1019 dev->vq[index].kick_fd, index); 1020 } 1021 /* We temporarily use this hack to determine that both TX and RX 1022 * queues are set up and ready for processing. 1023 * FIXME: we need to rely in VHOST_USER_SET_VRING_ENABLE and 1024 * actual kicks. */ 1025 if (dev->vq[0].kick_fd != -1 && 1026 dev->vq[1].kick_fd != -1) { 1027 dev->ready = 1; 1028 DPRINT("vhost-user-bridge is ready for processing queues.\n"); 1029 } 1030 return 0; 1031 1032 } 1033 1034 static int 1035 vubr_set_vring_call_exec(VubrDev *dev, VhostUserMsg *vmsg) 1036 { 1037 uint64_t u64_arg = vmsg->payload.u64; 1038 int index = u64_arg & VHOST_USER_VRING_IDX_MASK; 1039 1040 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1041 assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0); 1042 assert(vmsg->fd_num == 1); 1043 1044 if (dev->vq[index].call_fd != -1) { 1045 close(dev->vq[index].call_fd); 1046 dispatcher_remove(&dev->dispatcher, dev->vq[index].call_fd); 1047 } 1048 dev->vq[index].call_fd = vmsg->fds[0]; 1049 DPRINT("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index); 1050 1051 return 0; 1052 } 1053 1054 static int 1055 vubr_set_vring_err_exec(VubrDev *dev, VhostUserMsg *vmsg) 1056 { 1057 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1058 return 0; 1059 } 1060 1061 static int 1062 vubr_get_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 1063 { 1064 vmsg->payload.u64 = 1ULL << VHOST_USER_PROTOCOL_F_LOG_SHMFD; 1065 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1066 vmsg->size = sizeof(vmsg->payload.u64); 1067 1068 /* Reply */ 1069 return 1; 1070 } 1071 1072 static int 1073 vubr_set_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 1074 { 1075 /* FIXME: unimplented */ 1076 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1077 return 0; 1078 } 1079 1080 static int 1081 vubr_get_queue_num_exec(VubrDev *dev, VhostUserMsg *vmsg) 1082 { 1083 DPRINT("Function %s() not implemented yet.\n", __func__); 1084 return 0; 1085 } 1086 1087 static int 1088 vubr_set_vring_enable_exec(VubrDev *dev, VhostUserMsg *vmsg) 1089 { 1090 unsigned int index = vmsg->payload.state.index; 1091 unsigned int enable = vmsg->payload.state.num; 1092 1093 DPRINT("State.index: %d\n", index); 1094 DPRINT("State.enable: %d\n", enable); 1095 dev->vq[index].enable = enable; 1096 return 0; 1097 } 1098 1099 static int 1100 vubr_send_rarp_exec(VubrDev *dev, VhostUserMsg *vmsg) 1101 { 1102 DPRINT("Function %s() not implemented yet.\n", __func__); 1103 return 0; 1104 } 1105 1106 static int 1107 vubr_execute_request(VubrDev *dev, VhostUserMsg *vmsg) 1108 { 1109 /* Print out generic part of the request. */ 1110 DPRINT( 1111 "================== Vhost user message from QEMU ==================\n"); 1112 DPRINT("Request: %s (%d)\n", vubr_request_str[vmsg->request], 1113 vmsg->request); 1114 DPRINT("Flags: 0x%x\n", vmsg->flags); 1115 DPRINT("Size: %d\n", vmsg->size); 1116 1117 if (vmsg->fd_num) { 1118 int i; 1119 DPRINT("Fds:"); 1120 for (i = 0; i < vmsg->fd_num; i++) { 1121 DPRINT(" %d", vmsg->fds[i]); 1122 } 1123 DPRINT("\n"); 1124 } 1125 1126 switch (vmsg->request) { 1127 case VHOST_USER_NONE: 1128 return vubr_none_exec(dev, vmsg); 1129 case VHOST_USER_GET_FEATURES: 1130 return vubr_get_features_exec(dev, vmsg); 1131 case VHOST_USER_SET_FEATURES: 1132 return vubr_set_features_exec(dev, vmsg); 1133 case VHOST_USER_SET_OWNER: 1134 return vubr_set_owner_exec(dev, vmsg); 1135 case VHOST_USER_RESET_OWNER: 1136 return vubr_reset_device_exec(dev, vmsg); 1137 case VHOST_USER_SET_MEM_TABLE: 1138 return vubr_set_mem_table_exec(dev, vmsg); 1139 case VHOST_USER_SET_LOG_BASE: 1140 return vubr_set_log_base_exec(dev, vmsg); 1141 case VHOST_USER_SET_LOG_FD: 1142 return vubr_set_log_fd_exec(dev, vmsg); 1143 case VHOST_USER_SET_VRING_NUM: 1144 return vubr_set_vring_num_exec(dev, vmsg); 1145 case VHOST_USER_SET_VRING_ADDR: 1146 return vubr_set_vring_addr_exec(dev, vmsg); 1147 case VHOST_USER_SET_VRING_BASE: 1148 return vubr_set_vring_base_exec(dev, vmsg); 1149 case VHOST_USER_GET_VRING_BASE: 1150 return vubr_get_vring_base_exec(dev, vmsg); 1151 case VHOST_USER_SET_VRING_KICK: 1152 return vubr_set_vring_kick_exec(dev, vmsg); 1153 case VHOST_USER_SET_VRING_CALL: 1154 return vubr_set_vring_call_exec(dev, vmsg); 1155 case VHOST_USER_SET_VRING_ERR: 1156 return vubr_set_vring_err_exec(dev, vmsg); 1157 case VHOST_USER_GET_PROTOCOL_FEATURES: 1158 return vubr_get_protocol_features_exec(dev, vmsg); 1159 case VHOST_USER_SET_PROTOCOL_FEATURES: 1160 return vubr_set_protocol_features_exec(dev, vmsg); 1161 case VHOST_USER_GET_QUEUE_NUM: 1162 return vubr_get_queue_num_exec(dev, vmsg); 1163 case VHOST_USER_SET_VRING_ENABLE: 1164 return vubr_set_vring_enable_exec(dev, vmsg); 1165 case VHOST_USER_SEND_RARP: 1166 return vubr_send_rarp_exec(dev, vmsg); 1167 1168 case VHOST_USER_MAX: 1169 assert(vmsg->request != VHOST_USER_MAX); 1170 } 1171 return 0; 1172 } 1173 1174 static void 1175 vubr_receive_cb(int sock, void *ctx) 1176 { 1177 VubrDev *dev = (VubrDev *) ctx; 1178 VhostUserMsg vmsg; 1179 int reply_requested; 1180 1181 vubr_message_read(sock, &vmsg); 1182 reply_requested = vubr_execute_request(dev, &vmsg); 1183 if (reply_requested) { 1184 /* Set the version in the flags when sending the reply */ 1185 vmsg.flags &= ~VHOST_USER_VERSION_MASK; 1186 vmsg.flags |= VHOST_USER_VERSION; 1187 vmsg.flags |= VHOST_USER_REPLY_MASK; 1188 vubr_message_write(sock, &vmsg); 1189 } 1190 } 1191 1192 static void 1193 vubr_accept_cb(int sock, void *ctx) 1194 { 1195 VubrDev *dev = (VubrDev *)ctx; 1196 int conn_fd; 1197 struct sockaddr_un un; 1198 socklen_t len = sizeof(un); 1199 1200 conn_fd = accept(sock, (struct sockaddr *) &un, &len); 1201 if (conn_fd == -1) { 1202 vubr_die("accept()"); 1203 } 1204 DPRINT("Got connection from remote peer on sock %d\n", conn_fd); 1205 dispatcher_add(&dev->dispatcher, conn_fd, ctx, vubr_receive_cb); 1206 } 1207 1208 static VubrDev * 1209 vubr_new(const char *path, bool client) 1210 { 1211 VubrDev *dev = (VubrDev *) calloc(1, sizeof(VubrDev)); 1212 dev->nregions = 0; 1213 int i; 1214 struct sockaddr_un un; 1215 CallbackFunc cb; 1216 size_t len; 1217 1218 for (i = 0; i < MAX_NR_VIRTQUEUE; i++) { 1219 dev->vq[i] = (VubrVirtq) { 1220 .call_fd = -1, .kick_fd = -1, 1221 .size = 0, 1222 .last_avail_index = 0, .last_used_index = 0, 1223 .desc = 0, .avail = 0, .used = 0, 1224 .enable = 0, 1225 }; 1226 } 1227 1228 /* Init log */ 1229 dev->log_call_fd = -1; 1230 dev->log_size = 0; 1231 dev->log_table = 0; 1232 dev->ready = 0; 1233 dev->features = 0; 1234 1235 /* Get a UNIX socket. */ 1236 dev->sock = socket(AF_UNIX, SOCK_STREAM, 0); 1237 if (dev->sock == -1) { 1238 vubr_die("socket"); 1239 } 1240 1241 un.sun_family = AF_UNIX; 1242 strcpy(un.sun_path, path); 1243 len = sizeof(un.sun_family) + strlen(path); 1244 1245 if (!client) { 1246 unlink(path); 1247 1248 if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) { 1249 vubr_die("bind"); 1250 } 1251 1252 if (listen(dev->sock, 1) == -1) { 1253 vubr_die("listen"); 1254 } 1255 cb = vubr_accept_cb; 1256 1257 DPRINT("Waiting for connections on UNIX socket %s ...\n", path); 1258 } else { 1259 if (connect(dev->sock, (struct sockaddr *)&un, len) == -1) { 1260 vubr_die("connect"); 1261 } 1262 cb = vubr_receive_cb; 1263 } 1264 1265 dispatcher_init(&dev->dispatcher); 1266 dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev, cb); 1267 1268 return dev; 1269 } 1270 1271 static void 1272 vubr_set_host(struct sockaddr_in *saddr, const char *host) 1273 { 1274 if (isdigit(host[0])) { 1275 if (!inet_aton(host, &saddr->sin_addr)) { 1276 fprintf(stderr, "inet_aton() failed.\n"); 1277 exit(1); 1278 } 1279 } else { 1280 struct hostent *he = gethostbyname(host); 1281 1282 if (!he) { 1283 fprintf(stderr, "gethostbyname() failed.\n"); 1284 exit(1); 1285 } 1286 saddr->sin_addr = *(struct in_addr *)he->h_addr; 1287 } 1288 } 1289 1290 static void 1291 vubr_backend_udp_setup(VubrDev *dev, 1292 const char *local_host, 1293 const char *local_port, 1294 const char *remote_host, 1295 const char *remote_port) 1296 { 1297 int sock; 1298 const char *r; 1299 1300 int lport, rport; 1301 1302 lport = strtol(local_port, (char **)&r, 0); 1303 if (r == local_port) { 1304 fprintf(stderr, "lport parsing failed.\n"); 1305 exit(1); 1306 } 1307 1308 rport = strtol(remote_port, (char **)&r, 0); 1309 if (r == remote_port) { 1310 fprintf(stderr, "rport parsing failed.\n"); 1311 exit(1); 1312 } 1313 1314 struct sockaddr_in si_local = { 1315 .sin_family = AF_INET, 1316 .sin_port = htons(lport), 1317 }; 1318 1319 vubr_set_host(&si_local, local_host); 1320 1321 /* setup destination for sends */ 1322 dev->backend_udp_dest = (struct sockaddr_in) { 1323 .sin_family = AF_INET, 1324 .sin_port = htons(rport), 1325 }; 1326 vubr_set_host(&dev->backend_udp_dest, remote_host); 1327 1328 sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 1329 if (sock == -1) { 1330 vubr_die("socket"); 1331 } 1332 1333 if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) { 1334 vubr_die("bind"); 1335 } 1336 1337 dev->backend_udp_sock = sock; 1338 dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb); 1339 DPRINT("Waiting for data from udp backend on %s:%d...\n", 1340 local_host, lport); 1341 } 1342 1343 static void 1344 vubr_run(VubrDev *dev) 1345 { 1346 while (1) { 1347 /* timeout 200ms */ 1348 dispatcher_wait(&dev->dispatcher, 200000); 1349 /* Here one can try polling strategy. */ 1350 } 1351 } 1352 1353 static int 1354 vubr_parse_host_port(const char **host, const char **port, const char *buf) 1355 { 1356 char *p = strchr(buf, ':'); 1357 1358 if (!p) { 1359 return -1; 1360 } 1361 *p = '\0'; 1362 *host = strdup(buf); 1363 *port = strdup(p + 1); 1364 return 0; 1365 } 1366 1367 #define DEFAULT_UD_SOCKET "/tmp/vubr.sock" 1368 #define DEFAULT_LHOST "127.0.0.1" 1369 #define DEFAULT_LPORT "4444" 1370 #define DEFAULT_RHOST "127.0.0.1" 1371 #define DEFAULT_RPORT "5555" 1372 1373 static const char *ud_socket_path = DEFAULT_UD_SOCKET; 1374 static const char *lhost = DEFAULT_LHOST; 1375 static const char *lport = DEFAULT_LPORT; 1376 static const char *rhost = DEFAULT_RHOST; 1377 static const char *rport = DEFAULT_RPORT; 1378 1379 int 1380 main(int argc, char *argv[]) 1381 { 1382 VubrDev *dev; 1383 int opt; 1384 bool client = false; 1385 1386 while ((opt = getopt(argc, argv, "l:r:u:c")) != -1) { 1387 1388 switch (opt) { 1389 case 'l': 1390 if (vubr_parse_host_port(&lhost, &lport, optarg) < 0) { 1391 goto out; 1392 } 1393 break; 1394 case 'r': 1395 if (vubr_parse_host_port(&rhost, &rport, optarg) < 0) { 1396 goto out; 1397 } 1398 break; 1399 case 'u': 1400 ud_socket_path = strdup(optarg); 1401 break; 1402 case 'c': 1403 client = true; 1404 break; 1405 default: 1406 goto out; 1407 } 1408 } 1409 1410 DPRINT("ud socket: %s (%s)\n", ud_socket_path, 1411 client ? "client" : "server"); 1412 DPRINT("local: %s:%s\n", lhost, lport); 1413 DPRINT("remote: %s:%s\n", rhost, rport); 1414 1415 dev = vubr_new(ud_socket_path, client); 1416 if (!dev) { 1417 return 1; 1418 } 1419 1420 vubr_backend_udp_setup(dev, lhost, lport, rhost, rport); 1421 vubr_run(dev); 1422 return 0; 1423 1424 out: 1425 fprintf(stderr, "Usage: %s ", argv[0]); 1426 fprintf(stderr, "[-c] [-u ud_socket_path] [-l lhost:lport] [-r rhost:rport]\n"); 1427 fprintf(stderr, "\t-u path to unix doman socket. default: %s\n", 1428 DEFAULT_UD_SOCKET); 1429 fprintf(stderr, "\t-l local host and port. default: %s:%s\n", 1430 DEFAULT_LHOST, DEFAULT_LPORT); 1431 fprintf(stderr, "\t-r remote host and port. default: %s:%s\n", 1432 DEFAULT_RHOST, DEFAULT_RPORT); 1433 fprintf(stderr, "\t-c client mode\n"); 1434 1435 return 1; 1436 } 1437