xref: /qemu/io/channel-socket.c (revision 72c58ff8958f6e00ce361d1d568dc21e41c85f45)
1 /*
2  * QEMU I/O channels sockets driver
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-util.h"
26 #include "io/channel-watch.h"
27 #include "trace.h"
28 #include "qapi/clone-visitor.h"
29 #ifdef CONFIG_LINUX
30 #include <linux/errqueue.h>
31 #include <sys/socket.h>
32 
33 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
34 #define QEMU_MSG_ZEROCOPY
35 #endif
36 #endif
37 
38 #define SOCKET_MAX_FDS 16
39 
40 SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket * ioc,Error ** errp)41 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
42                                      Error **errp)
43 {
44     return socket_sockaddr_to_address(&ioc->localAddr,
45                                       ioc->localAddrLen,
46                                       errp);
47 }
48 
49 SocketAddress *
qio_channel_socket_get_remote_address(QIOChannelSocket * ioc,Error ** errp)50 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
51                                       Error **errp)
52 {
53     return socket_sockaddr_to_address(&ioc->remoteAddr,
54                                       ioc->remoteAddrLen,
55                                       errp);
56 }
57 
58 QIOChannelSocket *
qio_channel_socket_new(void)59 qio_channel_socket_new(void)
60 {
61     QIOChannelSocket *sioc;
62     QIOChannel *ioc;
63 
64     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
65     sioc->fd = -1;
66     sioc->zero_copy_queued = 0;
67     sioc->zero_copy_sent = 0;
68 
69     ioc = QIO_CHANNEL(sioc);
70     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
71 
72 #ifdef WIN32
73     ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
74 #endif
75 
76     trace_qio_channel_socket_new(sioc);
77 
78     return sioc;
79 }
80 
/*
 * Request a send buffer of @size bytes on the socket backing @ioc.
 *
 * SO_SNDBUF is an int-valued socket option, so the requested size is
 * converted to an int before being handed to setsockopt().  Passing the
 * size_t directly (as was done previously) makes the kernel interpret
 * only part of a wider object, which yields the wrong value on hosts
 * where size_t is wider than int and the byte order is big-endian.
 * Values that do not fit in an int are clamped to INT_MAX.
 *
 * Returns: 0 on success, -1 with @errp set if setsockopt() fails.
 */
int qio_channel_socket_set_send_buffer(QIOChannelSocket *ioc,
                                       size_t size,
                                       Error **errp)
{
    int optval = size > (size_t)INT_MAX ? INT_MAX : (int)size;

    if (setsockopt(ioc->fd, SOL_SOCKET, SO_SNDBUF,
                   &optval, sizeof(optval)) < 0) {
        error_setg_errno(errp, errno, "Unable to set socket send buffer size");
        return -1;
    }

    return 0;
}
92 
93 static int
qio_channel_socket_set_fd(QIOChannelSocket * sioc,int fd,Error ** errp)94 qio_channel_socket_set_fd(QIOChannelSocket *sioc,
95                           int fd,
96                           Error **errp)
97 {
98     if (sioc->fd != -1) {
99         error_setg(errp, "Socket is already open");
100         return -1;
101     }
102 
103     sioc->fd = fd;
104     sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
105     sioc->localAddrLen = sizeof(sioc->localAddr);
106 
107 
108     if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
109                     &sioc->remoteAddrLen) < 0) {
110         if (errno == ENOTCONN) {
111             memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
112             sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
113         } else {
114             error_setg_errno(errp, errno,
115                              "Unable to query remote socket address");
116             goto error;
117         }
118     }
119 
120     if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
121                     &sioc->localAddrLen) < 0) {
122         error_setg_errno(errp, errno,
123                          "Unable to query local socket address");
124         goto error;
125     }
126 
127 #ifndef WIN32
128     if (sioc->localAddr.ss_family == AF_UNIX) {
129         QIOChannel *ioc = QIO_CHANNEL(sioc);
130         qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
131     }
132 #endif /* WIN32 */
133 
134     return 0;
135 
136  error:
137     sioc->fd = -1; /* Let the caller close FD on failure */
138     return -1;
139 }
140 
141 QIOChannelSocket *
qio_channel_socket_new_fd(int fd,Error ** errp)142 qio_channel_socket_new_fd(int fd,
143                           Error **errp)
144 {
145     QIOChannelSocket *ioc;
146 
147     ioc = qio_channel_socket_new();
148     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
149         object_unref(OBJECT(ioc));
150         return NULL;
151     }
152 
153     trace_qio_channel_socket_new_fd(ioc, fd);
154 
155     return ioc;
156 }
157 
158 
/*
 * Connect @ioc to @addr, blocking until socket_connect() returns.
 *
 * On success the new descriptor is attached to @ioc and
 * QIO_CHANNEL_FEATURE_READ_MSG_PEEK is set.  When the build has
 * MSG_ZEROCOPY support, SO_ZEROCOPY is also requested and, if the
 * kernel accepts it, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY is set.
 *
 * Returns: 0 on success, -1 with @errp set on failure.
 */
