xref: /cloud-hypervisor/event_monitor/src/lib.rs (revision 88a9f799449c04180c6b9a21d3b9c0c4b57e2bd6)
1 // Copyright © 2021 Intel Corporation
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 
6 use std::borrow::Cow;
7 use std::collections::HashMap;
8 use std::fs::File;
9 use std::io;
10 use std::os::unix::io::AsRawFd;
11 use std::sync::Arc;
12 use std::time::{Duration, Instant};
13 
14 use once_cell::sync::OnceCell;
15 use serde::Serialize;
16 
17 static MONITOR: OnceCell<MonitorHandle> = OnceCell::new();
18 
/// Serializable record for a single monitor event.
///
/// All data is borrowed from the caller of `event_log`, which builds one
/// `Event` per call and serializes it to JSON right away.
#[derive(Serialize)]
struct Event<'a> {
    /// Time elapsed since the `start` instant of the installed `MonitorHandle`.
    timestamp: Duration,
    /// Name of the component reporting the event, as passed to `event_log`.
    source: &'a str,
    /// Name of the event, as passed to `event_log`.
    event: &'a str,
    /// Optional key/value properties attached to the event.
    properties: Option<&'a HashMap<Cow<'a, str>, Cow<'a, str>>>,
}
26 
/// Receiving half of the event monitor, as returned by `set_monitor`.
pub struct Monitor {
    /// Receiver for the serialized (JSON) events queued by `event_log`.
    pub rx: flume::Receiver<String>,
    /// Optional file given to `set_monitor`, which switches it to
    /// non-blocking mode. How the file is consumed is not shown in this file.
    pub file: Option<File>,
    /// One sender per subscriber registered through `subscribe`.
    pub broadcast: Vec<flume::Sender<Arc<String>>>,
}
32 
33 impl Monitor {
34     pub fn new(rx: flume::Receiver<String>, file: Option<File>) -> Self {
35         Self {
36             rx,
37             file,
38             broadcast: vec![],
39         }
40     }
41 
42     pub fn subscribe(&mut self) -> flume::Receiver<Arc<String>> {
43         let (tx, rx) = flume::unbounded();
44         self.broadcast.push(tx);
45         rx
46     }
47 }
48 
/// Sending half of the event monitor, stored in the global `MONITOR` cell.
struct MonitorHandle {
    /// Channel sender used by `event_log` to queue serialized events.
    tx: flume::Sender<String>,
    /// Instant captured when the handle is created in `set_monitor`; event
    /// timestamps are reported as the time elapsed since this instant.
    start: Instant,
}
53 
54 fn set_file_nonblocking(file: &File) -> io::Result<()> {
55     let fd = file.as_raw_fd();
56 
57     // SAFETY: FFI call to configure the fd
58     let ret = unsafe {
59         let mut flags = libc::fcntl(fd, libc::F_GETFL);
60         flags |= libc::O_NONBLOCK;
61         libc::fcntl(fd, libc::F_SETFL, flags)
62     };
63 
64     if ret < 0 {
65         Err(io::Error::last_os_error())
66     } else {
67         Ok(())
68     }
69 }
70 
71 /// This function must only be called once from the main thread before any threads
72 /// are created to avoid race conditions.
73 pub fn set_monitor(file: Option<File>) -> io::Result<Monitor> {
74     // There is only one caller of this function, so MONITOR is written to only once
75     assert!(MONITOR.get().is_none());
76 
77     if let Some(ref file) = file {
78         set_file_nonblocking(file)?;
79     }
80 
81     let (tx, rx) = flume::unbounded();
82     let monitor = Monitor::new(rx, file);
83 
84     MONITOR.get_or_init(|| MonitorHandle {
85         tx,
86         start: Instant::now(),
87     });
88 
89     Ok(monitor)
90 }
91 
92 pub fn event_log(source: &str, event: &str, properties: Option<&HashMap<Cow<str>, Cow<str>>>) {
93     // `MONITOR` is always in a valid state (None or Some), because it is set
94     // only once before any threads are spawned, and it's not mutated
95     // afterwards. This function only creates immutable references to `MONITOR`.
96     // Because `MONITOR.tx` is `Sync`, it's safe to share `MONITOR` across
97     // threads, making this function thread-safe.
98     if let Some(monitor_handle) = MONITOR.get().as_ref() {
99         let event = Event {
100             timestamp: monitor_handle.start.elapsed(),
101             source,
102             event,
103             properties,
104         };
105 
106         if let Ok(event) = serde_json::to_string_pretty(&event) {
107             monitor_handle.tx.send(event).ok();
108         }
109     }
110 }
111 
/// Logs an event through `event_log`, optionally with key/value properties.
///
/// The first two arguments are the source and the event name. Any further
/// arguments are taken pairwise as `key, value` and collected into a
/// `HashMap` that is passed along as the event properties.
///
/// Keys and values are converted with `.into()`; because the map stores
/// `Cow<'a, str>`, both `String` and `&str` can be used as parameters:
///
/// ```ignore
/// event!("cpu_manager", "create_vcpu", "id", cpu_id.to_string());
/// ```
#[macro_export]
macro_rules! event {
    // Source and event name only: no properties are attached.
    ($source:expr, $event:expr) => {
        $crate::event_log($source, $event, None)
    };
    // Source, event name and `key, value` pairs.
    ($source:expr, $event:expr, $($key:expr, $value:expr),*) => {
        {
            let mut props = ::std::collections::HashMap::new();
            $(
                props.insert($key.into(), $value.into());
            )*
            $crate::event_log($source, $event, Some(&props))
        }
    };
}
133