xref: /qemu/hw/vfio-user/proxy.c (revision 18e899e63dd9d05e567c081023ad7fc5b2c5dc9a)
1 /*
2  * vfio protocol over a UNIX socket.
3  *
4  * Copyright © 2018, 2021 Oracle and/or its affiliates.
5  *
6  * SPDX-License-Identifier: GPL-2.0-or-later
7  */
8 
9 #include "qemu/osdep.h"
10 #include <sys/ioctl.h>
11 
12 #include "hw/vfio/vfio-device.h"
13 #include "hw/vfio-user/proxy.h"
14 #include "hw/vfio-user/trace.h"
15 #include "qapi/error.h"
16 #include "qobject/qdict.h"
17 #include "qobject/qjson.h"
18 #include "qobject/qnum.h"
19 #include "qemu/error-report.h"
20 #include "qemu/lockable.h"
21 #include "qemu/main-loop.h"
22 #include "system/iothread.h"
23 
/*
 * Timeout, in milliseconds, passed to each qemu_cond_timedwait() while a
 * sender waits for a reply.
 */
static int wait_time = 5000;   /* wait up to 5 sec for busy servers */
/*
 * One iothread shared by every proxy: created by vfio_user_connect_dev()
 * when none exists and destroyed by vfio_user_disconnect() once
 * vfio_user_sockets becomes empty.
 */
static IOThread *vfio_user_iothread;

/* Forward declarations for helpers used before their definitions. */
static void vfio_user_shutdown(VFIOUserProxy *proxy);
static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
                                     VFIOUserFDs *fds);
static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);

/* iothread fd handlers and the disconnect BH */
static void vfio_user_recv(void *opaque);
static void vfio_user_send(void *opaque);
static void vfio_user_cb(void *opaque);

