1 // Copyright © 2023 Sartura Ltd. 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 use std::panic::AssertUnwindSafe; 6 use std::sync::mpsc::Sender; 7 use std::sync::Arc; 8 use std::thread; 9 10 use futures::channel::oneshot; 11 use futures::{executor, FutureExt}; 12 use hypervisor::HypervisorType; 13 use seccompiler::{apply_filter, SeccompAction}; 14 use vmm_sys_util::eventfd::EventFd; 15 use zbus::fdo::{self, Result}; 16 use zbus::zvariant::Optional; 17 use zbus::{interface, ConnectionBuilder}; 18 19 use super::{ApiAction, ApiRequest}; 20 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 21 use crate::api::VmCoredump; 22 use crate::api::{ 23 AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa, 24 VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot, 25 VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume, 26 VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown, 27 }; 28 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 29 use crate::{Error as VmmError, Result as VmmResult}; 30 use crate::{NetConfig, VmConfig}; 31 32 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); 33 34 pub struct DBusApiOptions { 35 pub service_name: String, 36 pub object_path: String, 37 pub system_bus: bool, 38 pub event_monitor_rx: flume::Receiver<Arc<String>>, 39 } 40 41 pub struct DBusApi { 42 api_notifier: EventFd, 43 api_sender: futures::lock::Mutex<Sender<ApiRequest>>, 44 } 45 46 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error { 47 fdo::Error::Failed(format!("{error}")) 48 } 49 50 // This method is intended to ensure that the DBusApi thread has enough time to 51 // send a response to the VmmShutdown method call before it is terminated. If 52 // this step is omitted, the thread may be terminated before it can send a 53 // response, resulting in an error message stating that the message recipient 54 // disconnected from the message bus without providing a reply. 55 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { 56 let (send_shutdown, mut recv_done) = ch; 57 58 // send the shutdown signal and return 59 // if it errors out 60 if send_shutdown.send(()).is_err() { 61 return; 62 } 63 64 // loop until `recv_err` errors out 65 // or as long as the return value indicates 66 // "immediately stale" (None) 67 while let Ok(None) = recv_done.try_recv() {} 68 } 69 70 impl DBusApi { 71 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self { 72 Self { 73 api_notifier, 74 api_sender: futures::lock::Mutex::new(api_sender), 75 } 76 } 77 78 async fn clone_api_sender(&self) -> Sender<ApiRequest> { 79 // lock the async mutex, clone the `Sender` and then immediately 80 // drop the MutexGuard so that other tasks can clone the 81 // `Sender` as well 82 self.api_sender.lock().await.clone() 83 } 84 85 fn clone_api_notifier(&self) -> Result<EventFd> { 86 self.api_notifier 87 .try_clone() 88 .map_err(|err| fdo::Error::IOError(format!("{err:?}"))) 89 } 90 91 async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>( 92 &self, 93 action: &'static Action, 94 body: Action::RequestBody, 95 ) -> Result<Optional<String>> { 96 let api_sender = self.clone_api_sender().await; 97 let api_notifier = self.clone_api_notifier()?; 98 99 let result = blocking::unblock(move || action.send(api_notifier, api_sender, body)) 100 .await 101 .map_err(api_error)? 102 // We're using `from_utf8_lossy` here to not deal with the 103 // error case of `from_utf8` as we know that `b.body` is valid JSON. 104 .map(|b| String::from_utf8_lossy(&b.body).to_string()); 105 106 Ok(result.into()) 107 } 108 } 109 110 #[interface(name = "org.cloudhypervisor.DBusApi1")] 111 impl DBusApi { 112 async fn vmm_ping(&self) -> Result<String> { 113 let api_sender = self.clone_api_sender().await; 114 let api_notifier = self.clone_api_notifier()?; 115 116 let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ())) 117 .await 118 .map_err(api_error)?; 119 serde_json::to_string(&result).map_err(api_error) 120 } 121 122 async fn vmm_shutdown(&self) -> Result<()> { 123 let api_sender = self.clone_api_sender().await; 124 let api_notifier = self.clone_api_notifier()?; 125 126 blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ())) 127 .await 128 .map_err(api_error) 129 } 130 131 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> { 132 let device_config = serde_json::from_str(&device_config).map_err(api_error)?; 133 self.vm_action(&VmAddDevice, device_config).await 134 } 135 136 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> { 137 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?; 138 self.vm_action(&AddDisk, disk_config).await 139 } 140 141 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> { 142 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?; 143 self.vm_action(&VmAddFs, fs_config).await 144 } 145 146 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> { 147 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?; 148 if net_config.fds.is_some() { 149 warn!("Ignoring FDs sent via the D-Bus request body"); 150 net_config.fds = None; 151 } 152 self.vm_action(&VmAddNet, net_config).await 153 } 154 155 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> { 156 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?; 157 self.vm_action(&VmAddPmem, pmem_config).await 158 } 159 160 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> { 161 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?; 162 self.vm_action(&VmAddUserDevice, vm_add_user_device).await 163 } 164 165 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> { 166 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?; 167 self.vm_action(&VmAddVdpa, vdpa_config).await 168 } 169 170 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> { 171 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?; 172 self.vm_action(&VmAddVsock, vsock_config).await 173 } 174 175 async fn vm_boot(&self) -> Result<()> { 176 self.vm_action(&VmBoot, ()).await.map(|_| ()) 177 } 178 179 #[allow(unused_variables)] 180 // zbus doesn't support cfg attributes on interface methods 181 // as a workaround, we make the *call to the internal API* conditionally 182 // compile and return an error on unsupported platforms. 183 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> { 184 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 185 { 186 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?; 187 self.vm_action(&VmCoredump, vm_coredump_data) 188 .await 189 .map(|_| ()) 190 } 191 192 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))] 193 Err(api_error( 194 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled", 195 )) 196 } 197 198 async fn vm_counters(&self) -> Result<Optional<String>> { 199 self.vm_action(&VmCounters, ()).await 200 } 201 202 async fn vm_create(&self, vm_config: String) -> Result<()> { 203 let api_sender = self.clone_api_sender().await; 204 let api_notifier = self.clone_api_notifier()?; 205 206 let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?; 207 208 if let Some(ref mut nets) = vm_config.net { 209 if nets.iter().any(|net| net.fds.is_some()) { 210 warn!("Ignoring FDs sent via the D-Bus request body"); 211 } 212 for net in nets { 213 net.fds = None; 214 } 215 } 216 217 blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config)) 218 .await 219 .map_err(api_error)?; 220 221 Ok(()) 222 } 223 224 async fn vm_delete(&self) -> Result<()> { 225 self.vm_action(&VmDelete, ()).await.map(|_| ()) 226 } 227 228 async fn vm_info(&self) -> Result<String> { 229 let api_sender = self.clone_api_sender().await; 230 let api_notifier = self.clone_api_notifier()?; 231 232 let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ())) 233 .await 234 .map_err(api_error)?; 235 serde_json::to_string(&result).map_err(api_error) 236 } 237 238 async fn vm_pause(&self) -> Result<()> { 239 self.vm_action(&VmPause, ()).await.map(|_| ()) 240 } 241 242 async fn vm_power_button(&self) -> Result<()> { 243 self.vm_action(&VmPowerButton, ()).await.map(|_| ()) 244 } 245 246 async fn vm_reboot(&self) -> Result<()> { 247 self.vm_action(&VmReboot, ()).await.map(|_| ()) 248 } 249 250 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> { 251 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?; 252 self.vm_action(&VmRemoveDevice, vm_remove_device) 253 .await 254 .map(|_| ()) 255 } 256 257 async fn vm_resize(&self, vm_resize: String) -> Result<()> { 258 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?; 259 self.vm_action(&VmResize, vm_resize).await.map(|_| ()) 260 } 261 262 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> { 263 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?; 264 self.vm_action(&VmResizeZone, vm_resize_zone) 265 .await 266 .map(|_| ()) 267 } 268 269 async fn vm_restore(&self, restore_config: String) -> Result<()> { 270 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?; 271 self.vm_action(&VmRestore, restore_config).await.map(|_| ()) 272 } 273 274 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> { 275 let receive_migration_data = 276 serde_json::from_str(&receive_migration_data).map_err(api_error)?; 277 self.vm_action(&VmReceiveMigration, receive_migration_data) 278 .await 279 .map(|_| ()) 280 } 281 282 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> { 283 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?; 284 self.vm_action(&VmSendMigration, send_migration_data) 285 .await 286 .map(|_| ()) 287 } 288 289 async fn vm_resume(&self) -> Result<()> { 290 self.vm_action(&VmResume, ()).await.map(|_| ()) 291 } 292 293 async fn vm_shutdown(&self) -> Result<()> { 294 self.vm_action(&VmShutdown, ()).await.map(|_| ()) 295 } 296 297 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> { 298 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?; 299 self.vm_action(&VmSnapshot, vm_snapshot_config) 300 .await 301 .map(|_| ()) 302 } 303 304 // implementation of this function is provided by the `#[zbus(signal)]` macro call 305 #[zbus(signal)] 306 async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>; 307 } 308 309 pub fn start_dbus_thread( 310 dbus_options: DBusApiOptions, 311 api_notifier: EventFd, 312 api_sender: Sender<ApiRequest>, 313 seccomp_action: &SeccompAction, 314 exit_evt: EventFd, 315 hypervisor_type: HypervisorType, 316 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { 317 let dbus_iface = DBusApi::new(api_notifier, api_sender); 318 let (connection, iface_ref) = executor::block_on(async move { 319 let conn_builder = if dbus_options.system_bus { 320 ConnectionBuilder::system()? 321 } else { 322 ConnectionBuilder::session()? 323 }; 324 325 let conn = conn_builder 326 .internal_executor(false) 327 .name(dbus_options.service_name)? 328 .serve_at(dbus_options.object_path.as_str(), dbus_iface)? 329 .build() 330 .await?; 331 332 let iface_ref = conn 333 .object_server() 334 .interface::<_, DBusApi>(dbus_options.object_path) 335 .await?; 336 337 Ok((conn, iface_ref)) 338 }) 339 .map_err(VmmError::CreateDBusSession)?; 340 341 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); 342 let (send_done, recv_done) = oneshot::channel::<()>(); 343 344 // Retrieve seccomp filter for API thread 345 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type) 346 .map_err(VmmError::CreateSeccompFilter)?; 347 348 let thread_join_handle = thread::Builder::new() 349 .name("dbus-thread".to_string()) 350 .spawn(move || { 351 // Apply seccomp filter for API thread. 352 if !api_seccomp_filter.is_empty() { 353 apply_filter(&api_seccomp_filter) 354 .map_err(VmmError::ApplySeccompFilter) 355 .map_err(|e| { 356 error!("Error applying seccomp filter: {:?}", e); 357 exit_evt.write(1).ok(); 358 e 359 })?; 360 } 361 362 std::panic::catch_unwind(AssertUnwindSafe(move || { 363 executor::block_on(async move { 364 let recv_shutdown = recv_shutdown.fuse(); 365 let executor_tick = futures::future::Fuse::terminated(); 366 futures::pin_mut!(recv_shutdown, executor_tick); 367 executor_tick.set(connection.executor().tick().fuse()); 368 369 loop { 370 futures::select! { 371 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), 372 _ = recv_shutdown => { 373 send_done.send(()).ok(); 374 break; 375 }, 376 ret = dbus_options.event_monitor_rx.recv_async() => { 377 if let Ok(event) = ret { 378 DBusApi::event(iface_ref.signal_context(), event).await.ok(); 379 } 380 } 381 } 382 } 383 }) 384 })) 385 .map_err(|_| { 386 error!("dbus-api thread panicked"); 387 exit_evt.write(1).ok() 388 }) 389 .ok(); 390 391 Ok(()) 392 }) 393 .map_err(VmmError::DBusThreadSpawn)?; 394 395 Ok((thread_join_handle, (send_shutdown, recv_done))) 396 } 397