1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 5 //! `MuxerKillQ` implements a helper object that `VsockMuxer` can use for scheduling forced 6 //! connection termination. I.e. after one peer issues a clean shutdown request 7 //! (VSOCK_OP_SHUTDOWN), the concerned connection is queued for termination (VSOCK_OP_RST) in 8 //! the near future (herein implemented via an expiring timer). 9 //! 10 //! Whenever the muxer needs to schedule a connection for termination, it pushes it (or rather 11 //! an identifier - the connection key) to this queue. A subsequent pop() operation will 12 //! succeed if and only if the first connection in the queue is ready to be terminated (i.e. 13 //! its kill timer expired). 14 //! 15 //! Without using this queue, the muxer would have to walk its entire connection pool 16 //! (hashmap), whenever it needs to check for expired kill timers. With this queue, both 17 //! scheduling and termination are performed in constant time. However, since we don't want to 18 //! waste space on a kill queue that's as big as the connection hashmap itself, it is possible 19 //! that this queue may become full at times. We call this kill queue "synchronized" if we are 20 //! certain that all connections that are awaiting termination are present in the queue. This 21 //! means a simple constant-time pop() operation is enough to check whether any connections 22 //! need to be terminated. When the kill queue becomes full, though, pushing fails, so 23 //! connections that should be terminated are left out. The queue is not synchronized anymore. 24 //! When that happens, the muxer will first drain the queue, and then replace it with a new 25 //! queue, created by walking the connection pool, looking for connections that will be 26 //! expiring in the future. 27 28 use std::collections::{HashMap, VecDeque}; 29 use std::time::Instant; 30 31 use super::muxer::ConnMapKey; 32 use super::{defs, MuxerConnection}; 33 34 /// A kill queue item, holding the connection key and the scheduled time for termination. 35 /// 36 #[derive(Clone, Copy)] 37 struct MuxerKillQItem { 38 key: ConnMapKey, 39 kill_time: Instant, 40 } 41 42 /// The connection kill queue: a FIFO structure, storing the connections that are scheduled for 43 /// termination. 44 /// 45 pub struct MuxerKillQ { 46 /// The kill queue contents. 47 q: VecDeque<MuxerKillQItem>, 48 49 /// The kill queue sync status: 50 /// - when true, all connections that are awaiting termination are guaranteed to be in this 51 /// queue; 52 /// - when false, some connections may have been left out. 53 /// 54 synced: bool, 55 } 56 57 impl MuxerKillQ { 58 const SIZE: usize = defs::MUXER_KILLQ_SIZE; 59 60 /// Trivial kill queue constructor. 61 /// new() -> Self62 pub fn new() -> Self { 63 Self { 64 q: VecDeque::with_capacity(Self::SIZE), 65 synced: true, 66 } 67 } 68 69 /// Create a kill queue by walking the connection pool, looking for connections that are 70 /// set to expire at some point in the future. 71 /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an 72 /// out-of-sync state, and will be discarded after it is emptied. 73 /// from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self74 pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self { 75 let mut q_buf: Vec<MuxerKillQItem> = Vec::with_capacity(Self::SIZE); 76 let mut synced = true; 77 for (key, conn) in conn_map.iter() { 78 if !conn.will_expire() { 79 continue; 80 } 81 if q_buf.len() >= Self::SIZE { 82 synced = false; 83 break; 84 } 85 q_buf.push(MuxerKillQItem { 86 key: *key, 87 kill_time: conn.expiry().unwrap(), 88 }); 89 } 90 q_buf.sort_unstable_by_key(|it| it.kill_time); 91 Self { 92 q: q_buf.into(), 93 synced, 94 } 95 } 96 97 /// Push a connection key to the queue, scheduling it for termination at 98 /// `CONN_SHUTDOWN_TIMEOUT_MS` from now (the push time). 99 /// push(&mut self, key: ConnMapKey, kill_time: Instant)100 pub fn push(&mut self, key: ConnMapKey, kill_time: Instant) { 101 if !self.is_synced() || self.is_full() { 102 self.synced = false; 103 return; 104 } 105 self.q.push_back(MuxerKillQItem { key, kill_time }); 106 } 107 108 /// Attempt to pop an expired connection from the kill queue. 109 /// 110 /// This will succeed and return a connection key, only if the connection at the front of 111 /// the queue has expired. Otherwise, `None` is returned. 112 /// pop(&mut self) -> Option<ConnMapKey>113 pub fn pop(&mut self) -> Option<ConnMapKey> { 114 if let Some(item) = self.q.front() { 115 if Instant::now() > item.kill_time { 116 return Some(self.q.pop_front().unwrap().key); 117 } 118 } 119 None 120 } 121 122 /// Check if the kill queue is synchronized with the connection pool. 123 /// is_synced(&self) -> bool124 pub fn is_synced(&self) -> bool { 125 self.synced 126 } 127 128 /// Check if the kill queue is empty, obviously. 129 /// is_empty(&self) -> bool130 pub fn is_empty(&self) -> bool { 131 self.q.len() == 0 132 } 133 134 /// Check if the kill queue is full. 135 /// is_full(&self) -> bool136 pub fn is_full(&self) -> bool { 137 self.q.len() == Self::SIZE 138 } 139 } 140