/* bottom half that dispatches server-initiated requests */
static void vfio_user_request(void *opaque);
37 
38 static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
39 {
40     hdr->flags |= VFIO_USER_ERROR;
41     hdr->error_reply = err;
42 }
43 
44 /*
45  * Functions called by main, CPU, or iothread threads
46  */
47 
48 static void vfio_user_shutdown(VFIOUserProxy *proxy)
49 {
50     qio_channel_shutdown(proxy->ioc, QIO_CHANNEL_SHUTDOWN_READ, NULL);
51     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL,
52                                    proxy->ctx, NULL, NULL);
53 }
54 
55 /*
56  * Same return values as qio_channel_writev_full():
57  *
58  * QIO_CHANNEL_ERR_BLOCK: *errp not set
59  * -1: *errp will be populated
60  * otherwise: bytes written
61  */
62 static ssize_t vfio_user_send_qio(VFIOUserProxy *proxy, VFIOUserMsg *msg,
63                                   Error **errp)
64 {
65     VFIOUserFDs *fds =  msg->fds;
66     struct iovec iov = {
67         .iov_base = msg->hdr,
68         .iov_len = msg->hdr->size,
69     };
70     size_t numfds = 0;
71     int *fdp = NULL;
72     ssize_t ret;
73 
74     if (fds != NULL && fds->send_fds != 0) {
75         numfds = fds->send_fds;
76         fdp = fds->fds;
77     }
78 
79     ret = qio_channel_writev_full(proxy->ioc, &iov, 1, fdp, numfds, 0, errp);
80 
81     if (ret == -1) {
82         vfio_user_set_error(msg->hdr, EIO);
83         vfio_user_shutdown(proxy);
84     }
85     trace_vfio_user_send_write(msg->hdr->id, ret);
86 
87     return ret;
88 }
89 
90 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
91                                      VFIOUserFDs *fds)
92 {
93     VFIOUserMsg *msg;
94 
95     msg = QTAILQ_FIRST(&proxy->free);
96     if (msg != NULL) {
97         QTAILQ_REMOVE(&proxy->free, msg, next);
98     } else {
99         msg = g_malloc0(sizeof(*msg));
100         qemu_cond_init(&msg->cv);
101     }
102 
103     msg->hdr = hdr;
104     msg->fds = fds;
105     return msg;
106 }
107 
108 /*
109  * Recycle a message list entry to the free list.
110  */
111 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
112 {
113     if (msg->type == VFIO_MSG_NONE) {
114         error_printf("vfio_user_recycle - freeing free msg\n");
115         return;
116     }
117 
118     /* free msg buffer if no one is waiting to consume the reply */
119     if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
120         g_free(msg->hdr);
121         if (msg->fds != NULL) {
122             g_free(msg->fds);
123         }
124     }
125 
126     msg->type = VFIO_MSG_NONE;
127     msg->hdr = NULL;
128     msg->fds = NULL;
129     msg->complete = false;
130     msg->pending = false;
131     QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
132 }
133 
134 VFIOUserFDs *vfio_user_getfds(int numfds)
135 {
136     VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
137 
138     fds->fds = (int *)((char *)fds + sizeof(*fds));
139 
140     return fds;
141 }
142 
143 /*
144  * Functions only called by iothread
145  */
146 
147 /*
148  * Process a received message.
149  */
150 static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
151                               bool isreply)
152 {
153 
154     /*
155      * Replies signal a waiter, if none just check for errors
156      * and free the message buffer.
157      *
158      * Requests get queued for the BH.
159      */
160     if (isreply) {
161         msg->complete = true;
162         if (msg->type == VFIO_MSG_WAIT) {
163             qemu_cond_signal(&msg->cv);
164         } else {
165             if (msg->hdr->flags & VFIO_USER_ERROR) {
166                 error_printf("vfio_user_process: error reply on async ");
167                 error_printf("request command %x error %s\n",
168                              msg->hdr->command,
169                              strerror(msg->hdr->error_reply));
170             }
171             /* youngest nowait msg has been ack'd */
172             if (proxy->last_nowait == msg) {
173                 proxy->last_nowait = NULL;
174             }
175             vfio_user_recycle(proxy, msg);
176         }
177     } else {
178         QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
179         qemu_bh_schedule(proxy->req_bh);
180     }
181 }
182 
183 /*
184  * Complete a partial message read
185  */
186 static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
187 {
188     VFIOUserMsg *msg = proxy->part_recv;
189     size_t msgleft = proxy->recv_left;
190     bool isreply;
191     char *data;
192     int ret;
193 
194     data = (char *)msg->hdr + (msg->hdr->size - msgleft);
195     while (msgleft > 0) {
196         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
197 
198         /* error or would block */
199         if (ret <= 0) {
200             /* try for rest on next iternation */
201             if (ret == QIO_CHANNEL_ERR_BLOCK) {
202                 proxy->recv_left = msgleft;
203             }
204             return ret;
205         }
206         trace_vfio_user_recv_read(msg->hdr->id, ret);
207 
208         msgleft -= ret;
209         data += ret;
210     }
211 
212     /*
213      * Read complete message, process it.
214      */
215     proxy->part_recv = NULL;
216     proxy->recv_left = 0;
217     isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
218     vfio_user_process(proxy, msg, isreply);
219 
220     /* return positive value */
221     return 1;
222 }
223 
224 /*
225  * Receive and process one incoming message.
226  *
227  * For replies, find matching outgoing request and wake any waiters.
228  * For requests, queue in incoming list and run request BH.
229  */
230 static int vfio_user_recv_one(VFIOUserProxy *proxy, Error **errp)
231 {
232     VFIOUserMsg *msg = NULL;
233     g_autofree int *fdp = NULL;
234     VFIOUserFDs *reqfds;
235     VFIOUserHdr hdr;
236     struct iovec iov = {
237         .iov_base = &hdr,
238         .iov_len = sizeof(hdr),
239     };
240     bool isreply = false;
241     int i, ret;
242     size_t msgleft, numfds = 0;
243     char *data = NULL;
244     char *buf = NULL;
245 
246     /*
247      * Complete any partial reads
248      */
249     if (proxy->part_recv != NULL) {
250         ret = vfio_user_complete(proxy, errp);
251 
252         /* still not complete, try later */
253         if (ret == QIO_CHANNEL_ERR_BLOCK) {
254             return ret;
255         }
256 
257         if (ret <= 0) {
258             goto fatal;
259         }
260         /* else fall into reading another msg */
261     }
262 
263     /*
264      * Read header
265      */
266     ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
267                                  errp);
268     if (ret == QIO_CHANNEL_ERR_BLOCK) {
269         return ret;
270     }
271 
272     /* read error or other side closed connection */
273     if (ret <= 0) {
274         goto fatal;
275     }
276 
277     if (ret < sizeof(hdr)) {
278         error_setg(errp, "short read of header");
279         goto fatal;
280     }
281 
282     /*
283      * Validate header
284      */
285     if (hdr.size < sizeof(VFIOUserHdr)) {
286         error_setg(errp, "bad header size");
287         goto fatal;
288     }
289     switch (hdr.flags & VFIO_USER_TYPE) {
290     case VFIO_USER_REQUEST:
291         isreply = false;
292         break;
293     case VFIO_USER_REPLY:
294         isreply = true;
295         break;
296     default:
297         error_setg(errp, "unknown message type");
298         goto fatal;
299     }
300     trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
301                              hdr.flags);
302 
303     /*
304      * For replies, find the matching pending request.
305      * For requests, reap incoming FDs.
306      */
307     if (isreply) {
308         QTAILQ_FOREACH(msg, &proxy->pending, next) {
309             if (hdr.id == msg->id) {
310                 break;
311             }
312         }
313         if (msg == NULL) {
314             error_setg(errp, "unexpected reply");
315             goto err;
316         }
317         QTAILQ_REMOVE(&proxy->pending, msg, next);
318 
319         /*
320          * Process any received FDs
321          */
322         if (numfds != 0) {
323             if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
324                 error_setg(errp, "unexpected FDs");
325                 goto err;
326             }
327             msg->fds->recv_fds = numfds;
328             memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
329         }
330     } else {
331         if (numfds != 0) {
332             reqfds = vfio_user_getfds(numfds);
333             memcpy(reqfds->fds, fdp, numfds * sizeof(int));
334         } else {
335             reqfds = NULL;
336         }
337     }
338 
339     /*
340      * Put the whole message into a single buffer.
341      */
342     if (isreply) {
343         if (hdr.size > msg->rsize) {
344             error_setg(errp, "reply larger than recv buffer");
345             goto err;
346         }
347         *msg->hdr = hdr;
348         data = (char *)msg->hdr + sizeof(hdr);
349     } else {
350         buf = g_malloc0(hdr.size);
351         memcpy(buf, &hdr, sizeof(hdr));
352         data = buf + sizeof(hdr);
353         msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
354         msg->type = VFIO_MSG_REQ;
355     }
356 
357     /*
358      * Read rest of message.
359      */
360     msgleft = hdr.size - sizeof(hdr);
361     while (msgleft > 0) {
362         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
363 
364         /* prepare to complete read on next iternation */
365         if (ret == QIO_CHANNEL_ERR_BLOCK) {
366             proxy->part_recv = msg;
367             proxy->recv_left = msgleft;
368             return ret;
369         }
370 
371         if (ret <= 0) {
372             goto fatal;
373         }
374         trace_vfio_user_recv_read(hdr.id, ret);
375 
376         msgleft -= ret;
377         data += ret;
378     }
379 
380     vfio_user_process(proxy, msg, isreply);
381     return 0;
382 
383     /*
384      * fatal means the other side closed or we don't trust the stream
385      * err means this message is corrupt
386      */
387 fatal:
388     vfio_user_shutdown(proxy);
389     proxy->state = VFIO_PROXY_ERROR;
390 
391     /* set error if server side closed */
392     if (ret == 0) {
393         error_setg(errp, "server closed socket");
394     }
395 
396 err:
397     for (i = 0; i < numfds; i++) {
398         close(fdp[i]);
399     }
400     if (isreply && msg != NULL) {
401         /* force an error to keep sending thread from hanging */
402         vfio_user_set_error(msg->hdr, EINVAL);
403         msg->complete = true;
404         qemu_cond_signal(&msg->cv);
405     }
406     return -1;
407 }
408 
409 static void vfio_user_recv(void *opaque)
410 {
411     VFIOUserProxy *proxy = opaque;
412 
413     QEMU_LOCK_GUARD(&proxy->lock);
414 
415     if (proxy->state == VFIO_PROXY_CONNECTED) {
416         Error *local_err = NULL;
417 
418         while (vfio_user_recv_one(proxy, &local_err) == 0) {
419             ;
420         }
421 
422         if (local_err != NULL) {
423             error_report_err(local_err);
424         }
425     }
426 }
427 
428 /*
429  * Send a single message, same return semantics as vfio_user_send_qio().
430  *
431  * Sent async messages are freed, others are moved to pending queue.
432  */
433 static ssize_t vfio_user_send_one(VFIOUserProxy *proxy, Error **errp)
434 {
435     VFIOUserMsg *msg;
436     ssize_t ret;
437 
438     msg = QTAILQ_FIRST(&proxy->outgoing);
439     ret = vfio_user_send_qio(proxy, msg, errp);
440     if (ret < 0) {
441         return ret;
442     }
443 
444     QTAILQ_REMOVE(&proxy->outgoing, msg, next);
445     if (msg->type == VFIO_MSG_ASYNC) {
446         vfio_user_recycle(proxy, msg);
447     } else {
448         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
449         msg->pending = true;
450     }
451 
452     return ret;
453 }
454 
455 /*
456  * Send messages from outgoing queue when the socket buffer has space.
457  * If we deplete 'outgoing', remove ourselves from the poll list.
458  */
459 static void vfio_user_send(void *opaque)
460 {
461     VFIOUserProxy *proxy = opaque;
462 
463     QEMU_LOCK_GUARD(&proxy->lock);
464 
465     if (proxy->state == VFIO_PROXY_CONNECTED) {
466         while (!QTAILQ_EMPTY(&proxy->outgoing)) {
467             Error *local_err = NULL;
468             int ret;
469 
470             ret = vfio_user_send_one(proxy, &local_err);
471 
472             if (ret == QIO_CHANNEL_ERR_BLOCK) {
473                 return;
474             } else if (ret == -1) {
475                 error_report_err(local_err);
476                 return;
477             }
478         }
479         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
480                                        vfio_user_recv, NULL, NULL, proxy);
481     }
482 }
483 
484 static void vfio_user_cb(void *opaque)
485 {
486     VFIOUserProxy *proxy = opaque;
487 
488     QEMU_LOCK_GUARD(&proxy->lock);
489 
490     proxy->state = VFIO_PROXY_CLOSED;
491     qemu_cond_signal(&proxy->close_cv);
492 }
493 
494 
495 /*
496  * Functions called by main or CPU threads
497  */
498 
499 /*
500  * Process incoming requests.
501  *
502  * The bus-specific callback has the form:
503  *    request(opaque, msg)
504  * where 'opaque' was specified in vfio_user_set_handler
505  * and 'msg' is the inbound message.
506  *
507  * The callback is responsible for disposing of the message buffer,
508  * usually by re-using it when calling vfio_send_reply or vfio_send_error,
509  * both of which free their message buffer when the reply is sent.
510  *
511  * If the callback uses a new buffer, it needs to free the old one.
512  */
513 static void vfio_user_request(void *opaque)
514 {
515     VFIOUserProxy *proxy = opaque;
516     VFIOUserMsgQ new, free;
517     VFIOUserMsg *msg, *m1;
518 
519     /* reap all incoming */
520     QTAILQ_INIT(&new);
521     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
522         QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
523             QTAILQ_REMOVE(&proxy->incoming, msg, next);
524             QTAILQ_INSERT_TAIL(&new, msg, next);
525         }
526     }
527 
528     /* process list */
529     QTAILQ_INIT(&free);
530     QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
531         QTAILQ_REMOVE(&new, msg, next);
532         trace_vfio_user_recv_request(msg->hdr->command);
533         proxy->request(proxy->req_arg, msg);
534         QTAILQ_INSERT_HEAD(&free, msg, next);
535     }
536 
537     /* free list */
538     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
539         QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
540             vfio_user_recycle(proxy, msg);
541         }
542     }
543 }
544 
545 /*
546  * Messages are queued onto the proxy's outgoing list.
547  *
548  * It handles 3 types of messages:
549  *
550  * async messages - replies and posted writes
551  *
552  * There will be no reply from the server, so message
553  * buffers are freed after they're sent.
554  *
555  * nowait messages - map/unmap during address space transactions
556  *
557  * These are also sent async, but a reply is expected so that
558  * vfio_wait_reqs() can wait for the youngest nowait request.
559  * They transition from the outgoing list to the pending list
560  * when sent, and are freed when the reply is received.
561  *
562  * wait messages - all other requests
563  *
564  * The reply to these messages is waited for by their caller.
565  * They also transition from outgoing to pending when sent, but
566  * the message buffer is returned to the caller with the reply
567  * contents.  The caller is responsible for freeing these messages.
568  *
569  * As an optimization, if the outgoing list and the socket send
570  * buffer are empty, the message is sent inline instead of being
571  * added to the outgoing list.  The rest of the transitions are
572  * unchanged.
573  */
574 static bool vfio_user_send_queued(VFIOUserProxy *proxy, VFIOUserMsg *msg,
575                                   Error **errp)
576 {
577     int ret;
578 
579     /*
580      * Unsent outgoing msgs - add to tail
581      */
582     if (!QTAILQ_EMPTY(&proxy->outgoing)) {
583         QTAILQ_INSERT_TAIL(&proxy->outgoing, msg, next);
584         return true;
585     }
586 
587     /*
588      * Try inline - if blocked, queue it and kick send poller
589      */
590     if (proxy->flags & VFIO_PROXY_FORCE_QUEUED) {
591         ret = QIO_CHANNEL_ERR_BLOCK;
592     } else {
593         ret = vfio_user_send_qio(proxy, msg, errp);
594     }
595 
596     if (ret == QIO_CHANNEL_ERR_BLOCK) {
597         QTAILQ_INSERT_HEAD(&proxy->outgoing, msg, next);
598         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
599                                        vfio_user_recv, proxy->ctx,
600                                        vfio_user_send, proxy);
601         return true;
602     }
603     if (ret == -1) {
604         return false;
605     }
606 
607     /*
608      * Sent - free async, add others to pending
609      */
610     if (msg->type == VFIO_MSG_ASYNC) {
611         vfio_user_recycle(proxy, msg);
612     } else {
613         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
614         msg->pending = true;
615     }
616 
617     return true;
618 }
619 
620 /*
621  * nowait send - vfio_wait_reqs() can wait for it later
622  *
623  * Returns false if we did not successfully receive a reply message, in which
624  * case @errp will be populated.
625  *
626  * In either case, ownership of @hdr and @fds is taken, and the caller must
627  * *not* free them itself.
628  */
629 bool vfio_user_send_nowait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
630                            VFIOUserFDs *fds, int rsize, Error **errp)
631 {
632     VFIOUserMsg *msg;
633 
634     QEMU_LOCK_GUARD(&proxy->lock);
635 
636     msg = vfio_user_getmsg(proxy, hdr, fds);
637     msg->id = hdr->id;
638     msg->rsize = rsize ? rsize : hdr->size;
639     msg->type = VFIO_MSG_NOWAIT;
640 
641     if (hdr->flags & VFIO_USER_NO_REPLY) {
642         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
643         vfio_user_recycle(proxy, msg);
644         return false;
645     }
646 
647     if (!vfio_user_send_queued(proxy, msg, errp)) {
648         vfio_user_recycle(proxy, msg);
649         return false;
650     }
651 
652     proxy->last_nowait = msg;
653 
654     return true;
655 }
656 
/*
 * Send a request and block until its reply has been read back into @hdr.
 *
 * The reply overwrites @hdr in place and may be up to @rsize bytes long
 * (0 means hdr->size). Each qemu_cond_timedwait() on msg->cv is bounded by
 * wait_time ms, and a timeout abandons the request. proxy->lock is held
 * throughout, except while sleeping on msg->cv.
 *
 * Returns false if we did not successfully receive a reply message, in which
 * case @errp will be populated.  A reply that arrived with VFIO_USER_ERROR
 * set still returns true, so callers must check hdr->flags themselves.
 *
 * In either case, the caller must free @hdr and @fds if needed.
 */
