1 /*
2 * QEMU I/O channels sockets driver
3 *
4 * Copyright (c) 2015 Red Hat, Inc.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-util.h"
26 #include "io/channel-watch.h"
27 #include "trace.h"
28 #include "qapi/clone-visitor.h"
29 #ifdef CONFIG_LINUX
30 #include <linux/errqueue.h>
31 #include <sys/socket.h>
32
33 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
34 #define QEMU_MSG_ZEROCOPY
35 #endif
36 #endif
37
/*
 * Maximum number of file descriptors carried in a single SCM_RIGHTS
 * control message.  Sizes the on-stack control buffers used by the POSIX
 * readv/writev implementations, and writev rejects requests with more FDs.
 */
#define SOCKET_MAX_FDS 16
39
40 SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket * ioc,Error ** errp)41 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
42 Error **errp)
43 {
44 return socket_sockaddr_to_address(&ioc->localAddr,
45 ioc->localAddrLen,
46 errp);
47 }
48
49 SocketAddress *
qio_channel_socket_get_remote_address(QIOChannelSocket * ioc,Error ** errp)50 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
51 Error **errp)
52 {
53 return socket_sockaddr_to_address(&ioc->remoteAddr,
54 ioc->remoteAddrLen,
55 errp);
56 }
57
58 QIOChannelSocket *
qio_channel_socket_new(void)59 qio_channel_socket_new(void)
60 {
61 QIOChannelSocket *sioc;
62 QIOChannel *ioc;
63
64 sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
65 sioc->fd = -1;
66 sioc->zero_copy_queued = 0;
67 sioc->zero_copy_sent = 0;
68
69 ioc = QIO_CHANNEL(sioc);
70 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
71
72 #ifdef WIN32
73 ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
74 #endif
75
76 trace_qio_channel_socket_new(sioc);
77
78 return sioc;
79 }
80
qio_channel_socket_set_send_buffer(QIOChannelSocket * ioc,size_t size,Error ** errp)81 int qio_channel_socket_set_send_buffer(QIOChannelSocket *ioc,
82 size_t size,
83 Error **errp)
84 {
85 if (setsockopt(ioc->fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) < 0) {
86 error_setg_errno(errp, errno, "Unable to set socket send buffer size");
87 return -1;
88 }
89
90 return 0;
91 }
92
93 static int
qio_channel_socket_set_fd(QIOChannelSocket * sioc,int fd,Error ** errp)94 qio_channel_socket_set_fd(QIOChannelSocket *sioc,
95 int fd,
96 Error **errp)
97 {
98 if (sioc->fd != -1) {
99 error_setg(errp, "Socket is already open");
100 return -1;
101 }
102
103 sioc->fd = fd;
104 sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
105 sioc->localAddrLen = sizeof(sioc->localAddr);
106
107
108 if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
109 &sioc->remoteAddrLen) < 0) {
110 if (errno == ENOTCONN) {
111 memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
112 sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
113 } else {
114 error_setg_errno(errp, errno,
115 "Unable to query remote socket address");
116 goto error;
117 }
118 }
119
120 if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
121 &sioc->localAddrLen) < 0) {
122 error_setg_errno(errp, errno,
123 "Unable to query local socket address");
124 goto error;
125 }
126
127 #ifndef WIN32
128 if (sioc->localAddr.ss_family == AF_UNIX) {
129 QIOChannel *ioc = QIO_CHANNEL(sioc);
130 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
131 }
132 #endif /* WIN32 */
133
134 return 0;
135
136 error:
137 sioc->fd = -1; /* Let the caller close FD on failure */
138 return -1;
139 }
140
141 QIOChannelSocket *
qio_channel_socket_new_fd(int fd,Error ** errp)142 qio_channel_socket_new_fd(int fd,
143 Error **errp)
144 {
145 QIOChannelSocket *ioc;
146
147 ioc = qio_channel_socket_new();
148 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
149 object_unref(OBJECT(ioc));
150 return NULL;
151 }
152
153 trace_qio_channel_socket_new_fd(ioc, fd);
154
155 return ioc;
156 }
157
158
qio_channel_socket_connect_sync(QIOChannelSocket * ioc,SocketAddress * addr,Error ** errp)159 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
160 SocketAddress *addr,
161 Error **errp)
162 {
163 int fd;
164
165 trace_qio_channel_socket_connect_sync(ioc, addr);
166 fd = socket_connect(addr, errp);
167 if (fd < 0) {
168 trace_qio_channel_socket_connect_fail(ioc);
169 return -1;
170 }
171
172 trace_qio_channel_socket_connect_complete(ioc, fd);
173 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
174 close(fd);
175 return -1;
176 }
177
178 #ifdef QEMU_MSG_ZEROCOPY
179 int ret, v = 1;
180 ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
181 if (ret == 0) {
182 /* Zero copy available on host */
183 qio_channel_set_feature(QIO_CHANNEL(ioc),
184 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
185 }
186 #endif
187
188 qio_channel_set_feature(QIO_CHANNEL(ioc),
189 QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
190
191 return 0;
192 }
193
194
/*
 * Worker-thread body for qio_channel_socket_connect_async(): performs the
 * blocking connect on the task's source channel and records any error on
 * @task.  @opaque is the SocketAddress copy owned by the task.
 */
static void qio_channel_socket_connect_worker(QIOTask *task,
                                              gpointer opaque)
{
    Error *local_err = NULL;
    QIOChannelSocket *sock = QIO_CHANNEL_SOCKET(qio_task_get_source(task));

    qio_channel_socket_connect_sync(sock, (SocketAddress *)opaque, &local_err);
    qio_task_set_error(task, local_err);
}
206
207
/*
 * Connect @ioc to @addr in a background thread.
 *
 * @callback (with @opaque / @destroy) is invoked via @context when the
 * connect attempt finishes.  @addr is cloned, so the caller keeps
 * ownership of its argument.
 */
void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
                                      SocketAddress *addr,
                                      QIOTaskFunc callback,
                                      gpointer opaque,
                                      GDestroyNotify destroy,
                                      GMainContext *context)
{
    QIOTask *task = qio_task_new(OBJECT(ioc), callback, opaque, destroy);
    /* Private copy for the worker; the task frees it when finished */
    SocketAddress *addr_copy = QAPI_CLONE(SocketAddress, addr);

    /*
     * socket_connect() does a non-blocking connect(), but it still
     * blocks in DNS lookups, so we must use a thread.
     */
    trace_qio_channel_socket_connect_async(ioc, addr);
    qio_task_run_in_thread(task, qio_channel_socket_connect_worker, addr_copy,
                           (GDestroyNotify)qapi_free_SocketAddress, context);
}
230
231
qio_channel_socket_listen_sync(QIOChannelSocket * ioc,SocketAddress * addr,int num,Error ** errp)232 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
233 SocketAddress *addr,
234 int num,
235 Error **errp)
236 {
237 int fd;
238
239 trace_qio_channel_socket_listen_sync(ioc, addr, num);
240 fd = socket_listen(addr, num, errp);
241 if (fd < 0) {
242 trace_qio_channel_socket_listen_fail(ioc);
243 return -1;
244 }
245
246 trace_qio_channel_socket_listen_complete(ioc, fd);
247 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
248 close(fd);
249 return -1;
250 }
251 qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
252
253 return 0;
254 }
255
256
/*
 * Arguments handed to qio_channel_socket_listen_worker() on the worker
 * thread; released by qio_channel_listen_worker_free().
 */
