xref: /cloud-hypervisor/rate_limiter/src/group.rs (revision 19d36c765fdf00be749d95b3e61028bc302d6d73)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 // Copyright 2023 Crusoe Energy Systems LLC
5 // SPDX-License-Identifier: Apache-2.0
6 
7 use core::panic::AssertUnwindSafe;
8 use std::fs::File;
9 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
10 use std::sync::{Arc, Mutex};
11 use std::{io, result, thread};
12 
13 use thiserror::Error;
14 use vmm_sys_util::eventfd::EventFd;
15 
16 use crate::{RateLimiter, TokenType};
17 
/// Errors associated with rate-limiter group.
///
/// Each message embeds the underlying `io::Error`, which is also exposed as
/// the error's `source()`.
#[derive(Debug, Error)]
pub enum Error {
    /// Cannot create thread.
    ///
    /// Returned by `RateLimiterGroup::start_thread` when the worker thread
    /// cannot be spawned.
    #[error("Error spawning rate-limiter thread {0}")]
    ThreadSpawn(#[source] io::Error),

    /// Cannot create epoll context.
    ///
    /// Also used when registering an fd with the group's epoll instance fails,
    /// and when the worker's `epoll::wait` fails with anything other than
    /// `ErrorKind::Interrupted`.
    #[error("Error creating epoll context: {0}")]
    Epoll(#[source] io::Error),

    /// Cannot create EventFd (the group's kill event or a handle's
    /// notification event).
    #[error("Error creating EventFd: {0}")]
    EventFd(#[source] io::Error),

    /// Cannot create the group's shared RateLimiter.
    #[error("Error creating RateLimiter: {0}")]
    RateLimiter(#[source] io::Error),

    /// Cannot read from EventFd.
    ///
    /// Returned by `RateLimiterGroupHandle::event_handler`.
    #[error("Error reading from EventFd: {0}")]
    EventFdRead(#[source] io::Error),

    /// Cannot write to EventFd.
    ///
    /// Produced inside the worker thread when notifying a handle fails; it is
    /// logged there and triggers the exit event rather than being returned to
    /// a caller.
    #[error("Error writing to EventFd: {0}")]
    EventFdWrite(#[source] io::Error),
}
45 
/// Handle to a RateLimiterGroup
///
/// The RateLimiterGroupHandle may be used in exactly the same way as
/// the RateLimiter type. When the RateLimiter within a RateLimiterGroup
/// is unblocked, each RateLimiterGroupHandle will be notified (once the
/// group's worker thread has been started with `start_thread`).
///
/// All handles of a group consume from the same shared budget; each handle
/// owns a separate notification `EventFd`, exposed through `AsRawFd`.
pub struct RateLimiterGroupHandle {
    /// This handle's notification `EventFd`. A clone of the `Arc` is kept in
    /// the group's `handles` list so the worker thread can signal it.
    eventfd: Arc<EventFd>,
    /// State shared with the owning group and every other handle.
    inner: Arc<RateLimiterGroupInner>,
}
55 
56 impl RateLimiterGroupHandle {
57     fn new(inner: Arc<RateLimiterGroupInner>) -> result::Result<Self, Error> {
58         let eventfd = Arc::new(EventFd::new(0).map_err(Error::EventFd)?);
59         inner.handles.lock().unwrap().push(eventfd.clone());
60         Ok(Self { eventfd, inner })
61     }
62 
63     /// Attempts to consume tokens and returns whether that is possible.
64     ///
65     /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
66     pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
67         self.inner.rate_limiter.consume(tokens, token_type)
68     }
69 
70     /// Adds tokens of `token_type` to their respective bucket.
71     ///
72     /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
73     /// `consume()` if needed.
74     pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
75         self.inner.rate_limiter.manual_replenish(tokens, token_type)
76     }
77 
78     /// This function needs to be called every time there is an event on the
79     /// FD provided by this object's `AsRawFd` trait implementation.
80     ///
81     /// # Errors
82     ///
83     /// If the rate limiter is disabled or is not blocked, an error is returned.
84     pub fn event_handler(&self) -> Result<(), Error> {
85         self.eventfd.read().map_err(Error::EventFdRead).map(|_| ())
86     }
87 
88     /// Returns whether this rate limiter is blocked.
89     ///
90     /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
91     /// budget for it.
92     /// An event will be generated on the exported FD when the limiter 'unblocks'.
93     pub fn is_blocked(&self) -> bool {
94         self.inner.rate_limiter.is_blocked()
95     }
96 }
97 
98 impl Clone for RateLimiterGroupHandle {
99     fn clone(&self) -> Self {
100         RateLimiterGroupHandle::new(self.inner.clone()).unwrap()
101     }
102 }
103 
104 impl AsRawFd for RateLimiterGroupHandle {
105     fn as_raw_fd(&self) -> RawFd {
106         self.eventfd.as_raw_fd()
107     }
108 }
109 
110 impl Drop for RateLimiterGroupHandle {
111     fn drop(&mut self) {
112         let mut handles = self.inner.handles.lock().unwrap();
113         let index = handles
114             .iter()
115             .position(|handle| handle.as_raw_fd() == self.eventfd.as_raw_fd())
116             .expect("RateLimiterGroupHandle must be subscribed to RateLimiterGroup");
117         handles.remove(index);
118     }
119 }
120 
/// State shared (through `Arc`) by a `RateLimiterGroup`, its worker thread and
/// every `RateLimiterGroupHandle`.
struct RateLimiterGroupInner {
    /// Group identifier; used to name the worker thread.
    id: String,
    /// The single rate limiter whose budget all handles consume from.
    rate_limiter: RateLimiter,
    /// Notification `EventFd`s of the live handles. The worker thread writes
    /// to each of them when `rate_limiter` unblocks.
    handles: Mutex<Vec<Arc<EventFd>>>,
}
126 
/// A RateLimiterGroup is an extension of RateLimiter that enables rate-limiting
/// the aggregate io consumption of multiple consumers.
///
/// Consumers obtain a `RateLimiterGroupHandle` via `new_handle`, and all
/// handles draw from the same budget. After `start_thread`, a worker thread
/// watches the shared limiter and signals every handle when it unblocks.
pub struct RateLimiterGroup {
    /// State shared with the worker thread and all handles.
    inner: Arc<RateLimiterGroupInner>,
    /// Owns the epoll instance watching `kill_evt` and the shared limiter's fd.
    /// The worker only borrows its raw fd, which is why `Drop` joins the worker
    /// before this field is dropped.
    epoll_file: File,
    /// Written on drop to tell the worker thread to exit.
    kill_evt: EventFd,
    /// Worker thread handle, present once `start_thread` has succeeded.
    epoll_thread: Option<thread::JoinHandle<()>>,
}
135 
/// Tag stored in the `data` field of each epoll registration made by the
/// group, identifying which fd produced an event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
enum EpollDispatch {
    /// The group's kill `EventFd` is readable: the worker loop should stop.
    Kill = 1,
    /// The shared `RateLimiter`'s fd is readable: the limiter may have unblocked.
    Unblocked = 2,
    /// Any tag value not produced by this file.
    Unknown,
}

impl From<u64> for EpollDispatch {
    /// Decodes an epoll `data` tag; unrecognised values map to `Unknown`.
    fn from(data: u64) -> Self {
        if data == EpollDispatch::Kill as u64 {
            EpollDispatch::Kill
        } else if data == EpollDispatch::Unblocked as u64 {
            EpollDispatch::Unblocked
        } else {
            EpollDispatch::Unknown
        }
    }
}
154 
155 impl RateLimiterGroup {
156     /// Create a new RateLimiterGroup.
157     pub fn new(
158         id: &str,
159         bytes_total_capacity: u64,
160         bytes_one_time_burst: u64,
161         bytes_complete_refill_time_ms: u64,
162         ops_total_capacity: u64,
163         ops_one_time_burst: u64,
164         ops_complete_refill_time_ms: u64,
165     ) -> result::Result<Self, Error> {
166         let rate_limiter = RateLimiter::new(
167             bytes_total_capacity,
168             bytes_one_time_burst,
169             bytes_complete_refill_time_ms,
170             ops_total_capacity,
171             ops_one_time_burst,
172             ops_complete_refill_time_ms,
173         )
174         .map_err(Error::RateLimiter)?;
175 
176         let epoll_fd = epoll::create(true).map_err(Error::Epoll)?;
177         let kill_evt = EventFd::new(0).map_err(Error::EventFd)?;
178 
179         epoll::ctl(
180             epoll_fd,
181             epoll::ControlOptions::EPOLL_CTL_ADD,
182             kill_evt.as_raw_fd(),
183             epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Kill as u64),
184         )
185         .map_err(Error::Epoll)?;
186 
187         epoll::ctl(
188             epoll_fd,
189             epoll::ControlOptions::EPOLL_CTL_ADD,
190             rate_limiter.as_raw_fd(),
191             epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Unblocked as u64),
192         )
193         .map_err(Error::Epoll)?;
194 
195         // Use 'File' to enforce closing on 'epoll_fd'
196         // SAFETY: epoll_fd is valid
197         let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
198 
199         Ok(Self {
200             inner: Arc::new(RateLimiterGroupInner {
201                 id: id.to_string(),
202                 rate_limiter,
203                 handles: Mutex::new(Vec::new()),
204             }),
205             epoll_file,
206             kill_evt,
207             epoll_thread: None,
208         })
209     }
210 
211     /// Create a new RateLimiterGroupHandle.
212     pub fn new_handle(&self) -> result::Result<RateLimiterGroupHandle, Error> {
213         RateLimiterGroupHandle::new(self.inner.clone())
214     }
215 
216     /// Start a worker thread to broadcast an event to each RateLimiterGroupHandle
217     /// when the RateLimiter becomes unblocked.
218     pub fn start_thread(&mut self, exit_evt: EventFd) -> result::Result<(), Error> {
219         let inner = self.inner.clone();
220         let epoll_fd = self.epoll_file.as_raw_fd();
221         thread::Builder::new()
222             .name(format!("rate-limit-group-{}", inner.id))
223             .spawn(move || {
224                 let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
225                     const EPOLL_EVENTS_LEN: usize = 2;
226 
227                     let mut events =
228                         [epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
229 
230                     loop {
231                         let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) {
232                             Ok(res) => res,
233                             Err(e) => {
234                                 if e.kind() == io::ErrorKind::Interrupted {
235                                     continue;
236                                 } else {
237                                     return Err(Error::Epoll(e));
238                                 }
239                             }
240                         };
241 
242                         for event in events.iter().take(num_events) {
243                             let dispatch_event: EpollDispatch = event.data.into();
244                             match dispatch_event {
245                                 EpollDispatch::Unknown => {
246                                     let event = event.data;
247                                     warn!("Unknown rate-limiter loop event: {}", event);
248                                 }
249                                 EpollDispatch::Unblocked => {
250                                     inner.rate_limiter.event_handler().unwrap();
251                                     let handles = inner.handles.lock().unwrap();
252                                     for handle in handles.iter() {
253                                         handle.write(1).map_err(Error::EventFdWrite)?
254                                     }
255                                 }
256                                 EpollDispatch::Kill => {
257                                     info!(
258                                         "KILL_EVENT received, stopping rate-limit-group epoll loop"
259                                     );
260                                     return Ok(());
261                                 }
262                             }
263                         }
264                     }
265                 }));
266 
267                 match res {
268                     Ok(res) => {
269                         if let Err(e) = res {
270                             error!("Error running rate-limit-group worker: {:?}", e);
271                             exit_evt.write(1).unwrap();
272                         }
273                     }
274                     Err(_) => {
275                         error!("rate-limit-group worker panicked");
276                         exit_evt.write(1).unwrap();
277                     }
278                 };
279             })
280             .map(|thread| self.epoll_thread.insert(thread))
281             .map_err(Error::ThreadSpawn)?;
282 
283         Ok(())
284     }
285 }
286 
287 impl Drop for RateLimiterGroup {
288     fn drop(&mut self) {
289         self.kill_evt.write(1).unwrap();
290 
291         if let Some(t) = self.epoll_thread.take() {
292             if let Err(e) = t.join() {
293                 error!("Error joining thread: {:?}", e);
294             }
295         }
296     }
297 }
298 
#[cfg(test)]
pub(crate) mod tests {
    use std::os::fd::AsRawFd;
    use std::thread;
    use std::time::Duration;