bool vfio_user_send_wait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
                         VFIOUserFDs *fds, int rsize, Error **errp)
{
    VFIOUserMsg *msg;
    bool ok = false;

    /* a request that forbids a reply can never be waited for */
    if (hdr->flags & VFIO_USER_NO_REPLY) {
        error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
        return false;
    }

    qemu_mutex_lock(&proxy->lock);

    msg = vfio_user_getmsg(proxy, hdr, fds);
    msg->id = hdr->id;
    msg->rsize = rsize ? rsize : hdr->size;
    msg->type = VFIO_MSG_WAIT;

    ok = vfio_user_send_queued(proxy, msg, errp);

    if (ok) {
        /* vfio_user_process() (or recv error path) sets complete + signals */
        while (!msg->complete) {
            if (!qemu_cond_timedwait(&msg->cv, &proxy->lock, wait_time)) {
                VFIOUserMsgQ *list;

                /*
                 * Timed out: unlink the message so no late reply can be
                 * matched to it.
                 *
                 * NOTE(review): vfio_user_recv_one() removes a reply's
                 * message from pending before reading its body, but it
                 * leaves msg->pending set. If this fires while that body is
                 * still arriving (proxy->part_recv == msg), the REMOVE below
                 * targets an unlinked entry. part_recv would also keep
                 * pointing at a recycled message. Confirm this race cannot
                 * happen.
                 */
                list = msg->pending ? &proxy->pending : &proxy->outgoing;
                QTAILQ_REMOVE(list, msg, next);
                error_setg_errno(errp, ETIMEDOUT,
                                 "timed out waiting for reply");
                ok = false;
                break;
            }
        }
    }

    /* WAIT type: recycling returns the entry but leaves @hdr/@fds alone */
    vfio_user_recycle(proxy, msg);

    qemu_mutex_unlock(&proxy->lock);

    return ok;
}
704 
/*
 * Wait for the reply to the most recent nowait request (proxy->last_nowait),
 * if one is still outstanding. Only that youngest message is waited for;
 * presumably the server answers in order, so its reply implies the earlier
 * ones were handled (TODO confirm). Error replies and timeouts are only
 * printed, not returned.
 */
