1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 5 //! `MuxerRxQ` implements a helper object that `VsockMuxer` can use for queuing RX (host -> guest) 6 //! packets (or rather instructions on how to build said packets). 7 //! 8 //! Under ideal operation, every connection, that has pending RX data, will be present in the muxer 9 //! RX queue. However, since the RX queue is smaller than the connection pool, it may, under some 10 //! conditions, become full, meaning that it can no longer account for all the connections that can 11 //! yield RX data. When that happens, we say that it is no longer "synchronized" (i.e. with the 12 //! connection pool). A desynchronized RX queue still holds valid data, and the muxer will 13 //! continue to pop packets from it. However, when a desynchronized queue is drained, additional 14 //! data may still be available, so the muxer will have to perform a more costly walk of the entire 15 //! connection pool to find it. This walk is performed here, as part of building an RX queue from 16 //! the connection pool. When an out-of-sync is drained, the muxer will discard it, and attempt to 17 //! rebuild a synced one. 18 19 use std::collections::{HashMap, VecDeque}; 20 21 use super::super::VsockChannel; 22 use super::defs; 23 use super::muxer::{ConnMapKey, MuxerRx}; 24 use super::MuxerConnection; 25 26 /// The muxer RX queue. 27 /// 28 pub struct MuxerRxQ { 29 /// The RX queue data. 30 q: VecDeque<MuxerRx>, 31 /// The RX queue sync status. 32 synced: bool, 33 } 34 35 impl MuxerRxQ { 36 const SIZE: usize = defs::MUXER_RXQ_SIZE; 37 38 /// Trivial RX queue constructor. 39 /// 40 pub fn new() -> Self { 41 Self { 42 q: VecDeque::with_capacity(Self::SIZE), 43 synced: true, 44 } 45 } 46 47 /// Attempt to build an RX queue, that is synchronized to the connection pool. 48 /// Note: the resulting queue may still be desynchronized, if there are too many connections 49 /// that have pending RX data. In that case, the muxer will first drain this queue, and 50 /// then try again to build a synchronized one. 51 /// 52 pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self { 53 let mut q = VecDeque::new(); 54 let mut synced = true; 55 56 for (key, conn) in conn_map.iter() { 57 if !conn.has_pending_rx() { 58 continue; 59 } 60 if q.len() >= Self::SIZE { 61 synced = false; 62 break; 63 } 64 q.push_back(MuxerRx::ConnRx(*key)); 65 } 66 Self { q, synced } 67 } 68 69 /// Push a new RX item to the queue. 70 /// 71 /// A push will fail when: 72 /// - trying to push a connection key onto an out-of-sync, or full queue; or 73 /// - trying to push an RST onto a queue already full of RSTs. 74 /// 75 /// RSTs take precedence over connections, because connections can always be queried for 76 /// pending RX data later. Aside from this queue, there is no other storage for RSTs, so 77 /// failing to push one means that we have to drop the packet. 78 /// 79 /// Returns: 80 /// - `true` if the new item has been successfully queued; or 81 /// - `false` if there was no room left in the queue. 82 /// 83 pub fn push(&mut self, rx: MuxerRx) -> bool { 84 // Pushing to a non-full, synchronized queue will always succeed. 85 if self.is_synced() && !self.is_full() { 86 self.q.push_back(rx); 87 return true; 88 } 89 90 match rx { 91 MuxerRx::RstPkt { .. } => { 92 // If we just failed to push an RST packet, we'll look through the queue, trying to 93 // find a connection key that we could evict. This way, the queue does lose sync, 94 // but we don't drop any packets. 95 for qi in self.q.iter_mut().rev() { 96 if let MuxerRx::ConnRx(_) = qi { 97 *qi = rx; 98 self.synced = false; 99 return true; 100 } 101 } 102 } 103 MuxerRx::ConnRx(_) => { 104 self.synced = false; 105 } 106 }; 107 108 false 109 } 110 111 /// Peek into the front of the queue. 112 /// 113 pub fn peek(&self) -> Option<MuxerRx> { 114 self.q.front().copied() 115 } 116 117 /// Pop an RX item from the front of the queue. 118 /// 119 pub fn pop(&mut self) -> Option<MuxerRx> { 120 self.q.pop_front() 121 } 122 123 /// Check if the RX queue is synchronized with the connection pool. 124 /// 125 pub fn is_synced(&self) -> bool { 126 self.synced 127 } 128 129 /// Get the total number of items in the queue. 130 /// 131 pub fn len(&self) -> usize { 132 self.q.len() 133 } 134 135 /// Check if the queue is empty. 136 /// 137 pub fn is_empty(&self) -> bool { 138 self.len() == 0 139 } 140 141 /// Check if the queue is full. 142 /// 143 pub fn is_full(&self) -> bool { 144 self.len() == Self::SIZE 145 } 146 } 147