    use vmm_sys_util::eventfd::EventFd;

    use super::RateLimiterGroupHandle;
    use crate::group::RateLimiterGroup;
    use crate::{TokenBucket, TokenType, REFILL_TIMER_INTERVAL_MS};

    // Test-only accessors returning snapshots of the buckets of the limiter
    // shared by the handle's group.
    impl RateLimiterGroupHandle {
        /// Returns a clone of the shared limiter's bandwidth (bytes) bucket,
        /// or `None` if it has none.
        pub fn bandwidth(&self) -> Option<TokenBucket> {
            let guard = self.inner.rate_limiter.inner.lock().unwrap();
            guard.bandwidth.clone()
        }

        /// Returns a clone of the shared limiter's ops bucket, or `None` if it
        /// has none.
        pub fn ops(&self) -> Option<TokenBucket> {
            let guard = self.inner.rate_limiter.inner.lock().unwrap();
            guard.ops.clone()
        }
    }

    #[test]
    fn test_rate_limiter_group_new() {
        // bucket parameters given to the group are visible through a handle
        let l = RateLimiterGroup::new("test", 1000, 1001, 1002, 1003, 1004, 1005).unwrap();
        let h = l.new_handle().unwrap();
        let bw = h.bandwidth().unwrap();
        assert_eq!(bw.capacity(), 1000);
        assert_eq!(bw.one_time_burst(), 1001);
        assert_eq!(bw.refill_time_ms(), 1002);
        assert_eq!(bw.budget(), 1000);

        let ops = h.ops().unwrap();
        assert_eq!(ops.capacity(), 1003);
        assert_eq!(ops.one_time_burst(), 1004);
        assert_eq!(ops.refill_time_ms(), 1005);
        assert_eq!(ops.budget(), 1003);
    }