void vfio_user_wait_reqs(VFIOUserProxy *proxy)
{
    VFIOUserMsg *msg;

    /*
     * Any DMA map/unmap requests sent in the middle
     * of a memory region transaction were sent nowait.
     * Wait for them here.
     */
    qemu_mutex_lock(&proxy->lock);
    if (proxy->last_nowait != NULL) {
        /*
         * Change type to WAIT to wait for reply
         * (vfio_user_process() then signals msg->cv instead of recycling it)
         */
        msg = proxy->last_nowait;
        msg->type = VFIO_MSG_WAIT;
        proxy->last_nowait = NULL;
        while (!msg->complete) {
            if (!qemu_cond_timedwait(&msg->cv, &proxy->lock, wait_time)) {
                VFIOUserMsgQ *list;

                /*
                 * NOTE(review): same hazard as in vfio_user_send_wait().
                 * msg->pending stays true after the receive path unlinks a
                 * reply whose body is still being read, so this REMOVE may
                 * target an entry that is no longer linked. Confirm.
                 */
                list = msg->pending ? &proxy->pending : &proxy->outgoing;
                QTAILQ_REMOVE(list, msg, next);
                error_printf("vfio_wait_reqs - timed out\n");
                break;
            }
        }

        if (msg->hdr->flags & VFIO_USER_ERROR) {
            error_printf("vfio_user_wait_reqs - error reply on async ");
            error_printf("request: command %x error %s\n", msg->hdr->command,
                         strerror(msg->hdr->error_reply));
        }

        /*
         * Change type back to NOWAIT to free
         * (recycle releases the header and FD buffers only for NOWAIT/ASYNC)
         */
        msg->type = VFIO_MSG_NOWAIT;
        vfio_user_recycle(proxy, msg);
    }

    qemu_mutex_unlock(&proxy->lock);
}
748 
/*
 * Every live proxy. vfio_user_disconnect() destroys the shared iothread
 * once this list becomes empty.
 */
