xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer.rs (revision 61e57e1cb149de03ae1e0b799b9e5ba9a4a63ace)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//
4434a5d0eSSebastien Boeuf 
//! `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
//! by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
//! between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
//! device model.
//!
//! The vsock muxer has two main roles:
//!
//! ## Vsock connection multiplexer
//!
//! It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
//! muxer also routes packets to their owning connections. It does so via a connection
//! `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
//!
//! Vsock packet traffic needs to be inspected, in order to detect connection request
//! packets (leading to the creation of a new connection), and connection reset packets
//! (leading to the termination of an existing connection). All other packets, though, must
//! belong to an existing connection and, as such, the muxer simply forwards them.
//!
//! ## Event dispatcher
//!
//! There are three event categories that the vsock backend is interested in:
//! 1. A new host-initiated connection is ready to be accepted from the listening host Unix
//!    socket;
//! 2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
//!    the host is ready to issue a vsock connection request, informing us of the
//!    destination port to which it wants to connect);
//! 3. Some event was triggered for a connected Unix socket, that belongs to a
//!    `VsockConnection`.
//!
//! The muxer gets notified about all of these events, because, as a `VsockEpollListener`
//! implementor, it gets to register a nested epoll FD into the main VMM epoll()ing loop. All
//! other pollable FDs are then registered under this nested epoll FD.
//!
//! To route all these events to their handlers, the muxer uses another `HashMap` object,
//! mapping `RawFd`s to `EpollListener`s.
40f6236087SAlyssa Ross 
41434a5d0eSSebastien Boeuf use std::collections::{HashMap, HashSet};
4235782bd9SBo Chen use std::fs::File;
4348de8007SAlyssa Ross use std::io::{self, ErrorKind, Read};
4435782bd9SBo Chen use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
45434a5d0eSSebastien Boeuf use std::os::unix::net::{UnixListener, UnixStream};
46434a5d0eSSebastien Boeuf 
479701fde2SSebastien Boeuf use super::super::csm::ConnState;
48434a5d0eSSebastien Boeuf use super::super::defs::uapi;
49434a5d0eSSebastien Boeuf use super::super::packet::VsockPacket;
50434a5d0eSSebastien Boeuf use super::super::{
51434a5d0eSSebastien Boeuf     Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
52434a5d0eSSebastien Boeuf };
53434a5d0eSSebastien Boeuf use super::muxer_killq::MuxerKillQ;
54434a5d0eSSebastien Boeuf use super::muxer_rxq::MuxerRxQ;
55*61e57e1cSRuoqing He use super::{defs, Error, MuxerConnection, Result};
56434a5d0eSSebastien Boeuf 
/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
/// The key is a plain (host port, guest port) pair, so it is cheap to copy and can be used
/// directly as a `HashMap` / `HashSet` key.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    /// The host-side (local) vsock port of the connection.
    local_port: u32,
    /// The guest-side (peer) vsock port of the connection.
    peer_port: u32,
}
65434a5d0eSSebastien Boeuf 
66434a5d0eSSebastien Boeuf /// A muxer RX queue item.
67434a5d0eSSebastien Boeuf ///
685fc52a50SStefano Garzarella #[derive(Clone, Copy, Debug)]
69434a5d0eSSebastien Boeuf pub enum MuxerRx {
70434a5d0eSSebastien Boeuf     /// The packet must be fetched from the connection identified by `ConnMapKey`.
71434a5d0eSSebastien Boeuf     ConnRx(ConnMapKey),
72434a5d0eSSebastien Boeuf     /// The muxer must produce an RST packet.
73434a5d0eSSebastien Boeuf     RstPkt { local_port: u32, peer_port: u32 },
74434a5d0eSSebastien Boeuf }
75434a5d0eSSebastien Boeuf 
76434a5d0eSSebastien Boeuf /// An epoll listener, registered under the muxer's nested epoll FD.
77434a5d0eSSebastien Boeuf ///
78434a5d0eSSebastien Boeuf enum EpollListener {
79434a5d0eSSebastien Boeuf     /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
80434a5d0eSSebastien Boeuf     /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
81434a5d0eSSebastien Boeuf     /// be forwarded to the listener via `VsockEpollListener::notify()`.
82434a5d0eSSebastien Boeuf     Connection {
83434a5d0eSSebastien Boeuf         key: ConnMapKey,
84434a5d0eSSebastien Boeuf         evset: epoll::Events,
85434a5d0eSSebastien Boeuf     },
86434a5d0eSSebastien Boeuf     /// A listener interested in new host-initiated connections.
87434a5d0eSSebastien Boeuf     HostSock,
88a807b91fSAlyssa Ross     /// A listener interested in reading host "connect \<port>" commands from a freshly
89434a5d0eSSebastien Boeuf     /// connected host socket.
90434a5d0eSSebastien Boeuf     LocalStream(UnixStream),
91434a5d0eSSebastien Boeuf }
92434a5d0eSSebastien Boeuf 
/// A partially read "CONNECT" command.
///
/// The default value is an all-zero buffer with `len == 0`, i.e. nothing read yet.
#[derive(Default)]
struct PartiallyReadCommand {
    /// The bytes of the command that have been read so far.
    buf: [u8; 32],
    /// How much of `buf` has been used.
    len: usize,
}
10148de8007SAlyssa Ross 
102434a5d0eSSebastien Boeuf /// The vsock connection multiplexer.
103434a5d0eSSebastien Boeuf ///
104434a5d0eSSebastien Boeuf pub struct VsockMuxer {
105434a5d0eSSebastien Boeuf     /// Guest CID.
106434a5d0eSSebastien Boeuf     cid: u64,
107434a5d0eSSebastien Boeuf     /// A hash map used to store the active connections.
108434a5d0eSSebastien Boeuf     conn_map: HashMap<ConnMapKey, MuxerConnection>,
109434a5d0eSSebastien Boeuf     /// A hash map used to store epoll event listeners / handlers.
110434a5d0eSSebastien Boeuf     listener_map: HashMap<RawFd, EpollListener>,
11148de8007SAlyssa Ross     /// A hash map used to store partially read "connect" commands.
11248de8007SAlyssa Ross     partial_command_map: HashMap<RawFd, PartiallyReadCommand>,
113434a5d0eSSebastien Boeuf     /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
114434a5d0eSSebastien Boeuf     /// produced
115434a5d0eSSebastien Boeuf     /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
116434a5d0eSSebastien Boeuf     ///   and
117434a5d0eSSebastien Boeuf     /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
118434a5d0eSSebastien Boeuf     ///   socket).
119434a5d0eSSebastien Boeuf     rxq: MuxerRxQ,
120434a5d0eSSebastien Boeuf     /// A queue used for terminating connections that are taking too long to shut down.
121434a5d0eSSebastien Boeuf     killq: MuxerKillQ,
122434a5d0eSSebastien Boeuf     /// The Unix socket, through which host-initiated connections are accepted.
123434a5d0eSSebastien Boeuf     host_sock: UnixListener,
124434a5d0eSSebastien Boeuf     /// The file system path of the host-side Unix socket. This is used to figure out the path
125a807b91fSAlyssa Ross     /// to Unix sockets listening on specific ports. I.e. "\<this path>_\<port number>".
126434a5d0eSSebastien Boeuf     host_sock_path: String,
12735782bd9SBo Chen     /// The nested epoll File, used to register epoll listeners.
12835782bd9SBo Chen     epoll_file: File,
129434a5d0eSSebastien Boeuf     /// A hash set used to keep track of used host-side (local) ports, in order to assign local
130434a5d0eSSebastien Boeuf     /// ports to host-initiated connections.
131434a5d0eSSebastien Boeuf     local_port_set: HashSet<u32>,
132434a5d0eSSebastien Boeuf     /// The last used host-side port.
133434a5d0eSSebastien Boeuf     local_port_last: u32,
134434a5d0eSSebastien Boeuf }
135434a5d0eSSebastien Boeuf 
136434a5d0eSSebastien Boeuf impl VsockChannel for VsockMuxer {
137434a5d0eSSebastien Boeuf     /// Deliver a vsock packet to the guest vsock driver.
138434a5d0eSSebastien Boeuf     ///
1395c3f4dbeSJosh Soref     /// Returns:
140434a5d0eSSebastien Boeuf     /// - `Ok(())`: `pkt` has been successfully filled in; or
141434a5d0eSSebastien Boeuf     /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
142434a5d0eSSebastien Boeuf     ///   packet.
143434a5d0eSSebastien Boeuf     ///
recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()>144434a5d0eSSebastien Boeuf     fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
145434a5d0eSSebastien Boeuf         // We'll look for instructions on how to build the RX packet in the RX queue. If the
146434a5d0eSSebastien Boeuf         // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
147434a5d0eSSebastien Boeuf         // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
148434a5d0eSSebastien Boeuf         // and then try to pop something out again.
149434a5d0eSSebastien Boeuf         if self.rxq.is_empty() && !self.rxq.is_synced() {
150434a5d0eSSebastien Boeuf             self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
151434a5d0eSSebastien Boeuf         }
152434a5d0eSSebastien Boeuf 
1535fc52a50SStefano Garzarella         while let Some(rx) = self.rxq.peek() {
154434a5d0eSSebastien Boeuf             let res = match rx {
155434a5d0eSSebastien Boeuf                 // We need to build an RST packet, going from `local_port` to `peer_port`.
156434a5d0eSSebastien Boeuf                 MuxerRx::RstPkt {
157434a5d0eSSebastien Boeuf                     local_port,
158434a5d0eSSebastien Boeuf                     peer_port,
159434a5d0eSSebastien Boeuf                 } => {
160434a5d0eSSebastien Boeuf                     pkt.set_op(uapi::VSOCK_OP_RST)
161434a5d0eSSebastien Boeuf                         .set_src_cid(uapi::VSOCK_HOST_CID)
162434a5d0eSSebastien Boeuf                         .set_dst_cid(self.cid)
163434a5d0eSSebastien Boeuf                         .set_src_port(local_port)
164434a5d0eSSebastien Boeuf                         .set_dst_port(peer_port)
165434a5d0eSSebastien Boeuf                         .set_len(0)
166434a5d0eSSebastien Boeuf                         .set_type(uapi::VSOCK_TYPE_STREAM)
167434a5d0eSSebastien Boeuf                         .set_flags(0)
168434a5d0eSSebastien Boeuf                         .set_buf_alloc(0)
169434a5d0eSSebastien Boeuf                         .set_fwd_cnt(0);
1705fc52a50SStefano Garzarella                     self.rxq.pop().unwrap();
171434a5d0eSSebastien Boeuf                     return Ok(());
172434a5d0eSSebastien Boeuf                 }
173434a5d0eSSebastien Boeuf 
174434a5d0eSSebastien Boeuf                 // We'll defer building the packet to this connection, since it has something
175434a5d0eSSebastien Boeuf                 // to say.
176434a5d0eSSebastien Boeuf                 MuxerRx::ConnRx(key) => {
177434a5d0eSSebastien Boeuf                     let mut conn_res = Err(VsockError::NoData);
1785fc52a50SStefano Garzarella                     let mut do_pop = true;
179434a5d0eSSebastien Boeuf                     self.apply_conn_mutation(key, |conn| {
180434a5d0eSSebastien Boeuf                         conn_res = conn.recv_pkt(pkt);
1815fc52a50SStefano Garzarella                         do_pop = !conn.has_pending_rx();
182434a5d0eSSebastien Boeuf                     });
1835fc52a50SStefano Garzarella                     if do_pop {
1845fc52a50SStefano Garzarella                         self.rxq.pop().unwrap();
1855fc52a50SStefano Garzarella                     }
186434a5d0eSSebastien Boeuf                     conn_res
187434a5d0eSSebastien Boeuf                 }
188434a5d0eSSebastien Boeuf             };
189434a5d0eSSebastien Boeuf 
190434a5d0eSSebastien Boeuf             if res.is_ok() {
191434a5d0eSSebastien Boeuf                 // Inspect traffic, looking for RST packets, since that means we have to
192434a5d0eSSebastien Boeuf                 // terminate and remove this connection from the active connection pool.
193434a5d0eSSebastien Boeuf                 //
194434a5d0eSSebastien Boeuf                 if pkt.op() == uapi::VSOCK_OP_RST {
195434a5d0eSSebastien Boeuf                     self.remove_connection(ConnMapKey {
196434a5d0eSSebastien Boeuf                         local_port: pkt.src_port(),
197434a5d0eSSebastien Boeuf                         peer_port: pkt.dst_port(),
198434a5d0eSSebastien Boeuf                     });
199434a5d0eSSebastien Boeuf                 }
200434a5d0eSSebastien Boeuf 
201434a5d0eSSebastien Boeuf                 debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
202434a5d0eSSebastien Boeuf                 return Ok(());
203434a5d0eSSebastien Boeuf             }
204434a5d0eSSebastien Boeuf         }
205434a5d0eSSebastien Boeuf 
206434a5d0eSSebastien Boeuf         Err(VsockError::NoData)
207434a5d0eSSebastien Boeuf     }
208434a5d0eSSebastien Boeuf 
209434a5d0eSSebastien Boeuf     /// Deliver a guest-generated packet to its destination in the vsock backend.
210434a5d0eSSebastien Boeuf     ///
211434a5d0eSSebastien Boeuf     /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
212434a5d0eSSebastien Boeuf     /// all the rest to their owning `MuxerConnection`.
213434a5d0eSSebastien Boeuf     ///
214434a5d0eSSebastien Boeuf     /// Returns:
215434a5d0eSSebastien Boeuf     /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
216434a5d0eSSebastien Boeuf     /// returned to the guest vsock driver.
217434a5d0eSSebastien Boeuf     ///
send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()>218434a5d0eSSebastien Boeuf     fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
219434a5d0eSSebastien Boeuf         let conn_key = ConnMapKey {
220434a5d0eSSebastien Boeuf             local_port: pkt.dst_port(),
221434a5d0eSSebastien Boeuf             peer_port: pkt.src_port(),
222434a5d0eSSebastien Boeuf         };
223434a5d0eSSebastien Boeuf 
224434a5d0eSSebastien Boeuf         debug!(
225434a5d0eSSebastien Boeuf             "vsock: muxer.send[rxq.len={}]: {:?}",
226434a5d0eSSebastien Boeuf             self.rxq.len(),
227434a5d0eSSebastien Boeuf             pkt.hdr()
228434a5d0eSSebastien Boeuf         );
229434a5d0eSSebastien Boeuf 
230434a5d0eSSebastien Boeuf         // If this packet has an unsupported type (!=stream), we must send back an RST.
231434a5d0eSSebastien Boeuf         //
232434a5d0eSSebastien Boeuf         if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
233434a5d0eSSebastien Boeuf             self.enq_rst(pkt.dst_port(), pkt.src_port());
234434a5d0eSSebastien Boeuf             return Ok(());
235434a5d0eSSebastien Boeuf         }
236434a5d0eSSebastien Boeuf 
237434a5d0eSSebastien Boeuf         // We don't know how to handle packets addressed to other CIDs. We only handle the host
238434a5d0eSSebastien Boeuf         // part of the guest - host communication here.
239434a5d0eSSebastien Boeuf         if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
240434a5d0eSSebastien Boeuf             info!(
241434a5d0eSSebastien Boeuf                 "vsock: dropping guest packet for unknown CID: {:?}",
242434a5d0eSSebastien Boeuf                 pkt.hdr()
243434a5d0eSSebastien Boeuf             );
244434a5d0eSSebastien Boeuf             return Ok(());
245434a5d0eSSebastien Boeuf         }
246434a5d0eSSebastien Boeuf 
247434a5d0eSSebastien Boeuf         if !self.conn_map.contains_key(&conn_key) {
248434a5d0eSSebastien Boeuf             // This packet can't be routed to any active connection (based on its src and dst
249434a5d0eSSebastien Boeuf             // ports).  The only orphan / unroutable packets we know how to handle are
250434a5d0eSSebastien Boeuf             // connection requests.
251434a5d0eSSebastien Boeuf             if pkt.op() == uapi::VSOCK_OP_REQUEST {
252434a5d0eSSebastien Boeuf                 // Oh, this is a connection request!
2535825ab2dSBo Chen                 self.handle_peer_request_pkt(pkt);
254434a5d0eSSebastien Boeuf             } else {
255434a5d0eSSebastien Boeuf                 // Send back an RST, to let the drive know we weren't expecting this packet.
256434a5d0eSSebastien Boeuf                 self.enq_rst(pkt.dst_port(), pkt.src_port());
257434a5d0eSSebastien Boeuf             }
258434a5d0eSSebastien Boeuf             return Ok(());
259434a5d0eSSebastien Boeuf         }
260434a5d0eSSebastien Boeuf 
261434a5d0eSSebastien Boeuf         // Right, we know where to send this packet, then (to `conn_key`).
262434a5d0eSSebastien Boeuf         // However, if this is an RST, we have to forcefully terminate the connection, so
263434a5d0eSSebastien Boeuf         // there's no point in forwarding it the packet.
264434a5d0eSSebastien Boeuf         if pkt.op() == uapi::VSOCK_OP_RST {
265434a5d0eSSebastien Boeuf             self.remove_connection(conn_key);
266434a5d0eSSebastien Boeuf             return Ok(());
267434a5d0eSSebastien Boeuf         }
268434a5d0eSSebastien Boeuf 
269434a5d0eSSebastien Boeuf         // Alright, everything looks in order - forward this packet to its owning connection.
270434a5d0eSSebastien Boeuf         let mut res: VsockResult<()> = Ok(());
271434a5d0eSSebastien Boeuf         self.apply_conn_mutation(conn_key, |conn| {
272434a5d0eSSebastien Boeuf             res = conn.send_pkt(pkt);
273434a5d0eSSebastien Boeuf         });
274434a5d0eSSebastien Boeuf 
275434a5d0eSSebastien Boeuf         res
276434a5d0eSSebastien Boeuf     }
277434a5d0eSSebastien Boeuf 
278434a5d0eSSebastien Boeuf     /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
279434a5d0eSSebastien Boeuf     /// buffer.
280434a5d0eSSebastien Boeuf     ///
has_pending_rx(&self) -> bool281434a5d0eSSebastien Boeuf     fn has_pending_rx(&self) -> bool {
282434a5d0eSSebastien Boeuf         !self.rxq.is_empty() || !self.rxq.is_synced()
283434a5d0eSSebastien Boeuf     }
284434a5d0eSSebastien Boeuf }
285434a5d0eSSebastien Boeuf 
286434a5d0eSSebastien Boeuf impl VsockEpollListener for VsockMuxer {
287434a5d0eSSebastien Boeuf     /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
288434a5d0eSSebastien Boeuf     /// case).
289434a5d0eSSebastien Boeuf     ///
290434a5d0eSSebastien Boeuf     /// This will be the muxer's nested epoll FD.
291434a5d0eSSebastien Boeuf     ///
get_polled_fd(&self) -> RawFd292434a5d0eSSebastien Boeuf     fn get_polled_fd(&self) -> RawFd {
29335782bd9SBo Chen         self.epoll_file.as_raw_fd()
294434a5d0eSSebastien Boeuf     }
295434a5d0eSSebastien Boeuf 
296434a5d0eSSebastien Boeuf     /// Get the epoll events to be polled upstream.
297434a5d0eSSebastien Boeuf     ///
298434a5d0eSSebastien Boeuf     /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
2995c3f4dbeSJosh Soref     /// some event occurred on one of the FDs registered under our epoll FD).
300434a5d0eSSebastien Boeuf     ///
get_polled_evset(&self) -> epoll::Events301434a5d0eSSebastien Boeuf     fn get_polled_evset(&self) -> epoll::Events {
302434a5d0eSSebastien Boeuf         epoll::Events::EPOLLIN
303434a5d0eSSebastien Boeuf     }
304434a5d0eSSebastien Boeuf 
3055c3f4dbeSJosh Soref     /// Notify the muxer about a pending event having occurred under its nested epoll FD.
306434a5d0eSSebastien Boeuf     ///
notify(&mut self, _: epoll::Events)307434a5d0eSSebastien Boeuf     fn notify(&mut self, _: epoll::Events) {
308434a5d0eSSebastien Boeuf         debug!("vsock: muxer received kick");
309434a5d0eSSebastien Boeuf 
310434a5d0eSSebastien Boeuf         let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
31156d7c042SSebastien Boeuf         'epoll: loop {
31235782bd9SBo Chen             match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
313434a5d0eSSebastien Boeuf                 Ok(ev_cnt) => {
3144f05b846SWei Liu                     for evt in epoll_events.iter().take(ev_cnt) {
315434a5d0eSSebastien Boeuf                         self.handle_event(
3164f05b846SWei Liu                             evt.data as RawFd,
3174f05b846SWei Liu                             // It's ok to unwrap here, since the `evt.events` is filled
318434a5d0eSSebastien Boeuf                             // in by `epoll::wait()`, and therefore contains only valid epoll
319434a5d0eSSebastien Boeuf                             // flags.
3204f05b846SWei Liu                             epoll::Events::from_bits(evt.events).unwrap(),
321434a5d0eSSebastien Boeuf                         );
322434a5d0eSSebastien Boeuf                     }
323434a5d0eSSebastien Boeuf                 }
324434a5d0eSSebastien Boeuf                 Err(e) => {
32556d7c042SSebastien Boeuf                     if e.kind() == io::ErrorKind::Interrupted {
32656d7c042SSebastien Boeuf                         // It's well defined from the epoll_wait() syscall
32756d7c042SSebastien Boeuf                         // documentation that the epoll loop can be interrupted
32856d7c042SSebastien Boeuf                         // before any of the requested events occurred or the
32956d7c042SSebastien Boeuf                         // timeout expired. In both those cases, epoll_wait()
33056d7c042SSebastien Boeuf                         // returns an error of type EINTR, but this should not
33156d7c042SSebastien Boeuf                         // be considered as a regular error. Instead it is more
33256d7c042SSebastien Boeuf                         // appropriate to retry, by calling into epoll_wait().
33356d7c042SSebastien Boeuf                         continue;
33456d7c042SSebastien Boeuf                     }
335434a5d0eSSebastien Boeuf                     warn!("vsock: failed to consume muxer epoll event: {}", e);
336434a5d0eSSebastien Boeuf                 }
337434a5d0eSSebastien Boeuf             }
33856d7c042SSebastien Boeuf             break 'epoll;
33956d7c042SSebastien Boeuf         }
340434a5d0eSSebastien Boeuf     }
341434a5d0eSSebastien Boeuf }
342434a5d0eSSebastien Boeuf 
343434a5d0eSSebastien Boeuf impl VsockBackend for VsockMuxer {}
344434a5d0eSSebastien Boeuf 
345434a5d0eSSebastien Boeuf impl VsockMuxer {
346434a5d0eSSebastien Boeuf     /// Muxer constructor.
347434a5d0eSSebastien Boeuf     ///
new(cid: u32, host_sock_path: String) -> Result<Self>348451d3fb2SAlyssa Ross     pub fn new(cid: u32, host_sock_path: String) -> Result<Self> {
349434a5d0eSSebastien Boeuf         // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
350434a5d0eSSebastien Boeuf         // device activation time.
351434a5d0eSSebastien Boeuf         let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
35235782bd9SBo Chen         // Use 'File' to enforce closing on 'epoll_fd'
353c45d24dfSWei Liu         // SAFETY: epoll_fd is a valid fd
35435782bd9SBo Chen         let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
355434a5d0eSSebastien Boeuf 
356434a5d0eSSebastien Boeuf         // Open/bind/listen on the host Unix socket, so we can accept host-initiated
357434a5d0eSSebastien Boeuf         // connections.
358434a5d0eSSebastien Boeuf         let host_sock = UnixListener::bind(&host_sock_path)
359434a5d0eSSebastien Boeuf             .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
360434a5d0eSSebastien Boeuf             .map_err(Error::UnixBind)?;
361434a5d0eSSebastien Boeuf 
362434a5d0eSSebastien Boeuf         let mut muxer = Self {
363451d3fb2SAlyssa Ross             cid: cid.into(),
364434a5d0eSSebastien Boeuf             host_sock,
365434a5d0eSSebastien Boeuf             host_sock_path,
36635782bd9SBo Chen             epoll_file,
367434a5d0eSSebastien Boeuf             rxq: MuxerRxQ::new(),
368434a5d0eSSebastien Boeuf             conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
369434a5d0eSSebastien Boeuf             listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
37048de8007SAlyssa Ross             partial_command_map: Default::default(),
371434a5d0eSSebastien Boeuf             killq: MuxerKillQ::new(),
372434a5d0eSSebastien Boeuf             local_port_last: (1u32 << 30) - 1,
373434a5d0eSSebastien Boeuf             local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
374434a5d0eSSebastien Boeuf         };
375434a5d0eSSebastien Boeuf 
376434a5d0eSSebastien Boeuf         muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
377434a5d0eSSebastien Boeuf         Ok(muxer)
378434a5d0eSSebastien Boeuf     }
379434a5d0eSSebastien Boeuf 
380434a5d0eSSebastien Boeuf     /// Handle/dispatch an epoll event to its listener.
381434a5d0eSSebastien Boeuf     ///
handle_event(&mut self, fd: RawFd, event_set: epoll::Events)382c47ab55eSSebastien Boeuf     fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
383434a5d0eSSebastien Boeuf         debug!(
384c47ab55eSSebastien Boeuf             "vsock: muxer processing event: fd={}, event_set={:?}",
385c47ab55eSSebastien Boeuf             fd, event_set
386434a5d0eSSebastien Boeuf         );
387434a5d0eSSebastien Boeuf 
388434a5d0eSSebastien Boeuf         match self.listener_map.get_mut(&fd) {
389434a5d0eSSebastien Boeuf             // This event needs to be forwarded to a `MuxerConnection` that is listening for
390434a5d0eSSebastien Boeuf             // it.
391434a5d0eSSebastien Boeuf             //
392c47ab55eSSebastien Boeuf             Some(EpollListener::Connection { key, evset: _ }) => {
393434a5d0eSSebastien Boeuf                 let key_copy = *key;
394434a5d0eSSebastien Boeuf                 // The handling of this event will most probably mutate the state of the
3955c3f4dbeSJosh Soref                 // receiving connection. We'll need to check for new pending RX, event set
396434a5d0eSSebastien Boeuf                 // mutation, and all that, so we're wrapping the event delivery inside those
397434a5d0eSSebastien Boeuf                 // checks.
398434a5d0eSSebastien Boeuf                 self.apply_conn_mutation(key_copy, |conn| {
399c47ab55eSSebastien Boeuf                     conn.notify(event_set);
400434a5d0eSSebastien Boeuf                 });
401434a5d0eSSebastien Boeuf             }
402434a5d0eSSebastien Boeuf 
403434a5d0eSSebastien Boeuf             // A new host-initiated connection is ready to be accepted.
404434a5d0eSSebastien Boeuf             //
405434a5d0eSSebastien Boeuf             Some(EpollListener::HostSock) => {
406434a5d0eSSebastien Boeuf                 if self.conn_map.len() == defs::MAX_CONNECTIONS {
407434a5d0eSSebastien Boeuf                     // If we're already maxed-out on connections, we'll just accept and
408434a5d0eSSebastien Boeuf                     // immediately discard this potentially new one.
409434a5d0eSSebastien Boeuf                     warn!("vsock: connection limit reached; refusing new host connection");
410434a5d0eSSebastien Boeuf                     self.host_sock.accept().map(|_| 0).unwrap_or(0);
411434a5d0eSSebastien Boeuf                     return;
412434a5d0eSSebastien Boeuf                 }
413434a5d0eSSebastien Boeuf                 self.host_sock
414434a5d0eSSebastien Boeuf                     .accept()
415434a5d0eSSebastien Boeuf                     .map_err(Error::UnixAccept)
416434a5d0eSSebastien Boeuf                     .and_then(|(stream, _)| {
417434a5d0eSSebastien Boeuf                         stream
418434a5d0eSSebastien Boeuf                             .set_nonblocking(true)
419434a5d0eSSebastien Boeuf                             .map(|_| stream)
420434a5d0eSSebastien Boeuf                             .map_err(Error::UnixAccept)
421434a5d0eSSebastien Boeuf                     })
422434a5d0eSSebastien Boeuf                     .and_then(|stream| {
423434a5d0eSSebastien Boeuf                         // Before forwarding this connection to a listening AF_VSOCK socket on
424434a5d0eSSebastien Boeuf                         // the guest side, we need to know the destination port. We'll read
425434a5d0eSSebastien Boeuf                         // that port from a "connect" command received on this socket, so the
426434a5d0eSSebastien Boeuf                         // next step is to ask to be notified the moment we can read from it.
427434a5d0eSSebastien Boeuf                         self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
428434a5d0eSSebastien Boeuf                     })
429434a5d0eSSebastien Boeuf                     .unwrap_or_else(|err| {
430434a5d0eSSebastien Boeuf                         warn!("vsock: unable to accept local connection: {:?}", err);
431434a5d0eSSebastien Boeuf                     });
432434a5d0eSSebastien Boeuf             }
433434a5d0eSSebastien Boeuf 
434434a5d0eSSebastien Boeuf             // Data is ready to be read from a host-initiated connection. That would be the
435434a5d0eSSebastien Boeuf             // "connect" command that we're expecting.
436434a5d0eSSebastien Boeuf             Some(EpollListener::LocalStream(_)) => {
43748de8007SAlyssa Ross                 if let Some(EpollListener::LocalStream(stream)) = self.listener_map.get_mut(&fd) {
43848de8007SAlyssa Ross                     let port = Self::read_local_stream_port(&mut self.partial_command_map, stream);
43948de8007SAlyssa Ross 
44048de8007SAlyssa Ross                     if let Err(Error::UnixRead(ref e)) = port {
44148de8007SAlyssa Ross                         if e.kind() == ErrorKind::WouldBlock {
44248de8007SAlyssa Ross                             return;
44348de8007SAlyssa Ross                         }
44448de8007SAlyssa Ross                     }
44548de8007SAlyssa Ross 
44648de8007SAlyssa Ross                     let stream = match self.remove_listener(fd) {
44748de8007SAlyssa Ross                         Some(EpollListener::LocalStream(s)) => s,
44848de8007SAlyssa Ross                         _ => unreachable!(),
44948de8007SAlyssa Ross                     };
45048de8007SAlyssa Ross 
45148de8007SAlyssa Ross                     port.and_then(|peer_port| {
45248de8007SAlyssa Ross                         let local_port = self.allocate_local_port();
45348de8007SAlyssa Ross 
454434a5d0eSSebastien Boeuf                         self.add_connection(
455434a5d0eSSebastien Boeuf                             ConnMapKey {
456434a5d0eSSebastien Boeuf                                 local_port,
457434a5d0eSSebastien Boeuf                                 peer_port,
458434a5d0eSSebastien Boeuf                             },
459434a5d0eSSebastien Boeuf                             MuxerConnection::new_local_init(
460434a5d0eSSebastien Boeuf                                 stream,
461434a5d0eSSebastien Boeuf                                 uapi::VSOCK_HOST_CID,
462434a5d0eSSebastien Boeuf                                 self.cid,
463434a5d0eSSebastien Boeuf                                 local_port,
464434a5d0eSSebastien Boeuf                                 peer_port,
465434a5d0eSSebastien Boeuf                             ),
466434a5d0eSSebastien Boeuf                         )
467434a5d0eSSebastien Boeuf                     })
468434a5d0eSSebastien Boeuf                     .unwrap_or_else(|err| {
469434a5d0eSSebastien Boeuf                         info!("vsock: error adding local-init connection: {:?}", err);
470434a5d0eSSebastien Boeuf                     })
471434a5d0eSSebastien Boeuf                 }
472434a5d0eSSebastien Boeuf             }
473434a5d0eSSebastien Boeuf 
474434a5d0eSSebastien Boeuf             _ => {
475c47ab55eSSebastien Boeuf                 info!(
476c47ab55eSSebastien Boeuf                     "vsock: unexpected event: fd={:?}, event_set={:?}",
477c47ab55eSSebastien Boeuf                     fd, event_set
478c47ab55eSSebastien Boeuf                 );
479434a5d0eSSebastien Boeuf             }
480434a5d0eSSebastien Boeuf         }
481434a5d0eSSebastien Boeuf     }
482434a5d0eSSebastien Boeuf 
483434a5d0eSSebastien Boeuf     /// Parse a host "connect" command, and extract the destination vsock port.
484434a5d0eSSebastien Boeuf     ///
read_local_stream_port( partial_command_map: &mut HashMap<RawFd, PartiallyReadCommand>, stream: &mut UnixStream, ) -> Result<u32>48548de8007SAlyssa Ross     fn read_local_stream_port(
48648de8007SAlyssa Ross         partial_command_map: &mut HashMap<RawFd, PartiallyReadCommand>,
48748de8007SAlyssa Ross         stream: &mut UnixStream,
48848de8007SAlyssa Ross     ) -> Result<u32> {
48948de8007SAlyssa Ross         let command = partial_command_map.entry(stream.as_raw_fd()).or_default();
490434a5d0eSSebastien Boeuf 
491434a5d0eSSebastien Boeuf         // This is the minimum number of bytes that we should be able to read, when parsing a
492434a5d0eSSebastien Boeuf         // valid connection request. I.e. `b"connect 0\n".len()`.
49348de8007SAlyssa Ross         const MIN_COMMAND_LEN: usize = 10;
494434a5d0eSSebastien Boeuf 
495434a5d0eSSebastien Boeuf         // Bring in the minimum number of bytes that we should be able to read.
49682ac114bSWei Liu         stream
49782ac114bSWei Liu             .read_exact(&mut command.buf[command.len..MIN_COMMAND_LEN])
498434a5d0eSSebastien Boeuf             .map_err(Error::UnixRead)?;
49982ac114bSWei Liu         command.len = MIN_COMMAND_LEN;
500434a5d0eSSebastien Boeuf 
501434a5d0eSSebastien Boeuf         // Now, finish reading the destination port number, by bringing in one byte at a time,
502434a5d0eSSebastien Boeuf         // until we reach an EOL terminator (or our buffer space runs out).  Yeah, not
503434a5d0eSSebastien Boeuf         // particularly proud of this approach, but it will have to do for now.
50448de8007SAlyssa Ross         while command.buf[command.len - 1] != b'\n' && command.len < command.buf.len() {
50548de8007SAlyssa Ross             command.len += stream
50648de8007SAlyssa Ross                 .read(&mut command.buf[command.len..=command.len])
507434a5d0eSSebastien Boeuf                 .map_err(Error::UnixRead)?;
508434a5d0eSSebastien Boeuf         }
509434a5d0eSSebastien Boeuf 
51048de8007SAlyssa Ross         let command = partial_command_map.remove(&stream.as_raw_fd()).unwrap();
51148de8007SAlyssa Ross 
51248de8007SAlyssa Ross         let mut word_iter = std::str::from_utf8(&command.buf[..command.len])
5136837de90SRob Bradford             .map_err(Error::ConvertFromUtf8)?
514434a5d0eSSebastien Boeuf             .split_whitespace();
515434a5d0eSSebastien Boeuf 
516434a5d0eSSebastien Boeuf         word_iter
517434a5d0eSSebastien Boeuf             .next()
518434a5d0eSSebastien Boeuf             .ok_or(Error::InvalidPortRequest)
519434a5d0eSSebastien Boeuf             .and_then(|word| {
520434a5d0eSSebastien Boeuf                 if word.to_lowercase() == "connect" {
521434a5d0eSSebastien Boeuf                     Ok(())
522434a5d0eSSebastien Boeuf                 } else {
523434a5d0eSSebastien Boeuf                     Err(Error::InvalidPortRequest)
524434a5d0eSSebastien Boeuf                 }
525434a5d0eSSebastien Boeuf             })
526434a5d0eSSebastien Boeuf             .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
5270a7bcc9aSSebastien Boeuf             .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
5280a7bcc9aSSebastien Boeuf             .map_err(|e| Error::ReadStreamPort(Box::new(e)))
529434a5d0eSSebastien Boeuf     }
530434a5d0eSSebastien Boeuf 
531434a5d0eSSebastien Boeuf     /// Add a new connection to the active connection pool.
532434a5d0eSSebastien Boeuf     ///
add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()>533434a5d0eSSebastien Boeuf     fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
534434a5d0eSSebastien Boeuf         // We might need to make room for this new connection, so let's sweep the kill queue
535434a5d0eSSebastien Boeuf         // first.  It's fine to do this here because:
536434a5d0eSSebastien Boeuf         // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
537434a5d0eSSebastien Boeuf         // - we are under no pressure to respect any accurate timing for connection
538434a5d0eSSebastien Boeuf         //   termination.
539434a5d0eSSebastien Boeuf         self.sweep_killq();
540434a5d0eSSebastien Boeuf 
541434a5d0eSSebastien Boeuf         if self.conn_map.len() >= defs::MAX_CONNECTIONS {
542434a5d0eSSebastien Boeuf             info!(
543434a5d0eSSebastien Boeuf                 "vsock: muxer connection limit reached ({})",
544434a5d0eSSebastien Boeuf                 defs::MAX_CONNECTIONS
545434a5d0eSSebastien Boeuf             );
546434a5d0eSSebastien Boeuf             return Err(Error::TooManyConnections);
547434a5d0eSSebastien Boeuf         }
548434a5d0eSSebastien Boeuf 
549434a5d0eSSebastien Boeuf         self.add_listener(
550434a5d0eSSebastien Boeuf             conn.get_polled_fd(),
551434a5d0eSSebastien Boeuf             EpollListener::Connection {
552434a5d0eSSebastien Boeuf                 key,
553434a5d0eSSebastien Boeuf                 evset: conn.get_polled_evset(),
554434a5d0eSSebastien Boeuf             },
555434a5d0eSSebastien Boeuf         )
5566b40f2dbSRob Bradford         .map(|_| {
557434a5d0eSSebastien Boeuf             if conn.has_pending_rx() {
558434a5d0eSSebastien Boeuf                 // We can safely ignore any error in adding a connection RX indication. Worst
559434a5d0eSSebastien Boeuf                 // case scenario, the RX queue will get desynchronized, but we'll handle that
560434a5d0eSSebastien Boeuf                 // the next time we need to yield an RX packet.
561434a5d0eSSebastien Boeuf                 self.rxq.push(MuxerRx::ConnRx(key));
562434a5d0eSSebastien Boeuf             }
563434a5d0eSSebastien Boeuf             self.conn_map.insert(key, conn);
564434a5d0eSSebastien Boeuf         })
565434a5d0eSSebastien Boeuf     }
566434a5d0eSSebastien Boeuf 
567434a5d0eSSebastien Boeuf     /// Remove a connection from the active connection poll.
568434a5d0eSSebastien Boeuf     ///
remove_connection(&mut self, key: ConnMapKey)569434a5d0eSSebastien Boeuf     fn remove_connection(&mut self, key: ConnMapKey) {
570434a5d0eSSebastien Boeuf         if let Some(conn) = self.conn_map.remove(&key) {
571434a5d0eSSebastien Boeuf             self.remove_listener(conn.get_polled_fd());
572434a5d0eSSebastien Boeuf         }
573434a5d0eSSebastien Boeuf         self.free_local_port(key.local_port);
574434a5d0eSSebastien Boeuf     }
575434a5d0eSSebastien Boeuf 
576434a5d0eSSebastien Boeuf     /// Schedule a connection for immediate termination.
577434a5d0eSSebastien Boeuf     /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
578434a5d0eSSebastien Boeuf     /// it an RST packet.
579434a5d0eSSebastien Boeuf     ///
kill_connection(&mut self, key: ConnMapKey)580434a5d0eSSebastien Boeuf     fn kill_connection(&mut self, key: ConnMapKey) {
581434a5d0eSSebastien Boeuf         let mut had_rx = false;
582434a5d0eSSebastien Boeuf         self.conn_map.entry(key).and_modify(|conn| {
583434a5d0eSSebastien Boeuf             had_rx = conn.has_pending_rx();
584434a5d0eSSebastien Boeuf             conn.kill();
585434a5d0eSSebastien Boeuf         });
586434a5d0eSSebastien Boeuf         // This connection will now have an RST packet to yield, so we need to add it to the RX
587434a5d0eSSebastien Boeuf         // queue.  However, there's no point in doing that if it was already in the queue.
588434a5d0eSSebastien Boeuf         if !had_rx {
589434a5d0eSSebastien Boeuf             // We can safely ignore any error in adding a connection RX indication. Worst case
590434a5d0eSSebastien Boeuf             // scenario, the RX queue will get desynchronized, but we'll handle that the next
591434a5d0eSSebastien Boeuf             // time we need to yield an RX packet.
592434a5d0eSSebastien Boeuf             self.rxq.push(MuxerRx::ConnRx(key));
593434a5d0eSSebastien Boeuf         }
594434a5d0eSSebastien Boeuf     }
595434a5d0eSSebastien Boeuf 
596434a5d0eSSebastien Boeuf     /// Register a new epoll listener under the muxer's nested epoll FD.
597434a5d0eSSebastien Boeuf     ///
add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()>598434a5d0eSSebastien Boeuf     fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
599434a5d0eSSebastien Boeuf         let evset = match listener {
600434a5d0eSSebastien Boeuf             EpollListener::Connection { evset, .. } => evset,
601434a5d0eSSebastien Boeuf             EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
602434a5d0eSSebastien Boeuf             EpollListener::HostSock => epoll::Events::EPOLLIN,
603434a5d0eSSebastien Boeuf         };
604434a5d0eSSebastien Boeuf 
605434a5d0eSSebastien Boeuf         epoll::ctl(
60635782bd9SBo Chen             self.epoll_file.as_raw_fd(),
607434a5d0eSSebastien Boeuf             epoll::ControlOptions::EPOLL_CTL_ADD,
608434a5d0eSSebastien Boeuf             fd,
609434a5d0eSSebastien Boeuf             epoll::Event::new(evset, fd as u64),
610434a5d0eSSebastien Boeuf         )
6116b40f2dbSRob Bradford         .map(|_| {
612434a5d0eSSebastien Boeuf             self.listener_map.insert(fd, listener);
613434a5d0eSSebastien Boeuf         })
614434a5d0eSSebastien Boeuf         .map_err(Error::EpollAdd)?;
615434a5d0eSSebastien Boeuf 
616434a5d0eSSebastien Boeuf         Ok(())
617434a5d0eSSebastien Boeuf     }
618434a5d0eSSebastien Boeuf 
619434a5d0eSSebastien Boeuf     /// Remove (and return) a previously registered epoll listener.
620434a5d0eSSebastien Boeuf     ///
remove_listener(&mut self, fd: RawFd) -> Option<EpollListener>621434a5d0eSSebastien Boeuf     fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
622434a5d0eSSebastien Boeuf         let maybe_listener = self.listener_map.remove(&fd);
623434a5d0eSSebastien Boeuf 
624434a5d0eSSebastien Boeuf         if maybe_listener.is_some() {
625434a5d0eSSebastien Boeuf             epoll::ctl(
62635782bd9SBo Chen                 self.epoll_file.as_raw_fd(),
627434a5d0eSSebastien Boeuf                 epoll::ControlOptions::EPOLL_CTL_DEL,
628434a5d0eSSebastien Boeuf                 fd,
629434a5d0eSSebastien Boeuf                 epoll::Event::new(epoll::Events::empty(), 0),
630434a5d0eSSebastien Boeuf             )
631434a5d0eSSebastien Boeuf             .unwrap_or_else(|err| {
632434a5d0eSSebastien Boeuf                 warn!(
633434a5d0eSSebastien Boeuf                     "vosck muxer: error removing epoll listener for fd {:?}: {:?}",
634434a5d0eSSebastien Boeuf                     fd, err
635434a5d0eSSebastien Boeuf                 );
636434a5d0eSSebastien Boeuf             });
637434a5d0eSSebastien Boeuf         }
638434a5d0eSSebastien Boeuf 
639434a5d0eSSebastien Boeuf         maybe_listener
640434a5d0eSSebastien Boeuf     }
641434a5d0eSSebastien Boeuf 
642434a5d0eSSebastien Boeuf     /// Allocate a host-side port to be assigned to a new host-initiated connection.
643434a5d0eSSebastien Boeuf     ///
644434a5d0eSSebastien Boeuf     ///
allocate_local_port(&mut self) -> u32645434a5d0eSSebastien Boeuf     fn allocate_local_port(&mut self) -> u32 {
646434a5d0eSSebastien Boeuf         // TODO: this doesn't seem very space-efficient.
6475c3f4dbeSJosh Soref         // Maybe rewrite this to limit port range and use a bitmap?
648434a5d0eSSebastien Boeuf         //
649434a5d0eSSebastien Boeuf 
650434a5d0eSSebastien Boeuf         loop {
651434a5d0eSSebastien Boeuf             self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
652434a5d0eSSebastien Boeuf             if self.local_port_set.insert(self.local_port_last) {
653434a5d0eSSebastien Boeuf                 break;
654434a5d0eSSebastien Boeuf             }
655434a5d0eSSebastien Boeuf         }
656434a5d0eSSebastien Boeuf         self.local_port_last
657434a5d0eSSebastien Boeuf     }
658434a5d0eSSebastien Boeuf 
659434a5d0eSSebastien Boeuf     /// Mark a previously used host-side port as free.
660434a5d0eSSebastien Boeuf     ///
free_local_port(&mut self, port: u32)661434a5d0eSSebastien Boeuf     fn free_local_port(&mut self, port: u32) {
662434a5d0eSSebastien Boeuf         self.local_port_set.remove(&port);
663434a5d0eSSebastien Boeuf     }
664434a5d0eSSebastien Boeuf 
6655c3f4dbeSJosh Soref     /// Handle a new connection request coming from our peer (the guest vsock driver).
666434a5d0eSSebastien Boeuf     ///
667434a5d0eSSebastien Boeuf     /// This will attempt to connect to a host-side Unix socket, expected to be listening at
6685c3f4dbeSJosh Soref     /// the file system path corresponding to the destination port. If successful, a new
669434a5d0eSSebastien Boeuf     /// connection object will be created and added to the connection pool. On failure, a new
670434a5d0eSSebastien Boeuf     /// RST packet will be scheduled for delivery to the guest.
671434a5d0eSSebastien Boeuf     ///
handle_peer_request_pkt(&mut self, pkt: &VsockPacket)672434a5d0eSSebastien Boeuf     fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
673434a5d0eSSebastien Boeuf         let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());
674434a5d0eSSebastien Boeuf 
675434a5d0eSSebastien Boeuf         UnixStream::connect(port_path)
676434a5d0eSSebastien Boeuf             .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
677434a5d0eSSebastien Boeuf             .map_err(Error::UnixConnect)
678434a5d0eSSebastien Boeuf             .and_then(|stream| {
679434a5d0eSSebastien Boeuf                 self.add_connection(
680434a5d0eSSebastien Boeuf                     ConnMapKey {
681434a5d0eSSebastien Boeuf                         local_port: pkt.dst_port(),
682434a5d0eSSebastien Boeuf                         peer_port: pkt.src_port(),
683434a5d0eSSebastien Boeuf                     },
684434a5d0eSSebastien Boeuf                     MuxerConnection::new_peer_init(
685434a5d0eSSebastien Boeuf                         stream,
686434a5d0eSSebastien Boeuf                         uapi::VSOCK_HOST_CID,
687434a5d0eSSebastien Boeuf                         self.cid,
688434a5d0eSSebastien Boeuf                         pkt.dst_port(),
689434a5d0eSSebastien Boeuf                         pkt.src_port(),
690434a5d0eSSebastien Boeuf                         pkt.buf_alloc(),
691434a5d0eSSebastien Boeuf                     ),
692434a5d0eSSebastien Boeuf                 )
693434a5d0eSSebastien Boeuf             })
694434a5d0eSSebastien Boeuf             .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
695434a5d0eSSebastien Boeuf     }
696434a5d0eSSebastien Boeuf 
697434a5d0eSSebastien Boeuf     /// Perform an action that might mutate a connection's state.
698434a5d0eSSebastien Boeuf     ///
699434a5d0eSSebastien Boeuf     /// This is used as shorthand for repetitive tasks that need to be performed after a
700434a5d0eSSebastien Boeuf     /// connection object mutates. E.g.
701434a5d0eSSebastien Boeuf     /// - update the connection's epoll listener;
702434a5d0eSSebastien Boeuf     /// - schedule the connection to be queried for RX data;
703434a5d0eSSebastien Boeuf     /// - kill the connection if an unrecoverable error occurs.
704434a5d0eSSebastien Boeuf     ///
apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F) where F: FnOnce(&mut MuxerConnection),705434a5d0eSSebastien Boeuf     fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
706434a5d0eSSebastien Boeuf     where
707434a5d0eSSebastien Boeuf         F: FnOnce(&mut MuxerConnection),
708434a5d0eSSebastien Boeuf     {
709434a5d0eSSebastien Boeuf         if let Some(conn) = self.conn_map.get_mut(&key) {
710434a5d0eSSebastien Boeuf             let had_rx = conn.has_pending_rx();
711434a5d0eSSebastien Boeuf             let was_expiring = conn.will_expire();
7129701fde2SSebastien Boeuf             let prev_state = conn.state();
713434a5d0eSSebastien Boeuf 
714434a5d0eSSebastien Boeuf             mut_fn(conn);
715434a5d0eSSebastien Boeuf 
7169701fde2SSebastien Boeuf             // If this is a host-initiated connection that has just become established, we'll have
7179701fde2SSebastien Boeuf             // to send an ack message to the host end.
7189701fde2SSebastien Boeuf             if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
719aca2baf4SStefano Garzarella                 let msg = format!("OK {}\n", key.local_port);
720aca2baf4SStefano Garzarella                 match conn.send_bytes_raw(msg.as_bytes()) {
721aca2baf4SStefano Garzarella                     Ok(written) if written == msg.len() => (),
722aca2baf4SStefano Garzarella                     Ok(_) => {
723aca2baf4SStefano Garzarella                         // If we can't write a dozen bytes to a pristine connection something
724aca2baf4SStefano Garzarella                         // must be really wrong. Killing it.
725aca2baf4SStefano Garzarella                         conn.kill();
726aca2baf4SStefano Garzarella                         warn!("vsock: unable to fully write connection ack msg.");
727aca2baf4SStefano Garzarella                     }
728aca2baf4SStefano Garzarella                     Err(err) => {
7299701fde2SSebastien Boeuf                         conn.kill();
7309701fde2SSebastien Boeuf                         warn!("vsock: unable to ack host connection: {:?}", err);
731aca2baf4SStefano Garzarella                     }
732aca2baf4SStefano Garzarella                 };
7339701fde2SSebastien Boeuf             }
7349701fde2SSebastien Boeuf 
735434a5d0eSSebastien Boeuf             // If the connection wasn't previously scheduled for RX, add it to our RX queue.
736434a5d0eSSebastien Boeuf             if !had_rx && conn.has_pending_rx() {
737434a5d0eSSebastien Boeuf                 self.rxq.push(MuxerRx::ConnRx(key));
738434a5d0eSSebastien Boeuf             }
739434a5d0eSSebastien Boeuf 
740434a5d0eSSebastien Boeuf             // If the connection wasn't previously scheduled for termination, add it to the
741434a5d0eSSebastien Boeuf             // kill queue.
742434a5d0eSSebastien Boeuf             if !was_expiring && conn.will_expire() {
743434a5d0eSSebastien Boeuf                 // It's safe to unwrap here, since `conn.will_expire()` already guaranteed that
744434a5d0eSSebastien Boeuf                 // an `conn.expiry` is available.
745434a5d0eSSebastien Boeuf                 self.killq.push(key, conn.expiry().unwrap());
746434a5d0eSSebastien Boeuf             }
747434a5d0eSSebastien Boeuf 
748434a5d0eSSebastien Boeuf             let fd = conn.get_polled_fd();
749434a5d0eSSebastien Boeuf             let new_evset = conn.get_polled_evset();
750434a5d0eSSebastien Boeuf             if new_evset.is_empty() {
751434a5d0eSSebastien Boeuf                 // If the connection no longer needs epoll notifications, remove its listener
752434a5d0eSSebastien Boeuf                 // from our list.
753434a5d0eSSebastien Boeuf                 self.remove_listener(fd);
754434a5d0eSSebastien Boeuf                 return;
755434a5d0eSSebastien Boeuf             }
756434a5d0eSSebastien Boeuf             if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
757434a5d0eSSebastien Boeuf                 if *evset != new_evset {
758434a5d0eSSebastien Boeuf                     // If the set of events that the connection is interested in has changed,
759434a5d0eSSebastien Boeuf                     // we need to update its epoll listener.
760434a5d0eSSebastien Boeuf                     debug!(
761434a5d0eSSebastien Boeuf                         "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
762434a5d0eSSebastien Boeuf                         key.local_port, key.peer_port, *evset, new_evset
763434a5d0eSSebastien Boeuf                     );
764434a5d0eSSebastien Boeuf 
765434a5d0eSSebastien Boeuf                     *evset = new_evset;
766434a5d0eSSebastien Boeuf                     epoll::ctl(
76735782bd9SBo Chen                         self.epoll_file.as_raw_fd(),
768434a5d0eSSebastien Boeuf                         epoll::ControlOptions::EPOLL_CTL_MOD,
769434a5d0eSSebastien Boeuf                         fd,
770434a5d0eSSebastien Boeuf                         epoll::Event::new(new_evset, fd as u64),
771434a5d0eSSebastien Boeuf                     )
772434a5d0eSSebastien Boeuf                     .unwrap_or_else(|err| {
773434a5d0eSSebastien Boeuf                         // This really shouldn't happen, like, ever. However, "famous last
774434a5d0eSSebastien Boeuf                         // words" and all that, so let's just kill it with fire, and walk away.
775434a5d0eSSebastien Boeuf                         self.kill_connection(key);
776434a5d0eSSebastien Boeuf                         error!(
777434a5d0eSSebastien Boeuf                             "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
778434a5d0eSSebastien Boeuf                             key.local_port, key.peer_port, err
779434a5d0eSSebastien Boeuf                         );
780434a5d0eSSebastien Boeuf                     });
781434a5d0eSSebastien Boeuf                 }
782434a5d0eSSebastien Boeuf             } else {
783434a5d0eSSebastien Boeuf                 // The connection had previously asked to be removed from the listener map (by
784434a5d0eSSebastien Boeuf                 // returning an empty event set via `get_polled_fd()`), but now wants back in.
785434a5d0eSSebastien Boeuf                 self.add_listener(
786434a5d0eSSebastien Boeuf                     fd,
787434a5d0eSSebastien Boeuf                     EpollListener::Connection {
788434a5d0eSSebastien Boeuf                         key,
789434a5d0eSSebastien Boeuf                         evset: new_evset,
790434a5d0eSSebastien Boeuf                     },
791434a5d0eSSebastien Boeuf                 )
792434a5d0eSSebastien Boeuf                 .unwrap_or_else(|err| {
793434a5d0eSSebastien Boeuf                     self.kill_connection(key);
794434a5d0eSSebastien Boeuf                     error!(
795434a5d0eSSebastien Boeuf                         "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
796434a5d0eSSebastien Boeuf                         key.local_port, key.peer_port, err
797434a5d0eSSebastien Boeuf                     );
798434a5d0eSSebastien Boeuf                 });
799434a5d0eSSebastien Boeuf             }
800434a5d0eSSebastien Boeuf         }
801434a5d0eSSebastien Boeuf     }
802434a5d0eSSebastien Boeuf 
803434a5d0eSSebastien Boeuf     /// Check if any connections have timed out, and if so, schedule them for immediate
804434a5d0eSSebastien Boeuf     /// termination.
805434a5d0eSSebastien Boeuf     ///
sweep_killq(&mut self)806434a5d0eSSebastien Boeuf     fn sweep_killq(&mut self) {
807434a5d0eSSebastien Boeuf         while let Some(key) = self.killq.pop() {
808434a5d0eSSebastien Boeuf             // Connections don't get removed from the kill queue when their kill timer is
809434a5d0eSSebastien Boeuf             // disarmed, since that would be a costly operation. This means we must check if
810434a5d0eSSebastien Boeuf             // the connection has indeed expired, prior to killing it.
811434a5d0eSSebastien Boeuf             let mut kill = false;
812434a5d0eSSebastien Boeuf             self.conn_map
813434a5d0eSSebastien Boeuf                 .entry(key)
814434a5d0eSSebastien Boeuf                 .and_modify(|conn| kill = conn.has_expired());
815434a5d0eSSebastien Boeuf             if kill {
816434a5d0eSSebastien Boeuf                 self.kill_connection(key);
817434a5d0eSSebastien Boeuf             }
818434a5d0eSSebastien Boeuf         }
819434a5d0eSSebastien Boeuf 
820434a5d0eSSebastien Boeuf         if self.killq.is_empty() && !self.killq.is_synced() {
821434a5d0eSSebastien Boeuf             self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
822434a5d0eSSebastien Boeuf             // If we've just re-created the kill queue, we can sweep it again; maybe there's
823434a5d0eSSebastien Boeuf             // more to kill.
824434a5d0eSSebastien Boeuf             self.sweep_killq();
825434a5d0eSSebastien Boeuf         }
826434a5d0eSSebastien Boeuf     }
827434a5d0eSSebastien Boeuf 
828434a5d0eSSebastien Boeuf     /// Enqueue an RST packet into `self.rxq`.
829434a5d0eSSebastien Boeuf     ///
830434a5d0eSSebastien Boeuf     /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do to
831434a5d0eSSebastien Boeuf     /// handle them. We do, however, log a warning, since not being able to enqueue an RST
832434a5d0eSSebastien Boeuf     /// packet means we have to drop it, which is not normal operation.
833434a5d0eSSebastien Boeuf     ///
enq_rst(&mut self, local_port: u32, peer_port: u32)834434a5d0eSSebastien Boeuf     fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
835434a5d0eSSebastien Boeuf         let pushed = self.rxq.push(MuxerRx::RstPkt {
836434a5d0eSSebastien Boeuf             local_port,
837434a5d0eSSebastien Boeuf             peer_port,
838434a5d0eSSebastien Boeuf         });
839434a5d0eSSebastien Boeuf         if !pushed {
840434a5d0eSSebastien Boeuf             warn!(
841434a5d0eSSebastien Boeuf                 "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
842434a5d0eSSebastien Boeuf                 local_port, peer_port
843434a5d0eSSebastien Boeuf             );
844434a5d0eSSebastien Boeuf         }
845434a5d0eSSebastien Boeuf     }
846434a5d0eSSebastien Boeuf }
84779753949SSebastien Boeuf 
84879753949SSebastien Boeuf #[cfg(test)]
84979753949SSebastien Boeuf mod tests {
85088a9f799SRob Bradford     use std::io::Write;
85188a9f799SRob Bradford     use std::path::{Path, PathBuf};
85288a9f799SRob Bradford 
85388a9f799SRob Bradford     use virtio_queue::QueueOwnedT;
85488a9f799SRob Bradford 
85579753949SSebastien Boeuf     use super::super::super::csm::defs as csm_defs;
85679753949SSebastien Boeuf     use super::super::super::tests::TestContext as VsockTestContext;
85779753949SSebastien Boeuf     use super::*;
85879753949SSebastien Boeuf 
859451d3fb2SAlyssa Ross     const PEER_CID: u32 = 3;
86079753949SSebastien Boeuf     const PEER_BUF_ALLOC: u32 = 64 * 1024;
86179753949SSebastien Boeuf 
    /// Test fixture bundling a muxer under test with a packet to exchange with it.
    struct MuxerTestContext {
        // Never read directly; held for the lifetime of the fixture.
        // NOTE(review): presumably keeps the guest memory backing `pkt` alive - confirm.
        _vsock_test_ctx: VsockTestContext,
        // Packet used by the `send()`/`recv()` helpers; built from the head of the first
        // handler queue in `new()`.
        pkt: VsockPacket,
        // The muxer under test.
        muxer: VsockMuxer,
    }
86779753949SSebastien Boeuf 
86879753949SSebastien Boeuf     impl Drop for MuxerTestContext {
drop(&mut self)86979753949SSebastien Boeuf         fn drop(&mut self) {
87079753949SSebastien Boeuf             std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
87179753949SSebastien Boeuf         }
87279753949SSebastien Boeuf     }
87379753949SSebastien Boeuf 
87479753949SSebastien Boeuf     impl MuxerTestContext {
        /// Build a test context whose muxer listens on `test_vsock_<name>.sock`.
        ///
        /// `name` is embedded in the socket path.
        /// NOTE(review): presumably each test must pass a distinct `name` so that socket
        /// paths do not collide - confirm against the callers.
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            // Build the packet from the first descriptor chain available on queue 0.
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            // Relative path: the socket file is created relative to the working directory.
            let uds_path = format!("test_vsock_{name}.sock");
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }
89679753949SSebastien Boeuf 
init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket89779753949SSebastien Boeuf         fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
89879753949SSebastien Boeuf             for b in self.pkt.hdr_mut() {
89979753949SSebastien Boeuf                 *b = 0;
90079753949SSebastien Boeuf             }
90179753949SSebastien Boeuf             self.pkt
90279753949SSebastien Boeuf                 .set_type(uapi::VSOCK_TYPE_STREAM)
903451d3fb2SAlyssa Ross                 .set_src_cid(PEER_CID.into())
90479753949SSebastien Boeuf                 .set_dst_cid(uapi::VSOCK_HOST_CID)
90579753949SSebastien Boeuf                 .set_src_port(peer_port)
90679753949SSebastien Boeuf                 .set_dst_port(local_port)
90779753949SSebastien Boeuf                 .set_op(op)
90879753949SSebastien Boeuf                 .set_buf_alloc(PEER_BUF_ALLOC)
90979753949SSebastien Boeuf         }
91079753949SSebastien Boeuf 
init_data_pkt( &mut self, local_port: u32, peer_port: u32, data: &[u8], ) -> &mut VsockPacket91179753949SSebastien Boeuf         fn init_data_pkt(
91279753949SSebastien Boeuf             &mut self,
91379753949SSebastien Boeuf             local_port: u32,
91479753949SSebastien Boeuf             peer_port: u32,
91579753949SSebastien Boeuf             data: &[u8],
91679753949SSebastien Boeuf         ) -> &mut VsockPacket {
917a9ec0f33SBo Chen             assert!(data.len() <= self.pkt.buf().unwrap().len());
91879753949SSebastien Boeuf             self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
91979753949SSebastien Boeuf                 .set_len(data.len() as u32);
92079753949SSebastien Boeuf             self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
92179753949SSebastien Boeuf             &mut self.pkt
92279753949SSebastien Boeuf         }
92379753949SSebastien Boeuf 
send(&mut self)92479753949SSebastien Boeuf         fn send(&mut self) {
92579753949SSebastien Boeuf             self.muxer.send_pkt(&self.pkt).unwrap();
92679753949SSebastien Boeuf         }
92779753949SSebastien Boeuf 
recv(&mut self)92879753949SSebastien Boeuf         fn recv(&mut self) {
92979753949SSebastien Boeuf             self.muxer.recv_pkt(&mut self.pkt).unwrap();
93079753949SSebastien Boeuf         }
93179753949SSebastien Boeuf 
notify_muxer(&mut self)93279753949SSebastien Boeuf         fn notify_muxer(&mut self) {
93379753949SSebastien Boeuf             self.muxer.notify(epoll::Events::EPOLLIN);
93479753949SSebastien Boeuf         }
93579753949SSebastien Boeuf 
count_epoll_listeners(&self) -> (usize, usize)93679753949SSebastien Boeuf         fn count_epoll_listeners(&self) -> (usize, usize) {
93779753949SSebastien Boeuf             let mut local_lsn_count = 0usize;
93879753949SSebastien Boeuf             let mut conn_lsn_count = 0usize;
93979753949SSebastien Boeuf             for key in self.muxer.listener_map.values() {
94079753949SSebastien Boeuf                 match key {
94179753949SSebastien Boeuf                     EpollListener::LocalStream(_) => local_lsn_count += 1,
94279753949SSebastien Boeuf                     EpollListener::Connection { .. } => conn_lsn_count += 1,
94379753949SSebastien Boeuf                     _ => (),
94479753949SSebastien Boeuf                 };
94579753949SSebastien Boeuf             }
94679753949SSebastien Boeuf             (local_lsn_count, conn_lsn_count)
94779753949SSebastien Boeuf         }
94879753949SSebastien Boeuf 
create_local_listener(&self, port: u32) -> LocalListener94979753949SSebastien Boeuf         fn create_local_listener(&self, port: u32) -> LocalListener {
95079753949SSebastien Boeuf             LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
95179753949SSebastien Boeuf         }
95279753949SSebastien Boeuf 
local_connect(&mut self, peer_port: u32) -> (UnixStream, u32)95379753949SSebastien Boeuf         fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
95479753949SSebastien Boeuf             let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();
95579753949SSebastien Boeuf 
95679753949SSebastien Boeuf             let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
95779753949SSebastien Boeuf             stream.set_nonblocking(true).unwrap();
95879753949SSebastien Boeuf             // The muxer would now get notified of a new connection having arrived at its Unix
95979753949SSebastien Boeuf             // socket, so it can accept it.
96079753949SSebastien Boeuf             self.notify_muxer();
96179753949SSebastien Boeuf 
96279753949SSebastien Boeuf             // Just after having accepted a new local connection, the muxer should've added a new
96379753949SSebastien Boeuf             // `LocalStream` listener to its `listener_map`.
96479753949SSebastien Boeuf             let (local_lsn_count, _) = self.count_epoll_listeners();
96579753949SSebastien Boeuf             assert_eq!(local_lsn_count, init_local_lsn_count + 1);
96679753949SSebastien Boeuf 
9675e527294SRob Bradford             let buf = format!("CONNECT {peer_port}\n");
96879753949SSebastien Boeuf             stream.write_all(buf.as_bytes()).unwrap();
96979753949SSebastien Boeuf             // The muxer would now get notified that data is available for reading from the locally
97079753949SSebastien Boeuf             // initiated connection.
97179753949SSebastien Boeuf             self.notify_muxer();
97279753949SSebastien Boeuf 
97379753949SSebastien Boeuf             // Successfully reading and parsing the connection request should have removed the
97479753949SSebastien Boeuf             // LocalStream epoll listener and added a Connection epoll listener.
97579753949SSebastien Boeuf             let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
97679753949SSebastien Boeuf             assert_eq!(local_lsn_count, init_local_lsn_count);
97779753949SSebastien Boeuf             assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);
97879753949SSebastien Boeuf 
97979753949SSebastien Boeuf             // A LocalInit connection should've been added to the muxer connection map.  A new
98079753949SSebastien Boeuf             // local port should also have been allocated for the new LocalInit connection.
98179753949SSebastien Boeuf             let local_port = self.muxer.local_port_last;
98279753949SSebastien Boeuf             let key = ConnMapKey {
98379753949SSebastien Boeuf                 local_port,
98479753949SSebastien Boeuf                 peer_port,
98579753949SSebastien Boeuf             };
98679753949SSebastien Boeuf             assert!(self.muxer.conn_map.contains_key(&key));
98779753949SSebastien Boeuf             assert!(self.muxer.local_port_set.contains(&local_port));
98879753949SSebastien Boeuf 
98979753949SSebastien Boeuf             // A connection request for the peer should now be available from the muxer.
99079753949SSebastien Boeuf             assert!(self.muxer.has_pending_rx());
99179753949SSebastien Boeuf             self.recv();
99279753949SSebastien Boeuf             assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
99379753949SSebastien Boeuf             assert_eq!(self.pkt.dst_port(), peer_port);
99479753949SSebastien Boeuf             assert_eq!(self.pkt.src_port(), local_port);
99579753949SSebastien Boeuf 
99679753949SSebastien Boeuf             self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
99779753949SSebastien Boeuf             self.send();
99879753949SSebastien Boeuf 
999d0dbc7fbSYu Li             let mut buf = [0u8; 32];
10009701fde2SSebastien Boeuf             let len = stream.read(&mut buf[..]).unwrap();
10015e527294SRob Bradford             assert_eq!(&buf[..len], format!("OK {local_port}\n").as_bytes());
10029701fde2SSebastien Boeuf 
100379753949SSebastien Boeuf             (stream, local_port)
100479753949SSebastien Boeuf         }
100579753949SSebastien Boeuf     }
100679753949SSebastien Boeuf 
/// A non-blocking Unix domain socket listener, bound to a filesystem path that is removed
/// again when the listener is dropped.
struct LocalListener {
    /// Path the socket was bound to; deleted on drop.
    path: PathBuf,
    /// The bound, non-blocking listening socket.
    sock: UnixListener,
}