    #[test]
    fn test_rate_limiter_group_manual_replenish() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        let h = l.new_handle().unwrap();

        // consume 123 bytes, then give 23 back: 1000 - 123 + 23 = 900
        assert!(h.consume(123, TokenType::Bytes));
        h.manual_replenish(23, TokenType::Bytes);
        {
            let bytes_tb = h.bandwidth().unwrap();
            assert_eq!(bytes_tb.budget(), 900);
        }
        // consume 123 ops, then give 23 back: 1000 - 123 + 23 = 900
        assert!(h.consume(123, TokenType::Ops));
        h.manual_replenish(23, TokenType::Ops);
        {
            // NOTE: despite its name, `bytes_tb` holds the ops bucket here
            let bytes_tb = h.ops().unwrap();
            assert_eq!(bytes_tb.budget(), 900);
        }
    }

    #[test]
    fn test_rate_limiter_group_bandwidth() {
        // rate limiter with limit of 1000 bytes/s
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 0, 0, 0).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // limiter should not be blocked
        assert!(!h.is_blocked());
        // the handle's raw FD should be valid
        assert!(h.as_raw_fd() > 0);

        // ops/s limiter should be disabled so consume(whatever) should work
        assert!(h.consume(u64::MAX, TokenType::Ops));

        // do full 1000 bytes
        assert!(h.consume(1000, TokenType::Bytes));
        // try and fail on another 100
        assert!(!h.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(h.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(h.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the group worker should have signalled this handle by now; the
        // handle's eventfd is blocking, so this read waits for the signal
        assert!(h.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!h.is_blocked());
        // try and succeed on another 100 bytes this time
        assert!(h.consume(100, TokenType::Bytes));
    }

    #[test]
    fn test_rate_limiter_group_ops() {
        // rate limiter with limit of 1000 ops/s
        let mut l = RateLimiterGroup::new("test", 0, 0, 0, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // limiter should not be blocked
        assert!(!h.is_blocked());
        // the handle's raw FD should be valid
        assert!(h.as_raw_fd() > 0);

        // bytes/s limiter should be disabled so consume(whatever) should work
        assert!(h.consume(u64::MAX, TokenType::Bytes));

        // do full 1000 ops
        assert!(h.consume(1000, TokenType::Ops));
        // try and fail on another 100
        assert!(!h.consume(100, TokenType::Ops));
        // since consume failed, limiter should be blocked now
        assert!(h.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(h.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the group worker should have signalled this handle by now; the
        // handle's eventfd is blocking, so this read waits for the signal
        assert!(h.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!h.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(h.consume(100, TokenType::Ops));
    }

    #[test]
    fn test_rate_limiter_group_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // limiter should not be blocked
        assert!(!h.is_blocked());
        // the handle's raw FD should be valid
        assert!(h.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(h.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(h.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!h.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!h.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(h.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(h.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the group worker should have signalled this handle by now; the
        // handle's eventfd is blocking, so this read waits for the signal
        assert!(h.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!h.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(h.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(h.consume(100, TokenType::Bytes));
    }

    #[test]
    fn test_rate_limiter_group_overconsumption() {
        // initialize the rate limiter
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();

        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(h.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked
        thread::sleep(Duration::from_millis(1000));
        assert!(h.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        assert!(h.event_handler().is_ok());
        assert!(!h.is_blocked());

        // reset the rate limiter
        let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap();
        l.start_thread(EventFd::new(0).unwrap()).unwrap();

        let h = l.new_handle().unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 0.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(h.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        assert!(h.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!h.consume(100, TokenType::Bytes));
        assert!(h.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier
        thread::sleep(Duration::from_millis(100));
        assert!(h.is_blocked());
        assert!(!h.consume(100, TokenType::Bytes));

        // after waiting out the full duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(200));
        assert!(h.event_handler().is_ok());
        assert!(!h.is_blocked());
        assert!(h.consume(100, TokenType::Bytes));
    }
}
533