xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer_rxq.rs (revision fee769bed4c58a07b67e25a7339cfd397f701f3a)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 //! `MuxerRxQ` implements a helper object that `VsockMuxer` can use for queuing RX (host -> guest)
6 //! packets (or rather instructions on how to build said packets).
7 //!
8 //! Under ideal operation, every connection, that has pending RX data, will be present in the muxer
9 //! RX queue. However, since the RX queue is smaller than the connection pool, it may, under some
10 //! conditions, become full, meaning that it can no longer account for all the connections that can
11 //! yield RX data.  When that happens, we say that it is no longer "synchronized" (i.e. with the
12 //! connection pool).  A desynchronized RX queue still holds valid data, and the muxer will
13 //! continue to pop packets from it. However, when a desynchronized queue is drained, additional
14 //! data may still be available, so the muxer will have to perform a more costly walk of the entire
15 //! connection pool to find it.  This walk is performed here, as part of building an RX queue from
//! the connection pool. When an out-of-sync queue is drained, the muxer will discard it, and
//! attempt to rebuild a synced one.
18 
19 use std::collections::{HashMap, VecDeque};
20 
21 use super::super::VsockChannel;
22 use super::defs;
23 use super::muxer::{ConnMapKey, MuxerRx};
24 use super::MuxerConnection;
25 
/// The muxer RX queue.
///
/// A FIFO of `MuxerRx` items (instructions on how to build RX packets), paired with a flag
/// that records whether the queue still accounts for every connection that can yield RX data.
pub struct MuxerRxQ {
    /// The RX queue data: items are appended at the back and consumed from the front.
    q: VecDeque<MuxerRx>,
    /// The RX queue sync status: `true` while the queue is synchronized with the connection
    /// pool; `false` once an item could not be accounted for (the queue was full, or a
    /// connection key had to be evicted), meaning more RX data may exist than is queued here.
    synced: bool,
}
34 
35 impl MuxerRxQ {
36     const SIZE: usize = defs::MUXER_RXQ_SIZE;
37 
38     /// Trivial RX queue constructor.
39     ///
40     pub fn new() -> Self {
41         Self {
42             q: VecDeque::with_capacity(Self::SIZE),
43             synced: true,
44         }
45     }
46 
47     /// Attempt to build an RX queue, that is synchronized to the connection pool.
48     /// Note: the resulting queue may still be desynchronized, if there are too many connections
49     ///       that have pending RX data. In that case, the muxer will first drain this queue, and
50     ///       then try again to build a synchronized one.
51     ///
52     pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self {
53         let mut q = VecDeque::new();
54         let mut synced = true;
55 
56         for (key, conn) in conn_map.iter() {
57             if !conn.has_pending_rx() {
58                 continue;
59             }
60             if q.len() >= Self::SIZE {
61                 synced = false;
62                 break;
63             }
64             q.push_back(MuxerRx::ConnRx(*key));
65         }
66         Self { q, synced }
67     }
68 
69     /// Push a new RX item to the queue.
70     ///
71     /// A push will fail when:
72     /// - trying to push a connection key onto an out-of-sync, or full queue; or
73     /// - trying to push an RST onto a queue already full of RSTs.
74     ///
75     /// RSTs take precedence over connections, because connections can always be queried for
76     /// pending RX data later. Aside from this queue, there is no other storage for RSTs, so
77     /// failing to push one means that we have to drop the packet.
78     ///
79     /// Returns:
80     /// - `true` if the new item has been successfully queued; or
81     /// - `false` if there was no room left in the queue.
82     ///
83     pub fn push(&mut self, rx: MuxerRx) -> bool {
84         // Pushing to a non-full, synchronized queue will always succeed.
85         if self.is_synced() && !self.is_full() {
86             self.q.push_back(rx);
87             return true;
88         }
89 
90         match rx {
91             MuxerRx::RstPkt { .. } => {
92                 // If we just failed to push an RST packet, we'll look through the queue, trying to
93                 // find a connection key that we could evict. This way, the queue does lose sync,
94                 // but we don't drop any packets.
95                 for qi in self.q.iter_mut().rev() {
96                     if let MuxerRx::ConnRx(_) = qi {
97                         *qi = rx;
98                         self.synced = false;
99                         return true;
100                     }
101                 }
102             }
103             MuxerRx::ConnRx(_) => {
104                 self.synced = false;
105             }
106         };
107 
108         false
109     }
110 
111     /// Peek into the front of the queue.
112     ///
113     pub fn peek(&self) -> Option<MuxerRx> {
114         self.q.front().copied()
115     }
116 
117     /// Pop an RX item from the front of the queue.
118     ///
119     pub fn pop(&mut self) -> Option<MuxerRx> {
120         self.q.pop_front()
121     }
122 
123     /// Check if the RX queue is synchronized with the connection pool.
124     ///
125     pub fn is_synced(&self) -> bool {
126         self.synced
127     }
128 
129     /// Get the total number of items in the queue.
130     ///
131     pub fn len(&self) -> usize {
132         self.q.len()
133     }
134 
135     /// Check if the queue is empty.
136     ///
137     pub fn is_empty(&self) -> bool {
138         self.len() == 0
139     }
140 
141     /// Check if the queue is full.
142     ///
143     pub fn is_full(&self) -> bool {
144         self.len() == Self::SIZE
145     }
146 }
147