int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
                                    SocketAddress *addr,
                                    Error **errp)
{
    int sock;

    trace_qio_channel_socket_connect_sync(ioc, addr);

    sock = socket_connect(addr, errp);
    if (sock < 0) {
        trace_qio_channel_socket_connect_fail(ioc);
        return -1;
    }
    trace_qio_channel_socket_connect_complete(ioc, sock);

    if (qio_channel_socket_set_fd(ioc, sock, errp) < 0) {
        /* set_fd() never closes the descriptor, so do it here */
        close(sock);
        return -1;
    }

#ifdef QEMU_MSG_ZEROCOPY
    {
        int enable = 1;

        /* A refusal is not an error: the feature is just not advertised */
        if (setsockopt(sock, SOL_SOCKET, SO_ZEROCOPY,
                       &enable, sizeof(enable)) == 0) {
            /* Zero copy available on host */
            qio_channel_set_feature(QIO_CHANNEL(ioc),
                                    QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
        }
    }
#endif

    qio_channel_set_feature(QIO_CHANNEL(ioc),
                            QIO_CHANNEL_FEATURE_READ_MSG_PEEK);

    return 0;
}
193 
194 
/*
 * Task worker for qio_channel_socket_connect_async().
 *
 * @opaque is the SocketAddress to connect to.  The synchronous connect
 * is run against the task's source channel and whatever error it
 * produced (possibly none) is recorded on @task.
 */
static void qio_channel_socket_connect_worker(QIOTask *task,
                                              gpointer opaque)
{
    SocketAddress *target = opaque;
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
    qio_channel_socket_connect_sync(sioc, target, &local_err);

    qio_task_set_error(task, local_err);
}
206 
207 
/*
 * Start connecting @ioc to @addr on a worker thread.
 *
 * A task is created for @ioc with @callback, @opaque and @destroy.
 * @addr is cloned, so the caller keeps ownership of the original; the
 * clone is released with qapi_free_SocketAddress() via the destroy
 * notifier handed to qio_task_run_in_thread().
 */
void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
                                      SocketAddress *addr,
                                      QIOTaskFunc callback,
                                      gpointer opaque,
                                      GDestroyNotify destroy,
                                      GMainContext *context)
{
    QIOTask *task;
    SocketAddress *clone;

    task = qio_task_new(OBJECT(ioc), callback, opaque, destroy);
    clone = QAPI_CLONE(SocketAddress, addr);

    /* socket_connect() does a non-blocking connect(), but it
     * still blocks in DNS lookups, so we must use a thread */
    trace_qio_channel_socket_connect_async(ioc, addr);
    qio_task_run_in_thread(task,
                           qio_channel_socket_connect_worker,
                           clone,
                           (GDestroyNotify)qapi_free_SocketAddress,
                           context);
}
230 
231 
/*
 * Make @ioc listen on @addr, blocking until socket_listen() returns.
 *
 * @num is forwarded to socket_listen().  On success the descriptor is
 * attached to @ioc and QIO_CHANNEL_FEATURE_LISTEN is set.
 *
 * Returns: 0 on success, -1 with @errp set on failure.
 */
int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
                                   SocketAddress *addr,
                                   int num,
                                   Error **errp)
{
    int sock;

    trace_qio_channel_socket_listen_sync(ioc, addr, num);

    sock = socket_listen(addr, num, errp);
    if (sock < 0) {
        trace_qio_channel_socket_listen_fail(ioc);
        return -1;
    }
    trace_qio_channel_socket_listen_complete(ioc, sock);

    if (qio_channel_socket_set_fd(ioc, sock, errp) < 0) {
        /* set_fd() never closes the descriptor, so do it here */
        close(sock);
        return -1;
    }

    qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
    return 0;
}
255 
256 
257 struct QIOChannelListenWorkerData {
258     SocketAddress *addr;
259     int num; /* amount of expected connections */
260 };
261 
/*
 * Destroy notifier for a struct QIOChannelListenWorkerData: releases
 * the contained address and then the container itself.
 */
