xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer.rs (revision 2571e59438597f53aa4993cd70d6462fe1364ba7)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

//! `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend.
//! That is, by implementing the `VsockBackend` trait, it abstracts away the gory details
//! of translating between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest
//! of the vsock device model.
//!
//! The vsock muxer has two main roles:
//!
//! ## Vsock connection multiplexer
//!
//! It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
//! muxer also routes packets to their owning connections. It does so via a connection
//! `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
//!
//! Vsock packet traffic needs to be inspected, in order to detect connection request
//! packets (leading to the creation of a new connection), and connection reset packets
//! (leading to the termination of an existing connection). All other packets, though, must
//! belong to an existing connection and, as such, the muxer simply forwards them.
//!
//! ## Event dispatcher
//!
//! There are three event categories that the vsock backend is interested in:
//! 1. A new host-initiated connection is ready to be accepted from the listening host Unix
//!    socket;
//! 2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
//!    the host is ready to issue a vsock connection request, informing us of the
//!    destination port to which it wants to connect);
//! 3. Some event was triggered for a connected Unix socket that belongs to a
//!    `VsockConnection`.
//!
//! The muxer gets notified about all of these events, because, as a `VsockEpollListener`
//! implementor, it gets to register a nested epoll FD into the main VMM epolling loop. All
//! other pollable FDs are then registered under this nested epoll FD.
//!
//! To route all these events to their handlers, the muxer uses another `HashMap` object,
//! mapping `RawFd`s to `EpollListener`s.
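//!
//! For reference, the handshake used for host-initiated connections looks roughly like
//! this (an illustrative trace; the socket path and port numbers are made up, and the
//! `OK` reply is only sent once the guest accepts the connection):
//!
//! ```text
//! (host)  connect("/tmp/vm1_vsock.sock")  -- open the host-side Unix socket
//! (host)  write("CONNECT 52\n")           -- request a connection to guest vsock port 52
//! (muxer) write("OK 1073741824\n")        -- confirm, reporting the allocated host-side port
//! ```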

use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect \<port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "\<this path>_\<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}

impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the host
        // part of the guest - host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports).  The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, we know where to send this packet then: to `conn_key`.
        // However, if this is an RST, we have to forcefully terminate the connection, so
        // there's no point in forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}

impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled in by
                            // `epoll::wait()`, and therefore contains only valid epoll
                            // flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // The epoll_wait() syscall documentation makes it clear that the
                        // wait can be interrupted by a signal before any of the requested
                        // events occur or the timeout expires. In that case it fails with
                        // EINTR, which should not be treated as a regular error; the
                        // appropriate response is simply to retry the epoll_wait() call.
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
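    /// A minimal usage sketch (the CID and socket path below are made-up example values,
    /// and error handling is elided):
    ///
    /// ```ignore
    /// // Create a muxer serving guest CID 3, accepting host connections on a Unix socket.
    /// let muxer = VsockMuxer::new(3, "/tmp/vm1_vsock.sock".to_string()).unwrap();
    /// ```
    ///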
    pub fn new(cid: u64, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use 'File' to enforce closing on 'epoll_fd'
        // SAFETY: epoll_fd is a valid fd
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid,
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset: _ }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
                    Self::read_local_stream_port(&mut stream)
                        .map(|peer_port| (self.allocate_local_port(), peer_port))
                        .and_then(|(local_port, peer_port)| {
                            self.add_connection(
                                ConnMapKey {
                                    local_port,
                                    peer_port,
                                },
                                MuxerConnection::new_local_init(
                                    stream,
                                    uapi::VSOCK_HOST_CID,
                                    self.cid,
                                    local_port,
                                    peer_port,
                                ),
                            )
                        })
                        .unwrap_or_else(|err| {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        })
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
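    /// The expected command is a single newline-terminated line of the form
    /// "connect \<port>", e.g. `b"connect 1025\n"`; the keyword is matched
    /// case-insensitively.
    ///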
    fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
        let mut buf = [0u8; 32];

        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_READ_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut buf[..MIN_READ_LEN])
            .map_err(Error::UnixRead)?;

        // Now, finish reading the destination port number, by bringing in one byte at a time,
        // until we reach an EOL terminator (or our buffer space runs out).  Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        let mut blen = MIN_READ_LEN;
        while buf[blen - 1] != b'\n' && blen < buf.len() {
            stream
                .read_exact(&mut buf[blen..=blen])
                .map_err(Error::UnixRead)?;
            blen += 1;
        }

        let mut word_iter = std::str::from_utf8(&buf[..blen])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }

    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first.  It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication. Worst
                // case scenario, the RX queue will get desynchronized, but we'll handle that
                // the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
    /// it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the RX
        // queue.  However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst case
            // scenario, the RX queue will get desynchronized, but we'll handle that the next
            // time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
        //

        loop {
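            // `&` binds tighter than `|` here: clear bit 31, then set bit 30, wrapping the
            // candidate port into the range [1 << 30, 1 << 31).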
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }

    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
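    /// For example (with illustrative values): a request for guest port 1025, with
    /// `host_sock_path` set to "/tmp/vm1_vsock.sock", triggers a connection attempt to
    /// "/tmp/vm1_vsock.sock_1025".
    ///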
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }

    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
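    /// A minimal sketch of the calling pattern (mirroring its use in `handle_event()`,
    /// where `key` and `event_set` come from the triggered epoll listener):
    ///
    /// ```ignore
    /// self.apply_conn_mutation(key, |conn| {
    ///     conn.notify(event_set);
    /// });
    /// ```
    ///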
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll have
            // to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed
                // that `conn.expiry()` is `Some`.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_evset()`), but now wants back in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do to
    /// handle them. We do, however, log a warning, since not being able to enqueue an RST
    /// packet means we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::ops::Drop;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::{Path, PathBuf};

    use virtio_queue::QueueOwnedT;

    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;

    const PEER_CID: u64 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let uds_path = format!("test_vsock_{name}.sock");
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID)
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }

        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its Unix
            // socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a new
            // `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {peer_port}\n");
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the locally
            // initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed the
            // LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map.  A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = [0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {local_port}\n").as_bytes());

            (stream, local_port)
        }
    }

    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an RST
        // packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet, should be replied to with an
        // RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be delivered
        // to the muxer's nested epoll FD. For testing only, we can fake that event notification
        // here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available for one
        // of its connections, so it should now be reporting that it can fill in an RX packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }

    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Local var `_stream` has now been dropped, thus closing the local stream. After the
        // muxer gets notified via EPOLLIN, it should attempt to gracefully shut down the
        // connection, issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv
        // indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer replies
        // with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }

    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue, and
        // take its place. We'll check that by making sure that the last packet popped from the
        // queue is an RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has some
        // pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is larger
        // than the kill queue, since an RST packet will be queued for each killed connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the
        // dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the connection response to the
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - "OK ..." was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer
        // rxq when multiple flags are set
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice in order to check that the connection is still
        // in the rxq.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should
        // not be any pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}