1 /* 2 * QEMU I/O channels sockets driver 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include "qemu/osdep.h" 21 #include "qapi/error.h" 22 #include "qapi/qapi-visit-sockets.h" 23 #include "qemu/module.h" 24 #include "io/channel-socket.h" 25 #include "io/channel-util.h" 26 #include "io/channel-watch.h" 27 #include "trace.h" 28 #include "qapi/clone-visitor.h" 29 #ifdef CONFIG_LINUX 30 #include <linux/errqueue.h> 31 #include <sys/socket.h> 32 33 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY)) 34 #define QEMU_MSG_ZEROCOPY 35 #endif 36 #endif 37 38 #define SOCKET_MAX_FDS 16 39 40 SocketAddress * 41 qio_channel_socket_get_local_address(QIOChannelSocket *ioc, 42 Error **errp) 43 { 44 return socket_sockaddr_to_address(&ioc->localAddr, 45 ioc->localAddrLen, 46 errp); 47 } 48 49 SocketAddress * 50 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc, 51 Error **errp) 52 { 53 return socket_sockaddr_to_address(&ioc->remoteAddr, 54 ioc->remoteAddrLen, 55 errp); 56 } 57 58 QIOChannelSocket * 59 qio_channel_socket_new(void) 60 { 61 QIOChannelSocket *sioc; 62 QIOChannel *ioc; 63 64 sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET)); 65 sioc->fd = -1; 66 sioc->zero_copy_queued = 0; 67 sioc->zero_copy_sent = 0; 68 69 ioc = QIO_CHANNEL(sioc); 70 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); 71 72 #ifdef WIN32 73 ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL); 74 #endif 75 76 trace_qio_channel_socket_new(sioc); 77 78 return sioc; 79 } 80 81 int qio_channel_socket_set_send_buffer(QIOChannelSocket *ioc, 82 size_t size, 83 Error **errp) 84 { 85 if (setsockopt(ioc->fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) < 0) { 86 error_setg_errno(errp, errno, "Unable to set socket send buffer size"); 87 return -1; 88 } 89 90 return 0; 91 } 92 93 static int 94 qio_channel_socket_set_fd(QIOChannelSocket *sioc, 95 int fd, 96 Error **errp) 97 { 98 if (sioc->fd != -1) { 99 error_setg(errp, "Socket is already open"); 100 return -1; 101 } 102 103 sioc->fd = fd; 104 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 105 sioc->localAddrLen = sizeof(sioc->localAddr); 106 107 108 if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr, 109 &sioc->remoteAddrLen) < 0) { 110 if (errno == ENOTCONN) { 111 memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr)); 112 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 113 } else { 114 error_setg_errno(errp, errno, 115 "Unable to query remote socket address"); 116 goto error; 117 } 118 } 119 120 if (getsockname(fd, (struct sockaddr *)&sioc->localAddr, 121 &sioc->localAddrLen) < 0) { 122 error_setg_errno(errp, errno, 123 "Unable to query local socket address"); 124 goto error; 125 } 126 127 #ifndef WIN32 128 if (sioc->localAddr.ss_family == AF_UNIX) { 129 QIOChannel *ioc = QIO_CHANNEL(sioc); 130 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS); 131 } 132 #endif /* WIN32 */ 133 134 return 0; 135 136 error: 137 sioc->fd = -1; /* Let the caller close FD on failure */ 138 return -1; 139 } 140 141 QIOChannelSocket * 142 qio_channel_socket_new_fd(int fd, 143 Error **errp) 144 { 145 QIOChannelSocket *ioc; 146 147 ioc = qio_channel_socket_new(); 148 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 149 object_unref(OBJECT(ioc)); 150 return NULL; 151 } 152 153 trace_qio_channel_socket_new_fd(ioc, fd); 154 155 return ioc; 156 } 157 158 159 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, 160 SocketAddress *addr, 161 Error **errp) 162 { 163 int fd; 164 165 trace_qio_channel_socket_connect_sync(ioc, addr); 166 fd = socket_connect(addr, errp); 167 if (fd < 0) { 168 trace_qio_channel_socket_connect_fail(ioc); 169 return -1; 170 } 171 172 trace_qio_channel_socket_connect_complete(ioc, fd); 173 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 174 close(fd); 175 return -1; 176 } 177 178 #ifdef QEMU_MSG_ZEROCOPY 179 int ret, v = 1; 180 ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v)); 181 if (ret == 0) { 182 /* Zero copy available on host */ 183 qio_channel_set_feature(QIO_CHANNEL(ioc), 184 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY); 185 } 186 #endif 187 188 qio_channel_set_feature(QIO_CHANNEL(ioc), 189 QIO_CHANNEL_FEATURE_READ_MSG_PEEK); 190 191 return 0; 192 } 193 194 195 static void qio_channel_socket_connect_worker(QIOTask *task, 196 gpointer opaque) 197 { 198 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 199 SocketAddress *addr = opaque; 200 Error *err = NULL; 201 202 qio_channel_socket_connect_sync(ioc, addr, &err); 203 204 qio_task_set_error(task, err); 205 } 206 207 208 void qio_channel_socket_connect_async(QIOChannelSocket *ioc, 209 SocketAddress *addr, 210 QIOTaskFunc callback, 211 gpointer opaque, 212 GDestroyNotify destroy, 213 GMainContext *context) 214 { 215 QIOTask *task = qio_task_new( 216 OBJECT(ioc), callback, opaque, destroy); 217 SocketAddress *addrCopy; 218 219 addrCopy = QAPI_CLONE(SocketAddress, addr); 220 221 /* socket_connect() does a non-blocking connect(), but it 222 * still blocks in DNS lookups, so we must use a thread */ 223 trace_qio_channel_socket_connect_async(ioc, addr); 224 qio_task_run_in_thread(task, 225 qio_channel_socket_connect_worker, 226 addrCopy, 227 (GDestroyNotify)qapi_free_SocketAddress, 228 context); 229 } 230 231 232 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc, 233 SocketAddress *addr, 234 int num, 235 Error **errp) 236 { 237 int fd; 238 239 trace_qio_channel_socket_listen_sync(ioc, addr, num); 240 fd = socket_listen(addr, num, errp); 241 if (fd < 0) { 242 trace_qio_channel_socket_listen_fail(ioc); 243 return -1; 244 } 245 246 trace_qio_channel_socket_listen_complete(ioc, fd); 247 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 248 close(fd); 249 return -1; 250 } 251 qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN); 252 253 return 0; 254 } 255 256 257 struct QIOChannelListenWorkerData { 258 SocketAddress *addr; 259 int num; /* amount of expected connections */ 260 }; 261 262 static void qio_channel_listen_worker_free(gpointer opaque) 263 { 264 struct QIOChannelListenWorkerData *data = opaque; 265 266 qapi_free_SocketAddress(data->addr); 267 g_free(data); 268 } 269 270 static void qio_channel_socket_listen_worker(QIOTask *task, 271 gpointer opaque) 272 { 273 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 274 struct QIOChannelListenWorkerData *data = opaque; 275 Error *err = NULL; 276 277 qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err); 278 279 qio_task_set_error(task, err); 280 } 281 282 283 void qio_channel_socket_listen_async(QIOChannelSocket *ioc, 284 SocketAddress *addr, 285 int num, 286 QIOTaskFunc callback, 287 gpointer opaque, 288 GDestroyNotify destroy, 289 GMainContext *context) 290 { 291 QIOTask *task = qio_task_new( 292 OBJECT(ioc), callback, opaque, destroy); 293 struct QIOChannelListenWorkerData *data; 294 295 data = g_new0(struct QIOChannelListenWorkerData, 1); 296 data->addr = QAPI_CLONE(SocketAddress, addr); 297 data->num = num; 298 299 /* socket_listen() blocks in DNS lookups, so we must use a thread */ 300 trace_qio_channel_socket_listen_async(ioc, addr, num); 301 qio_task_run_in_thread(task, 302 qio_channel_socket_listen_worker, 303 data, 304 qio_channel_listen_worker_free, 305 context); 306 } 307 308 309 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc, 310 SocketAddress *localAddr, 311 SocketAddress *remoteAddr, 312 Error **errp) 313 { 314 int fd; 315 316 trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr); 317 fd = socket_dgram(remoteAddr, localAddr, errp); 318 if (fd < 0) { 319 trace_qio_channel_socket_dgram_fail(ioc); 320 return -1; 321 } 322 323 trace_qio_channel_socket_dgram_complete(ioc, fd); 324 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 325 close(fd); 326 return -1; 327 } 328 329 return 0; 330 } 331 332 333 struct QIOChannelSocketDGramWorkerData { 334 SocketAddress *localAddr; 335 SocketAddress *remoteAddr; 336 }; 337 338 339 static void qio_channel_socket_dgram_worker_free(gpointer opaque) 340 { 341 struct QIOChannelSocketDGramWorkerData *data = opaque; 342 qapi_free_SocketAddress(data->localAddr); 343 qapi_free_SocketAddress(data->remoteAddr); 344 g_free(data); 345 } 346 347 static void qio_channel_socket_dgram_worker(QIOTask *task, 348 gpointer opaque) 349 { 350 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 351 struct QIOChannelSocketDGramWorkerData *data = opaque; 352 Error *err = NULL; 353 354 /* socket_dgram() blocks in DNS lookups, so we must use a thread */ 355 qio_channel_socket_dgram_sync(ioc, data->localAddr, 356 data->remoteAddr, &err); 357 358 qio_task_set_error(task, err); 359 } 360 361 362 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, 363 SocketAddress *localAddr, 364 SocketAddress *remoteAddr, 365 QIOTaskFunc callback, 366 gpointer opaque, 367 GDestroyNotify destroy, 368 GMainContext *context) 369 { 370 QIOTask *task = qio_task_new( 371 OBJECT(ioc), callback, opaque, destroy); 372 struct QIOChannelSocketDGramWorkerData *data = g_new0( 373 struct QIOChannelSocketDGramWorkerData, 1); 374 375 data->localAddr = QAPI_CLONE(SocketAddress, localAddr); 376 data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr); 377 378 trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr); 379 qio_task_run_in_thread(task, 380 qio_channel_socket_dgram_worker, 381 data, 382 qio_channel_socket_dgram_worker_free, 383 context); 384 } 385 386 387 QIOChannelSocket * 388 qio_channel_socket_accept(QIOChannelSocket *ioc, 389 Error **errp) 390 { 391 QIOChannelSocket *cioc; 392 393 cioc = qio_channel_socket_new(); 394 cioc->remoteAddrLen = sizeof(ioc->remoteAddr); 395 cioc->localAddrLen = sizeof(ioc->localAddr); 396 397 retry: 398 trace_qio_channel_socket_accept(ioc); 399 cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr, 400 &cioc->remoteAddrLen); 401 if (cioc->fd < 0) { 402 if (errno == EINTR) { 403 goto retry; 404 } 405 error_setg_errno(errp, errno, "Unable to accept connection"); 406 trace_qio_channel_socket_accept_fail(ioc); 407 goto error; 408 } 409 410 if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr, 411 &cioc->localAddrLen) < 0) { 412 error_setg_errno(errp, errno, 413 "Unable to query local socket address"); 414 goto error; 415 } 416 417 #ifndef WIN32 418 if (cioc->localAddr.ss_family == AF_UNIX) { 419 QIOChannel *ioc_local = QIO_CHANNEL(cioc); 420 qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS); 421 } 422 #endif /* WIN32 */ 423 424 qio_channel_set_feature(QIO_CHANNEL(cioc), 425 QIO_CHANNEL_FEATURE_READ_MSG_PEEK); 426 427 trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd); 428 return cioc; 429 430 error: 431 object_unref(OBJECT(cioc)); 432 return NULL; 433 } 434 435 static void qio_channel_socket_init(Object *obj) 436 { 437 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 438 ioc->fd = -1; 439 } 440 441 static void qio_channel_socket_finalize(Object *obj) 442 { 443 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 444 445 if (ioc->fd != -1) { 446 QIOChannel *ioc_local = QIO_CHANNEL(ioc); 447 if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) { 448 Error *err = NULL; 449 450 socket_listen_cleanup(ioc->fd, &err); 451 if (err) { 452 error_report_err(err); 453 err = NULL; 454 } 455 } 456 #ifdef WIN32 457 qemu_socket_unselect(ioc->fd, NULL); 458 #endif 459 close(ioc->fd); 460 ioc->fd = -1; 461 } 462 } 463 464 465 #ifndef WIN32 466 static void qio_channel_socket_copy_fds(struct msghdr *msg, 467 int **fds, size_t *nfds) 468 { 469 struct cmsghdr *cmsg; 470 471 *nfds = 0; 472 *fds = NULL; 473 474 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 475 int fd_size, i; 476 int gotfds; 477 478 if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) || 479 cmsg->cmsg_level != SOL_SOCKET || 480 cmsg->cmsg_type != SCM_RIGHTS) { 481 continue; 482 } 483 484 fd_size = cmsg->cmsg_len - CMSG_LEN(0); 485 486 if (!fd_size) { 487 continue; 488 } 489 490 gotfds = fd_size / sizeof(int); 491 *fds = g_renew(int, *fds, *nfds + gotfds); 492 memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size); 493 494 for (i = 0; i < gotfds; i++) { 495 int fd = (*fds)[*nfds + i]; 496 if (fd < 0) { 497 continue; 498 } 499 500 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */ 501 qemu_socket_set_block(fd); 502 503 #ifndef MSG_CMSG_CLOEXEC 504 qemu_set_cloexec(fd); 505 #endif 506 } 507 *nfds += gotfds; 508 } 509 } 510 511 512 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 513 const struct iovec *iov, 514 size_t niov, 515 int **fds, 516 size_t *nfds, 517 int flags, 518 Error **errp) 519 { 520 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 521 ssize_t ret; 522 struct msghdr msg = { NULL, }; 523 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 524 int sflags = 0; 525 526 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 527 528 msg.msg_iov = (struct iovec *)iov; 529 msg.msg_iovlen = niov; 530 if (fds && nfds) { 531 msg.msg_control = control; 532 msg.msg_controllen = sizeof(control); 533 #ifdef MSG_CMSG_CLOEXEC 534 sflags |= MSG_CMSG_CLOEXEC; 535 #endif 536 537 } 538 539 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) { 540 sflags |= MSG_PEEK; 541 } 542 543 retry: 544 ret = recvmsg(sioc->fd, &msg, sflags); 545 if (ret < 0) { 546 if (errno == EAGAIN) { 547 return QIO_CHANNEL_ERR_BLOCK; 548 } 549 if (errno == EINTR) { 550 goto retry; 551 } 552 553 error_setg_errno(errp, errno, 554 "Unable to read from socket"); 555 return -1; 556 } 557 558 if (fds && nfds) { 559 qio_channel_socket_copy_fds(&msg, fds, nfds); 560 } 561 562 return ret; 563 } 564 565 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 566 const struct iovec *iov, 567 size_t niov, 568 int *fds, 569 size_t nfds, 570 int flags, 571 Error **errp) 572 { 573 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 574 ssize_t ret; 575 struct msghdr msg = { NULL, }; 576 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 577 size_t fdsize = sizeof(int) * nfds; 578 struct cmsghdr *cmsg; 579 int sflags = 0; 580 581 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 582 583 msg.msg_iov = (struct iovec *)iov; 584 msg.msg_iovlen = niov; 585 586 if (nfds) { 587 if (nfds > SOCKET_MAX_FDS) { 588 error_setg_errno(errp, EINVAL, 589 "Only %d FDs can be sent, got %zu", 590 SOCKET_MAX_FDS, nfds); 591 return -1; 592 } 593 594 msg.msg_control = control; 595 msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds); 596 597 cmsg = CMSG_FIRSTHDR(&msg); 598 cmsg->cmsg_len = CMSG_LEN(fdsize); 599 cmsg->cmsg_level = SOL_SOCKET; 600 cmsg->cmsg_type = SCM_RIGHTS; 601 memcpy(CMSG_DATA(cmsg), fds, fdsize); 602 } 603 604 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 605 #ifdef QEMU_MSG_ZEROCOPY 606 sflags = MSG_ZEROCOPY; 607 #else 608 /* 609 * We expect QIOChannel class entry point to have 610 * blocked this code path already 611 */ 612 g_assert_not_reached(); 613 #endif 614 } 615 616 retry: 617 ret = sendmsg(sioc->fd, &msg, sflags); 618 if (ret <= 0) { 619 switch (errno) { 620 case EAGAIN: 621 return QIO_CHANNEL_ERR_BLOCK; 622 case EINTR: 623 goto retry; 624 case ENOBUFS: 625 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 626 error_setg_errno(errp, errno, 627 "Process can't lock enough memory for using MSG_ZEROCOPY"); 628 return -1; 629 } 630 break; 631 } 632 633 error_setg_errno(errp, errno, 634 "Unable to write to socket"); 635 return -1; 636 } 637 638 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 639 sioc->zero_copy_queued++; 640 } 641 642 return ret; 643 } 644 #else /* WIN32 */ 645 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 646 const struct iovec *iov, 647 size_t niov, 648 int **fds, 649 size_t *nfds, 650 int flags, 651 Error **errp) 652 { 653 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 654 ssize_t done = 0; 655 ssize_t i; 656 int sflags = 0; 657 658 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) { 659 sflags |= MSG_PEEK; 660 } 661 662 for (i = 0; i < niov; i++) { 663 ssize_t ret; 664 retry: 665 ret = recv(sioc->fd, 666 iov[i].iov_base, 667 iov[i].iov_len, 668 sflags); 669 if (ret < 0) { 670 if (errno == EAGAIN) { 671 if (done) { 672 return done; 673 } else { 674 return QIO_CHANNEL_ERR_BLOCK; 675 } 676 } else if (errno == EINTR) { 677 goto retry; 678 } else { 679 error_setg_errno(errp, errno, 680 "Unable to read from socket"); 681 return -1; 682 } 683 } 684 done += ret; 685 if (ret < iov[i].iov_len) { 686 return done; 687 } 688 } 689 690 return done; 691 } 692 693 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 694 const struct iovec *iov, 695 size_t niov, 696 int *fds, 697 size_t nfds, 698 int flags, 699 Error **errp) 700 { 701 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 702 ssize_t done = 0; 703 ssize_t i; 704 705 for (i = 0; i < niov; i++) { 706 ssize_t ret; 707 retry: 708 ret = send(sioc->fd, 709 iov[i].iov_base, 710 iov[i].iov_len, 711 0); 712 if (ret < 0) { 713 if (errno == EAGAIN) { 714 if (done) { 715 return done; 716 } else { 717 return QIO_CHANNEL_ERR_BLOCK; 718 } 719 } else if (errno == EINTR) { 720 goto retry; 721 } else { 722 error_setg_errno(errp, errno, 723 "Unable to write to socket"); 724 return -1; 725 } 726 } 727 done += ret; 728 if (ret < iov[i].iov_len) { 729 return done; 730 } 731 } 732 733 return done; 734 } 735 #endif /* WIN32 */ 736 737 738 #ifdef QEMU_MSG_ZEROCOPY 739 static int qio_channel_socket_flush(QIOChannel *ioc, 740 Error **errp) 741 { 742 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 743 struct msghdr msg = {}; 744 struct sock_extended_err *serr; 745 struct cmsghdr *cm; 746 char control[CMSG_SPACE(sizeof(*serr))]; 747 int received; 748 int ret; 749 750 if (sioc->zero_copy_queued == sioc->zero_copy_sent) { 751 return 0; 752 } 753 754 msg.msg_control = control; 755 msg.msg_controllen = sizeof(control); 756 memset(control, 0, sizeof(control)); 757 758 ret = 1; 759 760 while (sioc->zero_copy_sent < sioc->zero_copy_queued) { 761 received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE); 762 if (received < 0) { 763 switch (errno) { 764 case EAGAIN: 765 /* Nothing on errqueue, wait until something is available */ 766 qio_channel_wait(ioc, G_IO_ERR); 767 continue; 768 case EINTR: 769 continue; 770 default: 771 error_setg_errno(errp, errno, 772 "Unable to read errqueue"); 773 return -1; 774 } 775 } 776 777 cm = CMSG_FIRSTHDR(&msg); 778 if (cm->cmsg_level != SOL_IP && cm->cmsg_type != IP_RECVERR && 779 cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) { 780 error_setg_errno(errp, EPROTOTYPE, 781 "Wrong cmsg in errqueue"); 782 return -1; 783 } 784 785 serr = (void *) CMSG_DATA(cm); 786 if (serr->ee_errno != SO_EE_ORIGIN_NONE) { 787 error_setg_errno(errp, serr->ee_errno, 788 "Error on socket"); 789 return -1; 790 } 791 if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 792 error_setg_errno(errp, serr->ee_origin, 793 "Error not from zero copy"); 794 return -1; 795 } 796 if (serr->ee_data < serr->ee_info) { 797 error_setg_errno(errp, serr->ee_origin, 798 "Wrong notification bounds"); 799 return -1; 800 } 801 802 /* No errors, count successfully finished sendmsg()*/ 803 sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1; 804 805 /* If any sendmsg() succeeded using zero copy, return 0 at the end */ 806 if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) { 807 ret = 0; 808 } 809 } 810 811 return ret; 812 } 813 814 #endif /* QEMU_MSG_ZEROCOPY */ 815 816 static int 817 qio_channel_socket_set_blocking(QIOChannel *ioc, 818 bool enabled, 819 Error **errp) 820 { 821 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 822 823 if (enabled) { 824 qemu_socket_set_block(sioc->fd); 825 } else { 826 qemu_socket_set_nonblock(sioc->fd); 827 } 828 return 0; 829 } 830 831 832 static void 833 qio_channel_socket_set_delay(QIOChannel *ioc, 834 bool enabled) 835 { 836 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 837 int v = enabled ? 0 : 1; 838 839 setsockopt(sioc->fd, 840 IPPROTO_TCP, TCP_NODELAY, 841 &v, sizeof(v)); 842 } 843 844 845 static void 846 qio_channel_socket_set_cork(QIOChannel *ioc, 847 bool enabled) 848 { 849 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 850 int v = enabled ? 1 : 0; 851 852 socket_set_cork(sioc->fd, v); 853 } 854 855 static int 856 qio_channel_socket_get_peerpid(QIOChannel *ioc, 857 unsigned int *pid, 858 Error **errp) 859 { 860 #ifdef CONFIG_LINUX 861 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 862 Error *err = NULL; 863 socklen_t len = sizeof(struct ucred); 864 865 struct ucred cred; 866 if (getsockopt(sioc->fd, 867 SOL_SOCKET, SO_PEERCRED, 868 &cred, &len) == -1) { 869 error_setg_errno(&err, errno, "Unable to get peer credentials"); 870 error_propagate(errp, err); 871 *pid = -1; 872 return -1; 873 } 874 *pid = (unsigned int)cred.pid; 875 return 0; 876 #else 877 error_setg(errp, "Unsupported feature"); 878 *pid = -1; 879 return -1; 880 #endif 881 } 882 883 static int 884 qio_channel_socket_close(QIOChannel *ioc, 885 Error **errp) 886 { 887 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 888 int rc = 0; 889 Error *err = NULL; 890 891 if (sioc->fd != -1) { 892 #ifdef WIN32 893 qemu_socket_unselect(sioc->fd, NULL); 894 #endif 895 if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) { 896 socket_listen_cleanup(sioc->fd, errp); 897 } 898 899 if (close(sioc->fd) < 0) { 900 sioc->fd = -1; 901 error_setg_errno(&err, errno, "Unable to close socket"); 902 error_propagate(errp, err); 903 return -1; 904 } 905 sioc->fd = -1; 906 } 907 return rc; 908 } 909 910 static int 911 qio_channel_socket_shutdown(QIOChannel *ioc, 912 QIOChannelShutdown how, 913 Error **errp) 914 { 915 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 916 int sockhow; 917 918 switch (how) { 919 case QIO_CHANNEL_SHUTDOWN_READ: 920 sockhow = SHUT_RD; 921 break; 922 case QIO_CHANNEL_SHUTDOWN_WRITE: 923 sockhow = SHUT_WR; 924 break; 925 case QIO_CHANNEL_SHUTDOWN_BOTH: 926 default: 927 sockhow = SHUT_RDWR; 928 break; 929 } 930 931 if (shutdown(sioc->fd, sockhow) < 0) { 932 error_setg_errno(errp, errno, 933 "Unable to shutdown socket"); 934 return -1; 935 } 936 return 0; 937 } 938 939 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc, 940 AioContext *read_ctx, 941 IOHandler *io_read, 942 AioContext *write_ctx, 943 IOHandler *io_write, 944 void *opaque) 945 { 946 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 947 948 qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read, 949 sioc->fd, write_ctx, io_write, 950 opaque); 951 } 952 953 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc, 954 GIOCondition condition) 955 { 956 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 957 return qio_channel_create_socket_watch(ioc, 958 sioc->fd, 959 condition); 960 } 961 962 static void qio_channel_socket_class_init(ObjectClass *klass, 963 const void *class_data G_GNUC_UNUSED) 964 { 965 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 966 967 ioc_klass->io_writev = qio_channel_socket_writev; 968 ioc_klass->io_readv = qio_channel_socket_readv; 969 ioc_klass->io_set_blocking = qio_channel_socket_set_blocking; 970 ioc_klass->io_close = qio_channel_socket_close; 971 ioc_klass->io_shutdown = qio_channel_socket_shutdown; 972 ioc_klass->io_set_cork = qio_channel_socket_set_cork; 973 ioc_klass->io_set_delay = qio_channel_socket_set_delay; 974 ioc_klass->io_create_watch = qio_channel_socket_create_watch; 975 ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler; 976 #ifdef QEMU_MSG_ZEROCOPY 977 ioc_klass->io_flush = qio_channel_socket_flush; 978 #endif 979 ioc_klass->io_peerpid = qio_channel_socket_get_peerpid; 980 } 981 982 static const TypeInfo qio_channel_socket_info = { 983 .parent = TYPE_QIO_CHANNEL, 984 .name = TYPE_QIO_CHANNEL_SOCKET, 985 .instance_size = sizeof(QIOChannelSocket), 986 .instance_init = qio_channel_socket_init, 987 .instance_finalize = qio_channel_socket_finalize, 988 .class_init = qio_channel_socket_class_init, 989 }; 990 991 static void qio_channel_socket_register_types(void) 992 { 993 type_register_static(&qio_channel_socket_info); 994 } 995 996 type_init(qio_channel_socket_register_types); 997