struct QIOChannelListenWorkerData {
    SocketAddress *addr; /* private clone of the caller's address */
    int num; /* forwarded as @num to socket_listen() (presumably the backlog) */
};
261
/* GDestroyNotify for QIOChannelListenWorkerData: frees the address clone and the struct. */
static void qio_channel_listen_worker_free(gpointer opaque)
{
    struct QIOChannelListenWorkerData *wd = opaque;
    SocketAddress *addr = wd->addr;

    qapi_free_SocketAddress(addr);
    g_free(wd);
}
269
/*
 * Worker-thread body for qio_channel_socket_listen_async(): performs the
 * blocking listen on the task's source channel and records any error on
 * @task.
 */
static void qio_channel_socket_listen_worker(QIOTask *task,
                                             gpointer opaque)
{
    struct QIOChannelListenWorkerData *wd = opaque;
    Error *local_err = NULL;
    QIOChannelSocket *sock = QIO_CHANNEL_SOCKET(qio_task_get_source(task));

    qio_channel_socket_listen_sync(sock, wd->addr, wd->num, &local_err);
    qio_task_set_error(task, local_err);
}
281
282
/*
 * Create a listening socket on @addr in a background thread and attach it
 * to @ioc.  @callback is invoked via @context on completion.  @addr is
 * cloned, so the caller keeps ownership of its argument.
 */
