// xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer.rs (revision 6f8bd27cf7629733582d930519e98d19e90afb16)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

/// `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
/// by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
/// between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
/// device model.
///
/// The vsock muxer has two main roles:
/// 1. Vsock connection multiplexer:
///    It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
///    muxer also routes packets to their owning connections. It does so via a connection
///    `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
///    Vsock packet traffic needs to be inspected, in order to detect connection request
///    packets (leading to the creation of a new connection), and connection reset packets
///    (leading to the termination of an existing connection). All other packets, though, must
///    belong to an existing connection and, as such, the muxer simply forwards them.
/// 2. Event dispatcher
///    There are three event categories that the vsock backend is interested in:
///    1. A new host-initiated connection is ready to be accepted from the listening host Unix
///       socket;
///    2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
///       the host is ready to issue a vsock connection request, informing us of the
///       destination port to which it wants to connect);
///    3. Some event was triggered for a connected Unix socket that belongs to a
///       `VsockConnection`.
///    The muxer gets notified about all of these events, because, as a `VsockEpollListener`
///    implementor, it gets to register a nested epoll FD into the main VMM epolling loop. All
///    other pollable FDs are then registered under this nested epoll FD.
///    To route all these events to their handlers, the muxer uses another `HashMap` object,
///    mapping `RawFd`s to `EpollListener`s.
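///
/// For illustration, the host-side handshake that drives host-initiated connections looks
/// roughly like this (port numbers below are examples only):
/// 1. The host connects to the muxer's Unix socket and writes `connect 1025\n`, naming the
///    guest port it wants to reach.
/// 2. Once the guest accepts, the muxer replies on the same stream with `OK 1030\n`, where
///    1030 is the host-side port it allocated for this connection.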
///
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect <port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "<this path>_<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}

impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the host
        // side of guest-host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports).  The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, we know where to send this packet, then (to `conn_key`).
        // However, if this is an RST, we have to forcefully terminate the connection, so
        // there's no point in forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
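    /// Note: an out-of-sync RX queue may still represent pending RX data, held by
    /// connections that couldn't be pushed onto the queue, which is why a desynchronized
    /// queue also counts as pending RX here.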
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}

impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
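        // Drain everything already pending on the nested epoll FD. A timeout of 0 makes
        // `epoll::wait()` non-blocking: we only collect events that are ready right now,
        // without ever parking the VMM thread here.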
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled in
                            // by `epoll::wait()`, and therefore contains only valid epoll
                            // flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // The epoll_wait() syscall documentation is explicit that the call
                        // can be interrupted before any of the requested events occur or
                        // the timeout expires. In that case it fails with EINTR, which
                        // should not be treated as a regular error; the appropriate
                        // response is to retry by calling epoll_wait() again.
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
    pub fn new(cid: u64, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use `File` to enforce closing of `epoll_fd` when it gets dropped.
        // SAFETY: epoll_fd is a valid fd
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid,
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            killq: MuxerKillQ::new(),
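            // Seed the local port allocator just below 1 << 30. `allocate_local_port()`
            // clears bit 31 and sets bit 30 on every candidate, so host-side ports always
            // fall in the range [1 << 30, 1 << 31), and the first one handed out is
            // exactly 1 << 30.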
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset: _ }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
                    Self::read_local_stream_port(&mut stream)
                        .map(|peer_port| (self.allocate_local_port(), peer_port))
                        .and_then(|(local_port, peer_port)| {
                            self.add_connection(
                                ConnMapKey {
                                    local_port,
                                    peer_port,
                                },
                                MuxerConnection::new_local_init(
                                    stream,
                                    uapi::VSOCK_HOST_CID,
                                    self.cid,
                                    local_port,
                                    peer_port,
                                ),
                            )
                        })
                        .unwrap_or_else(|err| {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        })
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
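    /// The expected format is `connect <port>\n` (the keyword is matched
    /// case-insensitively), e.g. `connect 1025\n`.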
    fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
        let mut buf = [0u8; 32];

        // This is the minimum number of bytes that we should be able to read when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_READ_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut buf[..MIN_READ_LEN])
            .map_err(Error::UnixRead)?;

        // Now, finish reading the destination port number, by bringing in one byte at a time,
        // until we reach an EOL terminator (or our buffer space runs out).  Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        let mut blen = MIN_READ_LEN;
        while buf[blen - 1] != b'\n' && blen < buf.len() {
            stream
                .read_exact(&mut buf[blen..=blen])
                .map_err(Error::UnixRead)?;
            blen += 1;
        }

        let mut word_iter = std::str::from_utf8(&buf[..blen])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }

    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first.  It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication. Worst
                // case scenario, the RX queue will get desynchronized, but we'll handle that
                // the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
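        // For peer-initiated connections, the local port was never added to
        // `local_port_set`, so freeing it below is a harmless no-op.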
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
    /// it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the RX
        // queue.  However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst case
            // scenario, the RX queue will get desynchronized, but we'll handle that the next
            // time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
        //

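        // A note on the bit twiddling below: the expression clears bit 31 and sets bit 30,
        // so candidate ports always fall in the range [1 << 30, 1 << 31), presumably to
        // keep host-allocated ports from colliding with ports chosen by the guest.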
        loop {
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }

    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());
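        // E.g. with the muxer socket bound at "/tmp/vsock.sock" (an example path) and a
        // guest request for port 1025, the host-side listener is expected at
        // "/tmp/vsock.sock_1025".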

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }

    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll have
            // to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection, something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed that
                // a `conn.expiry` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_evset()`), but now wants back in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do to
    /// handle them. We do, however, log a warning, since not being able to enqueue an RST
    /// packet means we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::ops::Drop;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::{Path, PathBuf};

    use virtio_queue::QueueOwnedT;

    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;

    const PEER_CID: u64 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let uds_path = format!("test_vsock_{}.sock", name);
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID)
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }

        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its Unix
            // socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a new
            // `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {}\n", peer_port);
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the locally
            // initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed the
            // LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map.  A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = vec![0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {}\n", local_port).as_bytes());

            (stream, local_port)
        }
    }

    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an RST
        // packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet, should be replied to with an
        // RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be delivered
        // to the muxer's nested epoll FD. For testing only, we can fake that event notification
        // here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available for one
        // of its connections, so it should now be reporting that it can fill in an RX packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }

    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Local var `_stream` has now been dropped, thus closing the local stream. After the muxer
        // gets notified via EPOLLIN, it should attempt to gracefully shutdown the connection,
        // issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer replies
        // with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }

    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue, and
        // take its place. We'll check that by making sure that the last packet popped from the
        // queue is an RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has some
        // pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is larger
        // than the kill queue, since an RST packet will be queued for each killed connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the
        // dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the connection response to our
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - "OK ..." was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer
        // rxq when multiple flags are set
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice in order to check that the connection is still
        // in the rxq.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should
        // not be any pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}