1 // Copyright © 2023 Sartura Ltd. 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 use super::{ApiRequest, VmAction}; 6 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 7 use crate::{Error as VmmError, Result as VmmResult}; 8 use crate::{NetConfig, VmConfig}; 9 use futures::channel::oneshot; 10 use futures::{executor, FutureExt}; 11 use hypervisor::HypervisorType; 12 use seccompiler::{apply_filter, SeccompAction}; 13 use std::panic::AssertUnwindSafe; 14 use std::sync::mpsc::Sender; 15 use std::sync::{Arc, Mutex}; 16 use std::thread; 17 use vmm_sys_util::eventfd::EventFd; 18 use zbus::fdo::{self, Result}; 19 use zbus::zvariant::Optional; 20 use zbus::{dbus_interface, ConnectionBuilder}; 21 22 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); 23 24 pub struct DBusApiOptions { 25 pub service_name: String, 26 pub object_path: String, 27 pub system_bus: bool, 28 pub event_monitor_rx: flume::Receiver<Arc<String>>, 29 } 30 31 pub struct DBusApi { 32 api_notifier: EventFd, 33 api_sender: futures::lock::Mutex<Sender<ApiRequest>>, 34 } 35 36 fn api_error(error: impl std::fmt::Debug) -> fdo::Error { 37 fdo::Error::Failed(format!("{error:?}")) 38 } 39 40 // This method is intended to ensure that the DBusApi thread has enough time to 41 // send a response to the VmmShutdown method call before it is terminated. If 42 // this step is omitted, the thread may be terminated before it can send a 43 // response, resulting in an error message stating that the message recipient 44 // disconnected from the message bus without providing a reply. 45 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { 46 let (send_shutdown, mut recv_done) = ch; 47 48 // send the shutdown signal and return 49 // if it errors out 50 if send_shutdown.send(()).is_err() { 51 return; 52 } 53 54 // loop until `recv_err` errors out 55 // or as long as the return value indicates 56 // "immediately stale" (None) 57 while let Ok(None) = recv_done.try_recv() {} 58 } 59 60 impl DBusApi { 61 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self { 62 Self { 63 api_notifier, 64 api_sender: futures::lock::Mutex::new(api_sender), 65 } 66 } 67 68 async fn clone_api_sender(&self) -> Sender<ApiRequest> { 69 // lock the async mutex, clone the `Sender` and then immediately 70 // drop the MutexGuard so that other tasks can clone the 71 // `Sender` as well 72 self.api_sender.lock().await.clone() 73 } 74 75 fn clone_api_notifier(&self) -> Result<EventFd> { 76 self.api_notifier 77 .try_clone() 78 .map_err(|err| fdo::Error::IOError(format!("{err:?}"))) 79 } 80 81 async fn vm_action(&self, action: VmAction) -> Result<Optional<String>> { 82 let api_sender = self.clone_api_sender().await; 83 let api_notifier = self.clone_api_notifier()?; 84 85 let result = blocking::unblock(move || super::vm_action(api_notifier, api_sender, action)) 86 .await 87 .map_err(api_error)? 88 // We're using `from_utf8_lossy` here to not deal with the 89 // error case of `from_utf8` as we know that `b.body` is valid JSON. 90 .map(|b| String::from_utf8_lossy(&b.body).to_string()); 91 92 Ok(result.into()) 93 } 94 } 95 96 #[dbus_interface(name = "org.cloudhypervisor.DBusApi1")] 97 impl DBusApi { 98 async fn vmm_ping(&self) -> Result<String> { 99 let api_sender = self.clone_api_sender().await; 100 let api_notifier = self.clone_api_notifier()?; 101 102 let result = blocking::unblock(move || super::vmm_ping(api_notifier, api_sender)) 103 .await 104 .map_err(api_error)?; 105 serde_json::to_string(&result).map_err(api_error) 106 } 107 108 async fn vmm_shutdown(&self) -> Result<()> { 109 let api_sender = self.clone_api_sender().await; 110 let api_notifier = self.clone_api_notifier()?; 111 112 blocking::unblock(move || super::vmm_shutdown(api_notifier, api_sender)) 113 .await 114 .map_err(api_error) 115 } 116 117 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> { 118 let device_config = serde_json::from_str(&device_config).map_err(api_error)?; 119 self.vm_action(VmAction::AddDevice(Arc::new(device_config))) 120 .await 121 } 122 123 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> { 124 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?; 125 self.vm_action(VmAction::AddDisk(Arc::new(disk_config))) 126 .await 127 } 128 129 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> { 130 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?; 131 self.vm_action(VmAction::AddFs(Arc::new(fs_config))).await 132 } 133 134 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> { 135 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?; 136 if net_config.fds.is_some() { 137 warn!("Ignoring FDs sent via the D-Bus request body"); 138 net_config.fds = None; 139 } 140 self.vm_action(VmAction::AddNet(Arc::new(net_config))).await 141 } 142 143 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> { 144 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?; 145 self.vm_action(VmAction::AddPmem(Arc::new(pmem_config))) 146 .await 147 } 148 149 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> { 150 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?; 151 self.vm_action(VmAction::AddUserDevice(Arc::new(vm_add_user_device))) 152 .await 153 } 154 155 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> { 156 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?; 157 self.vm_action(VmAction::AddVdpa(Arc::new(vdpa_config))) 158 .await 159 } 160 161 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> { 162 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?; 163 self.vm_action(VmAction::AddVsock(Arc::new(vsock_config))) 164 .await 165 } 166 167 async fn vm_boot(&self) -> Result<()> { 168 self.vm_action(VmAction::Boot).await.map(|_| ()) 169 } 170 171 #[allow(unused_variables)] 172 // zbus doesn't support cfg attributes on interface methods 173 // as a workaround, we make the *call to the internal API* conditionally 174 // compile and return an error on unsupported platforms. 175 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> { 176 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 177 { 178 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?; 179 self.vm_action(VmAction::Coredump(Arc::new(vm_coredump_data))) 180 .await 181 .map(|_| ()) 182 } 183 184 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))] 185 Err(api_error( 186 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled", 187 )) 188 } 189 190 async fn vm_counters(&self) -> Result<Optional<String>> { 191 self.vm_action(VmAction::Counters).await 192 } 193 194 async fn vm_create(&self, vm_config: String) -> Result<()> { 195 let api_sender = self.clone_api_sender().await; 196 let api_notifier = self.clone_api_notifier()?; 197 198 let mut vm_config: VmConfig = serde_json::from_str(&vm_config).map_err(api_error)?; 199 200 if let Some(ref mut nets) = vm_config.net { 201 if nets.iter().any(|net| net.fds.is_some()) { 202 warn!("Ignoring FDs sent via the D-Bus request body"); 203 } 204 for net in nets { 205 net.fds = None; 206 } 207 } 208 209 blocking::unblock(move || { 210 super::vm_create(api_notifier, api_sender, Arc::new(Mutex::new(vm_config))) 211 }) 212 .await 213 .map_err(api_error)?; 214 215 Ok(()) 216 } 217 218 async fn vm_delete(&self) -> Result<()> { 219 self.vm_action(VmAction::Delete).await.map(|_| ()) 220 } 221 222 async fn vm_info(&self) -> Result<String> { 223 let api_sender = self.clone_api_sender().await; 224 let api_notifier = self.clone_api_notifier()?; 225 226 let result = blocking::unblock(move || super::vm_info(api_notifier, api_sender)) 227 .await 228 .map_err(api_error)?; 229 serde_json::to_string(&result).map_err(api_error) 230 } 231 232 async fn vm_pause(&self) -> Result<()> { 233 self.vm_action(VmAction::Pause).await.map(|_| ()) 234 } 235 236 async fn vm_power_button(&self) -> Result<()> { 237 self.vm_action(VmAction::PowerButton).await.map(|_| ()) 238 } 239 240 async fn vm_reboot(&self) -> Result<()> { 241 self.vm_action(VmAction::Reboot).await.map(|_| ()) 242 } 243 244 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> { 245 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?; 246 self.vm_action(VmAction::RemoveDevice(Arc::new(vm_remove_device))) 247 .await 248 .map(|_| ()) 249 } 250 251 async fn vm_resize(&self, vm_resize: String) -> Result<()> { 252 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?; 253 self.vm_action(VmAction::Resize(Arc::new(vm_resize))) 254 .await 255 .map(|_| ()) 256 } 257 258 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> { 259 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?; 260 self.vm_action(VmAction::ResizeZone(Arc::new(vm_resize_zone))) 261 .await 262 .map(|_| ()) 263 } 264 265 async fn vm_restore(&self, restore_config: String) -> Result<()> { 266 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?; 267 self.vm_action(VmAction::Restore(Arc::new(restore_config))) 268 .await 269 .map(|_| ()) 270 } 271 272 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> { 273 let receive_migration_data = 274 serde_json::from_str(&receive_migration_data).map_err(api_error)?; 275 self.vm_action(VmAction::ReceiveMigration(Arc::new(receive_migration_data))) 276 .await 277 .map(|_| ()) 278 } 279 280 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> { 281 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?; 282 self.vm_action(VmAction::SendMigration(Arc::new(send_migration_data))) 283 .await 284 .map(|_| ()) 285 } 286 287 async fn vm_resume(&self) -> Result<()> { 288 self.vm_action(VmAction::Resume).await.map(|_| ()) 289 } 290 291 async fn vm_shutdown(&self) -> Result<()> { 292 self.vm_action(VmAction::Shutdown).await.map(|_| ()) 293 } 294 295 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> { 296 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?; 297 self.vm_action(VmAction::Snapshot(Arc::new(vm_snapshot_config))) 298 .await 299 .map(|_| ()) 300 } 301 302 // implementation of this function is provided by the `dbus_interface` macro 303 #[dbus_interface(signal)] 304 async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>; 305 } 306 307 pub fn start_dbus_thread( 308 dbus_options: DBusApiOptions, 309 api_notifier: EventFd, 310 api_sender: Sender<ApiRequest>, 311 seccomp_action: &SeccompAction, 312 exit_evt: EventFd, 313 hypervisor_type: HypervisorType, 314 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { 315 let dbus_iface = DBusApi::new(api_notifier, api_sender); 316 let (connection, iface_ref) = executor::block_on(async move { 317 let conn_builder = if dbus_options.system_bus { 318 ConnectionBuilder::system()? 319 } else { 320 ConnectionBuilder::session()? 321 }; 322 323 let conn = conn_builder 324 .internal_executor(false) 325 .name(dbus_options.service_name)? 326 .serve_at(dbus_options.object_path.as_str(), dbus_iface)? 327 .build() 328 .await?; 329 330 let iface_ref = conn 331 .object_server() 332 .interface::<_, DBusApi>(dbus_options.object_path) 333 .await?; 334 335 Ok((conn, iface_ref)) 336 }) 337 .map_err(VmmError::CreateDBusSession)?; 338 339 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); 340 let (send_done, recv_done) = oneshot::channel::<()>(); 341 342 // Retrieve seccomp filter for API thread 343 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type) 344 .map_err(VmmError::CreateSeccompFilter)?; 345 346 let thread_join_handle = thread::Builder::new() 347 .name("dbus-thread".to_string()) 348 .spawn(move || { 349 // Apply seccomp filter for API thread. 350 if !api_seccomp_filter.is_empty() { 351 apply_filter(&api_seccomp_filter) 352 .map_err(VmmError::ApplySeccompFilter) 353 .map_err(|e| { 354 error!("Error applying seccomp filter: {:?}", e); 355 exit_evt.write(1).ok(); 356 e 357 })?; 358 } 359 360 std::panic::catch_unwind(AssertUnwindSafe(move || { 361 executor::block_on(async move { 362 let recv_shutdown = recv_shutdown.fuse(); 363 let executor_tick = futures::future::Fuse::terminated(); 364 futures::pin_mut!(recv_shutdown, executor_tick); 365 executor_tick.set(connection.executor().tick().fuse()); 366 367 loop { 368 futures::select! { 369 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), 370 _ = recv_shutdown => { 371 send_done.send(()).ok(); 372 break; 373 }, 374 ret = dbus_options.event_monitor_rx.recv_async() => { 375 if let Ok(event) = ret { 376 DBusApi::event(iface_ref.signal_context(), event).await.ok(); 377 } 378 } 379 } 380 } 381 }) 382 })) 383 .map_err(|_| { 384 error!("dbus-api thread panicked"); 385 exit_evt.write(1).ok() 386 }) 387 .ok(); 388 389 Ok(()) 390 }) 391 .map_err(VmmError::DBusThreadSpawn)?; 392 393 Ok((thread_join_handle, (send_shutdown, recv_done))) 394 } 395