void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
                                     SocketAddress *addr,
                                     int num,
                                     QIOTaskFunc callback,
                                     gpointer opaque,
                                     GDestroyNotify destroy,
                                     GMainContext *context)
{
    QIOTask *task = qio_task_new(OBJECT(ioc), callback, opaque, destroy);
    struct QIOChannelListenWorkerData *wd =
        g_new0(struct QIOChannelListenWorkerData, 1);

    wd->num = num;
    wd->addr = QAPI_CLONE(SocketAddress, addr);

    /* socket_listen() blocks in DNS lookups, so we must use a thread */
    trace_qio_channel_socket_listen_async(ioc, addr, num);
    qio_task_run_in_thread(task, qio_channel_socket_listen_worker, wd,
                           qio_channel_listen_worker_free, context);
}
307
308
qio_channel_socket_dgram_sync(QIOChannelSocket * ioc,SocketAddress * localAddr,SocketAddress * remoteAddr,Error ** errp)309 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
310 SocketAddress *localAddr,
311 SocketAddress *remoteAddr,
312 Error **errp)
313 {
314 int fd;
315
316 trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
317 fd = socket_dgram(remoteAddr, localAddr, errp);
318 if (fd < 0) {
319 trace_qio_channel_socket_dgram_fail(ioc);
320 return -1;
321 }
322
323 trace_qio_channel_socket_dgram_complete(ioc, fd);
324 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
325 close(fd);
326 return -1;
327 }
328
329 return 0;
330 }
331
332
/*
 * Arguments handed to qio_channel_socket_dgram_worker() on the worker
 * thread; released by qio_channel_socket_dgram_worker_free().
 */
struct QIOChannelSocketDGramWorkerData {
    SocketAddress *localAddr;  /* private clone of the local address */
    SocketAddress *remoteAddr; /* private clone of the remote address */
};
337
338
/* GDestroyNotify for QIOChannelSocketDGramWorkerData: frees both address clones and the struct. */
static void qio_channel_socket_dgram_worker_free(gpointer opaque)
{
    struct QIOChannelSocketDGramWorkerData *wd = opaque;
    SocketAddress *local = wd->localAddr;
    SocketAddress *remote = wd->remoteAddr;

    qapi_free_SocketAddress(local);
    qapi_free_SocketAddress(remote);
    g_free(wd);
}
346
/*
 * Worker-thread body for qio_channel_socket_dgram_async(): creates the
 * datagram socket on the task's source channel and records any error on
 * @task.  socket_dgram() blocks in DNS lookups, hence the thread.
 */
static void qio_channel_socket_dgram_worker(QIOTask *task,
                                            gpointer opaque)
{
    struct QIOChannelSocketDGramWorkerData *wd = opaque;
    Error *local_err = NULL;
    QIOChannelSocket *sock = QIO_CHANNEL_SOCKET(qio_task_get_source(task));

    qio_channel_socket_dgram_sync(sock, wd->localAddr, wd->remoteAddr,
                                  &local_err);
    qio_task_set_error(task, local_err);
}
360
361
/*
 * Create a datagram socket for @localAddr / @remoteAddr in a background
 * thread and attach it to @ioc.  @callback is invoked via @context on
 * completion.  Both addresses are cloned; the caller keeps its arguments.
 */