static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
    QLIST_HEAD_INITIALIZER(vfio_user_sockets);
751 
752 VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
753 {
754     VFIOUserProxy *proxy;
755     QIOChannelSocket *sioc;
756     QIOChannel *ioc;
757     char *sockname;
758 
759     if (addr->type != SOCKET_ADDRESS_TYPE_UNIX) {
760         error_setg(errp, "vfio_user_connect - bad address family");
761         return NULL;
762     }
763     sockname = addr->u.q_unix.path;
764 
765     sioc = qio_channel_socket_new();
766     ioc = QIO_CHANNEL(sioc);
767     if (qio_channel_socket_connect_sync(sioc, addr, errp)) {
768         object_unref(OBJECT(ioc));
769         return NULL;
770     }
771     qio_channel_set_blocking(ioc, false, NULL);
772 
773     proxy = g_malloc0(sizeof(VFIOUserProxy));
774     proxy->sockname = g_strdup_printf("unix:%s", sockname);
775     proxy->ioc = ioc;
776 
777     /* init defaults */
778     proxy->max_xfer_size = VFIO_USER_DEF_MAX_XFER;
779     proxy->max_send_fds = VFIO_USER_DEF_MAX_FDS;
780     proxy->max_dma = VFIO_USER_DEF_MAP_MAX;
781     proxy->dma_pgsizes = VFIO_USER_DEF_PGSIZE;
782     proxy->max_bitmap = VFIO_USER_DEF_MAX_BITMAP;
783     proxy->migr_pgsize = VFIO_USER_DEF_PGSIZE;
784 
785     proxy->flags = VFIO_PROXY_CLIENT;
786     proxy->state = VFIO_PROXY_CONNECTED;
787 
788     qemu_mutex_init(&proxy->lock);
789     qemu_cond_init(&proxy->close_cv);
790 
791     if (vfio_user_iothread == NULL) {
792         vfio_user_iothread = iothread_create("VFIO user", errp);
793     }
794 
795     proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
796     proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
797 
798     QTAILQ_INIT(&proxy->outgoing);
799     QTAILQ_INIT(&proxy->incoming);
800     QTAILQ_INIT(&proxy->free);
801     QTAILQ_INIT(&proxy->pending);
802     QLIST_INSERT_HEAD(&vfio_user_sockets, proxy, next);
803 
804     return proxy;
805 }
806 
807 void vfio_user_set_handler(VFIODevice *vbasedev,
808                            void (*handler)(void *opaque, VFIOUserMsg *msg),
809                            void *req_arg)
810 {
811     VFIOUserProxy *proxy = vbasedev->proxy;
812 
813     proxy->request = handler;
814     proxy->req_arg = req_arg;
815     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
816                                    vfio_user_recv, NULL, NULL, proxy);
817 }
818 
819 void vfio_user_disconnect(VFIOUserProxy *proxy)
820 {
821     VFIOUserMsg *r1, *r2;
822 
823     qemu_mutex_lock(&proxy->lock);
824 
825     /* our side is quitting */
826     if (proxy->state == VFIO_PROXY_CONNECTED) {
827         vfio_user_shutdown(proxy);
828         if (!QTAILQ_EMPTY(&proxy->pending)) {
829             error_printf("vfio_user_disconnect: outstanding requests\n");
830         }
831     }
832     object_unref(OBJECT(proxy->ioc));
833     proxy->ioc = NULL;
834     qemu_bh_delete(proxy->req_bh);
835     proxy->req_bh = NULL;
836 
837     proxy->state = VFIO_PROXY_CLOSING;
838     QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
839         qemu_cond_destroy(&r1->cv);
840         QTAILQ_REMOVE(&proxy->outgoing, r1, next);
841         g_free(r1);
842     }
843     QTAILQ_FOREACH_SAFE(r1, &proxy->incoming, next, r2) {
844         qemu_cond_destroy(&r1->cv);
845         QTAILQ_REMOVE(&proxy->incoming, r1, next);
846         g_free(r1);
847     }
848     QTAILQ_FOREACH_SAFE(r1, &proxy->pending, next, r2) {
849         qemu_cond_destroy(&r1->cv);
850         QTAILQ_REMOVE(&proxy->pending, r1, next);
851         g_free(r1);
852     }
853     QTAILQ_FOREACH_SAFE(r1, &proxy->free, next, r2) {
854         qemu_cond_destroy(&r1->cv);
855         QTAILQ_REMOVE(&proxy->free, r1, next);
856         g_free(r1);
857     }
858 
859     /*
860      * Make sure the iothread isn't blocking anywhere
861      * with a ref to this proxy by waiting for a BH
862      * handler to run after the proxy fd handlers were
863      * deleted above.
864      */
865     aio_bh_schedule_oneshot(proxy->ctx, vfio_user_cb, proxy);
866     qemu_cond_wait(&proxy->close_cv, &proxy->lock);
867 
868     /* we now hold the only ref to proxy */
869     qemu_mutex_unlock(&proxy->lock);
870     qemu_cond_destroy(&proxy->close_cv);
871     qemu_mutex_destroy(&proxy->lock);
872 
873     QLIST_REMOVE(proxy, next);
874     if (QLIST_EMPTY(&vfio_user_sockets)) {
875         iothread_destroy(vfio_user_iothread);
876         vfio_user_iothread = NULL;
877     }
878 
879     g_free(proxy->sockname);
880     g_free(proxy);
881 }
882 
883 void vfio_user_request_msg(VFIOUserHdr *hdr, uint16_t cmd,
884                            uint32_t size, uint32_t flags)
885 {
886     static uint16_t next_id;
887 
888     hdr->id = qatomic_fetch_inc(&next_id);
889     hdr->command = cmd;
890     hdr->size = size;
891     hdr->flags = (flags & ~VFIO_USER_TYPE) | VFIO_USER_REQUEST;
892     hdr->error_reply = 0;
893 }
894 
/*
 * One recognised capability key. caps_parse() looks up @name in a
 * capability object and, when present, passes its value to @check. The
 * check validates the value, stores it in @proxy, and returns false with
 * @errp set when it rejects it. Tables end with a NULL @name.
 */
