1 // Copyright © 2023 Sartura Ltd.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 use std::panic::AssertUnwindSafe;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9
10 use futures::channel::oneshot;
11 use futures::{executor, FutureExt};
12 use hypervisor::HypervisorType;
13 use seccompiler::{apply_filter, SeccompAction};
14 use vmm_sys_util::eventfd::EventFd;
15 use zbus::connection::Builder;
16 use zbus::fdo::{self, Result};
17 use zbus::interface;
18 use zbus::zvariant::Optional;
19
20 use super::{ApiAction, ApiRequest};
21 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
22 use crate::api::VmCoredump;
23 use crate::api::{
24 AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa,
25 VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot,
26 VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume,
27 VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown,
28 };
29 use crate::seccomp_filters::{get_seccomp_filter, Thread};
30 use crate::{Error as VmmError, NetConfig, Result as VmmResult, VmConfig};
31
32 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
33
34 pub struct DBusApiOptions {
35 pub service_name: String,
36 pub object_path: String,
37 pub system_bus: bool,
38 pub event_monitor_rx: flume::Receiver<Arc<String>>,
39 }
40
41 pub struct DBusApi {
42 api_notifier: EventFd,
43 api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
44 }
45
api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error46 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error {
47 fdo::Error::Failed(format!("{error}"))
48 }
49
50 // This method is intended to ensure that the DBusApi thread has enough time to
51 // send a response to the VmmShutdown method call before it is terminated. If
52 // this step is omitted, the thread may be terminated before it can send a
53 // response, resulting in an error message stating that the message recipient
54 // disconnected from the message bus without providing a reply.
dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels)55 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
56 let (send_shutdown, mut recv_done) = ch;
57
58 // send the shutdown signal and return
59 // if it errors out
60 if send_shutdown.send(()).is_err() {
61 return;
62 }
63
64 // loop until `recv_err` errors out
65 // or as long as the return value indicates
66 // "immediately stale" (None)
67 while let Ok(None) = recv_done.try_recv() {}
68 }
69
70 impl DBusApi {
new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self71 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
72 Self {
73 api_notifier,
74 api_sender: futures::lock::Mutex::new(api_sender),
75 }
76 }
77
clone_api_sender(&self) -> Sender<ApiRequest>78 async fn clone_api_sender(&self) -> Sender<ApiRequest> {
79 // lock the async mutex, clone the `Sender` and then immediately
80 // drop the MutexGuard so that other tasks can clone the
81 // `Sender` as well
82 self.api_sender.lock().await.clone()
83 }
84
clone_api_notifier(&self) -> Result<EventFd>85 fn clone_api_notifier(&self) -> Result<EventFd> {
86 self.api_notifier
87 .try_clone()
88 .map_err(|err| fdo::Error::IOError(format!("{err:?}")))
89 }
90
vm_action<Action: ApiAction<ResponseBody = Option<Body>>>( &self, action: &'static Action, body: Action::RequestBody, ) -> Result<Optional<String>>91 async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>(
92 &self,
93 action: &'static Action,
94 body: Action::RequestBody,
95 ) -> Result<Optional<String>> {
96 let api_sender = self.clone_api_sender().await;
97 let api_notifier = self.clone_api_notifier()?;
98
99 let result = blocking::unblock(move || action.send(api_notifier, api_sender, body))
100 .await
101 .map_err(api_error)?
102 // We're using `from_utf8_lossy` here to not deal with the
103 // error case of `from_utf8` as we know that `b.body` is valid JSON.
104 .map(|b| String::from_utf8_lossy(&b.body).to_string());
105
106 Ok(result.into())
107 }
108 }
109
110 #[interface(name = "org.cloudhypervisor.DBusApi1")]
111 impl DBusApi {
vmm_ping(&self) -> Result<String>112 async fn vmm_ping(&self) -> Result<String> {
113 let api_sender = self.clone_api_sender().await;
114 let api_notifier = self.clone_api_notifier()?;
115
116 let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ()))
117 .await
118 .map_err(api_error)?;
119 serde_json::to_string(&result).map_err(api_error)
120 }
121
vmm_shutdown(&self) -> Result<()>122 async fn vmm_shutdown(&self) -> Result<()> {
123 let api_sender = self.clone_api_sender().await;
124 let api_notifier = self.clone_api_notifier()?;
125
126 blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ()))
127 .await
128 .map_err(api_error)
129 }
130
vm_add_device(&self, device_config: String) -> Result<Optional<String>>131 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
132 let device_config = serde_json::from_str(&device_config).map_err(api_error)?;
133 self.vm_action(&VmAddDevice, device_config).await
134 }
135
vm_add_disk(&self, disk_config: String) -> Result<Optional<String>>136 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
137 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?;
138 self.vm_action(&AddDisk, disk_config).await
139 }
140
vm_add_fs(&self, fs_config: String) -> Result<Optional<String>>141 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
142 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?;
143 self.vm_action(&VmAddFs, fs_config).await
144 }
145
vm_add_net(&self, net_config: String) -> Result<Optional<String>>146 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
147 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?;
148 if net_config.fds.is_some() {
149 warn!("Ignoring FDs sent via the D-Bus request body");
150 net_config.fds = None;
151 }
152 self.vm_action(&VmAddNet, net_config).await
153 }
154
vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>>155 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
156 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?;
157 self.vm_action(&VmAddPmem, pmem_config).await
158 }
159
vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>>160 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
161 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?;
162 self.vm_action(&VmAddUserDevice, vm_add_user_device).await
163 }
164
vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>>165 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
166 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?;
167 self.vm_action(&VmAddVdpa, vdpa_config).await
168 }
169
vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>>170 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
171 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?;
172 self.vm_action(&VmAddVsock, vsock_config).await
173 }
174
vm_boot(&self) -> Result<()>175 async fn vm_boot(&self) -> Result<()> {
176 self.vm_action(&VmBoot, ()).await.map(|_| ())
177 }
178
179 #[allow(unused_variables)]
180 // zbus doesn't support cfg attributes on interface methods
181 // as a workaround, we make the *call to the internal API* conditionally
182 // compile and return an error on unsupported platforms.
vm_coredump(&self, vm_coredump_data: String) -> Result<()>183 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
184 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
185 {
186 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?;
187 self.vm_action(&VmCoredump, vm_coredump_data)
188 .await
189 .map(|_| ())
190 }
191
192 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
193 Err(api_error(
194 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
195 ))
196 }
197
vm_counters(&self) -> Result<Optional<String>>198 async fn vm_counters(&self) -> Result<Optional<String>> {
199 self.vm_action(&VmCounters, ()).await
200 }
201
vm_create(&self, vm_config: String) -> Result<()>202 async fn vm_create(&self, vm_config: String) -> Result<()> {
203 let api_sender = self.clone_api_sender().await;
204 let api_notifier = self.clone_api_notifier()?;
205
206 let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?;
207
208 if let Some(ref mut nets) = vm_config.net {
209 if nets.iter().any(|net| net.fds.is_some()) {
210 warn!("Ignoring FDs sent via the D-Bus request body");
211 }
212 for net in nets {
213 net.fds = None;
214 }
215 }
216
217 blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config))
218 .await
219 .map_err(api_error)?;
220
221 Ok(())
222 }
223
vm_delete(&self) -> Result<()>224 async fn vm_delete(&self) -> Result<()> {
225 self.vm_action(&VmDelete, ()).await.map(|_| ())
226 }
227
vm_info(&self) -> Result<String>228 async fn vm_info(&self) -> Result<String> {
229 let api_sender = self.clone_api_sender().await;
230 let api_notifier = self.clone_api_notifier()?;
231
232 let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ()))
233 .await
234 .map_err(api_error)?;
235 serde_json::to_string(&result).map_err(api_error)
236 }
237
vm_pause(&self) -> Result<()>238 async fn vm_pause(&self) -> Result<()> {
239 self.vm_action(&VmPause, ()).await.map(|_| ())
240 }
241
vm_power_button(&self) -> Result<()>242 async fn vm_power_button(&self) -> Result<()> {
243 self.vm_action(&VmPowerButton, ()).await.map(|_| ())
244 }
245
vm_reboot(&self) -> Result<()>246 async fn vm_reboot(&self) -> Result<()> {
247 self.vm_action(&VmReboot, ()).await.map(|_| ())
248 }
249
vm_remove_device(&self, vm_remove_device: String) -> Result<()>250 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
251 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?;
252 self.vm_action(&VmRemoveDevice, vm_remove_device)
253 .await
254 .map(|_| ())
255 }
256
vm_resize(&self, vm_resize: String) -> Result<()>257 async fn vm_resize(&self, vm_resize: String) -> Result<()> {
258 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?;
259 self.vm_action(&VmResize, vm_resize).await.map(|_| ())
260 }
261
vm_resize_zone(&self, vm_resize_zone: String) -> Result<()>262 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
263 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?;
264 self.vm_action(&VmResizeZone, vm_resize_zone)
265 .await
266 .map(|_| ())
267 }
268
vm_restore(&self, restore_config: String) -> Result<()>269 async fn vm_restore(&self, restore_config: String) -> Result<()> {
270 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?;
271 self.vm_action(&VmRestore, restore_config).await.map(|_| ())
272 }
273
vm_receive_migration(&self, receive_migration_data: String) -> Result<()>274 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
275 let receive_migration_data =
276 serde_json::from_str(&receive_migration_data).map_err(api_error)?;
277 self.vm_action(&VmReceiveMigration, receive_migration_data)
278 .await
279 .map(|_| ())
280 }
281
vm_send_migration(&self, send_migration_data: String) -> Result<()>282 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
283 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?;
284 self.vm_action(&VmSendMigration, send_migration_data)
285 .await
286 .map(|_| ())
287 }
288
vm_resume(&self) -> Result<()>289 async fn vm_resume(&self) -> Result<()> {
290 self.vm_action(&VmResume, ()).await.map(|_| ())
291 }
292
vm_shutdown(&self) -> Result<()>293 async fn vm_shutdown(&self) -> Result<()> {
294 self.vm_action(&VmShutdown, ()).await.map(|_| ())
295 }
296
vm_snapshot(&self, vm_snapshot_config: String) -> Result<()>297 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
298 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?;
299 self.vm_action(&VmSnapshot, vm_snapshot_config)
300 .await
301 .map(|_| ())
302 }
303
304 // implementation of this function is provided by the `#[zbus(signal)]` macro call
305 #[zbus(signal)]
event( ctxt: &zbus::object_server::SignalEmitter<'_>, event: Arc<String>, ) -> zbus::Result<()>306 async fn event(
307 ctxt: &zbus::object_server::SignalEmitter<'_>,
308 event: Arc<String>,
309 ) -> zbus::Result<()>;
310 }
311
start_dbus_thread( dbus_options: DBusApiOptions, api_notifier: EventFd, api_sender: Sender<ApiRequest>, seccomp_action: &SeccompAction, exit_evt: EventFd, hypervisor_type: HypervisorType, ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)>312 pub fn start_dbus_thread(
313 dbus_options: DBusApiOptions,
314 api_notifier: EventFd,
315 api_sender: Sender<ApiRequest>,
316 seccomp_action: &SeccompAction,
317 exit_evt: EventFd,
318 hypervisor_type: HypervisorType,
319 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
320 let dbus_iface = DBusApi::new(api_notifier, api_sender);
321 let (connection, iface_ref) = executor::block_on(async move {
322 let conn_builder = if dbus_options.system_bus {
323 Builder::system()?
324 } else {
325 Builder::session()?
326 };
327
328 let conn = conn_builder
329 .internal_executor(false)
330 .name(dbus_options.service_name)?
331 .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
332 .build()
333 .await?;
334
335 let iface_ref = conn
336 .object_server()
337 .interface::<_, DBusApi>(dbus_options.object_path)
338 .await?;
339
340 Ok((conn, iface_ref))
341 })
342 .map_err(VmmError::CreateDBusSession)?;
343
344 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
345 let (send_done, recv_done) = oneshot::channel::<()>();
346
347 // Retrieve seccomp filter for API thread
348 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type)
349 .map_err(VmmError::CreateSeccompFilter)?;
350
351 let thread_join_handle = thread::Builder::new()
352 .name("dbus-thread".to_string())
353 .spawn(move || {
354 // Apply seccomp filter for API thread.
355 if !api_seccomp_filter.is_empty() {
356 apply_filter(&api_seccomp_filter)
357 .map_err(VmmError::ApplySeccompFilter)
358 .map_err(|e| {
359 error!("Error applying seccomp filter: {:?}", e);
360 exit_evt.write(1).ok();
361 e
362 })?;
363 }
364
365 std::panic::catch_unwind(AssertUnwindSafe(move || {
366 executor::block_on(async move {
367 let recv_shutdown = recv_shutdown.fuse();
368 let executor_tick = futures::future::Fuse::terminated();
369 futures::pin_mut!(recv_shutdown, executor_tick);
370 executor_tick.set(connection.executor().tick().fuse());
371
372 loop {
373 futures::select! {
374 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
375 _ = recv_shutdown => {
376 send_done.send(()).ok();
377 break;
378 },
379 ret = dbus_options.event_monitor_rx.recv_async() => {
380 if let Ok(event) = ret {
381 DBusApi::event(iface_ref.signal_emitter(), event).await.ok();
382 }
383 }
384 }
385 }
386 })
387 }))
388 .map_err(|_| {
389 error!("dbus-api thread panicked");
390 exit_evt.write(1).ok()
391 })
392 .ok();
393
394 Ok(())
395 })
396 .map_err(VmmError::DBusThreadSpawn)?;
397
398 Ok((thread_join_handle, (send_shutdown, recv_done)))
399 }
400