void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
                                    SocketAddress *localAddr,
                                    SocketAddress *remoteAddr,
                                    QIOTaskFunc callback,
                                    gpointer opaque,
                                    GDestroyNotify destroy,
                                    GMainContext *context)
{
    QIOTask *task = qio_task_new(OBJECT(ioc), callback, opaque, destroy);
    struct QIOChannelSocketDGramWorkerData *wd =
        g_new0(struct QIOChannelSocketDGramWorkerData, 1);

    wd->localAddr = QAPI_CLONE(SocketAddress, localAddr);
    wd->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);

    trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
    qio_task_run_in_thread(task, qio_channel_socket_dgram_worker, wd,
                           qio_channel_socket_dgram_worker_free, context);
}
385
386
387 QIOChannelSocket *
qio_channel_socket_accept(QIOChannelSocket * ioc,Error ** errp)388 qio_channel_socket_accept(QIOChannelSocket *ioc,
389 Error **errp)
390 {
391 QIOChannelSocket *cioc;
392
393 cioc = qio_channel_socket_new();
394 cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
395 cioc->localAddrLen = sizeof(ioc->localAddr);
396
397 retry:
398 trace_qio_channel_socket_accept(ioc);
399 cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
400 &cioc->remoteAddrLen);
401 if (cioc->fd < 0) {
402 if (errno == EINTR) {
403 goto retry;
404 }
405 error_setg_errno(errp, errno, "Unable to accept connection");
406 trace_qio_channel_socket_accept_fail(ioc);
407 goto error;
408 }
409
410 if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
411 &cioc->localAddrLen) < 0) {
412 error_setg_errno(errp, errno,
413 "Unable to query local socket address");
414 goto error;
415 }
416
417 #ifndef WIN32
418 if (cioc->localAddr.ss_family == AF_UNIX) {
419 QIOChannel *ioc_local = QIO_CHANNEL(cioc);
420 qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
421 }
422 #endif /* WIN32 */
423
424 qio_channel_set_feature(QIO_CHANNEL(cioc),
425 QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
426
427 trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
428 return cioc;
429
430 error:
431 object_unref(OBJECT(cioc));
432 return NULL;
433 }
434
/* QOM instance initializer: a fresh socket channel has no descriptor. */
static void qio_channel_socket_init(Object *obj)
{
    QIO_CHANNEL_SOCKET(obj)->fd = -1;
}
440
/*
 * QOM instance finalizer: release the descriptor, if any.
 *
 * Listening sockets first go through socket_listen_cleanup().  Any error
 * from it is only reported, because finalization cannot fail.
 */
static void qio_channel_socket_finalize(Object *obj)
{
    QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);

    if (ioc->fd == -1) {
        return;
    }

    if (qio_channel_has_feature(QIO_CHANNEL(ioc),
                                QIO_CHANNEL_FEATURE_LISTEN)) {
        Error *cleanup_err = NULL;

        socket_listen_cleanup(ioc->fd, &cleanup_err);
        if (cleanup_err) {
            error_report_err(cleanup_err);
        }
    }
#ifdef WIN32
    qemu_socket_unselect(ioc->fd, NULL);