static void qio_channel_listen_worker_free(gpointer opaque)
{
    struct QIOChannelListenWorkerData *args = opaque;
    SocketAddress *addr = args->addr;

    qapi_free_SocketAddress(addr);
    g_free(args);
}
269 
/*
 * Task worker for qio_channel_socket_listen_async().
 *
 * @opaque is a struct QIOChannelListenWorkerData.  The synchronous
 * listen is run against the task's source channel and whatever error
 * it produced (possibly none) is recorded on @task.
 */
static void qio_channel_socket_listen_worker(QIOTask *task,
                                             gpointer opaque)
{
    struct QIOChannelListenWorkerData *args = opaque;
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
    qio_channel_socket_listen_sync(sioc, args->addr, args->num, &local_err);

    qio_task_set_error(task, local_err);
}
281 
282 
/*
 * Start listening on @addr with @ioc from a worker thread.
 *
 * A task is created for @ioc with @callback, @opaque and @destroy.
 * @addr is cloned into a heap-allocated argument block together with
 * @num; the block is released by qio_channel_listen_worker_free().
 */
void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
                                     SocketAddress *addr,
                                     int num,
                                     QIOTaskFunc callback,
                                     gpointer opaque,
                                     GDestroyNotify destroy,
                                     GMainContext *context)
{
    QIOTask *task;
    struct QIOChannelListenWorkerData *args;

    task = qio_task_new(OBJECT(ioc), callback, opaque, destroy);

    args = g_new0(struct QIOChannelListenWorkerData, 1);
    args->addr = QAPI_CLONE(SocketAddress, addr);
    args->num = num;

    /* socket_listen() blocks in DNS lookups, so we must use a thread */
    trace_qio_channel_socket_listen_async(ioc, addr, num);
    qio_task_run_in_thread(task,
                           qio_channel_socket_listen_worker,
                           args,
                           qio_channel_listen_worker_free,
                           context);
}
307 
308 
/*
 * Set up @ioc as a datagram socket, blocking until socket_dgram()
 * returns.
 *
 * Note that socket_dgram() is called with the remote address first and
 * the local address second.  On success the descriptor is attached to
 * @ioc.
 *
 * Returns: 0 on success, -1 with @errp set on failure.
 */
int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
                                  SocketAddress *localAddr,
                                  SocketAddress *remoteAddr,
                                  Error **errp)
{
    int sock;

    trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);

    sock = socket_dgram(remoteAddr, localAddr, errp);
    if (sock < 0) {
        trace_qio_channel_socket_dgram_fail(ioc);
        return -1;
    }
    trace_qio_channel_socket_dgram_complete(ioc, sock);

    if (qio_channel_socket_set_fd(ioc, sock, errp) < 0) {
        /* set_fd() never closes the descriptor, so do it here */
        close(sock);
        return -1;
    }

    return 0;
}
331 
332 
333 struct QIOChannelSocketDGramWorkerData {
334     SocketAddress *localAddr;
335     SocketAddress *remoteAddr;
336 };
337 
338 
/*
 * Destroy notifier for a struct QIOChannelSocketDGramWorkerData:
 * releases both contained addresses and then the container itself.
 */
static void qio_channel_socket_dgram_worker_free(gpointer opaque)
{
    struct QIOChannelSocketDGramWorkerData *args = opaque;
    SocketAddress *local = args->localAddr;
    SocketAddress *remote = args->remoteAddr;

    qapi_free_SocketAddress(local);
    qapi_free_SocketAddress(remote);
    g_free(args);
}
346 
/*
 * Task worker for qio_channel_socket_dgram_async().
 *
 * @opaque is a struct QIOChannelSocketDGramWorkerData.  The
 * synchronous datagram setup is run against the task's source channel
 * and whatever error it produced (possibly none) is recorded on @task.
 */
static void qio_channel_socket_dgram_worker(QIOTask *task,
                                            gpointer opaque)
{
    struct QIOChannelSocketDGramWorkerData *args = opaque;
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));

    /* socket_dgram() blocks in DNS lookups, so we must use a thread */
    qio_channel_socket_dgram_sync(sioc, args->localAddr,
                                  args->remoteAddr, &local_err);

    qio_task_set_error(task, local_err);
}
360 
361 
/*
 * Start setting up @ioc as a datagram socket from a worker thread.
 *
 * A task is created for @ioc with @callback, @opaque and @destroy.
 * Both addresses are cloned into a heap-allocated argument block that
 * is released by qio_channel_socket_dgram_worker_free().
 */
