1 // Copyright © 2023 Sartura Ltd. 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 use super::{ApiAction, ApiRequest}; 6 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 7 use crate::api::VmCoredump; 8 use crate::api::{ 9 AddDisk, Body, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa, 10 VmAddVsock, VmBoot, VmCounters, VmCreate, VmDelete, VmInfo, VmPause, VmPowerButton, VmReboot, 11 VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeZone, VmRestore, VmResume, 12 VmSendMigration, VmShutdown, VmSnapshot, VmmPing, VmmShutdown, 13 }; 14 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 15 use crate::{Error as VmmError, Result as VmmResult}; 16 use crate::{NetConfig, VmConfig}; 17 use futures::channel::oneshot; 18 use futures::{executor, FutureExt}; 19 use hypervisor::HypervisorType; 20 use seccompiler::{apply_filter, SeccompAction}; 21 use std::panic::AssertUnwindSafe; 22 use std::sync::mpsc::Sender; 23 use std::sync::{Arc, Mutex}; 24 use std::thread; 25 use vmm_sys_util::eventfd::EventFd; 26 use zbus::fdo::{self, Result}; 27 use zbus::zvariant::Optional; 28 use zbus::{interface, ConnectionBuilder}; 29 30 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); 31 32 pub struct DBusApiOptions { 33 pub service_name: String, 34 pub object_path: String, 35 pub system_bus: bool, 36 pub event_monitor_rx: flume::Receiver<Arc<String>>, 37 } 38 39 pub struct DBusApi { 40 api_notifier: EventFd, 41 api_sender: futures::lock::Mutex<Sender<ApiRequest>>, 42 } 43 44 fn api_error(error: impl std::fmt::Debug + std::fmt::Display) -> fdo::Error { 45 fdo::Error::Failed(format!("{error}")) 46 } 47 48 // This method is intended to ensure that the DBusApi thread has enough time to 49 // send a response to the VmmShutdown method call before it is terminated. If 50 // this step is omitted, the thread may be terminated before it can send a 51 // response, resulting in an error message stating that the message recipient 52 // disconnected from the message bus without providing a reply. 53 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { 54 let (send_shutdown, mut recv_done) = ch; 55 56 // send the shutdown signal and return 57 // if it errors out 58 if send_shutdown.send(()).is_err() { 59 return; 60 } 61 62 // loop until `recv_err` errors out 63 // or as long as the return value indicates 64 // "immediately stale" (None) 65 while let Ok(None) = recv_done.try_recv() {} 66 } 67 68 impl DBusApi { 69 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self { 70 Self { 71 api_notifier, 72 api_sender: futures::lock::Mutex::new(api_sender), 73 } 74 } 75 76 async fn clone_api_sender(&self) -> Sender<ApiRequest> { 77 // lock the async mutex, clone the `Sender` and then immediately 78 // drop the MutexGuard so that other tasks can clone the 79 // `Sender` as well 80 self.api_sender.lock().await.clone() 81 } 82 83 fn clone_api_notifier(&self) -> Result<EventFd> { 84 self.api_notifier 85 .try_clone() 86 .map_err(|err| fdo::Error::IOError(format!("{err:?}"))) 87 } 88 89 async fn vm_action<Action: ApiAction<ResponseBody = Option<Body>>>( 90 &self, 91 action: &'static Action, 92 body: Action::RequestBody, 93 ) -> Result<Optional<String>> { 94 let api_sender = self.clone_api_sender().await; 95 let api_notifier = self.clone_api_notifier()?; 96 97 let result = blocking::unblock(move || action.send(api_notifier, api_sender, body)) 98 .await 99 .map_err(api_error)? 100 // We're using `from_utf8_lossy` here to not deal with the 101 // error case of `from_utf8` as we know that `b.body` is valid JSON. 102 .map(|b| String::from_utf8_lossy(&b.body).to_string()); 103 104 Ok(result.into()) 105 } 106 } 107 108 #[interface(name = "org.cloudhypervisor.DBusApi1")] 109 impl DBusApi { 110 async fn vmm_ping(&self) -> Result<String> { 111 let api_sender = self.clone_api_sender().await; 112 let api_notifier = self.clone_api_notifier()?; 113 114 let result = blocking::unblock(move || VmmPing.send(api_notifier, api_sender, ())) 115 .await 116 .map_err(api_error)?; 117 serde_json::to_string(&result).map_err(api_error) 118 } 119 120 async fn vmm_shutdown(&self) -> Result<()> { 121 let api_sender = self.clone_api_sender().await; 122 let api_notifier = self.clone_api_notifier()?; 123 124 blocking::unblock(move || VmmShutdown.send(api_notifier, api_sender, ())) 125 .await 126 .map_err(api_error) 127 } 128 129 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> { 130 let device_config = serde_json::from_str(&device_config).map_err(api_error)?; 131 self.vm_action(&VmAddDevice, device_config).await 132 } 133 134 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> { 135 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?; 136 self.vm_action(&AddDisk, disk_config).await 137 } 138 139 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> { 140 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?; 141 self.vm_action(&VmAddFs, fs_config).await 142 } 143 144 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> { 145 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?; 146 if net_config.fds.is_some() { 147 warn!("Ignoring FDs sent via the D-Bus request body"); 148 net_config.fds = None; 149 } 150 self.vm_action(&VmAddNet, net_config).await 151 } 152 153 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> { 154 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?; 155 self.vm_action(&VmAddPmem, pmem_config).await 156 } 157 158 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> { 159 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?; 160 self.vm_action(&VmAddUserDevice, vm_add_user_device).await 161 } 162 163 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> { 164 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?; 165 self.vm_action(&VmAddVdpa, vdpa_config).await 166 } 167 168 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> { 169 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?; 170 self.vm_action(&VmAddVsock, vsock_config).await 171 } 172 173 async fn vm_boot(&self) -> Result<()> { 174 self.vm_action(&VmBoot, ()).await.map(|_| ()) 175 } 176 177 #[allow(unused_variables)] 178 // zbus doesn't support cfg attributes on interface methods 179 // as a workaround, we make the *call to the internal API* conditionally 180 // compile and return an error on unsupported platforms. 181 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> { 182 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 183 { 184 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?; 185 self.vm_action(&VmCoredump, vm_coredump_data) 186 .await 187 .map(|_| ()) 188 } 189 190 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))] 191 Err(api_error( 192 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled", 193 )) 194 } 195 196 async fn vm_counters(&self) -> Result<Optional<String>> { 197 self.vm_action(&VmCounters, ()).await 198 } 199 200 async fn vm_create(&self, vm_config: String) -> Result<()> { 201 let api_sender = self.clone_api_sender().await; 202 let api_notifier = self.clone_api_notifier()?; 203 204 let mut vm_config: VmConfig = serde_json::from_str(&vm_config).map_err(api_error)?; 205 206 if let Some(ref mut nets) = vm_config.net { 207 if nets.iter().any(|net| net.fds.is_some()) { 208 warn!("Ignoring FDs sent via the D-Bus request body"); 209 } 210 for net in nets { 211 net.fds = None; 212 } 213 } 214 215 blocking::unblock(move || { 216 VmCreate.send(api_notifier, api_sender, Arc::new(Mutex::new(vm_config))) 217 }) 218 .await 219 .map_err(api_error)?; 220 221 Ok(()) 222 } 223 224 async fn vm_delete(&self) -> Result<()> { 225 self.vm_action(&VmDelete, ()).await.map(|_| ()) 226 } 227 228 async fn vm_info(&self) -> Result<String> { 229 let api_sender = self.clone_api_sender().await; 230 let api_notifier = self.clone_api_notifier()?; 231 232 let result = blocking::unblock(move || VmInfo.send(api_notifier, api_sender, ())) 233 .await 234 .map_err(api_error)?; 235 serde_json::to_string(&result).map_err(api_error) 236 } 237 238 async fn vm_pause(&self) -> Result<()> { 239 self.vm_action(&VmPause, ()).await.map(|_| ()) 240 } 241 242 async fn vm_power_button(&self) -> Result<()> { 243 self.vm_action(&VmPowerButton, ()).await.map(|_| ()) 244 } 245 246 async fn vm_reboot(&self) -> Result<()> { 247 self.vm_action(&VmReboot, ()).await.map(|_| ()) 248 } 249 250 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> { 251 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?; 252 self.vm_action(&VmRemoveDevice, vm_remove_device) 253 .await 254 .map(|_| ()) 255 } 256 257 async fn vm_resize(&self, vm_resize: String) -> Result<()> { 258 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?; 259 self.vm_action(&VmResize, vm_resize).await.map(|_| ()) 260 } 261 262 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> { 263 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?; 264 self.vm_action(&VmResizeZone, vm_resize_zone) 265 .await 266 .map(|_| ()) 267 } 268 269 async fn vm_restore(&self, restore_config: String) -> Result<()> { 270 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?; 271 self.vm_action(&VmRestore, restore_config).await.map(|_| ()) 272 } 273 274 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> { 275 let receive_migration_data = 276 serde_json::from_str(&receive_migration_data).map_err(api_error)?; 277 self.vm_action(&VmReceiveMigration, receive_migration_data) 278 .await 279 .map(|_| ()) 280 } 281 282 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> { 283 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?; 284 self.vm_action(&VmSendMigration, send_migration_data) 285 .await 286 .map(|_| ()) 287 } 288 289 async fn vm_resume(&self) -> Result<()> { 290 self.vm_action(&VmResume, ()).await.map(|_| ()) 291 } 292 293 async fn vm_shutdown(&self) -> Result<()> { 294 self.vm_action(&VmShutdown, ()).await.map(|_| ()) 295 } 296 297 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> { 298 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?; 299 self.vm_action(&VmSnapshot, vm_snapshot_config) 300 .await 301 .map(|_| ()) 302 } 303 304 // implementation of this function is provided by the `#[zbus(signal)]` macro call 305 #[zbus(signal)] 306 async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>; 307 } 308 309 pub fn start_dbus_thread( 310 dbus_options: DBusApiOptions, 311 api_notifier: EventFd, 312 api_sender: Sender<ApiRequest>, 313 seccomp_action: &SeccompAction, 314 exit_evt: EventFd, 315 hypervisor_type: HypervisorType, 316 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { 317 let dbus_iface = DBusApi::new(api_notifier, api_sender); 318 let (connection, iface_ref) = executor::block_on(async move { 319 let conn_builder = if dbus_options.system_bus { 320 ConnectionBuilder::system()? 321 } else { 322 ConnectionBuilder::session()? 323 }; 324 325 let conn = conn_builder 326 .internal_executor(false) 327 .name(dbus_options.service_name)? 328 .serve_at(dbus_options.object_path.as_str(), dbus_iface)? 329 .build() 330 .await?; 331 332 let iface_ref = conn 333 .object_server() 334 .interface::<_, DBusApi>(dbus_options.object_path) 335 .await?; 336 337 Ok((conn, iface_ref)) 338 }) 339 .map_err(VmmError::CreateDBusSession)?; 340 341 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); 342 let (send_done, recv_done) = oneshot::channel::<()>(); 343 344 // Retrieve seccomp filter for API thread 345 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type) 346 .map_err(VmmError::CreateSeccompFilter)?; 347 348 let thread_join_handle = thread::Builder::new() 349 .name("dbus-thread".to_string()) 350 .spawn(move || { 351 // Apply seccomp filter for API thread. 352 if !api_seccomp_filter.is_empty() { 353 apply_filter(&api_seccomp_filter) 354 .map_err(VmmError::ApplySeccompFilter) 355 .map_err(|e| { 356 error!("Error applying seccomp filter: {:?}", e); 357 exit_evt.write(1).ok(); 358 e 359 })?; 360 } 361 362 std::panic::catch_unwind(AssertUnwindSafe(move || { 363 executor::block_on(async move { 364 let recv_shutdown = recv_shutdown.fuse(); 365 let executor_tick = futures::future::Fuse::terminated(); 366 futures::pin_mut!(recv_shutdown, executor_tick); 367 executor_tick.set(connection.executor().tick().fuse()); 368 369 loop { 370 futures::select! { 371 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), 372 _ = recv_shutdown => { 373 send_done.send(()).ok(); 374 break; 375 }, 376 ret = dbus_options.event_monitor_rx.recv_async() => { 377 if let Ok(event) = ret { 378 DBusApi::event(iface_ref.signal_context(), event).await.ok(); 379 } 380 } 381 } 382 } 383 }) 384 })) 385 .map_err(|_| { 386 error!("dbus-api thread panicked"); 387 exit_evt.write(1).ok() 388 }) 389 .ok(); 390 391 Ok(()) 392 }) 393 .map_err(VmmError::DBusThreadSpawn)?; 394 395 Ok((thread_join_handle, (send_shutdown, recv_done))) 396 } 397