xref: /cloud-hypervisor/rate_limiter/src/group.rs (revision 3ce0fef7fd546467398c914dbc74d8542e45cf6f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 // Copyright 2023 Crusoe Energy Systems LLC
5 // SPDX-License-Identifier: Apache-2.0
6 
7 use crate::{RateLimiter, TokenType};
8 use core::panic::AssertUnwindSafe;
9 use std::fs::File;
10 use std::io;
11 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
12 use std::result;
13 use std::sync::{Arc, Mutex};
14 use std::thread;
15 use thiserror::Error;
16 use vmm_sys_util::eventfd::EventFd;
17 
18 /// Errors associated with rate-limiter group.
19 #[derive(Debug, Error)]
20 pub enum Error {
21     /// Cannot create thread
22     #[error("Error spawning rate-limiter thread {0}")]
23     ThreadSpawn(#[source] io::Error),
24 
25     /// Cannot create epoll context.
26     #[error("Error creating epoll context: {0}")]
27     Epoll(#[source] io::Error),
28 
29     /// Cannot create EventFd.
30     #[error("Error creating EventFd: {0}")]
31     EventFd(#[source] io::Error),
32 
33     /// Cannot create RateLimiter.
34     #[error("Error creating RateLimiter: {0}")]
35     RateLimiter(#[source] io::Error),
36 
37     /// Cannot read from EventFd.
38     #[error("Error reading from EventFd: {0}")]
39     EventFdRead(#[source] io::Error),
40 
41     /// Cannot write to EventFd.
42     #[error("Error writing to EventFd: {0}")]
43     EventFdWrite(#[source] io::Error),
44 }
45 
46 /// The RateLimiterGroupHandle is a handle to a RateLimiterGroup that may be
47 /// used in exactly the same way as the RateLimiter type. When the RateLimiter
48 /// within a RateLimiterGroup is unblocked, each RateLimiterGroupHandle will
49 /// be notified.
50 pub struct RateLimiterGroupHandle {
51     eventfd: Arc<EventFd>,
52     inner: Arc<RateLimiterGroupInner>,
53 }
54 
55 impl RateLimiterGroupHandle {
56     fn new(inner: Arc<RateLimiterGroupInner>) -> result::Result<Self, Error> {
57         let eventfd = Arc::new(EventFd::new(0).map_err(Error::EventFd)?);
58         inner.handles.lock().unwrap().push(eventfd.clone());
59         Ok(Self { eventfd, inner })
60     }
61 
62     /// Attempts to consume tokens and returns whether that is possible.
63     ///
64     /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
65     pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
66         self.inner.rate_limiter.consume(tokens, token_type)
67     }
68 
69     /// Adds tokens of `token_type` to their respective bucket.
70     ///
71     /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
72     /// `consume()` if needed.
73     pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
74         self.inner.rate_limiter.manual_replenish(tokens, token_type)
75     }
76 
77     /// This function needs to be called every time there is an event on the
78     /// FD provided by this object's `AsRawFd` trait implementation.
79     ///
80     /// # Errors
81     ///
82     /// If the rate limiter is disabled or is not blocked, an error is returned.
83     pub fn event_handler(&self) -> Result<(), Error> {
84         self.eventfd.read().map_err(Error::EventFdRead).map(|_| ())
85     }
86 
87     /// Returns whether this rate limiter is blocked.
88     ///
89     /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
90     /// budget for it.
91     /// An event will be generated on the exported FD when the limiter 'unblocks'.
92     pub fn is_blocked(&self) -> bool {
93         self.inner.rate_limiter.is_blocked()
94     }
95 }
96 
97 impl Clone for RateLimiterGroupHandle {
98     fn clone(&self) -> Self {
99         RateLimiterGroupHandle::new(self.inner.clone()).unwrap()
100     }
101 }
102 
103 impl AsRawFd for RateLimiterGroupHandle {
104     fn as_raw_fd(&self) -> RawFd {
105         self.eventfd.as_raw_fd()
106     }
107 }
108 
109 impl Drop for RateLimiterGroupHandle {
110     fn drop(&mut self) {
111         let mut handles = self.inner.handles.lock().unwrap();
112         let index = handles
113             .iter()
114             .position(|handle| handle.as_raw_fd() == self.eventfd.as_raw_fd())
115             .expect("RateLimiterGroupHandle must be subscribed to RateLimiterGroup");
116         handles.remove(index);
117     }
118 }
119 
120 struct RateLimiterGroupInner {
121     id: String,
122     rate_limiter: RateLimiter,
123     handles: Mutex<Vec<Arc<EventFd>>>,
124 }
125 
126 /// A RateLimiterGroup is an extension of RateLimiter that enables rate-limiting
127 /// the aggregate io consumption of multiple consumers.
128 pub struct RateLimiterGroup {
129     inner: Arc<RateLimiterGroupInner>,
130     epoll_file: File,
131     kill_evt: EventFd,
132     epoll_thread: Option<thread::JoinHandle<()>>,
133 }
134 
/// Tags stored in the epoll event data, identifying which fd became ready.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
enum EpollDispatch {
    /// The group's `kill_evt` was written: stop the worker loop.
    Kill = 1,
    /// The shared rate limiter's fd became readable.
    Unblocked = 2,
    /// Any tag this module does not register.
    Unknown,
}

impl From<u64> for EpollDispatch {
    fn from(tag: u64) -> Self {
        const KILL: u64 = EpollDispatch::Kill as u64;
        const UNBLOCKED: u64 = EpollDispatch::Unblocked as u64;
        match tag {
            KILL => Self::Kill,
            UNBLOCKED => Self::Unblocked,
            _ => Self::Unknown,
        }
    }
}
153 
154 impl RateLimiterGroup {
155     /// Create a new RateLimiterGroup.
156     pub fn new(
157         id: &str,
158         bytes_total_capacity: u64,
159         bytes_one_time_burst: u64,
160         bytes_complete_refill_time_ms: u64,
161         ops_total_capacity: u64,
162         ops_one_time_burst: u64,
163         ops_complete_refill_time_ms: u64,
164     ) -> result::Result<Self, Error> {
165         let rate_limiter = RateLimiter::new(
166             bytes_total_capacity,
167             bytes_one_time_burst,
168             bytes_complete_refill_time_ms,
169             ops_total_capacity,
170             ops_one_time_burst,
171             ops_complete_refill_time_ms,
172         )
173         .map_err(Error::RateLimiter)?;
174 
175         let epoll_fd = epoll::create(true).map_err(Error::Epoll)?;
176         let kill_evt = EventFd::new(0).map_err(Error::EventFd)?;
177 
178         epoll::ctl(
179             epoll_fd,
180             epoll::ControlOptions::EPOLL_CTL_ADD,
181             kill_evt.as_raw_fd(),
182             epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Kill as u64),
183         )
184         .map_err(Error::Epoll)?;
185 
186         epoll::ctl(
187             epoll_fd,
188             epoll::ControlOptions::EPOLL_CTL_ADD,
189             rate_limiter.as_raw_fd(),
190             epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Unblocked as u64),
191         )
192         .map_err(Error::Epoll)?;
193 
194         // Use 'File' to enforce closing on 'epoll_fd'
195         // SAFETY: epoll_fd is valid
196         let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
197 
198         Ok(Self {
199             inner: Arc::new(RateLimiterGroupInner {
200                 id: id.to_string(),
201                 rate_limiter,
202                 handles: Mutex::new(Vec::new()),
203             }),
204             epoll_file,
205             kill_evt,
206             epoll_thread: None,
207         })
208     }
209 
210     /// Create a new RateLimiterGroupHandle.
211     pub fn new_handle(&self) -> result::Result<RateLimiterGroupHandle, Error> {
212         RateLimiterGroupHandle::new(self.inner.clone())
213     }
214 
215     /// Start a worker thread to broadcast an event to each RateLimiterGroupHandle
216     /// when the RateLimiter becomes unblocked.
217     pub fn start_thread(&mut self, exit_evt: EventFd) -> result::Result<(), Error> {
218         let inner = self.inner.clone();
219         let epoll_fd = self.epoll_file.as_raw_fd();
220         thread::Builder::new()
221             .name(format!("rate-limit-group-{}", inner.id))
222             .spawn(move || {
223                 let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
224                     const EPOLL_EVENTS_LEN: usize = 2;
225 
226                     let mut events =
227                         [epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
228 
229                     loop {
230                         let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) {
231                             Ok(res) => res,
232                             Err(e) => {
233                                 if e.kind() == io::ErrorKind::Interrupted {
234                                     continue;
235                                 } else {
236                                     return Err(Error::Epoll(e));
237                                 }
238                             }
239                         };
240 
241                         for event in events.iter().take(num_events) {
242                             let dispatch_event: EpollDispatch = event.data.into();
243                             match dispatch_event {
244                                 EpollDispatch::Unknown => {
245                                     let event = event.data;
246                                     warn!("Unknown rate-limiter loop event: {}", event);
247                                 }
248                                 EpollDispatch::Unblocked => {
249                                     inner.rate_limiter.event_handler().unwrap();
250                                     let handles = inner.handles.lock().unwrap();
251                                     for handle in handles.iter() {
252                                         handle.write(1).map_err(Error::EventFdWrite)?
253                                     }
254                                 }
255                                 EpollDispatch::Kill => {
256                                     info!(
257                                         "KILL_EVENT received, stopping rate-limit-group epoll loop"
258                                     );
259                                     return Ok(());
260                                 }
261                             }
262                         }
263                     }
264                 }));
265 
266                 match res {
267                     Ok(res) => {
268                         if let Err(e) = res {
269                             error!("Error running rate-limit-group worker: {:?}", e);
270                             exit_evt.write(1).unwrap();
271                         }
272                     }
273                     Err(_) => {
274                         error!("rate-limit-group worker panicked");
275                         exit_evt.write(1).unwrap();
276                     }
277                 };
278             })
279             .map(|thread| self.epoll_thread.insert(thread))
280             .map_err(Error::ThreadSpawn)?;
281 
282         Ok(())
283     }
284 }
285 
286 impl Drop for RateLimiterGroup {
287     fn drop(&mut self) {
288         self.kill_evt.write(1).unwrap();
289 
290         if let Some(t) = self.epoll_thread.take() {
291             if let Err(e) = t.join() {
292                 error!("Error joining thread: {:?}", e);
293             }
294         }
295     }
296 }
297 
#[cfg(test)]
pub(crate) mod tests {
    use super::RateLimiterGroupHandle;
    use crate::{group::RateLimiterGroup, TokenBucket, TokenType, REFILL_TIMER_INTERVAL_MS};
    use std::{os::fd::AsRawFd, thread, time::Duration};
    use vmm_sys_util::eventfd::EventFd;

