// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

//! `MuxerKillQ` implements a helper object that `VsockMuxer` can use for scheduling forced
//! connection termination. That is, after one peer issues a clean shutdown request
//! (VSOCK_OP_SHUTDOWN), the concerned connection is queued for termination (VSOCK_OP_RST) in
//! the near future (herein implemented via an expiring timer).
//!
//! Whenever the muxer needs to schedule a connection for termination, it pushes it (or rather
//! an identifier - the connection key) to this queue. A subsequent pop() operation will
//! succeed if and only if the first connection in the queue is ready to be terminated (i.e.
//! its kill timer expired).
//!
//! Without using this queue, the muxer would have to walk its entire connection pool
//! (hashmap), whenever it needs to check for expired kill timers. With this queue, both
//! scheduling and termination are performed in constant time. However, since we don't want to
//! waste space on a kill queue that's as big as the connection hashmap itself, it is possible
//! that this queue may become full at times. We call this kill queue "synchronized" if we are
//! certain that all connections that are awaiting termination are present in the queue. This
//! means a simple constant-time pop() operation is enough to check whether any connections
//! need to be terminated. When the kill queue becomes full, though, pushing fails, so
//! connections that should be terminated are left out. The queue is not synchronized anymore.
//! When that happens, the muxer will first drain the queue, and then replace it with a new
//! queue, created by walking the connection pool, looking for connections that will be
//! expiring in the future.
27 28 use std::collections::{HashMap, VecDeque}; 29 use std::time::Instant; 30 31 use super::defs; 32 use super::muxer::ConnMapKey; 33 use super::MuxerConnection; 34 35 /// A kill queue item, holding the connection key and the scheduled time for termination. 36 /// 37 #[derive(Clone, Copy)] 38 struct MuxerKillQItem { 39 key: ConnMapKey, 40 kill_time: Instant, 41 } 42 43 /// The connection kill queue: a FIFO structure, storing the connections that are scheduled for 44 /// termination. 45 /// 46 pub struct MuxerKillQ { 47 /// The kill queue contents. 48 q: VecDeque<MuxerKillQItem>, 49 50 /// The kill queue sync status: 51 /// - when true, all connections that are awaiting termination are guaranteed to be in this 52 /// queue; 53 /// - when false, some connections may have been left out. 54 /// 55 synced: bool, 56 } 57 58 impl MuxerKillQ { 59 const SIZE: usize = defs::MUXER_KILLQ_SIZE; 60 61 /// Trivial kill queue constructor. 62 /// 63 pub fn new() -> Self { 64 Self { 65 q: VecDeque::with_capacity(Self::SIZE), 66 synced: true, 67 } 68 } 69 70 /// Create a kill queue by walking the connection pool, looking for connections that are 71 /// set to expire at some point in the future. 72 /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an 73 /// out-of-sync state, and will be discarded after it is emptied. 
74 /// 75 pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self { 76 let mut q_buf: Vec<MuxerKillQItem> = Vec::with_capacity(Self::SIZE); 77 let mut synced = true; 78 for (key, conn) in conn_map.iter() { 79 if !conn.will_expire() { 80 continue; 81 } 82 if q_buf.len() >= Self::SIZE { 83 synced = false; 84 break; 85 } 86 q_buf.push(MuxerKillQItem { 87 key: *key, 88 kill_time: conn.expiry().unwrap(), 89 }); 90 } 91 q_buf.sort_unstable_by_key(|it| it.kill_time); 92 Self { 93 q: q_buf.into(), 94 synced, 95 } 96 } 97 98 /// Push a connection key to the queue, scheduling it for termination at 99 /// `CONN_SHUTDOWN_TIMEOUT_MS` from now (the push time). 100 /// 101 pub fn push(&mut self, key: ConnMapKey, kill_time: Instant) { 102 if !self.is_synced() || self.is_full() { 103 self.synced = false; 104 return; 105 } 106 self.q.push_back(MuxerKillQItem { key, kill_time }); 107 } 108 109 /// Attempt to pop an expired connection from the kill queue. 110 /// 111 /// This will succeed and return a connection key, only if the connection at the front of 112 /// the queue has expired. Otherwise, `None` is returned. 113 /// 114 pub fn pop(&mut self) -> Option<ConnMapKey> { 115 if let Some(item) = self.q.front() { 116 if Instant::now() > item.kill_time { 117 return Some(self.q.pop_front().unwrap().key); 118 } 119 } 120 None 121 } 122 123 /// Check if the kill queue is synchronized with the connection pool. 124 /// 125 pub fn is_synced(&self) -> bool { 126 self.synced 127 } 128 129 /// Check if the kill queue is empty, obviously. 130 /// 131 pub fn is_empty(&self) -> bool { 132 self.q.len() == 0 133 } 134 135 /// Check if the kill queue is full. 136 /// 137 pub fn is_full(&self) -> bool { 138 self.q.len() == Self::SIZE 139 } 140 } 141