xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer_rxq.rs (revision 3ce0fef7fd546467398c914dbc74d8542e45cf6f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 //! `MuxerRxQ` implements a helper object that `VsockMuxer` can use for queuing RX (host -> guest)
6 //! packets (or rather instructions on how to build said packets).
7 //!
8 //! Under ideal operation, every connection, that has pending RX data, will be present in the muxer
9 //! RX queue. However, since the RX queue is smaller than the connection pool, it may, under some
10 //! conditions, become full, meaning that it can no longer account for all the connections that can
11 //! yield RX data.  When that happens, we say that it is no longer "synchronized" (i.e. with the
12 //! connection pool).  A desynchronized RX queue still holds valid data, and the muxer will
13 //! continue to pop packets from it. However, when a desynchronized queue is drained, additional
14 //! data may still be available, so the muxer will have to perform a more costly walk of the entire
15 //! connection pool to find it.  This walk is performed here, as part of building an RX queue from
16 //! the connection pool. When an out-of-sync is drained, the muxer will discard it, and attempt to
17 //! rebuild a synced one.
18 
19 use std::collections::{HashMap, VecDeque};
20 
21 use super::super::VsockChannel;
22 use super::defs;
23 use super::muxer::{ConnMapKey, MuxerRx};
24 use super::MuxerConnection;
25 
26 /// The muxer RX queue.
27 ///
28 pub struct MuxerRxQ {
29     /// The RX queue data.
30     q: VecDeque<MuxerRx>,
31     /// The RX queue sync status.
32     synced: bool,
33 }
34 
35 impl MuxerRxQ {
36     const SIZE: usize = defs::MUXER_RXQ_SIZE;
37 
38     /// Trivial RX queue constructor.
39     ///
40     pub fn new() -> Self {
41         Self {
42             q: VecDeque::with_capacity(Self::SIZE),
43             synced: true,
44         }
45     }
46 
47     /// Attempt to build an RX queue, that is synchronized to the connection pool.
48     /// Note: the resulting queue may still be desynchronized, if there are too many connections
49     ///       that have pending RX data. In that case, the muxer will first drain this queue, and
50     ///       then try again to build a synchronized one.
51     ///
52     pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self {
53         let mut q = VecDeque::new();
54         let mut synced = true;
55 
56         for (key, conn) in conn_map.iter() {
57             if !conn.has_pending_rx() {
58                 continue;
59             }
60             if q.len() >= Self::SIZE {
61                 synced = false;
62                 break;
63             }
64             q.push_back(MuxerRx::ConnRx(*key));
65         }
66         Self { q, synced }
67     }
68 
69     /// Push a new RX item to the queue.
70     ///
71     /// A push will fail when:
72     /// - trying to push a connection key onto an out-of-sync, or full queue; or
73     /// - trying to push an RST onto a queue already full of RSTs.
74     /// RSTs take precedence over connections, because connections can always be queried for
75     /// pending RX data later. Aside from this queue, there is no other storage for RSTs, so
76     /// failing to push one means that we have to drop the packet.
77     ///
78     /// Returns:
79     /// - `true` if the new item has been successfully queued; or
80     /// - `false` if there was no room left in the queue.
81     ///
82     pub fn push(&mut self, rx: MuxerRx) -> bool {
83         // Pushing to a non-full, synchronized queue will always succeed.
84         if self.is_synced() && !self.is_full() {
85             self.q.push_back(rx);
86             return true;
87         }
88 
89         match rx {
90             MuxerRx::RstPkt { .. } => {
91                 // If we just failed to push an RST packet, we'll look through the queue, trying to
92                 // find a connection key that we could evict. This way, the queue does lose sync,
93                 // but we don't drop any packets.
94                 for qi in self.q.iter_mut().rev() {
95                     if let MuxerRx::ConnRx(_) = qi {
96                         *qi = rx;
97                         self.synced = false;
98                         return true;
99                     }
100                 }
101             }
102             MuxerRx::ConnRx(_) => {
103                 self.synced = false;
104             }
105         };
106 
107         false
108     }
109 
110     /// Peek into the front of the queue.
111     ///
112     pub fn peek(&self) -> Option<MuxerRx> {
113         self.q.front().copied()
114     }
115 
116     /// Pop an RX item from the front of the queue.
117     ///
118     pub fn pop(&mut self) -> Option<MuxerRx> {
119         self.q.pop_front()
120     }
121 
122     /// Check if the RX queue is synchronized with the connection pool.
123     ///
124     pub fn is_synced(&self) -> bool {
125         self.synced
126     }
127 
128     /// Get the total number of items in the queue.
129     ///
130     pub fn len(&self) -> usize {
131         self.q.len()
132     }
133 
134     /// Check if the queue is empty.
135     ///
136     pub fn is_empty(&self) -> bool {
137         self.len() == 0
138     }
139 
140     /// Check if the queue is full.
141     ///
142     pub fn is_full(&self) -> bool {
143         self.len() == Self::SIZE
144     }
145 }
146