1434a5d0eSSebastien Boeuf // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2434a5d0eSSebastien Boeuf // SPDX-License-Identifier: Apache-2.0 3434a5d0eSSebastien Boeuf // 4434a5d0eSSebastien Boeuf 5f6236087SAlyssa Ross //! `MuxerKillQ` implements a helper object that `VsockMuxer` can use for scheduling forced 6f6236087SAlyssa Ross //! connection termination. I.e. after one peer issues a clean shutdown request 7f6236087SAlyssa Ross //! (VSOCK_OP_SHUTDOWN), the concerned connection is queued for termination (VSOCK_OP_RST) in 8f6236087SAlyssa Ross //! the near future (herein implemented via an expiring timer). 9f6236087SAlyssa Ross //! 10f6236087SAlyssa Ross //! Whenever the muxer needs to schedule a connection for termination, it pushes it (or rather 11f6236087SAlyssa Ross //! an identifier - the connection key) to this queue. A subsequent pop() operation will 12f6236087SAlyssa Ross //! succeed if and only if the first connection in the queue is ready to be terminated (i.e. 13f6236087SAlyssa Ross //! its kill timer expired). 14f6236087SAlyssa Ross //! 15f6236087SAlyssa Ross //! Without using this queue, the muxer would have to walk its entire connection pool 16f6236087SAlyssa Ross //! (hashmap), whenever it needs to check for expired kill timers. With this queue, both 17f6236087SAlyssa Ross //! scheduling and termination are performed in constant time. However, since we don't want to 18f6236087SAlyssa Ross //! waste space on a kill queue that's as big as the connection hashmap itself, it is possible 19f6236087SAlyssa Ross //! that this queue may become full at times. We call this kill queue "synchronized" if we are 20f6236087SAlyssa Ross //! certain that all connections that are awaiting termination are present in the queue. This 21f6236087SAlyssa Ross //! means a simple constant-time pop() operation is enough to check whether any connections 22f6236087SAlyssa Ross //! need to be terminated. When the kill queue becomes full, though, pushing fails, so 23f6236087SAlyssa Ross //! connections that should be terminated are left out. The queue is not synchronized anymore. 24f6236087SAlyssa Ross //! When that happens, the muxer will first drain the queue, and then replace it with a new 25f6236087SAlyssa Ross //! queue, created by walking the connection pool, looking for connections that will be 26f6236087SAlyssa Ross //! expiring in the future. 27f6236087SAlyssa Ross 28434a5d0eSSebastien Boeuf use std::collections::{HashMap, VecDeque}; 29434a5d0eSSebastien Boeuf use std::time::Instant; 30434a5d0eSSebastien Boeuf 31434a5d0eSSebastien Boeuf use super::muxer::ConnMapKey; 32*61e57e1cSRuoqing He use super::{defs, MuxerConnection}; 33434a5d0eSSebastien Boeuf 34434a5d0eSSebastien Boeuf /// A kill queue item, holding the connection key and the scheduled time for termination. 35434a5d0eSSebastien Boeuf /// 36434a5d0eSSebastien Boeuf #[derive(Clone, Copy)] 37434a5d0eSSebastien Boeuf struct MuxerKillQItem { 38434a5d0eSSebastien Boeuf key: ConnMapKey, 39434a5d0eSSebastien Boeuf kill_time: Instant, 40434a5d0eSSebastien Boeuf } 41434a5d0eSSebastien Boeuf 42434a5d0eSSebastien Boeuf /// The connection kill queue: a FIFO structure, storing the connections that are scheduled for 43434a5d0eSSebastien Boeuf /// termination. 44434a5d0eSSebastien Boeuf /// 45434a5d0eSSebastien Boeuf pub struct MuxerKillQ { 46434a5d0eSSebastien Boeuf /// The kill queue contents. 47434a5d0eSSebastien Boeuf q: VecDeque<MuxerKillQItem>, 48434a5d0eSSebastien Boeuf 49434a5d0eSSebastien Boeuf /// The kill queue sync status: 50434a5d0eSSebastien Boeuf /// - when true, all connections that are awaiting termination are guaranteed to be in this 51434a5d0eSSebastien Boeuf /// queue; 52434a5d0eSSebastien Boeuf /// - when false, some connections may have been left out. 53434a5d0eSSebastien Boeuf /// 54434a5d0eSSebastien Boeuf synced: bool, 55434a5d0eSSebastien Boeuf } 56434a5d0eSSebastien Boeuf 57434a5d0eSSebastien Boeuf impl MuxerKillQ { 58434a5d0eSSebastien Boeuf const SIZE: usize = defs::MUXER_KILLQ_SIZE; 59434a5d0eSSebastien Boeuf 60434a5d0eSSebastien Boeuf /// Trivial kill queue constructor. 61434a5d0eSSebastien Boeuf /// new() -> Self62434a5d0eSSebastien Boeuf pub fn new() -> Self { 63434a5d0eSSebastien Boeuf Self { 64434a5d0eSSebastien Boeuf q: VecDeque::with_capacity(Self::SIZE), 65434a5d0eSSebastien Boeuf synced: true, 66434a5d0eSSebastien Boeuf } 67434a5d0eSSebastien Boeuf } 68434a5d0eSSebastien Boeuf 69434a5d0eSSebastien Boeuf /// Create a kill queue by walking the connection pool, looking for connections that are 70434a5d0eSSebastien Boeuf /// set to expire at some point in the future. 71434a5d0eSSebastien Boeuf /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an 72434a5d0eSSebastien Boeuf /// out-of-sync state, and will be discarded after it is emptied. 73434a5d0eSSebastien Boeuf /// from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self74434a5d0eSSebastien Boeuf pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self { 75434a5d0eSSebastien Boeuf let mut q_buf: Vec<MuxerKillQItem> = Vec::with_capacity(Self::SIZE); 76434a5d0eSSebastien Boeuf let mut synced = true; 77434a5d0eSSebastien Boeuf for (key, conn) in conn_map.iter() { 78434a5d0eSSebastien Boeuf if !conn.will_expire() { 79434a5d0eSSebastien Boeuf continue; 80434a5d0eSSebastien Boeuf } 81434a5d0eSSebastien Boeuf if q_buf.len() >= Self::SIZE { 82434a5d0eSSebastien Boeuf synced = false; 83434a5d0eSSebastien Boeuf break; 84434a5d0eSSebastien Boeuf } 85434a5d0eSSebastien Boeuf q_buf.push(MuxerKillQItem { 86434a5d0eSSebastien Boeuf key: *key, 87434a5d0eSSebastien Boeuf kill_time: conn.expiry().unwrap(), 88434a5d0eSSebastien Boeuf }); 89434a5d0eSSebastien Boeuf } 90434a5d0eSSebastien Boeuf q_buf.sort_unstable_by_key(|it| it.kill_time); 91434a5d0eSSebastien Boeuf Self { 92434a5d0eSSebastien Boeuf q: q_buf.into(), 93434a5d0eSSebastien Boeuf synced, 94434a5d0eSSebastien Boeuf } 95434a5d0eSSebastien Boeuf } 96434a5d0eSSebastien Boeuf 97434a5d0eSSebastien Boeuf /// Push a connection key to the queue, scheduling it for termination at 98434a5d0eSSebastien Boeuf /// `CONN_SHUTDOWN_TIMEOUT_MS` from now (the push time). 99434a5d0eSSebastien Boeuf /// push(&mut self, key: ConnMapKey, kill_time: Instant)100434a5d0eSSebastien Boeuf pub fn push(&mut self, key: ConnMapKey, kill_time: Instant) { 101434a5d0eSSebastien Boeuf if !self.is_synced() || self.is_full() { 102434a5d0eSSebastien Boeuf self.synced = false; 103434a5d0eSSebastien Boeuf return; 104434a5d0eSSebastien Boeuf } 105434a5d0eSSebastien Boeuf self.q.push_back(MuxerKillQItem { key, kill_time }); 106434a5d0eSSebastien Boeuf } 107434a5d0eSSebastien Boeuf 108434a5d0eSSebastien Boeuf /// Attempt to pop an expired connection from the kill queue. 109434a5d0eSSebastien Boeuf /// 110434a5d0eSSebastien Boeuf /// This will succeed and return a connection key, only if the connection at the front of 111434a5d0eSSebastien Boeuf /// the queue has expired. Otherwise, `None` is returned. 112434a5d0eSSebastien Boeuf /// pop(&mut self) -> Option<ConnMapKey>113434a5d0eSSebastien Boeuf pub fn pop(&mut self) -> Option<ConnMapKey> { 114434a5d0eSSebastien Boeuf if let Some(item) = self.q.front() { 115434a5d0eSSebastien Boeuf if Instant::now() > item.kill_time { 116434a5d0eSSebastien Boeuf return Some(self.q.pop_front().unwrap().key); 117434a5d0eSSebastien Boeuf } 118434a5d0eSSebastien Boeuf } 119434a5d0eSSebastien Boeuf None 120434a5d0eSSebastien Boeuf } 121434a5d0eSSebastien Boeuf 122434a5d0eSSebastien Boeuf /// Check if the kill queue is synchronized with the connection pool. 123434a5d0eSSebastien Boeuf /// is_synced(&self) -> bool124434a5d0eSSebastien Boeuf pub fn is_synced(&self) -> bool { 125434a5d0eSSebastien Boeuf self.synced 126434a5d0eSSebastien Boeuf } 127434a5d0eSSebastien Boeuf 128434a5d0eSSebastien Boeuf /// Check if the kill queue is empty, obviously. 129434a5d0eSSebastien Boeuf /// is_empty(&self) -> bool130434a5d0eSSebastien Boeuf pub fn is_empty(&self) -> bool { 131434a5d0eSSebastien Boeuf self.q.len() == 0 132434a5d0eSSebastien Boeuf } 133434a5d0eSSebastien Boeuf 134434a5d0eSSebastien Boeuf /// Check if the kill queue is full. 135434a5d0eSSebastien Boeuf /// is_full(&self) -> bool136434a5d0eSSebastien Boeuf pub fn is_full(&self) -> bool { 137434a5d0eSSebastien Boeuf self.q.len() == Self::SIZE 138434a5d0eSSebastien Boeuf } 139434a5d0eSSebastien Boeuf } 140