xref: /cloud-hypervisor/vmm/src/api/dbus/mod.rs (revision 99a98f270d8ac3acaff5a24d126e442350469bca)
1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use super::{ApiRequest, VmAction};
6 use crate::seccomp_filters::{get_seccomp_filter, Thread};
7 use crate::{Error as VmmError, Result as VmmResult};
8 use crate::{NetConfig, VmConfig};
9 use futures::channel::oneshot;
10 use futures::{executor, FutureExt};
11 use hypervisor::HypervisorType;
12 use seccompiler::{apply_filter, SeccompAction};
13 use std::panic::AssertUnwindSafe;
14 use std::sync::mpsc::Sender;
15 use std::sync::{Arc, Mutex};
16 use std::thread;
17 use vmm_sys_util::eventfd::EventFd;
18 use zbus::fdo::{self, Result};
19 use zbus::zvariant::Optional;
20 use zbus::{dbus_interface, ConnectionBuilder};
21 
22 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
23 
24 pub struct DBusApiOptions {
25     pub service_name: String,
26     pub object_path: String,
27     pub system_bus: bool,
28 }
29 
30 pub struct DBusApi {
31     api_notifier: EventFd,
32     api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
33 }
34 
35 fn api_error(error: impl std::fmt::Debug) -> fdo::Error {
36     fdo::Error::Failed(format!("{error:?}"))
37 }
38 
39 // This method is intended to ensure that the DBusApi thread has enough time to
40 // send a response to the VmmShutdown method call before it is terminated. If
41 // this step is omitted, the thread may be terminated before it can send a
42 // response, resulting in an error message stating that the message recipient
43 // disconnected from the message bus without providing a reply.
44 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
45     let (send_shutdown, mut recv_done) = ch;
46 
47     // send the shutdown signal and return
48     // if it errors out
49     if send_shutdown.send(()).is_err() {
50         return;
51     }
52 
53     // loop until `recv_err` errors out
54     // or as long as the return value indicates
55     // "immediately stale" (None)
56     while let Ok(None) = recv_done.try_recv() {}
57 }
58 
59 impl DBusApi {
60     pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
61         Self {
62             api_notifier,
63             api_sender: futures::lock::Mutex::new(api_sender),
64         }
65     }
66 
67     async fn clone_api_sender(&self) -> Sender<ApiRequest> {
68         // lock the async mutex, clone the `Sender` and then immediately
69         // drop the MutexGuard so that other tasks can clone the
70         // `Sender` as well
71         self.api_sender.lock().await.clone()
72     }
73 
74     fn clone_api_notifier(&self) -> Result<EventFd> {
75         self.api_notifier
76             .try_clone()
77             .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
78     }
79 
80     async fn vm_action(&self, action: VmAction) -> Result<Optional<String>> {
81         let api_sender = self.clone_api_sender().await;
82         let api_notifier = self.clone_api_notifier()?;
83 
84         let result = blocking::unblock(move || super::vm_action(api_notifier, api_sender, action))
85             .await
86             .map_err(api_error)?
87             // We're using `from_utf8_lossy` here to not deal with the
88             // error case of `from_utf8` as we know that `b.body` is valid JSON.
89             .map(|b| String::from_utf8_lossy(&b.body).to_string());
90 
91         Ok(result.into())
92     }
93 }
94 
95 #[dbus_interface(name = "org.cloudhypervisor.DBusApi1")]
96 impl DBusApi {
97     async fn vmm_ping(&self) -> Result<String> {
98         let api_sender = self.clone_api_sender().await;
99         let api_notifier = self.clone_api_notifier()?;
100 
101         let result = blocking::unblock(move || super::vmm_ping(api_notifier, api_sender))
102             .await
103             .map_err(api_error)?;
104         serde_json::to_string(&result).map_err(api_error)
105     }
106 
107     async fn vmm_shutdown(&self) -> Result<()> {
108         let api_sender = self.clone_api_sender().await;
109         let api_notifier = self.clone_api_notifier()?;
110 
111         blocking::unblock(move || super::vmm_shutdown(api_notifier, api_sender))
112             .await
113             .map_err(api_error)
114     }
115 
116     async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
117         let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
118         self.vm_action(VmAction::AddDevice(Arc::new(device_config)))
119             .await
120     }
121 
122     async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
123         let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
124         self.vm_action(VmAction::AddDisk(Arc::new(disk_config)))
125             .await
126     }
127 
128     async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
129         let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
130         self.vm_action(VmAction::AddFs(Arc::new(fs_config))).await
131     }
132 
133     async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
134         let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
135         if net_config.fds.is_some() {
136             warn!("Ignoring FDs sent via the D-Bus request body");
137             net_config.fds = None;
138         }
139         self.vm_action(VmAction::AddNet(Arc::new(net_config))).await
140     }
141 
142     async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
143         let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
144         self.vm_action(VmAction::AddPmem(Arc::new(pmem_config)))
145             .await
146     }
147 
148     async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
149         let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
150         self.vm_action(VmAction::AddUserDevice(Arc::new(vm_add_user_device)))
151             .await
152     }
153 
154     async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
155         let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
156         self.vm_action(VmAction::AddVdpa(Arc::new(vdpa_config)))
157             .await
158     }
159 
160     async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
161         let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
162         self.vm_action(VmAction::AddVsock(Arc::new(vsock_config)))
163             .await
164     }
165 
166     async fn vm_boot(&self) -> Result<()> {
167         self.vm_action(VmAction::Boot).await.map(|_| ())
168     }
169 
170     #[allow(unused_variables)]
171     // zbus doesn't support cfg attributes on interface methods
172     // as a workaround, we make the *call to the internal API* conditionally
173     // compile and return an error on unsupported platforms.
174     async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
175         #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
176         {
177             let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
178             self.vm_action(VmAction::Coredump(Arc::new(vm_coredump_data)))
179                 .await
180                 .map(|_| ())
181         }
182 
183         #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
184         Err(api_error(
185             "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
186         ))
187     }
188 
189     async fn vm_counters(&self) -> Result<Optional<String>> {
190         self.vm_action(VmAction::Counters).await
191     }
192 
193     async fn vm_create(&self, vm_config: String) -> Result<()> {
194         let api_sender = self.clone_api_sender().await;
195         let api_notifier = self.clone_api_notifier()?;
196 
197         let mut vm_config: VmConfig = serde_json::from_str(&vm_config).map_err(api_error)?;
198 
199         if let Some(ref mut nets) = vm_config.net {
200             if nets.iter().any(|net| net.fds.is_some()) {
201                 warn!("Ignoring FDs sent via the D-Bus request body");
202             }
203             for net in nets {
204                 net.fds = None;
205             }
206         }
207 
208         blocking::unblock(move || {
209             super::vm_create(api_notifier, api_sender, Arc::new(Mutex::new(vm_config)))
210         })
211         .await
212         .map_err(api_error)?;
213 
214         Ok(())
215     }
216 
217     async fn vm_delete(&self) -> Result<()> {
218         self.vm_action(VmAction::Delete).await.map(|_| ())
219     }
220 
221     async fn vm_info(&self) -> Result<String> {
222         let api_sender = self.clone_api_sender().await;
223         let api_notifier = self.clone_api_notifier()?;
224 
225         let result = blocking::unblock(move || super::vm_info(api_notifier, api_sender))
226             .await
227             .map_err(api_error)?;
228         serde_json::to_string(&result).map_err(api_error)
229     }
230 
231     async fn vm_pause(&self) -> Result<()> {
232         self.vm_action(VmAction::Pause).await.map(|_| ())
233     }
234 
235     async fn vm_power_button(&self) -> Result<()> {
236         self.vm_action(VmAction::PowerButton).await.map(|_| ())
237     }
238 
239     async fn vm_reboot(&self) -> Result<()> {
240         self.vm_action(VmAction::Reboot).await.map(|_| ())
241     }
242 
243     async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
244         let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
245         self.vm_action(VmAction::RemoveDevice(Arc::new(vm_remove_device)))
246             .await
247             .map(|_| ())
248     }
249 
250     async fn vm_resize(&self, vm_resize: String) -> Result<()> {
251         let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
252         self.vm_action(VmAction::Resize(Arc::new(vm_resize)))
253             .await
254             .map(|_| ())
255     }
256 
257     async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
258         let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
259         self.vm_action(VmAction::ResizeZone(Arc::new(vm_resize_zone)))
260             .await
261             .map(|_| ())
262     }
263 
264     async fn vm_restore(&self, restore_config: String) -> Result<()> {
265         let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
266         self.vm_action(VmAction::Restore(Arc::new(restore_config)))
267             .await
268             .map(|_| ())
269     }
270 
271     async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
272         let receive_migration_data =
273             serde_json::from_str(&receive_migration_data).map_err(api_error)?;
274         self.vm_action(VmAction::ReceiveMigration(Arc::new(receive_migration_data)))
275             .await
276             .map(|_| ())
277     }
278 
279     async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
280         let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
281         self.vm_action(VmAction::SendMigration(Arc::new(send_migration_data)))
282             .await
283             .map(|_| ())
284     }
285 
286     async fn vm_resume(&self) -> Result<()> {
287         self.vm_action(VmAction::Resume).await.map(|_| ())
288     }
289 
290     async fn vm_shutdown(&self) -> Result<()> {
291         self.vm_action(VmAction::Shutdown).await.map(|_| ())
292     }
293 
294     async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
295         let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
296         self.vm_action(VmAction::Snapshot(Arc::new(vm_snapshot_config)))
297             .await
298             .map(|_| ())
299     }
300 }
301 
302 pub fn start_dbus_thread(
303     dbus_options: DBusApiOptions,
304     api_notifier: EventFd,
305     api_sender: Sender<ApiRequest>,
306     seccomp_action: &SeccompAction,
307     exit_evt: EventFd,
308     hypervisor_type: HypervisorType,
309 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
310     let dbus_iface = DBusApi::new(api_notifier, api_sender);
311     let connection = executor::block_on(async move {
312         let conn_builder = if dbus_options.system_bus {
313             ConnectionBuilder::system()?
314         } else {
315             ConnectionBuilder::session()?
316         };
317 
318         conn_builder
319             .internal_executor(false)
320             .name(dbus_options.service_name)?
321             .serve_at(dbus_options.object_path, dbus_iface)?
322             .build()
323             .await
324     })
325     .map_err(VmmError::CreateDBusSession)?;
326 
327     let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
328     let (send_done, recv_done) = oneshot::channel::<()>();
329 
330     // Retrieve seccomp filter for API thread
331     let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
332         .map_err(VmmError::CreateSeccompFilter)?;
333 
334     let thread_join_handle = thread::Builder::new()
335         .name("dbus-thread".to_string())
336         .spawn(move || {
337             // Apply seccomp filter for API thread.
338             if !api_seccomp_filter.is_empty() {
339                 apply_filter(&api_seccomp_filter)
340                     .map_err(VmmError::ApplySeccompFilter)
341                     .map_err(|e| {
342                         error!("Error applying seccomp filter: {:?}", e);
343                         exit_evt.write(1).ok();
344                         e
345                     })?;
346             }
347 
348             std::panic::catch_unwind(AssertUnwindSafe(move || {
349                 executor::block_on(async move {
350                     let recv_shutdown = recv_shutdown.fuse();
351                     let executor_tick = futures::future::Fuse::terminated();
352                     futures::pin_mut!(recv_shutdown, executor_tick);
353                     executor_tick.set(connection.executor().tick().fuse());
354 
355                     loop {
356                         futures::select! {
357                             _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
358                             _ = recv_shutdown => {
359                                 send_done.send(()).ok();
360                                 break;
361                             },
362                         }
363                     }
364                 })
365             }))
366             .map_err(|_| {
367                 error!("dbus-api thread panicked");
368                 exit_evt.write(1).ok()
369             })
370             .ok();
371 
372             Ok(())
373         })
374         .map_err(VmmError::DBusThreadSpawn)?;
375 
376     Ok((thread_join_handle, (send_shutdown, recv_done)))
377 }
378