xref: /cloud-hypervisor/event_monitor/src/lib.rs (revision 3ce0fef7fd546467398c914dbc74d8542e45cf6f)
1 // Copyright © 2021 Intel Corporation
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 
6 use once_cell::sync::OnceCell;
7 use serde::Serialize;
8 use std::borrow::Cow;
9 use std::collections::HashMap;
10 use std::fs::File;
11 use std::io;
12 use std::os::unix::io::AsRawFd;
13 use std::sync::Arc;
14 use std::time::{Duration, Instant};
15 
16 static MONITOR: OnceCell<MonitorHandle> = OnceCell::new();
17 
18 #[derive(Serialize)]
19 struct Event<'a> {
20     timestamp: Duration,
21     source: &'a str,
22     event: &'a str,
23     properties: Option<&'a HashMap<Cow<'a, str>, Cow<'a, str>>>,
24 }
25 
26 pub struct Monitor {
27     pub rx: flume::Receiver<String>,
28     pub file: Option<File>,
29     pub broadcast: Vec<flume::Sender<Arc<String>>>,
30 }
31 
32 impl Monitor {
33     pub fn new(rx: flume::Receiver<String>, file: Option<File>) -> Self {
34         Self {
35             rx,
36             file,
37             broadcast: vec![],
38         }
39     }
40 
41     pub fn subscribe(&mut self) -> flume::Receiver<Arc<String>> {
42         let (tx, rx) = flume::unbounded();
43         self.broadcast.push(tx);
44         rx
45     }
46 }
47 
48 struct MonitorHandle {
49     tx: flume::Sender<String>,
50     start: Instant,
51 }
52 
53 fn set_file_nonblocking(file: &File) -> io::Result<()> {
54     let fd = file.as_raw_fd();
55 
56     // SAFETY: FFI call to configure the fd
57     let ret = unsafe {
58         let mut flags = libc::fcntl(fd, libc::F_GETFL);
59         flags |= libc::O_NONBLOCK;
60         libc::fcntl(fd, libc::F_SETFL, flags)
61     };
62 
63     if ret < 0 {
64         Err(io::Error::last_os_error())
65     } else {
66         Ok(())
67     }
68 }
69 
70 /// This function must only be called once from the main thread before any threads
71 /// are created to avoid race conditions.
72 pub fn set_monitor(file: Option<File>) -> io::Result<Monitor> {
73     // There is only one caller of this function, so MONITOR is written to only once
74     assert!(MONITOR.get().is_none());
75 
76     if let Some(ref file) = file {
77         set_file_nonblocking(file)?;
78     }
79 
80     let (tx, rx) = flume::unbounded();
81     let monitor = Monitor::new(rx, file);
82 
83     MONITOR.get_or_init(|| MonitorHandle {
84         tx,
85         start: Instant::now(),
86     });
87 
88     Ok(monitor)
89 }
90 
91 pub fn event_log(source: &str, event: &str, properties: Option<&HashMap<Cow<str>, Cow<str>>>) {
92     // `MONITOR` is always in a valid state (None or Some), because it is set
93     // only once before any threads are spawned, and it's not mutated
94     // afterwards. This function only creates immutable references to `MONITOR`.
95     // Because `MONITOR.tx` is `Sync`, it's safe to share `MONITOR` across
96     // threads, making this function thread-safe.
97     if let Some(monitor_handle) = MONITOR.get().as_ref() {
98         let event = Event {
99             timestamp: monitor_handle.start.elapsed(),
100             source,
101             event,
102             properties,
103         };
104 
105         if let Ok(event) = serde_json::to_string_pretty(&event) {
106             monitor_handle.tx.send(event).ok();
107         }
108     }
109 }
110 
111 /*
112     Through the use of Cow<'a, str> it is possible to use String as well as
113     &str as the parameters:
114     e.g.
115     event!("cpu_manager", "create_vcpu", "id", cpu_id.to_string());
116 */
117 #[macro_export]
118 macro_rules! event {
119     ($source:expr, $event:expr) => {
120         $crate::event_log($source, $event, None)
121     };
122     ($source:expr, $event:expr, $($key:expr, $value:expr),*) => {
123         {
124             let mut properties = ::std::collections::HashMap::new();
125             $(
126                 properties.insert($key.into(), $value.into());
127             )+
128             $crate::event_log($source, $event, Some(&properties))
129         }
130      };
131 }
132