/*
 * vfio protocol over a UNIX socket.
 *
 * Copyright © 2018, 2021 Oracle and/or its affiliates.
 *
 * SPDX-License-Identifier: GPL-2.0-or-later
 */

#include "qemu/osdep.h"
#include <sys/ioctl.h>

#include "hw/vfio/vfio-device.h"
#include "hw/vfio-user/proxy.h"
#include "hw/vfio-user/trace.h"
#include "qapi/error.h"
#include "qemu/error-report.h"
#include "qemu/lockable.h"
#include "qemu/main-loop.h"
#include "system/iothread.h"

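/*
 * A single iothread, shared by all proxies, services the vfio-user
 * sockets.  It is created by the first vfio_user_connect_dev() and
 * destroyed when the last proxy is disconnected.
 */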
static IOThread *vfio_user_iothread;

static void vfio_user_shutdown(VFIOUserProxy *proxy);
static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
                                     VFIOUserFDs *fds);
static VFIOUserFDs *vfio_user_getfds(int numfds);
static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);

static void vfio_user_recv(void *opaque);
static void vfio_user_cb(void *opaque);

static void vfio_user_request(void *opaque);

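/*
 * Mark a message header as an error reply carrying the given errno value.
 */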
static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
{
    hdr->flags |= VFIO_USER_ERROR;
    hdr->error_reply = err;
}

/*
 * Functions called by main, CPU, or iothread threads
 */

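/*
 * Stop receiving from the socket and remove the proxy's fd handlers
 * so no further messages are processed.
 */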
static void vfio_user_shutdown(VFIOUserProxy *proxy)
{
    qio_channel_shutdown(proxy->ioc, QIO_CHANNEL_SHUTDOWN_READ, NULL);
    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL,
                                   proxy->ctx, NULL, NULL);
}

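/*
 * Allocate a message from the proxy's free list, or create a new one if
 * the list is empty, and attach the given header and FDs to it.
 */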
static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
                                     VFIOUserFDs *fds)
{
    VFIOUserMsg *msg;

    msg = QTAILQ_FIRST(&proxy->free);
    if (msg != NULL) {
        QTAILQ_REMOVE(&proxy->free, msg, next);
    } else {
        msg = g_malloc0(sizeof(*msg));
        qemu_cond_init(&msg->cv);
    }

    msg->hdr = hdr;
    msg->fds = fds;
    return msg;
}

/*
 * Recycle a message list entry to the free list.
 */
static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
{
    if (msg->type == VFIO_MSG_NONE) {
        error_printf("vfio_user_recycle - freeing free msg\n");
        return;
    }

    /* free msg buffer if no one is waiting to consume the reply */
    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
        g_free(msg->hdr);
        if (msg->fds != NULL) {
            g_free(msg->fds);
        }
    }

    msg->type = VFIO_MSG_NONE;
    msg->hdr = NULL;
    msg->fds = NULL;
    msg->complete = false;
    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
}

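/*
 * Allocate a VFIOUserFDs struct with space for 'numfds' file descriptors
 * appended to it; fds->fds points into that trailing space.
 */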
static VFIOUserFDs *vfio_user_getfds(int numfds)
{
    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));

    fds->fds = (int *)((char *)fds + sizeof(*fds));

    return fds;
}

/*
 * Functions only called by iothread
 */

/*
 * Process a received message.
 */
static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
                              bool isreply)
{

    /*
     * Replies signal a waiter; if no one is waiting, just check for
     * errors and free the message buffer.
     *
     * Requests are queued for the request BH.
     */
    if (isreply) {
        msg->complete = true;
        if (msg->type == VFIO_MSG_WAIT) {
            qemu_cond_signal(&msg->cv);
        } else {
            if (msg->hdr->flags & VFIO_USER_ERROR) {
                error_printf("vfio_user_process: error reply on async ");
                error_printf("request command %x error %s\n",
                             msg->hdr->command,
                             strerror(msg->hdr->error_reply));
            }
            /* youngest nowait msg has been ack'd */
            if (proxy->last_nowait == msg) {
                proxy->last_nowait = NULL;
            }
            vfio_user_recycle(proxy, msg);
        }
    } else {
        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
        qemu_bh_schedule(proxy->req_bh);
    }
}

/*
 * Complete a partial message read
 */
static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
{
    VFIOUserMsg *msg = proxy->part_recv;
    size_t msgleft = proxy->recv_left;
    bool isreply;
    char *data;
    int ret;

    data = (char *)msg->hdr + (msg->hdr->size - msgleft);
    while (msgleft > 0) {
        ret = qio_channel_read(proxy->ioc, data, msgleft, errp);

        /* error or would block */
        if (ret <= 0) {
            /* try for rest on next iteration */
            if (ret == QIO_CHANNEL_ERR_BLOCK) {
                proxy->recv_left = msgleft;
            }
            return ret;
        }
        trace_vfio_user_recv_read(msg->hdr->id, ret);

        msgleft -= ret;
        data += ret;
    }

    /*
     * Read complete message, process it.
     */
    proxy->part_recv = NULL;
    proxy->recv_left = 0;
    isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
    vfio_user_process(proxy, msg, isreply);

    /* return positive value */
    return 1;
}

/*
 * Receive and process one incoming message.
 *
 * For replies, find matching outgoing request and wake any waiters.
 * For requests, queue in incoming list and run request BH.
 */
static int vfio_user_recv_one(VFIOUserProxy *proxy, Error **errp)
{
    VFIOUserMsg *msg = NULL;
    g_autofree int *fdp = NULL;
    VFIOUserFDs *reqfds;
    VFIOUserHdr hdr;
    struct iovec iov = {
        .iov_base = &hdr,
        .iov_len = sizeof(hdr),
    };
    bool isreply = false;
    int i, ret;
    size_t msgleft, numfds = 0;
    char *data = NULL;
    char *buf = NULL;

    /*
     * Complete any partial reads
     */
    if (proxy->part_recv != NULL) {
        ret = vfio_user_complete(proxy, errp);

        /* still not complete, try later */
        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            return ret;
        }

        if (ret <= 0) {
            goto fatal;
        }
        /* else fall into reading another msg */
    }

    /*
     * Read header
     */
    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
                                 errp);
    if (ret == QIO_CHANNEL_ERR_BLOCK) {
        return ret;
    }

    /* read error or other side closed connection */
    if (ret <= 0) {
        goto fatal;
    }

    if (ret < sizeof(hdr)) {
        error_setg(errp, "short read of header");
        goto fatal;
    }

    /*
     * Validate header
     */
    if (hdr.size < sizeof(VFIOUserHdr)) {
        error_setg(errp, "bad header size");
        goto fatal;
    }
    switch (hdr.flags & VFIO_USER_TYPE) {
    case VFIO_USER_REQUEST:
        isreply = false;
        break;
    case VFIO_USER_REPLY:
        isreply = true;
        break;
    default:
        error_setg(errp, "unknown message type");
        goto fatal;
    }
    trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
                             hdr.flags);

    /*
     * For replies, find the matching pending request.
     * For requests, reap incoming FDs.
     */
    if (isreply) {
        QTAILQ_FOREACH(msg, &proxy->pending, next) {
            if (hdr.id == msg->id) {
                break;
            }
        }
        if (msg == NULL) {
            error_setg(errp, "unexpected reply");
            goto err;
        }
        QTAILQ_REMOVE(&proxy->pending, msg, next);

        /*
         * Process any received FDs
         */
        if (numfds != 0) {
            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
                error_setg(errp, "unexpected FDs");
                goto err;
            }
            msg->fds->recv_fds = numfds;
            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
        }
    } else {
        if (numfds != 0) {
            reqfds = vfio_user_getfds(numfds);
            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
        } else {
            reqfds = NULL;
        }
    }

    /*
     * Put the whole message into a single buffer.
     */
    if (isreply) {
        if (hdr.size > msg->rsize) {
            error_setg(errp, "reply larger than recv buffer");
            goto err;
        }
        *msg->hdr = hdr;
        data = (char *)msg->hdr + sizeof(hdr);
    } else {
        buf = g_malloc0(hdr.size);
        memcpy(buf, &hdr, sizeof(hdr));
        data = buf + sizeof(hdr);
        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
        msg->type = VFIO_MSG_REQ;
    }

    /*
     * Read rest of message.
     */
    msgleft = hdr.size - sizeof(hdr);
    while (msgleft > 0) {
        ret = qio_channel_read(proxy->ioc, data, msgleft, errp);

        /* prepare to complete read on next iteration */
        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            proxy->part_recv = msg;
            proxy->recv_left = msgleft;
            return ret;
        }

        if (ret <= 0) {
            goto fatal;
        }
        trace_vfio_user_recv_read(hdr.id, ret);

        msgleft -= ret;
        data += ret;
    }

    vfio_user_process(proxy, msg, isreply);
    return 0;

    /*
     * fatal means the other side closed or we don't trust the stream
     * err means this message is corrupt
     */
fatal:
    vfio_user_shutdown(proxy);
    proxy->state = VFIO_PROXY_ERROR;

    /* set error if server side closed */
    if (ret == 0) {
        error_setg(errp, "server closed socket");
    }

err:
    for (i = 0; i < numfds; i++) {
        close(fdp[i]);
    }
    if (isreply && msg != NULL) {
        /* force an error to keep sending thread from hanging */
        vfio_user_set_error(msg->hdr, EINVAL);
        msg->complete = true;
        qemu_cond_signal(&msg->cv);
    }
    return -1;
}

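/*
 * Socket read handler, run in the iothread when the proxy socket becomes
 * readable: keep reading messages until the channel would block or an
 * error occurs.
 */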
static void vfio_user_recv(void *opaque)
{
    VFIOUserProxy *proxy = opaque;

    QEMU_LOCK_GUARD(&proxy->lock);

    if (proxy->state == VFIO_PROXY_CONNECTED) {
        Error *local_err = NULL;

        while (vfio_user_recv_one(proxy, &local_err) == 0) {
            ;
        }

        if (local_err != NULL) {
            error_report_err(local_err);
        }
    }
}

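/*
 * One-shot BH run in the iothread during disconnect: mark the proxy
 * closed and wake the thread waiting in vfio_user_disconnect().
 */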
static void vfio_user_cb(void *opaque)
{
    VFIOUserProxy *proxy = opaque;

    QEMU_LOCK_GUARD(&proxy->lock);

    proxy->state = VFIO_PROXY_CLOSED;
    qemu_cond_signal(&proxy->close_cv);
}


/*
 * Functions called by main or CPU threads
 */

/*
 * Process incoming requests.
 *
 * The bus-specific callback has the form:
 *    request(opaque, msg)
 * where 'opaque' was specified in vfio_user_set_handler
 * and 'msg' is the inbound message.
 *
 * The callback is responsible for disposing of the message buffer,
 * usually by re-using it when calling vfio_send_reply or vfio_send_error,
 * both of which free their message buffer when the reply is sent.
 *
 * If the callback uses a new buffer, it needs to free the old one.
 */
static void vfio_user_request(void *opaque)
{
    VFIOUserProxy *proxy = opaque;
    VFIOUserMsgQ new, free;
    VFIOUserMsg *msg, *m1;

    /* reap all incoming */
    QTAILQ_INIT(&new);
    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
            QTAILQ_REMOVE(&proxy->incoming, msg, next);
            QTAILQ_INSERT_TAIL(&new, msg, next);
        }
    }

    /* process list */
    QTAILQ_INIT(&free);
    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
        QTAILQ_REMOVE(&new, msg, next);
        trace_vfio_user_recv_request(msg->hdr->command);
        proxy->request(proxy->req_arg, msg);
        QTAILQ_INSERT_HEAD(&free, msg, next);
    }

    /* free list */
    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
            vfio_user_recycle(proxy, msg);
        }
    }
}


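/* List of all proxies, used to manage the shared iothread's lifetime. */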
static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
    QLIST_HEAD_INITIALIZER(vfio_user_sockets);

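/*
 * Connect to a vfio-user server over the given UNIX socket address and
 * return a new proxy in the CONNECTED state, creating the shared
 * iothread on first use.
 *
 * A minimal caller sketch; names other than the functions in this file
 * are hypothetical:
 *
 *     VFIOUserProxy *proxy = vfio_user_connect_dev(addr, errp);
 *     if (proxy != NULL) {
 *         vbasedev->proxy = proxy;
 *         vfio_user_set_handler(vbasedev, my_request_handler, my_state);
 *     }
 */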
VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
{
    VFIOUserProxy *proxy;
    QIOChannelSocket *sioc;
    QIOChannel *ioc;
    char *sockname;

    if (addr->type != SOCKET_ADDRESS_TYPE_UNIX) {
        error_setg(errp, "vfio_user_connect - bad address family");
        return NULL;
    }
    sockname = addr->u.q_unix.path;

    sioc = qio_channel_socket_new();
    ioc = QIO_CHANNEL(sioc);
    if (qio_channel_socket_connect_sync(sioc, addr, errp)) {
        object_unref(OBJECT(ioc));
        return NULL;
    }
    qio_channel_set_blocking(ioc, false, NULL);

    proxy = g_malloc0(sizeof(VFIOUserProxy));
    proxy->sockname = g_strdup_printf("unix:%s", sockname);
    proxy->ioc = ioc;
    proxy->flags = VFIO_PROXY_CLIENT;
    proxy->state = VFIO_PROXY_CONNECTED;

    qemu_mutex_init(&proxy->lock);
    qemu_cond_init(&proxy->close_cv);

    if (vfio_user_iothread == NULL) {
        vfio_user_iothread = iothread_create("VFIO user", errp);
    }

    proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);

    QTAILQ_INIT(&proxy->outgoing);
    QTAILQ_INIT(&proxy->incoming);
    QTAILQ_INIT(&proxy->free);
    QTAILQ_INIT(&proxy->pending);
    QLIST_INSERT_HEAD(&vfio_user_sockets, proxy, next);

    return proxy;
}

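/*
 * Register the bus-specific request callback and install the socket read
 * handler in the iothread's AioContext.  The caller must have stored the
 * proxy in vbasedev->proxy beforehand.
 */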
void vfio_user_set_handler(VFIODevice *vbasedev,
                           void (*handler)(void *opaque, VFIOUserMsg *msg),
                           void *req_arg)
{
    VFIOUserProxy *proxy = vbasedev->proxy;

    proxy->request = handler;
    proxy->req_arg = req_arg;
    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
                                   vfio_user_recv, NULL, NULL, proxy);
}

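/*
 * Tear down a proxy: shut down the socket, free all queued messages, wait
 * for the iothread to drop its references via a one-shot BH, and finally
 * free the proxy itself, destroying the shared iothread if this was the
 * last proxy.
 */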
void vfio_user_disconnect(VFIOUserProxy *proxy)
{
    VFIOUserMsg *r1, *r2;

    qemu_mutex_lock(&proxy->lock);

    /* our side is quitting */
    if (proxy->state == VFIO_PROXY_CONNECTED) {
        vfio_user_shutdown(proxy);
        if (!QTAILQ_EMPTY(&proxy->pending)) {
            error_printf("vfio_user_disconnect: outstanding requests\n");
        }
    }
    object_unref(OBJECT(proxy->ioc));
    proxy->ioc = NULL;
    qemu_bh_delete(proxy->req_bh);
    proxy->req_bh = NULL;

    proxy->state = VFIO_PROXY_CLOSING;
    QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
        qemu_cond_destroy(&r1->cv);
        QTAILQ_REMOVE(&proxy->outgoing, r1, next);
        g_free(r1);
    }
    QTAILQ_FOREACH_SAFE(r1, &proxy->incoming, next, r2) {
        qemu_cond_destroy(&r1->cv);
        QTAILQ_REMOVE(&proxy->incoming, r1, next);
        g_free(r1);
    }
    QTAILQ_FOREACH_SAFE(r1, &proxy->pending, next, r2) {
        qemu_cond_destroy(&r1->cv);
        QTAILQ_REMOVE(&proxy->pending, r1, next);
        g_free(r1);
    }
    QTAILQ_FOREACH_SAFE(r1, &proxy->free, next, r2) {
        qemu_cond_destroy(&r1->cv);
        QTAILQ_REMOVE(&proxy->free, r1, next);
        g_free(r1);
    }

    /*
     * Make sure the iothread isn't blocking anywhere
     * with a ref to this proxy by waiting for a BH
     * handler to run after the proxy fd handlers were
     * deleted above.
     */
    aio_bh_schedule_oneshot(proxy->ctx, vfio_user_cb, proxy);
    qemu_cond_wait(&proxy->close_cv, &proxy->lock);

    /* we now hold the only ref to proxy */
    qemu_mutex_unlock(&proxy->lock);
    qemu_cond_destroy(&proxy->close_cv);
    qemu_mutex_destroy(&proxy->lock);

    QLIST_REMOVE(proxy, next);
    if (QLIST_EMPTY(&vfio_user_sockets)) {
        iothread_destroy(vfio_user_iothread);
        vfio_user_iothread = NULL;
    }

    g_free(proxy->sockname);
    g_free(proxy);
}
574