void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
                                    SocketAddress *localAddr,
                                    SocketAddress *remoteAddr,
                                    QIOTaskFunc callback,
                                    gpointer opaque,
                                    GDestroyNotify destroy,
                                    GMainContext *context)
{
    QIOTask *task;
    struct QIOChannelSocketDGramWorkerData *args;

    task = qio_task_new(OBJECT(ioc), callback, opaque, destroy);

    args = g_new0(struct QIOChannelSocketDGramWorkerData, 1);
    args->localAddr = QAPI_CLONE(SocketAddress, localAddr);
    args->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);

    trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
    qio_task_run_in_thread(task,
                           qio_channel_socket_dgram_worker,
                           args,
                           qio_channel_socket_dgram_worker_free,
                           context);
}
385 
386 
387 QIOChannelSocket *
qio_channel_socket_accept(QIOChannelSocket * ioc,Error ** errp)388 qio_channel_socket_accept(QIOChannelSocket *ioc,
389                           Error **errp)
390 {
391     QIOChannelSocket *cioc;
392 
393     cioc = qio_channel_socket_new();
394     cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
395     cioc->localAddrLen = sizeof(ioc->localAddr);
396 
397  retry:
398     trace_qio_channel_socket_accept(ioc);
399     cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
400                            &cioc->remoteAddrLen);
401     if (cioc->fd < 0) {
402         if (errno == EINTR) {
403             goto retry;
404         }
405         error_setg_errno(errp, errno, "Unable to accept connection");
406         trace_qio_channel_socket_accept_fail(ioc);
407         goto error;
408     }
409 
410     if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
411                     &cioc->localAddrLen) < 0) {
412         error_setg_errno(errp, errno,
413                          "Unable to query local socket address");
414         goto error;
415     }
416 
417 #ifndef WIN32
418     if (cioc->localAddr.ss_family == AF_UNIX) {
419         QIOChannel *ioc_local = QIO_CHANNEL(cioc);
420         qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
421     }
422 #endif /* WIN32 */
423 
424     qio_channel_set_feature(QIO_CHANNEL(cioc),
425                             QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
426 
427     trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
428     return cioc;
429 
430  error:
431     object_unref(OBJECT(cioc));
432     return NULL;
433 }
434 
/* Instance initialiser: a new channel starts without a descriptor. */
static void qio_channel_socket_init(Object *obj)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(obj);

    sioc->fd = -1;
}
440 
/*
 * Instance finaliser: release the descriptor if one is still attached.
 *
 * For listening channels socket_listen_cleanup() is run first and any
 * error it raises is reported rather than propagated.  On WIN32 the
 * socket is unselected before being closed.
 */
static void qio_channel_socket_finalize(Object *obj)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(obj);

    if (sioc->fd == -1) {
        return;
    }

    if (qio_channel_has_feature(QIO_CHANNEL(sioc),
                                QIO_CHANNEL_FEATURE_LISTEN)) {
        Error *local_err = NULL;

        socket_listen_cleanup(sioc->fd, &local_err);
        if (local_err) {
            error_report_err(local_err);
        }
    }
#ifdef WIN32
    qemu_socket_unselect(sioc->fd, NULL);
