xref: /cloud-hypervisor/virtio-devices/src/vsock/unix/muxer_killq.rs (revision 61e57e1cb149de03ae1e0b799b9e5ba9a4a63ace)
1434a5d0eSSebastien Boeuf // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2434a5d0eSSebastien Boeuf // SPDX-License-Identifier: Apache-2.0
3434a5d0eSSebastien Boeuf //
4434a5d0eSSebastien Boeuf 
//! `MuxerKillQ` implements a helper object that `VsockMuxer` can use for scheduling forced
//! connection termination. I.e. after one peer issues a clean shutdown request
//! (VSOCK_OP_SHUTDOWN), the concerned connection is queued for termination (VSOCK_OP_RST) in
//! the near future (herein implemented via an expiring timer).
//!
//! Whenever the muxer needs to schedule a connection for termination, it pushes it (or rather
//! an identifier - the connection key) to this queue. A subsequent pop() operation will
//! succeed if and only if the first connection in the queue is ready to be terminated (i.e.
//! its kill timer expired).
//!
//! Without using this queue, the muxer would have to walk its entire connection pool
//! (hashmap) whenever it needs to check for expired kill timers. With this queue, both
//! scheduling and termination are performed in constant time. However, since we don't want to
//! waste space on a kill queue that's as big as the connection hashmap itself, it is possible
//! that this queue may become full at times. We call this kill queue "synchronized" if we are
//! certain that all connections that are awaiting termination are present in the queue. This
//! means a simple constant-time pop() operation is enough to check whether any connections
//! need to be terminated. When the kill queue becomes full, though, pushing fails, so
//! connections that should be terminated are left out. The queue is not synchronized anymore.
//! When that happens, the muxer will first drain the queue, and then replace it with a new
//! queue, created by walking the connection pool, looking for connections that will be
//! expiring in the future.
27f6236087SAlyssa Ross 
28434a5d0eSSebastien Boeuf use std::collections::{HashMap, VecDeque};
29434a5d0eSSebastien Boeuf use std::time::Instant;
30434a5d0eSSebastien Boeuf 
31434a5d0eSSebastien Boeuf use super::muxer::ConnMapKey;
32*61e57e1cSRuoqing He use super::{defs, MuxerConnection};
33434a5d0eSSebastien Boeuf 
34434a5d0eSSebastien Boeuf /// A kill queue item, holding the connection key and the scheduled time for termination.
35434a5d0eSSebastien Boeuf ///
36434a5d0eSSebastien Boeuf #[derive(Clone, Copy)]
37434a5d0eSSebastien Boeuf struct MuxerKillQItem {
38434a5d0eSSebastien Boeuf     key: ConnMapKey,
39434a5d0eSSebastien Boeuf     kill_time: Instant,
40434a5d0eSSebastien Boeuf }
41434a5d0eSSebastien Boeuf 
42434a5d0eSSebastien Boeuf /// The connection kill queue: a FIFO structure, storing the connections that are scheduled for
43434a5d0eSSebastien Boeuf /// termination.
44434a5d0eSSebastien Boeuf ///
45434a5d0eSSebastien Boeuf pub struct MuxerKillQ {
46434a5d0eSSebastien Boeuf     /// The kill queue contents.
47434a5d0eSSebastien Boeuf     q: VecDeque<MuxerKillQItem>,
48434a5d0eSSebastien Boeuf 
49434a5d0eSSebastien Boeuf     /// The kill queue sync status:
50434a5d0eSSebastien Boeuf     /// - when true, all connections that are awaiting termination are guaranteed to be in this
51434a5d0eSSebastien Boeuf     ///   queue;
52434a5d0eSSebastien Boeuf     /// - when false, some connections may have been left out.
53434a5d0eSSebastien Boeuf     ///
54434a5d0eSSebastien Boeuf     synced: bool,
55434a5d0eSSebastien Boeuf }
56434a5d0eSSebastien Boeuf 
57434a5d0eSSebastien Boeuf impl MuxerKillQ {
58434a5d0eSSebastien Boeuf     const SIZE: usize = defs::MUXER_KILLQ_SIZE;
59434a5d0eSSebastien Boeuf 
60434a5d0eSSebastien Boeuf     /// Trivial kill queue constructor.
61434a5d0eSSebastien Boeuf     ///
new() -> Self62434a5d0eSSebastien Boeuf     pub fn new() -> Self {
63434a5d0eSSebastien Boeuf         Self {
64434a5d0eSSebastien Boeuf             q: VecDeque::with_capacity(Self::SIZE),
65434a5d0eSSebastien Boeuf             synced: true,
66434a5d0eSSebastien Boeuf         }
67434a5d0eSSebastien Boeuf     }
68434a5d0eSSebastien Boeuf 
69434a5d0eSSebastien Boeuf     /// Create a kill queue by walking the connection pool, looking for connections that are
70434a5d0eSSebastien Boeuf     /// set to expire at some point in the future.
71434a5d0eSSebastien Boeuf     /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an
72434a5d0eSSebastien Boeuf     ///       out-of-sync state, and will be discarded after it is emptied.
73434a5d0eSSebastien Boeuf     ///
from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self74434a5d0eSSebastien Boeuf     pub fn from_conn_map(conn_map: &HashMap<ConnMapKey, MuxerConnection>) -> Self {
75434a5d0eSSebastien Boeuf         let mut q_buf: Vec<MuxerKillQItem> = Vec::with_capacity(Self::SIZE);
76434a5d0eSSebastien Boeuf         let mut synced = true;
77434a5d0eSSebastien Boeuf         for (key, conn) in conn_map.iter() {
78434a5d0eSSebastien Boeuf             if !conn.will_expire() {
79434a5d0eSSebastien Boeuf                 continue;
80434a5d0eSSebastien Boeuf             }
81434a5d0eSSebastien Boeuf             if q_buf.len() >= Self::SIZE {
82434a5d0eSSebastien Boeuf                 synced = false;
83434a5d0eSSebastien Boeuf                 break;
84434a5d0eSSebastien Boeuf             }
85434a5d0eSSebastien Boeuf             q_buf.push(MuxerKillQItem {
86434a5d0eSSebastien Boeuf                 key: *key,
87434a5d0eSSebastien Boeuf                 kill_time: conn.expiry().unwrap(),
88434a5d0eSSebastien Boeuf             });
89434a5d0eSSebastien Boeuf         }
90434a5d0eSSebastien Boeuf         q_buf.sort_unstable_by_key(|it| it.kill_time);
91434a5d0eSSebastien Boeuf         Self {
92434a5d0eSSebastien Boeuf             q: q_buf.into(),
93434a5d0eSSebastien Boeuf             synced,
94434a5d0eSSebastien Boeuf         }
95434a5d0eSSebastien Boeuf     }
96434a5d0eSSebastien Boeuf 
97434a5d0eSSebastien Boeuf     /// Push a connection key to the queue, scheduling it for termination at
98434a5d0eSSebastien Boeuf     /// `CONN_SHUTDOWN_TIMEOUT_MS` from now (the push time).
99434a5d0eSSebastien Boeuf     ///
push(&mut self, key: ConnMapKey, kill_time: Instant)100434a5d0eSSebastien Boeuf     pub fn push(&mut self, key: ConnMapKey, kill_time: Instant) {
101434a5d0eSSebastien Boeuf         if !self.is_synced() || self.is_full() {
102434a5d0eSSebastien Boeuf             self.synced = false;
103434a5d0eSSebastien Boeuf             return;
104434a5d0eSSebastien Boeuf         }
105434a5d0eSSebastien Boeuf         self.q.push_back(MuxerKillQItem { key, kill_time });
106434a5d0eSSebastien Boeuf     }
107434a5d0eSSebastien Boeuf 
108434a5d0eSSebastien Boeuf     /// Attempt to pop an expired connection from the kill queue.
109434a5d0eSSebastien Boeuf     ///
110434a5d0eSSebastien Boeuf     /// This will succeed and return a connection key, only if the connection at the front of
111434a5d0eSSebastien Boeuf     /// the queue has expired. Otherwise, `None` is returned.
112434a5d0eSSebastien Boeuf     ///
pop(&mut self) -> Option<ConnMapKey>113434a5d0eSSebastien Boeuf     pub fn pop(&mut self) -> Option<ConnMapKey> {
114434a5d0eSSebastien Boeuf         if let Some(item) = self.q.front() {
115434a5d0eSSebastien Boeuf             if Instant::now() > item.kill_time {
116434a5d0eSSebastien Boeuf                 return Some(self.q.pop_front().unwrap().key);
117434a5d0eSSebastien Boeuf             }
118434a5d0eSSebastien Boeuf         }
119434a5d0eSSebastien Boeuf         None
120434a5d0eSSebastien Boeuf     }
121434a5d0eSSebastien Boeuf 
122434a5d0eSSebastien Boeuf     /// Check if the kill queue is synchronized with the connection pool.
123434a5d0eSSebastien Boeuf     ///
is_synced(&self) -> bool124434a5d0eSSebastien Boeuf     pub fn is_synced(&self) -> bool {
125434a5d0eSSebastien Boeuf         self.synced
126434a5d0eSSebastien Boeuf     }
127434a5d0eSSebastien Boeuf 
128434a5d0eSSebastien Boeuf     /// Check if the kill queue is empty, obviously.
129434a5d0eSSebastien Boeuf     ///
is_empty(&self) -> bool130434a5d0eSSebastien Boeuf     pub fn is_empty(&self) -> bool {
131434a5d0eSSebastien Boeuf         self.q.len() == 0
132434a5d0eSSebastien Boeuf     }
133434a5d0eSSebastien Boeuf 
134434a5d0eSSebastien Boeuf     /// Check if the kill queue is full.
135434a5d0eSSebastien Boeuf     ///
is_full(&self) -> bool136434a5d0eSSebastien Boeuf     pub fn is_full(&self) -> bool {
137434a5d0eSSebastien Boeuf         self.q.len() == Self::SIZE
138434a5d0eSSebastien Boeuf     }
139434a5d0eSSebastien Boeuf }
140