// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

//! `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
//! by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
//! between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
//! device model.
//!
//! The vsock muxer has two main roles:
//!
//! ## Vsock connection multiplexer
//!
//! It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
//! muxer also routes packets to their owning connections. It does so via a connection
//! `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
//!
//! Vsock packet traffic needs to be inspected, in order to detect connection request
//! packets (leading to the creation of a new connection), and connection reset packets
//! (leading to the termination of an existing connection). All other packets, though, must
//! belong to an existing connection and, as such, the muxer simply forwards them.
//!
//! ## Event dispatcher
//!
//! There are three event categories that the vsock backend is interested in:
//! 1. A new host-initiated connection is ready to be accepted from the listening host Unix
//!    socket;
//! 2.
Data is available for reading from a newly-accepted host-initiated connection (i.e. 29f6236087SAlyssa Ross //! the host is ready to issue a vsock connection request, informing us of the 30f6236087SAlyssa Ross //! destination port to which it wants to connect); 31f6236087SAlyssa Ross //! 3. Some event was triggered for a connected Unix socket, that belongs to a 32f6236087SAlyssa Ross //! `VsockConnection`. 33f6236087SAlyssa Ross //! 34f6236087SAlyssa Ross //! The muxer gets notified about all of these events, because, as a `VsockEpollListener` 3542e9632cSJosh Soref //! implementor, it gets to register a nested epoll FD into the main VMM epoll()ing loop. All 36f6236087SAlyssa Ross //! other pollable FDs are then registered under this nested epoll FD. 37f6236087SAlyssa Ross //! 38f6236087SAlyssa Ross //! To route all these events to their handlers, the muxer uses another `HashMap` object, 39f6236087SAlyssa Ross //! mapping `RawFd`s to `EpollListener`s. 40f6236087SAlyssa Ross 41434a5d0eSSebastien Boeuf use std::collections::{HashMap, HashSet}; 4235782bd9SBo Chen use std::fs::File; 4348de8007SAlyssa Ross use std::io::{self, ErrorKind, Read}; 4435782bd9SBo Chen use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; 45434a5d0eSSebastien Boeuf use std::os::unix::net::{UnixListener, UnixStream}; 46434a5d0eSSebastien Boeuf 479701fde2SSebastien Boeuf use super::super::csm::ConnState; 48434a5d0eSSebastien Boeuf use super::super::defs::uapi; 49434a5d0eSSebastien Boeuf use super::super::packet::VsockPacket; 50434a5d0eSSebastien Boeuf use super::super::{ 51434a5d0eSSebastien Boeuf Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError, 52434a5d0eSSebastien Boeuf }; 53434a5d0eSSebastien Boeuf use super::muxer_killq::MuxerKillQ; 54434a5d0eSSebastien Boeuf use super::muxer_rxq::MuxerRxQ; 55*61e57e1cSRuoqing He use super::{defs, Error, MuxerConnection, Result}; 56434a5d0eSSebastien Boeuf 57434a5d0eSSebastien Boeuf /// A unique identifier of a `MuxerConnection` 
object. Connections are stored in a hash map, 58434a5d0eSSebastien Boeuf /// keyed by a `ConnMapKey` object. 59434a5d0eSSebastien Boeuf /// 60434a5d0eSSebastien Boeuf #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] 61434a5d0eSSebastien Boeuf pub struct ConnMapKey { 62434a5d0eSSebastien Boeuf local_port: u32, 63434a5d0eSSebastien Boeuf peer_port: u32, 64434a5d0eSSebastien Boeuf } 65434a5d0eSSebastien Boeuf 66434a5d0eSSebastien Boeuf /// A muxer RX queue item. 67434a5d0eSSebastien Boeuf /// 685fc52a50SStefano Garzarella #[derive(Clone, Copy, Debug)] 69434a5d0eSSebastien Boeuf pub enum MuxerRx { 70434a5d0eSSebastien Boeuf /// The packet must be fetched from the connection identified by `ConnMapKey`. 71434a5d0eSSebastien Boeuf ConnRx(ConnMapKey), 72434a5d0eSSebastien Boeuf /// The muxer must produce an RST packet. 73434a5d0eSSebastien Boeuf RstPkt { local_port: u32, peer_port: u32 }, 74434a5d0eSSebastien Boeuf } 75434a5d0eSSebastien Boeuf 76434a5d0eSSebastien Boeuf /// An epoll listener, registered under the muxer's nested epoll FD. 77434a5d0eSSebastien Boeuf /// 78434a5d0eSSebastien Boeuf enum EpollListener { 79434a5d0eSSebastien Boeuf /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events 80434a5d0eSSebastien Boeuf /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will 81434a5d0eSSebastien Boeuf /// be forwarded to the listener via `VsockEpollListener::notify()`. 82434a5d0eSSebastien Boeuf Connection { 83434a5d0eSSebastien Boeuf key: ConnMapKey, 84434a5d0eSSebastien Boeuf evset: epoll::Events, 85434a5d0eSSebastien Boeuf }, 86434a5d0eSSebastien Boeuf /// A listener interested in new host-initiated connections. 87434a5d0eSSebastien Boeuf HostSock, 88a807b91fSAlyssa Ross /// A listener interested in reading host "connect \<port>" commands from a freshly 89434a5d0eSSebastien Boeuf /// connected host socket. 
90434a5d0eSSebastien Boeuf LocalStream(UnixStream), 91434a5d0eSSebastien Boeuf } 92434a5d0eSSebastien Boeuf 9348de8007SAlyssa Ross /// A partially read "CONNECT" command. 9448de8007SAlyssa Ross #[derive(Default)] 9548de8007SAlyssa Ross struct PartiallyReadCommand { 9648de8007SAlyssa Ross /// The bytes of the command that have been read so far. 9748de8007SAlyssa Ross buf: [u8; 32], 9848de8007SAlyssa Ross /// How much of `buf` has been used. 9948de8007SAlyssa Ross len: usize, 10048de8007SAlyssa Ross } 10148de8007SAlyssa Ross 102434a5d0eSSebastien Boeuf /// The vsock connection multiplexer. 103434a5d0eSSebastien Boeuf /// 104434a5d0eSSebastien Boeuf pub struct VsockMuxer { 105434a5d0eSSebastien Boeuf /// Guest CID. 106434a5d0eSSebastien Boeuf cid: u64, 107434a5d0eSSebastien Boeuf /// A hash map used to store the active connections. 108434a5d0eSSebastien Boeuf conn_map: HashMap<ConnMapKey, MuxerConnection>, 109434a5d0eSSebastien Boeuf /// A hash map used to store epoll event listeners / handlers. 110434a5d0eSSebastien Boeuf listener_map: HashMap<RawFd, EpollListener>, 11148de8007SAlyssa Ross /// A hash map used to store partially read "connect" commands. 11248de8007SAlyssa Ross partial_command_map: HashMap<RawFd, PartiallyReadCommand>, 113434a5d0eSSebastien Boeuf /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and 114434a5d0eSSebastien Boeuf /// produced 115434a5d0eSSebastien Boeuf /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet); 116434a5d0eSSebastien Boeuf /// and 117434a5d0eSSebastien Boeuf /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX 118434a5d0eSSebastien Boeuf /// socket). 119434a5d0eSSebastien Boeuf rxq: MuxerRxQ, 120434a5d0eSSebastien Boeuf /// A queue used for terminating connections that are taking too long to shut down. 
121434a5d0eSSebastien Boeuf killq: MuxerKillQ, 122434a5d0eSSebastien Boeuf /// The Unix socket, through which host-initiated connections are accepted. 123434a5d0eSSebastien Boeuf host_sock: UnixListener, 124434a5d0eSSebastien Boeuf /// The file system path of the host-side Unix socket. This is used to figure out the path 125a807b91fSAlyssa Ross /// to Unix sockets listening on specific ports. I.e. "\<this path>_\<port number>". 126434a5d0eSSebastien Boeuf host_sock_path: String, 12735782bd9SBo Chen /// The nested epoll File, used to register epoll listeners. 12835782bd9SBo Chen epoll_file: File, 129434a5d0eSSebastien Boeuf /// A hash set used to keep track of used host-side (local) ports, in order to assign local 130434a5d0eSSebastien Boeuf /// ports to host-initiated connections. 131434a5d0eSSebastien Boeuf local_port_set: HashSet<u32>, 132434a5d0eSSebastien Boeuf /// The last used host-side port. 133434a5d0eSSebastien Boeuf local_port_last: u32, 134434a5d0eSSebastien Boeuf } 135434a5d0eSSebastien Boeuf 136434a5d0eSSebastien Boeuf impl VsockChannel for VsockMuxer { 137434a5d0eSSebastien Boeuf /// Deliver a vsock packet to the guest vsock driver. 138434a5d0eSSebastien Boeuf /// 1395c3f4dbeSJosh Soref /// Returns: 140434a5d0eSSebastien Boeuf /// - `Ok(())`: `pkt` has been successfully filled in; or 141434a5d0eSSebastien Boeuf /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the 142434a5d0eSSebastien Boeuf /// packet. 143434a5d0eSSebastien Boeuf /// recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()>144434a5d0eSSebastien Boeuf fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> { 145434a5d0eSSebastien Boeuf // We'll look for instructions on how to build the RX packet in the RX queue. If the 146434a5d0eSSebastien Boeuf // queue is empty, that doesn't necessarily mean we don't have any pending RX, since 147434a5d0eSSebastien Boeuf // the queue might be out-of-sync. 
If that's the case, we'll attempt to sync it first, 148434a5d0eSSebastien Boeuf // and then try to pop something out again. 149434a5d0eSSebastien Boeuf if self.rxq.is_empty() && !self.rxq.is_synced() { 150434a5d0eSSebastien Boeuf self.rxq = MuxerRxQ::from_conn_map(&self.conn_map); 151434a5d0eSSebastien Boeuf } 152434a5d0eSSebastien Boeuf 1535fc52a50SStefano Garzarella while let Some(rx) = self.rxq.peek() { 154434a5d0eSSebastien Boeuf let res = match rx { 155434a5d0eSSebastien Boeuf // We need to build an RST packet, going from `local_port` to `peer_port`. 156434a5d0eSSebastien Boeuf MuxerRx::RstPkt { 157434a5d0eSSebastien Boeuf local_port, 158434a5d0eSSebastien Boeuf peer_port, 159434a5d0eSSebastien Boeuf } => { 160434a5d0eSSebastien Boeuf pkt.set_op(uapi::VSOCK_OP_RST) 161434a5d0eSSebastien Boeuf .set_src_cid(uapi::VSOCK_HOST_CID) 162434a5d0eSSebastien Boeuf .set_dst_cid(self.cid) 163434a5d0eSSebastien Boeuf .set_src_port(local_port) 164434a5d0eSSebastien Boeuf .set_dst_port(peer_port) 165434a5d0eSSebastien Boeuf .set_len(0) 166434a5d0eSSebastien Boeuf .set_type(uapi::VSOCK_TYPE_STREAM) 167434a5d0eSSebastien Boeuf .set_flags(0) 168434a5d0eSSebastien Boeuf .set_buf_alloc(0) 169434a5d0eSSebastien Boeuf .set_fwd_cnt(0); 1705fc52a50SStefano Garzarella self.rxq.pop().unwrap(); 171434a5d0eSSebastien Boeuf return Ok(()); 172434a5d0eSSebastien Boeuf } 173434a5d0eSSebastien Boeuf 174434a5d0eSSebastien Boeuf // We'll defer building the packet to this connection, since it has something 175434a5d0eSSebastien Boeuf // to say. 
176434a5d0eSSebastien Boeuf MuxerRx::ConnRx(key) => { 177434a5d0eSSebastien Boeuf let mut conn_res = Err(VsockError::NoData); 1785fc52a50SStefano Garzarella let mut do_pop = true; 179434a5d0eSSebastien Boeuf self.apply_conn_mutation(key, |conn| { 180434a5d0eSSebastien Boeuf conn_res = conn.recv_pkt(pkt); 1815fc52a50SStefano Garzarella do_pop = !conn.has_pending_rx(); 182434a5d0eSSebastien Boeuf }); 1835fc52a50SStefano Garzarella if do_pop { 1845fc52a50SStefano Garzarella self.rxq.pop().unwrap(); 1855fc52a50SStefano Garzarella } 186434a5d0eSSebastien Boeuf conn_res 187434a5d0eSSebastien Boeuf } 188434a5d0eSSebastien Boeuf }; 189434a5d0eSSebastien Boeuf 190434a5d0eSSebastien Boeuf if res.is_ok() { 191434a5d0eSSebastien Boeuf // Inspect traffic, looking for RST packets, since that means we have to 192434a5d0eSSebastien Boeuf // terminate and remove this connection from the active connection pool. 193434a5d0eSSebastien Boeuf // 194434a5d0eSSebastien Boeuf if pkt.op() == uapi::VSOCK_OP_RST { 195434a5d0eSSebastien Boeuf self.remove_connection(ConnMapKey { 196434a5d0eSSebastien Boeuf local_port: pkt.src_port(), 197434a5d0eSSebastien Boeuf peer_port: pkt.dst_port(), 198434a5d0eSSebastien Boeuf }); 199434a5d0eSSebastien Boeuf } 200434a5d0eSSebastien Boeuf 201434a5d0eSSebastien Boeuf debug!("vsock muxer: RX pkt: {:?}", pkt.hdr()); 202434a5d0eSSebastien Boeuf return Ok(()); 203434a5d0eSSebastien Boeuf } 204434a5d0eSSebastien Boeuf } 205434a5d0eSSebastien Boeuf 206434a5d0eSSebastien Boeuf Err(VsockError::NoData) 207434a5d0eSSebastien Boeuf } 208434a5d0eSSebastien Boeuf 209434a5d0eSSebastien Boeuf /// Deliver a guest-generated packet to its destination in the vsock backend. 210434a5d0eSSebastien Boeuf /// 211434a5d0eSSebastien Boeuf /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards 212434a5d0eSSebastien Boeuf /// all the rest to their owning `MuxerConnection`. 
213434a5d0eSSebastien Boeuf /// 214434a5d0eSSebastien Boeuf /// Returns: 215434a5d0eSSebastien Boeuf /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be 216434a5d0eSSebastien Boeuf /// returned to the guest vsock driver. 217434a5d0eSSebastien Boeuf /// send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()>218434a5d0eSSebastien Boeuf fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> { 219434a5d0eSSebastien Boeuf let conn_key = ConnMapKey { 220434a5d0eSSebastien Boeuf local_port: pkt.dst_port(), 221434a5d0eSSebastien Boeuf peer_port: pkt.src_port(), 222434a5d0eSSebastien Boeuf }; 223434a5d0eSSebastien Boeuf 224434a5d0eSSebastien Boeuf debug!( 225434a5d0eSSebastien Boeuf "vsock: muxer.send[rxq.len={}]: {:?}", 226434a5d0eSSebastien Boeuf self.rxq.len(), 227434a5d0eSSebastien Boeuf pkt.hdr() 228434a5d0eSSebastien Boeuf ); 229434a5d0eSSebastien Boeuf 230434a5d0eSSebastien Boeuf // If this packet has an unsupported type (!=stream), we must send back an RST. 231434a5d0eSSebastien Boeuf // 232434a5d0eSSebastien Boeuf if pkt.type_() != uapi::VSOCK_TYPE_STREAM { 233434a5d0eSSebastien Boeuf self.enq_rst(pkt.dst_port(), pkt.src_port()); 234434a5d0eSSebastien Boeuf return Ok(()); 235434a5d0eSSebastien Boeuf } 236434a5d0eSSebastien Boeuf 237434a5d0eSSebastien Boeuf // We don't know how to handle packets addressed to other CIDs. We only handle the host 238434a5d0eSSebastien Boeuf // part of the guest - host communication here. 
239434a5d0eSSebastien Boeuf if pkt.dst_cid() != uapi::VSOCK_HOST_CID { 240434a5d0eSSebastien Boeuf info!( 241434a5d0eSSebastien Boeuf "vsock: dropping guest packet for unknown CID: {:?}", 242434a5d0eSSebastien Boeuf pkt.hdr() 243434a5d0eSSebastien Boeuf ); 244434a5d0eSSebastien Boeuf return Ok(()); 245434a5d0eSSebastien Boeuf } 246434a5d0eSSebastien Boeuf 247434a5d0eSSebastien Boeuf if !self.conn_map.contains_key(&conn_key) { 248434a5d0eSSebastien Boeuf // This packet can't be routed to any active connection (based on its src and dst 249434a5d0eSSebastien Boeuf // ports). The only orphan / unroutable packets we know how to handle are 250434a5d0eSSebastien Boeuf // connection requests. 251434a5d0eSSebastien Boeuf if pkt.op() == uapi::VSOCK_OP_REQUEST { 252434a5d0eSSebastien Boeuf // Oh, this is a connection request! 2535825ab2dSBo Chen self.handle_peer_request_pkt(pkt); 254434a5d0eSSebastien Boeuf } else { 255434a5d0eSSebastien Boeuf // Send back an RST, to let the drive know we weren't expecting this packet. 256434a5d0eSSebastien Boeuf self.enq_rst(pkt.dst_port(), pkt.src_port()); 257434a5d0eSSebastien Boeuf } 258434a5d0eSSebastien Boeuf return Ok(()); 259434a5d0eSSebastien Boeuf } 260434a5d0eSSebastien Boeuf 261434a5d0eSSebastien Boeuf // Right, we know where to send this packet, then (to `conn_key`). 262434a5d0eSSebastien Boeuf // However, if this is an RST, we have to forcefully terminate the connection, so 263434a5d0eSSebastien Boeuf // there's no point in forwarding it the packet. 264434a5d0eSSebastien Boeuf if pkt.op() == uapi::VSOCK_OP_RST { 265434a5d0eSSebastien Boeuf self.remove_connection(conn_key); 266434a5d0eSSebastien Boeuf return Ok(()); 267434a5d0eSSebastien Boeuf } 268434a5d0eSSebastien Boeuf 269434a5d0eSSebastien Boeuf // Alright, everything looks in order - forward this packet to its owning connection. 
270434a5d0eSSebastien Boeuf let mut res: VsockResult<()> = Ok(()); 271434a5d0eSSebastien Boeuf self.apply_conn_mutation(conn_key, |conn| { 272434a5d0eSSebastien Boeuf res = conn.send_pkt(pkt); 273434a5d0eSSebastien Boeuf }); 274434a5d0eSSebastien Boeuf 275434a5d0eSSebastien Boeuf res 276434a5d0eSSebastien Boeuf } 277434a5d0eSSebastien Boeuf 278434a5d0eSSebastien Boeuf /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX 279434a5d0eSSebastien Boeuf /// buffer. 280434a5d0eSSebastien Boeuf /// has_pending_rx(&self) -> bool281434a5d0eSSebastien Boeuf fn has_pending_rx(&self) -> bool { 282434a5d0eSSebastien Boeuf !self.rxq.is_empty() || !self.rxq.is_synced() 283434a5d0eSSebastien Boeuf } 284434a5d0eSSebastien Boeuf } 285434a5d0eSSebastien Boeuf 286434a5d0eSSebastien Boeuf impl VsockEpollListener for VsockMuxer { 287434a5d0eSSebastien Boeuf /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this 288434a5d0eSSebastien Boeuf /// case). 289434a5d0eSSebastien Boeuf /// 290434a5d0eSSebastien Boeuf /// This will be the muxer's nested epoll FD. 291434a5d0eSSebastien Boeuf /// get_polled_fd(&self) -> RawFd292434a5d0eSSebastien Boeuf fn get_polled_fd(&self) -> RawFd { 29335782bd9SBo Chen self.epoll_file.as_raw_fd() 294434a5d0eSSebastien Boeuf } 295434a5d0eSSebastien Boeuf 296434a5d0eSSebastien Boeuf /// Get the epoll events to be polled upstream. 297434a5d0eSSebastien Boeuf /// 298434a5d0eSSebastien Boeuf /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e. 2995c3f4dbeSJosh Soref /// some event occurred on one of the FDs registered under our epoll FD). 
///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    /// Performs one non-blocking `epoll::wait()` (timeout 0) on the nested epoll FD and
    /// dispatches every returned event to `handle_event()`. The wait is retried only when
    /// it is interrupted (EINTR).
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled in by
                            // `epoll::wait()`, and therefore contains only valid epoll
                            // flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                // epoll_wait() may be interrupted before any event occurred or the timeout
                // expired. That is not a real failure, so simply call into it again.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
    /// Creates the nested epoll FD, binds a non-blocking listening Unix socket at
    /// `host_sock_path`, and registers that socket as the first epoll listener.
    ///
    /// Errors: `Error::EpollFdCreate`, `Error::UnixBind`, or whatever `add_listener()`
    /// returns for the host socket registration.
    pub fn new(cid: u32, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use 'File' to enforce closing on 'epoll_fd'
        // SAFETY: epoll_fd is a valid fd
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid: cid.into(),
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            partial_command_map: Default::default(),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    /// `fd` selects the listener in `listener_map`:
    /// - a `Connection` listener gets `event_set` forwarded to its `MuxerConnection`;
    /// - the `HostSock` listener accepts a new host-initiated stream;
    /// - a `LocalStream` listener has its "connect" command read and, once the command is
    ///   complete, is turned into a connection.
    ///
    /// Events for unknown FDs are only logged.
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            Some(EpollListener::Connection { key, .. }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    let _ = self.host_sock.accept();
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(stream)) = self.listener_map.get_mut(&fd) {
                    let port = Self::read_local_stream_port(&mut self.partial_command_map, stream);

                    // The command is not complete yet: keep the listener registered and wait
                    // for the next EPOLLIN on this stream.
                    if let Err(Error::UnixRead(ref e)) = port {
                        if e.kind() == ErrorKind::WouldBlock {
                            return;
                        }
                    }

                    // Success or hard failure: either way this FD stops being a
                    // "command" listener.
                    let stream = match self.remove_listener(fd) {
                        Some(EpollListener::LocalStream(s)) => s,
                        _ => unreachable!(),
                    };

                    match port {
                        Ok(peer_port) => {
                            let local_port = self.allocate_local_port();
                            let conn = MuxerConnection::new_local_init(
                                stream,
                                uapi::VSOCK_HOST_CID,
                                self.cid,
                                local_port,
                                peer_port,
                            );
                            let key = ConnMapKey {
                                local_port,
                                peer_port,
                            };
                            if let Err(err) = self.add_connection(key, conn) {
                                // The connection never made it into `conn_map`, so
                                // `remove_connection()` will never run for it; give the
                                // freshly allocated local port back here.
                                self.free_local_port(local_port);
                                info!("vsock: error adding local-init connection: {:?}", err);
                            }
                        }
                        Err(err) => {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        }
                    }
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
    /// The command may arrive in several pieces. Bytes read so far are kept in
    /// `partial_command_map` (keyed by the stream's FD), so the function can be called again
    /// after the stream reported `WouldBlock` and resume where it stopped.
    ///
    /// Errors:
    /// - `Error::UnixRead` wrapping a `WouldBlock` error: the command is still incomplete;
    ///   the partial state is retained and the caller should retry on the next EPOLLIN;
    /// - `Error::UnixRead` with any other kind (including `UnexpectedEof` when the peer
    ///   closed the stream mid-command): the partial state is discarded;
    /// - `Error::ConvertFromUtf8`: the command is not valid UTF-8;
    /// - `Error::ReadStreamPort`: the command is not of the form "connect \<port>".
    fn read_local_stream_port(
        partial_command_map: &mut HashMap<RawFd, PartiallyReadCommand>,
        stream: &mut UnixStream,
    ) -> Result<u32> {
        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_COMMAND_LEN: usize = 10;

        let fd = stream.as_raw_fd();
        let command = partial_command_map.entry(fd).or_default();

        let read_result: io::Result<()> = loop {
            // The command is complete once it has at least the minimum length and ends in
            // an EOL terminator, or once our buffer space has run out.
            let terminated =
                command.len >= MIN_COMMAND_LEN && command.buf[command.len - 1] == b'\n';
            if terminated || command.len >= command.buf.len() {
                break Ok(());
            }

            // Up to the minimum length the bytes can be brought in in bulk. Past that, read
            // one byte at a time so that nothing beyond the terminator is consumed.
            let end = if command.len < MIN_COMMAND_LEN {
                MIN_COMMAND_LEN
            } else {
                command.len + 1
            };

            match stream.read(&mut command.buf[command.len..end]) {
                // The peer closed the stream before completing the command. Without this
                // check the loop would never make progress again.
                Ok(0) => break Err(io::Error::from(ErrorKind::UnexpectedEof)),
                // Record progress right away, so that a later `WouldBlock` loses nothing.
                Ok(count) => command.len += count,
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => break Err(e),
            }
        };

        if let Err(e) = read_result {
            // A hard failure ends this command for good. Drop its state so that it cannot
            // leak into a later stream that happens to get the same FD number.
            // NOTE(review): `remove_listener()` is not visible here; the cleanup is done in
            // this function so that it does not depend on it.
            if e.kind() != ErrorKind::WouldBlock {
                partial_command_map.remove(&fd);
            }
            return Err(Error::UnixRead(e));
        }

        // The entry was created (or found) above, so it is guaranteed to be present.
        let command = partial_command_map.remove(&fd).unwrap();

        let mut word_iter = std::str::from_utf8(&command.buf[..command.len])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }

    /// Add a new connection to the active connection pool.
    ///
    /// Registers the connection's polled FD as an epoll listener, queues an RX indication
    /// if the connection already has pending RX, and stores it in `conn_map`.
    ///
    /// Errors: `Error::TooManyConnections` when the pool is full (even after sweeping the
    /// kill queue), or the error returned by `add_listener()`.
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first. It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )?;

        if conn.has_pending_rx() {
            // We can safely ignore any error in adding a connection RX indication. Worst
            // case scenario, the RX queue will get desynchronized, but we'll handle that
            // the next time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
        self.conn_map.insert(key, conn);
        Ok(())
    }

    /// Remove a connection from the active connection pool.
568434a5d0eSSebastien Boeuf /// remove_connection(&mut self, key: ConnMapKey)569434a5d0eSSebastien Boeuf fn remove_connection(&mut self, key: ConnMapKey) { 570434a5d0eSSebastien Boeuf if let Some(conn) = self.conn_map.remove(&key) { 571434a5d0eSSebastien Boeuf self.remove_listener(conn.get_polled_fd()); 572434a5d0eSSebastien Boeuf } 573434a5d0eSSebastien Boeuf self.free_local_port(key.local_port); 574434a5d0eSSebastien Boeuf } 575434a5d0eSSebastien Boeuf 576434a5d0eSSebastien Boeuf /// Schedule a connection for immediate termination. 577434a5d0eSSebastien Boeuf /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending 578434a5d0eSSebastien Boeuf /// it an RST packet. 579434a5d0eSSebastien Boeuf /// kill_connection(&mut self, key: ConnMapKey)580434a5d0eSSebastien Boeuf fn kill_connection(&mut self, key: ConnMapKey) { 581434a5d0eSSebastien Boeuf let mut had_rx = false; 582434a5d0eSSebastien Boeuf self.conn_map.entry(key).and_modify(|conn| { 583434a5d0eSSebastien Boeuf had_rx = conn.has_pending_rx(); 584434a5d0eSSebastien Boeuf conn.kill(); 585434a5d0eSSebastien Boeuf }); 586434a5d0eSSebastien Boeuf // This connection will now have an RST packet to yield, so we need to add it to the RX 587434a5d0eSSebastien Boeuf // queue. However, there's no point in doing that if it was already in the queue. 588434a5d0eSSebastien Boeuf if !had_rx { 589434a5d0eSSebastien Boeuf // We can safely ignore any error in adding a connection RX indication. Worst case 590434a5d0eSSebastien Boeuf // scenario, the RX queue will get desynchronized, but we'll handle that the next 591434a5d0eSSebastien Boeuf // time we need to yield an RX packet. 592434a5d0eSSebastien Boeuf self.rxq.push(MuxerRx::ConnRx(key)); 593434a5d0eSSebastien Boeuf } 594434a5d0eSSebastien Boeuf } 595434a5d0eSSebastien Boeuf 596434a5d0eSSebastien Boeuf /// Register a new epoll listener under the muxer's nested epoll FD. 
597434a5d0eSSebastien Boeuf /// add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()>598434a5d0eSSebastien Boeuf fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> { 599434a5d0eSSebastien Boeuf let evset = match listener { 600434a5d0eSSebastien Boeuf EpollListener::Connection { evset, .. } => evset, 601434a5d0eSSebastien Boeuf EpollListener::LocalStream(_) => epoll::Events::EPOLLIN, 602434a5d0eSSebastien Boeuf EpollListener::HostSock => epoll::Events::EPOLLIN, 603434a5d0eSSebastien Boeuf }; 604434a5d0eSSebastien Boeuf 605434a5d0eSSebastien Boeuf epoll::ctl( 60635782bd9SBo Chen self.epoll_file.as_raw_fd(), 607434a5d0eSSebastien Boeuf epoll::ControlOptions::EPOLL_CTL_ADD, 608434a5d0eSSebastien Boeuf fd, 609434a5d0eSSebastien Boeuf epoll::Event::new(evset, fd as u64), 610434a5d0eSSebastien Boeuf ) 6116b40f2dbSRob Bradford .map(|_| { 612434a5d0eSSebastien Boeuf self.listener_map.insert(fd, listener); 613434a5d0eSSebastien Boeuf }) 614434a5d0eSSebastien Boeuf .map_err(Error::EpollAdd)?; 615434a5d0eSSebastien Boeuf 616434a5d0eSSebastien Boeuf Ok(()) 617434a5d0eSSebastien Boeuf } 618434a5d0eSSebastien Boeuf 619434a5d0eSSebastien Boeuf /// Remove (and return) a previously registered epoll listener. 
620434a5d0eSSebastien Boeuf /// remove_listener(&mut self, fd: RawFd) -> Option<EpollListener>621434a5d0eSSebastien Boeuf fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> { 622434a5d0eSSebastien Boeuf let maybe_listener = self.listener_map.remove(&fd); 623434a5d0eSSebastien Boeuf 624434a5d0eSSebastien Boeuf if maybe_listener.is_some() { 625434a5d0eSSebastien Boeuf epoll::ctl( 62635782bd9SBo Chen self.epoll_file.as_raw_fd(), 627434a5d0eSSebastien Boeuf epoll::ControlOptions::EPOLL_CTL_DEL, 628434a5d0eSSebastien Boeuf fd, 629434a5d0eSSebastien Boeuf epoll::Event::new(epoll::Events::empty(), 0), 630434a5d0eSSebastien Boeuf ) 631434a5d0eSSebastien Boeuf .unwrap_or_else(|err| { 632434a5d0eSSebastien Boeuf warn!( 633434a5d0eSSebastien Boeuf "vosck muxer: error removing epoll listener for fd {:?}: {:?}", 634434a5d0eSSebastien Boeuf fd, err 635434a5d0eSSebastien Boeuf ); 636434a5d0eSSebastien Boeuf }); 637434a5d0eSSebastien Boeuf } 638434a5d0eSSebastien Boeuf 639434a5d0eSSebastien Boeuf maybe_listener 640434a5d0eSSebastien Boeuf } 641434a5d0eSSebastien Boeuf 642434a5d0eSSebastien Boeuf /// Allocate a host-side port to be assigned to a new host-initiated connection. 643434a5d0eSSebastien Boeuf /// 644434a5d0eSSebastien Boeuf /// allocate_local_port(&mut self) -> u32645434a5d0eSSebastien Boeuf fn allocate_local_port(&mut self) -> u32 { 646434a5d0eSSebastien Boeuf // TODO: this doesn't seem very space-efficient. 6475c3f4dbeSJosh Soref // Maybe rewrite this to limit port range and use a bitmap? 
648434a5d0eSSebastien Boeuf // 649434a5d0eSSebastien Boeuf 650434a5d0eSSebastien Boeuf loop { 651434a5d0eSSebastien Boeuf self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30); 652434a5d0eSSebastien Boeuf if self.local_port_set.insert(self.local_port_last) { 653434a5d0eSSebastien Boeuf break; 654434a5d0eSSebastien Boeuf } 655434a5d0eSSebastien Boeuf } 656434a5d0eSSebastien Boeuf self.local_port_last 657434a5d0eSSebastien Boeuf } 658434a5d0eSSebastien Boeuf 659434a5d0eSSebastien Boeuf /// Mark a previously used host-side port as free. 660434a5d0eSSebastien Boeuf /// free_local_port(&mut self, port: u32)661434a5d0eSSebastien Boeuf fn free_local_port(&mut self, port: u32) { 662434a5d0eSSebastien Boeuf self.local_port_set.remove(&port); 663434a5d0eSSebastien Boeuf } 664434a5d0eSSebastien Boeuf 6655c3f4dbeSJosh Soref /// Handle a new connection request coming from our peer (the guest vsock driver). 666434a5d0eSSebastien Boeuf /// 667434a5d0eSSebastien Boeuf /// This will attempt to connect to a host-side Unix socket, expected to be listening at 6685c3f4dbeSJosh Soref /// the file system path corresponding to the destination port. If successful, a new 669434a5d0eSSebastien Boeuf /// connection object will be created and added to the connection pool. On failure, a new 670434a5d0eSSebastien Boeuf /// RST packet will be scheduled for delivery to the guest. 
671434a5d0eSSebastien Boeuf /// handle_peer_request_pkt(&mut self, pkt: &VsockPacket)672434a5d0eSSebastien Boeuf fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) { 673434a5d0eSSebastien Boeuf let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port()); 674434a5d0eSSebastien Boeuf 675434a5d0eSSebastien Boeuf UnixStream::connect(port_path) 676434a5d0eSSebastien Boeuf .and_then(|stream| stream.set_nonblocking(true).map(|_| stream)) 677434a5d0eSSebastien Boeuf .map_err(Error::UnixConnect) 678434a5d0eSSebastien Boeuf .and_then(|stream| { 679434a5d0eSSebastien Boeuf self.add_connection( 680434a5d0eSSebastien Boeuf ConnMapKey { 681434a5d0eSSebastien Boeuf local_port: pkt.dst_port(), 682434a5d0eSSebastien Boeuf peer_port: pkt.src_port(), 683434a5d0eSSebastien Boeuf }, 684434a5d0eSSebastien Boeuf MuxerConnection::new_peer_init( 685434a5d0eSSebastien Boeuf stream, 686434a5d0eSSebastien Boeuf uapi::VSOCK_HOST_CID, 687434a5d0eSSebastien Boeuf self.cid, 688434a5d0eSSebastien Boeuf pkt.dst_port(), 689434a5d0eSSebastien Boeuf pkt.src_port(), 690434a5d0eSSebastien Boeuf pkt.buf_alloc(), 691434a5d0eSSebastien Boeuf ), 692434a5d0eSSebastien Boeuf ) 693434a5d0eSSebastien Boeuf }) 694434a5d0eSSebastien Boeuf .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port())); 695434a5d0eSSebastien Boeuf } 696434a5d0eSSebastien Boeuf 697434a5d0eSSebastien Boeuf /// Perform an action that might mutate a connection's state. 698434a5d0eSSebastien Boeuf /// 699434a5d0eSSebastien Boeuf /// This is used as shorthand for repetitive tasks that need to be performed after a 700434a5d0eSSebastien Boeuf /// connection object mutates. E.g. 701434a5d0eSSebastien Boeuf /// - update the connection's epoll listener; 702434a5d0eSSebastien Boeuf /// - schedule the connection to be queried for RX data; 703434a5d0eSSebastien Boeuf /// - kill the connection if an unrecoverable error occurs. 
704434a5d0eSSebastien Boeuf /// apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F) where F: FnOnce(&mut MuxerConnection),705434a5d0eSSebastien Boeuf fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F) 706434a5d0eSSebastien Boeuf where 707434a5d0eSSebastien Boeuf F: FnOnce(&mut MuxerConnection), 708434a5d0eSSebastien Boeuf { 709434a5d0eSSebastien Boeuf if let Some(conn) = self.conn_map.get_mut(&key) { 710434a5d0eSSebastien Boeuf let had_rx = conn.has_pending_rx(); 711434a5d0eSSebastien Boeuf let was_expiring = conn.will_expire(); 7129701fde2SSebastien Boeuf let prev_state = conn.state(); 713434a5d0eSSebastien Boeuf 714434a5d0eSSebastien Boeuf mut_fn(conn); 715434a5d0eSSebastien Boeuf 7169701fde2SSebastien Boeuf // If this is a host-initiated connection that has just become established, we'll have 7179701fde2SSebastien Boeuf // to send an ack message to the host end. 7189701fde2SSebastien Boeuf if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established { 719aca2baf4SStefano Garzarella let msg = format!("OK {}\n", key.local_port); 720aca2baf4SStefano Garzarella match conn.send_bytes_raw(msg.as_bytes()) { 721aca2baf4SStefano Garzarella Ok(written) if written == msg.len() => (), 722aca2baf4SStefano Garzarella Ok(_) => { 723aca2baf4SStefano Garzarella // If we can't write a dozen bytes to a pristine connection something 724aca2baf4SStefano Garzarella // must be really wrong. Killing it. 
725aca2baf4SStefano Garzarella conn.kill(); 726aca2baf4SStefano Garzarella warn!("vsock: unable to fully write connection ack msg."); 727aca2baf4SStefano Garzarella } 728aca2baf4SStefano Garzarella Err(err) => { 7299701fde2SSebastien Boeuf conn.kill(); 7309701fde2SSebastien Boeuf warn!("vsock: unable to ack host connection: {:?}", err); 731aca2baf4SStefano Garzarella } 732aca2baf4SStefano Garzarella }; 7339701fde2SSebastien Boeuf } 7349701fde2SSebastien Boeuf 735434a5d0eSSebastien Boeuf // If the connection wasn't previously scheduled for RX, add it to our RX queue. 736434a5d0eSSebastien Boeuf if !had_rx && conn.has_pending_rx() { 737434a5d0eSSebastien Boeuf self.rxq.push(MuxerRx::ConnRx(key)); 738434a5d0eSSebastien Boeuf } 739434a5d0eSSebastien Boeuf 740434a5d0eSSebastien Boeuf // If the connection wasn't previously scheduled for termination, add it to the 741434a5d0eSSebastien Boeuf // kill queue. 742434a5d0eSSebastien Boeuf if !was_expiring && conn.will_expire() { 743434a5d0eSSebastien Boeuf // It's safe to unwrap here, since `conn.will_expire()` already guaranteed that 744434a5d0eSSebastien Boeuf // an `conn.expiry` is available. 745434a5d0eSSebastien Boeuf self.killq.push(key, conn.expiry().unwrap()); 746434a5d0eSSebastien Boeuf } 747434a5d0eSSebastien Boeuf 748434a5d0eSSebastien Boeuf let fd = conn.get_polled_fd(); 749434a5d0eSSebastien Boeuf let new_evset = conn.get_polled_evset(); 750434a5d0eSSebastien Boeuf if new_evset.is_empty() { 751434a5d0eSSebastien Boeuf // If the connection no longer needs epoll notifications, remove its listener 752434a5d0eSSebastien Boeuf // from our list. 753434a5d0eSSebastien Boeuf self.remove_listener(fd); 754434a5d0eSSebastien Boeuf return; 755434a5d0eSSebastien Boeuf } 756434a5d0eSSebastien Boeuf if let Some(EpollListener::Connection { evset, .. 
}) = self.listener_map.get_mut(&fd) { 757434a5d0eSSebastien Boeuf if *evset != new_evset { 758434a5d0eSSebastien Boeuf // If the set of events that the connection is interested in has changed, 759434a5d0eSSebastien Boeuf // we need to update its epoll listener. 760434a5d0eSSebastien Boeuf debug!( 761434a5d0eSSebastien Boeuf "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}", 762434a5d0eSSebastien Boeuf key.local_port, key.peer_port, *evset, new_evset 763434a5d0eSSebastien Boeuf ); 764434a5d0eSSebastien Boeuf 765434a5d0eSSebastien Boeuf *evset = new_evset; 766434a5d0eSSebastien Boeuf epoll::ctl( 76735782bd9SBo Chen self.epoll_file.as_raw_fd(), 768434a5d0eSSebastien Boeuf epoll::ControlOptions::EPOLL_CTL_MOD, 769434a5d0eSSebastien Boeuf fd, 770434a5d0eSSebastien Boeuf epoll::Event::new(new_evset, fd as u64), 771434a5d0eSSebastien Boeuf ) 772434a5d0eSSebastien Boeuf .unwrap_or_else(|err| { 773434a5d0eSSebastien Boeuf // This really shouldn't happen, like, ever. However, "famous last 774434a5d0eSSebastien Boeuf // words" and all that, so let's just kill it with fire, and walk away. 775434a5d0eSSebastien Boeuf self.kill_connection(key); 776434a5d0eSSebastien Boeuf error!( 777434a5d0eSSebastien Boeuf "vsock: error updating epoll listener for (lp={}, pp={}): {:?}", 778434a5d0eSSebastien Boeuf key.local_port, key.peer_port, err 779434a5d0eSSebastien Boeuf ); 780434a5d0eSSebastien Boeuf }); 781434a5d0eSSebastien Boeuf } 782434a5d0eSSebastien Boeuf } else { 783434a5d0eSSebastien Boeuf // The connection had previously asked to be removed from the listener map (by 784434a5d0eSSebastien Boeuf // returning an empty event set via `get_polled_fd()`), but now wants back in. 
785434a5d0eSSebastien Boeuf self.add_listener( 786434a5d0eSSebastien Boeuf fd, 787434a5d0eSSebastien Boeuf EpollListener::Connection { 788434a5d0eSSebastien Boeuf key, 789434a5d0eSSebastien Boeuf evset: new_evset, 790434a5d0eSSebastien Boeuf }, 791434a5d0eSSebastien Boeuf ) 792434a5d0eSSebastien Boeuf .unwrap_or_else(|err| { 793434a5d0eSSebastien Boeuf self.kill_connection(key); 794434a5d0eSSebastien Boeuf error!( 795434a5d0eSSebastien Boeuf "vsock: error updating epoll listener for (lp={}, pp={}): {:?}", 796434a5d0eSSebastien Boeuf key.local_port, key.peer_port, err 797434a5d0eSSebastien Boeuf ); 798434a5d0eSSebastien Boeuf }); 799434a5d0eSSebastien Boeuf } 800434a5d0eSSebastien Boeuf } 801434a5d0eSSebastien Boeuf } 802434a5d0eSSebastien Boeuf 803434a5d0eSSebastien Boeuf /// Check if any connections have timed out, and if so, schedule them for immediate 804434a5d0eSSebastien Boeuf /// termination. 805434a5d0eSSebastien Boeuf /// sweep_killq(&mut self)806434a5d0eSSebastien Boeuf fn sweep_killq(&mut self) { 807434a5d0eSSebastien Boeuf while let Some(key) = self.killq.pop() { 808434a5d0eSSebastien Boeuf // Connections don't get removed from the kill queue when their kill timer is 809434a5d0eSSebastien Boeuf // disarmed, since that would be a costly operation. This means we must check if 810434a5d0eSSebastien Boeuf // the connection has indeed expired, prior to killing it. 
811434a5d0eSSebastien Boeuf let mut kill = false; 812434a5d0eSSebastien Boeuf self.conn_map 813434a5d0eSSebastien Boeuf .entry(key) 814434a5d0eSSebastien Boeuf .and_modify(|conn| kill = conn.has_expired()); 815434a5d0eSSebastien Boeuf if kill { 816434a5d0eSSebastien Boeuf self.kill_connection(key); 817434a5d0eSSebastien Boeuf } 818434a5d0eSSebastien Boeuf } 819434a5d0eSSebastien Boeuf 820434a5d0eSSebastien Boeuf if self.killq.is_empty() && !self.killq.is_synced() { 821434a5d0eSSebastien Boeuf self.killq = MuxerKillQ::from_conn_map(&self.conn_map); 822434a5d0eSSebastien Boeuf // If we've just re-created the kill queue, we can sweep it again; maybe there's 823434a5d0eSSebastien Boeuf // more to kill. 824434a5d0eSSebastien Boeuf self.sweep_killq(); 825434a5d0eSSebastien Boeuf } 826434a5d0eSSebastien Boeuf } 827434a5d0eSSebastien Boeuf 828434a5d0eSSebastien Boeuf /// Enqueue an RST packet into `self.rxq`. 829434a5d0eSSebastien Boeuf /// 830434a5d0eSSebastien Boeuf /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do to 831434a5d0eSSebastien Boeuf /// handle them. We do, however, log a warning, since not being able to enqueue an RST 832434a5d0eSSebastien Boeuf /// packet means we have to drop it, which is not normal operation. 
833434a5d0eSSebastien Boeuf /// enq_rst(&mut self, local_port: u32, peer_port: u32)834434a5d0eSSebastien Boeuf fn enq_rst(&mut self, local_port: u32, peer_port: u32) { 835434a5d0eSSebastien Boeuf let pushed = self.rxq.push(MuxerRx::RstPkt { 836434a5d0eSSebastien Boeuf local_port, 837434a5d0eSSebastien Boeuf peer_port, 838434a5d0eSSebastien Boeuf }); 839434a5d0eSSebastien Boeuf if !pushed { 840434a5d0eSSebastien Boeuf warn!( 841434a5d0eSSebastien Boeuf "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}", 842434a5d0eSSebastien Boeuf local_port, peer_port 843434a5d0eSSebastien Boeuf ); 844434a5d0eSSebastien Boeuf } 845434a5d0eSSebastien Boeuf } 846434a5d0eSSebastien Boeuf } 84779753949SSebastien Boeuf 84879753949SSebastien Boeuf #[cfg(test)] 84979753949SSebastien Boeuf mod tests { 85088a9f799SRob Bradford use std::io::Write; 85188a9f799SRob Bradford use std::path::{Path, PathBuf}; 85288a9f799SRob Bradford 85388a9f799SRob Bradford use virtio_queue::QueueOwnedT; 85488a9f799SRob Bradford 85579753949SSebastien Boeuf use super::super::super::csm::defs as csm_defs; 85679753949SSebastien Boeuf use super::super::super::tests::TestContext as VsockTestContext; 85779753949SSebastien Boeuf use super::*; 85879753949SSebastien Boeuf 859451d3fb2SAlyssa Ross const PEER_CID: u32 = 3; 86079753949SSebastien Boeuf const PEER_BUF_ALLOC: u32 = 64 * 1024; 86179753949SSebastien Boeuf 86279753949SSebastien Boeuf struct MuxerTestContext { 86379753949SSebastien Boeuf _vsock_test_ctx: VsockTestContext, 86479753949SSebastien Boeuf pkt: VsockPacket, 86579753949SSebastien Boeuf muxer: VsockMuxer, 86679753949SSebastien Boeuf } 86779753949SSebastien Boeuf 86879753949SSebastien Boeuf impl Drop for MuxerTestContext { drop(&mut self)86979753949SSebastien Boeuf fn drop(&mut self) { 87079753949SSebastien Boeuf std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap(); 87179753949SSebastien Boeuf } 87279753949SSebastien Boeuf } 87379753949SSebastien Boeuf 87479753949SSebastien Boeuf impl 
MuxerTestContext { new(name: &str) -> Self87579753949SSebastien Boeuf fn new(name: &str) -> Self { 87679753949SSebastien Boeuf let vsock_test_ctx = VsockTestContext::new(); 87779753949SSebastien Boeuf let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context(); 87879753949SSebastien Boeuf let pkt = VsockPacket::from_rx_virtq_head( 8790249e864SSebastien Boeuf &mut handler_ctx.handler.queues[0] 880a423bf13SSebastien Boeuf .iter(&vsock_test_ctx.mem) 8810249e864SSebastien Boeuf .unwrap() 88279753949SSebastien Boeuf .next() 88379753949SSebastien Boeuf .unwrap(), 884e2225bb4SSebastien Boeuf None, 88579753949SSebastien Boeuf ) 88679753949SSebastien Boeuf .unwrap(); 8875e527294SRob Bradford let uds_path = format!("test_vsock_{name}.sock"); 88879753949SSebastien Boeuf let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap(); 88979753949SSebastien Boeuf 89079753949SSebastien Boeuf Self { 89179753949SSebastien Boeuf _vsock_test_ctx: vsock_test_ctx, 89279753949SSebastien Boeuf pkt, 89379753949SSebastien Boeuf muxer, 89479753949SSebastien Boeuf } 89579753949SSebastien Boeuf } 89679753949SSebastien Boeuf init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket89779753949SSebastien Boeuf fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket { 89879753949SSebastien Boeuf for b in self.pkt.hdr_mut() { 89979753949SSebastien Boeuf *b = 0; 90079753949SSebastien Boeuf } 90179753949SSebastien Boeuf self.pkt 90279753949SSebastien Boeuf .set_type(uapi::VSOCK_TYPE_STREAM) 903451d3fb2SAlyssa Ross .set_src_cid(PEER_CID.into()) 90479753949SSebastien Boeuf .set_dst_cid(uapi::VSOCK_HOST_CID) 90579753949SSebastien Boeuf .set_src_port(peer_port) 90679753949SSebastien Boeuf .set_dst_port(local_port) 90779753949SSebastien Boeuf .set_op(op) 90879753949SSebastien Boeuf .set_buf_alloc(PEER_BUF_ALLOC) 90979753949SSebastien Boeuf } 91079753949SSebastien Boeuf init_data_pkt( &mut self, local_port: u32, peer_port: u32, data: &[u8], ) -> &mut 
VsockPacket91179753949SSebastien Boeuf fn init_data_pkt( 91279753949SSebastien Boeuf &mut self, 91379753949SSebastien Boeuf local_port: u32, 91479753949SSebastien Boeuf peer_port: u32, 91579753949SSebastien Boeuf data: &[u8], 91679753949SSebastien Boeuf ) -> &mut VsockPacket { 917a9ec0f33SBo Chen assert!(data.len() <= self.pkt.buf().unwrap().len()); 91879753949SSebastien Boeuf self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW) 91979753949SSebastien Boeuf .set_len(data.len() as u32); 92079753949SSebastien Boeuf self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data); 92179753949SSebastien Boeuf &mut self.pkt 92279753949SSebastien Boeuf } 92379753949SSebastien Boeuf send(&mut self)92479753949SSebastien Boeuf fn send(&mut self) { 92579753949SSebastien Boeuf self.muxer.send_pkt(&self.pkt).unwrap(); 92679753949SSebastien Boeuf } 92779753949SSebastien Boeuf recv(&mut self)92879753949SSebastien Boeuf fn recv(&mut self) { 92979753949SSebastien Boeuf self.muxer.recv_pkt(&mut self.pkt).unwrap(); 93079753949SSebastien Boeuf } 93179753949SSebastien Boeuf notify_muxer(&mut self)93279753949SSebastien Boeuf fn notify_muxer(&mut self) { 93379753949SSebastien Boeuf self.muxer.notify(epoll::Events::EPOLLIN); 93479753949SSebastien Boeuf } 93579753949SSebastien Boeuf count_epoll_listeners(&self) -> (usize, usize)93679753949SSebastien Boeuf fn count_epoll_listeners(&self) -> (usize, usize) { 93779753949SSebastien Boeuf let mut local_lsn_count = 0usize; 93879753949SSebastien Boeuf let mut conn_lsn_count = 0usize; 93979753949SSebastien Boeuf for key in self.muxer.listener_map.values() { 94079753949SSebastien Boeuf match key { 94179753949SSebastien Boeuf EpollListener::LocalStream(_) => local_lsn_count += 1, 94279753949SSebastien Boeuf EpollListener::Connection { .. 
} => conn_lsn_count += 1, 94379753949SSebastien Boeuf _ => (), 94479753949SSebastien Boeuf }; 94579753949SSebastien Boeuf } 94679753949SSebastien Boeuf (local_lsn_count, conn_lsn_count) 94779753949SSebastien Boeuf } 94879753949SSebastien Boeuf create_local_listener(&self, port: u32) -> LocalListener94979753949SSebastien Boeuf fn create_local_listener(&self, port: u32) -> LocalListener { 95079753949SSebastien Boeuf LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port)) 95179753949SSebastien Boeuf } 95279753949SSebastien Boeuf local_connect(&mut self, peer_port: u32) -> (UnixStream, u32)95379753949SSebastien Boeuf fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) { 95479753949SSebastien Boeuf let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners(); 95579753949SSebastien Boeuf 95679753949SSebastien Boeuf let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap(); 95779753949SSebastien Boeuf stream.set_nonblocking(true).unwrap(); 95879753949SSebastien Boeuf // The muxer would now get notified of a new connection having arrived at its Unix 95979753949SSebastien Boeuf // socket, so it can accept it. 96079753949SSebastien Boeuf self.notify_muxer(); 96179753949SSebastien Boeuf 96279753949SSebastien Boeuf // Just after having accepted a new local connection, the muxer should've added a new 96379753949SSebastien Boeuf // `LocalStream` listener to its `listener_map`. 96479753949SSebastien Boeuf let (local_lsn_count, _) = self.count_epoll_listeners(); 96579753949SSebastien Boeuf assert_eq!(local_lsn_count, init_local_lsn_count + 1); 96679753949SSebastien Boeuf 9675e527294SRob Bradford let buf = format!("CONNECT {peer_port}\n"); 96879753949SSebastien Boeuf stream.write_all(buf.as_bytes()).unwrap(); 96979753949SSebastien Boeuf // The muxer would now get notified that data is available for reading from the locally 97079753949SSebastien Boeuf // initiated connection. 
97179753949SSebastien Boeuf self.notify_muxer(); 97279753949SSebastien Boeuf 97379753949SSebastien Boeuf // Successfully reading and parsing the connection request should have removed the 97479753949SSebastien Boeuf // LocalStream epoll listener and added a Connection epoll listener. 97579753949SSebastien Boeuf let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners(); 97679753949SSebastien Boeuf assert_eq!(local_lsn_count, init_local_lsn_count); 97779753949SSebastien Boeuf assert_eq!(conn_lsn_count, init_conn_lsn_count + 1); 97879753949SSebastien Boeuf 97979753949SSebastien Boeuf // A LocalInit connection should've been added to the muxer connection map. A new 98079753949SSebastien Boeuf // local port should also have been allocated for the new LocalInit connection. 98179753949SSebastien Boeuf let local_port = self.muxer.local_port_last; 98279753949SSebastien Boeuf let key = ConnMapKey { 98379753949SSebastien Boeuf local_port, 98479753949SSebastien Boeuf peer_port, 98579753949SSebastien Boeuf }; 98679753949SSebastien Boeuf assert!(self.muxer.conn_map.contains_key(&key)); 98779753949SSebastien Boeuf assert!(self.muxer.local_port_set.contains(&local_port)); 98879753949SSebastien Boeuf 98979753949SSebastien Boeuf // A connection request for the peer should now be available from the muxer. 
99079753949SSebastien Boeuf assert!(self.muxer.has_pending_rx()); 99179753949SSebastien Boeuf self.recv(); 99279753949SSebastien Boeuf assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST); 99379753949SSebastien Boeuf assert_eq!(self.pkt.dst_port(), peer_port); 99479753949SSebastien Boeuf assert_eq!(self.pkt.src_port(), local_port); 99579753949SSebastien Boeuf 99679753949SSebastien Boeuf self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE); 99779753949SSebastien Boeuf self.send(); 99879753949SSebastien Boeuf 999d0dbc7fbSYu Li let mut buf = [0u8; 32]; 10009701fde2SSebastien Boeuf let len = stream.read(&mut buf[..]).unwrap(); 10015e527294SRob Bradford assert_eq!(&buf[..len], format!("OK {local_port}\n").as_bytes()); 10029701fde2SSebastien Boeuf 100379753949SSebastien Boeuf (stream, local_port) 100479753949SSebastien Boeuf } 100579753949SSebastien Boeuf } 100679753949SSebastien Boeuf 100779753949SSebastien Boeuf struct LocalListener { 100879753949SSebastien Boeuf path: PathBuf, 100979753949SSebastien Boeuf sock: UnixListener, 101079753949SSebastien Boeuf } 101179753949SSebastien Boeuf impl LocalListener { new<P: AsRef<Path> + Clone>(path: P) -> Self101279753949SSebastien Boeuf fn new<P: AsRef<Path> + Clone>(path: P) -> Self { 1013cf86ca15SRob Bradford let path_buf = path.as_ref().to_path_buf(); 101479753949SSebastien Boeuf let sock = UnixListener::bind(path).unwrap(); 101579753949SSebastien Boeuf sock.set_nonblocking(true).unwrap(); 101679753949SSebastien Boeuf Self { 101779753949SSebastien Boeuf path: path_buf, 101879753949SSebastien Boeuf sock, 101979753949SSebastien Boeuf } 102079753949SSebastien Boeuf } accept(&mut self) -> UnixStream102179753949SSebastien Boeuf fn accept(&mut self) -> UnixStream { 102279753949SSebastien Boeuf let (stream, _) = self.sock.accept().unwrap(); 102379753949SSebastien Boeuf stream.set_nonblocking(true).unwrap(); 102479753949SSebastien Boeuf stream 102579753949SSebastien Boeuf } 102679753949SSebastien Boeuf } 102779753949SSebastien 
Boeuf impl Drop for LocalListener { drop(&mut self)102879753949SSebastien Boeuf fn drop(&mut self) { 102979753949SSebastien Boeuf std::fs::remove_file(&self.path).unwrap(); 103079753949SSebastien Boeuf } 103179753949SSebastien Boeuf } 103279753949SSebastien Boeuf 103379753949SSebastien Boeuf #[test] test_muxer_epoll_listener()103479753949SSebastien Boeuf fn test_muxer_epoll_listener() { 103579753949SSebastien Boeuf let ctx = MuxerTestContext::new("muxer_epoll_listener"); 103635782bd9SBo Chen assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd()); 103779753949SSebastien Boeuf assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN); 103879753949SSebastien Boeuf } 103979753949SSebastien Boeuf 104079753949SSebastien Boeuf #[test] test_bad_peer_pkt()104179753949SSebastien Boeuf fn test_bad_peer_pkt() { 104279753949SSebastien Boeuf const LOCAL_PORT: u32 = 1026; 104379753949SSebastien Boeuf const PEER_PORT: u32 = 1025; 104479753949SSebastien Boeuf const SOCK_DGRAM: u16 = 2; 104579753949SSebastien Boeuf 104679753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("bad_peer_pkt"); 104779753949SSebastien Boeuf ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST) 104879753949SSebastien Boeuf .set_type(SOCK_DGRAM); 104979753949SSebastien Boeuf ctx.send(); 105079753949SSebastien Boeuf 105179753949SSebastien Boeuf // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an RST 105279753949SSebastien Boeuf // packet, since vsock only supports stream sockets. 
105379753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 105479753949SSebastien Boeuf ctx.recv(); 105579753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST); 105679753949SSebastien Boeuf assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID); 1057451d3fb2SAlyssa Ross assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64); 105879753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), LOCAL_PORT); 105979753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), PEER_PORT); 106079753949SSebastien Boeuf 106179753949SSebastien Boeuf // Any orphan (i.e. without a connection), non-RST packet, should be replied to with an 106279753949SSebastien Boeuf // RST. 106379753949SSebastien Boeuf let bad_ops = [ 106479753949SSebastien Boeuf uapi::VSOCK_OP_RESPONSE, 106579753949SSebastien Boeuf uapi::VSOCK_OP_CREDIT_REQUEST, 106679753949SSebastien Boeuf uapi::VSOCK_OP_CREDIT_UPDATE, 106779753949SSebastien Boeuf uapi::VSOCK_OP_SHUTDOWN, 106879753949SSebastien Boeuf uapi::VSOCK_OP_RW, 106979753949SSebastien Boeuf ]; 107079753949SSebastien Boeuf for op in bad_ops.iter() { 107179753949SSebastien Boeuf ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op); 107279753949SSebastien Boeuf ctx.send(); 107379753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 107479753949SSebastien Boeuf ctx.recv(); 107579753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST); 107679753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), LOCAL_PORT); 107779753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), PEER_PORT); 107879753949SSebastien Boeuf } 107979753949SSebastien Boeuf 108079753949SSebastien Boeuf // Any packet addressed to anything other than VSOCK_VHOST_CID should get dropped. 
108179753949SSebastien Boeuf assert!(!ctx.muxer.has_pending_rx()); 108279753949SSebastien Boeuf ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST) 108379753949SSebastien Boeuf .set_dst_cid(uapi::VSOCK_HOST_CID + 1); 108479753949SSebastien Boeuf ctx.send(); 108579753949SSebastien Boeuf assert!(!ctx.muxer.has_pending_rx()); 108679753949SSebastien Boeuf } 108779753949SSebastien Boeuf 108879753949SSebastien Boeuf #[test] test_peer_connection()108979753949SSebastien Boeuf fn test_peer_connection() { 109079753949SSebastien Boeuf const LOCAL_PORT: u32 = 1026; 109179753949SSebastien Boeuf const PEER_PORT: u32 = 1025; 109279753949SSebastien Boeuf 109379753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("peer_connection"); 109479753949SSebastien Boeuf 109579753949SSebastien Boeuf // Test peer connection refused. 109679753949SSebastien Boeuf ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST); 109779753949SSebastien Boeuf ctx.send(); 109879753949SSebastien Boeuf ctx.recv(); 109979753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST); 110079753949SSebastien Boeuf assert_eq!(ctx.pkt.len(), 0); 110179753949SSebastien Boeuf assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID); 1102451d3fb2SAlyssa Ross assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64); 110379753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), LOCAL_PORT); 110479753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), PEER_PORT); 110579753949SSebastien Boeuf 110679753949SSebastien Boeuf // Test peer connection accepted. 
110779753949SSebastien Boeuf let mut listener = ctx.create_local_listener(LOCAL_PORT); 110879753949SSebastien Boeuf ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST); 110979753949SSebastien Boeuf ctx.send(); 111079753949SSebastien Boeuf assert_eq!(ctx.muxer.conn_map.len(), 1); 111179753949SSebastien Boeuf let mut stream = listener.accept(); 111279753949SSebastien Boeuf ctx.recv(); 111379753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 111479753949SSebastien Boeuf assert_eq!(ctx.pkt.len(), 0); 111579753949SSebastien Boeuf assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID); 1116451d3fb2SAlyssa Ross assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64); 111779753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), LOCAL_PORT); 111879753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), PEER_PORT); 111979753949SSebastien Boeuf let key = ConnMapKey { 112079753949SSebastien Boeuf local_port: LOCAL_PORT, 112179753949SSebastien Boeuf peer_port: PEER_PORT, 112279753949SSebastien Boeuf }; 112379753949SSebastien Boeuf assert!(ctx.muxer.conn_map.contains_key(&key)); 112479753949SSebastien Boeuf 112579753949SSebastien Boeuf // Test guest -> host data flow. 112679753949SSebastien Boeuf let data = [1, 2, 3, 4]; 112779753949SSebastien Boeuf ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data); 112879753949SSebastien Boeuf ctx.send(); 112979753949SSebastien Boeuf let mut buf = vec![0; data.len()]; 113079753949SSebastien Boeuf stream.read_exact(buf.as_mut_slice()).unwrap(); 113179753949SSebastien Boeuf assert_eq!(buf.as_slice(), data); 113279753949SSebastien Boeuf 113379753949SSebastien Boeuf // Test host -> guest data flow. 113479753949SSebastien Boeuf let data = [5u8, 6, 7, 8]; 113579753949SSebastien Boeuf stream.write_all(&data).unwrap(); 113679753949SSebastien Boeuf 113779753949SSebastien Boeuf // When data is available on the local stream, an EPOLLIN event would normally be delivered 113879753949SSebastien Boeuf // to the muxer's nested epoll FD. 
For testing only, we can fake that event notification 113979753949SSebastien Boeuf // here. 114079753949SSebastien Boeuf ctx.notify_muxer(); 114179753949SSebastien Boeuf // After being notified, the muxer should've figured out that RX data was available for one 114279753949SSebastien Boeuf // of its connections, so it should now be reporting that it can fill in an RX packet. 114379753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 114479753949SSebastien Boeuf ctx.recv(); 114579753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW); 114679753949SSebastien Boeuf assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data); 114779753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), LOCAL_PORT); 114879753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), PEER_PORT); 114979753949SSebastien Boeuf 115079753949SSebastien Boeuf assert!(!ctx.muxer.has_pending_rx()); 115179753949SSebastien Boeuf } 115279753949SSebastien Boeuf 115379753949SSebastien Boeuf #[test] test_local_connection()115479753949SSebastien Boeuf fn test_local_connection() { 115579753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("local_connection"); 115679753949SSebastien Boeuf let peer_port = 1025; 115779753949SSebastien Boeuf let (mut stream, local_port) = ctx.local_connect(peer_port); 115879753949SSebastien Boeuf 115979753949SSebastien Boeuf // Test guest -> host data flow. 116079753949SSebastien Boeuf let data = [1, 2, 3, 4]; 116179753949SSebastien Boeuf ctx.init_data_pkt(local_port, peer_port, &data); 116279753949SSebastien Boeuf ctx.send(); 116379753949SSebastien Boeuf 116479753949SSebastien Boeuf let mut buf = vec![0u8; data.len()]; 116579753949SSebastien Boeuf stream.read_exact(buf.as_mut_slice()).unwrap(); 116679753949SSebastien Boeuf assert_eq!(buf.as_slice(), &data); 116779753949SSebastien Boeuf 116879753949SSebastien Boeuf // Test host -> guest data flow. 
116979753949SSebastien Boeuf let data = [5, 6, 7, 8]; 117079753949SSebastien Boeuf stream.write_all(&data).unwrap(); 117179753949SSebastien Boeuf ctx.notify_muxer(); 117279753949SSebastien Boeuf 117379753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 117479753949SSebastien Boeuf ctx.recv(); 117579753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW); 117679753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 117779753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port); 117879753949SSebastien Boeuf assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data); 117979753949SSebastien Boeuf } 118079753949SSebastien Boeuf 118179753949SSebastien Boeuf #[test] test_local_close()118279753949SSebastien Boeuf fn test_local_close() { 118379753949SSebastien Boeuf let peer_port = 1025; 118479753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("local_close"); 118579753949SSebastien Boeuf let local_port; 118679753949SSebastien Boeuf { 118779753949SSebastien Boeuf let (_stream, local_port_) = ctx.local_connect(peer_port); 118879753949SSebastien Boeuf local_port = local_port_; 118979753949SSebastien Boeuf } 119079753949SSebastien Boeuf // Local var `_stream` was now dropped, thus closing the local stream. After the muxer gets 119179753949SSebastien Boeuf // notified via EPOLLIN, it should attempt to gracefully shutdown the connection, issuing a 119279753949SSebastien Boeuf // VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv indications set. 
119379753949SSebastien Boeuf ctx.notify_muxer(); 119479753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 119579753949SSebastien Boeuf ctx.recv(); 119679753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN); 119779753949SSebastien Boeuf assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0); 119879753949SSebastien Boeuf assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0); 119979753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 120079753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port); 120179753949SSebastien Boeuf 120279753949SSebastien Boeuf // The connection should get removed (and its local port freed), after the peer replies 120379753949SSebastien Boeuf // with an RST. 120479753949SSebastien Boeuf ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST); 120579753949SSebastien Boeuf ctx.send(); 120679753949SSebastien Boeuf let key = ConnMapKey { 120779753949SSebastien Boeuf local_port, 120879753949SSebastien Boeuf peer_port, 120979753949SSebastien Boeuf }; 121079753949SSebastien Boeuf assert!(!ctx.muxer.conn_map.contains_key(&key)); 121179753949SSebastien Boeuf assert!(!ctx.muxer.local_port_set.contains(&local_port)); 121279753949SSebastien Boeuf } 121379753949SSebastien Boeuf 121479753949SSebastien Boeuf #[test] test_peer_close()121579753949SSebastien Boeuf fn test_peer_close() { 121679753949SSebastien Boeuf let peer_port = 1025; 121779753949SSebastien Boeuf let local_port = 1026; 121879753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("peer_close"); 121979753949SSebastien Boeuf 122079753949SSebastien Boeuf let mut sock = ctx.create_local_listener(local_port); 122179753949SSebastien Boeuf ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST); 122279753949SSebastien Boeuf ctx.send(); 122379753949SSebastien Boeuf let mut stream = sock.accept(); 122479753949SSebastien Boeuf 122579753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 122679753949SSebastien Boeuf 
ctx.recv(); 122779753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 122879753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 122979753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port); 123079753949SSebastien Boeuf let key = ConnMapKey { 123179753949SSebastien Boeuf local_port, 123279753949SSebastien Boeuf peer_port, 123379753949SSebastien Boeuf }; 123479753949SSebastien Boeuf assert!(ctx.muxer.conn_map.contains_key(&key)); 123579753949SSebastien Boeuf 123679753949SSebastien Boeuf // Emulate a full shutdown from the peer (no-more-send + no-more-recv). 123779753949SSebastien Boeuf ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN) 123879753949SSebastien Boeuf .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND) 123979753949SSebastien Boeuf .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV); 124079753949SSebastien Boeuf ctx.send(); 124179753949SSebastien Boeuf 124279753949SSebastien Boeuf // Now, the muxer should remove the connection from its map, and reply with an RST. 124379753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 124479753949SSebastien Boeuf ctx.recv(); 124579753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST); 124679753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 124779753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port); 124879753949SSebastien Boeuf let key = ConnMapKey { 124979753949SSebastien Boeuf local_port, 125079753949SSebastien Boeuf peer_port, 125179753949SSebastien Boeuf }; 125279753949SSebastien Boeuf assert!(!ctx.muxer.conn_map.contains_key(&key)); 125379753949SSebastien Boeuf 125479753949SSebastien Boeuf // The muxer should also drop / close the local Unix socket for this connection. 
125579753949SSebastien Boeuf let mut buf = vec![0u8; 16]; 125679753949SSebastien Boeuf assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0); 125779753949SSebastien Boeuf } 125879753949SSebastien Boeuf 125979753949SSebastien Boeuf #[test] test_muxer_rxq()126079753949SSebastien Boeuf fn test_muxer_rxq() { 126179753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("muxer_rxq"); 126279753949SSebastien Boeuf let local_port = 1026; 126379753949SSebastien Boeuf let peer_port_first = 1025; 126479753949SSebastien Boeuf let mut listener = ctx.create_local_listener(local_port); 126579753949SSebastien Boeuf let mut streams: Vec<UnixStream> = Vec::new(); 126679753949SSebastien Boeuf 126779753949SSebastien Boeuf for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE { 126879753949SSebastien Boeuf ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST); 126979753949SSebastien Boeuf ctx.send(); 127079753949SSebastien Boeuf streams.push(listener.accept()); 127179753949SSebastien Boeuf } 127279753949SSebastien Boeuf 12735c3f4dbeSJosh Soref // The muxer RX queue should now be full (with connection responses), but still 127479753949SSebastien Boeuf // synchronized. 127579753949SSebastien Boeuf assert!(ctx.muxer.rxq.is_synced()); 127679753949SSebastien Boeuf 127779753949SSebastien Boeuf // One more queued reply should desync the RX queue. 127879753949SSebastien Boeuf ctx.init_pkt( 127979753949SSebastien Boeuf local_port, 128079753949SSebastien Boeuf (peer_port_first + defs::MUXER_RXQ_SIZE) as u32, 128179753949SSebastien Boeuf uapi::VSOCK_OP_REQUEST, 128279753949SSebastien Boeuf ); 128379753949SSebastien Boeuf ctx.send(); 128479753949SSebastien Boeuf assert!(!ctx.muxer.rxq.is_synced()); 128579753949SSebastien Boeuf 128679753949SSebastien Boeuf // With an out-of-sync queue, an RST should evict any non-RST packet from the queue, and 128779753949SSebastien Boeuf // take its place. 
We'll check that by making sure that the last packet popped from the 128879753949SSebastien Boeuf // queue is an RST. 128979753949SSebastien Boeuf ctx.init_pkt( 129079753949SSebastien Boeuf local_port + 1, 129179753949SSebastien Boeuf peer_port_first as u32, 129279753949SSebastien Boeuf uapi::VSOCK_OP_REQUEST, 129379753949SSebastien Boeuf ); 129479753949SSebastien Boeuf ctx.send(); 129579753949SSebastien Boeuf 129679753949SSebastien Boeuf for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 { 129779753949SSebastien Boeuf ctx.recv(); 129879753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 129979753949SSebastien Boeuf // The response order should hold. The evicted response should have been the last 130079753949SSebastien Boeuf // enqueued. 130179753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port as u32); 130279753949SSebastien Boeuf } 130379753949SSebastien Boeuf // There should be one more packet in the queue: the RST. 130479753949SSebastien Boeuf assert_eq!(ctx.muxer.rxq.len(), 1); 130579753949SSebastien Boeuf ctx.recv(); 130679753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST); 130779753949SSebastien Boeuf 130879753949SSebastien Boeuf // The queue should now be empty, but out-of-sync, so the muxer should report it has some 130979753949SSebastien Boeuf // pending RX. 131079753949SSebastien Boeuf assert!(ctx.muxer.rxq.is_empty()); 131179753949SSebastien Boeuf assert!(!ctx.muxer.rxq.is_synced()); 131279753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 131379753949SSebastien Boeuf 131479753949SSebastien Boeuf // The next recv should sync the queue back up. It should also yield one of the two 131579753949SSebastien Boeuf // responses that are still left: 131679753949SSebastien Boeuf // - the one that desynchronized the queue; and 131779753949SSebastien Boeuf // - the one that got evicted by the RST. 
131879753949SSebastien Boeuf ctx.recv(); 131979753949SSebastien Boeuf assert!(ctx.muxer.rxq.is_synced()); 132079753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 132179753949SSebastien Boeuf 132279753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 132379753949SSebastien Boeuf ctx.recv(); 132479753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 132579753949SSebastien Boeuf } 132679753949SSebastien Boeuf 132779753949SSebastien Boeuf #[test] test_muxer_killq()132879753949SSebastien Boeuf fn test_muxer_killq() { 132979753949SSebastien Boeuf let mut ctx = MuxerTestContext::new("muxer_killq"); 133079753949SSebastien Boeuf let local_port = 1026; 133179753949SSebastien Boeuf let peer_port_first = 1025; 133279753949SSebastien Boeuf let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE; 133379753949SSebastien Boeuf let mut listener = ctx.create_local_listener(local_port); 133479753949SSebastien Boeuf 133579753949SSebastien Boeuf for peer_port in peer_port_first..=peer_port_last { 133679753949SSebastien Boeuf ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST); 133779753949SSebastien Boeuf ctx.send(); 133879753949SSebastien Boeuf ctx.notify_muxer(); 133979753949SSebastien Boeuf ctx.recv(); 134079753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 134179753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 134279753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port as u32); 134379753949SSebastien Boeuf { 134479753949SSebastien Boeuf let _stream = listener.accept(); 134579753949SSebastien Boeuf } 134679753949SSebastien Boeuf ctx.notify_muxer(); 134779753949SSebastien Boeuf ctx.recv(); 134879753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN); 134979753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 135079753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port as u32); 135179753949SSebastien Boeuf // The kill 
queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th 135279753949SSebastien Boeuf // connection we schedule for termination. 135379753949SSebastien Boeuf assert_eq!( 135479753949SSebastien Boeuf ctx.muxer.killq.is_synced(), 135579753949SSebastien Boeuf peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE 135679753949SSebastien Boeuf ); 135779753949SSebastien Boeuf } 135879753949SSebastien Boeuf 135979753949SSebastien Boeuf assert!(!ctx.muxer.killq.is_synced()); 136079753949SSebastien Boeuf assert!(!ctx.muxer.has_pending_rx()); 136179753949SSebastien Boeuf 136279753949SSebastien Boeuf // Wait for the kill timers to expire. 136379753949SSebastien Boeuf std::thread::sleep(std::time::Duration::from_millis( 136479753949SSebastien Boeuf csm_defs::CONN_SHUTDOWN_TIMEOUT_MS, 136579753949SSebastien Boeuf )); 136679753949SSebastien Boeuf 136779753949SSebastien Boeuf // Trigger a kill queue sweep, by requesting a new connection. 136879753949SSebastien Boeuf ctx.init_pkt( 136979753949SSebastien Boeuf local_port, 137079753949SSebastien Boeuf peer_port_last as u32 + 1, 137179753949SSebastien Boeuf uapi::VSOCK_OP_REQUEST, 137279753949SSebastien Boeuf ); 137379753949SSebastien Boeuf ctx.send(); 137479753949SSebastien Boeuf 137579753949SSebastien Boeuf // After sweeping the kill queue, it should now be synced (assuming the RX queue is larger 137679753949SSebastien Boeuf // than the kill queue, since an RST packet will be queued for each killed connection). 137779753949SSebastien Boeuf assert!(ctx.muxer.killq.is_synced()); 137879753949SSebastien Boeuf assert!(ctx.muxer.has_pending_rx()); 137979753949SSebastien Boeuf // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the 138079753949SSebastien Boeuf // dying connections in the recent killq sweep. 
138179753949SSebastien Boeuf for _p in peer_port_first..peer_port_last { 138279753949SSebastien Boeuf ctx.recv(); 138379753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST); 138479753949SSebastien Boeuf assert_eq!(ctx.pkt.src_port(), local_port); 138579753949SSebastien Boeuf } 138679753949SSebastien Boeuf 138779753949SSebastien Boeuf // There should be one more packet in the RX queue: the connection response our request 138879753949SSebastien Boeuf // that triggered the kill queue sweep. 138979753949SSebastien Boeuf ctx.recv(); 139079753949SSebastien Boeuf assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE); 139179753949SSebastien Boeuf assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1); 139279753949SSebastien Boeuf 139379753949SSebastien Boeuf assert!(!ctx.muxer.has_pending_rx()); 139479753949SSebastien Boeuf } 1395bb3cf7c3SStefano Garzarella 1396bb3cf7c3SStefano Garzarella #[test] test_regression_handshake()1397bb3cf7c3SStefano Garzarella fn test_regression_handshake() { 1398bb3cf7c3SStefano Garzarella // Address one of the issues found while fixing the following issue: 1399bb3cf7c3SStefano Garzarella // https://github.com/firecracker-microvm/firecracker/issues/1751 1400bb3cf7c3SStefano Garzarella // This test checks that the handshake message is not accounted for 1401bb3cf7c3SStefano Garzarella let mut ctx = MuxerTestContext::new("regression_handshake"); 1402bb3cf7c3SStefano Garzarella let peer_port = 1025; 1403bb3cf7c3SStefano Garzarella 1404bb3cf7c3SStefano Garzarella // Create a local connection. 1405bb3cf7c3SStefano Garzarella let (_, local_port) = ctx.local_connect(peer_port); 1406bb3cf7c3SStefano Garzarella 1407bb3cf7c3SStefano Garzarella // Get the connection from the connection map. 
1408bb3cf7c3SStefano Garzarella let key = ConnMapKey { 1409bb3cf7c3SStefano Garzarella local_port, 1410bb3cf7c3SStefano Garzarella peer_port, 1411bb3cf7c3SStefano Garzarella }; 1412bb3cf7c3SStefano Garzarella let conn = ctx.muxer.conn_map.get_mut(&key).unwrap(); 1413bb3cf7c3SStefano Garzarella 1414bb3cf7c3SStefano Garzarella // Check that fwd_cnt is 0 - "OK ..." was not accounted for. 1415bb3cf7c3SStefano Garzarella assert_eq!(conn.fwd_cnt().0, 0); 1416bb3cf7c3SStefano Garzarella } 1417f756174bSStefano Garzarella 1418f756174bSStefano Garzarella #[test] test_regression_rxq_pop()1419f756174bSStefano Garzarella fn test_regression_rxq_pop() { 1420f756174bSStefano Garzarella // Address one of the issues found while fixing the following issue: 1421f756174bSStefano Garzarella // https://github.com/firecracker-microvm/firecracker/issues/1751 1422f756174bSStefano Garzarella // This test checks that a connection is not popped out of the muxer 1423f756174bSStefano Garzarella // rxq when multiple flags are set 1424f756174bSStefano Garzarella let mut ctx = MuxerTestContext::new("regression_rxq_pop"); 1425f756174bSStefano Garzarella let peer_port = 1025; 1426f756174bSStefano Garzarella let (mut stream, local_port) = ctx.local_connect(peer_port); 1427f756174bSStefano Garzarella 1428f756174bSStefano Garzarella // Send some data. 1429f756174bSStefano Garzarella let data = [5u8, 6, 7, 8]; 1430f756174bSStefano Garzarella stream.write_all(&data).unwrap(); 1431f756174bSStefano Garzarella ctx.notify_muxer(); 1432f756174bSStefano Garzarella 1433f756174bSStefano Garzarella // Get the connection from the connection map. 1434f756174bSStefano Garzarella let key = ConnMapKey { 1435f756174bSStefano Garzarella local_port, 1436f756174bSStefano Garzarella peer_port, 1437f756174bSStefano Garzarella }; 1438f756174bSStefano Garzarella let conn = ctx.muxer.conn_map.get_mut(&key).unwrap(); 1439f756174bSStefano Garzarella 1440f756174bSStefano Garzarella // Forcefully insert another flag. 
1441f756174bSStefano Garzarella conn.insert_credit_update(); 1442f756174bSStefano Garzarella 1443f756174bSStefano Garzarella // Call recv twice in order to check that the connection is still 1444f756174bSStefano Garzarella // in the rxq. 1445f756174bSStefano Garzarella assert!(ctx.muxer.has_pending_rx()); 1446f756174bSStefano Garzarella ctx.recv(); 1447f756174bSStefano Garzarella assert!(ctx.muxer.has_pending_rx()); 1448f756174bSStefano Garzarella ctx.recv(); 1449f756174bSStefano Garzarella 1450f756174bSStefano Garzarella // Since initially the connection had two flags set, now there should 1451f756174bSStefano Garzarella // not be any pending RX in the muxer. 1452f756174bSStefano Garzarella assert!(!ctx.muxer.has_pending_rx()); 1453f756174bSStefano Garzarella } 145479753949SSebastien Boeuf } 1455