xref: /qemu/hw/vfio-user/proxy.c (revision 98a906d9e5827b18c51d7d7485be2f21f8900cc7)
1 /*
2  * vfio protocol over a UNIX socket.
3  *
4  * Copyright © 2018, 2021 Oracle and/or its affiliates.
5  *
6  * SPDX-License-Identifier: GPL-2.0-or-later
7  */
8 
9 #include "qemu/osdep.h"
10 #include <sys/ioctl.h>
11 
12 #include "hw/vfio/vfio-device.h"
13 #include "hw/vfio-user/proxy.h"
14 #include "hw/vfio-user/trace.h"
15 #include "qapi/error.h"
16 #include "qobject/qdict.h"
17 #include "qobject/qjson.h"
18 #include "qobject/qnum.h"
19 #include "qemu/error-report.h"
20 #include "qemu/lockable.h"
21 #include "qemu/main-loop.h"
22 #include "system/iothread.h"
23 
24 static IOThread *vfio_user_iothread;
25 
26 static void vfio_user_shutdown(VFIOUserProxy *proxy);
27 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
28                                      VFIOUserFDs *fds);
29 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);
30 
31 static void vfio_user_recv(void *opaque);
32 static void vfio_user_send(void *opaque);
33 static void vfio_user_cb(void *opaque);
34 
35 static void vfio_user_request(void *opaque);
36 
37 static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
38 {
39     hdr->flags |= VFIO_USER_ERROR;
40     hdr->error_reply = err;
41 }
42 
43 /*
44  * Functions called by main, CPU, or iothread threads
45  */
46 
47 static void vfio_user_shutdown(VFIOUserProxy *proxy)
48 {
49     qio_channel_shutdown(proxy->ioc, QIO_CHANNEL_SHUTDOWN_READ, NULL);
50     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL,
51                                    proxy->ctx, NULL, NULL);
52 }
53 
54 /*
55  * Same return values as qio_channel_writev_full():
56  *
57  * QIO_CHANNEL_ERR_BLOCK: *errp not set
58  * -1: *errp will be populated
59  * otherwise: bytes written
60  */
61 static ssize_t vfio_user_send_qio(VFIOUserProxy *proxy, VFIOUserMsg *msg,
62                                   Error **errp)
63 {
64     VFIOUserFDs *fds =  msg->fds;
65     struct iovec iov = {
66         .iov_base = msg->hdr,
67         .iov_len = msg->hdr->size,
68     };
69     size_t numfds = 0;
70     int *fdp = NULL;
71     ssize_t ret;
72 
73     if (fds != NULL && fds->send_fds != 0) {
74         numfds = fds->send_fds;
75         fdp = fds->fds;
76     }
77 
78     ret = qio_channel_writev_full(proxy->ioc, &iov, 1, fdp, numfds, 0, errp);
79 
80     if (ret == -1) {
81         vfio_user_set_error(msg->hdr, EIO);
82         vfio_user_shutdown(proxy);
83     }
84     trace_vfio_user_send_write(msg->hdr->id, ret);
85 
86     return ret;
87 }
88 
89 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
90                                      VFIOUserFDs *fds)
91 {
92     VFIOUserMsg *msg;
93 
94     msg = QTAILQ_FIRST(&proxy->free);
95     if (msg != NULL) {
96         QTAILQ_REMOVE(&proxy->free, msg, next);
97     } else {
98         msg = g_malloc0(sizeof(*msg));
99         qemu_cond_init(&msg->cv);
100     }
101 
102     msg->hdr = hdr;
103     msg->fds = fds;
104     return msg;
105 }
106 
107 /*
108  * Recycle a message list entry to the free list.
109  */
110 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
111 {
112     if (msg->type == VFIO_MSG_NONE) {
113         error_printf("vfio_user_recycle - freeing free msg\n");
114         return;
115     }
116 
117     /* free msg buffer if no one is waiting to consume the reply */
118     if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
119         g_free(msg->hdr);
120         if (msg->fds != NULL) {
121             g_free(msg->fds);
122         }
123     }
124 
125     msg->type = VFIO_MSG_NONE;
126     msg->hdr = NULL;
127     msg->fds = NULL;
128     msg->complete = false;
129     msg->pending = false;
130     QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
131 }
132 
133 VFIOUserFDs *vfio_user_getfds(int numfds)
134 {
135     VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
136 
137     fds->fds = (int *)((char *)fds + sizeof(*fds));
138 
139     return fds;
140 }
141 
142 /*
143  * Functions only called by iothread
144  */
145 
146 /*
147  * Process a received message.
148  */
149 static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
150                               bool isreply)
151 {
152 
153     /*
154      * Replies signal a waiter, if none just check for errors
155      * and free the message buffer.
156      *
157      * Requests get queued for the BH.
158      */
159     if (isreply) {
160         msg->complete = true;
161         if (msg->type == VFIO_MSG_WAIT) {
162             qemu_cond_signal(&msg->cv);
163         } else {
164             if (msg->hdr->flags & VFIO_USER_ERROR) {
165                 error_printf("vfio_user_process: error reply on async ");
166                 error_printf("request command %x error %s\n",
167                              msg->hdr->command,
168                              strerror(msg->hdr->error_reply));
169             }
170             /* youngest nowait msg has been ack'd */
171             if (proxy->last_nowait == msg) {
172                 proxy->last_nowait = NULL;
173             }
174             vfio_user_recycle(proxy, msg);
175         }
176     } else {
177         QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
178         qemu_bh_schedule(proxy->req_bh);
179     }
180 }
181 
182 /*
183  * Complete a partial message read
184  */
185 static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
186 {
187     VFIOUserMsg *msg = proxy->part_recv;
188     size_t msgleft = proxy->recv_left;
189     bool isreply;
190     char *data;
191     int ret;
192 
193     data = (char *)msg->hdr + (msg->hdr->size - msgleft);
194     while (msgleft > 0) {
195         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
196 
197         /* error or would block */
198         if (ret <= 0) {
199             /* try for rest on next iternation */
200             if (ret == QIO_CHANNEL_ERR_BLOCK) {
201                 proxy->recv_left = msgleft;
202             }
203             return ret;
204         }
205         trace_vfio_user_recv_read(msg->hdr->id, ret);
206 
207         msgleft -= ret;
208         data += ret;
209     }
210 
211     /*
212      * Read complete message, process it.
213      */
214     proxy->part_recv = NULL;
215     proxy->recv_left = 0;
216     isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
217     vfio_user_process(proxy, msg, isreply);
218 
219     /* return positive value */
220     return 1;
221 }
222 
223 /*
224  * Receive and process one incoming message.
225  *
226  * For replies, find matching outgoing request and wake any waiters.
227  * For requests, queue in incoming list and run request BH.
228  */
229 static int vfio_user_recv_one(VFIOUserProxy *proxy, Error **errp)
230 {
231     VFIOUserMsg *msg = NULL;
232     g_autofree int *fdp = NULL;
233     VFIOUserFDs *reqfds;
234     VFIOUserHdr hdr;
235     struct iovec iov = {
236         .iov_base = &hdr,
237         .iov_len = sizeof(hdr),
238     };
239     bool isreply = false;
240     int i, ret;
241     size_t msgleft, numfds = 0;
242     char *data = NULL;
243     char *buf = NULL;
244 
245     /*
246      * Complete any partial reads
247      */
248     if (proxy->part_recv != NULL) {
249         ret = vfio_user_complete(proxy, errp);
250 
251         /* still not complete, try later */
252         if (ret == QIO_CHANNEL_ERR_BLOCK) {
253             return ret;
254         }
255 
256         if (ret <= 0) {
257             goto fatal;
258         }
259         /* else fall into reading another msg */
260     }
261 
262     /*
263      * Read header
264      */
265     ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
266                                  errp);
267     if (ret == QIO_CHANNEL_ERR_BLOCK) {
268         return ret;
269     }
270 
271     /* read error or other side closed connection */
272     if (ret <= 0) {
273         goto fatal;
274     }
275 
276     if (ret < sizeof(hdr)) {
277         error_setg(errp, "short read of header");
278         goto fatal;
279     }
280 
281     /*
282      * Validate header
283      */
284     if (hdr.size < sizeof(VFIOUserHdr)) {
285         error_setg(errp, "bad header size");
286         goto fatal;
287     }
288     switch (hdr.flags & VFIO_USER_TYPE) {
289     case VFIO_USER_REQUEST:
290         isreply = false;
291         break;
292     case VFIO_USER_REPLY:
293         isreply = true;
294         break;
295     default:
296         error_setg(errp, "unknown message type");
297         goto fatal;
298     }
299     trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
300                              hdr.flags);
301 
302     /*
303      * For replies, find the matching pending request.
304      * For requests, reap incoming FDs.
305      */
306     if (isreply) {
307         QTAILQ_FOREACH(msg, &proxy->pending, next) {
308             if (hdr.id == msg->id) {
309                 break;
310             }
311         }
312         if (msg == NULL) {
313             error_setg(errp, "unexpected reply");
314             goto err;
315         }
316         QTAILQ_REMOVE(&proxy->pending, msg, next);
317 
318         /*
319          * Process any received FDs
320          */
321         if (numfds != 0) {
322             if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
323                 error_setg(errp, "unexpected FDs");
324                 goto err;
325             }
326             msg->fds->recv_fds = numfds;
327             memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
328         }
329     } else {
330         if (numfds != 0) {
331             reqfds = vfio_user_getfds(numfds);
332             memcpy(reqfds->fds, fdp, numfds * sizeof(int));
333         } else {
334             reqfds = NULL;
335         }
336     }
337 
338     /*
339      * Put the whole message into a single buffer.
340      */
341     if (isreply) {
342         if (hdr.size > msg->rsize) {
343             error_setg(errp, "reply larger than recv buffer");
344             goto err;
345         }
346         *msg->hdr = hdr;
347         data = (char *)msg->hdr + sizeof(hdr);
348     } else {
349         if (hdr.size > proxy->max_xfer_size + sizeof(VFIOUserDMARW)) {
350             error_setg(errp, "vfio_user_recv request larger than max");
351             goto err;
352         }
353         buf = g_malloc0(hdr.size);
354         memcpy(buf, &hdr, sizeof(hdr));
355         data = buf + sizeof(hdr);
356         msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
357         msg->type = VFIO_MSG_REQ;
358     }
359 
360     /*
361      * Read rest of message.
362      */
363     msgleft = hdr.size - sizeof(hdr);
364     while (msgleft > 0) {
365         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
366 
367         /* prepare to complete read on next iternation */
368         if (ret == QIO_CHANNEL_ERR_BLOCK) {
369             proxy->part_recv = msg;
370             proxy->recv_left = msgleft;
371             return ret;
372         }
373 
374         if (ret <= 0) {
375             goto fatal;
376         }
377         trace_vfio_user_recv_read(hdr.id, ret);
378 
379         msgleft -= ret;
380         data += ret;
381     }
382 
383     vfio_user_process(proxy, msg, isreply);
384     return 0;
385 
386     /*
387      * fatal means the other side closed or we don't trust the stream
388      * err means this message is corrupt
389      */
390 fatal:
391     vfio_user_shutdown(proxy);
392     proxy->state = VFIO_PROXY_ERROR;
393 
394     /* set error if server side closed */
395     if (ret == 0) {
396         error_setg(errp, "server closed socket");
397     }
398 
399 err:
400     for (i = 0; i < numfds; i++) {
401         close(fdp[i]);
402     }
403     if (isreply && msg != NULL) {
404         /* force an error to keep sending thread from hanging */
405         vfio_user_set_error(msg->hdr, EINVAL);
406         msg->complete = true;
407         qemu_cond_signal(&msg->cv);
408     }
409     return -1;
410 }
411 
412 static void vfio_user_recv(void *opaque)
413 {
414     VFIOUserProxy *proxy = opaque;
415 
416     QEMU_LOCK_GUARD(&proxy->lock);
417 
418     if (proxy->state == VFIO_PROXY_CONNECTED) {
419         Error *local_err = NULL;
420 
421         while (vfio_user_recv_one(proxy, &local_err) == 0) {
422             ;
423         }
424 
425         if (local_err != NULL) {
426             error_report_err(local_err);
427         }
428     }
429 }
430 
431 /*
432  * Send a single message, same return semantics as vfio_user_send_qio().
433  *
434  * Sent async messages are freed, others are moved to pending queue.
435  */
436 static ssize_t vfio_user_send_one(VFIOUserProxy *proxy, Error **errp)
437 {
438     VFIOUserMsg *msg;
439     ssize_t ret;
440 
441     msg = QTAILQ_FIRST(&proxy->outgoing);
442     ret = vfio_user_send_qio(proxy, msg, errp);
443     if (ret < 0) {
444         return ret;
445     }
446 
447     QTAILQ_REMOVE(&proxy->outgoing, msg, next);
448     if (msg->type == VFIO_MSG_ASYNC) {
449         vfio_user_recycle(proxy, msg);
450     } else {
451         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
452         msg->pending = true;
453     }
454 
455     return ret;
456 }
457 
458 /*
459  * Send messages from outgoing queue when the socket buffer has space.
460  * If we deplete 'outgoing', remove ourselves from the poll list.
461  */
462 static void vfio_user_send(void *opaque)
463 {
464     VFIOUserProxy *proxy = opaque;
465 
466     QEMU_LOCK_GUARD(&proxy->lock);
467 
468     if (proxy->state == VFIO_PROXY_CONNECTED) {
469         while (!QTAILQ_EMPTY(&proxy->outgoing)) {
470             Error *local_err = NULL;
471             int ret;
472 
473             ret = vfio_user_send_one(proxy, &local_err);
474 
475             if (ret == QIO_CHANNEL_ERR_BLOCK) {
476                 return;
477             } else if (ret == -1) {
478                 error_report_err(local_err);
479                 return;
480             }
481         }
482         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
483                                        vfio_user_recv, NULL, NULL, proxy);
484     }
485 }
486 
487 static void vfio_user_cb(void *opaque)
488 {
489     VFIOUserProxy *proxy = opaque;
490 
491     QEMU_LOCK_GUARD(&proxy->lock);
492 
493     proxy->state = VFIO_PROXY_CLOSED;
494     qemu_cond_signal(&proxy->close_cv);
495 }
496 
497 
498 /*
499  * Functions called by main or CPU threads
500  */
501 
502 /*
503  * Process incoming requests.
504  *
505  * The bus-specific callback has the form:
506  *    request(opaque, msg)
507  * where 'opaque' was specified in vfio_user_set_handler
508  * and 'msg' is the inbound message.
509  *
510  * The callback is responsible for disposing of the message buffer,
511  * usually by re-using it when calling vfio_send_reply or vfio_send_error,
512  * both of which free their message buffer when the reply is sent.
513  *
514  * If the callback uses a new buffer, it needs to free the old one.
515  */
516 static void vfio_user_request(void *opaque)
517 {
518     VFIOUserProxy *proxy = opaque;
519     VFIOUserMsgQ new, free;
520     VFIOUserMsg *msg, *m1;
521 
522     /* reap all incoming */
523     QTAILQ_INIT(&new);
524     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
525         QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
526             QTAILQ_REMOVE(&proxy->incoming, msg, next);
527             QTAILQ_INSERT_TAIL(&new, msg, next);
528         }
529     }
530 
531     /* process list */
532     QTAILQ_INIT(&free);
533     QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
534         QTAILQ_REMOVE(&new, msg, next);
535         trace_vfio_user_recv_request(msg->hdr->command);
536         proxy->request(proxy->req_arg, msg);
537         QTAILQ_INSERT_HEAD(&free, msg, next);
538     }
539 
540     /* free list */
541     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
542         QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
543             vfio_user_recycle(proxy, msg);
544         }
545     }
546 }
547 
548 /*
549  * Messages are queued onto the proxy's outgoing list.
550  *
551  * It handles 3 types of messages:
552  *
553  * async messages - replies and posted writes
554  *
555  * There will be no reply from the server, so message
556  * buffers are freed after they're sent.
557  *
558  * nowait messages - map/unmap during address space transactions
559  *
560  * These are also sent async, but a reply is expected so that
561  * vfio_wait_reqs() can wait for the youngest nowait request.
562  * They transition from the outgoing list to the pending list
563  * when sent, and are freed when the reply is received.
564  *
565  * wait messages - all other requests
566  *
567  * The reply to these messages is waited for by their caller.
568  * They also transition from outgoing to pending when sent, but
569  * the message buffer is returned to the caller with the reply
570  * contents.  The caller is responsible for freeing these messages.
571  *
572  * As an optimization, if the outgoing list and the socket send
573  * buffer are empty, the message is sent inline instead of being
574  * added to the outgoing list.  The rest of the transitions are
575  * unchanged.
576  */
577 static bool vfio_user_send_queued(VFIOUserProxy *proxy, VFIOUserMsg *msg,
578                                   Error **errp)
579 {
580     int ret;
581 
582     /*
583      * Unsent outgoing msgs - add to tail
584      */
585     if (!QTAILQ_EMPTY(&proxy->outgoing)) {
586         QTAILQ_INSERT_TAIL(&proxy->outgoing, msg, next);
587         return true;
588     }
589 
590     /*
591      * Try inline - if blocked, queue it and kick send poller
592      */
593     if (proxy->flags & VFIO_PROXY_FORCE_QUEUED) {
594         ret = QIO_CHANNEL_ERR_BLOCK;
595     } else {
596         ret = vfio_user_send_qio(proxy, msg, errp);
597     }
598 
599     if (ret == QIO_CHANNEL_ERR_BLOCK) {
600         QTAILQ_INSERT_HEAD(&proxy->outgoing, msg, next);
601         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
602                                        vfio_user_recv, proxy->ctx,
603                                        vfio_user_send, proxy);
604         return true;
605     }
606     if (ret == -1) {
607         return false;
608     }
609 
610     /*
611      * Sent - free async, add others to pending
612      */
613     if (msg->type == VFIO_MSG_ASYNC) {
614         vfio_user_recycle(proxy, msg);
615     } else {
616         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
617         msg->pending = true;
618     }
619 
620     return true;
621 }
622 
623 /*
624  * nowait send - vfio_wait_reqs() can wait for it later
625  *
626  * Returns false if we did not successfully receive a reply message, in which
627  * case @errp will be populated.
628  *
629  * In either case, ownership of @hdr and @fds is taken, and the caller must
630  * *not* free them itself.
631  */
632 bool vfio_user_send_nowait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
633                            VFIOUserFDs *fds, int rsize, Error **errp)
634 {
635     VFIOUserMsg *msg;
636 
637     QEMU_LOCK_GUARD(&proxy->lock);
638 
639     msg = vfio_user_getmsg(proxy, hdr, fds);
640     msg->id = hdr->id;
641     msg->rsize = rsize ? rsize : hdr->size;
642     msg->type = VFIO_MSG_NOWAIT;
643 
644     if (hdr->flags & VFIO_USER_NO_REPLY) {
645         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
646         vfio_user_recycle(proxy, msg);
647         return false;
648     }
649 
650     if (!vfio_user_send_queued(proxy, msg, errp)) {
651         vfio_user_recycle(proxy, msg);
652         return false;
653     }
654 
655     proxy->last_nowait = msg;
656 
657     return true;
658 }
659 
660 /*
661  * Returns false if we did not successfully receive a reply message, in which
662  * case @errp will be populated.
663  *
664  * In either case, the caller must free @hdr and @fds if needed.
665  */
666 bool vfio_user_send_wait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
667                          VFIOUserFDs *fds, int rsize, Error **errp)
668 {
669     VFIOUserMsg *msg;
670     bool ok = false;
671 
672     if (hdr->flags & VFIO_USER_NO_REPLY) {
673         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
674         return false;
675     }
676 
677     qemu_mutex_lock(&proxy->lock);
678 
679     msg = vfio_user_getmsg(proxy, hdr, fds);
680     msg->id = hdr->id;
681     msg->rsize = rsize ? rsize : hdr->size;
682     msg->type = VFIO_MSG_WAIT;
683 
684     ok = vfio_user_send_queued(proxy, msg, errp);
685 
686     if (ok) {
687         while (!msg->complete) {
688             if (!qemu_cond_timedwait(&msg->cv, &proxy->lock,
689                                      proxy->wait_time)) {
690                 VFIOUserMsgQ *list;
691 
692                 list = msg->pending ? &proxy->pending : &proxy->outgoing;
693                 QTAILQ_REMOVE(list, msg, next);
694                 error_setg_errno(errp, ETIMEDOUT,
695                                  "timed out waiting for reply");
696                 ok = false;
697                 break;
698             }
699         }
700     }
701 
702     vfio_user_recycle(proxy, msg);
703 
704     qemu_mutex_unlock(&proxy->lock);
705 
706     return ok;
707 }
708 
709 /*
710  * async send - msg can be queued, but will be freed when sent
711  *
712  * Returns false on failure, in which case @errp will be populated.
713  *
714  * In either case, ownership of @hdr and @fds is taken, and the caller must
715  * *not* free them itself.
716  */
717 bool vfio_user_send_async(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
718                           VFIOUserFDs *fds, Error **errp)
719 {
720     VFIOUserMsg *msg;
721 
722     QEMU_LOCK_GUARD(&proxy->lock);
723 
724     msg = vfio_user_getmsg(proxy, hdr, fds);
725     msg->id = hdr->id;
726     msg->rsize = 0;
727     msg->type = VFIO_MSG_ASYNC;
728 
729     if (!(hdr->flags & (VFIO_USER_NO_REPLY | VFIO_USER_REPLY))) {
730         error_setg_errno(errp, EINVAL, "%s on sync message", __func__);
731         vfio_user_recycle(proxy, msg);
732         return false;
733     }
734 
735     if (!vfio_user_send_queued(proxy, msg, errp)) {
736         vfio_user_recycle(proxy, msg);
737         return false;
738     }
739 
740     return true;
741 }
742 
743 void vfio_user_wait_reqs(VFIOUserProxy *proxy)
744 {
745     VFIOUserMsg *msg;
746 
747     /*
748      * Any DMA map/unmap requests sent in the middle
749      * of a memory region transaction were sent nowait.
750      * Wait for them here.
751      */
752     qemu_mutex_lock(&proxy->lock);
753     if (proxy->last_nowait != NULL) {
754         /*
755          * Change type to WAIT to wait for reply
756          */
757         msg = proxy->last_nowait;
758         msg->type = VFIO_MSG_WAIT;
759         proxy->last_nowait = NULL;
760         while (!msg->complete) {
761             if (!qemu_cond_timedwait(&msg->cv, &proxy->lock,
762                                      proxy->wait_time)) {
763                 VFIOUserMsgQ *list;
764 
765                 list = msg->pending ? &proxy->pending : &proxy->outgoing;
766                 QTAILQ_REMOVE(list, msg, next);
767                 error_printf("vfio_wait_reqs - timed out\n");
768                 break;
769             }
770         }
771 
772         if (msg->hdr->flags & VFIO_USER_ERROR) {
773             error_printf("vfio_user_wait_reqs - error reply on async ");
774             error_printf("request: command %x error %s\n", msg->hdr->command,
775                          strerror(msg->hdr->error_reply));
776         }
777 
778         /*
779          * Change type back to NOWAIT to free
780          */
781         msg->type = VFIO_MSG_NOWAIT;
782         vfio_user_recycle(proxy, msg);
783     }
784 
785     qemu_mutex_unlock(&proxy->lock);
786 }
787 
788 /*
789  * Reply to an incoming request.
790  */
791 void vfio_user_send_reply(VFIOUserProxy *proxy, VFIOUserHdr *hdr, int size)
792 {
793     Error *local_err = NULL;
794 
795     if (size < sizeof(VFIOUserHdr)) {
796         error_printf("%s: size too small", __func__);
797         g_free(hdr);
798         return;
799     }
800 
801     /*
802      * convert header to associated reply
803      */
804     hdr->flags = VFIO_USER_REPLY;
805     hdr->size = size;
806 
807     if (!vfio_user_send_async(proxy, hdr, NULL, &local_err)) {
808         error_report_err(local_err);
809     }
810 }
811 
812 /*
813  * Send an error reply to an incoming request.
814  */
815 void vfio_user_send_error(VFIOUserProxy *proxy, VFIOUserHdr *hdr, int error)
816 {
817     Error *local_err = NULL;
818 
819     /*
820      * convert header to associated reply
821      */
822     hdr->flags = VFIO_USER_REPLY;
823     hdr->flags |= VFIO_USER_ERROR;
824     hdr->error_reply = error;
825     hdr->size = sizeof(*hdr);
826 
827     if (!vfio_user_send_async(proxy, hdr, NULL, &local_err)) {
828         error_report_err(local_err);
829     }
830 }
831 
832 /*
833  * Close FDs erroneously received in an incoming request.
834  */
835 void vfio_user_putfds(VFIOUserMsg *msg)
836 {
837     VFIOUserFDs *fds = msg->fds;
838     int i;
839 
840     for (i = 0; i < fds->recv_fds; i++) {
841         close(fds->fds[i]);
842     }
843     g_free(fds);
844     msg->fds = NULL;
845 }
846 
847 void
848 vfio_user_disable_posted_writes(VFIOUserProxy *proxy)
849 {
850     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
851          proxy->flags |= VFIO_PROXY_NO_POST;
852     }
853 }
854 
855 static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
856     QLIST_HEAD_INITIALIZER(vfio_user_sockets);
857 
858 VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
859 {
860     VFIOUserProxy *proxy;
861     QIOChannelSocket *sioc;
862     QIOChannel *ioc;
863     char *sockname;
864 
865     if (addr->type != SOCKET_ADDRESS_TYPE_UNIX) {
866         error_setg(errp, "vfio_user_connect - bad address family");
867         return NULL;
868     }
869     sockname = addr->u.q_unix.path;
870 
871     sioc = qio_channel_socket_new();
872     ioc = QIO_CHANNEL(sioc);
873     if (qio_channel_socket_connect_sync(sioc, addr, errp)) {
874         object_unref(OBJECT(ioc));
875         return NULL;
876     }
877     qio_channel_set_blocking(ioc, false, NULL);
878 
879     proxy = g_malloc0(sizeof(VFIOUserProxy));
880     proxy->sockname = g_strdup_printf("unix:%s", sockname);
881     proxy->ioc = ioc;
882 
883     /* init defaults */
884     proxy->max_xfer_size = VFIO_USER_DEF_MAX_XFER;
885     proxy->max_send_fds = VFIO_USER_DEF_MAX_FDS;
886     proxy->max_dma = VFIO_USER_DEF_MAP_MAX;
887     proxy->dma_pgsizes = VFIO_USER_DEF_PGSIZE;
888     proxy->max_bitmap = VFIO_USER_DEF_MAX_BITMAP;
889     proxy->migr_pgsize = VFIO_USER_DEF_PGSIZE;
890 
891     proxy->flags = VFIO_PROXY_CLIENT;
892     proxy->state = VFIO_PROXY_CONNECTED;
893 
894     qemu_mutex_init(&proxy->lock);
895     qemu_cond_init(&proxy->close_cv);
896 
897     if (vfio_user_iothread == NULL) {
898         vfio_user_iothread = iothread_create("VFIO user", errp);
899     }
900 
901     proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
902     proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
903 
904     QTAILQ_INIT(&proxy->outgoing);
905     QTAILQ_INIT(&proxy->incoming);
906     QTAILQ_INIT(&proxy->free);
907     QTAILQ_INIT(&proxy->pending);
908     QLIST_INSERT_HEAD(&vfio_user_sockets, proxy, next);
909 
910     return proxy;
911 }
912 
913 void vfio_user_set_handler(VFIODevice *vbasedev,
914                            void (*handler)(void *opaque, VFIOUserMsg *msg),
915                            void *req_arg)
916 {
917     VFIOUserProxy *proxy = vbasedev->proxy;
918 
919     proxy->request = handler;
920     proxy->req_arg = req_arg;
921     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
922                                    vfio_user_recv, NULL, NULL, proxy);
923 }
924 
925 void vfio_user_disconnect(VFIOUserProxy *proxy)
926 {
927     VFIOUserMsg *r1, *r2;
928 
929     qemu_mutex_lock(&proxy->lock);
930 
931     /* our side is quitting */
932     if (proxy->state == VFIO_PROXY_CONNECTED) {
933         vfio_user_shutdown(proxy);
934         if (!QTAILQ_EMPTY(&proxy->pending)) {
935             error_printf("vfio_user_disconnect: outstanding requests\n");
936         }
937     }
938     object_unref(OBJECT(proxy->ioc));
939     proxy->ioc = NULL;
940     qemu_bh_delete(proxy->req_bh);
941     proxy->req_bh = NULL;
942 
943     proxy->state = VFIO_PROXY_CLOSING;
944     QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
945         qemu_cond_destroy(&r1->cv);
946         QTAILQ_REMOVE(&proxy->outgoing, r1, next);
947         g_free(r1);
948     }
949     QTAILQ_FOREACH_SAFE(r1, &proxy->incoming, next, r2) {
950         qemu_cond_destroy(&r1->cv);
951         QTAILQ_REMOVE(&proxy->incoming, r1, next);
952         g_free(r1);
953     }
954     QTAILQ_FOREACH_SAFE(r1, &proxy->pending, next, r2) {
955         qemu_cond_destroy(&r1->cv);
956         QTAILQ_REMOVE(&proxy->pending, r1, next);
957         g_free(r1);
958     }
959     QTAILQ_FOREACH_SAFE(r1, &proxy->free, next, r2) {
960         qemu_cond_destroy(&r1->cv);
961         QTAILQ_REMOVE(&proxy->free, r1, next);
962         g_free(r1);
963     }
964 
965     /*
966      * Make sure the iothread isn't blocking anywhere
967      * with a ref to this proxy by waiting for a BH
968      * handler to run after the proxy fd handlers were
969      * deleted above.
970      */
971     aio_bh_schedule_oneshot(proxy->ctx, vfio_user_cb, proxy);
972     qemu_cond_wait(&proxy->close_cv, &proxy->lock);
973 
974     /* we now hold the only ref to proxy */
975     qemu_mutex_unlock(&proxy->lock);
976     qemu_cond_destroy(&proxy->close_cv);
977     qemu_mutex_destroy(&proxy->lock);
978 
979     QLIST_REMOVE(proxy, next);
980     if (QLIST_EMPTY(&vfio_user_sockets)) {
981         iothread_destroy(vfio_user_iothread);
982         vfio_user_iothread = NULL;
983     }
984 
985     g_free(proxy->sockname);
986     g_free(proxy);
987 }
988 
989 void vfio_user_request_msg(VFIOUserHdr *hdr, uint16_t cmd,
990                            uint32_t size, uint32_t flags)
991 {
992     static uint16_t next_id;
993 
994     hdr->id = qatomic_fetch_inc(&next_id);
995     hdr->command = cmd;
996     hdr->size = size;
997     hdr->flags = (flags & ~VFIO_USER_TYPE) | VFIO_USER_REQUEST;
998     hdr->error_reply = 0;
999 }
1000 
1001 struct cap_entry {
1002     const char *name;
1003     bool (*check)(VFIOUserProxy *proxy, QObject *qobj, Error **errp);
1004 };
1005 
1006 static bool caps_parse(VFIOUserProxy *proxy, QDict *qdict,
1007                        struct cap_entry caps[], Error **errp)
1008 {
1009     QObject *qobj;
1010     struct cap_entry *p;
1011 
1012     for (p = caps; p->name != NULL; p++) {
1013         qobj = qdict_get(qdict, p->name);
1014         if (qobj != NULL) {
1015             if (!p->check(proxy, qobj, errp)) {
1016                 return false;
1017             }
1018             qdict_del(qdict, p->name);
1019         }
1020     }
1021 
1022     /* warning, for now */
1023     if (qdict_size(qdict) != 0) {
1024         warn_report("spurious capabilities");
1025     }
1026     return true;
1027 }
1028 
1029 static bool check_migr_pgsize(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1030 {
1031     QNum *qn = qobject_to(QNum, qobj);
1032     uint64_t pgsize;
1033 
1034     if (qn == NULL || !qnum_get_try_uint(qn, &pgsize)) {
1035         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZE);
1036         return false;
1037     }
1038 
1039     /* must be larger than default */
1040     if (pgsize & (VFIO_USER_DEF_PGSIZE - 1)) {
1041         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsize);
1042         return false;
1043     }
1044 
1045     proxy->migr_pgsize = pgsize;
1046     return true;
1047 }
1048 
1049 static bool check_bitmap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1050 {
1051     QNum *qn = qobject_to(QNum, qobj);
1052     uint64_t bitmap_size;
1053 
1054     if (qn == NULL || !qnum_get_try_uint(qn, &bitmap_size)) {
1055         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_BITMAP);
1056         return false;
1057     }
1058 
1059     /* can only lower it */
1060     if (bitmap_size > VFIO_USER_DEF_MAX_BITMAP) {
1061         error_setg(errp, "%s too large", VFIO_USER_CAP_MAX_BITMAP);
1062         return false;
1063     }
1064 
1065     proxy->max_bitmap = bitmap_size;
1066     return true;
1067 }
1068 
1069 static struct cap_entry caps_migr[] = {
1070     { VFIO_USER_CAP_PGSIZE, check_migr_pgsize },
1071     { VFIO_USER_CAP_MAX_BITMAP, check_bitmap },
1072     { NULL }
1073 };
1074 
1075 static bool check_max_fds(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1076 {
1077     QNum *qn = qobject_to(QNum, qobj);
1078     uint64_t max_send_fds;
1079 
1080     if (qn == NULL || !qnum_get_try_uint(qn, &max_send_fds) ||
1081         max_send_fds > VFIO_USER_MAX_MAX_FDS) {
1082         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1083         return false;
1084     }
1085     proxy->max_send_fds = max_send_fds;
1086     return true;
1087 }
1088 
1089 static bool check_max_xfer(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1090 {
1091     QNum *qn = qobject_to(QNum, qobj);
1092     uint64_t max_xfer_size;
1093 
1094     if (qn == NULL || !qnum_get_try_uint(qn, &max_xfer_size) ||
1095         max_xfer_size > VFIO_USER_MAX_MAX_XFER) {
1096         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_XFER);
1097         return false;
1098     }
1099     proxy->max_xfer_size = max_xfer_size;
1100     return true;
1101 }
1102 
1103 static bool check_pgsizes(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1104 {
1105     QNum *qn = qobject_to(QNum, qobj);
1106     uint64_t pgsizes;
1107 
1108     if (qn == NULL || !qnum_get_try_uint(qn, &pgsizes)) {
1109         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZES);
1110         return false;
1111     }
1112 
1113     /* must be larger than default */
1114     if (pgsizes & (VFIO_USER_DEF_PGSIZE - 1)) {
1115         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsizes);
1116         return false;
1117     }
1118 
1119     proxy->dma_pgsizes = pgsizes;
1120     return true;
1121 }
1122 
1123 static bool check_max_dma(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1124 {
1125     QNum *qn = qobject_to(QNum, qobj);
1126     uint64_t max_dma;
1127 
1128     if (qn == NULL || !qnum_get_try_uint(qn, &max_dma)) {
1129         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAP_MAX);
1130         return false;
1131     }
1132 
1133     /* can only lower it */
1134     if (max_dma > VFIO_USER_DEF_MAP_MAX) {
1135         error_setg(errp, "%s too large", VFIO_USER_CAP_MAP_MAX);
1136         return false;
1137     }
1138 
1139     proxy->max_dma = max_dma;
1140     return true;
1141 }
1142 
1143 static bool check_migr(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1144 {
1145     QDict *qdict = qobject_to(QDict, qobj);
1146 
1147     if (qdict == NULL) {
1148         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1149         return true;
1150     }
1151     return caps_parse(proxy, qdict, caps_migr, errp);
1152 }
1153 
1154 static struct cap_entry caps_cap[] = {
1155     { VFIO_USER_CAP_MAX_FDS, check_max_fds },
1156     { VFIO_USER_CAP_MAX_XFER, check_max_xfer },
1157     { VFIO_USER_CAP_PGSIZES, check_pgsizes },
1158     { VFIO_USER_CAP_MAP_MAX, check_max_dma },
1159     { VFIO_USER_CAP_MIGR, check_migr },
1160     { NULL }
1161 };
1162 
1163 static bool check_cap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1164 {
1165    QDict *qdict = qobject_to(QDict, qobj);
1166 
1167     if (qdict == NULL) {
1168         error_setg(errp, "malformed %s", VFIO_USER_CAP);
1169         return false;
1170     }
1171     return caps_parse(proxy, qdict, caps_cap, errp);
1172 }
1173 
1174 static struct cap_entry ver_0_0[] = {
1175     { VFIO_USER_CAP, check_cap },
1176     { NULL }
1177 };
1178 
1179 static bool caps_check(VFIOUserProxy *proxy, int minor, const char *caps,
1180                        Error **errp)
1181 {
1182     QObject *qobj;
1183     QDict *qdict;
1184     bool ret;
1185 
1186     qobj = qobject_from_json(caps, NULL);
1187     if (qobj == NULL) {
1188         error_setg(errp, "malformed capabilities %s", caps);
1189         return false;
1190     }
1191     qdict = qobject_to(QDict, qobj);
1192     if (qdict == NULL) {
1193         error_setg(errp, "capabilities %s not an object", caps);
1194         qobject_unref(qobj);
1195         return false;
1196     }
1197     ret = caps_parse(proxy, qdict, ver_0_0, errp);
1198 
1199     qobject_unref(qobj);
1200     return ret;
1201 }
1202 
1203 static GString *caps_json(void)
1204 {
1205     QDict *dict = qdict_new();
1206     QDict *capdict = qdict_new();
1207     QDict *migdict = qdict_new();
1208     GString *str;
1209 
1210     qdict_put_int(migdict, VFIO_USER_CAP_PGSIZE, VFIO_USER_DEF_PGSIZE);
1211     qdict_put_int(migdict, VFIO_USER_CAP_MAX_BITMAP, VFIO_USER_DEF_MAX_BITMAP);
1212     qdict_put_obj(capdict, VFIO_USER_CAP_MIGR, QOBJECT(migdict));
1213 
1214     qdict_put_int(capdict, VFIO_USER_CAP_MAX_FDS, VFIO_USER_MAX_MAX_FDS);
1215     qdict_put_int(capdict, VFIO_USER_CAP_MAX_XFER, VFIO_USER_DEF_MAX_XFER);
1216     qdict_put_int(capdict, VFIO_USER_CAP_PGSIZES, VFIO_USER_DEF_PGSIZE);
1217     qdict_put_int(capdict, VFIO_USER_CAP_MAP_MAX, VFIO_USER_DEF_MAP_MAX);
1218 
1219     qdict_put_obj(dict, VFIO_USER_CAP, QOBJECT(capdict));
1220 
1221     str = qobject_to_json(QOBJECT(dict));
1222     qobject_unref(dict);
1223     return str;
1224 }
1225 
1226 bool vfio_user_validate_version(VFIOUserProxy *proxy, Error **errp)
1227 {
1228     g_autofree VFIOUserVersion *msgp = NULL;
1229     GString *caps;
1230     char *reply;
1231     int size, caplen;
1232 
1233     caps = caps_json();
1234     caplen = caps->len + 1;
1235     size = sizeof(*msgp) + caplen;
1236     msgp = g_malloc0(size);
1237 
1238     vfio_user_request_msg(&msgp->hdr, VFIO_USER_VERSION, size, 0);
1239     msgp->major = VFIO_USER_MAJOR_VER;
1240     msgp->minor = VFIO_USER_MINOR_VER;
1241     memcpy(&msgp->capabilities, caps->str, caplen);
1242     g_string_free(caps, true);
1243     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1244 
1245     if (!vfio_user_send_wait(proxy, &msgp->hdr, NULL, 0, errp)) {
1246         return false;
1247     }
1248 
1249     if (msgp->hdr.flags & VFIO_USER_ERROR) {
1250         error_setg_errno(errp, msgp->hdr.error_reply, "version reply");
1251         return false;
1252     }
1253 
1254     if (msgp->major != VFIO_USER_MAJOR_VER ||
1255         msgp->minor > VFIO_USER_MINOR_VER) {
1256         error_setg(errp, "incompatible server version");
1257         return false;
1258     }
1259 
1260     reply = msgp->capabilities;
1261     if (reply[msgp->hdr.size - sizeof(*msgp) - 1] != '\0') {
1262         error_setg(errp, "corrupt version reply");
1263         return false;
1264     }
1265 
1266     if (!caps_check(proxy, msgp->minor, reply, errp)) {
1267         return false;
1268     }
1269 
1270     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1271     return true;
1272 }
1273