xref: /cloud-hypervisor/vmm/src/api/dbus/mod.rs (revision 88a9f799449c04180c6b9a21d3b9c0c4b57e2bd6)
1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use std::panic::AssertUnwindSafe;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use futures::channel::oneshot;
11 use futures::{executor, FutureExt};
12 use hypervisor::HypervisorType;
13 use seccompiler::{apply_filter, SeccompAction};
14 use vmm_sys_util::eventfd::EventFd;
15 use zbus::fdo::{self, Result};
16 use zbus::zvariant::Optional;
17 use zbus::{interface, ConnectionBuilder};
18 
19 use super::{ApiAction, ApiRequest};
20 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
21 use crate::api::VmCoredump;
22 use crate::api::{
23     AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa,
24     VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot,
25     VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume,
26     VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown,
27 };
28 use crate::seccomp_filters::{get_seccomp_filter, Thread};
29 use crate::{Error as VmmError, Result as VmmResult};
30 use crate::{NetConfig, VmConfig};
31 
32 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
33 
34 pub struct DBusApiOptions {
35     pub service_name: String,
36     pub object_path: String,
37     pub system_bus: bool,
38     pub event_monitor_rx: flume::Receiver<Arc<String>>,
39 }
40 
41 pub struct DBusApi {
42     api_notifier: EventFd,
43     api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
44 }
45 
46 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error {
47     fdo::Error::Failed(format!("{error}"))
48 }
49 
50 // This method is intended to ensure that the DBusApi thread has enough time to
51 // send a response to the VmmShutdown method call before it is terminated. If
52 // this step is omitted, the thread may be terminated before it can send a
53 // response, resulting in an error message stating that the message recipient
54 // disconnected from the message bus without providing a reply.
55 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
56     let (send_shutdown, mut recv_done) = ch;
57 
58     // send the shutdown signal and return
59     // if it errors out
60     if send_shutdown.send(()).is_err() {
61         return;
62     }
63 
64     // loop until `recv_err` errors out
65     // or as long as the return value indicates
66     // "immediately stale" (None)
67     while let Ok(None) = recv_done.try_recv() {}
68 }
69 
70 impl DBusApi {
71     pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
72         Self {
73             api_notifier,
74             api_sender: futures::lock::Mutex::new(api_sender),
75         }
76     }
77 
78     async fn clone_api_sender(&self) -> Sender<ApiRequest> {
79         // lock the async mutex, clone the `Sender` and then immediately
80         // drop the MutexGuard so that other tasks can clone the
81         // `Sender` as well
82         self.api_sender.lock().await.clone()
83     }
84 
85     fn clone_api_notifier(&self) -> Result<EventFd> {
86         self.api_notifier
87             .try_clone()
88             .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
89     }
90 
91     async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>(
92         &self,
93         action: &'static Action,
94         body: Action::RequestBody,
95     ) -> Result<Optional<String>> {
96         let api_sender = self.clone_api_sender().await;
97         let api_notifier = self.clone_api_notifier()?;
98 
99         let result = blocking::unblock(move || action.send(api_notifier, api_sender, body))
100             .await
101             .map_err(api_error)?
102             // We're using `from_utf8_lossy` here to not deal with the
103             // error case of `from_utf8` as we know that `b.body` is valid JSON.
104             .map(|b| String::from_utf8_lossy(&b.body).to_string());
105 
106         Ok(result.into())
107     }
108 }
109 
110 #[interface(name = "org.cloudhypervisor.DBusApi1")]
111 impl DBusApi {
112     async fn vmm_ping(&self) -> Result<String> {
113         let api_sender = self.clone_api_sender().await;
114         let api_notifier = self.clone_api_notifier()?;
115 
116         let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ()))
117             .await
118             .map_err(api_error)?;
119         serde_json::to_string(&result).map_err(api_error)
120     }
121 
122     async fn vmm_shutdown(&self) -> Result<()> {
123         let api_sender = self.clone_api_sender().await;
124         let api_notifier = self.clone_api_notifier()?;
125 
126         blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ()))
127             .await
128             .map_err(api_error)
129     }
130 
131     async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
132         let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
133         self.vm_action(&VmAddDevice, device_config).await
134     }
135 
136     async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
137         let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
138         self.vm_action(&AddDisk, disk_config).await
139     }
140 
141     async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
142         let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
143         self.vm_action(&VmAddFs, fs_config).await
144     }
145 
146     async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
147         let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
148         if net_config.fds.is_some() {
149             warn!("Ignoring FDs sent via the D-Bus request body");
150             net_config.fds = None;
151         }
152         self.vm_action(&VmAddNet, net_config).await
153     }
154 
155     async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
156         let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
157         self.vm_action(&VmAddPmem, pmem_config).await
158     }
159 
160     async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
161         let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
162         self.vm_action(&VmAddUserDevice, vm_add_user_device).await
163     }
164 
165     async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
166         let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
167         self.vm_action(&VmAddVdpa, vdpa_config).await
168     }
169 
170     async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
171         let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
172         self.vm_action(&VmAddVsock, vsock_config).await
173     }
174 
175     async fn vm_boot(&self) -> Result<()> {
176         self.vm_action(&VmBoot, ()).await.map(|_| ())
177     }
178 
179     #[allow(unused_variables)]
180     // zbus doesn't support cfg attributes on interface methods
181     // as a workaround, we make the *call to the internal API* conditionally
182     // compile and return an error on unsupported platforms.
183     async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
184         #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
185         {
186             let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
187             self.vm_action(&VmCoredump, vm_coredump_data)
188                 .await
189                 .map(|_| ())
190         }
191 
192         #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
193         Err(api_error(
194             "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
195         ))
196     }
197 
198     async fn vm_counters(&self) -> Result<Optional<String>> {
199         self.vm_action(&VmCounters, ()).await
200     }
201 
202     async fn vm_create(&self, vm_config: String) -> Result<()> {
203         let api_sender = self.clone_api_sender().await;
204         let api_notifier = self.clone_api_notifier()?;
205 
206         let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?;
207 
208         if let Some(ref mut nets) = vm_config.net {
209             if nets.iter().any(|net| net.fds.is_some()) {
210                 warn!("Ignoring FDs sent via the D-Bus request body");
211             }
212             for net in nets {
213                 net.fds = None;
214             }
215         }
216 
217         blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config))
218             .await
219             .map_err(api_error)?;
220 
221         Ok(())
222     }
223 
224     async fn vm_delete(&self) -> Result<()> {
225         self.vm_action(&VmDelete, ()).await.map(|_| ())
226     }
227 
228     async fn vm_info(&self) -> Result<String> {
229         let api_sender = self.clone_api_sender().await;
230         let api_notifier = self.clone_api_notifier()?;
231 
232         let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ()))
233             .await
234             .map_err(api_error)?;
235         serde_json::to_string(&result).map_err(api_error)
236     }
237 
238     async fn vm_pause(&self) -> Result<()> {
239         self.vm_action(&VmPause, ()).await.map(|_| ())
240     }
241 
242     async fn vm_power_button(&self) -> Result<()> {
243         self.vm_action(&VmPowerButton, ()).await.map(|_| ())
244     }
245 
246     async fn vm_reboot(&self) -> Result<()> {
247         self.vm_action(&VmReboot, ()).await.map(|_| ())
248     }
249 
250     async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
251         let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
252         self.vm_action(&VmRemoveDevice, vm_remove_device)
253             .await
254             .map(|_| ())
255     }
256 
257     async fn vm_resize(&self, vm_resize: String) -> Result<()> {
258         let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
259         self.vm_action(&VmResize, vm_resize).await.map(|_| ())
260     }
261 
262     async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
263         let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
264         self.vm_action(&VmResizeZone, vm_resize_zone)
265             .await
266             .map(|_| ())
267     }
268 
269     async fn vm_restore(&self, restore_config: String) -> Result<()> {
270         let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
271         self.vm_action(&VmRestore, restore_config).await.map(|_| ())
272     }
273 
274     async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
275         let receive_migration_data =
276             serde_json::from_str(&receive_migration_data).map_err(api_error)?;
277         self.vm_action(&VmReceiveMigration, receive_migration_data)
278             .await
279             .map(|_| ())
280     }
281 
282     async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
283         let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
284         self.vm_action(&VmSendMigration, send_migration_data)
285             .await
286             .map(|_| ())
287     }
288 
289     async fn vm_resume(&self) -> Result<()> {
290         self.vm_action(&VmResume, ()).await.map(|_| ())
291     }
292 
293     async fn vm_shutdown(&self) -> Result<()> {
294         self.vm_action(&VmShutdown, ()).await.map(|_| ())
295     }
296 
297     async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
298         let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
299         self.vm_action(&VmSnapshot, vm_snapshot_config)
300             .await
301             .map(|_| ())
302     }
303 
304     // implementation of this function is provided by the `#[zbus(signal)]` macro call
305     #[zbus(signal)]
306     async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>;
307 }
308 
309 pub fn start_dbus_thread(
310     dbus_options: DBusApiOptions,
311     api_notifier: EventFd,
312     api_sender: Sender<ApiRequest>,
313     seccomp_action: &SeccompAction,
314     exit_evt: EventFd,
315     hypervisor_type: HypervisorType,
316 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
317     let dbus_iface = DBusApi::new(api_notifier, api_sender);
318     let (connection, iface_ref) = executor::block_on(async move {
319         let conn_builder = if dbus_options.system_bus {
320             ConnectionBuilder::system()?
321         } else {
322             ConnectionBuilder::session()?
323         };
324 
325         let conn = conn_builder
326             .internal_executor(false)
327             .name(dbus_options.service_name)?
328             .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
329             .build()
330             .await?;
331 
332         let iface_ref = conn
333             .object_server()
334             .interface::<_, DBusApi>(dbus_options.object_path)
335             .await?;
336 
337         Ok((conn, iface_ref))
338     })
339     .map_err(VmmError::CreateDBusSession)?;
340 
341     let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
342     let (send_done, recv_done) = oneshot::channel::<()>();
343 
344     // Retrieve seccomp filter for API thread
345     let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
346         .map_err(VmmError::CreateSeccompFilter)?;
347 
348     let thread_join_handle = thread::Builder::new()
349         .name("dbus-thread".to_string())
350         .spawn(move || {
351             // Apply seccomp filter for API thread.
352             if !api_seccomp_filter.is_empty() {
353                 apply_filter(&api_seccomp_filter)
354                     .map_err(VmmError::ApplySeccompFilter)
355                     .map_err(|e| {
356                         error!("Error applying seccomp filter: {:?}", e);
357                         exit_evt.write(1).ok();
358                         e
359                     })?;
360             }
361 
362             std::panic::catch_unwind(AssertUnwindSafe(move || {
363                 executor::block_on(async move {
364                     let recv_shutdown = recv_shutdown.fuse();
365                     let executor_tick = futures::future::Fuse::terminated();
366                     futures::pin_mut!(recv_shutdown, executor_tick);
367                     executor_tick.set(connection.executor().tick().fuse());
368 
369                     loop {
370                         futures::select! {
371                             _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
372                             _ = recv_shutdown => {
373                                 send_done.send(()).ok();
374                                 break;
375                             },
376                             ret = dbus_options.event_monitor_rx.recv_async() => {
377                                 if let Ok(event) = ret {
378                                     DBusApi::event(iface_ref.signal_context(), event).await.ok();
379                                 }
380                             }
381                         }
382                     }
383                 })
384             }))
385             .map_err(|_| {
386                 error!("dbus-api thread panicked");
387                 exit_evt.write(1).ok()
388             })
389             .ok();
390 
391             Ok(())
392         })
393         .map_err(VmmError::DBusThreadSpawn)?;
394 
395     Ok((thread_join_handle, (send_shutdown, recv_done)))
396 }
397