struct cap_entry {
    const char *name;
    bool (*check)(VFIOUserProxy *proxy, QObject *qobj, Error **errp);
};
899 
900 static bool caps_parse(VFIOUserProxy *proxy, QDict *qdict,
901                        struct cap_entry caps[], Error **errp)
902 {
903     QObject *qobj;
904     struct cap_entry *p;
905 
906     for (p = caps; p->name != NULL; p++) {
907         qobj = qdict_get(qdict, p->name);
908         if (qobj != NULL) {
909             if (!p->check(proxy, qobj, errp)) {
910                 return false;
911             }
912             qdict_del(qdict, p->name);
913         }
914     }
915 
916     /* warning, for now */
917     if (qdict_size(qdict) != 0) {
918         warn_report("spurious capabilities");
919     }
920     return true;
921 }
922 
923 static bool check_migr_pgsize(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
924 {
925     QNum *qn = qobject_to(QNum, qobj);
926     uint64_t pgsize;
927 
928     if (qn == NULL || !qnum_get_try_uint(qn, &pgsize)) {
929         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZE);
930         return false;
931     }
932 
933     /* must be larger than default */
934     if (pgsize & (VFIO_USER_DEF_PGSIZE - 1)) {
935         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsize);
936         return false;
937     }
938 
939     proxy->migr_pgsize = pgsize;
940     return true;
941 }
942 
943 static bool check_bitmap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
944 {
945     QNum *qn = qobject_to(QNum, qobj);
946     uint64_t bitmap_size;
947 
948     if (qn == NULL || !qnum_get_try_uint(qn, &bitmap_size)) {
949         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_BITMAP);
950         return false;
951     }
952 
953     /* can only lower it */
954     if (bitmap_size > VFIO_USER_DEF_MAX_BITMAP) {
955         error_setg(errp, "%s too large", VFIO_USER_CAP_MAX_BITMAP);
956         return false;
957     }
958 
959     proxy->max_bitmap = bitmap_size;
960     return true;
961 }
962 
963 static struct cap_entry caps_migr[] = {
964     { VFIO_USER_CAP_PGSIZE, check_migr_pgsize },
965     { VFIO_USER_CAP_MAX_BITMAP, check_bitmap },
966     { NULL }
967 };
968 
969 static bool check_max_fds(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
970 {
971     QNum *qn = qobject_to(QNum, qobj);
972     uint64_t max_send_fds;
973 
974     if (qn == NULL || !qnum_get_try_uint(qn, &max_send_fds) ||
975         max_send_fds > VFIO_USER_MAX_MAX_FDS) {
976         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
977         return false;
978     }
979     proxy->max_send_fds = max_send_fds;
980     return true;
981 }
982 
983 static bool check_max_xfer(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
984 {
985     QNum *qn = qobject_to(QNum, qobj);
986     uint64_t max_xfer_size;
987 
988     if (qn == NULL || !qnum_get_try_uint(qn, &max_xfer_size) ||
989         max_xfer_size > VFIO_USER_MAX_MAX_XFER) {
990         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_XFER);
991         return false;
992     }
993     proxy->max_xfer_size = max_xfer_size;
994     return true;
995 }
996 
997 static bool check_pgsizes(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
998 {
999     QNum *qn = qobject_to(QNum, qobj);
1000     uint64_t pgsizes;
1001 
1002     if (qn == NULL || !qnum_get_try_uint(qn, &pgsizes)) {
1003         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZES);
1004         return false;
1005     }
1006 
1007     /* must be larger than default */
1008     if (pgsizes & (VFIO_USER_DEF_PGSIZE - 1)) {
1009         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsizes);
1010         return false;
1011     }
1012 
1013     proxy->dma_pgsizes = pgsizes;
1014     return true;
1015 }
1016 
1017 static bool check_max_dma(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1018 {
1019     QNum *qn = qobject_to(QNum, qobj);
1020     uint64_t max_dma;
1021 
1022     if (qn == NULL || !qnum_get_try_uint(qn, &max_dma)) {
1023         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAP_MAX);
1024         return false;
1025     }
1026 
1027     /* can only lower it */
1028     if (max_dma > VFIO_USER_DEF_MAP_MAX) {
1029         error_setg(errp, "%s too large", VFIO_USER_CAP_MAP_MAX);
1030         return false;
1031     }
1032 
1033     proxy->max_dma = max_dma;
1034     return true;
1035 }
1036 
1037 static bool check_migr(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1038 {
1039     QDict *qdict = qobject_to(QDict, qobj);
1040 
1041     if (qdict == NULL) {
1042         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1043         return true;
1044     }
1045     return caps_parse(proxy, qdict, caps_migr, errp);
1046 }
1047 
/*
 * Validators for the members of the VFIO_USER_CAP object, passed to
 * caps_parse() by check_cap().  Each entry pairs a capability name with the
 * function that checks it; the table ends with a NULL-name sentinel
 * (presumably where caps_parse() stops scanning -- confirm).
 */
static struct cap_entry caps_cap[] = {
    { VFIO_USER_CAP_MAX_FDS, check_max_fds },
    { VFIO_USER_CAP_MAX_XFER, check_max_xfer },
    { VFIO_USER_CAP_PGSIZES, check_pgsizes },
    { VFIO_USER_CAP_MAP_MAX, check_max_dma },
    { VFIO_USER_CAP_MIGR, check_migr },
    { NULL }
};
1056 
1057 static bool check_cap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1058 {
1059    QDict *qdict = qobject_to(QDict, qobj);
1060 
1061     if (qdict == NULL) {
1062         error_setg(errp, "malformed %s", VFIO_USER_CAP);
1063         return false;
1064     }
1065     return caps_parse(proxy, qdict, caps_cap, errp);
1066 }
1067 
/*
 * Top-level capability table used by caps_check() when parsing the
 * server's version reply.  The only recognized key is VFIO_USER_CAP,
 * handled by check_cap(); the table ends with a NULL-name sentinel.
 */
static struct cap_entry ver_0_0[] = {
    { VFIO_USER_CAP, check_cap },
    { NULL }
};
1072 
1073 static bool caps_check(VFIOUserProxy *proxy, int minor, const char *caps,
1074                        Error **errp)
1075 {
1076     QObject *qobj;
1077     QDict *qdict;
1078     bool ret;
1079 
1080     qobj = qobject_from_json(caps, NULL);
1081     if (qobj == NULL) {
1082         error_setg(errp, "malformed capabilities %s", caps);
1083         return false;
1084     }
1085     qdict = qobject_to(QDict, qobj);
1086     if (qdict == NULL) {
1087         error_setg(errp, "capabilities %s not an object", caps);
1088         qobject_unref(qobj);
1089         return false;
1090     }
1091     ret = caps_parse(proxy, qdict, ver_0_0, errp);
1092 
1093     qobject_unref(qobj);
1094     return ret;
1095 }
1096 
1097 static GString *caps_json(void)
1098 {
1099     QDict *dict = qdict_new();
1100     QDict *capdict = qdict_new();
1101     QDict *migdict = qdict_new();
1102     GString *str;
1103 
1104     qdict_put_int(migdict, VFIO_USER_CAP_PGSIZE, VFIO_USER_DEF_PGSIZE);
1105     qdict_put_int(migdict, VFIO_USER_CAP_MAX_BITMAP, VFIO_USER_DEF_MAX_BITMAP);
1106     qdict_put_obj(capdict, VFIO_USER_CAP_MIGR, QOBJECT(migdict));
1107 
1108     qdict_put_int(capdict, VFIO_USER_CAP_MAX_FDS, VFIO_USER_MAX_MAX_FDS);
1109     qdict_put_int(capdict, VFIO_USER_CAP_MAX_XFER, VFIO_USER_DEF_MAX_XFER);
1110     qdict_put_int(capdict, VFIO_USER_CAP_PGSIZES, VFIO_USER_DEF_PGSIZE);
1111     qdict_put_int(capdict, VFIO_USER_CAP_MAP_MAX, VFIO_USER_DEF_MAP_MAX);
1112 
1113     qdict_put_obj(dict, VFIO_USER_CAP, QOBJECT(capdict));
1114 
1115     str = qobject_to_json(QOBJECT(dict));
1116     qobject_unref(dict);
1117     return str;
1118 }
1119 
1120 bool vfio_user_validate_version(VFIOUserProxy *proxy, Error **errp)
1121 {
1122     g_autofree VFIOUserVersion *msgp = NULL;
1123     GString *caps;
1124     char *reply;
1125     int size, caplen;
1126 
1127     caps = caps_json();
1128     caplen = caps->len + 1;
1129     size = sizeof(*msgp) + caplen;
1130     msgp = g_malloc0(size);
1131 
1132     vfio_user_request_msg(&msgp->hdr, VFIO_USER_VERSION, size, 0);
1133     msgp->major = VFIO_USER_MAJOR_VER;
1134     msgp->minor = VFIO_USER_MINOR_VER;
1135     memcpy(&msgp->capabilities, caps->str, caplen);
1136     g_string_free(caps, true);
1137     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1138 
1139     if (!vfio_user_send_wait(proxy, &msgp->hdr, NULL, 0, errp)) {
1140         return false;
1141     }
1142 
1143     if (msgp->hdr.flags & VFIO_USER_ERROR) {
1144         error_setg_errno(errp, msgp->hdr.error_reply, "version reply");
1145         return false;
1146     }
1147 
1148     if (msgp->major != VFIO_USER_MAJOR_VER ||
1149         msgp->minor > VFIO_USER_MINOR_VER) {
1150         error_setg(errp, "incompatible server version");
1151         return false;
1152     }
1153 
1154     reply = msgp->capabilities;
1155     if (reply[msgp->hdr.size - sizeof(*msgp) - 1] != '\0') {
1156         error_setg(errp, "corrupt version reply");
1157         return false;
1158     }
1159 
1160     if (!caps_check(proxy, msgp->minor, reply, errp)) {
1161         return false;
1162     }
1163 
1164     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1165     return true;
1166 }
1167