1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 5 //! `MuxerRxQ` implements a helper object that `VsockMuxer` can use for queuing RX (host -> guest) 6 //! packets (or rather instructions on how to build said packets). 7 //! 8 //! Under ideal operation, every connection, that has pending RX data, will be present in the muxer 9 //! RX queue. However, since the RX queue is smaller than the connection pool, it may, under some 10 //! conditions, become full, meaning that it can no longer account for all the connections that can 11 //! yield RX data. When that happens, we say that it is no longer "synchronized" (i.e. with the 12 //! connection pool). A desynchronized RX queue still holds valid data, and the muxer will 13 //! continue to pop packets from it. However, when a desynchronized queue is drained, additional 14 //! data may still be available, so the muxer will have to perform a more costly walk of the entire 15 //! connection pool to find it. This walk is performed here, as part of building an RX queue from 16 //! the connection pool. When an out-of-sync is drained, the muxer will discard it, and attempt to 17 //! rebuild a synced one. 18 19 use std::collections::{HashMap, VecDeque}; 20 21 use super::super::VsockChannel; 22 use super::muxer::{ConnMapKey, MuxerRx}; 23 use super::{defs, MuxerConnection}; 24 25 /// The muxer RX queue. 26 /// 27 pub struct MuxerRxQ { 28 /// The RX queue data. 29 q: VecDeque<MuxerRx>, 30 /// The RX queue sync status. 31 synced: bool, 32 } 33 34 impl MuxerRxQ { 35 const SIZE: usize = defs::MUXER_RXQ_SIZE; 36 37 /// Trivial RX queue constructor. 38 /// new() -> Self39 pub fn new() -> Self { 40 Self { 41 q: VecDeque::with_capacity(Self::SIZE), 42 synced: true, 43 } 44 } 45 46 /// Attempt to build an RX queue, that is synchronized to the connection pool. 47 /// Note: the resulting queue may still be desynchronized, if there are too many connections 48 /// that have pending RX data. In that case, the muxer will first drain this queue, and 49 /// then try again to build a synchronized one. 50 /// from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self51 pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self { 52 let mut q = VecDeque::new(); 53 let mut synced = true; 54 55 for (key, conn) in conn_map.iter() { 56 if !conn.has_pending_rx() { 57 continue; 58 } 59 if q.len() >= Self::SIZE { 60 synced = false; 61 break; 62 } 63 q.push_back(MuxerRx::ConnRx(*key)); 64 } 65 Self { q, synced } 66 } 67 68 /// Push a new RX item to the queue. 69 /// 70 /// A push will fail when: 71 /// - trying to push a connection key onto an out-of-sync, or full queue; or 72 /// - trying to push an RST onto a queue already full of RSTs. 73 /// 74 /// RSTs take precedence over connections, because connections can always be queried for 75 /// pending RX data later. Aside from this queue, there is no other storage for RSTs, so 76 /// failing to push one means that we have to drop the packet. 77 /// 78 /// Returns: 79 /// - `true` if the new item has been successfully queued; or 80 /// - `false` if there was no room left in the queue. 81 /// push(&mut self, rx: MuxerRx) -> bool82 pub fn push(&mut self, rx: MuxerRx) -> bool { 83 // Pushing to a non-full, synchronized queue will always succeed. 84 if self.is_synced() && !self.is_full() { 85 self.q.push_back(rx); 86 return true; 87 } 88 89 match rx { 90 MuxerRx::RstPkt { .. } => { 91 // If we just failed to push an RST packet, we'll look through the queue, trying to 92 // find a connection key that we could evict. This way, the queue does lose sync, 93 // but we don't drop any packets. 94 for qi in self.q.iter_mut().rev() { 95 if let MuxerRx::ConnRx(_) = qi { 96 *qi = rx; 97 self.synced = false; 98 return true; 99 } 100 } 101 } 102 MuxerRx::ConnRx(_) => { 103 self.synced = false; 104 } 105 }; 106 107 false 108 } 109 110 /// Peek into the front of the queue. 111 /// peek(&self) -> Option<MuxerRx>112 pub fn peek(&self) -> Option<MuxerRx> { 113 self.q.front().copied() 114 } 115 116 /// Pop an RX item from the front of the queue. 117 /// pop(&mut self) -> Option<MuxerRx>118 pub fn pop(&mut self) -> Option<MuxerRx> { 119 self.q.pop_front() 120 } 121 122 /// Check if the RX queue is synchronized with the connection pool. 123 /// is_synced(&self) -> bool124 pub fn is_synced(&self) -> bool { 125 self.synced 126 } 127 128 /// Get the total number of items in the queue. 129 /// len(&self) -> usize130 pub fn len(&self) -> usize { 131 self.q.len() 132 } 133 134 /// Check if the queue is empty. 135 /// is_empty(&self) -> bool136 pub fn is_empty(&self) -> bool { 137 self.len() == 0 138 } 139 140 /// Check if the queue is full. 141 /// is_full(&self) -> bool142 pub fn is_full(&self) -> bool { 143 self.len() == Self::SIZE 144 } 145 } 146