xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer_killq.rs (revision 3ce0fef7fd546467398c914dbc74d8542e45cf6f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 //! `MuxerKillQ` implements a helper object that `VsockMuxer` can use for scheduling forced
6 //! connection termination. I.e. after one peer issues a clean shutdown request
7 //! (VSOCK_OP_SHUTDOWN), the concerned connection is queued for termination (VSOCK_OP_RST) in
8 //! the near future (herein implemented via an expiring timer).
9 //!
10 //! Whenever the muxer needs to schedule a connection for termination, it pushes it (or rather
11 //! an identifier - the connection key) to this queue. A subsequent pop() operation will
12 //! succeed if and only if the first connection in the queue is ready to be terminated (i.e.
13 //! its kill timer expired).
14 //!
15 //! Without using this queue, the muxer would have to walk its entire connection pool
16 //! (hashmap), whenever it needs to check for expired kill timers. With this queue, both
17 //! scheduling and termination are performed in constant time. However, since we don't want to
18 //! waste space on a kill queue that's as big as the connection hashmap itself, it is possible
19 //! that this queue may become full at times.  We call this kill queue "synchronized" if we are
20 //! certain that all connections that are awaiting termination are present in the queue. This
21 //! means a simple constant-time pop() operation is enough to check whether any connections
22 //! need to be terminated.  When the kill queue becomes full, though, pushing fails, so
23 //! connections that should be terminated are left out. The queue is not synchronized anymore.
24 //! When that happens, the muxer will first drain the queue, and then replace it with a new
25 //! queue, created by walking the connection pool, looking for connections that will be
26 //! expiring in the future.
27 
28 use std::collections::{HashMap, VecDeque};
29 use std::time::Instant;
30 
31 use super::defs;
32 use super::muxer::ConnMapKey;
33 use super::MuxerConnection;
34 
35 /// A kill queue item, holding the connection key and the scheduled time for termination.
36 ///
37 #[derive(Clone, Copy)]
38 struct MuxerKillQItem {
39     key: ConnMapKey,
40     kill_time: Instant,
41 }
42 
43 /// The connection kill queue: a FIFO structure, storing the connections that are scheduled for
44 /// termination.
45 ///
46 pub struct MuxerKillQ {
47     /// The kill queue contents.
48     q: VecDeque<MuxerKillQItem>,
49 
50     /// The kill queue sync status:
51     /// - when true, all connections that are awaiting termination are guaranteed to be in this
52     ///   queue;
53     /// - when false, some connections may have been left out.
54     ///
55     synced: bool,
56 }
57 
58 impl MuxerKillQ {
59     const SIZE: usize = defs::MUXER_KILLQ_SIZE;
60 
61     /// Trivial kill queue constructor.
62     ///
63     pub fn new() -> Self {
64         Self {
65             q: VecDeque::with_capacity(Self::SIZE),
66             synced: true,
67         }
68     }
69 
70     /// Create a kill queue by walking the connection pool, looking for connections that are
71     /// set to expire at some point in the future.
72     /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an
73     ///       out-of-sync state, and will be discarded after it is emptied.
74     ///
75     pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self {
76         let mut q_buf: Vec<MuxerKillQItem> = Vec::with_capacity(Self::SIZE);
77         let mut synced = true;
78         for (key, conn) in conn_map.iter() {
79             if !conn.will_expire() {
80                 continue;
81             }
82             if q_buf.len() >= Self::SIZE {
83                 synced = false;
84                 break;
85             }
86             q_buf.push(MuxerKillQItem {
87                 key: *key,
88                 kill_time: conn.expiry().unwrap(),
89             });
90         }
91         q_buf.sort_unstable_by_key(|it| it.kill_time);
92         Self {
93             q: q_buf.into(),
94             synced,
95         }
96     }
97 
98     /// Push a connection key to the queue, scheduling it for termination at
99     /// `CONN_SHUTDOWN_TIMEOUT_MS` from now (the push time).
100     ///
101     pub fn push(&mut self, key: ConnMapKey, kill_time: Instant) {
102         if !self.is_synced() || self.is_full() {
103             self.synced = false;
104             return;
105         }
106         self.q.push_back(MuxerKillQItem { key, kill_time });
107     }
108 
109     /// Attempt to pop an expired connection from the kill queue.
110     ///
111     /// This will succeed and return a connection key, only if the connection at the front of
112     /// the queue has expired. Otherwise, `None` is returned.
113     ///
114     pub fn pop(&mut self) -> Option<ConnMapKey> {
115         if let Some(item) = self.q.front() {
116             if Instant::now() > item.kill_time {
117                 return Some(self.q.pop_front().unwrap().key);
118             }
119         }
120         None
121     }
122 
123     /// Check if the kill queue is synchronized with the connection pool.
124     ///
125     pub fn is_synced(&self) -> bool {
126         self.synced
127     }
128 
129     /// Check if the kill queue is empty, obviously.
130     ///
131     pub fn is_empty(&self) -> bool {
132         self.q.len() == 0
133     }
134 
135     /// Check if the kill queue is full.
136     ///
137     pub fn is_full(&self) -> bool {
138         self.q.len() == Self::SIZE
139     }
140 }
141