xref: /cloud-hypervisor/vmm/src/api/dbus/mod.rs (revision 07d1208dd53a207a65b649b8952780dfd0ca59d9)
1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use super::{ApiRequest, VmAction};
6 use crate::seccomp_filters::{get_seccomp_filter, Thread};
7 use crate::{Error as VmmError, Result as VmmResult};
8 use crate::{NetConfig, VmConfig};
9 use futures::channel::oneshot;
10 use futures::{executor, FutureExt};
11 use hypervisor::HypervisorType;
12 use seccompiler::{apply_filter, SeccompAction};
13 use std::panic::AssertUnwindSafe;
14 use std::sync::mpsc::Sender;
15 use std::sync::{Arc, Mutex};
16 use std::thread;
17 use vmm_sys_util::eventfd::EventFd;
18 use zbus::fdo::{self, Result};
19 use zbus::zvariant::Optional;
20 use zbus::{dbus_interface, ConnectionBuilder};
21 
22 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
23 
24 pub struct DBusApiOptions {
25     pub service_name: String,
26     pub object_path: String,
27     pub system_bus: bool,
28     pub event_monitor_rx: flume::Receiver<Arc<String>>,
29 }
30 
31 pub struct DBusApi {
32     api_notifier: EventFd,
33     api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
34 }
35 
36 fn api_error(error: impl std::fmt::Debug) -> fdo::Error {
37     fdo::Error::Failed(format!("{error:?}"))
38 }
39 
40 // This method is intended to ensure that the DBusApi thread has enough time to
41 // send a response to the VmmShutdown method call before it is terminated. If
42 // this step is omitted, the thread may be terminated before it can send a
43 // response, resulting in an error message stating that the message recipient
44 // disconnected from the message bus without providing a reply.
45 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
46     let (send_shutdown, mut recv_done) = ch;
47 
48     // send the shutdown signal and return
49     // if it errors out
50     if send_shutdown.send(()).is_err() {
51         return;
52     }
53 
54     // loop until `recv_err` errors out
55     // or as long as the return value indicates
56     // "immediately stale" (None)
57     while let Ok(None) = recv_done.try_recv() {}
58 }
59 
60 impl DBusApi {
61     pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
62         Self {
63             api_notifier,
64             api_sender: futures::lock::Mutex::new(api_sender),
65         }
66     }
67 
68     async fn clone_api_sender(&self) -> Sender<ApiRequest> {
69         // lock the async mutex, clone the `Sender` and then immediately
70         // drop the MutexGuard so that other tasks can clone the
71         // `Sender` as well
72         self.api_sender.lock().await.clone()
73     }
74 
75     fn clone_api_notifier(&self) -> Result<EventFd> {
76         self.api_notifier
77             .try_clone()
78             .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
79     }
80 
81     async fn vm_action(&self, action: VmAction) -> Result<Optional<String>> {
82         let api_sender = self.clone_api_sender().await;
83         let api_notifier = self.clone_api_notifier()?;
84 
85         let result = blocking::unblock(move || super::vm_action(api_notifier, api_sender, action))
86             .await
87             .map_err(api_error)?
88             // We're using `from_utf8_lossy` here to not deal with the
89             // error case of `from_utf8` as we know that `b.body` is valid JSON.
90             .map(|b| String::from_utf8_lossy(&b.body).to_string());
91 
92         Ok(result.into())
93     }
94 }
95 
96 #[dbus_interface(name = "org.cloudhypervisor.DBusApi1")]
97 impl DBusApi {
98     async fn vmm_ping(&self) -> Result<String> {
99         let api_sender = self.clone_api_sender().await;
100         let api_notifier = self.clone_api_notifier()?;
101 
102         let result = blocking::unblock(move || super::vmm_ping(api_notifier, api_sender))
103             .await
104             .map_err(api_error)?;
105         serde_json::to_string(&result).map_err(api_error)
106     }
107 
108     async fn vmm_shutdown(&self) -> Result<()> {
109         let api_sender = self.clone_api_sender().await;
110         let api_notifier = self.clone_api_notifier()?;
111 
112         blocking::unblock(move || super::vmm_shutdown(api_notifier, api_sender))
113             .await
114             .map_err(api_error)
115     }
116 
117     async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
118         let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
119         self.vm_action(VmAction::AddDevice(Arc::new(device_config)))
120             .await
121     }
122 
123     async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
124         let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
125         self.vm_action(VmAction::AddDisk(Arc::new(disk_config)))
126             .await
127     }
128 
129     async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
130         let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
131         self.vm_action(VmAction::AddFs(Arc::new(fs_config))).await
132     }
133 
134     async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
135         let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
136         if net_config.fds.is_some() {
137             warn!("Ignoring FDs sent via the D-Bus request body");
138             net_config.fds = None;
139         }
140         self.vm_action(VmAction::AddNet(Arc::new(net_config))).await
141     }
142 
143     async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
144         let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
145         self.vm_action(VmAction::AddPmem(Arc::new(pmem_config)))
146             .await
147     }
148 
149     async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
150         let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
151         self.vm_action(VmAction::AddUserDevice(Arc::new(vm_add_user_device)))
152             .await
153     }
154 
155     async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
156         let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
157         self.vm_action(VmAction::AddVdpa(Arc::new(vdpa_config)))
158             .await
159     }
160 
161     async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
162         let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
163         self.vm_action(VmAction::AddVsock(Arc::new(vsock_config)))
164             .await
165     }
166 
167     async fn vm_boot(&self) -> Result<()> {
168         self.vm_action(VmAction::Boot).await.map(|_| ())
169     }
170 
171     #[allow(unused_variables)]
172     // zbus doesn't support cfg attributes on interface methods
173     // as a workaround, we make the *call to the internal API* conditionally
174     // compile and return an error on unsupported platforms.
175     async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
176         #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
177         {
178             let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
179             self.vm_action(VmAction::Coredump(Arc::new(vm_coredump_data)))
180                 .await
181                 .map(|_| ())
182         }
183 
184         #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
185         Err(api_error(
186             "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
187         ))
188     }
189 
190     async fn vm_counters(&self) -> Result<Optional<String>> {
191         self.vm_action(VmAction::Counters).await
192     }
193 
194     async fn vm_create(&self, vm_config: String) -> Result<()> {
195         let api_sender = self.clone_api_sender().await;
196         let api_notifier = self.clone_api_notifier()?;
197 
198         let mut vm_config: VmConfig = serde_json::from_str(&vm_config).map_err(api_error)?;
199 
200         if let Some(ref mut nets) = vm_config.net {
201             if nets.iter().any(|net| net.fds.is_some()) {
202                 warn!("Ignoring FDs sent via the D-Bus request body");
203             }
204             for net in nets {
205                 net.fds = None;
206             }
207         }
208 
209         blocking::unblock(move || {
210             super::vm_create(api_notifier, api_sender, Arc::new(Mutex::new(vm_config)))
211         })
212         .await
213         .map_err(api_error)?;
214 
215         Ok(())
216     }
217 
218     async fn vm_delete(&self) -> Result<()> {
219         self.vm_action(VmAction::Delete).await.map(|_| ())
220     }
221 
222     async fn vm_info(&self) -> Result<String> {
223         let api_sender = self.clone_api_sender().await;
224         let api_notifier = self.clone_api_notifier()?;
225 
226         let result = blocking::unblock(move || super::vm_info(api_notifier, api_sender))
227             .await
228             .map_err(api_error)?;
229         serde_json::to_string(&result).map_err(api_error)
230     }
231 
232     async fn vm_pause(&self) -> Result<()> {
233         self.vm_action(VmAction::Pause).await.map(|_| ())
234     }
235 
236     async fn vm_power_button(&self) -> Result<()> {
237         self.vm_action(VmAction::PowerButton).await.map(|_| ())
238     }
239 
240     async fn vm_reboot(&self) -> Result<()> {
241         self.vm_action(VmAction::Reboot).await.map(|_| ())
242     }
243 
244     async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
245         let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
246         self.vm_action(VmAction::RemoveDevice(Arc::new(vm_remove_device)))
247             .await
248             .map(|_| ())
249     }
250 
251     async fn vm_resize(&self, vm_resize: String) -> Result<()> {
252         let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
253         self.vm_action(VmAction::Resize(Arc::new(vm_resize)))
254             .await
255             .map(|_| ())
256     }
257 
258     async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
259         let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
260         self.vm_action(VmAction::ResizeZone(Arc::new(vm_resize_zone)))
261             .await
262             .map(|_| ())
263     }
264 
265     async fn vm_restore(&self, restore_config: String) -> Result<()> {
266         let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
267         self.vm_action(VmAction::Restore(Arc::new(restore_config)))
268             .await
269             .map(|_| ())
270     }
271 
272     async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
273         let receive_migration_data =
274             serde_json::from_str(&receive_migration_data).map_err(api_error)?;
275         self.vm_action(VmAction::ReceiveMigration(Arc::new(receive_migration_data)))
276             .await
277             .map(|_| ())
278     }
279 
280     async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
281         let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
282         self.vm_action(VmAction::SendMigration(Arc::new(send_migration_data)))
283             .await
284             .map(|_| ())
285     }
286 
287     async fn vm_resume(&self) -> Result<()> {
288         self.vm_action(VmAction::Resume).await.map(|_| ())
289     }
290 
291     async fn vm_shutdown(&self) -> Result<()> {
292         self.vm_action(VmAction::Shutdown).await.map(|_| ())
293     }
294 
295     async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
296         let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
297         self.vm_action(VmAction::Snapshot(Arc::new(vm_snapshot_config)))
298             .await
299             .map(|_| ())
300     }
301 
302     // implementation of this function is provided by the `dbus_interface` macro
303     #[dbus_interface(signal)]
304     async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>;
305 }
306 
307 pub fn start_dbus_thread(
308     dbus_options: DBusApiOptions,
309     api_notifier: EventFd,
310     api_sender: Sender<ApiRequest>,
311     seccomp_action: &SeccompAction,
312     exit_evt: EventFd,
313     hypervisor_type: HypervisorType,
314 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
315     let dbus_iface = DBusApi::new(api_notifier, api_sender);
316     let (connection, iface_ref) = executor::block_on(async move {
317         let conn_builder = if dbus_options.system_bus {
318             ConnectionBuilder::system()?
319         } else {
320             ConnectionBuilder::session()?
321         };
322 
323         let conn = conn_builder
324             .internal_executor(false)
325             .name(dbus_options.service_name)?
326             .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
327             .build()
328             .await?;
329 
330         let iface_ref = conn
331             .object_server()
332             .interface::<_, DBusApi>(dbus_options.object_path)
333             .await?;
334 
335         Ok((conn, iface_ref))
336     })
337     .map_err(VmmError::CreateDBusSession)?;
338 
339     let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
340     let (send_done, recv_done) = oneshot::channel::<()>();
341 
342     // Retrieve seccomp filter for API thread
343     let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
344         .map_err(VmmError::CreateSeccompFilter)?;
345 
346     let thread_join_handle = thread::Builder::new()
347         .name("dbus-thread".to_string())
348         .spawn(move || {
349             // Apply seccomp filter for API thread.
350             if !api_seccomp_filter.is_empty() {
351                 apply_filter(&api_seccomp_filter)
352                     .map_err(VmmError::ApplySeccompFilter)
353                     .map_err(|e| {
354                         error!("Error applying seccomp filter: {:?}", e);
355                         exit_evt.write(1).ok();
356                         e
357                     })?;
358             }
359 
360             std::panic::catch_unwind(AssertUnwindSafe(move || {
361                 executor::block_on(async move {
362                     let recv_shutdown = recv_shutdown.fuse();
363                     let executor_tick = futures::future::Fuse::terminated();
364                     futures::pin_mut!(recv_shutdown, executor_tick);
365                     executor_tick.set(connection.executor().tick().fuse());
366 
367                     loop {
368                         futures::select! {
369                             _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
370                             _ = recv_shutdown => {
371                                 send_done.send(()).ok();
372                                 break;
373                             },
374                             ret = dbus_options.event_monitor_rx.recv_async() => {
375                                 if let Ok(event) = ret {
376                                     DBusApi::event(iface_ref.signal_context(), event).await.ok();
377                                 }
378                             }
379                         }
380                     }
381                 })
382             }))
383             .map_err(|_| {
384                 error!("dbus-api thread panicked");
385                 exit_evt.write(1).ok()
386             })
387             .ok();
388 
389             Ok(())
390         })
391         .map_err(VmmError::DBusThreadSpawn)?;
392 
393     Ok((thread_join_handle, (send_shutdown, recv_done)))
394 }
395