impl LocalListener {
    /// Binds a non-blocking listener at `path`.
    ///
    /// Panics if binding or switching the socket to non-blocking mode fails.
    fn new<P: AsRef<Path>>(path: P) -> Self {
        // Take one owned copy of the path and bind through it, so `P` needs no `Clone`.
        let path = path.as_ref().to_path_buf();
        let sock = UnixListener::bind(&path).unwrap();
        sock.set_nonblocking(true).unwrap();
        Self { path, sock }
    }

    /// Accepts one pending connection and returns it as a non-blocking stream.
    ///
    /// Panics if no connection can be accepted (the listener is non-blocking, so this
    /// includes the case where none is pending) or if the stream cannot be made
    /// non-blocking.
    fn accept(&mut self) -> UnixStream {
        let (stream, _) = self.sock.accept().unwrap();
        stream.set_nonblocking(true).unwrap();
        stream
    }
}

impl Drop for LocalListener {
    /// Removes the socket file.
    ///
    /// A removal failure still panics on the normal path. While the thread is already
    /// unwinding it is ignored instead, because a second panic would abort the process
    /// and hide the failure that started the unwinding.
    fn drop(&mut self) {
        let removed = std::fs::remove_file(&self.path);
        if !std::thread::panicking() {
            removed.unwrap();
        }
    }
}
103279753949SSebastien Boeuf 
103379753949SSebastien Boeuf     #[test]
test_muxer_epoll_listener()103479753949SSebastien Boeuf     fn test_muxer_epoll_listener() {
103579753949SSebastien Boeuf         let ctx = MuxerTestContext::new("muxer_epoll_listener");
103635782bd9SBo Chen         assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
103779753949SSebastien Boeuf         assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
103879753949SSebastien Boeuf     }
103979753949SSebastien Boeuf 
104079753949SSebastien Boeuf     #[test]
test_bad_peer_pkt()104179753949SSebastien Boeuf     fn test_bad_peer_pkt() {
104279753949SSebastien Boeuf         const LOCAL_PORT: u32 = 1026;
104379753949SSebastien Boeuf         const PEER_PORT: u32 = 1025;
104479753949SSebastien Boeuf         const SOCK_DGRAM: u16 = 2;
104579753949SSebastien Boeuf 
104679753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("bad_peer_pkt");
104779753949SSebastien Boeuf         ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
104879753949SSebastien Boeuf             .set_type(SOCK_DGRAM);
104979753949SSebastien Boeuf         ctx.send();
105079753949SSebastien Boeuf 
105179753949SSebastien Boeuf         // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an RST
105279753949SSebastien Boeuf         // packet, since vsock only supports stream sockets.
105379753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
105479753949SSebastien Boeuf         ctx.recv();
105579753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
105679753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
1057451d3fb2SAlyssa Ross         assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64);
105879753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
105979753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
106079753949SSebastien Boeuf 
106179753949SSebastien Boeuf         // Any orphan (i.e. without a connection), non-RST packet, should be replied to with an
106279753949SSebastien Boeuf         // RST.
106379753949SSebastien Boeuf         let bad_ops = [
106479753949SSebastien Boeuf             uapi::VSOCK_OP_RESPONSE,
106579753949SSebastien Boeuf             uapi::VSOCK_OP_CREDIT_REQUEST,
106679753949SSebastien Boeuf             uapi::VSOCK_OP_CREDIT_UPDATE,
106779753949SSebastien Boeuf             uapi::VSOCK_OP_SHUTDOWN,
106879753949SSebastien Boeuf             uapi::VSOCK_OP_RW,
106979753949SSebastien Boeuf         ];
107079753949SSebastien Boeuf         for op in bad_ops.iter() {
107179753949SSebastien Boeuf             ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
107279753949SSebastien Boeuf             ctx.send();
107379753949SSebastien Boeuf             assert!(ctx.muxer.has_pending_rx());
107479753949SSebastien Boeuf             ctx.recv();
107579753949SSebastien Boeuf             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
107679753949SSebastien Boeuf             assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
107779753949SSebastien Boeuf             assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
107879753949SSebastien Boeuf         }
107979753949SSebastien Boeuf 
108079753949SSebastien Boeuf         // Any packet addressed to anything other than VSOCK_VHOST_CID should get dropped.
108179753949SSebastien Boeuf         assert!(!ctx.muxer.has_pending_rx());
108279753949SSebastien Boeuf         ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
108379753949SSebastien Boeuf             .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
108479753949SSebastien Boeuf         ctx.send();
108579753949SSebastien Boeuf         assert!(!ctx.muxer.has_pending_rx());
108679753949SSebastien Boeuf     }
108779753949SSebastien Boeuf 
108879753949SSebastien Boeuf     #[test]
test_peer_connection()108979753949SSebastien Boeuf     fn test_peer_connection() {
109079753949SSebastien Boeuf         const LOCAL_PORT: u32 = 1026;
109179753949SSebastien Boeuf         const PEER_PORT: u32 = 1025;
109279753949SSebastien Boeuf 
109379753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("peer_connection");
109479753949SSebastien Boeuf 
109579753949SSebastien Boeuf         // Test peer connection refused.
109679753949SSebastien Boeuf         ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
109779753949SSebastien Boeuf         ctx.send();
109879753949SSebastien Boeuf         ctx.recv();
109979753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
110079753949SSebastien Boeuf         assert_eq!(ctx.pkt.len(), 0);
110179753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
1102451d3fb2SAlyssa Ross         assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64);
110379753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
110479753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
110579753949SSebastien Boeuf 
110679753949SSebastien Boeuf         // Test peer connection accepted.
110779753949SSebastien Boeuf         let mut listener = ctx.create_local_listener(LOCAL_PORT);
110879753949SSebastien Boeuf         ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
110979753949SSebastien Boeuf         ctx.send();
111079753949SSebastien Boeuf         assert_eq!(ctx.muxer.conn_map.len(), 1);
111179753949SSebastien Boeuf         let mut stream = listener.accept();
111279753949SSebastien Boeuf         ctx.recv();
111379753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
111479753949SSebastien Boeuf         assert_eq!(ctx.pkt.len(), 0);
111579753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
1116451d3fb2SAlyssa Ross         assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64);
111779753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
111879753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
111979753949SSebastien Boeuf         let key = ConnMapKey {
112079753949SSebastien Boeuf             local_port: LOCAL_PORT,
112179753949SSebastien Boeuf             peer_port: PEER_PORT,
112279753949SSebastien Boeuf         };
112379753949SSebastien Boeuf         assert!(ctx.muxer.conn_map.contains_key(&key));
112479753949SSebastien Boeuf 
112579753949SSebastien Boeuf         // Test guest -> host data flow.
112679753949SSebastien Boeuf         let data = [1, 2, 3, 4];
112779753949SSebastien Boeuf         ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
112879753949SSebastien Boeuf         ctx.send();
112979753949SSebastien Boeuf         let mut buf = vec![0; data.len()];
113079753949SSebastien Boeuf         stream.read_exact(buf.as_mut_slice()).unwrap();
113179753949SSebastien Boeuf         assert_eq!(buf.as_slice(), data);
113279753949SSebastien Boeuf 
113379753949SSebastien Boeuf         // Test host -> guest data flow.
113479753949SSebastien Boeuf         let data = [5u8, 6, 7, 8];
113579753949SSebastien Boeuf         stream.write_all(&data).unwrap();
113679753949SSebastien Boeuf 
113779753949SSebastien Boeuf         // When data is available on the local stream, an EPOLLIN event would normally be delivered
113879753949SSebastien Boeuf         // to the muxer's nested epoll FD. For testing only, we can fake that event notification
113979753949SSebastien Boeuf         // here.
114079753949SSebastien Boeuf         ctx.notify_muxer();
114179753949SSebastien Boeuf         // After being notified, the muxer should've figured out that RX data was available for one
114279753949SSebastien Boeuf         // of its connections, so it should now be reporting that it can fill in an RX packet.
114379753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
114479753949SSebastien Boeuf         ctx.recv();
114579753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
114679753949SSebastien Boeuf         assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
114779753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
114879753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
114979753949SSebastien Boeuf 
115079753949SSebastien Boeuf         assert!(!ctx.muxer.has_pending_rx());
115179753949SSebastien Boeuf     }
115279753949SSebastien Boeuf 
115379753949SSebastien Boeuf     #[test]
test_local_connection()115479753949SSebastien Boeuf     fn test_local_connection() {
115579753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("local_connection");
115679753949SSebastien Boeuf         let peer_port = 1025;
115779753949SSebastien Boeuf         let (mut stream, local_port) = ctx.local_connect(peer_port);
115879753949SSebastien Boeuf 
115979753949SSebastien Boeuf         // Test guest -> host data flow.
116079753949SSebastien Boeuf         let data = [1, 2, 3, 4];
116179753949SSebastien Boeuf         ctx.init_data_pkt(local_port, peer_port, &data);
116279753949SSebastien Boeuf         ctx.send();
116379753949SSebastien Boeuf 
116479753949SSebastien Boeuf         let mut buf = vec![0u8; data.len()];
116579753949SSebastien Boeuf         stream.read_exact(buf.as_mut_slice()).unwrap();
116679753949SSebastien Boeuf         assert_eq!(buf.as_slice(), &data);
116779753949SSebastien Boeuf 
116879753949SSebastien Boeuf         // Test host -> guest data flow.
116979753949SSebastien Boeuf         let data = [5, 6, 7, 8];
117079753949SSebastien Boeuf         stream.write_all(&data).unwrap();
117179753949SSebastien Boeuf         ctx.notify_muxer();
117279753949SSebastien Boeuf 
117379753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
117479753949SSebastien Boeuf         ctx.recv();
117579753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
117679753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), local_port);
117779753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), peer_port);
117879753949SSebastien Boeuf         assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
117979753949SSebastien Boeuf     }
118079753949SSebastien Boeuf 
118179753949SSebastien Boeuf     #[test]
test_local_close()118279753949SSebastien Boeuf     fn test_local_close() {
118379753949SSebastien Boeuf         let peer_port = 1025;
118479753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("local_close");
118579753949SSebastien Boeuf         let local_port;
118679753949SSebastien Boeuf         {
118779753949SSebastien Boeuf             let (_stream, local_port_) = ctx.local_connect(peer_port);
118879753949SSebastien Boeuf             local_port = local_port_;
118979753949SSebastien Boeuf         }
119079753949SSebastien Boeuf         // Local var `_stream` was now dropped, thus closing the local stream. After the muxer gets
119179753949SSebastien Boeuf         // notified via EPOLLIN, it should attempt to gracefully shutdown the connection, issuing a
119279753949SSebastien Boeuf         // VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv indications set.
119379753949SSebastien Boeuf         ctx.notify_muxer();
119479753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
119579753949SSebastien Boeuf         ctx.recv();
119679753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
119779753949SSebastien Boeuf         assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
119879753949SSebastien Boeuf         assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
119979753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), local_port);
120079753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), peer_port);
120179753949SSebastien Boeuf 
120279753949SSebastien Boeuf         // The connection should get removed (and its local port freed), after the peer replies
120379753949SSebastien Boeuf         // with an RST.
120479753949SSebastien Boeuf         ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
120579753949SSebastien Boeuf         ctx.send();
120679753949SSebastien Boeuf         let key = ConnMapKey {
120779753949SSebastien Boeuf             local_port,
120879753949SSebastien Boeuf             peer_port,
120979753949SSebastien Boeuf         };
121079753949SSebastien Boeuf         assert!(!ctx.muxer.conn_map.contains_key(&key));
121179753949SSebastien Boeuf         assert!(!ctx.muxer.local_port_set.contains(&local_port));
121279753949SSebastien Boeuf     }
121379753949SSebastien Boeuf 
121479753949SSebastien Boeuf     #[test]
test_peer_close()121579753949SSebastien Boeuf     fn test_peer_close() {
121679753949SSebastien Boeuf         let peer_port = 1025;
121779753949SSebastien Boeuf         let local_port = 1026;
121879753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("peer_close");
121979753949SSebastien Boeuf 
122079753949SSebastien Boeuf         let mut sock = ctx.create_local_listener(local_port);
122179753949SSebastien Boeuf         ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
122279753949SSebastien Boeuf         ctx.send();
122379753949SSebastien Boeuf         let mut stream = sock.accept();
122479753949SSebastien Boeuf 
122579753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
122679753949SSebastien Boeuf         ctx.recv();
122779753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
122879753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), local_port);
122979753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), peer_port);
123079753949SSebastien Boeuf         let key = ConnMapKey {
123179753949SSebastien Boeuf             local_port,
123279753949SSebastien Boeuf             peer_port,
123379753949SSebastien Boeuf         };
123479753949SSebastien Boeuf         assert!(ctx.muxer.conn_map.contains_key(&key));
123579753949SSebastien Boeuf 
123679753949SSebastien Boeuf         // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
123779753949SSebastien Boeuf         ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
123879753949SSebastien Boeuf             .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
123979753949SSebastien Boeuf             .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
124079753949SSebastien Boeuf         ctx.send();
124179753949SSebastien Boeuf 
124279753949SSebastien Boeuf         // Now, the muxer should remove the connection from its map, and reply with an RST.
124379753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
124479753949SSebastien Boeuf         ctx.recv();
124579753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
124679753949SSebastien Boeuf         assert_eq!(ctx.pkt.src_port(), local_port);
124779753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), peer_port);
124879753949SSebastien Boeuf         let key = ConnMapKey {
124979753949SSebastien Boeuf             local_port,
125079753949SSebastien Boeuf             peer_port,
125179753949SSebastien Boeuf         };
125279753949SSebastien Boeuf         assert!(!ctx.muxer.conn_map.contains_key(&key));
125379753949SSebastien Boeuf 
125479753949SSebastien Boeuf         // The muxer should also drop / close the local Unix socket for this connection.
125579753949SSebastien Boeuf         let mut buf = vec![0u8; 16];
125679753949SSebastien Boeuf         assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
125779753949SSebastien Boeuf     }
125879753949SSebastien Boeuf 
125979753949SSebastien Boeuf     #[test]
test_muxer_rxq()126079753949SSebastien Boeuf     fn test_muxer_rxq() {
126179753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("muxer_rxq");
126279753949SSebastien Boeuf         let local_port = 1026;
126379753949SSebastien Boeuf         let peer_port_first = 1025;
126479753949SSebastien Boeuf         let mut listener = ctx.create_local_listener(local_port);
126579753949SSebastien Boeuf         let mut streams: Vec<UnixStream> = Vec::new();
126679753949SSebastien Boeuf 
126779753949SSebastien Boeuf         for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
126879753949SSebastien Boeuf             ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
126979753949SSebastien Boeuf             ctx.send();
127079753949SSebastien Boeuf             streams.push(listener.accept());
127179753949SSebastien Boeuf         }
127279753949SSebastien Boeuf 
12735c3f4dbeSJosh Soref         // The muxer RX queue should now be full (with connection responses), but still
127479753949SSebastien Boeuf         // synchronized.
127579753949SSebastien Boeuf         assert!(ctx.muxer.rxq.is_synced());
127679753949SSebastien Boeuf 
127779753949SSebastien Boeuf         // One more queued reply should desync the RX queue.
127879753949SSebastien Boeuf         ctx.init_pkt(
127979753949SSebastien Boeuf             local_port,
128079753949SSebastien Boeuf             (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
128179753949SSebastien Boeuf             uapi::VSOCK_OP_REQUEST,
128279753949SSebastien Boeuf         );
128379753949SSebastien Boeuf         ctx.send();
128479753949SSebastien Boeuf         assert!(!ctx.muxer.rxq.is_synced());
128579753949SSebastien Boeuf 
128679753949SSebastien Boeuf         // With an out-of-sync queue, an RST should evict any non-RST packet from the queue, and
128779753949SSebastien Boeuf         // take its place. We'll check that by making sure that the last packet popped from the
128879753949SSebastien Boeuf         // queue is an RST.
128979753949SSebastien Boeuf         ctx.init_pkt(
129079753949SSebastien Boeuf             local_port + 1,
129179753949SSebastien Boeuf             peer_port_first as u32,
129279753949SSebastien Boeuf             uapi::VSOCK_OP_REQUEST,
129379753949SSebastien Boeuf         );
129479753949SSebastien Boeuf         ctx.send();
129579753949SSebastien Boeuf 
129679753949SSebastien Boeuf         for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
129779753949SSebastien Boeuf             ctx.recv();
129879753949SSebastien Boeuf             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
129979753949SSebastien Boeuf             // The response order should hold. The evicted response should have been the last
130079753949SSebastien Boeuf             // enqueued.
130179753949SSebastien Boeuf             assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
130279753949SSebastien Boeuf         }
130379753949SSebastien Boeuf         // There should be one more packet in the queue: the RST.
130479753949SSebastien Boeuf         assert_eq!(ctx.muxer.rxq.len(), 1);
130579753949SSebastien Boeuf         ctx.recv();
130679753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
130779753949SSebastien Boeuf 
130879753949SSebastien Boeuf         // The queue should now be empty, but out-of-sync, so the muxer should report it has some
130979753949SSebastien Boeuf         // pending RX.
131079753949SSebastien Boeuf         assert!(ctx.muxer.rxq.is_empty());
131179753949SSebastien Boeuf         assert!(!ctx.muxer.rxq.is_synced());
131279753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
131379753949SSebastien Boeuf 
131479753949SSebastien Boeuf         // The next recv should sync the queue back up. It should also yield one of the two
131579753949SSebastien Boeuf         // responses that are still left:
131679753949SSebastien Boeuf         // - the one that desynchronized the queue; and
131779753949SSebastien Boeuf         // - the one that got evicted by the RST.
131879753949SSebastien Boeuf         ctx.recv();
131979753949SSebastien Boeuf         assert!(ctx.muxer.rxq.is_synced());
132079753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
132179753949SSebastien Boeuf 
132279753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
132379753949SSebastien Boeuf         ctx.recv();
132479753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
132579753949SSebastien Boeuf     }
132679753949SSebastien Boeuf 
132779753949SSebastien Boeuf     #[test]
test_muxer_killq()132879753949SSebastien Boeuf     fn test_muxer_killq() {
132979753949SSebastien Boeuf         let mut ctx = MuxerTestContext::new("muxer_killq");
133079753949SSebastien Boeuf         let local_port = 1026;
133179753949SSebastien Boeuf         let peer_port_first = 1025;
133279753949SSebastien Boeuf         let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
133379753949SSebastien Boeuf         let mut listener = ctx.create_local_listener(local_port);
133479753949SSebastien Boeuf 
133579753949SSebastien Boeuf         for peer_port in peer_port_first..=peer_port_last {
133679753949SSebastien Boeuf             ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
133779753949SSebastien Boeuf             ctx.send();
133879753949SSebastien Boeuf             ctx.notify_muxer();
133979753949SSebastien Boeuf             ctx.recv();
134079753949SSebastien Boeuf             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
134179753949SSebastien Boeuf             assert_eq!(ctx.pkt.src_port(), local_port);
134279753949SSebastien Boeuf             assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
134379753949SSebastien Boeuf             {
134479753949SSebastien Boeuf                 let _stream = listener.accept();
134579753949SSebastien Boeuf             }
134679753949SSebastien Boeuf             ctx.notify_muxer();
134779753949SSebastien Boeuf             ctx.recv();
134879753949SSebastien Boeuf             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
134979753949SSebastien Boeuf             assert_eq!(ctx.pkt.src_port(), local_port);
135079753949SSebastien Boeuf             assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
135179753949SSebastien Boeuf             // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
135279753949SSebastien Boeuf             // connection we schedule for termination.
135379753949SSebastien Boeuf             assert_eq!(
135479753949SSebastien Boeuf                 ctx.muxer.killq.is_synced(),
135579753949SSebastien Boeuf                 peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
135679753949SSebastien Boeuf             );
135779753949SSebastien Boeuf         }
135879753949SSebastien Boeuf 
135979753949SSebastien Boeuf         assert!(!ctx.muxer.killq.is_synced());
136079753949SSebastien Boeuf         assert!(!ctx.muxer.has_pending_rx());
136179753949SSebastien Boeuf 
136279753949SSebastien Boeuf         // Wait for the kill timers to expire.
136379753949SSebastien Boeuf         std::thread::sleep(std::time::Duration::from_millis(
136479753949SSebastien Boeuf             csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
136579753949SSebastien Boeuf         ));
136679753949SSebastien Boeuf 
136779753949SSebastien Boeuf         // Trigger a kill queue sweep, by requesting a new connection.
136879753949SSebastien Boeuf         ctx.init_pkt(
136979753949SSebastien Boeuf             local_port,
137079753949SSebastien Boeuf             peer_port_last as u32 + 1,
137179753949SSebastien Boeuf             uapi::VSOCK_OP_REQUEST,
137279753949SSebastien Boeuf         );
137379753949SSebastien Boeuf         ctx.send();
137479753949SSebastien Boeuf 
137579753949SSebastien Boeuf         // After sweeping the kill queue, it should now be synced (assuming the RX queue is larger
137679753949SSebastien Boeuf         // than the kill queue, since an RST packet will be queued for each killed connection).
137779753949SSebastien Boeuf         assert!(ctx.muxer.killq.is_synced());
137879753949SSebastien Boeuf         assert!(ctx.muxer.has_pending_rx());
137979753949SSebastien Boeuf         // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the
138079753949SSebastien Boeuf         // dying connections in the recent killq sweep.
138179753949SSebastien Boeuf         for _p in peer_port_first..peer_port_last {
138279753949SSebastien Boeuf             ctx.recv();
138379753949SSebastien Boeuf             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
138479753949SSebastien Boeuf             assert_eq!(ctx.pkt.src_port(), local_port);
138579753949SSebastien Boeuf         }
138679753949SSebastien Boeuf 
138779753949SSebastien Boeuf         // There should be one more packet in the RX queue: the connection response our request
138879753949SSebastien Boeuf         // that triggered the kill queue sweep.
138979753949SSebastien Boeuf         ctx.recv();
139079753949SSebastien Boeuf         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
139179753949SSebastien Boeuf         assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);
139279753949SSebastien Boeuf 
139379753949SSebastien Boeuf         assert!(!ctx.muxer.has_pending_rx());
139479753949SSebastien Boeuf     }
1395bb3cf7c3SStefano Garzarella 
#[test]
fn test_regression_handshake() {
    // Regression test for one of the problems uncovered while fixing
    // https://github.com/firecracker-microvm/firecracker/issues/1751
    // The handshake message ("OK ...") must not be counted in the
    // connection's forwarded-bytes counter.
    let guest_port = 1025;
    let mut ctx = MuxerTestContext::new("regression_handshake");

    // Set up a local connection; only the local port it was assigned is
    // needed from here on.
    let (_, host_port) = ctx.local_connect(guest_port);

    // Look the freshly created connection up in the muxer's connection map
    // and read its forward counter.
    let fwd_cnt = ctx
        .muxer
        .conn_map
        .get_mut(&ConnMapKey {
            local_port: host_port,
            peer_port: guest_port,
        })
        .unwrap()
        .fwd_cnt();

    // Nothing has been forwarded yet, so the counter has to still be zero.
    assert_eq!(fwd_cnt.0, 0);
}
1417f756174bSStefano Garzarella 
#[test]
fn test_regression_rxq_pop() {
    // Regression test for one of the problems uncovered while fixing
    // https://github.com/firecracker-microvm/firecracker/issues/1751
    // A connection that has several flags set must stay in the muxer rxq
    // until each of them has been handled.
    let guest_port = 1025;
    let mut ctx = MuxerTestContext::new("regression_rxq_pop");
    let (mut stream, host_port) = ctx.local_connect(guest_port);

    // Write a few bytes into the stream, then notify the muxer.
    stream.write_all(&[5u8, 6, 7, 8]).unwrap();
    ctx.notify_muxer();

    // Fetch the connection from the connection map and forcefully set one
    // more flag on it (a credit update).
    ctx.muxer
        .conn_map
        .get_mut(&ConnMapKey {
            local_port: host_port,
            peer_port: guest_port,
        })
        .unwrap()
        .insert_credit_update();

    // Two flags were set, so the muxer must report pending RX before each
    // of two consecutive receives, i.e. the connection is still in the rxq
    // after the first one.
    for _ in 0..2 {
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
    }

    // Both flags have now been consumed; nothing should be left pending.
    assert!(!ctx.muxer.has_pending_rx());
}
145479753949SSebastien Boeuf }
1455