1 // Copyright © 2023 Sartura Ltd. 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 use super::{ApiRequest, VmAction}; 6 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 7 use crate::{Error as VmmError, Result as VmmResult}; 8 use crate::{NetConfig, VmConfig}; 9 use futures::channel::oneshot; 10 use futures::{executor, FutureExt}; 11 use hypervisor::HypervisorType; 12 use seccompiler::{apply_filter, SeccompAction}; 13 use std::panic::AssertUnwindSafe; 14 use std::sync::mpsc::Sender; 15 use std::sync::{Arc, Mutex}; 16 use std::thread; 17 use vmm_sys_util::eventfd::EventFd; 18 use zbus::fdo::{self, Result}; 19 use zbus::zvariant::Optional; 20 use zbus::{dbus_interface, ConnectionBuilder}; 21 22 pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); 23 24 pub struct DBusApiOptions { 25 pub service_name: String, 26 pub object_path: String, 27 pub system_bus: bool, 28 } 29 30 pub struct DBusApi { 31 api_notifier: EventFd, 32 api_sender: futures::lock::Mutex<Sender<ApiRequest>>, 33 } 34 35 fn api_error(error: impl std::fmt::Debug) -> fdo::Error { 36 fdo::Error::Failed(format!("{error:?}")) 37 } 38 39 // This method is intended to ensure that the DBusApi thread has enough time to 40 // send a response to the VmmShutdown method call before it is terminated. If 41 // this step is omitted, the thread may be terminated before it can send a 42 // response, resulting in an error message stating that the message recipient 43 // disconnected from the message bus without providing a reply. 44 pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { 45 let (send_shutdown, mut recv_done) = ch; 46 47 // send the shutdown signal and return 48 // if it errors out 49 if send_shutdown.send(()).is_err() { 50 return; 51 } 52 53 // loop until `recv_err` errors out 54 // or as long as the return value indicates 55 // "immediately stale" (None) 56 while let Ok(None) = recv_done.try_recv() {} 57 } 58 59 impl DBusApi { 60 pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self { 61 Self { 62 api_notifier, 63 api_sender: futures::lock::Mutex::new(api_sender), 64 } 65 } 66 67 async fn clone_api_sender(&self) -> Sender<ApiRequest> { 68 // lock the async mutex, clone the `Sender` and then immediately 69 // drop the MutexGuard so that other tasks can clone the 70 // `Sender` as well 71 self.api_sender.lock().await.clone() 72 } 73 74 fn clone_api_notifier(&self) -> Result<EventFd> { 75 self.api_notifier 76 .try_clone() 77 .map_err(|err| fdo::Error::IOError(format!("{err:?}"))) 78 } 79 80 async fn vm_action(&self, action: VmAction) -> Result<Optional<String>> { 81 let api_sender = self.clone_api_sender().await; 82 let api_notifier = self.clone_api_notifier()?; 83 84 let result = blocking::unblock(move || super::vm_action(api_notifier, api_sender, action)) 85 .await 86 .map_err(api_error)? 87 // We're using `from_utf8_lossy` here to not deal with the 88 // error case of `from_utf8` as we know that `b.body` is valid JSON. 89 .map(|b| String::from_utf8_lossy(&b.body).to_string()); 90 91 Ok(result.into()) 92 } 93 } 94 95 #[dbus_interface(name = "org.cloudhypervisor.DBusApi1")] 96 impl DBusApi { 97 async fn vmm_ping(&self) -> Result<String> { 98 let api_sender = self.clone_api_sender().await; 99 let api_notifier = self.clone_api_notifier()?; 100 101 let result = blocking::unblock(move || super::vmm_ping(api_notifier, api_sender)) 102 .await 103 .map_err(api_error)?; 104 serde_json::to_string(&result).map_err(api_error) 105 } 106 107 async fn vmm_shutdown(&self) -> Result<()> { 108 let api_sender = self.clone_api_sender().await; 109 let api_notifier = self.clone_api_notifier()?; 110 111 blocking::unblock(move || super::vmm_shutdown(api_notifier, api_sender)) 112 .await 113 .map_err(api_error) 114 } 115 116 async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> { 117 let device_config = serde_json::from_str(&device_config).map_err(api_error)?; 118 self.vm_action(VmAction::AddDevice(Arc::new(device_config))) 119 .await 120 } 121 122 async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> { 123 let disk_config = serde_json::from_str(&disk_config).map_err(api_error)?; 124 self.vm_action(VmAction::AddDisk(Arc::new(disk_config))) 125 .await 126 } 127 128 async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> { 129 let fs_config = serde_json::from_str(&fs_config).map_err(api_error)?; 130 self.vm_action(VmAction::AddFs(Arc::new(fs_config))).await 131 } 132 133 async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> { 134 let mut net_config: NetConfig = serde_json::from_str(&net_config).map_err(api_error)?; 135 if net_config.fds.is_some() { 136 warn!("Ignoring FDs sent via the D-Bus request body"); 137 net_config.fds = None; 138 } 139 self.vm_action(VmAction::AddNet(Arc::new(net_config))).await 140 } 141 142 async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> { 143 let pmem_config = serde_json::from_str(&pmem_config).map_err(api_error)?; 144 self.vm_action(VmAction::AddPmem(Arc::new(pmem_config))) 145 .await 146 } 147 148 async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> { 149 let vm_add_user_device = serde_json::from_str(&vm_add_user_device).map_err(api_error)?; 150 self.vm_action(VmAction::AddUserDevice(Arc::new(vm_add_user_device))) 151 .await 152 } 153 154 async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> { 155 let vdpa_config = serde_json::from_str(&vdpa_config).map_err(api_error)?; 156 self.vm_action(VmAction::AddVdpa(Arc::new(vdpa_config))) 157 .await 158 } 159 160 async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> { 161 let vsock_config = serde_json::from_str(&vsock_config).map_err(api_error)?; 162 self.vm_action(VmAction::AddVsock(Arc::new(vsock_config))) 163 .await 164 } 165 166 async fn vm_boot(&self) -> Result<()> { 167 self.vm_action(VmAction::Boot).await.map(|_| ()) 168 } 169 170 #[allow(unused_variables)] 171 // zbus doesn't support cfg attributes on interface methods 172 // as a workaround, we make the *call to the internal API* conditionally 173 // compile and return an error on unsupported platforms. 174 async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> { 175 #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] 176 { 177 let vm_coredump_data = serde_json::from_str(&vm_coredump_data).map_err(api_error)?; 178 self.vm_action(VmAction::Coredump(Arc::new(vm_coredump_data))) 179 .await 180 .map(|_| ()) 181 } 182 183 #[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))] 184 Err(api_error( 185 "VmCoredump only works on x86_64 with the `guest_debug` feature enabled", 186 )) 187 } 188 189 async fn vm_counters(&self) -> Result<Optional<String>> { 190 self.vm_action(VmAction::Counters).await 191 } 192 193 async fn vm_create(&self, vm_config: String) -> Result<()> { 194 let api_sender = self.clone_api_sender().await; 195 let api_notifier = self.clone_api_notifier()?; 196 197 let mut vm_config: VmConfig = serde_json::from_str(&vm_config).map_err(api_error)?; 198 199 if let Some(ref mut nets) = vm_config.net { 200 if nets.iter().any(|net| net.fds.is_some()) { 201 warn!("Ignoring FDs sent via the D-Bus request body"); 202 } 203 for net in nets { 204 net.fds = None; 205 } 206 } 207 208 blocking::unblock(move || { 209 super::vm_create(api_notifier, api_sender, Arc::new(Mutex::new(vm_config))) 210 }) 211 .await 212 .map_err(api_error)?; 213 214 Ok(()) 215 } 216 217 async fn vm_delete(&self) -> Result<()> { 218 self.vm_action(VmAction::Delete).await.map(|_| ()) 219 } 220 221 async fn vm_info(&self) -> Result<String> { 222 let api_sender = self.clone_api_sender().await; 223 let api_notifier = self.clone_api_notifier()?; 224 225 let result = blocking::unblock(move || super::vm_info(api_notifier, api_sender)) 226 .await 227 .map_err(api_error)?; 228 serde_json::to_string(&result).map_err(api_error) 229 } 230 231 async fn vm_pause(&self) -> Result<()> { 232 self.vm_action(VmAction::Pause).await.map(|_| ()) 233 } 234 235 async fn vm_power_button(&self) -> Result<()> { 236 self.vm_action(VmAction::PowerButton).await.map(|_| ()) 237 } 238 239 async fn vm_reboot(&self) -> Result<()> { 240 self.vm_action(VmAction::Reboot).await.map(|_| ()) 241 } 242 243 async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> { 244 let vm_remove_device = serde_json::from_str(&vm_remove_device).map_err(api_error)?; 245 self.vm_action(VmAction::RemoveDevice(Arc::new(vm_remove_device))) 246 .await 247 .map(|_| ()) 248 } 249 250 async fn vm_resize(&self, vm_resize: String) -> Result<()> { 251 let vm_resize = serde_json::from_str(&vm_resize).map_err(api_error)?; 252 self.vm_action(VmAction::Resize(Arc::new(vm_resize))) 253 .await 254 .map(|_| ()) 255 } 256 257 async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> { 258 let vm_resize_zone = serde_json::from_str(&vm_resize_zone).map_err(api_error)?; 259 self.vm_action(VmAction::ResizeZone(Arc::new(vm_resize_zone))) 260 .await 261 .map(|_| ()) 262 } 263 264 async fn vm_restore(&self, restore_config: String) -> Result<()> { 265 let restore_config = serde_json::from_str(&restore_config).map_err(api_error)?; 266 self.vm_action(VmAction::Restore(Arc::new(restore_config))) 267 .await 268 .map(|_| ()) 269 } 270 271 async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> { 272 let receive_migration_data = 273 serde_json::from_str(&receive_migration_data).map_err(api_error)?; 274 self.vm_action(VmAction::ReceiveMigration(Arc::new(receive_migration_data))) 275 .await 276 .map(|_| ()) 277 } 278 279 async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> { 280 let send_migration_data = serde_json::from_str(&send_migration_data).map_err(api_error)?; 281 self.vm_action(VmAction::SendMigration(Arc::new(send_migration_data))) 282 .await 283 .map(|_| ()) 284 } 285 286 async fn vm_resume(&self) -> Result<()> { 287 self.vm_action(VmAction::Resume).await.map(|_| ()) 288 } 289 290 async fn vm_shutdown(&self) -> Result<()> { 291 self.vm_action(VmAction::Shutdown).await.map(|_| ()) 292 } 293 294 async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> { 295 let vm_snapshot_config = serde_json::from_str(&vm_snapshot_config).map_err(api_error)?; 296 self.vm_action(VmAction::Snapshot(Arc::new(vm_snapshot_config))) 297 .await 298 .map(|_| ()) 299 } 300 } 301 302 pub fn start_dbus_thread( 303 dbus_options: DBusApiOptions, 304 api_notifier: EventFd, 305 api_sender: Sender<ApiRequest>, 306 seccomp_action: &SeccompAction, 307 exit_evt: EventFd, 308 hypervisor_type: HypervisorType, 309 ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { 310 let dbus_iface = DBusApi::new(api_notifier, api_sender); 311 let connection = executor::block_on(async move { 312 let conn_builder = if dbus_options.system_bus { 313 ConnectionBuilder::system()? 314 } else { 315 ConnectionBuilder::session()? 316 }; 317 318 conn_builder 319 .internal_executor(false) 320 .name(dbus_options.service_name)? 321 .serve_at(dbus_options.object_path, dbus_iface)? 322 .build() 323 .await 324 }) 325 .map_err(VmmError::CreateDBusSession)?; 326 327 let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); 328 let (send_done, recv_done) = oneshot::channel::<()>(); 329 330 // Retrieve seccomp filter for API thread 331 let api_seccomp_filter = get_seccomp_filter(seccomp_action, Thread::DBusApi, hypervisor_type) 332 .map_err(VmmError::CreateSeccompFilter)?; 333 334 let thread_join_handle = thread::Builder::new() 335 .name("dbus-thread".to_string()) 336 .spawn(move || { 337 // Apply seccomp filter for API thread. 338 if !api_seccomp_filter.is_empty() { 339 apply_filter(&api_seccomp_filter) 340 .map_err(VmmError::ApplySeccompFilter) 341 .map_err(|e| { 342 error!("Error applying seccomp filter: {:?}", e); 343 exit_evt.write(1).ok(); 344 e 345 })?; 346 } 347 348 std::panic::catch_unwind(AssertUnwindSafe(move || { 349 executor::block_on(async move { 350 let recv_shutdown = recv_shutdown.fuse(); 351 let executor_tick = futures::future::Fuse::terminated(); 352 futures::pin_mut!(recv_shutdown, executor_tick); 353 executor_tick.set(connection.executor().tick().fuse()); 354 355 loop { 356 futures::select! { 357 _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), 358 _ = recv_shutdown => { 359 send_done.send(()).ok(); 360 break; 361 }, 362 } 363 } 364 }) 365 })) 366 .map_err(|_| { 367 error!("dbus-api thread panicked"); 368 exit_evt.write(1).ok() 369 }) 370 .ok(); 371 372 Ok(()) 373 }) 374 .map_err(VmmError::DBusThreadSpawn)?; 375 376 Ok((thread_join_handle, (send_shutdown, recv_done))) 377 } 378