    impl RateLimiterGroupHandle {
        /// Test-only snapshot of the group's bandwidth (bytes) bucket.
        pub fn bandwidth(&self) -> Option<TokenBucket> {
            let guard = self.inner.rate_limiter.inner.lock().unwrap();
            guard.bandwidth.clone()
        }

        /// Test-only snapshot of the group's ops bucket.
        pub fn ops(&self) -> Option<TokenBucket> {
            let guard = self.inner.rate_limiter.inner.lock().unwrap();
            guard.ops.clone()
        }
    }

    #[test]
    fn test_rate_limiter_group_new() {
        let l = RateLimiterGroup::new("test", 1000, 1001, 1002, 1003, 1004, 1005).unwrap();
        let h = l.new_handle().unwrap();
        let bw = h.bandwidth().unwrap();
        assert_eq!(bw.capacity(), 1000);
        assert_eq!(bw.one_time_burst(), 1001);
        assert_eq!(bw.refill_time_ms(), 1002);
        assert_eq!(bw.budget(), 1000);

        let ops = h.ops().unwrap();
        assert_eq!(ops.capacity(), 1003);
        assert_eq!(ops.one_time_burst(), 1004);
        assert_eq!(ops.refill_time_ms(), 1005);
        assert_eq!(ops.budget(), 1003);
    }

