xref: /cloud-hypervisor/vmm/src/api/dbus/mod.rs (revision 80b2c98a68d4c68f372f849e8d26f7cae5867000)
1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use std::panic::AssertUnwindSafe;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use futures::channel::oneshot;
11 use futures::{executor, FutureExt};
12 use hypervisor::HypervisorType;
13 use seccompiler::{apply_filter, SeccompAction};
14 use vmm_sys_util::eventfd::EventFd;
15 use zbus::fdo::{self, Result};
16 use zbus::zvariant::Optional;
17 use zbus::{interface, ConnectionBuilder};
18 
19 use super::{ApiAction, ApiRequest};
20 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
21 use crate::api::VmCoredump;
22 use crate::api::{
23     AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa,
24     VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot,
25     VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume,
26     VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown,
27 };
28 use crate::seccomp_filters::{get_seccomp_filter, Thread};
29 use crate::{Error as VmmError, NetConfig, Result as VmmResult, VmConfig};
30 
/// Channel ends handed back by `start_dbus_thread` and consumed by
/// `dbus_api_graceful_shutdown`: the sender requests that the D-Bus thread
/// stop, the receiver is signalled by that thread once it has left its loop.
pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
32 
/// Settings consumed by `start_dbus_thread` when bringing up the D-Bus API.
pub struct DBusApiOptions {
    /// Name handed to the connection builder's `name()` call.
    pub service_name: String,
    /// Object path at which the `DBusApi` interface is served.
    pub object_path: String,
    /// Selects the system bus when `true`, the session bus otherwise.
    pub system_bus: bool,
    /// Source of events that the D-Bus thread re-emits through the `event` signal.
    pub event_monitor_rx: flume::Receiver<Arc<String>>,
}
39 
/// Object served on D-Bus; every interface method forwards its request through
/// `ApiAction::send` using a clone of the notifier and of the sender held here.
pub struct DBusApi {
    // Duplicated with `try_clone()` for each request.
    api_notifier: EventFd,
    // Kept behind an async mutex; each request locks it only long enough to
    // clone the `Sender`.
    api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
}
44 
45 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error {
46     fdo::Error::Failed(format!("{error}"))
47 }
48 
49 // This method is intended to ensure that the DBusApi thread has enough time to
50 // send a response to the VmmShutdown method call before it is terminated. If
51 // this step is omitted, the thread may be terminated before it can send a
52 // response, resulting in an error message stating that the message recipient
53 // disconnected from the message bus without providing a reply.
54 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
55     let (send_shutdown, mut recv_done) = ch;
56 
57     // send the shutdown signal and return
58     // if it errors out
59     if send_shutdown.send(()).is_err() {
60         return;
61     }
62 
63     // loop until `recv_err` errors out
64     // or as long as the return value indicates
65     // "immediately stale" (None)
66     while let Ok(None) = recv_done.try_recv() {}
67 }
68 
69 impl DBusApi {
70     pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
71         Self {
72             api_notifier,
73             api_sender: futures::lock::Mutex::new(api_sender),
74         }
75     }
76 
77     async fn clone_api_sender(&self) -> Sender<ApiRequest> {
78         // lock the async mutex, clone the `Sender` and then immediately
79         // drop the MutexGuard so that other tasks can clone the
80         // `Sender` as well
81         self.api_sender.lock().await.clone()
82     }
83 
84     fn clone_api_notifier(&self) -> Result<EventFd> {
85         self.api_notifier
86             .try_clone()
87             .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
88     }
89 
90     async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>(
91         &self,
92         action: &'static Action,
93         body: Action::RequestBody,
94     ) -> Result<Optional<String>> {
95         let api_sender = self.clone_api_sender().await;
96         let api_notifier = self.clone_api_notifier()?;
97 
98         let result = blocking::unblock(move || action.send(api_notifier, api_sender, body))
99             .await
100             .map_err(api_error)?
101             // We're using `from_utf8_lossy` here to not deal with the
102             // error case of `from_utf8` as we know that `b.body` is valid JSON.
103             .map(|b| String::from_utf8_lossy(&b.body).to_string());
104 
105         Ok(result.into())
106     }
107 }
108 
109 #[interface(name = "org.cloudhypervisor.DBusApi1")]
110 impl DBusApi {
111     async fn vmm_ping(&self) -> Result<String> {
112         let api_sender = self.clone_api_sender().await;
113         let api_notifier = self.clone_api_notifier()?;
114 
115         let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ()))
116             .await
117             .map_err(api_error)?;
118         serde_json::to_string(&result).map_err(api_error)
119     }
120 
121     async fn vmm_shutdown(&self) -> Result<()> {
122         let api_sender = self.clone_api_sender().await;
123         let api_notifier = self.clone_api_notifier()?;
124 
125         blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ()))
126             .await
127             .map_err(api_error)
128     }
129 
130     async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
131         let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
132         self.vm_action(&VmAddDevice, device_config).await
133     }
134 
135     async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
136         let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
137         self.vm_action(&AddDisk, disk_config).await
138     }
139 
140     async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
141         let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
142         self.vm_action(&VmAddFs, fs_config).await
143     }
144 
145     async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
146         let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
147         if net_config.fds.is_some() {
148             warn!("Ignoring FDs sent via the D-Bus request body");
149             net_config.fds = None;
150         }
151         self.vm_action(&VmAddNet, net_config).await
152     }
153 
154     async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
155         let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
156         self.vm_action(&VmAddPmem, pmem_config).await
157     }
158 
159     async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
160         let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
161         self.vm_action(&VmAddUserDevice, vm_add_user_device).await
162     }
163 
164     async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
165         let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
166         self.vm_action(&VmAddVdpa, vdpa_config).await
167     }
168 
169     async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
170         let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
171         self.vm_action(&VmAddVsock, vsock_config).await
172     }
173 
174     async fn vm_boot(&self) -> Result<()> {
175         self.vm_action(&VmBoot, ()).await.map(|_| ())
176     }
177 
178     #[allow(unused_variables)]
179     // zbus doesn't support cfg attributes on interface methods
180     // as a workaround, we make the *call to the internal API* conditionally
181     // compile and return an error on unsupported platforms.
182     async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
183         #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
184         {
185             let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
186             self.vm_action(&VmCoredump, vm_coredump_data)
187                 .await
188                 .map(|_| ())
189         }
190 
191         #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
192         Err(api_error(
193             "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
194         ))
195     }
196 
197     async fn vm_counters(&self) -> Result<Optional<String>> {
198         self.vm_action(&VmCounters, ()).await
199     }
200 
201     async fn vm_create(&self, vm_config: String) -> Result<()> {
202         let api_sender = self.clone_api_sender().await;
203         let api_notifier = self.clone_api_notifier()?;
204 
205         let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?;
206 
207         if let Some(ref mut nets) = vm_config.net {
208             if nets.iter().any(|net| net.fds.is_some()) {
209                 warn!("Ignoring FDs sent via the D-Bus request body");
210             }
211             for net in nets {
212                 net.fds = None;
213             }
214         }
215 
216         blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config))
217             .await
218             .map_err(api_error)?;
219 
220         Ok(())
221     }
222 
223     async fn vm_delete(&self) -> Result<()> {
224         self.vm_action(&VmDelete, ()).await.map(|_| ())
225     }
226 
227     async fn vm_info(&self) -> Result<String> {
228         let api_sender = self.clone_api_sender().await;
229         let api_notifier = self.clone_api_notifier()?;
230 
231         let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ()))
232             .await
233             .map_err(api_error)?;
234         serde_json::to_string(&result).map_err(api_error)
235     }
236 
237     async fn vm_pause(&self) -> Result<()> {
238         self.vm_action(&VmPause, ()).await.map(|_| ())
239     }
240 
241     async fn vm_power_button(&self) -> Result<()> {
242         self.vm_action(&VmPowerButton, ()).await.map(|_| ())
243     }
244 
245     async fn vm_reboot(&self) -> Result<()> {
246         self.vm_action(&VmReboot, ()).await.map(|_| ())
247     }
248 
249     async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
250         let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
251         self.vm_action(&VmRemoveDevice, vm_remove_device)
252             .await
253             .map(|_| ())
254     }
255 
256     async fn vm_resize(&self, vm_resize: String) -> Result<()> {
257         let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
258         self.vm_action(&VmResize, vm_resize).await.map(|_| ())
259     }
260 
261     async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
262         let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
263         self.vm_action(&VmResizeZone, vm_resize_zone)
264             .await
265             .map(|_| ())
266     }
267 
268     async fn vm_restore(&self, restore_config: String) -> Result<()> {
269         let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
270         self.vm_action(&VmRestore, restore_config).await.map(|_| ())
271     }
272 
273     async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
274         let receive_migration_data =
275             serde_json::from_str(&receive_migration_data).map_err(api_error)?;
276         self.vm_action(&VmReceiveMigration, receive_migration_data)
277             .await
278             .map(|_| ())
279     }
280 
281     async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
282         let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
283         self.vm_action(&VmSendMigration, send_migration_data)
284             .await
285             .map(|_| ())
286     }
287 
288     async fn vm_resume(&self) -> Result<()> {
289         self.vm_action(&VmResume, ()).await.map(|_| ())
290     }
291 
292     async fn vm_shutdown(&self) -> Result<()> {
293         self.vm_action(&VmShutdown, ()).await.map(|_| ())
294     }
295 
296     async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
297         let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
298         self.vm_action(&VmSnapshot, vm_snapshot_config)
299             .await
300             .map(|_| ())
301     }
302 
303     // implementation of this function is provided by the `#[zbus(signal)]` macro call
304     #[zbus(signal)]
305     async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>;
306 }
307 
/// Connects to D-Bus, serves the `DBusApi` interface and spawns the
/// "dbus-thread" that drives the connection.
///
/// The bus (system or session), the service name and the object path come
/// from `dbus_options`; `api_notifier` and `api_sender` are handed to the
/// `DBusApi` object. The spawned thread applies the `Thread::DBusApi` seccomp
/// filter (when it is non-empty) and then loops until a shutdown request
/// arrives, forwarding events from `dbus_options.event_monitor_rx` as `event`
/// signals in the meantime.
///
/// Returns the thread's join handle together with the channel ends used by
/// `dbus_api_graceful_shutdown`.
///
/// # Errors
///
/// * `VmmError::CreateDBusSession` if building the connection or looking up
///   the interface fails.
/// * `VmmError::CreateSeccompFilter` if the seccomp filter cannot be built.
/// * `VmmError::DBusThreadSpawn` if the thread cannot be spawned.
///
/// The thread itself returns `VmmError::ApplySeccompFilter` if applying the
/// filter fails, and `Ok(())` otherwise (including after a caught panic).
pub fn start_dbus_thread(
    dbus_options: DBusApiOptions,
    api_notifier: EventFd,
    api_sender: Sender<ApiRequest>,
    seccomp_action: &SeccompAction,
    exit_evt: EventFd,
    hypervisor_type: HypervisorType,
) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
    let dbus_iface = DBusApi::new(api_notifier, api_sender);
    // Set up the connection synchronously so that failures are reported to
    // the caller instead of surfacing inside the spawned thread.
    let (connection, iface_ref) = executor::block_on(async move {
        let conn_builder = if dbus_options.system_bus {
            ConnectionBuilder::system()?
        } else {
            ConnectionBuilder::session()?
        };

        // `internal_executor(false)`: the connection's executor is ticked
        // explicitly by the loop in the spawned thread below.
        let conn = conn_builder
            .internal_executor(false)
            .name(dbus_options.service_name)?
            .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
            .build()
            .await?;

        // Keep a reference to the served interface; it provides the signal
        // context used when emitting `event` signals.
        let iface_ref = conn
            .object_server()
            .interface::<_, DBusApi>(dbus_options.object_path)
            .await?;

        Ok((conn, iface_ref))
    })
    .map_err(VmmError::CreateDBusSession)?;

    // `send_shutdown`/`recv_shutdown`: caller -> thread, requests termination.
    // `send_done`/`recv_done`: thread -> caller, acknowledges termination.
    let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
    let (send_done, recv_done) = oneshot::channel::<()>();

    // Retrieve seccomp filter for API thread
    let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
        .map_err(VmmError::CreateSeccompFilter)?;

    let thread_join_handle = thread::Builder::new()
        .name("dbus-thread".to_string())
        .spawn(move || {
            // Apply seccomp filter for API thread.
            // On failure the error is logged, `exit_evt` is written and the
            // thread returns the error.
            if !api_seccomp_filter.is_empty() {
                apply_filter(&api_seccomp_filter)
                    .map_err(VmmError::ApplySeccompFilter)
                    .map_err(|e| {
                        error!("Error applying seccomp filter: {:?}", e);
                        exit_evt.write(1).ok();
                        e
                    })?;
            }

            // Run the event loop under `catch_unwind` so that a panic is
            // turned into a log message plus a write to `exit_evt`.
            std::panic::catch_unwind(AssertUnwindSafe(move || {
                executor::block_on(async move {
                    let recv_shutdown = recv_shutdown.fuse();
                    // Starts out terminated and is armed with the first
                    // executor tick right after being pinned.
                    let executor_tick = futures::future::Fuse::terminated();
                    futures::pin_mut!(recv_shutdown, executor_tick);
                    executor_tick.set(connection.executor().tick().fuse());

                    loop {
                        futures::select! {
                            // A tick completed: arm the next one.
                            _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
                            // Shutdown requested: acknowledge and leave the loop.
                            _ = recv_shutdown => {
                                send_done.send(()).ok();
                                break;
                            },
                            // Forward a monitored event as an `event` signal;
                            // emission errors are ignored.
                            // NOTE(review): a failed `recv_async` is ignored as well and
                            // the loop simply polls again — confirm this cannot spin
                            // once every sender of the event channel is gone.
                            ret = dbus_options.event_monitor_rx.recv_async() => {
                                if let Ok(event) = ret {
                                    DBusApi::event(iface_ref.signal_context(), event).await.ok();
                                }
                            }
                        }
                    }
                })
            }))
            .map_err(|_| {
                error!("dbus-api thread panicked");
                exit_evt.write(1).ok()
            })
            .ok();

            Ok(())
        })
        .map_err(VmmError::DBusThreadSpawn)?;

    Ok((thread_join_handle, (send_shutdown, recv_done)))
}
396