// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

//! `MuxerKillQ` implements a helper object that `VsockMuxer` can use for scheduling forced
//! connection termination. I.e. after one peer issues a clean shutdown request
//! (VSOCK_OP_SHUTDOWN), the concerned connection is queued for termination (VSOCK_OP_RST) in
//! the near future (herein implemented via an expiring timer).
//!
//! Whenever the muxer needs to schedule a connection for termination, it pushes it (or rather
//! an identifier - the connection key) to this queue. A subsequent pop() operation will
//! succeed if and only if the first connection in the queue is ready to be terminated (i.e.
//! its kill timer expired).
//!
//! Without using this queue, the muxer would have to walk its entire connection pool
//! (hashmap) whenever it needs to check for expired kill timers. With this queue, both
//! scheduling and termination are performed in constant time. However, since we don't want to
//! waste space on a kill queue that's as big as the connection hashmap itself, it is possible
//! that this queue may become full at times. We call this kill queue "synchronized" if we are
//! certain that all connections awaiting termination are present in the queue. This means a
//! simple constant-time pop() operation is enough to check whether any connections need to be
//! terminated. When the kill queue becomes full, though, pushing fails, so connections that
//! should be terminated are left out, and the queue is no longer synchronized. When that
//! happens, the muxer will first drain the queue, and then replace it with a new queue,
//! created by walking the connection pool, looking for connections that will expire at some
//! point in the future.
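//!
//! A rough sketch of the intended muxer-side flow (illustrative only: `killq`, `conn_map` and
//! `shutdown_timeout` are hypothetical locals; the actual driving logic lives in
//! `VsockMuxer`):
//!
//! ```ignore
//! // On VSOCK_OP_SHUTDOWN, schedule the connection for a forced reset in the near future
//! // (`shutdown_timeout` stands in for `CONN_SHUTDOWN_TIMEOUT_MS`).
//! killq.push(key, Instant::now() + shutdown_timeout);
//!
//! // Periodically reap expired connections, replying with VSOCK_OP_RST and dropping them.
//! while let Some(key) = killq.pop() {
//!     // ... terminate the connection identified by `key` ...
//! }
//!
//! // If the queue fell out of sync (it was full at some point), rebuild it once drained.
//! if !killq.is_synced() && killq.is_empty() {
//!     killq = MuxerKillQ::from_conn_map(&conn_map);
//! }
//! ```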

use std::collections::{HashMap, VecDeque};
use std::time::Instant;

use super::muxer::ConnMapKey;
use super::{defs, MuxerConnection};

/// A kill queue item, holding the connection key and the scheduled time for termination.
///
#[derive(Clone, Copy)]
struct MuxerKillQItem {
    key: ConnMapKey,
    kill_time: Instant,
}

/// The connection kill queue: a FIFO structure, storing the connections that are scheduled for
/// termination.
///
pub struct MuxerKillQ {
    /// The kill queue contents.
    q: VecDeque<MuxerKillQItem>,

    /// The kill queue sync status:
    /// - when true, all connections that are awaiting termination are guaranteed to be in this
    ///   queue;
    /// - when false, some connections may have been left out.
    ///
    synced: bool,
}

impl MuxerKillQ {
    const SIZE: usize = defs::MUXER_KILLQ_SIZE;

    /// Trivial kill queue constructor.
    ///
    pub fn new() -> Self {
        Self {
            q: VecDeque::with_capacity(Self::SIZE),
            synced: true,
        }
    }

    /// Create a kill queue by walking the connection pool, looking for connections that are
    /// set to expire at some point in the future.
    /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an
    ///       out-of-sync state, and will be discarded after it is emptied.
    ///
    pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self {
        let mut q_buf: Vec<MuxerKillQItem> = Vec::with_capacity(Self::SIZE);
        let mut synced = true;
        for (key, conn) in conn_map.iter() {
            if !conn.will_expire() {
                continue;
            }
            if q_buf.len() >= Self::SIZE {
                synced = false;
                break;
            }
            q_buf.push(MuxerKillQItem {
                key: *key,
                kill_time: conn.expiry().unwrap(),
            });
        }
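        // Sort by kill time, ascending, so that the connection expiring soonest sits at the
        // front of the queue.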
        q_buf.sort_unstable_by_key(|it| it.kill_time);
        Self {
            q: q_buf.into(),
            synced,
        }
    }

    /// Push a connection key to the queue, scheduling it for termination at the provided
    /// `kill_time` (normally `CONN_SHUTDOWN_TIMEOUT_MS` after the shutdown request was
    /// received).
    ///
    pub fn push(&mut self, key: ConnMapKey, kill_time: Instant) {
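        // If the queue is already out of sync, or it is full, drop the item and mark the queue
        // as out of sync; the muxer will rebuild it from the connection map once it has been
        // drained.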
        if !self.is_synced() || self.is_full() {
            self.synced = false;
            return;
        }
        self.q.push_back(MuxerKillQItem { key, kill_time });
    }

    /// Attempt to pop an expired connection from the kill queue.
    ///
    /// This will succeed and return a connection key only if the connection at the front of
    /// the queue has expired. Otherwise, `None` is returned.
    ///
    pub fn pop(&mut self) -> Option<ConnMapKey> {
        if let Some(item) = self.q.front() {
            if Instant::now() > item.kill_time {
                return Some(self.q.pop_front().unwrap().key);
            }
        }
        None
    }

    /// Check if the kill queue is synchronized with the connection pool.
    ///
    pub fn is_synced(&self) -> bool {
        self.synced
    }

    /// Check if the kill queue is empty, obviously.
    ///
    pub fn is_empty(&self) -> bool {
        self.q.is_empty()
    }

    /// Check if the kill queue is full.
    ///
    pub fn is_full(&self) -> bool {
        self.q.len() == Self::SIZE
    }
}
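
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity-check sketch. Exercising push()/pop() here would require constructing
    // `ConnMapKey` values, which is the muxer's business, so those paths are left to
    // muxer-level tests.
    #[test]
    fn test_new_killq_is_empty_and_synced() {
        let mut killq = MuxerKillQ::new();
        assert!(killq.is_empty());
        assert!(!killq.is_full());
        assert!(killq.is_synced());
        // Nothing has been scheduled yet, so there is nothing to reap.
        assert!(killq.pop().is_none());
    }
}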