1 // Copyright © 2023 Sartura Ltd. 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 use std::panic::AssertUnwindSafe; 6 use std::sync::mpsc::Sender; 7 use std::sync::Arc; 8 use std::thread; 9 10 use futures::channel::oneshot; 11 use futures::{executor, FutureExt}; 12 use hypervisor::HypervisorType; 13 use seccompiler::{apply_filter, SeccompAction}; 14 use vmm_sys_util::eventfd::EventFd; 15 use zbus::connection::Builder; 16 use zbus::fdo::{self, Result}; 17 use zbus::interface; 18 use zbus::zvariant::Optional; 19 20 use super::{ApiAction, ApiRequest}; 21 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 22 use crate::api::VmCoredump; 23 use crate::api::{ 24 AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa, 25 VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot, 26 VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume, 27 VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown, 28 }; 29 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 30 use crate::{Error as VmmError, NetConfig, Result as VmmResult, VmConfig}; 31 32 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); 33 34 pub struct DBusApiOptions { 35 pub service_name: String, 36 pub object_path: String, 37 pub system_bus: bool, 38 pub event_monitor_rx: flume::Receiver<Arc<String>>, 39 } 40 41 pub struct DBusApi { 42 api_notifier: EventFd, 43 api_sender: futures::lock::Mutex<Sender<ApiRequest>>, 44 } 45 46 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error { 47 fdo::Error::Failed(format!("{error}")) 48 } 49 50 // This method is intended to ensure that the DBusApi thread has enough time to 51 // send a response to the VmmShutdown method call before it is terminated. If 52 // this step is omitted, the thread may be terminated before it can send a 53 // response, resulting in an error message stating that the message recipient 54 // disconnected from the message bus without providing a reply. 55 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { 56 let (send_shutdown, mut recv_done) = ch; 57 58 // send the shutdown signal and return 59 // if it errors out 60 if send_shutdown.send(()).is_err() { 61 return; 62 } 63 64 // loop until `recv_err` errors out 65 // or as long as the return value indicates 66 // "immediately stale" (None) 67 while let Ok(None) = recv_done.try_recv() {} 68 } 69 70 impl DBusApi { 71 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self { 72 Self { 73 api_notifier, 74 api_sender: futures::lock::Mutex::new(api_sender), 75 } 76 } 77 78 async fn clone_api_sender(&self) -> Sender<ApiRequest> { 79 // lock the async mutex, clone the `Sender` and then immediately 80 // drop the MutexGuard so that other tasks can clone the 81 // `Sender` as well 82 self.api_sender.lock().await.clone() 83 } 84 85 fn clone_api_notifier(&self) -> Result<EventFd> { 86 self.api_notifier 87 .try_clone() 88 .map_err(|err| fdo::Error::IOError(format!("{err:?}"))) 89 } 90 91 async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>( 92 &self, 93 action: &'static Action, 94 body: Action::RequestBody, 95 ) -> Result<Optional<String>> { 96 let api_sender = self.clone_api_sender().await; 97 let api_notifier = self.clone_api_notifier()?; 98 99 let result = blocking::unblock(move || action.send(api_notifier, api_sender, body)) 100 .await 101 .map_err(api_error)? 102 // We're using `from_utf8_lossy` here to not deal with the 103 // error case of `from_utf8` as we know that `b.body` is valid JSON. 104 .map(|b| String::from_utf8_lossy(&b.body).to_string()); 105 106 Ok(result.into()) 107 } 108 } 109 110 #[interface(name = "org.cloudhypervisor.DBusApi1")] 111 impl DBusApi { 112 async fn vmm_ping(&self) -> Result<String> { 113 let api_sender = self.clone_api_sender().await; 114 let api_notifier = self.clone_api_notifier()?; 115 116 let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ())) 117 .await 118 .map_err(api_error)?; 119 serde_json::to_string(&result).map_err(api_error) 120 } 121 122 async fn vmm_shutdown(&self) -> Result<()> { 123 let api_sender = self.clone_api_sender().await; 124 let api_notifier = self.clone_api_notifier()?; 125 126 blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ())) 127 .await 128 .map_err(api_error) 129 } 130 131 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> { 132 let device_config = serde_json::from_str(&device_config).map_err(api_error)?; 133 self.vm_action(&VmAddDevice, device_config).await 134 } 135 136 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> { 137 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?; 138 self.vm_action(&AddDisk, disk_config).await 139 } 140 141 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> { 142 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?; 143 self.vm_action(&VmAddFs, fs_config).await 144 } 145 146 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> { 147 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?; 148 if net_config.fds.is_some() { 149 warn!("Ignoring FDs sent via the D-Bus request body"); 150 net_config.fds = None; 151 } 152 self.vm_action(&VmAddNet, net_config).await 153 } 154 155 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> { 156 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?; 157 self.vm_action(&VmAddPmem, pmem_config).await 158 } 159 160 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> { 161 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?; 162 self.vm_action(&VmAddUserDevice, vm_add_user_device).await 163 } 164 165 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> { 166 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?; 167 self.vm_action(&VmAddVdpa, vdpa_config).await 168 } 169 170 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> { 171 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?; 172 self.vm_action(&VmAddVsock, vsock_config).await 173 } 174 175 async fn vm_boot(&self) -> Result<()> { 176 self.vm_action(&VmBoot, ()).await.map(|_| ()) 177 } 178 179 #[allow(unused_variables)] 180 // zbus doesn't support cfg attributes on interface methods 181 // as a workaround, we make the *call to the internal API* conditionally 182 // compile and return an error on unsupported platforms. 183 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> { 184 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 185 { 186 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?; 187 self.vm_action(&VmCoredump, vm_coredump_data) 188 .await 189 .map(|_| ()) 190 } 191 192 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))] 193 Err(api_error( 194 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled", 195 )) 196 } 197 198 async fn vm_counters(&self) -> Result<Optional<String>> { 199 self.vm_action(&VmCounters, ()).await 200 } 201 202 async fn vm_create(&self, vm_config: String) -> Result<()> { 203 let api_sender = self.clone_api_sender().await; 204 let api_notifier = self.clone_api_notifier()?; 205 206 let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?; 207 208 if let Some(ref mut nets) = vm_config.net { 209 if nets.iter().any(|net| net.fds.is_some()) { 210 warn!("Ignoring FDs sent via the D-Bus request body"); 211 } 212 for net in nets { 213 net.fds = None; 214 } 215 } 216 217 blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config)) 218 .await 219 .map_err(api_error)?; 220 221 Ok(()) 222 } 223 224 async fn vm_delete(&self) -> Result<()> { 225 self.vm_action(&VmDelete, ()).await.map(|_| ()) 226 } 227 228 async fn vm_info(&self) -> Result<String> { 229 let api_sender = self.clone_api_sender().await; 230 let api_notifier = self.clone_api_notifier()?; 231 232 let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ())) 233 .await 234 .map_err(api_error)?; 235 serde_json::to_string(&result).map_err(api_error) 236 } 237 238 async fn vm_pause(&self) -> Result<()> { 239 self.vm_action(&VmPause, ()).await.map(|_| ()) 240 } 241 242 async fn vm_power_button(&self) -> Result<()> { 243 self.vm_action(&VmPowerButton, ()).await.map(|_| ()) 244 } 245 246 async fn vm_reboot(&self) -> Result<()> { 247 self.vm_action(&VmReboot, ()).await.map(|_| ()) 248 } 249 250 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> { 251 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?; 252 self.vm_action(&VmRemoveDevice, vm_remove_device) 253 .await 254 .map(|_| ()) 255 } 256 257 async fn vm_resize(&self, vm_resize: String) -> Result<()> { 258 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?; 259 self.vm_action(&VmResize, vm_resize).await.map(|_| ()) 260 } 261 262 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> { 263 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?; 264 self.vm_action(&VmResizeZone, vm_resize_zone) 265 .await 266 .map(|_| ()) 267 } 268 269 async fn vm_restore(&self, restore_config: String) -> Result<()> { 270 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?; 271 self.vm_action(&VmRestore, restore_config).await.map(|_| ()) 272 } 273 274 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> { 275 let receive_migration_data = 276 serde_json::from_str(&receive_migration_data).map_err(api_error)?; 277 self.vm_action(&VmReceiveMigration, receive_migration_data) 278 .await 279 .map(|_| ()) 280 } 281 282 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> { 283 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?; 284 self.vm_action(&VmSendMigration, send_migration_data) 285 .await 286 .map(|_| ()) 287 } 288 289 async fn vm_resume(&self) -> Result<()> { 290 self.vm_action(&VmResume, ()).await.map(|_| ()) 291 } 292 293 async fn vm_shutdown(&self) -> Result<()> { 294 self.vm_action(&VmShutdown, ()).await.map(|_| ()) 295 } 296 297 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> { 298 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?; 299 self.vm_action(&VmSnapshot, vm_snapshot_config) 300 .await 301 .map(|_| ()) 302 } 303 304 // implementation of this function is provided by the `#[zbus(signal)]` macro call 305 #[zbus(signal)] 306 async fn event( 307 ctxt: &zbus::object_server::SignalEmitter<'_>, 308 event: Arc<String>, 309 ) -> zbus::Result<()>; 310 } 311 312 pub fn start_dbus_thread( 313 dbus_options: DBusApiOptions, 314 api_notifier: EventFd, 315 api_sender: Sender<ApiRequest>, 316 seccomp_action: &SeccompAction, 317 exit_evt: EventFd, 318 hypervisor_type: HypervisorType, 319 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { 320 let dbus_iface = DBusApi::new(api_notifier, api_sender); 321 let (connection, iface_ref) = executor::block_on(async move { 322 let conn_builder = if dbus_options.system_bus { 323 Builder::system()? 324 } else { 325 Builder::session()? 326 }; 327 328 let conn = conn_builder 329 .internal_executor(false) 330 .name(dbus_options.service_name)? 331 .serve_at(dbus_options.object_path.as_str(), dbus_iface)? 332 .build() 333 .await?; 334 335 let iface_ref = conn 336 .object_server() 337 .interface::<_, DBusApi>(dbus_options.object_path) 338 .await?; 339 340 Ok((conn, iface_ref)) 341 }) 342 .map_err(VmmError::CreateDBusSession)?; 343 344 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); 345 let (send_done, recv_done) = oneshot::channel::<()>(); 346 347 // Retrieve seccomp filter for API thread 348 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type) 349 .map_err(VmmError::CreateSeccompFilter)?; 350 351 let thread_join_handle = thread::Builder::new() 352 .name("dbus-thread".to_string()) 353 .spawn(move || { 354 // Apply seccomp filter for API thread. 355 if !api_seccomp_filter.is_empty() { 356 apply_filter(&api_seccomp_filter) 357 .map_err(VmmError::ApplySeccompFilter) 358 .map_err(|e| { 359 error!("Error applying seccomp filter: {:?}", e); 360 exit_evt.write(1).ok(); 361 e 362 })?; 363 } 364 365 std::panic::catch_unwind(AssertUnwindSafe(move || { 366 executor::block_on(async move { 367 let recv_shutdown = recv_shutdown.fuse(); 368 let executor_tick = futures::future::Fuse::terminated(); 369 futures::pin_mut!(recv_shutdown, executor_tick); 370 executor_tick.set(connection.executor().tick().fuse()); 371 372 loop { 373 futures::select! { 374 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), 375 _ = recv_shutdown => { 376 send_done.send(()).ok(); 377 break; 378 }, 379 ret = dbus_options.event_monitor_rx.recv_async() => { 380 if let Ok(event) = ret { 381 DBusApi::event(iface_ref.signal_emitter(), event).await.ok(); 382 } 383 } 384 } 385 } 386 }) 387 })) 388 .map_err(|_| { 389 error!("dbus-api thread panicked"); 390 exit_evt.write(1).ok() 391 }) 392 .ok(); 393 394 Ok(()) 395 }) 396 .map_err(VmmError::DBusThreadSpawn)?; 397 398 Ok((thread_join_handle, (send_shutdown, recv_done))) 399 } 400