#endif
    close(sioc->fd);
    sioc->fd = -1;
}
463 
464 
465 #ifndef WIN32
qio_channel_socket_copy_fds(struct msghdr * msg,int ** fds,size_t * nfds)466 static void qio_channel_socket_copy_fds(struct msghdr *msg,
467                                         int **fds, size_t *nfds)
468 {
469     struct cmsghdr *cmsg;
470 
471     *nfds = 0;
472     *fds = NULL;
473 
474     for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
475         int fd_size, i;
476         int gotfds;
477 
478         if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
479             cmsg->cmsg_level != SOL_SOCKET ||
480             cmsg->cmsg_type != SCM_RIGHTS) {
481             continue;
482         }
483 
484         fd_size = cmsg->cmsg_len - CMSG_LEN(0);
485 
486         if (!fd_size) {
487             continue;
488         }
489 
490         gotfds = fd_size / sizeof(int);
491         *fds = g_renew(int, *fds, *nfds + gotfds);
492         memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
493 
494         for (i = 0; i < gotfds; i++) {
495             int fd = (*fds)[*nfds + i];
496             if (fd < 0) {
497                 continue;
498             }
499 
500             /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
501             qemu_socket_set_block(fd);
502 
503 #ifndef MSG_CMSG_CLOEXEC
504             qemu_set_cloexec(fd);
505 #endif
506         }
507         *nfds += gotfds;
508     }
509 }
510 
511 
/*
 * POSIX io_readv implementation: read into @iov with recvmsg().
 *
 * When both @fds and @nfds are supplied, a control buffer sized for
 * SOCKET_MAX_FDS descriptors is attached and any descriptors received
 * are returned through them.  QIO_CHANNEL_READ_FLAG_MSG_PEEK in @flags
 * maps to MSG_PEEK.  EINTR is retried transparently.
 *
 * Returns: the recvmsg() byte count, QIO_CHANNEL_ERR_BLOCK on EAGAIN,
 * or -1 with @errp set on any other failure.
 */
static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                        const struct iovec *iov,
                                        size_t niov,
                                        int **fds,
                                        size_t *nfds,
                                        int flags,
                                        Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
    struct msghdr msg = { NULL, };
    int want_fds = fds && nfds;
    int sflags = 0;
    ssize_t got;

    memset(control, 0, sizeof(control));

    msg.msg_iov = (struct iovec *)iov;
    msg.msg_iovlen = niov;

    if (want_fds) {
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);
#ifdef MSG_CMSG_CLOEXEC
        sflags |= MSG_CMSG_CLOEXEC;
#endif
    }

    if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
        sflags |= MSG_PEEK;
    }

    do {
        got = recvmsg(sioc->fd, &msg, sflags);
    } while (got < 0 && errno == EINTR);

    if (got < 0) {
        if (errno == EAGAIN) {
            return QIO_CHANNEL_ERR_BLOCK;
        }

        error_setg_errno(errp, errno,
                         "Unable to read from socket");
        return -1;
    }

    if (want_fds) {
        qio_channel_socket_copy_fds(&msg, fds, nfds);
    }

    return got;
}
564 
/*
 * POSIX io_writev implementation: write @iov with sendmsg().
 *
 * Up to SOCKET_MAX_FDS descriptors from @fds are attached as one
 * SCM_RIGHTS control message; asking for more fails with EINVAL.
 * QIO_CHANNEL_WRITE_FLAG_ZERO_COPY in @flags maps to MSG_ZEROCOPY when
 * the build supports it (otherwise that path must never be reached).
 *
 * Only a negative sendmsg() result is treated as a failure.  sendmsg()
 * sets errno only when it returns -1, so inspecting errno after a
 * 0-byte result (as was done previously) acted on a stale value and
 * could report a bogus error, a bogus "would block", or retry forever
 * on a left-over EINTR.
 *
 * Returns: the sendmsg() byte count, QIO_CHANNEL_ERR_BLOCK on EAGAIN,
 * or -1 with @errp set on any other failure.
 */
static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
                                         const struct iovec *iov,
                                         size_t niov,
                                         int *fds,
                                         size_t nfds,
                                         int flags,
                                         Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    ssize_t ret;
    struct msghdr msg = { NULL, };
    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
    size_t fdsize = sizeof(int) * nfds;
    struct cmsghdr *cmsg;
    int sflags = 0;

    memset(control, 0, sizeof(control));

    msg.msg_iov = (struct iovec *)iov;
    msg.msg_iovlen = niov;

    if (nfds) {
        if (nfds > SOCKET_MAX_FDS) {
            error_setg_errno(errp, EINVAL,
                             "Only %d FDs can be sent, got %zu",
                             SOCKET_MAX_FDS, nfds);
            return -1;
        }

        msg.msg_control = control;
        msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);

        cmsg = CMSG_FIRSTHDR(&msg);
        cmsg->cmsg_len = CMSG_LEN(fdsize);
        cmsg->cmsg_level = SOL_SOCKET;
        cmsg->cmsg_type = SCM_RIGHTS;
        memcpy(CMSG_DATA(cmsg), fds, fdsize);
    }

    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
#ifdef QEMU_MSG_ZEROCOPY
        sflags = MSG_ZEROCOPY;
#else
        /*
         * We expect QIOChannel class entry point to have
         * blocked this code path already
         */
        g_assert_not_reached();
