1 /* 2 * Vhost User Bridge 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * Authors: 7 * Victor Kaplansky <victork@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 /* 14 * TODO: 15 * - main should get parameters from the command line. 16 * - implement all request handlers. Still not implemented: 17 * vubr_get_queue_num_exec() 18 * vubr_send_rarp_exec() 19 * - test for broken requests and virtqueue. 20 * - implement features defined by Virtio 1.0 spec. 21 * - support mergeable buffers and indirect descriptors. 22 * - implement clean shutdown. 23 * - implement non-blocking writes to UDP backend. 24 * - implement polling strategy. 25 * - implement clean starting/stopping of vq processing 26 * - implement clean starting/stopping of used and buffers 27 * dirty page logging. 28 */ 29 30 #define _FILE_OFFSET_BITS 64 31 32 #include "qemu/osdep.h" 33 #include <sys/socket.h> 34 #include <sys/un.h> 35 #include <sys/unistd.h> 36 #include <sys/eventfd.h> 37 #include <arpa/inet.h> 38 #include <netdb.h> 39 #include <qemu/osdep.h> 40 41 #include <linux/vhost.h> 42 43 #include "qemu/atomic.h" 44 #include "standard-headers/linux/virtio_net.h" 45 #include "standard-headers/linux/virtio_ring.h" 46 47 #define VHOST_USER_BRIDGE_DEBUG 1 48 49 #define DPRINT(...) \ 50 do { \ 51 if (VHOST_USER_BRIDGE_DEBUG) { \ 52 printf(__VA_ARGS__); \ 53 } \ 54 } while (0) 55 56 typedef void (*CallbackFunc)(int sock, void *ctx); 57 58 typedef struct Event { 59 void *ctx; 60 CallbackFunc callback; 61 } Event; 62 63 typedef struct Dispatcher { 64 int max_sock; 65 fd_set fdset; 66 Event events[FD_SETSIZE]; 67 } Dispatcher; 68 69 static void 70 vubr_die(const char *s) 71 { 72 perror(s); 73 exit(1); 74 } 75 76 static int 77 dispatcher_init(Dispatcher *dispr) 78 { 79 FD_ZERO(&dispr->fdset); 80 dispr->max_sock = -1; 81 return 0; 82 } 83 84 static int 85 dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb) 86 { 87 if (sock >= FD_SETSIZE) { 88 fprintf(stderr, 89 "Error: Failed to add new event. sock %d should be less than %d\n", 90 sock, FD_SETSIZE); 91 return -1; 92 } 93 94 dispr->events[sock].ctx = ctx; 95 dispr->events[sock].callback = cb; 96 97 FD_SET(sock, &dispr->fdset); 98 if (sock > dispr->max_sock) { 99 dispr->max_sock = sock; 100 } 101 DPRINT("Added sock %d for watching. max_sock: %d\n", 102 sock, dispr->max_sock); 103 return 0; 104 } 105 106 /* dispatcher_remove() is not currently in use but may be useful 107 * in the future. */ 108 static int 109 dispatcher_remove(Dispatcher *dispr, int sock) 110 { 111 if (sock >= FD_SETSIZE) { 112 fprintf(stderr, 113 "Error: Failed to remove event. sock %d should be less than %d\n", 114 sock, FD_SETSIZE); 115 return -1; 116 } 117 118 FD_CLR(sock, &dispr->fdset); 119 DPRINT("Sock %d removed from dispatcher watch.\n", sock); 120 return 0; 121 } 122 123 /* timeout in us */ 124 static int 125 dispatcher_wait(Dispatcher *dispr, uint32_t timeout) 126 { 127 struct timeval tv; 128 tv.tv_sec = timeout / 1000000; 129 tv.tv_usec = timeout % 1000000; 130 131 fd_set fdset = dispr->fdset; 132 133 /* wait until some of sockets become readable. */ 134 int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv); 135 136 if (rc == -1) { 137 vubr_die("select"); 138 } 139 140 /* Timeout */ 141 if (rc == 0) { 142 return 0; 143 } 144 145 /* Now call callback for every ready socket. */ 146 147 int sock; 148 for (sock = 0; sock < dispr->max_sock + 1; sock++) { 149 /* The callback on a socket can remove other sockets from the 150 * dispatcher, thus we have to check that the socket is 151 * still not removed from dispatcher's list 152 */ 153 if (FD_ISSET(sock, &fdset) && FD_ISSET(sock, &dispr->fdset)) { 154 Event *e = &dispr->events[sock]; 155 e->callback(sock, e->ctx); 156 } 157 } 158 159 return 0; 160 } 161 162 typedef struct VubrVirtq { 163 int call_fd; 164 int kick_fd; 165 uint32_t size; 166 uint16_t last_avail_index; 167 uint16_t last_used_index; 168 struct vring_desc *desc; 169 struct vring_avail *avail; 170 struct vring_used *used; 171 uint64_t log_guest_addr; 172 int enable; 173 } VubrVirtq; 174 175 /* Based on qemu/hw/virtio/vhost-user.c */ 176 177 #define VHOST_MEMORY_MAX_NREGIONS 8 178 #define VHOST_USER_F_PROTOCOL_FEATURES 30 179 /* v1.0 compliant. */ 180 #define VIRTIO_F_VERSION_1 32 181 182 #define VHOST_LOG_PAGE 4096 183 184 enum VhostUserProtocolFeature { 185 VHOST_USER_PROTOCOL_F_MQ = 0, 186 VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1, 187 VHOST_USER_PROTOCOL_F_RARP = 2, 188 189 VHOST_USER_PROTOCOL_F_MAX 190 }; 191 192 #define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1) 193 194 typedef enum VhostUserRequest { 195 VHOST_USER_NONE = 0, 196 VHOST_USER_GET_FEATURES = 1, 197 VHOST_USER_SET_FEATURES = 2, 198 VHOST_USER_SET_OWNER = 3, 199 VHOST_USER_RESET_OWNER = 4, 200 VHOST_USER_SET_MEM_TABLE = 5, 201 VHOST_USER_SET_LOG_BASE = 6, 202 VHOST_USER_SET_LOG_FD = 7, 203 VHOST_USER_SET_VRING_NUM = 8, 204 VHOST_USER_SET_VRING_ADDR = 9, 205 VHOST_USER_SET_VRING_BASE = 10, 206 VHOST_USER_GET_VRING_BASE = 11, 207 VHOST_USER_SET_VRING_KICK = 12, 208 VHOST_USER_SET_VRING_CALL = 13, 209 VHOST_USER_SET_VRING_ERR = 14, 210 VHOST_USER_GET_PROTOCOL_FEATURES = 15, 211 VHOST_USER_SET_PROTOCOL_FEATURES = 16, 212 VHOST_USER_GET_QUEUE_NUM = 17, 213 VHOST_USER_SET_VRING_ENABLE = 18, 214 VHOST_USER_SEND_RARP = 19, 215 VHOST_USER_MAX 216 } VhostUserRequest; 217 218 typedef struct VhostUserMemoryRegion { 219 uint64_t guest_phys_addr; 220 uint64_t memory_size; 221 uint64_t userspace_addr; 222 uint64_t mmap_offset; 223 } VhostUserMemoryRegion; 224 225 typedef struct VhostUserMemory { 226 uint32_t nregions; 227 uint32_t padding; 228 VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS]; 229 } VhostUserMemory; 230 231 typedef struct VhostUserLog { 232 uint64_t mmap_size; 233 uint64_t mmap_offset; 234 } VhostUserLog; 235 236 typedef struct VhostUserMsg { 237 VhostUserRequest request; 238 239 #define VHOST_USER_VERSION_MASK (0x3) 240 #define VHOST_USER_REPLY_MASK (0x1<<2) 241 uint32_t flags; 242 uint32_t size; /* the following payload size */ 243 union { 244 #define VHOST_USER_VRING_IDX_MASK (0xff) 245 #define VHOST_USER_VRING_NOFD_MASK (0x1<<8) 246 uint64_t u64; 247 struct vhost_vring_state state; 248 struct vhost_vring_addr addr; 249 VhostUserMemory memory; 250 VhostUserLog log; 251 } payload; 252 int fds[VHOST_MEMORY_MAX_NREGIONS]; 253 int fd_num; 254 } QEMU_PACKED VhostUserMsg; 255 256 #define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64) 257 258 /* The version of the protocol we support */ 259 #define VHOST_USER_VERSION (0x1) 260 261 #define MAX_NR_VIRTQUEUE (8) 262 263 typedef struct VubrDevRegion { 264 /* Guest Physical address. */ 265 uint64_t gpa; 266 /* Memory region size. */ 267 uint64_t size; 268 /* QEMU virtual address (userspace). */ 269 uint64_t qva; 270 /* Starting offset in our mmaped space. */ 271 uint64_t mmap_offset; 272 /* Start address of mmaped space. */ 273 uint64_t mmap_addr; 274 } VubrDevRegion; 275 276 typedef struct VubrDev { 277 int sock; 278 Dispatcher dispatcher; 279 uint32_t nregions; 280 VubrDevRegion regions[VHOST_MEMORY_MAX_NREGIONS]; 281 VubrVirtq vq[MAX_NR_VIRTQUEUE]; 282 int log_call_fd; 283 uint64_t log_size; 284 uint8_t *log_table; 285 int backend_udp_sock; 286 struct sockaddr_in backend_udp_dest; 287 int ready; 288 uint64_t features; 289 int hdrlen; 290 } VubrDev; 291 292 static const char *vubr_request_str[] = { 293 [VHOST_USER_NONE] = "VHOST_USER_NONE", 294 [VHOST_USER_GET_FEATURES] = "VHOST_USER_GET_FEATURES", 295 [VHOST_USER_SET_FEATURES] = "VHOST_USER_SET_FEATURES", 296 [VHOST_USER_SET_OWNER] = "VHOST_USER_SET_OWNER", 297 [VHOST_USER_RESET_OWNER] = "VHOST_USER_RESET_OWNER", 298 [VHOST_USER_SET_MEM_TABLE] = "VHOST_USER_SET_MEM_TABLE", 299 [VHOST_USER_SET_LOG_BASE] = "VHOST_USER_SET_LOG_BASE", 300 [VHOST_USER_SET_LOG_FD] = "VHOST_USER_SET_LOG_FD", 301 [VHOST_USER_SET_VRING_NUM] = "VHOST_USER_SET_VRING_NUM", 302 [VHOST_USER_SET_VRING_ADDR] = "VHOST_USER_SET_VRING_ADDR", 303 [VHOST_USER_SET_VRING_BASE] = "VHOST_USER_SET_VRING_BASE", 304 [VHOST_USER_GET_VRING_BASE] = "VHOST_USER_GET_VRING_BASE", 305 [VHOST_USER_SET_VRING_KICK] = "VHOST_USER_SET_VRING_KICK", 306 [VHOST_USER_SET_VRING_CALL] = "VHOST_USER_SET_VRING_CALL", 307 [VHOST_USER_SET_VRING_ERR] = "VHOST_USER_SET_VRING_ERR", 308 [VHOST_USER_GET_PROTOCOL_FEATURES] = "VHOST_USER_GET_PROTOCOL_FEATURES", 309 [VHOST_USER_SET_PROTOCOL_FEATURES] = "VHOST_USER_SET_PROTOCOL_FEATURES", 310 [VHOST_USER_GET_QUEUE_NUM] = "VHOST_USER_GET_QUEUE_NUM", 311 [VHOST_USER_SET_VRING_ENABLE] = "VHOST_USER_SET_VRING_ENABLE", 312 [VHOST_USER_SEND_RARP] = "VHOST_USER_SEND_RARP", 313 [VHOST_USER_MAX] = "VHOST_USER_MAX", 314 }; 315 316 static void 317 print_buffer(uint8_t *buf, size_t len) 318 { 319 int i; 320 printf("Raw buffer:\n"); 321 for (i = 0; i < len; i++) { 322 if (i % 16 == 0) { 323 printf("\n"); 324 } 325 if (i % 4 == 0) { 326 printf(" "); 327 } 328 printf("%02x ", buf[i]); 329 } 330 printf("\n............................................................\n"); 331 } 332 333 /* Translate guest physical address to our virtual address. */ 334 static uint64_t 335 gpa_to_va(VubrDev *dev, uint64_t guest_addr) 336 { 337 int i; 338 339 /* Find matching memory region. */ 340 for (i = 0; i < dev->nregions; i++) { 341 VubrDevRegion *r = &dev->regions[i]; 342 343 if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) { 344 return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset; 345 } 346 } 347 348 assert(!"address not found in regions"); 349 return 0; 350 } 351 352 /* Translate qemu virtual address to our virtual address. */ 353 static uint64_t 354 qva_to_va(VubrDev *dev, uint64_t qemu_addr) 355 { 356 int i; 357 358 /* Find matching memory region. */ 359 for (i = 0; i < dev->nregions; i++) { 360 VubrDevRegion *r = &dev->regions[i]; 361 362 if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) { 363 return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset; 364 } 365 } 366 367 assert(!"address not found in regions"); 368 return 0; 369 } 370 371 static void 372 vubr_message_read(int conn_fd, VhostUserMsg *vmsg) 373 { 374 char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { }; 375 struct iovec iov = { 376 .iov_base = (char *)vmsg, 377 .iov_len = VHOST_USER_HDR_SIZE, 378 }; 379 struct msghdr msg = { 380 .msg_iov = &iov, 381 .msg_iovlen = 1, 382 .msg_control = control, 383 .msg_controllen = sizeof(control), 384 }; 385 size_t fd_size; 386 struct cmsghdr *cmsg; 387 int rc; 388 389 rc = recvmsg(conn_fd, &msg, 0); 390 391 if (rc == 0) { 392 vubr_die("recvmsg"); 393 fprintf(stderr, "Peer disconnected.\n"); 394 exit(1); 395 } 396 if (rc < 0) { 397 vubr_die("recvmsg"); 398 } 399 400 vmsg->fd_num = 0; 401 for (cmsg = CMSG_FIRSTHDR(&msg); 402 cmsg != NULL; 403 cmsg = CMSG_NXTHDR(&msg, cmsg)) 404 { 405 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { 406 fd_size = cmsg->cmsg_len - CMSG_LEN(0); 407 vmsg->fd_num = fd_size / sizeof(int); 408 memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size); 409 break; 410 } 411 } 412 413 if (vmsg->size > sizeof(vmsg->payload)) { 414 fprintf(stderr, 415 "Error: too big message request: %d, size: vmsg->size: %u, " 416 "while sizeof(vmsg->payload) = %zu\n", 417 vmsg->request, vmsg->size, sizeof(vmsg->payload)); 418 exit(1); 419 } 420 421 if (vmsg->size) { 422 rc = read(conn_fd, &vmsg->payload, vmsg->size); 423 if (rc == 0) { 424 vubr_die("recvmsg"); 425 fprintf(stderr, "Peer disconnected.\n"); 426 exit(1); 427 } 428 if (rc < 0) { 429 vubr_die("recvmsg"); 430 } 431 432 assert(rc == vmsg->size); 433 } 434 } 435 436 static void 437 vubr_message_write(int conn_fd, VhostUserMsg *vmsg) 438 { 439 int rc; 440 441 do { 442 rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size); 443 } while (rc < 0 && errno == EINTR); 444 445 if (rc < 0) { 446 vubr_die("write"); 447 } 448 } 449 450 static void 451 vubr_backend_udp_sendbuf(VubrDev *dev, uint8_t *buf, size_t len) 452 { 453 int slen = sizeof(struct sockaddr_in); 454 455 if (sendto(dev->backend_udp_sock, buf, len, 0, 456 (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) { 457 vubr_die("sendto()"); 458 } 459 } 460 461 static int 462 vubr_backend_udp_recvbuf(VubrDev *dev, uint8_t *buf, size_t buflen) 463 { 464 int slen = sizeof(struct sockaddr_in); 465 int rc; 466 467 rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0, 468 (struct sockaddr *) &dev->backend_udp_dest, 469 (socklen_t *)&slen); 470 if (rc == -1) { 471 vubr_die("recvfrom()"); 472 } 473 474 return rc; 475 } 476 477 static void 478 vubr_consume_raw_packet(VubrDev *dev, uint8_t *buf, uint32_t len) 479 { 480 int hdrlen = dev->hdrlen; 481 DPRINT(" hdrlen = %d\n", dev->hdrlen); 482 483 if (VHOST_USER_BRIDGE_DEBUG) { 484 print_buffer(buf, len); 485 } 486 vubr_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen); 487 } 488 489 /* Kick the log_call_fd if required. */ 490 static void 491 vubr_log_kick(VubrDev *dev) 492 { 493 if (dev->log_call_fd != -1) { 494 DPRINT("Kicking the QEMU's log...\n"); 495 eventfd_write(dev->log_call_fd, 1); 496 } 497 } 498 499 /* Kick the guest if necessary. */ 500 static void 501 vubr_virtqueue_kick(VubrVirtq *vq) 502 { 503 if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { 504 DPRINT("Kicking the guest...\n"); 505 eventfd_write(vq->call_fd, 1); 506 } 507 } 508 509 static void 510 vubr_log_page(uint8_t *log_table, uint64_t page) 511 { 512 DPRINT("Logged dirty guest page: %"PRId64"\n", page); 513 atomic_or(&log_table[page / 8], 1 << (page % 8)); 514 } 515 516 static void 517 vubr_log_write(VubrDev *dev, uint64_t address, uint64_t length) 518 { 519 uint64_t page; 520 521 if (!(dev->features & (1ULL << VHOST_F_LOG_ALL)) || 522 !dev->log_table || !length) { 523 return; 524 } 525 526 assert(dev->log_size > ((address + length - 1) / VHOST_LOG_PAGE / 8)); 527 528 page = address / VHOST_LOG_PAGE; 529 while (page * VHOST_LOG_PAGE < address + length) { 530 vubr_log_page(dev->log_table, page); 531 page += VHOST_LOG_PAGE; 532 } 533 vubr_log_kick(dev); 534 } 535 536 static void 537 vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t *buf, int32_t len) 538 { 539 struct vring_desc *desc = vq->desc; 540 struct vring_avail *avail = vq->avail; 541 struct vring_used *used = vq->used; 542 uint64_t log_guest_addr = vq->log_guest_addr; 543 int32_t remaining_len = len; 544 545 unsigned int size = vq->size; 546 547 uint16_t avail_index = atomic_mb_read(&avail->idx); 548 549 /* We check the available descriptors before posting the 550 * buffer, so here we assume that enough available 551 * descriptors. */ 552 assert(vq->last_avail_index != avail_index); 553 uint16_t a_index = vq->last_avail_index % size; 554 uint16_t u_index = vq->last_used_index % size; 555 uint16_t d_index = avail->ring[a_index]; 556 557 int i = d_index; 558 uint32_t written_len = 0; 559 560 do { 561 DPRINT("Post packet to guest on vq:\n"); 562 DPRINT(" size = %d\n", vq->size); 563 DPRINT(" last_avail_index = %d\n", vq->last_avail_index); 564 DPRINT(" last_used_index = %d\n", vq->last_used_index); 565 DPRINT(" a_index = %d\n", a_index); 566 DPRINT(" u_index = %d\n", u_index); 567 DPRINT(" d_index = %d\n", d_index); 568 DPRINT(" desc[%d].addr = 0x%016"PRIx64"\n", i, desc[i].addr); 569 DPRINT(" desc[%d].len = %d\n", i, desc[i].len); 570 DPRINT(" desc[%d].flags = %d\n", i, desc[i].flags); 571 DPRINT(" avail->idx = %d\n", avail_index); 572 DPRINT(" used->idx = %d\n", used->idx); 573 574 if (!(desc[i].flags & VRING_DESC_F_WRITE)) { 575 /* FIXME: we should find writable descriptor. */ 576 fprintf(stderr, "Error: descriptor is not writable. Exiting.\n"); 577 exit(1); 578 } 579 580 void *chunk_start = (void *)(uintptr_t)gpa_to_va(dev, desc[i].addr); 581 uint32_t chunk_len = desc[i].len; 582 uint32_t chunk_write_len = MIN(remaining_len, chunk_len); 583 584 memcpy(chunk_start, buf + written_len, chunk_write_len); 585 vubr_log_write(dev, desc[i].addr, chunk_write_len); 586 remaining_len -= chunk_write_len; 587 written_len += chunk_write_len; 588 589 if ((remaining_len == 0) || !(desc[i].flags & VRING_DESC_F_NEXT)) { 590 break; 591 } 592 593 i = desc[i].next; 594 } while (1); 595 596 if (remaining_len > 0) { 597 fprintf(stderr, 598 "Too long packet for RX, remaining_len = %d, Dropping...\n", 599 remaining_len); 600 return; 601 } 602 603 /* Add descriptor to the used ring. */ 604 used->ring[u_index].id = d_index; 605 used->ring[u_index].len = len; 606 vubr_log_write(dev, 607 log_guest_addr + offsetof(struct vring_used, ring[u_index]), 608 sizeof(used->ring[u_index])); 609 610 vq->last_avail_index++; 611 vq->last_used_index++; 612 613 atomic_mb_set(&used->idx, vq->last_used_index); 614 vubr_log_write(dev, 615 log_guest_addr + offsetof(struct vring_used, idx), 616 sizeof(used->idx)); 617 618 /* Kick the guest if necessary. */ 619 vubr_virtqueue_kick(vq); 620 } 621 622 static int 623 vubr_process_desc(VubrDev *dev, VubrVirtq *vq) 624 { 625 struct vring_desc *desc = vq->desc; 626 struct vring_avail *avail = vq->avail; 627 struct vring_used *used = vq->used; 628 uint64_t log_guest_addr = vq->log_guest_addr; 629 630 unsigned int size = vq->size; 631 632 uint16_t a_index = vq->last_avail_index % size; 633 uint16_t u_index = vq->last_used_index % size; 634 uint16_t d_index = avail->ring[a_index]; 635 636 uint32_t i, len = 0; 637 size_t buf_size = 4096; 638 uint8_t buf[4096]; 639 640 DPRINT("Chunks: "); 641 i = d_index; 642 do { 643 void *chunk_start = (void *)(uintptr_t)gpa_to_va(dev, desc[i].addr); 644 uint32_t chunk_len = desc[i].len; 645 646 assert(!(desc[i].flags & VRING_DESC_F_WRITE)); 647 648 if (len + chunk_len < buf_size) { 649 memcpy(buf + len, chunk_start, chunk_len); 650 DPRINT("%d ", chunk_len); 651 } else { 652 fprintf(stderr, "Error: too long packet. Dropping...\n"); 653 break; 654 } 655 656 len += chunk_len; 657 658 if (!(desc[i].flags & VRING_DESC_F_NEXT)) { 659 break; 660 } 661 662 i = desc[i].next; 663 } while (1); 664 DPRINT("\n"); 665 666 if (!len) { 667 return -1; 668 } 669 670 /* Add descriptor to the used ring. */ 671 used->ring[u_index].id = d_index; 672 used->ring[u_index].len = len; 673 vubr_log_write(dev, 674 log_guest_addr + offsetof(struct vring_used, ring[u_index]), 675 sizeof(used->ring[u_index])); 676 677 vubr_consume_raw_packet(dev, buf, len); 678 679 return 0; 680 } 681 682 static void 683 vubr_process_avail(VubrDev *dev, VubrVirtq *vq) 684 { 685 struct vring_avail *avail = vq->avail; 686 struct vring_used *used = vq->used; 687 uint64_t log_guest_addr = vq->log_guest_addr; 688 689 while (vq->last_avail_index != atomic_mb_read(&avail->idx)) { 690 vubr_process_desc(dev, vq); 691 vq->last_avail_index++; 692 vq->last_used_index++; 693 } 694 695 atomic_mb_set(&used->idx, vq->last_used_index); 696 vubr_log_write(dev, 697 log_guest_addr + offsetof(struct vring_used, idx), 698 sizeof(used->idx)); 699 } 700 701 static void 702 vubr_backend_recv_cb(int sock, void *ctx) 703 { 704 VubrDev *dev = (VubrDev *) ctx; 705 VubrVirtq *rx_vq = &dev->vq[0]; 706 uint8_t buf[4096]; 707 struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf; 708 int hdrlen = dev->hdrlen; 709 int buflen = sizeof(buf); 710 int len; 711 712 if (!dev->ready) { 713 return; 714 } 715 716 DPRINT("\n\n *** IN UDP RECEIVE CALLBACK ***\n\n"); 717 DPRINT(" hdrlen = %d\n", hdrlen); 718 719 uint16_t avail_index = atomic_mb_read(&rx_vq->avail->idx); 720 721 /* If there is no available descriptors, just do nothing. 722 * The buffer will be handled by next arrived UDP packet, 723 * or next kick on receive virtq. */ 724 if (rx_vq->last_avail_index == avail_index) { 725 DPRINT("Got UDP packet, but no available descriptors on RX virtq.\n"); 726 return; 727 } 728 729 memset(buf, 0, hdrlen); 730 /* TODO: support mergeable buffers. */ 731 if (hdrlen == 12) 732 hdr->num_buffers = 1; 733 len = vubr_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen); 734 735 vubr_post_buffer(dev, rx_vq, buf, len + hdrlen); 736 } 737 738 static void 739 vubr_kick_cb(int sock, void *ctx) 740 { 741 VubrDev *dev = (VubrDev *) ctx; 742 eventfd_t kick_data; 743 ssize_t rc; 744 745 rc = eventfd_read(sock, &kick_data); 746 if (rc == -1) { 747 vubr_die("eventfd_read()"); 748 } else { 749 DPRINT("Got kick_data: %016"PRIx64"\n", kick_data); 750 vubr_process_avail(dev, &dev->vq[1]); 751 } 752 } 753 754 static int 755 vubr_none_exec(VubrDev *dev, VhostUserMsg *vmsg) 756 { 757 DPRINT("Function %s() not implemented yet.\n", __func__); 758 return 0; 759 } 760 761 static int 762 vubr_get_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 763 { 764 vmsg->payload.u64 = 765 ((1ULL << VIRTIO_NET_F_MRG_RXBUF) | 766 (1ULL << VHOST_F_LOG_ALL) | 767 (1ULL << VIRTIO_NET_F_GUEST_ANNOUNCE) | 768 (1ULL << VHOST_USER_F_PROTOCOL_FEATURES)); 769 770 vmsg->size = sizeof(vmsg->payload.u64); 771 772 DPRINT("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 773 774 /* Reply */ 775 return 1; 776 } 777 778 static int 779 vubr_set_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 780 { 781 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 782 783 dev->features = vmsg->payload.u64; 784 if ((dev->features & (1ULL << VIRTIO_F_VERSION_1)) || 785 (dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF))) { 786 dev->hdrlen = 12; 787 } else { 788 dev->hdrlen = 10; 789 } 790 791 return 0; 792 } 793 794 static int 795 vubr_set_owner_exec(VubrDev *dev, VhostUserMsg *vmsg) 796 { 797 return 0; 798 } 799 800 static void 801 vubr_close_log(VubrDev *dev) 802 { 803 if (dev->log_table) { 804 if (munmap(dev->log_table, dev->log_size) != 0) { 805 vubr_die("munmap()"); 806 } 807 808 dev->log_table = 0; 809 } 810 if (dev->log_call_fd != -1) { 811 close(dev->log_call_fd); 812 dev->log_call_fd = -1; 813 } 814 } 815 816 static int 817 vubr_reset_device_exec(VubrDev *dev, VhostUserMsg *vmsg) 818 { 819 vubr_close_log(dev); 820 dev->ready = 0; 821 dev->features = 0; 822 return 0; 823 } 824 825 static int 826 vubr_set_mem_table_exec(VubrDev *dev, VhostUserMsg *vmsg) 827 { 828 int i; 829 VhostUserMemory *memory = &vmsg->payload.memory; 830 dev->nregions = memory->nregions; 831 832 DPRINT("Nregions: %d\n", memory->nregions); 833 for (i = 0; i < dev->nregions; i++) { 834 void *mmap_addr; 835 VhostUserMemoryRegion *msg_region = &memory->regions[i]; 836 VubrDevRegion *dev_region = &dev->regions[i]; 837 838 DPRINT("Region %d\n", i); 839 DPRINT(" guest_phys_addr: 0x%016"PRIx64"\n", 840 msg_region->guest_phys_addr); 841 DPRINT(" memory_size: 0x%016"PRIx64"\n", 842 msg_region->memory_size); 843 DPRINT(" userspace_addr 0x%016"PRIx64"\n", 844 msg_region->userspace_addr); 845 DPRINT(" mmap_offset 0x%016"PRIx64"\n", 846 msg_region->mmap_offset); 847 848 dev_region->gpa = msg_region->guest_phys_addr; 849 dev_region->size = msg_region->memory_size; 850 dev_region->qva = msg_region->userspace_addr; 851 dev_region->mmap_offset = msg_region->mmap_offset; 852 853 /* We don't use offset argument of mmap() since the 854 * mapped address has to be page aligned, and we use huge 855 * pages. */ 856 mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset, 857 PROT_READ | PROT_WRITE, MAP_SHARED, 858 vmsg->fds[i], 0); 859 860 if (mmap_addr == MAP_FAILED) { 861 vubr_die("mmap"); 862 } 863 dev_region->mmap_addr = (uint64_t)(uintptr_t)mmap_addr; 864 DPRINT(" mmap_addr: 0x%016"PRIx64"\n", dev_region->mmap_addr); 865 866 close(vmsg->fds[i]); 867 } 868 869 return 0; 870 } 871 872 static int 873 vubr_set_log_base_exec(VubrDev *dev, VhostUserMsg *vmsg) 874 { 875 int fd; 876 uint64_t log_mmap_size, log_mmap_offset; 877 void *rc; 878 879 assert(vmsg->fd_num == 1); 880 fd = vmsg->fds[0]; 881 882 assert(vmsg->size == sizeof(vmsg->payload.log)); 883 log_mmap_offset = vmsg->payload.log.mmap_offset; 884 log_mmap_size = vmsg->payload.log.mmap_size; 885 DPRINT("Log mmap_offset: %"PRId64"\n", log_mmap_offset); 886 DPRINT("Log mmap_size: %"PRId64"\n", log_mmap_size); 887 888 rc = mmap(0, log_mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 889 log_mmap_offset); 890 if (rc == MAP_FAILED) { 891 vubr_die("mmap"); 892 } 893 dev->log_table = rc; 894 dev->log_size = log_mmap_size; 895 896 vmsg->size = sizeof(vmsg->payload.u64); 897 /* Reply */ 898 return 1; 899 } 900 901 static int 902 vubr_set_log_fd_exec(VubrDev *dev, VhostUserMsg *vmsg) 903 { 904 assert(vmsg->fd_num == 1); 905 dev->log_call_fd = vmsg->fds[0]; 906 DPRINT("Got log_call_fd: %d\n", vmsg->fds[0]); 907 return 0; 908 } 909 910 static int 911 vubr_set_vring_num_exec(VubrDev *dev, VhostUserMsg *vmsg) 912 { 913 unsigned int index = vmsg->payload.state.index; 914 unsigned int num = vmsg->payload.state.num; 915 916 DPRINT("State.index: %d\n", index); 917 DPRINT("State.num: %d\n", num); 918 dev->vq[index].size = num; 919 return 0; 920 } 921 922 static int 923 vubr_set_vring_addr_exec(VubrDev *dev, VhostUserMsg *vmsg) 924 { 925 struct vhost_vring_addr *vra = &vmsg->payload.addr; 926 unsigned int index = vra->index; 927 VubrVirtq *vq = &dev->vq[index]; 928 929 DPRINT("vhost_vring_addr:\n"); 930 DPRINT(" index: %d\n", vra->index); 931 DPRINT(" flags: %d\n", vra->flags); 932 DPRINT(" desc_user_addr: 0x%016llx\n", vra->desc_user_addr); 933 DPRINT(" used_user_addr: 0x%016llx\n", vra->used_user_addr); 934 DPRINT(" avail_user_addr: 0x%016llx\n", vra->avail_user_addr); 935 DPRINT(" log_guest_addr: 0x%016llx\n", vra->log_guest_addr); 936 937 vq->desc = (struct vring_desc *)(uintptr_t)qva_to_va(dev, vra->desc_user_addr); 938 vq->used = (struct vring_used *)(uintptr_t)qva_to_va(dev, vra->used_user_addr); 939 vq->avail = (struct vring_avail *)(uintptr_t)qva_to_va(dev, vra->avail_user_addr); 940 vq->log_guest_addr = vra->log_guest_addr; 941 942 DPRINT("Setting virtq addresses:\n"); 943 DPRINT(" vring_desc at %p\n", vq->desc); 944 DPRINT(" vring_used at %p\n", vq->used); 945 DPRINT(" vring_avail at %p\n", vq->avail); 946 947 vq->last_used_index = vq->used->idx; 948 return 0; 949 } 950 951 static int 952 vubr_set_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg) 953 { 954 unsigned int index = vmsg->payload.state.index; 955 unsigned int num = vmsg->payload.state.num; 956 957 DPRINT("State.index: %d\n", index); 958 DPRINT("State.num: %d\n", num); 959 dev->vq[index].last_avail_index = num; 960 961 return 0; 962 } 963 964 static int 965 vubr_get_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg) 966 { 967 unsigned int index = vmsg->payload.state.index; 968 969 DPRINT("State.index: %d\n", index); 970 vmsg->payload.state.num = dev->vq[index].last_avail_index; 971 vmsg->size = sizeof(vmsg->payload.state); 972 /* FIXME: this is a work-around for a bug in QEMU enabling 973 * too early vrings. When protocol features are enabled, 974 * we have to respect * VHOST_USER_SET_VRING_ENABLE request. */ 975 dev->ready = 0; 976 977 if (dev->vq[index].call_fd != -1) { 978 close(dev->vq[index].call_fd); 979 dispatcher_remove(&dev->dispatcher, dev->vq[index].call_fd); 980 dev->vq[index].call_fd = -1; 981 } 982 if (dev->vq[index].kick_fd != -1) { 983 close(dev->vq[index].kick_fd); 984 dispatcher_remove(&dev->dispatcher, dev->vq[index].kick_fd); 985 dev->vq[index].kick_fd = -1; 986 } 987 988 /* Reply */ 989 return 1; 990 } 991 992 static int 993 vubr_set_vring_kick_exec(VubrDev *dev, VhostUserMsg *vmsg) 994 { 995 uint64_t u64_arg = vmsg->payload.u64; 996 int index = u64_arg & VHOST_USER_VRING_IDX_MASK; 997 998 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 999 1000 assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0); 1001 assert(vmsg->fd_num == 1); 1002 1003 if (dev->vq[index].kick_fd != -1) { 1004 close(dev->vq[index].kick_fd); 1005 dispatcher_remove(&dev->dispatcher, dev->vq[index].kick_fd); 1006 } 1007 dev->vq[index].kick_fd = vmsg->fds[0]; 1008 DPRINT("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index); 1009 1010 if (index % 2 == 1) { 1011 /* TX queue. */ 1012 dispatcher_add(&dev->dispatcher, dev->vq[index].kick_fd, 1013 dev, vubr_kick_cb); 1014 1015 DPRINT("Waiting for kicks on fd: %d for vq: %d\n", 1016 dev->vq[index].kick_fd, index); 1017 } 1018 /* We temporarily use this hack to determine that both TX and RX 1019 * queues are set up and ready for processing. 1020 * FIXME: we need to rely in VHOST_USER_SET_VRING_ENABLE and 1021 * actual kicks. */ 1022 if (dev->vq[0].kick_fd != -1 && 1023 dev->vq[1].kick_fd != -1) { 1024 dev->ready = 1; 1025 DPRINT("vhost-user-bridge is ready for processing queues.\n"); 1026 } 1027 return 0; 1028 1029 } 1030 1031 static int 1032 vubr_set_vring_call_exec(VubrDev *dev, VhostUserMsg *vmsg) 1033 { 1034 uint64_t u64_arg = vmsg->payload.u64; 1035 int index = u64_arg & VHOST_USER_VRING_IDX_MASK; 1036 1037 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1038 assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0); 1039 assert(vmsg->fd_num == 1); 1040 1041 if (dev->vq[index].call_fd != -1) { 1042 close(dev->vq[index].call_fd); 1043 dispatcher_remove(&dev->dispatcher, dev->vq[index].call_fd); 1044 } 1045 dev->vq[index].call_fd = vmsg->fds[0]; 1046 DPRINT("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index); 1047 1048 return 0; 1049 } 1050 1051 static int 1052 vubr_set_vring_err_exec(VubrDev *dev, VhostUserMsg *vmsg) 1053 { 1054 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1055 return 0; 1056 } 1057 1058 static int 1059 vubr_get_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 1060 { 1061 vmsg->payload.u64 = 1ULL << VHOST_USER_PROTOCOL_F_LOG_SHMFD; 1062 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1063 vmsg->size = sizeof(vmsg->payload.u64); 1064 1065 /* Reply */ 1066 return 1; 1067 } 1068 1069 static int 1070 vubr_set_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg) 1071 { 1072 /* FIXME: unimplented */ 1073 DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); 1074 return 0; 1075 } 1076 1077 static int 1078 vubr_get_queue_num_exec(VubrDev *dev, VhostUserMsg *vmsg) 1079 { 1080 DPRINT("Function %s() not implemented yet.\n", __func__); 1081 return 0; 1082 } 1083 1084 static int 1085 vubr_set_vring_enable_exec(VubrDev *dev, VhostUserMsg *vmsg) 1086 { 1087 unsigned int index = vmsg->payload.state.index; 1088 unsigned int enable = vmsg->payload.state.num; 1089 1090 DPRINT("State.index: %d\n", index); 1091 DPRINT("State.enable: %d\n", enable); 1092 dev->vq[index].enable = enable; 1093 return 0; 1094 } 1095 1096 static int 1097 vubr_send_rarp_exec(VubrDev *dev, VhostUserMsg *vmsg) 1098 { 1099 DPRINT("Function %s() not implemented yet.\n", __func__); 1100 return 0; 1101 } 1102 1103 static int 1104 vubr_execute_request(VubrDev *dev, VhostUserMsg *vmsg) 1105 { 1106 /* Print out generic part of the request. */ 1107 DPRINT( 1108 "================== Vhost user message from QEMU ==================\n"); 1109 DPRINT("Request: %s (%d)\n", vubr_request_str[vmsg->request], 1110 vmsg->request); 1111 DPRINT("Flags: 0x%x\n", vmsg->flags); 1112 DPRINT("Size: %d\n", vmsg->size); 1113 1114 if (vmsg->fd_num) { 1115 int i; 1116 DPRINT("Fds:"); 1117 for (i = 0; i < vmsg->fd_num; i++) { 1118 DPRINT(" %d", vmsg->fds[i]); 1119 } 1120 DPRINT("\n"); 1121 } 1122 1123 switch (vmsg->request) { 1124 case VHOST_USER_NONE: 1125 return vubr_none_exec(dev, vmsg); 1126 case VHOST_USER_GET_FEATURES: 1127 return vubr_get_features_exec(dev, vmsg); 1128 case VHOST_USER_SET_FEATURES: 1129 return vubr_set_features_exec(dev, vmsg); 1130 case VHOST_USER_SET_OWNER: 1131 return vubr_set_owner_exec(dev, vmsg); 1132 case VHOST_USER_RESET_OWNER: 1133 return vubr_reset_device_exec(dev, vmsg); 1134 case VHOST_USER_SET_MEM_TABLE: 1135 return vubr_set_mem_table_exec(dev, vmsg); 1136 case VHOST_USER_SET_LOG_BASE: 1137 return vubr_set_log_base_exec(dev, vmsg); 1138 case VHOST_USER_SET_LOG_FD: 1139 return vubr_set_log_fd_exec(dev, vmsg); 1140 case VHOST_USER_SET_VRING_NUM: 1141 return vubr_set_vring_num_exec(dev, vmsg); 1142 case VHOST_USER_SET_VRING_ADDR: 1143 return vubr_set_vring_addr_exec(dev, vmsg); 1144 case VHOST_USER_SET_VRING_BASE: 1145 return vubr_set_vring_base_exec(dev, vmsg); 1146 case VHOST_USER_GET_VRING_BASE: 1147 return vubr_get_vring_base_exec(dev, vmsg); 1148 case VHOST_USER_SET_VRING_KICK: 1149 return vubr_set_vring_kick_exec(dev, vmsg); 1150 case VHOST_USER_SET_VRING_CALL: 1151 return vubr_set_vring_call_exec(dev, vmsg); 1152 case VHOST_USER_SET_VRING_ERR: 1153 return vubr_set_vring_err_exec(dev, vmsg); 1154 case VHOST_USER_GET_PROTOCOL_FEATURES: 1155 return vubr_get_protocol_features_exec(dev, vmsg); 1156 case VHOST_USER_SET_PROTOCOL_FEATURES: 1157 return vubr_set_protocol_features_exec(dev, vmsg); 1158 case VHOST_USER_GET_QUEUE_NUM: 1159 return vubr_get_queue_num_exec(dev, vmsg); 1160 case VHOST_USER_SET_VRING_ENABLE: 1161 return vubr_set_vring_enable_exec(dev, vmsg); 1162 case VHOST_USER_SEND_RARP: 1163 return vubr_send_rarp_exec(dev, vmsg); 1164 1165 case VHOST_USER_MAX: 1166 assert(vmsg->request != VHOST_USER_MAX); 1167 } 1168 return 0; 1169 } 1170 1171 static void 1172 vubr_receive_cb(int sock, void *ctx) 1173 { 1174 VubrDev *dev = (VubrDev *) ctx; 1175 VhostUserMsg vmsg; 1176 int reply_requested; 1177 1178 vubr_message_read(sock, &vmsg); 1179 reply_requested = vubr_execute_request(dev, &vmsg); 1180 if (reply_requested) { 1181 /* Set the version in the flags when sending the reply */ 1182 vmsg.flags &= ~VHOST_USER_VERSION_MASK; 1183 vmsg.flags |= VHOST_USER_VERSION; 1184 vmsg.flags |= VHOST_USER_REPLY_MASK; 1185 vubr_message_write(sock, &vmsg); 1186 } 1187 } 1188 1189 static void 1190 vubr_accept_cb(int sock, void *ctx) 1191 { 1192 VubrDev *dev = (VubrDev *)ctx; 1193 int conn_fd; 1194 struct sockaddr_un un; 1195 socklen_t len = sizeof(un); 1196 1197 conn_fd = accept(sock, (struct sockaddr *) &un, &len); 1198 if (conn_fd == -1) { 1199 vubr_die("accept()"); 1200 } 1201 DPRINT("Got connection from remote peer on sock %d\n", conn_fd); 1202 dispatcher_add(&dev->dispatcher, conn_fd, ctx, vubr_receive_cb); 1203 } 1204 1205 static VubrDev * 1206 vubr_new(const char *path) 1207 { 1208 VubrDev *dev = (VubrDev *) calloc(1, sizeof(VubrDev)); 1209 dev->nregions = 0; 1210 int i; 1211 struct sockaddr_un un; 1212 size_t len; 1213 1214 for (i = 0; i < MAX_NR_VIRTQUEUE; i++) { 1215 dev->vq[i] = (VubrVirtq) { 1216 .call_fd = -1, .kick_fd = -1, 1217 .size = 0, 1218 .last_avail_index = 0, .last_used_index = 0, 1219 .desc = 0, .avail = 0, .used = 0, 1220 .enable = 0, 1221 }; 1222 } 1223 1224 /* Init log */ 1225 dev->log_call_fd = -1; 1226 dev->log_size = 0; 1227 dev->log_table = 0; 1228 dev->ready = 0; 1229 dev->features = 0; 1230 1231 /* Get a UNIX socket. */ 1232 dev->sock = socket(AF_UNIX, SOCK_STREAM, 0); 1233 if (dev->sock == -1) { 1234 vubr_die("socket"); 1235 } 1236 1237 un.sun_family = AF_UNIX; 1238 strcpy(un.sun_path, path); 1239 len = sizeof(un.sun_family) + strlen(path); 1240 unlink(path); 1241 1242 if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) { 1243 vubr_die("bind"); 1244 } 1245 1246 if (listen(dev->sock, 1) == -1) { 1247 vubr_die("listen"); 1248 } 1249 1250 dispatcher_init(&dev->dispatcher); 1251 dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev, 1252 vubr_accept_cb); 1253 1254 DPRINT("Waiting for connections on UNIX socket %s ...\n", path); 1255 return dev; 1256 } 1257 1258 static void 1259 vubr_set_host(struct sockaddr_in *saddr, const char *host) 1260 { 1261 if (isdigit(host[0])) { 1262 if (!inet_aton(host, &saddr->sin_addr)) { 1263 fprintf(stderr, "inet_aton() failed.\n"); 1264 exit(1); 1265 } 1266 } else { 1267 struct hostent *he = gethostbyname(host); 1268 1269 if (!he) { 1270 fprintf(stderr, "gethostbyname() failed.\n"); 1271 exit(1); 1272 } 1273 saddr->sin_addr = *(struct in_addr *)he->h_addr; 1274 } 1275 } 1276 1277 static void 1278 vubr_backend_udp_setup(VubrDev *dev, 1279 const char *local_host, 1280 const char *local_port, 1281 const char *remote_host, 1282 const char *remote_port) 1283 { 1284 int sock; 1285 const char *r; 1286 1287 int lport, rport; 1288 1289 lport = strtol(local_port, (char **)&r, 0); 1290 if (r == local_port) { 1291 fprintf(stderr, "lport parsing failed.\n"); 1292 exit(1); 1293 } 1294 1295 rport = strtol(remote_port, (char **)&r, 0); 1296 if (r == remote_port) { 1297 fprintf(stderr, "rport parsing failed.\n"); 1298 exit(1); 1299 } 1300 1301 struct sockaddr_in si_local = { 1302 .sin_family = AF_INET, 1303 .sin_port = htons(lport), 1304 }; 1305 1306 vubr_set_host(&si_local, local_host); 1307 1308 /* setup destination for sends */ 1309 dev->backend_udp_dest = (struct sockaddr_in) { 1310 .sin_family = AF_INET, 1311 .sin_port = htons(rport), 1312 }; 1313 vubr_set_host(&dev->backend_udp_dest, remote_host); 1314 1315 sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 1316 if (sock == -1) { 1317 vubr_die("socket"); 1318 } 1319 1320 if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) { 1321 vubr_die("bind"); 1322 } 1323 1324 dev->backend_udp_sock = sock; 1325 dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb); 1326 DPRINT("Waiting for data from udp backend on %s:%d...\n", 1327 local_host, lport); 1328 } 1329 1330 static void 1331 vubr_run(VubrDev *dev) 1332 { 1333 while (1) { 1334 /* timeout 200ms */ 1335 dispatcher_wait(&dev->dispatcher, 200000); 1336 /* Here one can try polling strategy. */ 1337 } 1338 } 1339 1340 static int 1341 vubr_parse_host_port(const char **host, const char **port, const char *buf) 1342 { 1343 char *p = strchr(buf, ':'); 1344 1345 if (!p) { 1346 return -1; 1347 } 1348 *p = '\0'; 1349 *host = strdup(buf); 1350 *port = strdup(p + 1); 1351 return 0; 1352 } 1353 1354 #define DEFAULT_UD_SOCKET "/tmp/vubr.sock" 1355 #define DEFAULT_LHOST "127.0.0.1" 1356 #define DEFAULT_LPORT "4444" 1357 #define DEFAULT_RHOST "127.0.0.1" 1358 #define DEFAULT_RPORT "5555" 1359 1360 static const char *ud_socket_path = DEFAULT_UD_SOCKET; 1361 static const char *lhost = DEFAULT_LHOST; 1362 static const char *lport = DEFAULT_LPORT; 1363 static const char *rhost = DEFAULT_RHOST; 1364 static const char *rport = DEFAULT_RPORT; 1365 1366 int 1367 main(int argc, char *argv[]) 1368 { 1369 VubrDev *dev; 1370 int opt; 1371 1372 while ((opt = getopt(argc, argv, "l:r:u:")) != -1) { 1373 1374 switch (opt) { 1375 case 'l': 1376 if (vubr_parse_host_port(&lhost, &lport, optarg) < 0) { 1377 goto out; 1378 } 1379 break; 1380 case 'r': 1381 if (vubr_parse_host_port(&rhost, &rport, optarg) < 0) { 1382 goto out; 1383 } 1384 break; 1385 case 'u': 1386 ud_socket_path = strdup(optarg); 1387 break; 1388 default: 1389 goto out; 1390 } 1391 } 1392 1393 DPRINT("ud socket: %s\n", ud_socket_path); 1394 DPRINT("local: %s:%s\n", lhost, lport); 1395 DPRINT("remote: %s:%s\n", rhost, rport); 1396 1397 dev = vubr_new(ud_socket_path); 1398 if (!dev) { 1399 return 1; 1400 } 1401 1402 vubr_backend_udp_setup(dev, lhost, lport, rhost, rport); 1403 vubr_run(dev); 1404 return 0; 1405 1406 out: 1407 fprintf(stderr, "Usage: %s ", argv[0]); 1408 fprintf(stderr, "[-u ud_socket_path] [-l lhost:lport] [-r rhost:rport]\n"); 1409 fprintf(stderr, "\t-u path to unix doman socket. default: %s\n", 1410 DEFAULT_UD_SOCKET); 1411 fprintf(stderr, "\t-l local host and port. default: %s:%s\n", 1412 DEFAULT_LHOST, DEFAULT_LPORT); 1413 fprintf(stderr, "\t-r remote host and port. default: %s:%s\n", 1414 DEFAULT_RHOST, DEFAULT_RPORT); 1415 1416 return 1; 1417 } 1418