1 // Copyright © 2023 Sartura Ltd. 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 use std::panic::AssertUnwindSafe; 6 use std::sync::mpsc::Sender; 7 use std::sync::Arc; 8 use std::thread; 9 10 use futures::channel::oneshot; 11 use futures::{executor, FutureExt}; 12 use hypervisor::HypervisorType; 13 use seccompiler::{apply_filter, SeccompAction}; 14 use vmm_sys_util::eventfd::EventFd; 15 use zbus::fdo::{self, Result}; 16 use zbus::zvariant::Optional; 17 use zbus::{interface, ConnectionBuilder}; 18 19 use super::{ApiAction, ApiRequest}; 20 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 21 use crate::api::VmCoredump; 22 use crate::api::{ 23 AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa, 24 VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot, 25 VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume, 26 VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown, 27 }; 28 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 29 use crate::{Error as VmmError, NetConfig, Result as VmmResult, VmConfig}; 30 31 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); 32 33 pub struct DBusApiOptions { 34 pub service_name: String, 35 pub object_path: String, 36 pub system_bus: bool, 37 pub event_monitor_rx: flume::Receiver<Arc<String>>, 38 } 39 40 pub struct DBusApi { 41 api_notifier: EventFd, 42 api_sender: futures::lock::Mutex<Sender<ApiRequest>>, 43 } 44 45 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error { 46 fdo::Error::Failed(format!("{error}")) 47 } 48 49 // This method is intended to ensure that the DBusApi thread has enough time to 50 // send a response to the VmmShutdown method call before it is terminated. If 51 // this step is omitted, the thread may be terminated before it can send a 52 // response, resulting in an error message stating that the message recipient 53 // disconnected from the message bus without providing a reply. 54 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { 55 let (send_shutdown, mut recv_done) = ch; 56 57 // send the shutdown signal and return 58 // if it errors out 59 if send_shutdown.send(()).is_err() { 60 return; 61 } 62 63 // loop until `recv_err` errors out 64 // or as long as the return value indicates 65 // "immediately stale" (None) 66 while let Ok(None) = recv_done.try_recv() {} 67 } 68 69 impl DBusApi { 70 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self { 71 Self { 72 api_notifier, 73 api_sender: futures::lock::Mutex::new(api_sender), 74 } 75 } 76 77 async fn clone_api_sender(&self) -> Sender<ApiRequest> { 78 // lock the async mutex, clone the `Sender` and then immediately 79 // drop the MutexGuard so that other tasks can clone the 80 // `Sender` as well 81 self.api_sender.lock().await.clone() 82 } 83 84 fn clone_api_notifier(&self) -> Result<EventFd> { 85 self.api_notifier 86 .try_clone() 87 .map_err(|err| fdo::Error::IOError(format!("{err:?}"))) 88 } 89 90 async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>( 91 &self, 92 action: &'static Action, 93 body: Action::RequestBody, 94 ) -> Result<Optional<String>> { 95 let api_sender = self.clone_api_sender().await; 96 let api_notifier = self.clone_api_notifier()?; 97 98 let result = blocking::unblock(move || action.send(api_notifier, api_sender, body)) 99 .await 100 .map_err(api_error)? 101 // We're using `from_utf8_lossy` here to not deal with the 102 // error case of `from_utf8` as we know that `b.body` is valid JSON. 103 .map(|b| String::from_utf8_lossy(&b.body).to_string()); 104 105 Ok(result.into()) 106 } 107 } 108 109 #[interface(name = "org.cloudhypervisor.DBusApi1")] 110 impl DBusApi { 111 async fn vmm_ping(&self) -> Result<String> { 112 let api_sender = self.clone_api_sender().await; 113 let api_notifier = self.clone_api_notifier()?; 114 115 let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ())) 116 .await 117 .map_err(api_error)?; 118 serde_json::to_string(&result).map_err(api_error) 119 } 120 121 async fn vmm_shutdown(&self) -> Result<()> { 122 let api_sender = self.clone_api_sender().await; 123 let api_notifier = self.clone_api_notifier()?; 124 125 blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ())) 126 .await 127 .map_err(api_error) 128 } 129 130 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> { 131 let device_config = serde_json::from_str(&device_config).map_err(api_error)?; 132 self.vm_action(&VmAddDevice, device_config).await 133 } 134 135 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> { 136 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?; 137 self.vm_action(&AddDisk, disk_config).await 138 } 139 140 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> { 141 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?; 142 self.vm_action(&VmAddFs, fs_config).await 143 } 144 145 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> { 146 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?; 147 if net_config.fds.is_some() { 148 warn!("Ignoring FDs sent via the D-Bus request body"); 149 net_config.fds = None; 150 } 151 self.vm_action(&VmAddNet, net_config).await 152 } 153 154 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> { 155 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?; 156 self.vm_action(&VmAddPmem, pmem_config).await 157 } 158 159 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> { 160 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?; 161 self.vm_action(&VmAddUserDevice, vm_add_user_device).await 162 } 163 164 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> { 165 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?; 166 self.vm_action(&VmAddVdpa, vdpa_config).await 167 } 168 169 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> { 170 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?; 171 self.vm_action(&VmAddVsock, vsock_config).await 172 } 173 174 async fn vm_boot(&self) -> Result<()> { 175 self.vm_action(&VmBoot, ()).await.map(|_| ()) 176 } 177 178 #[allow(unused_variables)] 179 // zbus doesn't support cfg attributes on interface methods 180 // as a workaround, we make the *call to the internal API* conditionally 181 // compile and return an error on unsupported platforms. 182 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> { 183 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 184 { 185 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?; 186 self.vm_action(&VmCoredump, vm_coredump_data) 187 .await 188 .map(|_| ()) 189 } 190 191 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))] 192 Err(api_error( 193 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled", 194 )) 195 } 196 197 async fn vm_counters(&self) -> Result<Optional<String>> { 198 self.vm_action(&VmCounters, ()).await 199 } 200 201 async fn vm_create(&self, vm_config: String) -> Result<()> { 202 let api_sender = self.clone_api_sender().await; 203 let api_notifier = self.clone_api_notifier()?; 204 205 let mut vm_config: Box<VmConfig> = serde_json::from_str(&vm_config).map_err(api_error)?; 206 207 if let Some(ref mut nets) = vm_config.net { 208 if nets.iter().any(|net| net.fds.is_some()) { 209 warn!("Ignoring FDs sent via the D-Bus request body"); 210 } 211 for net in nets { 212 net.fds = None; 213 } 214 } 215 216 blocking::unblock(move || VmCreate.send(api_notifier, api_sender, vm_config)) 217 .await 218 .map_err(api_error)?; 219 220 Ok(()) 221 } 222 223 async fn vm_delete(&self) -> Result<()> { 224 self.vm_action(&VmDelete, ()).await.map(|_| ()) 225 } 226 227 async fn vm_info(&self) -> Result<String> { 228 let api_sender = self.clone_api_sender().await; 229 let api_notifier = self.clone_api_notifier()?; 230 231 let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ())) 232 .await 233 .map_err(api_error)?; 234 serde_json::to_string(&result).map_err(api_error) 235 } 236 237 async fn vm_pause(&self) -> Result<()> { 238 self.vm_action(&VmPause, ()).await.map(|_| ()) 239 } 240 241 async fn vm_power_button(&self) -> Result<()> { 242 self.vm_action(&VmPowerButton, ()).await.map(|_| ()) 243 } 244 245 async fn vm_reboot(&self) -> Result<()> { 246 self.vm_action(&VmReboot, ()).await.map(|_| ()) 247 } 248 249 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> { 250 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?; 251 self.vm_action(&VmRemoveDevice, vm_remove_device) 252 .await 253 .map(|_| ()) 254 } 255 256 async fn vm_resize(&self, vm_resize: String) -> Result<()> { 257 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?; 258 self.vm_action(&VmResize, vm_resize).await.map(|_| ()) 259 } 260 261 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> { 262 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?; 263 self.vm_action(&VmResizeZone, vm_resize_zone) 264 .await 265 .map(|_| ()) 266 } 267 268 async fn vm_restore(&self, restore_config: String) -> Result<()> { 269 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?; 270 self.vm_action(&VmRestore, restore_config).await.map(|_| ()) 271 } 272 273 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> { 274 let receive_migration_data = 275 serde_json::from_str(&receive_migration_data).map_err(api_error)?; 276 self.vm_action(&VmReceiveMigration, receive_migration_data) 277 .await 278 .map(|_| ()) 279 } 280 281 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> { 282 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?; 283 self.vm_action(&VmSendMigration, send_migration_data) 284 .await 285 .map(|_| ()) 286 } 287 288 async fn vm_resume(&self) -> Result<()> { 289 self.vm_action(&VmResume, ()).await.map(|_| ()) 290 } 291 292 async fn vm_shutdown(&self) -> Result<()> { 293 self.vm_action(&VmShutdown, ()).await.map(|_| ()) 294 } 295 296 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> { 297 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?; 298 self.vm_action(&VmSnapshot, vm_snapshot_config) 299 .await 300 .map(|_| ()) 301 } 302 303 // implementation of this function is provided by the `#[zbus(signal)]` macro call 304 #[zbus(signal)] 305 async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>; 306 } 307 308 pub fn start_dbus_thread( 309 dbus_options: DBusApiOptions, 310 api_notifier: EventFd, 311 api_sender: Sender<ApiRequest>, 312 seccomp_action: &SeccompAction, 313 exit_evt: EventFd, 314 hypervisor_type: HypervisorType, 315 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { 316 let dbus_iface = DBusApi::new(api_notifier, api_sender); 317 let (connection, iface_ref) = executor::block_on(async move { 318 let conn_builder = if dbus_options.system_bus { 319 ConnectionBuilder::system()? 320 } else { 321 ConnectionBuilder::session()? 322 }; 323 324 let conn = conn_builder 325 .internal_executor(false) 326 .name(dbus_options.service_name)? 327 .serve_at(dbus_options.object_path.as_str(), dbus_iface)? 328 .build() 329 .await?; 330 331 let iface_ref = conn 332 .object_server() 333 .interface::<_, DBusApi>(dbus_options.object_path) 334 .await?; 335 336 Ok((conn, iface_ref)) 337 }) 338 .map_err(VmmError::CreateDBusSession)?; 339 340 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); 341 let (send_done, recv_done) = oneshot::channel::<()>(); 342 343 // Retrieve seccomp filter for API thread 344 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type) 345 .map_err(VmmError::CreateSeccompFilter)?; 346 347 let thread_join_handle = thread::Builder::new() 348 .name("dbus-thread".to_string()) 349 .spawn(move || { 350 // Apply seccomp filter for API thread. 351 if !api_seccomp_filter.is_empty() { 352 apply_filter(&api_seccomp_filter) 353 .map_err(VmmError::ApplySeccompFilter) 354 .map_err(|e| { 355 error!("Error applying seccomp filter: {:?}", e); 356 exit_evt.write(1).ok(); 357 e 358 })?; 359 } 360 361 std::panic::catch_unwind(AssertUnwindSafe(move || { 362 executor::block_on(async move { 363 let recv_shutdown = recv_shutdown.fuse(); 364 let executor_tick = futures::future::Fuse::terminated(); 365 futures::pin_mut!(recv_shutdown, executor_tick); 366 executor_tick.set(connection.executor().tick().fuse()); 367 368 loop { 369 futures::select! { 370 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), 371 _ = recv_shutdown => { 372 send_done.send(()).ok(); 373 break; 374 }, 375 ret = dbus_options.event_monitor_rx.recv_async() => { 376 if let Ok(event) = ret { 377 DBusApi::event(iface_ref.signal_context(), event).await.ok(); 378 } 379 } 380 } 381 } 382 }) 383 })) 384 .map_err(|_| { 385 error!("dbus-api thread panicked"); 386 exit_evt.write(1).ok() 387 }) 388 .ok(); 389 390 Ok(()) 391 }) 392 .map_err(VmmError::DBusThreadSpawn)?; 393 394 Ok((thread_join_handle, (send_shutdown, recv_done))) 395 } 396