#endif
    }

 retry:
    ret = sendmsg(sioc->fd, &msg, sflags);
    if (ret < 0) {
        switch (errno) {
        case EAGAIN:
            return QIO_CHANNEL_ERR_BLOCK;
        case EINTR:
            goto retry;
        case ENOBUFS:
            if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
                error_setg_errno(errp, errno,
                                 "Process can't lock enough memory for using MSG_ZEROCOPY");
                return -1;
            }
            break;
        default:
            break;
        }

        error_setg_errno(errp, errno,
                         "Unable to write to socket");
        return -1;
    }

    /*
     * Only sends that actually carried data are counted as pending
     * zero-copy requests; a 0-byte send was never counted before this
     * fix either (it used to take the error path above).
     */
    if (ret > 0 && (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY)) {
        sioc->zero_copy_queued++;
    }

    return ret;
}
644 #else /* WIN32 */
/*
 * WIN32 io_readv implementation: fill @iov one element at a time with
 * recv().
 *
 * @fds and @nfds are not used here.  QIO_CHANNEL_READ_FLAG_MSG_PEEK in
 * @flags maps to MSG_PEEK.  EINTR is retried; reading stops early as
 * soon as an element is only partially filled.
 *
 * Returns: total bytes read, QIO_CHANNEL_ERR_BLOCK if EAGAIN is hit
 * before anything was read (otherwise the partial total is returned),
 * or -1 with @errp set on any other failure.
 */
static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                        const struct iovec *iov,
                                        size_t niov,
                                        int **fds,
                                        size_t *nfds,
                                        int flags,
                                        Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    int sflags = 0;
    ssize_t total = 0;
    ssize_t i;

    if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
        sflags |= MSG_PEEK;
    }

    for (i = 0; i < niov; i++) {
        ssize_t ret;

        do {
            ret = recv(sioc->fd,
                       iov[i].iov_base,
                       iov[i].iov_len,
                       sflags);
        } while (ret < 0 && errno == EINTR);

        if (ret < 0) {
            if (errno == EAGAIN) {
                /* Report what was read so far, if anything */
                return total ? total : QIO_CHANNEL_ERR_BLOCK;
            }
            error_setg_errno(errp, errno,
                             "Unable to read from socket");
            return -1;
        }

        total += ret;
        if (ret < iov[i].iov_len) {
            break;
        }
    }

    return total;
}
692 
/*
 * WIN32 io_writev implementation: send @iov one element at a time with
 * send().
 *
 * @fds, @nfds and @flags are not used here.  EINTR is retried; writing
 * stops early as soon as an element is only partially sent.
 *
 * Returns: total bytes written, QIO_CHANNEL_ERR_BLOCK if EAGAIN is hit
 * before anything was written (otherwise the partial total is
 * returned), or -1 with @errp set on any other failure.
 */
static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
                                         const struct iovec *iov,
                                         size_t niov,
                                         int *fds,
                                         size_t nfds,
                                         int flags,
                                         Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    ssize_t total = 0;
    ssize_t i;

    for (i = 0; i < niov; i++) {
        ssize_t ret;

        do {
            ret = send(sioc->fd,
                       iov[i].iov_base,
                       iov[i].iov_len,
                       0);
        } while (ret < 0 && errno == EINTR);

        if (ret < 0) {
            if (errno == EAGAIN) {
                /* Report what was written so far, if anything */
                return total ? total : QIO_CHANNEL_ERR_BLOCK;
            }
            error_setg_errno(errp, errno,
                             "Unable to write to socket");
            return -1;
        }

        total += ret;
        if (ret < iov[i].iov_len) {
            break;
        }
    }

    return total;
}
735 #endif /* WIN32 */
736 
737 
738 #ifdef QEMU_MSG_ZEROCOPY
/*
 * io_flush implementation: wait until every queued zero-copy write has
 * been acknowledged on the socket error queue.
 *
 * Completion notifications are read with recvmsg(MSG_ERRQUEUE) until
 * zero_copy_sent catches up with zero_copy_queued.  When the queue is
 * empty (EAGAIN) the function waits for G_IO_ERR on the channel; EINTR
 * is retried.
 *
 * Compared with the previous version this:
 *  - re-arms the control buffer before every recvmsg(), since the call
 *    overwrites msg_controllen with the length actually received;
 *  - rejects a notification that carries no control message instead of
 *    dereferencing a NULL header;
 *  - requires a matching (level, type) pair, i.e. SOL_IP/IP_RECVERR or
 *    SOL_IPV6/IPV6_RECVERR (the old test chained four "!=" with "&&"
 *    and therefore let mismatched combinations through);
 *  - no longer feeds ee_origin to error_setg_errno() as if it were an
 *    errno value.
 *
 * Returns: 0 if nothing was pending or at least one write was really
 * sent with zero copy, 1 if every completed write was reported as
 * copied by the kernel, -1 with @errp set on error.
 */