#endif
    close(ioc->fd);
    ioc->fd = -1;
}
463
464
465 #ifndef WIN32
qio_channel_socket_copy_fds(struct msghdr * msg,int ** fds,size_t * nfds)466 static void qio_channel_socket_copy_fds(struct msghdr *msg,
467 int **fds, size_t *nfds)
468 {
469 struct cmsghdr *cmsg;
470
471 *nfds = 0;
472 *fds = NULL;
473
474 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
475 int fd_size, i;
476 int gotfds;
477
478 if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
479 cmsg->cmsg_level != SOL_SOCKET ||
480 cmsg->cmsg_type != SCM_RIGHTS) {
481 continue;
482 }
483
484 fd_size = cmsg->cmsg_len - CMSG_LEN(0);
485
486 if (!fd_size) {
487 continue;
488 }
489
490 gotfds = fd_size / sizeof(int);
491 *fds = g_renew(int, *fds, *nfds + gotfds);
492 memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
493
494 for (i = 0; i < gotfds; i++) {
495 int fd = (*fds)[*nfds + i];
496 if (fd < 0) {
497 continue;
498 }
499
500 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
501 qemu_socket_set_block(fd);
502
503 #ifndef MSG_CMSG_CLOEXEC
504 qemu_set_cloexec(fd);
505 #endif
506 }
507 *nfds += gotfds;
508 }
509 }
510
511
qio_channel_socket_readv(QIOChannel * ioc,const struct iovec * iov,size_t niov,int ** fds,size_t * nfds,int flags,Error ** errp)512 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
513 const struct iovec *iov,
514 size_t niov,
515 int **fds,
516 size_t *nfds,
517 int flags,
518 Error **errp)
519 {
520 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
521 ssize_t ret;
522 struct msghdr msg = { NULL, };
523 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
524 int sflags = 0;
525
526 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
527
528 msg.msg_iov = (struct iovec *)iov;
529 msg.msg_iovlen = niov;
530 if (fds && nfds) {
531 msg.msg_control = control;
532 msg.msg_controllen = sizeof(control);
533 #ifdef MSG_CMSG_CLOEXEC
534 sflags |= MSG_CMSG_CLOEXEC;
535 #endif
536
537 }
538
539 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
540 sflags |= MSG_PEEK;
541 }
542
543 retry:
544 ret = recvmsg(sioc->fd, &msg, sflags);
545 if (ret < 0) {
546 if (errno == EAGAIN) {
547 return QIO_CHANNEL_ERR_BLOCK;
548 }
549 if (errno == EINTR) {
550 goto retry;
551 }
552
553 error_setg_errno(errp, errno,
554 "Unable to read from socket");
555 return -1;
556 }
557
558 if (fds && nfds) {
559 qio_channel_socket_copy_fds(&msg, fds, nfds);
560 }
561
562 return ret;
563 }
564
/*
 * POSIX write path: a single sendmsg() of @iov, optionally passing @nfds
 * descriptors from @fds as one SCM_RIGHTS control message.
 *
 * At most SOCKET_MAX_FDS descriptors are accepted per call.  With
 * QIO_CHANNEL_WRITE_FLAG_ZERO_COPY the data is sent with MSG_ZEROCOPY and,
 * on success, zero_copy_queued is incremented.  That counter is compared
 * against zero_copy_sent by qio_channel_socket_flush().
 *
 * Returns: bytes written, QIO_CHANNEL_ERR_BLOCK on EAGAIN, or -1 with
 * @errp set.  EINTR is retried.
 */
static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
                                         const struct iovec *iov,
                                         size_t niov,
                                         int *fds,
                                         size_t nfds,
                                         int flags,
                                         Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    ssize_t ret;
    struct msghdr msg = { NULL, };
    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
    size_t fdsize = sizeof(int) * nfds;
    struct cmsghdr *cmsg;
    int sflags = 0;

    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));

    msg.msg_iov = (struct iovec *)iov;
    msg.msg_iovlen = niov;

    if (nfds) {
        /* control[] is only large enough for SOCKET_MAX_FDS descriptors */
        if (nfds > SOCKET_MAX_FDS) {
            error_setg_errno(errp, EINVAL,
                             "Only %d FDs can be sent, got %zu",
                             SOCKET_MAX_FDS, nfds);
            return -1;
        }

        /* Advertise only the space actually used by nfds descriptors */
        msg.msg_control = control;
        msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);

        cmsg = CMSG_FIRSTHDR(&msg);
        cmsg->cmsg_len = CMSG_LEN(fdsize);
        cmsg->cmsg_level = SOL_SOCKET;
        cmsg->cmsg_type = SCM_RIGHTS;
        memcpy(CMSG_DATA(cmsg), fds, fdsize);
    }

    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
#ifdef QEMU_MSG_ZEROCOPY
        sflags = MSG_ZEROCOPY;
