xref: /cloud-hypervisor/vmm/src/api/dbus/mod.rs (revision 3f3489e38e32a652241e889a9a1f6c67823d584b)
1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use std::panic::AssertUnwindSafe;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use futures::channel::oneshot;
11 use futures::{executor, FutureExt};
12 use hypervisor::HypervisorType;
13 use seccompiler::{apply_filter, SeccompAction};
14 use vmm_sys_util::eventfd::EventFd;
15 use zbus::connection::Builder;
16 use zbus::fdo::{self, Result};
17 use zbus::interface;
18 use zbus::zvariant::Optional;
19 
20 use super::{ApiAction, ApiRequest};
21 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
22 use crate::api::VmCoredump;
23 use crate::api::{
24     AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa,
25     VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot,
26     VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume,
27     VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown,
28 };
29 use crate::seccomp_filters::{get_seccomp_filter, Thread};
30 use crate::{Error as VmmError, NetConfig, Result as VmmResult, VmConfig};
31 
/// Pair of one-shot channel ends returned by `start_dbus_thread` and consumed
/// by `dbus_api_graceful_shutdown`: the sender asks the D-Bus thread to leave
/// its loop, and the receiver is signalled by that thread once it has done so.
pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
33 
/// Settings consumed by `start_dbus_thread` when setting up the D-Bus API.
pub struct DBusApiOptions {
    /// Name requested for the connection on the bus.
    pub service_name: String,
    /// Object path at which the `DBusApi` interface is served.
    pub object_path: String,
    /// Connect to the system bus when `true`, to the session bus otherwise.
    pub system_bus: bool,
    /// Receiver whose messages are re-emitted as the interface's `event`
    /// signal by the D-Bus thread.
    pub event_monitor_rx: flume::Receiver<Arc<String>>,
}
40 
/// Object served on the bus; each D-Bus method call is turned into an
/// `ApiAction` and submitted with the notifier and sender stored here.
pub struct DBusApi {
    /// Event fd that is cloned (`try_clone`) for every request and handed to
    /// the action's `send` call.
    api_notifier: EventFd,
    /// Sender for `ApiRequest`s, kept behind an async mutex so that every
    /// request can lock it briefly and clone its own copy.
    api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
}
45 
46 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error {
47     fdo::Error::Failed(format!("{error}"))
48 }
49 
50 // This method is intended to ensure that the DBusApi thread has enough time to
51 // send a response to the VmmShutdown method call before it is terminated. If
52 // this step is omitted, the thread may be terminated before it can send a
53 // response, resulting in an error message stating that the message recipient
54 // disconnected from the message bus without providing a reply.
55 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
56     let (send_shutdown, mut recv_done) = ch;
57 
58     // send the shutdown signal and return
59     // if it errors out
60     if send_shutdown.send(()).is_err() {
61         return;
62     }
63 
64     // loop until `recv_err` errors out
65     // or as long as the return value indicates
66     // "immediately stale" (None)
67     while let Ok(None) = recv_done.try_recv() {}
68 }
69 
70 impl DBusApi {
71     pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
72         Self {
73             api_notifier,
74             api_sender: futures::lock::Mutex::new(api_sender),
75         }
76     }
77 
78     async fn clone_api_sender(&self) -> Sender<ApiRequest> {
79         // lock the async mutex, clone the `Sender` and then immediately
80         // drop the MutexGuard so that other tasks can clone the
81         // `Sender` as well
82         self.api_sender.lock().await.clone()
83     }
84 
85     fn clone_api_notifier(&self) -> Result<EventFd> {
86         self.api_notifier
87             .try_clone()
88             .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
89     }
90 
91     async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>(
92         &self,
93         action: &'static Action,
94         body: Action::RequestBody,
95     ) -> Result<Optional<String>> {
96         let api_sender = self.clone_api_sender().await;
97         let api_notifier = self.clone_api_notifier()?;
98 
99         let result = blocking::unblock(move || action.send(api_notifier, api_sender, body))
100             .await
101             .map_err(api_error)?
102             // We're using `from_utf8_lossy` here to not deal with the
103             // error case of `from_utf8` as we know that `b.body` is valid JSON.
104             .map(|b| String::from_utf8_lossy(&b.body).to_string());
105 
106         Ok(result.into())
107     }
108 }
109 
110 #[interface(name = "org.cloudhypervisor.DBusApi1")]
111 impl DBusApi {
112     async fn vmm_ping(&self) -> Result<String> {
113         let api_sender = self.clone_api_sender().await;
114         let api_notifier = self.clone_api_notifier()?;
115 
116         let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ()))
117             .await
118             .map_err(api_error)?;
119         serde_json::to_string(&result).map_err(api_error)
120     }
121 
122     async fn vmm_shutdown(&self) -> Result<()> {
123         let api_sender = self.clone_api_sender().await;
124         let api_notifier = self.clone_api_notifier()?;
125 
126         blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ()))
127             .await
128             .map_err(api_error)
129     }
130 
131     async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
132         let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
133         self.vm_action(&VmAddDevice, device_config).await
134     }
135 
136     async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
137         let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
138         self.vm_action(&AddDisk, disk_config).await
139     }
140 
141     async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
142         let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
143         self.vm_action(&VmAddFs, fs_config).await
144     }
145 
146     async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
147         let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
148         if net_config.fds.is_some() {
149             warn!("Ignoring FDs sent via the D-Bus request body");
150             net_config.fds = None;
151         }
152         self.vm_action(&VmAddNet, net_config).await
153     }
154 
155     async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
156         let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
157         self.vm_action(&VmAddPmem, pmem_config).await
158     }
159 
160     async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
161         let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
162         self.vm_action(&VmAddUserDevice, vm_add_user_device).await
163     }
164 
165     async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
166         let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
167         self.vm_action(&VmAddVdpa, vdpa_config).await
168     }
169 
170     async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
171         let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
172         self.vm_action(&VmAddVsock, vsock_config).await
173     }
174 
175     async fn vm_boot(&self) -> Result<()> {
176         self.vm_action(&VmBoot, ()).await.map(|_| ())
177     }
178 
179     #[allow(unused_variables)]
180     // zbus doesn't support cfg attributes on interface methods
181     // as a workaround, we make the *call to the internal API* conditionally
182     // compile and return an error on unsupported platforms.
183     async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
184         #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
185         {
186             let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
187             self.vm_action(&VmCoredump, vm_coredump_data)
188                 .await
189                 .map(|_| ())
190         }
191 
192         #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
193         Err(api_error(
194             "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
195         ))
196     }
197 
198     async fn vm_counters(&self) -> Result<Optional<String>> {
199         self.vm_action(&VmCounters, ()).await
200     }
201 
202     async fn vm_create(&self, vm_config: String) -> Result<()> {
203         let api_sender = self.clone_api_sender().await;
204         let api_notifier = self.clone_api_notifier()?;
205 
206         let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?;
207 
208         if let Some(ref mut nets) = vm_config.net {
209             if nets.iter().any(|net| net.fds.is_some()) {
210                 warn!("Ignoring FDs sent via the D-Bus request body");
211             }
212             for net in nets {
213                 net.fds = None;
214             }
215         }
216 
217         blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config))
218             .await
219             .map_err(api_error)?;
220 
221         Ok(())
222     }
223 
224     async fn vm_delete(&self) -> Result<()> {
225         self.vm_action(&VmDelete, ()).await.map(|_| ())
226     }
227 
228     async fn vm_info(&self) -> Result<String> {
229         let api_sender = self.clone_api_sender().await;
230         let api_notifier = self.clone_api_notifier()?;
231 
232         let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ()))
233             .await
234             .map_err(api_error)?;
235         serde_json::to_string(&result).map_err(api_error)
236     }
237 
238     async fn vm_pause(&self) -> Result<()> {
239         self.vm_action(&VmPause, ()).await.map(|_| ())
240     }
241 
242     async fn vm_power_button(&self) -> Result<()> {
243         self.vm_action(&VmPowerButton, ()).await.map(|_| ())
244     }
245 
246     async fn vm_reboot(&self) -> Result<()> {
247         self.vm_action(&VmReboot, ()).await.map(|_| ())
248     }
249 
250     async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
251         let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
252         self.vm_action(&VmRemoveDevice, vm_remove_device)
253             .await
254             .map(|_| ())
255     }
256 
257     async fn vm_resize(&self, vm_resize: String) -> Result<()> {
258         let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
259         self.vm_action(&VmResize, vm_resize).await.map(|_| ())
260     }
261 
262     async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
263         let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
264         self.vm_action(&VmResizeZone, vm_resize_zone)
265             .await
266             .map(|_| ())
267     }
268 
269     async fn vm_restore(&self, restore_config: String) -> Result<()> {
270         let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
271         self.vm_action(&VmRestore, restore_config).await.map(|_| ())
272     }
273 
274     async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
275         let receive_migration_data =
276             serde_json::from_str(&receive_migration_data).map_err(api_error)?;
277         self.vm_action(&VmReceiveMigration, receive_migration_data)
278             .await
279             .map(|_| ())
280     }
281 
282     async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
283         let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
284         self.vm_action(&VmSendMigration, send_migration_data)
285             .await
286             .map(|_| ())
287     }
288 
289     async fn vm_resume(&self) -> Result<()> {
290         self.vm_action(&VmResume, ()).await.map(|_| ())
291     }
292 
293     async fn vm_shutdown(&self) -> Result<()> {
294         self.vm_action(&VmShutdown, ()).await.map(|_| ())
295     }
296 
297     async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
298         let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
299         self.vm_action(&VmSnapshot, vm_snapshot_config)
300             .await
301             .map(|_| ())
302     }
303 
304     // implementation of this function is provided by the `#[zbus(signal)]` macro call
305     #[zbus(signal)]
306     async fn event(
307         ctxt: &zbus::object_server::SignalEmitter<'_>,
308         event: Arc<String>,
309     ) -> zbus::Result<()>;
310 }
311 
/// Connects to D-Bus, serves a `DBusApi` object on that connection and spawns
/// the `dbus-thread` thread that drives it.
///
/// The connection is built synchronously on the calling thread; the spawned
/// thread then ticks the connection's executor, forwards event-monitor
/// messages as `event` signals and waits for a shutdown request.
///
/// Returns the thread's join handle together with the shutdown channel pair:
/// the sender requests the thread to stop, the receiver is signalled once the
/// thread has left its loop.
///
/// # Errors
///
/// * `VmmError::CreateDBusSession` if building the connection or looking up
///   the served interface fails.
/// * `VmmError::CreateSeccompFilter` if the seccomp filter cannot be obtained.
/// * `VmmError::DBusThreadSpawn` if the thread cannot be spawned.
pub fn start_dbus_thread(
    dbus_options: DBusApiOptions,
    api_notifier: EventFd,
    api_sender: Sender<ApiRequest>,
    seccomp_action: &SeccompAction,
    exit_evt: EventFd,
    hypervisor_type: HypervisorType,
) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
    let dbus_iface = DBusApi::new(api_notifier, api_sender);
    // Build the connection on the calling thread. Only `system_bus`,
    // `service_name` and `object_path` are consumed here; `event_monitor_rx`
    // is used later by the spawned thread.
    let (connection, iface_ref) = executor::block_on(async move {
        let conn_builder = if dbus_options.system_bus {
            Builder::system()?
        } else {
            Builder::session()?
        };

        // The internal executor is switched off; the executor is instead
        // ticked explicitly from the thread spawned below.
        let conn = conn_builder
            .internal_executor(false)
            .name(dbus_options.service_name)?
            .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
            .build()
            .await?;

        // Reference to the served interface, needed later to emit signals.
        let iface_ref = conn
            .object_server()
            .interface::<_, DBusApi>(dbus_options.object_path)
            .await?;

        Ok((conn, iface_ref))
    })
    .map_err(VmmError::CreateDBusSession)?;

    // `send_shutdown`/`recv_shutdown` carry the stop request to the thread;
    // `send_done`/`recv_done` carry the thread's acknowledgement back.
    let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
    let (send_done, recv_done) = oneshot::channel::<()>();

    // Retrieve seccomp filter for API thread
    let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
        .map_err(VmmError::CreateSeccompFilter)?;

    let thread_join_handle = thread::Builder::new()
        .name("dbus-thread".to_string())
        .spawn(move || {
            // Apply seccomp filter for API thread.
            // On failure the error is logged, `exit_evt` is written and the
            // thread returns the error without serving any request.
            if !api_seccomp_filter.is_empty() {
                apply_filter(&api_seccomp_filter)
                    .map_err(VmmError::ApplySeccompFilter)
                    .map_err(|e| {
                        error!("Error applying seccomp filter: {:?}", e);
                        exit_evt.write(1).ok();
                        e
                    })?;
            }

            // Panics inside the serving loop are caught so they can be logged
            // and reported through `exit_evt` (see the `map_err` below).
            std::panic::catch_unwind(AssertUnwindSafe(move || {
                executor::block_on(async move {
                    let recv_shutdown = recv_shutdown.fuse();
                    // Start from an already-terminated fused future so the
                    // pinned slot can be (re)armed with `set` for every tick.
                    let executor_tick = futures::future::Fuse::terminated();
                    futures::pin_mut!(recv_shutdown, executor_tick);
                    executor_tick.set(connection.executor().tick().fuse());

                    loop {
                        futures::select! {
                            // A tick completed: arm the next one.
                            _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
                            // Shutdown requested (or its sender dropped):
                            // acknowledge and leave the loop.
                            _ = recv_shutdown => {
                                send_done.send(()).ok();
                                break;
                            },
                            // Forward a received event as the `event` signal;
                            // receive and emit errors are ignored.
                            ret = dbus_options.event_monitor_rx.recv_async() => {
                                if let Ok(event) = ret {
                                    DBusApi::event(iface_ref.signal_emitter(), event).await.ok();
                                }
                            }
                        }
                    }
                })
            }))
            .map_err(|_| {
                error!("dbus-api thread panicked");
                exit_evt.write(1).ok()
            })
            .ok();

            // A caught panic is not propagated: the thread still returns `Ok`.
            Ok(())
        })
        .map_err(VmmError::DBusThreadSpawn)?;

    Ok((thread_join_handle, (send_shutdown, recv_done)))
}
400