static int qio_channel_socket_flush(QIOChannel *ioc,
                                    Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    struct msghdr msg = {};
    struct sock_extended_err *serr;
    struct cmsghdr *cm;
    char control[CMSG_SPACE(sizeof(*serr))];
    int received;
    int ret;

    if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
        return 0;
    }

    ret = 1;

    while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
        /* recvmsg() updates msg_controllen, so reset it on every pass */
        memset(control, 0, sizeof(control));
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
        if (received < 0) {
            switch (errno) {
            case EAGAIN:
                /* Nothing on errqueue, wait until something is available */
                qio_channel_wait(ioc, G_IO_ERR);
                continue;
            case EINTR:
                continue;
            default:
                error_setg_errno(errp, errno,
                                 "Unable to read errqueue");
                return -1;
            }
        }

        cm = CMSG_FIRSTHDR(&msg);
        if (!cm ||
            !((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
              (cm->cmsg_level == SOL_IPV6 &&
               cm->cmsg_type == IPV6_RECVERR))) {
            error_setg_errno(errp, EPROTOTYPE,
                             "Wrong cmsg in errqueue");
            return -1;
        }

        serr = (void *) CMSG_DATA(cm);
        if (serr->ee_errno != 0) {
            error_setg_errno(errp, serr->ee_errno,
                             "Error on socket");
            return -1;
        }
        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
            error_setg(errp, "Error not from zero copy");
            return -1;
        }
        if (serr->ee_data < serr->ee_info) {
            error_setg(errp, "Wrong notification bounds");
            return -1;
        }

        /* No errors, count successfully finished sendmsg()*/
        sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;

        /* If any sendmsg() succeeded using zero copy, return 0 at the end */
        if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
            ret = 0;
        }
    }

    return ret;
}
813 
814 #endif /* QEMU_MSG_ZEROCOPY */
815 
816 static int
qio_channel_socket_set_blocking(QIOChannel * ioc,bool enabled,Error ** errp)817 qio_channel_socket_set_blocking(QIOChannel *ioc,
818                                 bool enabled,
819                                 Error **errp)
820 {
821     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
822 
823     if (enabled) {
824         qemu_socket_set_block(sioc->fd);
825     } else {
826         qemu_socket_set_nonblock(sioc->fd);
827     }
828     return 0;
829 }
830 
831 
832 static void
qio_channel_socket_set_delay(QIOChannel * ioc,bool enabled)833 qio_channel_socket_set_delay(QIOChannel *ioc,
834                              bool enabled)
835 {
836     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
837     int v = enabled ? 0 : 1;
838 
839     setsockopt(sioc->fd,
840                IPPROTO_TCP, TCP_NODELAY,
841                &v, sizeof(v));
842 }
843 
844 
845 static void
qio_channel_socket_set_cork(QIOChannel * ioc,bool enabled)846 qio_channel_socket_set_cork(QIOChannel *ioc,
847                             bool enabled)
848 {
849     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
850     int v = enabled ? 1 : 0;
851 
852     socket_set_cork(sioc->fd, v);
853 }
854 
855 static int
qio_channel_socket_get_peerpid(QIOChannel * ioc,unsigned int * pid,Error ** errp)856 qio_channel_socket_get_peerpid(QIOChannel *ioc,
857                                unsigned int *pid,
858                                Error **errp)
859 {
860 #ifdef CONFIG_LINUX
861     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
862     Error *err = NULL;
863     socklen_t len = sizeof(struct ucred);
864 
865     struct ucred cred;
866     if (getsockopt(sioc->fd,
867                SOL_SOCKET, SO_PEERCRED,
868                &cred, &len) == -1) {
869         error_setg_errno(&err, errno, "Unable to get peer credentials");
870         error_propagate(errp, err);
871         *pid = -1;
872         return -1;
873     }
874     *pid = (unsigned int)cred.pid;
875     return 0;
876 #else
877     error_setg(errp, "Unsupported feature");
878     *pid = -1;
879     return -1;
880 #endif
881 }
882 
883 static int
qio_channel_socket_close(QIOChannel * ioc,Error ** errp)884 qio_channel_socket_close(QIOChannel *ioc,
885                          Error **errp)
886 {
887     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
888     int rc = 0;
889     Error *err = NULL;
890 
891     if (sioc->fd != -1) {
892 #ifdef WIN32
893         qemu_socket_unselect(sioc->fd, NULL);
894 #endif
895         if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
896             socket_listen_cleanup(sioc->fd, errp);
897         }
898 
899         if (close(sioc->fd) < 0) {
900             sioc->fd = -1;
901             error_setg_errno(&err, errno, "Unable to close socket");
902             error_propagate(errp, err);
903             return -1;
904         }
905         sioc->fd = -1;
906     }
907     return rc;
908 }
909 
910 static int
qio_channel_socket_shutdown(QIOChannel * ioc,QIOChannelShutdown how,Error ** errp)911 qio_channel_socket_shutdown(QIOChannel *ioc,
912                             QIOChannelShutdown how,
913                             Error **errp)
914 {
915     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
916     int sockhow;
917 
918     switch (how) {
919     case QIO_CHANNEL_SHUTDOWN_READ:
920         sockhow = SHUT_RD;
921         break;
922     case QIO_CHANNEL_SHUTDOWN_WRITE:
923         sockhow = SHUT_WR;
924         break;
925     case QIO_CHANNEL_SHUTDOWN_BOTH:
926     default:
927         sockhow = SHUT_RDWR;
928         break;
929     }
930 
931     if (shutdown(sioc->fd, sockhow) < 0) {
932         error_setg_errno(errp, errno,
933                          "Unable to shutdown socket");
934         return -1;
935     }
936     return 0;
937 }
938 
/*
 * io_set_aio_fd_handler implementation.
 *
 * A socket uses one descriptor for both directions, so the same fd is
 * handed to qio_channel_util_set_aio_fd_handler() for the read side and
 * the write side.
 */