#else
        /*
         * We expect QIOChannel class entry point to have
         * blocked this code path already
         */
        g_assert_not_reached();
#endif
    }

 retry:
    ret = sendmsg(sioc->fd, &msg, sflags);
    /*
     * NOTE(review): a return of 0 is also routed through the errno switch,
     * although sendmsg() does not set errno on success; for a zero-length
     * write the errno inspected here may be stale.  Confirm intent.
     */
    if (ret <= 0) {
        switch (errno) {
        case EAGAIN:
            return QIO_CHANNEL_ERR_BLOCK;
        case EINTR:
            goto retry;
        case ENOBUFS:
            /* Give zero-copy callers a more specific diagnosis */
            if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
                error_setg_errno(errp, errno,
                                 "Process can't lock enough memory for using MSG_ZEROCOPY");
                return -1;
            }
            break;
        }

        error_setg_errno(errp, errno,
                         "Unable to write to socket");
        return -1;
    }

    /* Each successful zero-copy sendmsg() is one pending completion */
    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
        sioc->zero_copy_queued++;
    }

    return ret;
}
644 #else /* WIN32 */
qio_channel_socket_readv(QIOChannel * ioc,const struct iovec * iov,size_t niov,int ** fds,size_t * nfds,int flags,Error ** errp)645 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
646 const struct iovec *iov,
647 size_t niov,
648 int **fds,
649 size_t *nfds,
650 int flags,
651 Error **errp)
652 {
653 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
654 ssize_t done = 0;
655 ssize_t i;
656 int sflags = 0;
657
658 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
659 sflags |= MSG_PEEK;
660 }
661
662 for (i = 0; i < niov; i++) {
663 ssize_t ret;
664 retry:
665 ret = recv(sioc->fd,
666 iov[i].iov_base,
667 iov[i].iov_len,
668 sflags);
669 if (ret < 0) {
670 if (errno == EAGAIN) {
671 if (done) {
672 return done;
673 } else {
674 return QIO_CHANNEL_ERR_BLOCK;
675 }
676 } else if (errno == EINTR) {
677 goto retry;
678 } else {
679 error_setg_errno(errp, errno,
680 "Unable to read from socket");
681 return -1;
682 }
683 }
684 done += ret;
685 if (ret < iov[i].iov_len) {
686 return done;
687 }
688 }
689
690 return done;
691 }
692
qio_channel_socket_writev(QIOChannel * ioc,const struct iovec * iov,size_t niov,int * fds,size_t nfds,int flags,Error ** errp)693 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
694 const struct iovec *iov,
695 size_t niov,
696 int *fds,
697 size_t nfds,
698 int flags,
699 Error **errp)
700 {
701 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
702 ssize_t done = 0;
703 ssize_t i;
704
705 for (i = 0; i < niov; i++) {
706 ssize_t ret;
707 retry:
708 ret = send(sioc->fd,
709 iov[i].iov_base,
710 iov[i].iov_len,
711 0);
712 if (ret < 0) {
713 if (errno == EAGAIN) {
714 if (done) {
715 return done;
716 } else {
717 return QIO_CHANNEL_ERR_BLOCK;
718 }
719 } else if (errno == EINTR) {
720 goto retry;
721 } else {
722 error_setg_errno(errp, errno,
723 "Unable to write to socket");
724 return -1;
725 }
726 }
727 done += ret;
728 if (ret < iov[i].iov_len) {
729 return done;
730 }
731 }
732
733 return done;
734 }
735 #endif /* WIN32 */
736
737
738 #ifdef QEMU_MSG_ZEROCOPY
qio_channel_socket_flush(QIOChannel * ioc,Error ** errp)739 static int qio_channel_socket_flush(QIOChannel *ioc,
740 Error **errp)
741 {
742 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
743 struct msghdr msg = {};
744 struct sock_extended_err *serr;
745 struct cmsghdr *cm;
746 char control[CMSG_SPACE(sizeof(*serr))];
747 int received;
748 int ret;
749
750 if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
751 return 0;
752 }
753
754 msg.msg_control = control;
755 msg.msg_controllen = sizeof(control);
756 memset(control, 0, sizeof(control));
757
758 ret = 1;
759
760 while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
761 received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
762 if (received < 0) {
763 switch (errno) {
764 case EAGAIN:
765 /* Nothing on errqueue, wait until something is available */
766 qio_channel_wait(ioc, G_IO_ERR);
767 continue;
768 case EINTR:
769 continue;
770 default:
771 error_setg_errno(errp, errno,
772 "Unable to read errqueue");
773 return -1;
774 }
775 }
776
777 cm = CMSG_FIRSTHDR(&msg);
778 if (cm->cmsg_level != SOL_IP && cm->cmsg_type != IP_RECVERR &&
779 cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
780 error_setg_errno(errp, EPROTOTYPE,
781 "Wrong cmsg in errqueue");
782 return -1;
783 }
784
785 serr = (void *) CMSG_DATA(cm);
786 if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
787 error_setg_errno(errp, serr->ee_errno,
788 "Error on socket");
789 return -1;
790 }
791 if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
792 error_setg_errno(errp, serr->ee_origin,
793 "Error not from zero copy");
794 return -1;
795 }
796 if (serr->ee_data < serr->ee_info) {
797 error_setg_errno(errp, serr->ee_origin,
798 "Wrong notification bounds");
799 return -1;
800 }
801
802 /* No errors, count successfully finished sendmsg()*/
803 sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
804
805 /* If any sendmsg() succeeded using zero copy, return 0 at the end */
806 if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
807 ret = 0;
808 }
809 }
810
811 return ret;
812 }
813
814 #endif /* QEMU_MSG_ZEROCOPY */
815
816 static int
qio_channel_socket_set_blocking(QIOChannel * ioc,bool enabled,Error ** errp)817 qio_channel_socket_set_blocking(QIOChannel *ioc,
818 bool enabled,
819 Error **errp)
820 {
821 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
822
823 if (enabled) {
824 qemu_socket_set_block(sioc->fd);
825 } else {
826 qemu_socket_set_nonblock(sioc->fd);
827 }
828 return 0;
829 }
830
831
832 static void
qio_channel_socket_set_delay(QIOChannel * ioc,bool enabled)833 qio_channel_socket_set_delay(QIOChannel *ioc,
834 bool enabled)
835 {
836 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
837 int v = enabled ? 0 : 1;
838
839 setsockopt(sioc->fd,
840 IPPROTO_TCP, TCP_NODELAY,
841 &v, sizeof(v));
842 }
843
844
845 static void
qio_channel_socket_set_cork(QIOChannel * ioc,bool enabled)846 qio_channel_socket_set_cork(QIOChannel *ioc,
847 bool enabled)
848 {
849 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
850 int v = enabled ? 1 : 0;
851
852 socket_set_cork(sioc->fd, v);
853 }
854
855 static int
qio_channel_socket_get_peerpid(QIOChannel * ioc,unsigned int * pid,Error ** errp)856 qio_channel_socket_get_peerpid(QIOChannel *ioc,
857 unsigned int *pid,
858 Error **errp)
859 {
860 #ifdef CONFIG_LINUX
861 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
862 Error *err = NULL;
863 socklen_t len = sizeof(struct ucred);
864
865 struct ucred cred;
866 if (getsockopt(sioc->fd,
867 SOL_SOCKET, SO_PEERCRED,
868 &cred, &len) == -1) {
869 error_setg_errno(&err, errno, "Unable to get peer credentials");
870 error_propagate(errp, err);
871 *pid = -1;
872 return -1;
873 }
874 *pid = (unsigned int)cred.pid;
875 return 0;
876 #else
877 error_setg(errp, "Unsupported feature");
878 *pid = -1;
879 return -1;
880 #endif
881 }
882
883 static int
qio_channel_socket_close(QIOChannel * ioc,Error ** errp)884 qio_channel_socket_close(QIOChannel *ioc,
885 Error **errp)
886 {
887 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
888 int rc = 0;
889 Error *err = NULL;
890
891 if (sioc->fd != -1) {
892 #ifdef WIN32
893 qemu_socket_unselect(sioc->fd, NULL);
894 #endif
895 if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
896 socket_listen_cleanup(sioc->fd, errp);
897 }
898
899 if (close(sioc->fd) < 0) {
900 sioc->fd = -1;
901 error_setg_errno(&err, errno, "Unable to close socket");
902 error_propagate(errp, err);
903 return -1;
904 }
905 sioc->fd = -1;
906 }
907 return rc;
908 }
909
910 static int
qio_channel_socket_shutdown(QIOChannel * ioc,QIOChannelShutdown how,Error ** errp)911 qio_channel_socket_shutdown(QIOChannel *ioc,
912 QIOChannelShutdown how,
913 Error **errp)
914 {
915 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
916 int sockhow;
917
918 switch (how) {
919 case QIO_CHANNEL_SHUTDOWN_READ:
920 sockhow = SHUT_RD;
921 break;
922 case QIO_CHANNEL_SHUTDOWN_WRITE:
923 sockhow = SHUT_WR;
924 break;
925 case QIO_CHANNEL_SHUTDOWN_BOTH:
926 default:
927 sockhow = SHUT_RDWR;
928 break;
929 }
930
931 if (shutdown(sioc->fd, sockhow) < 0) {
932 error_setg_errno(errp, errno,
933 "Unable to shutdown socket");
934 return -1;
935 }
936 return 0;
937 }
938
/*
 * Register AIO read/write handlers for the channel.  The same socket
 * descriptor serves as both the read and the write fd.
 */
