xref: /cloud-hypervisor/vmm/src/api/dbus/mod.rs (revision 08cf983d420af7bce0cd67f34e660324ef219de6)
1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use super::{ApiAction, ApiRequest};
6 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
7 use crate::api::VmCoredump;
8 use crate::api::{
9     AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa,
10     VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot,
11     VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume,
12     VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown,
13 };
14 use crate::seccomp_filters::{get_seccomp_filter, Thread};
15 use crate::{Error as VmmError, Result as VmmResult};
16 use crate::{NetConfig, VmConfig};
17 use futures::channel::oneshot;
18 use futures::{executor, FutureExt};
19 use hypervisor::HypervisorType;
20 use seccompiler::{apply_filter, SeccompAction};
21 use std::panic::AssertUnwindSafe;
22 use std::sync::mpsc::Sender;
23 use std::sync::{Arc, Mutex};
24 use std::thread;
25 use vmm_sys_util::eventfd::EventFd;
26 use zbus::fdo::{self, Result};
27 use zbus::zvariant::Optional;
28 use zbus::{interface, ConnectionBuilder};
29 
/// Channel ends used to stop the D-Bus API thread gracefully.
///
/// The `Sender` delivers the shutdown request to the thread; the `Receiver`
/// completes once the thread has acknowledged that request.
pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
31 
/// Settings consumed by `start_dbus_thread` when setting up the D-Bus API.
pub struct DBusApiOptions {
    /// Name handed to the connection builder as the name of this service.
    pub service_name: String,
    /// Object path at which the `DBusApi` interface is served.
    pub object_path: String,
    /// Connect to the system bus when `true`, to the session bus otherwise.
    pub system_bus: bool,
    /// Receiving end of the event channel; every event received here is
    /// emitted through the interface's `event` signal.
    pub event_monitor_rx: flume::Receiver<Arc<String>>,
}
38 
/// Object exposed on the bus; forwards D-Bus method calls to the internal API.
pub struct DBusApi {
    /// Event file descriptor; a clone of it accompanies every API action sent.
    api_notifier: EventFd,
    /// Channel used to submit `ApiRequest`s. It sits behind an async mutex so
    /// that each method call can lock it briefly and take its own clone.
    api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
}
43 
44 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error {
45     fdo::Error::Failed(format!("{error}"))
46 }
47 
48 // This method is intended to ensure that the DBusApi thread has enough time to
49 // send a response to the VmmShutdown method call before it is terminated. If
50 // this step is omitted, the thread may be terminated before it can send a
51 // response, resulting in an error message stating that the message recipient
52 // disconnected from the message bus without providing a reply.
53 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
54     let (send_shutdown, mut recv_done) = ch;
55 
56     // send the shutdown signal and return
57     // if it errors out
58     if send_shutdown.send(()).is_err() {
59         return;
60     }
61 
62     // loop until `recv_err` errors out
63     // or as long as the return value indicates
64     // "immediately stale" (None)
65     while let Ok(None) = recv_done.try_recv() {}
66 }
67 
68 impl DBusApi {
69     pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
70         Self {
71             api_notifier,
72             api_sender: futures::lock::Mutex::new(api_sender),
73         }
74     }
75 
76     async fn clone_api_sender(&self) -> Sender<ApiRequest> {
77         // lock the async mutex, clone the `Sender` and then immediately
78         // drop the MutexGuard so that other tasks can clone the
79         // `Sender` as well
80         self.api_sender.lock().await.clone()
81     }
82 
83     fn clone_api_notifier(&self) -> Result<EventFd> {
84         self.api_notifier
85             .try_clone()
86             .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
87     }
88 
89     async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>(
90         &self,
91         action: &'static Action,
92         body: Action::RequestBody,
93     ) -> Result<Optional<String>> {
94         let api_sender = self.clone_api_sender().await;
95         let api_notifier = self.clone_api_notifier()?;
96 
97         let result = blocking::unblock(move || action.send(api_notifier, api_sender, body))
98             .await
99             .map_err(api_error)?
100             // We're using `from_utf8_lossy` here to not deal with the
101             // error case of `from_utf8` as we know that `b.body` is valid JSON.
102             .map(|b| String::from_utf8_lossy(&b.body).to_string());
103 
104         Ok(result.into())
105     }
106 }
107 
108 #[interface(name = "org.cloudhypervisor.DBusApi1")]
109 impl DBusApi {
110     async fn vmm_ping(&self) -> Result<String> {
111         let api_sender = self.clone_api_sender().await;
112         let api_notifier = self.clone_api_notifier()?;
113 
114         let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ()))
115             .await
116             .map_err(api_error)?;
117         serde_json::to_string(&result).map_err(api_error)
118     }
119 
120     async fn vmm_shutdown(&self) -> Result<()> {
121         let api_sender = self.clone_api_sender().await;
122         let api_notifier = self.clone_api_notifier()?;
123 
124         blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ()))
125             .await
126             .map_err(api_error)
127     }
128 
129     async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
130         let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
131         self.vm_action(&VmAddDevice, device_config).await
132     }
133 
134     async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
135         let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
136         self.vm_action(&AddDisk, disk_config).await
137     }
138 
139     async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
140         let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
141         self.vm_action(&VmAddFs, fs_config).await
142     }
143 
144     async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
145         let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
146         if net_config.fds.is_some() {
147             warn!("Ignoring FDs sent via the D-Bus request body");
148             net_config.fds = None;
149         }
150         self.vm_action(&VmAddNet, net_config).await
151     }
152 
153     async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
154         let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
155         self.vm_action(&VmAddPmem, pmem_config).await
156     }
157 
158     async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
159         let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
160         self.vm_action(&VmAddUserDevice, vm_add_user_device).await
161     }
162 
163     async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
164         let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
165         self.vm_action(&VmAddVdpa, vdpa_config).await
166     }
167 
168     async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
169         let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
170         self.vm_action(&VmAddVsock, vsock_config).await
171     }
172 
173     async fn vm_boot(&self) -> Result<()> {
174         self.vm_action(&VmBoot, ()).await.map(|_| ())
175     }
176 
177     #[allow(unused_variables)]
178     // zbus doesn't support cfg attributes on interface methods
179     // as a workaround, we make the *call to the internal API* conditionally
180     // compile and return an error on unsupported platforms.
181     async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
182         #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
183         {
184             let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
185             self.vm_action(&VmCoredump, vm_coredump_data)
186                 .await
187                 .map(|_| ())
188         }
189 
190         #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
191         Err(api_error(
192             "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
193         ))
194     }
195 
196     async fn vm_counters(&self) -> Result<Optional<String>> {
197         self.vm_action(&VmCounters, ()).await
198     }
199 
200     async fn vm_create(&self, vm_config: String) -> Result<()> {
201         let api_sender = self.clone_api_sender().await;
202         let api_notifier = self.clone_api_notifier()?;
203 
204         let mut vm_config: VmConfig = serde_json::from_str(&vm_config).map_err(api_error)?;
205 
206         if let Some(ref mut nets) = vm_config.net {
207             if nets.iter().any(|net| net.fds.is_some()) {
208                 warn!("Ignoring FDs sent via the D-Bus request body");
209             }
210             for net in nets {
211                 net.fds = None;
212             }
213         }
214 
215         blocking::unblock(move || {
216             VmCreate.send(api_notifier, api_sender, Arc::new(Mutex::new(vm_config)))
217         })
218         .await
219         .map_err(api_error)?;
220 
221         Ok(())
222     }
223 
224     async fn vm_delete(&self) -> Result<()> {
225         self.vm_action(&VmDelete, ()).await.map(|_| ())
226     }
227 
228     async fn vm_info(&self) -> Result<String> {
229         let api_sender = self.clone_api_sender().await;
230         let api_notifier = self.clone_api_notifier()?;
231 
232         let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ()))
233             .await
234             .map_err(api_error)?;
235         serde_json::to_string(&result).map_err(api_error)
236     }
237 
238     async fn vm_pause(&self) -> Result<()> {
239         self.vm_action(&VmPause, ()).await.map(|_| ())
240     }
241 
242     async fn vm_power_button(&self) -> Result<()> {
243         self.vm_action(&VmPowerButton, ()).await.map(|_| ())
244     }
245 
246     async fn vm_reboot(&self) -> Result<()> {
247         self.vm_action(&VmReboot, ()).await.map(|_| ())
248     }
249 
250     async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
251         let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
252         self.vm_action(&VmRemoveDevice, vm_remove_device)
253             .await
254             .map(|_| ())
255     }
256 
257     async fn vm_resize(&self, vm_resize: String) -> Result<()> {
258         let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
259         self.vm_action(&VmResize, vm_resize).await.map(|_| ())
260     }
261 
262     async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
263         let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
264         self.vm_action(&VmResizeZone, vm_resize_zone)
265             .await
266             .map(|_| ())
267     }
268 
269     async fn vm_restore(&self, restore_config: String) -> Result<()> {
270         let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
271         self.vm_action(&VmRestore, restore_config).await.map(|_| ())
272     }
273 
274     async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
275         let receive_migration_data =
276             serde_json::from_str(&receive_migration_data).map_err(api_error)?;
277         self.vm_action(&VmReceiveMigration, receive_migration_data)
278             .await
279             .map(|_| ())
280     }
281 
282     async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
283         let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
284         self.vm_action(&VmSendMigration, send_migration_data)
285             .await
286             .map(|_| ())
287     }
288 
289     async fn vm_resume(&self) -> Result<()> {
290         self.vm_action(&VmResume, ()).await.map(|_| ())
291     }
292 
293     async fn vm_shutdown(&self) -> Result<()> {
294         self.vm_action(&VmShutdown, ()).await.map(|_| ())
295     }
296 
297     async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
298         let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
299         self.vm_action(&VmSnapshot, vm_snapshot_config)
300             .await
301             .map(|_| ())
302     }
303 
304     // implementation of this function is provided by the `#[zbus(signal)]` macro call
305     #[zbus(signal)]
306     async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>;
307 }
308 
309 pub fn start_dbus_thread(
310     dbus_options: DBusApiOptions,
311     api_notifier: EventFd,
312     api_sender: Sender<ApiRequest>,
313     seccomp_action: &SeccompAction,
314     exit_evt: EventFd,
315     hypervisor_type: HypervisorType,
316 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
317     let dbus_iface = DBusApi::new(api_notifier, api_sender);
318     let (connection, iface_ref) = executor::block_on(async move {
319         let conn_builder = if dbus_options.system_bus {
320             ConnectionBuilder::system()?
321         } else {
322             ConnectionBuilder::session()?
323         };
324 
325         let conn = conn_builder
326             .internal_executor(false)
327             .name(dbus_options.service_name)?
328             .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
329             .build()
330             .await?;
331 
332         let iface_ref = conn
333             .object_server()
334             .interface::<_, DBusApi>(dbus_options.object_path)
335             .await?;
336 
337         Ok((conn, iface_ref))
338     })
339     .map_err(VmmError::CreateDBusSession)?;
340 
341     let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
342     let (send_done, recv_done) = oneshot::channel::<()>();
343 
344     // Retrieve seccomp filter for API thread
345     let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
346         .map_err(VmmError::CreateSeccompFilter)?;
347 
348     let thread_join_handle = thread::Builder::new()
349         .name("dbus-thread".to_string())
350         .spawn(move || {
351             // Apply seccomp filter for API thread.
352             if !api_seccomp_filter.is_empty() {
353                 apply_filter(&api_seccomp_filter)
354                     .map_err(VmmError::ApplySeccompFilter)
355                     .map_err(|e| {
356                         error!("Error applying seccomp filter: {:?}", e);
357                         exit_evt.write(1).ok();
358                         e
359                     })?;
360             }
361 
362             std::panic::catch_unwind(AssertUnwindSafe(move || {
363                 executor::block_on(async move {
364                     let recv_shutdown = recv_shutdown.fuse();
365                     let executor_tick = futures::future::Fuse::terminated();
366                     futures::pin_mut!(recv_shutdown, executor_tick);
367                     executor_tick.set(connection.executor().tick().fuse());
368 
369                     loop {
370                         futures::select! {
371                             _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
372                             _ = recv_shutdown => {
373                                 send_done.send(()).ok();
374                                 break;
375                             },
376                             ret = dbus_options.event_monitor_rx.recv_async() => {
377                                 if let Ok(event) = ret {
378                                     DBusApi::event(iface_ref.signal_context(), event).await.ok();
379                                 }
380                             }
381                         }
382                     }
383                 })
384             }))
385             .map_err(|_| {
386                 error!("dbus-api thread panicked");
387                 exit_evt.write(1).ok()
388             })
389             .ok();
390 
391             Ok(())
392         })
393         .map_err(VmmError::DBusThreadSpawn)?;
394 
395     Ok((thread_join_handle, (send_shutdown, recv_done)))
396 }
397