static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
                                                  AioContext *read_ctx,
                                                  IOHandler *io_read,
                                                  AioContext *write_ctx,
                                                  IOHandler *io_write,
                                                  void *opaque)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    int fd = sioc->fd;

    qio_channel_util_set_aio_fd_handler(fd, read_ctx, io_read,
                                        fd, write_ctx, io_write,
                                        opaque);
}
952 
qio_channel_socket_create_watch(QIOChannel * ioc,GIOCondition condition)953 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
954                                                 GIOCondition condition)
955 {
956     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
957     return qio_channel_create_socket_watch(ioc,
958                                            sioc->fd,
959                                            condition);
960 }
961 
/*
 * Class initialiser: install the socket implementations of the
 * QIOChannel virtual methods.  io_flush is only provided when the
 * build has MSG_ZEROCOPY support.
 */
static void qio_channel_socket_class_init(ObjectClass *klass,
                                          const void *class_data G_GNUC_UNUSED)
{
    QIOChannelClass *cls = QIO_CHANNEL_CLASS(klass);

    /* Data path */
    cls->io_readv = qio_channel_socket_readv;
    cls->io_writev = qio_channel_socket_writev;
#ifdef QEMU_MSG_ZEROCOPY
    cls->io_flush = qio_channel_socket_flush;
#endif

    /* Socket tuning */
    cls->io_set_blocking = qio_channel_socket_set_blocking;
    cls->io_set_delay = qio_channel_socket_set_delay;
    cls->io_set_cork = qio_channel_socket_set_cork;

    /* Event loop integration */
    cls->io_create_watch = qio_channel_socket_create_watch;
    cls->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;

    /* Lifecycle and peer information */
    cls->io_shutdown = qio_channel_socket_shutdown;
    cls->io_close = qio_channel_socket_close;
    cls->io_peerpid = qio_channel_socket_get_peerpid;
}
981 
982 static const TypeInfo qio_channel_socket_info = {
983     .parent = TYPE_QIO_CHANNEL,
984     .name = TYPE_QIO_CHANNEL_SOCKET,
985     .instance_size = sizeof(QIOChannelSocket),
986     .instance_init = qio_channel_socket_init,
987     .instance_finalize = qio_channel_socket_finalize,
988     .class_init = qio_channel_socket_class_init,
989 };
990 
qio_channel_socket_register_types(void)991 static void qio_channel_socket_register_types(void)
992 {
993     type_register_static(&qio_channel_socket_info);
994 }
995 
996 type_init(qio_channel_socket_register_types);
997