static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
                                                  AioContext *read_ctx,
                                                  IOHandler *io_read,
                                                  AioContext *write_ctx,
                                                  IOHandler *io_write,
                                                  void *opaque)
{
    int fd = QIO_CHANNEL_SOCKET(ioc)->fd;

    qio_channel_util_set_aio_fd_handler(fd, read_ctx, io_read,
                                        fd, write_ctx, io_write, opaque);
}
952
qio_channel_socket_create_watch(QIOChannel * ioc,GIOCondition condition)953 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
954 GIOCondition condition)
955 {
956 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
957 return qio_channel_create_socket_watch(ioc,
958 sioc->fd,
959 condition);
960 }
961
/* Install the socket implementations of the QIOChannel virtual methods. */
static void qio_channel_socket_class_init(ObjectClass *klass,
                                          const void *class_data G_GNUC_UNUSED)
{
    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);

    /* Data transfer */
    ioc_klass->io_readv = qio_channel_socket_readv;
    ioc_klass->io_writev = qio_channel_socket_writev;
#ifdef QEMU_MSG_ZEROCOPY
    ioc_klass->io_flush = qio_channel_socket_flush;
#endif

    /* Socket options and lifecycle */
    ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
    ioc_klass->io_set_cork = qio_channel_socket_set_cork;
    ioc_klass->io_set_delay = qio_channel_socket_set_delay;
    ioc_klass->io_shutdown = qio_channel_socket_shutdown;
    ioc_klass->io_close = qio_channel_socket_close;
    ioc_klass->io_peerpid = qio_channel_socket_get_peerpid;

    /* Event loop integration */
    ioc_klass->io_create_watch = qio_channel_socket_create_watch;
    ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
}
981
982 static const TypeInfo qio_channel_socket_info = {
983 .parent = TYPE_QIO_CHANNEL,
984 .name = TYPE_QIO_CHANNEL_SOCKET,
985 .instance_size = sizeof(QIOChannelSocket),
986 .instance_init = qio_channel_socket_init,
987 .instance_finalize = qio_channel_socket_finalize,
988 .class_init = qio_channel_socket_class_init,
989 };
990
qio_channel_socket_register_types(void)991 static void qio_channel_socket_register_types(void)
992 {
993 type_register_static(&qio_channel_socket_info);
994 }
995
996 type_init(qio_channel_socket_register_types);
997