/*
 * QEMU I/O channels
 *
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "qemu/osdep.h"
#include "block/aio-wait.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/iov.h"

bool qio_channel_has_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    return ioc->features & (1 << feature);
}


void qio_channel_set_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    ioc->features |= (1 << feature);
}


void qio_channel_set_name(QIOChannel *ioc,
                          const char *name)
{
    g_free(ioc->name);
    ioc->name = g_strdup(name);
}


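/*
 * Validate the requested flags against the channel's advertised features,
 * then dispatch to the concrete implementation's io_readv callback.
 */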
ssize_t qio_channel_readv_full(QIOChannel *ioc,
                               const struct iovec *iov,
                               size_t niov,
                               int **fds,
                               size_t *nfds,
                               int flags,
                               Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support peek read");
        return -1;
    }

    return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
}


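/*
 * Validate file descriptor passing and zero-copy flags against the
 * channel's features before dispatching to the io_writev callback;
 * FD passing and zero copy are mutually exclusive.
 */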
ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds,
                                size_t nfds,
                                int flags,
                                Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (fds || nfds) {
        if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
            error_setg_errno(errp, EINVAL,
                             "Channel does not support file descriptor passing");
            return -1;
        }
        if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
            error_setg_errno(errp, EINVAL,
                             "Zero Copy does not support file descriptor passing");
            return -1;
        }
    }

    if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
        error_setg_errno(errp, EINVAL,
                         "Requested Zero Copy feature is not available");
        return -1;
    }

    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
}


int coroutine_mixed_fn qio_channel_readv_all_eof(QIOChannel *ioc,
                                                 const struct iovec *iov,
                                                 size_t niov,
                                                 Error **errp)
{
    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, 0,
                                          errp);
}

int coroutine_mixed_fn qio_channel_readv_all(QIOChannel *ioc,
                                             const struct iovec *iov,
                                             size_t niov,
                                             Error **errp)
{
    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
}

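/*
 * Read until the buffers described by @iov are completely filled,
 * blocking (or yielding, when called from a coroutine) whenever the
 * channel would block. Returns 1 on success, 0 on EOF before any data
 * was read, and -1 on error or on EOF after a partial read.
 */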
int coroutine_mixed_fn qio_channel_readv_full_all_eof(QIOChannel *ioc,
                                                      const struct iovec *iov,
                                                      size_t niov,
                                                      int **fds, size_t *nfds,
                                                      int flags,
                                                      Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;
    int **local_fds = fds;
    size_t *local_nfds = nfds;
    bool partial = false;

    if (nfds) {
        *nfds = 0;
    }

    if (fds) {
        *fds = NULL;
    }

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while ((nlocal_iov > 0) || local_fds) {
        ssize_t len;
        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
                                     local_nfds, flags, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_IN);
            } else {
                qio_channel_wait(ioc, G_IO_IN);
            }
            continue;
        }

        if (len == 0) {
            if (local_nfds && *local_nfds) {
                /*
                 * Got some FDs, but no data yet. This isn't an EOF
                 * scenario (yet), so carry on to try to read data
                 * on next loop iteration
                 */
                goto next_iter;
            } else if (!partial) {
                /* No fds and no data - EOF before any data read */
                ret = 0;
                goto cleanup;
            } else {
                len = -1;
                error_setg(errp,
                           "Unexpected end-of-file before all data were read");
                /* Fallthrough into len < 0 handling */
            }
        }

        if (len < 0) {
            /* Close any FDs we previously received */
            if (nfds && fds) {
                size_t i;
                for (i = 0; i < (*nfds); i++) {
                    close((*fds)[i]);
                }
                g_free(*fds);
                *fds = NULL;
                *nfds = 0;
            }
            goto cleanup;
        }

        if (nlocal_iov) {
            iov_discard_front(&local_iov, &nlocal_iov, len);
        }

 next_iter:
        partial = true;
        local_fds = NULL;
        local_nfds = NULL;
    }

    ret = 1;

 cleanup:
    g_free(local_iov_head);
    return ret;
}

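/*
 * Like qio_channel_readv_full_all_eof(), but EOF at any point is
 * treated as an error. Returns 0 on success, -1 on error.
 */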
int coroutine_mixed_fn qio_channel_readv_full_all(QIOChannel *ioc,
                                                  const struct iovec *iov,
                                                  size_t niov,
                                                  int **fds, size_t *nfds,
                                                  Error **errp)
{
    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, 0,
                                             errp);

    if (ret == 0) {
        error_setg(errp, "Unexpected end-of-file before all data were read");
        return -1;
    }
    if (ret == 1) {
        return 0;
    }

    return ret;
}

int coroutine_mixed_fn qio_channel_writev_all(QIOChannel *ioc,
                                              const struct iovec *iov,
                                              size_t niov,
                                              Error **errp)
{
    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
}

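/*
 * Write out the entire contents of @iov, retrying after each partial
 * write and blocking/yielding while the channel would block. Any file
 * descriptors are sent only with the first successful write.
 */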
int coroutine_mixed_fn qio_channel_writev_full_all(QIOChannel *ioc,
                                                   const struct iovec *iov,
                                                   size_t niov,
                                                   int *fds, size_t nfds,
                                                   int flags, Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;

        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
                                      nfds, flags, errp);

        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        iov_discard_front(&local_iov, &nlocal_iov, len);

        fds = NULL;
        nfds = 0;
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}

ssize_t qio_channel_readv(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
}


ssize_t qio_channel_writev(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
}


ssize_t qio_channel_read(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
}


ssize_t qio_channel_write(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
}


int coroutine_mixed_fn qio_channel_read_all_eof(QIOChannel *ioc,
                                                char *buf,
                                                size_t buflen,
                                                Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
}


int coroutine_mixed_fn qio_channel_read_all(QIOChannel *ioc,
                                            char *buf,
                                            size_t buflen,
                                            Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all(ioc, &iov, 1, errp);
}


int coroutine_mixed_fn qio_channel_write_all(QIOChannel *ioc,
                                             const char *buf,
                                             size_t buflen,
                                             Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_all(ioc, &iov, 1, errp);
}


int qio_channel_set_blocking(QIOChannel *ioc,
                             bool enabled,
                             Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_set_blocking(ioc, enabled, errp);
}


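/*
 * Controls whether qio_channel_yield() attaches its fd handlers to the
 * current coroutine's AioContext rather than the global iohandler
 * context (see qio_channel_set_fd_handlers() below).
 */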
void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
{
    ioc->follow_coroutine_ctx = enabled;
}


int qio_channel_close(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_close(ioc, errp);
}


GSource *qio_channel_create_watch(QIOChannel *ioc,
                                  GIOCondition condition)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    GSource *ret = klass->io_create_watch(ioc, condition);

    if (ioc->name) {
        g_source_set_name(ret, ioc->name);
    }

    return ret;
}


void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
                                    AioContext *read_ctx,
                                    IOHandler *io_read,
                                    AioContext *write_ctx,
                                    IOHandler *io_write,
                                    void *opaque)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
                                 opaque);
}

guint qio_channel_add_watch_full(QIOChannel *ioc,
                                 GIOCondition condition,
                                 QIOChannelFunc func,
                                 gpointer user_data,
                                 GDestroyNotify notify,
                                 GMainContext *context)
{
    GSource *source;
    guint id;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    id = g_source_attach(source, context);
    g_source_unref(source);

    return id;
}

guint qio_channel_add_watch(QIOChannel *ioc,
                            GIOCondition condition,
                            QIOChannelFunc func,
                            gpointer user_data,
                            GDestroyNotify notify)
{
    return qio_channel_add_watch_full(ioc, condition, func,
                                      user_data, notify, NULL);
}

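/*
 * Like qio_channel_add_watch_full(), but returns the GSource (with an
 * extra reference taken) instead of its ID; the caller must eventually
 * drop that reference with g_source_unref().
 */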
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
                                      GIOCondition condition,
                                      QIOChannelFunc func,
                                      gpointer user_data,
                                      GDestroyNotify notify,
                                      GMainContext *context)
{
    GSource *source;
    guint id;

    id = qio_channel_add_watch_full(ioc, condition, func,
                                    user_data, notify, context);
    source = g_main_context_find_source_by_id(context, id);
    g_source_ref(source);
    return source;
}


ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct iovec *iov,
                            size_t niov, off_t offset, Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_pwritev) {
        error_setg(errp, "Channel does not support pwritev");
        return -1;
    }

    if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_SEEKABLE)) {
        error_setg_errno(errp, EINVAL, "Requested channel is not seekable");
        return -1;
    }

    return klass->io_pwritev(ioc, iov, niov, offset, errp);
}

ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, size_t buflen,
                           off_t offset, Error **errp)
{
    struct iovec iov = {
        .iov_base = buf,
        .iov_len = buflen
    };

    return qio_channel_pwritev(ioc, &iov, 1, offset, errp);
}

ssize_t qio_channel_preadv(QIOChannel *ioc, const struct iovec *iov,
                           size_t niov, off_t offset, Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_preadv) {
        error_setg(errp, "Channel does not support preadv");
        return -1;
    }

    if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_SEEKABLE)) {
        error_setg_errno(errp, EINVAL, "Requested channel is not seekable");
        return -1;
    }

    return klass->io_preadv(ioc, iov, niov, offset, errp);
}

ssize_t qio_channel_pread(QIOChannel *ioc, char *buf, size_t buflen,
                          off_t offset, Error **errp)
{
    struct iovec iov = {
        .iov_base = buf,
        .iov_len = buflen
    };

    return qio_channel_preadv(ioc, &iov, 1, offset, errp);
}

int qio_channel_shutdown(QIOChannel *ioc,
                         QIOChannelShutdown how,
                         Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_shutdown) {
        error_setg(errp, "Data path shutdown not supported");
        return -1;
    }

    return klass->io_shutdown(ioc, how, errp);
}


void qio_channel_set_delay(QIOChannel *ioc,
                           bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_delay) {
        klass->io_set_delay(ioc, enabled);
    }
}


void qio_channel_set_cork(QIOChannel *ioc,
                          bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_cork) {
        klass->io_set_cork(ioc, enabled);
    }
}

int qio_channel_get_peerpid(QIOChannel *ioc,
                            unsigned int *pid,
                            Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_peerpid) {
        error_setg(errp, "Channel does not support peer pid");
        return -1;
    }
    klass->io_peerpid(ioc, pid, errp);
    return 0;
}

off_t qio_channel_io_seek(QIOChannel *ioc,
                          off_t offset,
                          int whence,
                          Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_seek) {
        error_setg(errp, "Channel does not support random access");
        return -1;
    }

    return klass->io_seek(ioc, offset, whence, errp);
}

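/*
 * Wait for completion of pending zero-copy writes; a no-op (returning
 * success) on channels without zero-copy support.
 */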
int qio_channel_flush(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_flush ||
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
        return 0;
    }

    return klass->io_flush(ioc, errp);
}


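/*
 * fd handlers installed by qio_channel_set_fd_handlers(); they wake the
 * coroutine parked in qio_channel_yield() once the channel becomes
 * readable/writable.
 */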
static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);

    if (!co) {
        return;
    }

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

static void qio_channel_restart_write(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);

    if (!co) {
        return;
    }

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

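/*
 * Register the current coroutine as the waiter for @condition and
 * install the matching restart handler, preserving any handler already
 * installed for the opposite direction.
 */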
static void coroutine_fn
qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
    AioContext *ctx = ioc->follow_coroutine_ctx ?
        qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
        iohandler_get_aio_context();
    AioContext *read_ctx = NULL;
    IOHandler *io_read = NULL;
    AioContext *write_ctx = NULL;
    IOHandler *io_write = NULL;

    if (condition == G_IO_IN) {
        ioc->read_coroutine = qemu_coroutine_self();
        ioc->read_ctx = ctx;
        read_ctx = ctx;
        io_read = qio_channel_restart_read;

        /*
         * Thread safety: if the other coroutine is set and its AioContext
         * matches ours, then there is mutual exclusion between read and write
         * because they share a single thread and it's safe to set both read
         * and write fd handlers here. If the AioContext does not match ours,
         * then both threads may run in parallel but there is no shared state
         * to worry about.
         */
        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
            write_ctx = ctx;
            io_write = qio_channel_restart_write;
        }
    } else if (condition == G_IO_OUT) {
        ioc->write_coroutine = qemu_coroutine_self();
        ioc->write_ctx = ctx;
        write_ctx = ctx;
        io_write = qio_channel_restart_write;
        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
            read_ctx = ctx;
            io_read = qio_channel_restart_read;
        }
    } else {
        abort();
    }

    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
                                   write_ctx, io_write, ioc);
}

static void coroutine_fn
qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
    AioContext *read_ctx = NULL;
    IOHandler *io_read = NULL;
    AioContext *write_ctx = NULL;
    IOHandler *io_write = NULL;
    AioContext *ctx;

    if (condition == G_IO_IN) {
        ctx = ioc->read_ctx;
        read_ctx = ctx;
        io_read = NULL;
        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
            write_ctx = ctx;
            io_write = qio_channel_restart_write;
        }
    } else if (condition == G_IO_OUT) {
        ctx = ioc->write_ctx;
        write_ctx = ctx;
        io_write = NULL;
        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
            read_ctx = ctx;
            io_read = qio_channel_restart_read;
        }
    } else {
        abort();
    }

    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
                                   write_ctx, io_write, ioc);
}

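/*
 * Yield the current coroutine until the channel becomes ready for
 * @condition. Only one reader and one writer may be parked at a time.
 */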
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                    GIOCondition condition)
{
    AioContext *ioc_ctx;

    assert(qemu_in_coroutine());
    ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());

    if (condition == G_IO_IN) {
        assert(!ioc->read_coroutine);
    } else if (condition == G_IO_OUT) {
        assert(!ioc->write_coroutine);
    } else {
        abort();
    }
    qio_channel_set_fd_handlers(ioc, condition);
    qemu_coroutine_yield();
    assert(in_aio_context_home_thread(ioc_ctx));

    /*
     * Allow interrupting the operation by reentering the coroutine other
     * than through the aio_fd_handlers.
     */
    if (condition == G_IO_IN) {
        assert(ioc->read_coroutine == NULL);
    } else if (condition == G_IO_OUT) {
        assert(ioc->write_coroutine == NULL);
    }
    qio_channel_clear_fd_handlers(ioc, condition);
}

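/*
 * Wake any coroutine currently parked in qio_channel_yield(G_IO_IN).
 */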
void qio_channel_wake_read(QIOChannel *ioc)
{
    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
    if (co) {
        aio_co_wake(co);
    }
}

static gboolean qio_channel_wait_complete(QIOChannel *ioc,
                                          GIOCondition condition,
                                          gpointer opaque)
{
    GMainLoop *loop = opaque;

    g_main_loop_quit(loop);
    return FALSE;
}


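/*
 * Block the calling (non-coroutine) thread in a private GMainContext
 * until the channel becomes ready for @condition.
 */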
void qio_channel_wait(QIOChannel *ioc,
                      GIOCondition condition)
{
    GMainContext *ctxt = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
    GSource *source;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source,
                          (GSourceFunc)qio_channel_wait_complete,
                          loop,
                          NULL);

    g_source_attach(source, ctxt);

    g_main_loop_run(loop);

    g_source_unref(source);
    g_main_loop_unref(loop);
    g_main_context_unref(ctxt);
}


static void qio_channel_finalize(Object *obj)
{
    QIOChannel *ioc = QIO_CHANNEL(obj);

    /* Must not have coroutines in qio_channel_yield() */
    assert(!ioc->read_coroutine);
    assert(!ioc->write_coroutine);

    g_free(ioc->name);

#ifdef _WIN32
    if (ioc->event) {
        CloseHandle(ioc->event);
    }
#endif
}

static const TypeInfo qio_channel_info = {
    .parent = TYPE_OBJECT,
    .name = TYPE_QIO_CHANNEL,
    .instance_size = sizeof(QIOChannel),
    .instance_finalize = qio_channel_finalize,
    .abstract = true,
    .class_size = sizeof(QIOChannelClass),
};


static void qio_channel_register_types(void)
{
    type_register_static(&qio_channel_info);
}


type_init(qio_channel_register_types);