    #[test]
    fn test_rate_limiter_group_manual_replenish() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        let h = l.new_handle().unwrap();

        // consume 123 bytes
        assert!(h.consume(123, TokenType::Bytes));
        h.manual_replenish(23, TokenType::Bytes);
        {
            let bytes_tb = h.bandwidth().unwrap();
            assert_eq!(bytes_tb.budget(), 900);
        }
        // consume 123 ops
        assert!(h.consume(123, TokenType::Ops));
        h.manual_replenish(23, TokenType::Ops);
        {
            let ops_tb = h.ops().unwrap();
            assert_eq!(ops_tb.budget(), 900);
        }
    }

    #[test]
    fn test_rate_limiter_group_bandwidth() {
        // rate limiter with limit of 1000 bytes/s
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 0, 0, 0).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // limiter should not be blocked
        assert!(!h.is_blocked());
        // the handle's notification FD should be valid
        assert!(h.as_raw_fd() > 0);

        // ops/s limiter should be disabled so consume(whatever) should work
        assert!(h.consume(u64::MAX, TokenType::Ops));

        // do full 1000 bytes
        assert!(h.consume(1000, TokenType::Bytes));
        // try and fail on another 100
        assert!(!h.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(h.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(h.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the handle's eventfd should have been signalled by now
        assert!(h.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!h.is_blocked());
        // try and succeed on another 100 bytes this time
        assert!(h.consume(100, TokenType::Bytes));
    }

    #[test]
    fn test_rate_limiter_group_ops() {
        // rate limiter with limit of 1000 ops/s
        let mut l = RateLimiterGroup::new("test", 0, 0, 0, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // limiter should not be blocked
        assert!(!h.is_blocked());
        // the handle's notification FD should be valid
        assert!(h.as_raw_fd() > 0);

        // bytes/s limiter should be disabled so consume(whatever) should work
        assert!(h.consume(u64::MAX, TokenType::Bytes));

        // do full 1000 ops
        assert!(h.consume(1000, TokenType::Ops));
        // try and fail on another 100
        assert!(!h.consume(100, TokenType::Ops));
        // since consume failed, limiter should be blocked now
        assert!(h.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(h.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the handle's eventfd should have been signalled by now
        assert!(h.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!h.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(h.consume(100, TokenType::Ops));
    }

    #[test]
    fn test_rate_limiter_group_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // limiter should not be blocked
        assert!(!h.is_blocked());
        // the handle's notification FD should be valid
        assert!(h.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(h.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(h.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!h.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!h.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(h.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(h.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the handle's eventfd should have been signalled by now
        assert!(h.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!h.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(h.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(h.consume(100, TokenType::Bytes));
    }

    #[test]
    fn test_rate_limiter_group_overconsumption() {
        // initialize the rate limiter
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(h.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked
        thread::sleep(Duration::from_millis(1000));
        assert!(h.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        assert!(h.event_handler().is_ok());
        assert!(!h.is_blocked());

        // reset the rate limiter
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 0.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(h.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        assert!(h.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!h.consume(100, TokenType::Bytes));
        assert!(h.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier
        thread::sleep(Duration::from_millis(100));
        assert!(h.is_blocked());
        assert!(!h.consume(100, TokenType::Bytes));

        // after waiting out the full duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(200));
        assert!(h.event_handler().is_ok());
        assert!(!h.is_blocked());
        assert!(h.consume(100, TokenType::Bytes));
    }
}
527