xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer_rxq.rs (revision eeae63b4595fbf0cc69f62b6e9d9a79c543c4ac7)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 //! `MuxerRxQ` implements a helper object that `VsockMuxer` can use for queuing RX (host -> guest)
6 //! packets (or rather instructions on how to build said packets).
7 //!
//! Under ideal operation, every connection that has pending RX data will be present in the muxer
//! RX queue. However, since the RX queue is smaller than the connection pool, it may, under some
//! conditions, become full, meaning that it can no longer account for all the connections that can
//! yield RX data. When that happens, we say that it is no longer "synchronized" (i.e. with the
//! connection pool). A desynchronized RX queue still holds valid data, and the muxer will
//! continue to pop packets from it. However, when a desynchronized queue is drained, additional
//! data may still be available, so the muxer will have to perform a more costly walk of the entire
//! connection pool to find it. This walk is performed here, as part of building an RX queue from
//! the connection pool. When an out-of-sync queue is drained, the muxer will discard it, and
//! attempt to rebuild a synced one.
18 
19 use std::collections::{HashMap, VecDeque};
20 
21 use super::super::VsockChannel;
22 use super::muxer::{ConnMapKey, MuxerRx};
23 use super::{defs, MuxerConnection};
24 
25 /// The muxer RX queue.
26 ///
27 pub struct MuxerRxQ {
28     /// The RX queue data.
29     q: VecDeque<MuxerRx>,
30     /// The RX queue sync status.
31     synced: bool,
32 }
33 
34 impl MuxerRxQ {
35     const SIZE: usize = defs::MUXER_RXQ_SIZE;
36 
37     /// Trivial RX queue constructor.
38     ///
39     pub fn new() -> Self {
40         Self {
41             q: VecDeque::with_capacity(Self::SIZE),
42             synced: true,
43         }
44     }
45 
46     /// Attempt to build an RX queue, that is synchronized to the connection pool.
47     /// Note: the resulting queue may still be desynchronized, if there are too many connections
48     ///       that have pending RX data. In that case, the muxer will first drain this queue, and
49     ///       then try again to build a synchronized one.
50     ///
51     pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self {
52         let mut q = VecDeque::new();
53         let mut synced = true;
54 
55         for (key, conn) in conn_map.iter() {
56             if !conn.has_pending_rx() {
57                 continue;
58             }
59             if q.len() >= Self::SIZE {
60                 synced = false;
61                 break;
62             }
63             q.push_back(MuxerRx::ConnRx(*key));
64         }
65         Self { q, synced }
66     }
67 
68     /// Push a new RX item to the queue.
69     ///
70     /// A push will fail when:
71     /// - trying to push a connection key onto an out-of-sync, or full queue; or
72     /// - trying to push an RST onto a queue already full of RSTs.
73     ///
74     /// RSTs take precedence over connections, because connections can always be queried for
75     /// pending RX data later. Aside from this queue, there is no other storage for RSTs, so
76     /// failing to push one means that we have to drop the packet.
77     ///
78     /// Returns:
79     /// - `true` if the new item has been successfully queued; or
80     /// - `false` if there was no room left in the queue.
81     ///
82     pub fn push(&mut self, rx: MuxerRx) -> bool {
83         // Pushing to a non-full, synchronized queue will always succeed.
84         if self.is_synced() && !self.is_full() {
85             self.q.push_back(rx);
86             return true;
87         }
88 
89         match rx {
90             MuxerRx::RstPkt { .. } => {
91                 // If we just failed to push an RST packet, we'll look through the queue, trying to
92                 // find a connection key that we could evict. This way, the queue does lose sync,
93                 // but we don't drop any packets.
94                 for qi in self.q.iter_mut().rev() {
95                     if let MuxerRx::ConnRx(_) = qi {
96                         *qi = rx;
97                         self.synced = false;
98                         return true;
99                     }
100                 }
101             }
102             MuxerRx::ConnRx(_) => {
103                 self.synced = false;
104             }
105         };
106 
107         false
108     }
109 
110     /// Peek into the front of the queue.
111     ///
112     pub fn peek(&self) -> Option<MuxerRx> {
113         self.q.front().copied()
114     }
115 
116     /// Pop an RX item from the front of the queue.
117     ///
118     pub fn pop(&mut self) -> Option<MuxerRx> {
119         self.q.pop_front()
120     }
121 
122     /// Check if the RX queue is synchronized with the connection pool.
123     ///
124     pub fn is_synced(&self) -> bool {
125         self.synced
126     }
127 
128     /// Get the total number of items in the queue.
129     ///
130     pub fn len(&self) -> usize {
131         self.q.len()
132     }
133 
134     /// Check if the queue is empty.
135     ///
136     pub fn is_empty(&self) -> bool {
137         self.len() == 0
138     }
139 
140     /// Check if the queue is full.
141     ///
142     pub fn is_full(&self) -> bool {
143         self.len() == Self::SIZE
144     }
145 }
146