1 // Copyright © 2019 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #[macro_use] 7 extern crate event_monitor; 8 #[macro_use] 9 extern crate lazy_static; 10 #[macro_use] 11 extern crate log; 12 #[macro_use] 13 extern crate serde_derive; 14 #[cfg(test)] 15 #[macro_use] 16 extern crate credibility; 17 18 use crate::api::{ 19 ApiError, ApiRequest, ApiResponse, ApiResponsePayload, VmInfo, VmReceiveMigrationData, 20 VmSendMigrationData, VmmPingResponse, 21 }; 22 use crate::config::{ 23 DeviceConfig, DiskConfig, FsConfig, NetConfig, PmemConfig, RestoreConfig, VmConfig, VsockConfig, 24 }; 25 use crate::migration::{get_vm_snapshot, recv_vm_snapshot}; 26 use crate::seccomp_filters::{get_seccomp_filter, Thread}; 27 use crate::vm::{Error as VmError, Vm, VmState}; 28 use anyhow::anyhow; 29 use libc::EFD_NONBLOCK; 30 use seccomp::{SeccompAction, SeccompFilter}; 31 use serde::ser::{Serialize, SerializeStruct, Serializer}; 32 use std::fs::File; 33 use std::io; 34 use std::io::{Read, Write}; 35 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; 36 use std::os::unix::net::UnixListener; 37 use std::os::unix::net::UnixStream; 38 use std::path::PathBuf; 39 use std::sync::mpsc::{Receiver, RecvError, SendError, Sender}; 40 use std::sync::{Arc, Mutex}; 41 use std::{result, thread}; 42 use thiserror::Error; 43 use vm_memory::bitmap::AtomicBitmap; 44 use vm_migration::protocol::*; 45 use vm_migration::{MigratableError, Pausable, Snapshot, Snapshottable, Transportable}; 46 use vmm_sys_util::eventfd::EventFd; 47 48 pub mod api; 49 pub mod config; 50 pub mod cpu; 51 pub mod device_manager; 52 pub mod device_tree; 53 pub mod interrupt; 54 pub mod memory_manager; 55 pub mod migration; 56 pub mod seccomp_filters; 57 pub mod vm; 58 59 #[cfg(feature = "acpi")] 60 mod acpi; 61 62 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>; 63 type GuestRegionMmap = vm_memory::GuestRegionMmap<AtomicBitmap>; 64 65 /// Errors associated with VMM management 66 
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
pub enum Error {
    /// API request receive error
    #[error("Error receiving API request: {0}")]
    ApiRequestRecv(#[source] RecvError),

    /// API response send error
    // NOTE: the message previously read "sending API request"; this variant
    // reports a failure to send the *response* back over the API channel.
    #[error("Error sending API response: {0}")]
    ApiResponseSend(#[source] SendError<ApiResponse>),

    /// Cannot bind to the UNIX domain socket path
    #[error("Error binding to UNIX domain socket: {0}")]
    Bind(#[source] io::Error),

    /// Cannot clone EventFd.
    #[error("Error cloning EventFd: {0}")]
    EventFdClone(#[source] io::Error),

    /// Cannot create EventFd.
    #[error("Error creating EventFd: {0}")]
    EventFdCreate(#[source] io::Error),

    /// Cannot read from EventFd.
    #[error("Error reading from EventFd: {0}")]
    EventFdRead(#[source] io::Error),

    /// Cannot create epoll context.
    #[error("Error creating epoll context: {0}")]
    Epoll(#[source] io::Error),

    /// Cannot create HTTP thread
    #[error("Error spawning HTTP thread: {0}")]
    HttpThreadSpawn(#[source] io::Error),

    /// Cannot handle the VM STDIN stream
    #[error("Error handling VM stdin: {0:?}")]
    Stdin(VmError),

    /// Cannot handle the VM pty stream
    #[error("Error handling VM pty: {0:?}")]
    Pty(VmError),

    /// Cannot reboot the VM
    #[error("Error rebooting VM: {0:?}")]
    VmReboot(VmError),

    /// Cannot create VMM thread
    #[error("Error spawning VMM thread: {0:?}")]
    VmmThreadSpawn(#[source] io::Error),

    /// Cannot shut the VMM down
    #[error("Error shutting down VMM: {0:?}")]
    VmmShutdown(VmError),

    /// Cannot create seccomp filter
    #[error("Error creating seccomp filter: {0}")]
    CreateSeccompFilter(seccomp::SeccompError),

    /// Cannot apply seccomp filter
    #[error("Error applying seccomp filter: {0}")]
    ApplySeccompFilter(seccomp::Error),

    /// Error activating virtio devices
    #[error("Error activating virtio devices: {0:?}")]
    ActivateVirtioDevices(VmError),

    /// Error creating API server
    #[error("Error creating API server: {0:?}")]
    CreateApiServer(micro_http::ServerError),

    /// Error binding API server socket
    // NOTE: fixed typo "Error creation API server's socket".
    #[error("Error creating API server's socket: {0:?}")]
    CreateApiServerSocket(#[source] io::Error),
}
pub type Result<T> = result::Result<T, Error>;

/// Event sources the VMM control loop can be woken up for.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EpollDispatch {
    Exit,
    Reset,
    Stdin,
    Api,
    ActivateVirtioDevices,
    Pty,
}

/// Wrapper around an epoll fd, plus a table translating the index stored in
/// each registered epoll event back into the `EpollDispatch` it stands for.
pub struct EpollContext {
    epoll_file: File,
    dispatch_table: Vec<Option<EpollDispatch>>,
}

impl EpollContext {
    pub fn new() -> result::Result<EpollContext, io::Error> {
        let epoll_fd = epoll::create(true)?;
        // Use 'File' to enforce closing on 'epoll_fd'
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Initial capacity needs to be large enough to hold:
        // * 1 exit event
        // * 1 reset event
        // * 1 stdin event
        // * 1 API event
        // plus the unused slot 0 pushed below, which keeps every entry's
        // position equal to the index registered with epoll.
        let mut dispatch_table = Vec::with_capacity(5);
        dispatch_table.push(None);

        Ok(EpollContext {
            epoll_file,
            dispatch_table,
        })
    }

    /// Register stdin with the epoll fd; readable stdin dispatches to
    /// `EpollDispatch::Stdin`.
    pub fn add_stdin(&mut self) -> result::Result<(), io::Error> {
        let dispatch_index = self.dispatch_table.len() as u64;
        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            libc::STDIN_FILENO,
            epoll::Event::new(epoll::Events::EPOLLIN, dispatch_index),
        )?;

        self.dispatch_table.push(Some(EpollDispatch::Stdin));

        Ok(())
    }

    /// Register an arbitrary fd; when it becomes readable the control loop
    /// receives `token`.
    fn add_event<T>(&mut self, fd: &T, token: EpollDispatch) -> result::Result<(), io::Error>
    where
        T: AsRawFd,
    {
        let dispatch_index = self.dispatch_table.len() as u64;
        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd.as_raw_fd(),
            epoll::Event::new(epoll::Events::EPOLLIN, dispatch_index),
        )?;
        self.dispatch_table.push(Some(token));

        Ok(())
    }
}

impl AsRawFd for EpollContext {
    fn as_raw_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }
}

/// Identifier and PCI location of a device reported back by hotplug APIs.
pub struct PciDeviceInfo {
    pub id: String,
    // Packed PCI address: bits 31-16 segment, 15-8 bus, 7-3 device,
    // 2-0 function (see the field extraction in the Serialize impl below).
    pub bdf: u32,
}

impl Serialize for PciDeviceInfo {
    // Custom Serialize so `bdf` is emitted as the conventional
    // "ssss:bb:dd.f" hex string instead of a raw integer.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Transform the PCI b/d/f into a standardized string.
        let segment = (self.bdf >> 16) & 0xffff;
        let bus = (self.bdf >> 8) & 0xff;
        let device = (self.bdf >> 3) & 0x1f;
        let function = self.bdf & 0x7;
        let bdf_str = format!(
            "{:04x}:{:02x}:{:02x}.{:01x}",
            segment, bus, device, function
        );

        // Serialize the structure.
        let mut state = serializer.serialize_struct("PciDeviceInfo", 2)?;
        state.serialize_field("id", &self.id)?;
        state.serialize_field("bdf", &bdf_str)?;
        state.end()
    }
}

/// Spawn the VMM control-loop thread and, when an HTTP path or fd was given,
/// the HTTP API server thread that feeds it requests.
///
/// Returns the join handle of the VMM thread. The HTTP server signals the
/// control loop through a clone of `api_event` and sends the parsed requests
/// over `api_sender`; the control loop reads them from `api_receiver`.
#[allow(clippy::too_many_arguments)]
pub fn start_vmm_thread(
    vmm_version: String,
    http_path: &Option<String>,
    http_fd: Option<RawFd>,
    api_event: EventFd,
    api_sender: Sender<ApiRequest>,
    api_receiver: Receiver<ApiRequest>,
    seccomp_action: &SeccompAction,
    hypervisor: Arc<dyn hypervisor::Hypervisor>,
) -> Result<thread::JoinHandle<Result<()>>> {
    // Clone handed to the HTTP thread so it can kick the control loop.
    let http_api_event = api_event.try_clone().map_err(Error::EventFdClone)?;

    // Retrieve seccomp filter (built before spawning; applied inside the
    // new thread so it only constrains the VMM thread itself).
    let vmm_seccomp_filter =
        get_seccomp_filter(seccomp_action, Thread::Vmm).map_err(Error::CreateSeccompFilter)?;

    let vmm_seccomp_action = seccomp_action.clone();
    let thread = thread::Builder::new()
        .name("vmm".to_string())
        .spawn(move || {
            // Apply seccomp filter for VMM thread.
265 SeccompFilter::apply(vmm_seccomp_filter).map_err(Error::ApplySeccompFilter)?; 266 267 let mut vmm = Vmm::new( 268 vmm_version.to_string(), 269 api_event, 270 vmm_seccomp_action, 271 hypervisor, 272 )?; 273 274 vmm.control_loop(Arc::new(api_receiver)) 275 }) 276 .map_err(Error::VmmThreadSpawn)?; 277 278 // The VMM thread is started, we can start serving HTTP requests 279 if let Some(http_path) = http_path { 280 api::start_http_path_thread(http_path, http_api_event, api_sender, seccomp_action)?; 281 } else if let Some(http_fd) = http_fd { 282 api::start_http_fd_thread(http_fd, http_api_event, api_sender, seccomp_action)?; 283 } 284 Ok(thread) 285 } 286 287 pub struct Vmm { 288 epoll: EpollContext, 289 exit_evt: EventFd, 290 reset_evt: EventFd, 291 api_evt: EventFd, 292 version: String, 293 vm: Option<Vm>, 294 vm_config: Option<Arc<Mutex<VmConfig>>>, 295 seccomp_action: SeccompAction, 296 hypervisor: Arc<dyn hypervisor::Hypervisor>, 297 activate_evt: EventFd, 298 } 299 300 impl Vmm { 301 fn new( 302 vmm_version: String, 303 api_evt: EventFd, 304 seccomp_action: SeccompAction, 305 hypervisor: Arc<dyn hypervisor::Hypervisor>, 306 ) -> Result<Self> { 307 let mut epoll = EpollContext::new().map_err(Error::Epoll)?; 308 let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?; 309 let reset_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?; 310 let activate_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?; 311 312 if unsafe { libc::isatty(libc::STDIN_FILENO as i32) } != 0 { 313 epoll.add_stdin().map_err(Error::Epoll)?; 314 } 315 316 epoll 317 .add_event(&exit_evt, EpollDispatch::Exit) 318 .map_err(Error::Epoll)?; 319 320 epoll 321 .add_event(&reset_evt, EpollDispatch::Reset) 322 .map_err(Error::Epoll)?; 323 324 epoll 325 .add_event(&activate_evt, EpollDispatch::ActivateVirtioDevices) 326 .map_err(Error::Epoll)?; 327 328 epoll 329 .add_event(&api_evt, EpollDispatch::Api) 330 .map_err(Error::Epoll)?; 331 332 Ok(Vmm { 333 epoll, 
334 exit_evt, 335 reset_evt, 336 api_evt, 337 version: vmm_version, 338 vm: None, 339 vm_config: None, 340 seccomp_action, 341 hypervisor, 342 activate_evt, 343 }) 344 } 345 346 fn vm_create(&mut self, config: Arc<Mutex<VmConfig>>) -> result::Result<(), VmError> { 347 // We only store the passed VM config. 348 // The VM will be created when being asked to boot it. 349 if self.vm_config.is_none() { 350 self.vm_config = Some(config); 351 Ok(()) 352 } else { 353 Err(VmError::VmAlreadyCreated) 354 } 355 } 356 357 fn vm_boot(&mut self) -> result::Result<(), VmError> { 358 // If we don't have a config, we can not boot a VM. 359 if self.vm_config.is_none() { 360 return Err(VmError::VmMissingConfig); 361 }; 362 363 // Create a new VM if we don't have one yet. 364 if self.vm.is_none() { 365 let exit_evt = self.exit_evt.try_clone().map_err(VmError::EventFdClone)?; 366 let reset_evt = self.reset_evt.try_clone().map_err(VmError::EventFdClone)?; 367 let activate_evt = self 368 .activate_evt 369 .try_clone() 370 .map_err(VmError::EventFdClone)?; 371 372 if let Some(ref vm_config) = self.vm_config { 373 let vm = Vm::new( 374 Arc::clone(vm_config), 375 exit_evt, 376 reset_evt, 377 &self.seccomp_action, 378 self.hypervisor.clone(), 379 activate_evt, 380 None, 381 None, 382 )?; 383 if let Some(serial_pty) = vm.serial_pty() { 384 self.epoll 385 .add_event(&serial_pty.main, EpollDispatch::Pty) 386 .map_err(VmError::EventfdError)?; 387 }; 388 if let Some(console_pty) = vm.console_pty() { 389 self.epoll 390 .add_event(&console_pty.main, EpollDispatch::Pty) 391 .map_err(VmError::EventfdError)?; 392 }; 393 self.vm = Some(vm); 394 } 395 } 396 397 // Now we can boot the VM. 
398 if let Some(ref mut vm) = self.vm { 399 vm.boot() 400 } else { 401 Err(VmError::VmNotCreated) 402 } 403 } 404 405 fn vm_pause(&mut self) -> result::Result<(), VmError> { 406 if let Some(ref mut vm) = self.vm { 407 vm.pause().map_err(VmError::Pause) 408 } else { 409 Err(VmError::VmNotRunning) 410 } 411 } 412 413 fn vm_resume(&mut self) -> result::Result<(), VmError> { 414 if let Some(ref mut vm) = self.vm { 415 vm.resume().map_err(VmError::Resume) 416 } else { 417 Err(VmError::VmNotRunning) 418 } 419 } 420 421 fn vm_snapshot(&mut self, destination_url: &str) -> result::Result<(), VmError> { 422 if let Some(ref mut vm) = self.vm { 423 vm.snapshot() 424 .map_err(VmError::Snapshot) 425 .and_then(|snapshot| { 426 vm.send(&snapshot, destination_url) 427 .map_err(VmError::SnapshotSend) 428 }) 429 } else { 430 Err(VmError::VmNotRunning) 431 } 432 } 433 434 fn vm_restore(&mut self, restore_cfg: RestoreConfig) -> result::Result<(), VmError> { 435 if self.vm.is_some() || self.vm_config.is_some() { 436 return Err(VmError::VmAlreadyCreated); 437 } 438 439 let source_url = restore_cfg.source_url.as_path().to_str(); 440 if source_url.is_none() { 441 return Err(VmError::RestoreSourceUrlPathToStr); 442 } 443 // Safe to unwrap as we checked it was Some(&str). 
444 let source_url = source_url.unwrap(); 445 446 let snapshot = recv_vm_snapshot(source_url).map_err(VmError::Restore)?; 447 let vm_snapshot = get_vm_snapshot(&snapshot).map_err(VmError::Restore)?; 448 449 self.vm_config = Some(Arc::clone(&vm_snapshot.config)); 450 451 let exit_evt = self.exit_evt.try_clone().map_err(VmError::EventFdClone)?; 452 let reset_evt = self.reset_evt.try_clone().map_err(VmError::EventFdClone)?; 453 let activate_evt = self 454 .activate_evt 455 .try_clone() 456 .map_err(VmError::EventFdClone)?; 457 458 let vm = Vm::new_from_snapshot( 459 &snapshot, 460 exit_evt, 461 reset_evt, 462 Some(source_url), 463 restore_cfg.prefault, 464 &self.seccomp_action, 465 self.hypervisor.clone(), 466 activate_evt, 467 )?; 468 self.vm = Some(vm); 469 470 // Now we can restore the rest of the VM. 471 if let Some(ref mut vm) = self.vm { 472 vm.restore(snapshot).map_err(VmError::Restore) 473 } else { 474 Err(VmError::VmNotCreated) 475 } 476 } 477 478 fn vm_shutdown(&mut self) -> result::Result<(), VmError> { 479 if let Some(ref mut vm) = self.vm.take() { 480 vm.shutdown() 481 } else { 482 Err(VmError::VmNotRunning) 483 } 484 } 485 486 fn vm_reboot(&mut self) -> result::Result<(), VmError> { 487 // Without ACPI, a reset is equivalent to a shutdown 488 // On AArch64, before ACPI is supported, we simply jump over this check and continue to reset. 489 #[cfg(all(target_arch = "x86_64", not(feature = "acpi")))] 490 { 491 if self.vm.is_some() { 492 self.exit_evt.write(1).unwrap(); 493 return Ok(()); 494 } 495 } 496 497 // First we stop the current VM and create a new one. 
498 if let Some(ref mut vm) = self.vm { 499 let config = vm.get_config(); 500 let serial_pty = vm.serial_pty(); 501 let console_pty = vm.console_pty(); 502 self.vm_shutdown()?; 503 504 let exit_evt = self.exit_evt.try_clone().map_err(VmError::EventFdClone)?; 505 let reset_evt = self.reset_evt.try_clone().map_err(VmError::EventFdClone)?; 506 let activate_evt = self 507 .activate_evt 508 .try_clone() 509 .map_err(VmError::EventFdClone)?; 510 511 // The Linux kernel fires off an i8042 reset after doing the ACPI reset so there may be 512 // an event sitting in the shared reset_evt. Without doing this we get very early reboots 513 // during the boot process. 514 if self.reset_evt.read().is_ok() { 515 warn!("Spurious second reset event received. Ignoring."); 516 } 517 self.vm = Some(Vm::new( 518 config, 519 exit_evt, 520 reset_evt, 521 &self.seccomp_action, 522 self.hypervisor.clone(), 523 activate_evt, 524 serial_pty, 525 console_pty, 526 )?); 527 } 528 529 // Then we start the new VM. 530 if let Some(ref mut vm) = self.vm { 531 vm.boot() 532 } else { 533 Err(VmError::VmNotCreated) 534 } 535 } 536 537 fn vm_info(&self) -> result::Result<VmInfo, VmError> { 538 match &self.vm_config { 539 Some(config) => { 540 let state = match &self.vm { 541 Some(vm) => vm.get_state()?, 542 None => VmState::Created, 543 }; 544 545 let config = Arc::clone(config); 546 547 let mut memory_actual_size = config.lock().unwrap().memory.total_size(); 548 if let Some(vm) = &self.vm { 549 memory_actual_size -= vm.balloon_size(); 550 } 551 552 let device_tree = self.vm.as_ref().map(|vm| vm.device_tree()); 553 554 Ok(VmInfo { 555 config, 556 state, 557 memory_actual_size, 558 device_tree, 559 }) 560 } 561 None => Err(VmError::VmNotCreated), 562 } 563 } 564 565 fn vmm_ping(&self) -> VmmPingResponse { 566 VmmPingResponse { 567 version: self.version.clone(), 568 } 569 } 570 571 fn vm_delete(&mut self) -> result::Result<(), VmError> { 572 if self.vm_config.is_none() { 573 return Ok(()); 574 } 575 576 // 
If a VM is booted, we first try to shut it down. 577 if self.vm.is_some() { 578 self.vm_shutdown()?; 579 } 580 581 self.vm_config = None; 582 583 event!("vm", "deleted"); 584 585 Ok(()) 586 } 587 588 fn vmm_shutdown(&mut self) -> result::Result<(), VmError> { 589 self.vm_delete()?; 590 event!("vmm", "shutdown"); 591 Ok(()) 592 } 593 594 fn vm_resize( 595 &mut self, 596 desired_vcpus: Option<u8>, 597 desired_ram: Option<u64>, 598 desired_balloon: Option<u64>, 599 ) -> result::Result<(), VmError> { 600 if let Some(ref mut vm) = self.vm { 601 if let Err(e) = vm.resize(desired_vcpus, desired_ram, desired_balloon) { 602 error!("Error when resizing VM: {:?}", e); 603 Err(e) 604 } else { 605 Ok(()) 606 } 607 } else { 608 Err(VmError::VmNotRunning) 609 } 610 } 611 612 fn vm_resize_zone(&mut self, id: String, desired_ram: u64) -> result::Result<(), VmError> { 613 if let Some(ref mut vm) = self.vm { 614 if let Err(e) = vm.resize_zone(id, desired_ram) { 615 error!("Error when resizing VM: {:?}", e); 616 Err(e) 617 } else { 618 Ok(()) 619 } 620 } else { 621 Err(VmError::VmNotRunning) 622 } 623 } 624 625 fn vm_add_device(&mut self, device_cfg: DeviceConfig) -> result::Result<Vec<u8>, VmError> { 626 if let Some(ref mut vm) = self.vm { 627 let info = vm.add_device(device_cfg).map_err(|e| { 628 error!("Error when adding new device to the VM: {:?}", e); 629 e 630 })?; 631 serde_json::to_vec(&info).map_err(VmError::SerializeJson) 632 } else { 633 Err(VmError::VmNotRunning) 634 } 635 } 636 637 fn vm_remove_device(&mut self, id: String) -> result::Result<(), VmError> { 638 if let Some(ref mut vm) = self.vm { 639 if let Err(e) = vm.remove_device(id) { 640 error!("Error when removing new device to the VM: {:?}", e); 641 Err(e) 642 } else { 643 Ok(()) 644 } 645 } else { 646 Err(VmError::VmNotRunning) 647 } 648 } 649 650 fn vm_add_disk(&mut self, disk_cfg: DiskConfig) -> result::Result<Vec<u8>, VmError> { 651 if let Some(ref mut vm) = self.vm { 652 let info = 
vm.add_disk(disk_cfg).map_err(|e| { 653 error!("Error when adding new disk to the VM: {:?}", e); 654 e 655 })?; 656 serde_json::to_vec(&info).map_err(VmError::SerializeJson) 657 } else { 658 Err(VmError::VmNotRunning) 659 } 660 } 661 662 fn vm_add_fs(&mut self, fs_cfg: FsConfig) -> result::Result<Vec<u8>, VmError> { 663 if let Some(ref mut vm) = self.vm { 664 let info = vm.add_fs(fs_cfg).map_err(|e| { 665 error!("Error when adding new fs to the VM: {:?}", e); 666 e 667 })?; 668 serde_json::to_vec(&info).map_err(VmError::SerializeJson) 669 } else { 670 Err(VmError::VmNotRunning) 671 } 672 } 673 674 fn vm_add_pmem(&mut self, pmem_cfg: PmemConfig) -> result::Result<Vec<u8>, VmError> { 675 if let Some(ref mut vm) = self.vm { 676 let info = vm.add_pmem(pmem_cfg).map_err(|e| { 677 error!("Error when adding new pmem device to the VM: {:?}", e); 678 e 679 })?; 680 serde_json::to_vec(&info).map_err(VmError::SerializeJson) 681 } else { 682 Err(VmError::VmNotRunning) 683 } 684 } 685 686 fn vm_add_net(&mut self, net_cfg: NetConfig) -> result::Result<Vec<u8>, VmError> { 687 if let Some(ref mut vm) = self.vm { 688 let info = vm.add_net(net_cfg).map_err(|e| { 689 error!("Error when adding new network device to the VM: {:?}", e); 690 e 691 })?; 692 serde_json::to_vec(&info).map_err(VmError::SerializeJson) 693 } else { 694 Err(VmError::VmNotRunning) 695 } 696 } 697 698 fn vm_add_vsock(&mut self, vsock_cfg: VsockConfig) -> result::Result<Vec<u8>, VmError> { 699 if let Some(ref mut vm) = self.vm { 700 let info = vm.add_vsock(vsock_cfg).map_err(|e| { 701 error!("Error when adding new vsock device to the VM: {:?}", e); 702 e 703 })?; 704 serde_json::to_vec(&info).map_err(VmError::SerializeJson) 705 } else { 706 Err(VmError::VmNotRunning) 707 } 708 } 709 710 fn vm_counters(&mut self) -> result::Result<Vec<u8>, VmError> { 711 if let Some(ref mut vm) = self.vm { 712 let info = vm.counters().map_err(|e| { 713 error!("Error when getting counters from the VM: {:?}", e); 714 e 715 })?; 716 
serde_json::to_vec(&info).map_err(VmError::SerializeJson) 717 } else { 718 Err(VmError::VmNotRunning) 719 } 720 } 721 722 fn vm_power_button(&mut self) -> result::Result<(), VmError> { 723 if let Some(ref mut vm) = self.vm { 724 vm.power_button() 725 } else { 726 Err(VmError::VmNotRunning) 727 } 728 } 729 730 fn vm_receive_config<T>( 731 &mut self, 732 req: &Request, 733 socket: &mut T, 734 ) -> std::result::Result<Vm, MigratableError> 735 where 736 T: Read + Write, 737 { 738 // Read in config data 739 let mut data = Vec::with_capacity(req.length() as usize); 740 unsafe { 741 data.set_len(req.length() as usize); 742 } 743 socket 744 .read_exact(&mut data) 745 .map_err(MigratableError::MigrateSocket)?; 746 let config: VmConfig = serde_json::from_slice(&data).map_err(|e| { 747 MigratableError::MigrateReceive(anyhow!("Error deserialising config: {}", e)) 748 })?; 749 750 let exit_evt = self.exit_evt.try_clone().map_err(|e| { 751 MigratableError::MigrateReceive(anyhow!("Error cloning exit EventFd: {}", e)) 752 })?; 753 let reset_evt = self.reset_evt.try_clone().map_err(|e| { 754 MigratableError::MigrateReceive(anyhow!("Error cloning reset EventFd: {}", e)) 755 })?; 756 let activate_evt = self.activate_evt.try_clone().map_err(|e| { 757 MigratableError::MigrateReceive(anyhow!("Error cloning activate EventFd: {}", e)) 758 })?; 759 760 self.vm_config = Some(Arc::new(Mutex::new(config))); 761 let vm = Vm::new_from_migration( 762 self.vm_config.clone().unwrap(), 763 exit_evt, 764 reset_evt, 765 &self.seccomp_action, 766 self.hypervisor.clone(), 767 activate_evt, 768 ) 769 .map_err(|e| { 770 MigratableError::MigrateReceive(anyhow!("Error creating VM from snapshot: {:?}", e)) 771 })?; 772 773 Response::ok().write_to(socket)?; 774 775 Ok(vm) 776 } 777 778 fn vm_receive_state<T>( 779 &mut self, 780 req: &Request, 781 socket: &mut T, 782 mut vm: Vm, 783 ) -> std::result::Result<(), MigratableError> 784 where 785 T: Read + Write, 786 { 787 // Read in state data 788 let mut data = 
Vec::with_capacity(req.length() as usize); 789 unsafe { 790 data.set_len(req.length() as usize); 791 } 792 socket 793 .read_exact(&mut data) 794 .map_err(MigratableError::MigrateSocket)?; 795 let snapshot: Snapshot = serde_json::from_slice(&data).map_err(|e| { 796 MigratableError::MigrateReceive(anyhow!("Error deserialising snapshot: {}", e)) 797 })?; 798 799 #[cfg(all(feature = "kvm", target_arch = "x86_64"))] 800 vm.load_clock_from_snapshot(&snapshot) 801 .map_err(|e| MigratableError::MigrateReceive(anyhow!("Error resume clock: {:?}", e)))?; 802 803 // Create VM 804 vm.restore(snapshot).map_err(|e| { 805 Response::error().write_to(socket).ok(); 806 e 807 })?; 808 self.vm = Some(vm); 809 810 Response::ok().write_to(socket)?; 811 812 Ok(()) 813 } 814 815 fn vm_receive_memory<T>( 816 &mut self, 817 req: &Request, 818 socket: &mut T, 819 vm: &mut Vm, 820 ) -> std::result::Result<(), MigratableError> 821 where 822 T: Read + Write, 823 { 824 // Read table 825 let table = MemoryRangeTable::read_from(socket, req.length())?; 826 827 // And then read the memory itself 828 vm.receive_memory_regions(&table, socket).map_err(|e| { 829 Response::error().write_to(socket).ok(); 830 e 831 })?; 832 Response::ok().write_to(socket)?; 833 Ok(()) 834 } 835 836 fn socket_url_to_path(url: &str) -> result::Result<PathBuf, MigratableError> { 837 url.strip_prefix("unix:") 838 .ok_or_else(|| { 839 MigratableError::MigrateSend(anyhow!("Could not extract path from URL: {}", url)) 840 }) 841 .map(|s| s.into()) 842 } 843 844 fn vm_receive_migration( 845 &mut self, 846 receive_data_migration: VmReceiveMigrationData, 847 ) -> result::Result<(), MigratableError> { 848 info!( 849 "Receiving migration: receiver_url = {}", 850 receive_data_migration.receiver_url 851 ); 852 853 let path = Self::socket_url_to_path(&receive_data_migration.receiver_url)?; 854 let listener = UnixListener::bind(&path).map_err(|e| { 855 MigratableError::MigrateReceive(anyhow!("Error binding to UNIX socket: {}", e)) 856 })?; 
857 let (mut socket, _addr) = listener.accept().map_err(|e| { 858 MigratableError::MigrateReceive(anyhow!("Error accepting on UNIX socket: {}", e)) 859 })?; 860 std::fs::remove_file(&path).map_err(|e| { 861 MigratableError::MigrateReceive(anyhow!("Error unlinking UNIX socket: {}", e)) 862 })?; 863 864 let mut started = false; 865 let mut vm: Option<Vm> = None; 866 867 loop { 868 let req = Request::read_from(&mut socket)?; 869 match req.command() { 870 Command::Invalid => info!("Invalid Command Received"), 871 Command::Start => { 872 info!("Start Command Received"); 873 started = true; 874 875 Response::ok().write_to(&mut socket)?; 876 } 877 Command::Config => { 878 info!("Config Command Received"); 879 880 if !started { 881 warn!("Migration not started yet"); 882 Response::error().write_to(&mut socket)?; 883 continue; 884 } 885 vm = Some(self.vm_receive_config(&req, &mut socket)?); 886 } 887 Command::State => { 888 info!("State Command Received"); 889 890 if !started { 891 warn!("Migration not started yet"); 892 Response::error().write_to(&mut socket)?; 893 continue; 894 } 895 if let Some(vm) = vm.take() { 896 self.vm_receive_state(&req, &mut socket, vm)?; 897 } else { 898 warn!("Configuration not sent yet"); 899 Response::error().write_to(&mut socket)?; 900 } 901 } 902 Command::Memory => { 903 info!("Memory Command Received"); 904 905 if !started { 906 warn!("Migration not started yet"); 907 Response::error().write_to(&mut socket)?; 908 continue; 909 } 910 if let Some(ref mut vm) = vm.as_mut() { 911 self.vm_receive_memory(&req, &mut socket, vm)?; 912 } else { 913 warn!("Configuration not sent yet"); 914 Response::error().write_to(&mut socket)?; 915 } 916 } 917 Command::Complete => { 918 info!("Complete Command Received"); 919 if let Some(ref mut vm) = self.vm.as_mut() { 920 vm.resume()?; 921 Response::ok().write_to(&mut socket)?; 922 } else { 923 warn!("VM not created yet"); 924 Response::error().write_to(&mut socket)?; 925 } 926 break; 927 } 928 Command::Abandon 
=> { 929 info!("Abandon Command Received"); 930 self.vm = None; 931 self.vm_config = None; 932 Response::ok().write_to(&mut socket).ok(); 933 break; 934 } 935 } 936 } 937 938 Ok(()) 939 } 940 941 // Returns true if there were dirty pages to send 942 fn vm_maybe_send_dirty_pages<T>( 943 vm: &mut Vm, 944 socket: &mut T, 945 ) -> result::Result<bool, MigratableError> 946 where 947 T: Read + Write, 948 { 949 // Send (dirty) memory table 950 let table = vm.dirty_memory_range_table()?; 951 952 // But if there are no regions go straight to pause 953 if table.regions().is_empty() { 954 return Ok(false); 955 } 956 957 Request::memory(table.length()).write_to(socket).unwrap(); 958 table.write_to(socket)?; 959 // And then the memory itself 960 vm.send_memory_regions(&table, socket)?; 961 let res = Response::read_from(socket)?; 962 if res.status() != Status::Ok { 963 warn!("Error during dirty memory migration"); 964 Request::abandon().write_to(socket)?; 965 Response::read_from(socket).ok(); 966 return Err(MigratableError::MigrateSend(anyhow!( 967 "Error during dirty memory migration" 968 ))); 969 } 970 971 Ok(true) 972 } 973 974 fn vm_send_migration( 975 &mut self, 976 send_data_migration: VmSendMigrationData, 977 ) -> result::Result<(), MigratableError> { 978 info!( 979 "Sending migration: destination_url = {}", 980 send_data_migration.destination_url 981 ); 982 if let Some(ref mut vm) = self.vm { 983 let path = Self::socket_url_to_path(&send_data_migration.destination_url)?; 984 let mut socket = UnixStream::connect(&path).map_err(|e| { 985 MigratableError::MigrateSend(anyhow!("Error connecting to UNIX socket: {}", e)) 986 })?; 987 988 // Start the migration 989 Request::start().write_to(&mut socket)?; 990 let res = Response::read_from(&mut socket)?; 991 if res.status() != Status::Ok { 992 warn!("Error starting migration"); 993 Request::abandon().write_to(&mut socket)?; 994 Response::read_from(&mut socket).ok(); 995 return Err(MigratableError::MigrateSend(anyhow!( 996 "Error 
starting migration" 997 ))); 998 } 999 1000 // Send config 1001 let config_data = serde_json::to_vec(&vm.get_config()).unwrap(); 1002 Request::config(config_data.len() as u64).write_to(&mut socket)?; 1003 socket 1004 .write_all(&config_data) 1005 .map_err(MigratableError::MigrateSocket)?; 1006 let res = Response::read_from(&mut socket)?; 1007 if res.status() != Status::Ok { 1008 warn!("Error during config migration"); 1009 Request::abandon().write_to(&mut socket)?; 1010 Response::read_from(&mut socket).ok(); 1011 return Err(MigratableError::MigrateSend(anyhow!( 1012 "Error during config migration" 1013 ))); 1014 } 1015 1016 // Start logging dirty pages 1017 vm.start_memory_dirty_log()?; 1018 1019 // Send memory table 1020 let table = vm.memory_range_table()?; 1021 Request::memory(table.length()) 1022 .write_to(&mut socket) 1023 .unwrap(); 1024 table.write_to(&mut socket)?; 1025 // And then the memory itself 1026 vm.send_memory_regions(&table, &mut socket)?; 1027 let res = Response::read_from(&mut socket)?; 1028 if res.status() != Status::Ok { 1029 warn!("Error during memory migration"); 1030 Request::abandon().write_to(&mut socket)?; 1031 Response::read_from(&mut socket).ok(); 1032 return Err(MigratableError::MigrateSend(anyhow!( 1033 "Error during memory migration" 1034 ))); 1035 } 1036 1037 // Try at most 5 passes of dirty memory sending 1038 const MAX_DIRTY_MIGRATIONS: usize = 5; 1039 for i in 0..MAX_DIRTY_MIGRATIONS { 1040 info!("Dirty memory migration {} of {}", i, MAX_DIRTY_MIGRATIONS); 1041 if !Self::vm_maybe_send_dirty_pages(vm, &mut socket)? 
{
                    break;
                }
            }

            // Now pause VM
            vm.pause()?;

            // Send last batch of dirty pages
            Self::vm_maybe_send_dirty_pages(vm, &mut socket)?;

            // Capture snapshot and send it
            let vm_snapshot = vm.snapshot()?;
            // NOTE(review): unwrap() assumes snapshot serialization cannot
            // fail; consider mapping a serde error into MigratableError.
            let snapshot_data = serde_json::to_vec(&vm_snapshot).unwrap();
            Request::state(snapshot_data.len() as u64).write_to(&mut socket)?;
            socket
                .write_all(&snapshot_data)
                .map_err(MigratableError::MigrateSocket)?;
            let res = Response::read_from(&mut socket)?;
            if res.status() != Status::Ok {
                warn!("Error during state migration");
                Request::abandon().write_to(&mut socket)?;
                Response::read_from(&mut socket).ok();
                return Err(MigratableError::MigrateSend(anyhow!(
                    "Error during state migration"
                )));
            }

            // Complete the migration
            Request::complete().write_to(&mut socket)?;
            let res = Response::read_from(&mut socket)?;
            if res.status() != Status::Ok {
                warn!("Error completing migration");
                Request::abandon().write_to(&mut socket)?;
                Response::read_from(&mut socket).ok();
                return Err(MigratableError::MigrateSend(anyhow!(
                    "Error completing migration"
                )));
            }
            info!("Migration complete");
            Ok(())
        } else {
            Err(MigratableError::MigrateSend(anyhow!("VM is not running")))
        }
    }

    /// Main VMM event loop.
    ///
    /// Blocks on epoll and dispatches the events registered in
    /// `self.epoll.dispatch_table`: VM exit and reset, stdin/pty forwarding,
    /// deferred virtio device activation, and API requests delivered over
    /// `api_receiver` (each request carries its own response `Sender`).
    ///
    /// Returns `Ok(())` once the VMM is asked to stop (exit eventfd or a
    /// `VmmShutdown` API request), or an [`Error`] if epoll, an eventfd read,
    /// or an API channel operation fails.
    fn control_loop(&mut self, api_receiver: Arc<Receiver<ApiRequest>>) -> Result<()> {
        const EPOLL_EVENTS_LEN: usize = 100;

        let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
        let epoll_fd = self.epoll.as_raw_fd();

        'outer: loop {
            let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) {
                Ok(res) => res,
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // It's well defined from the epoll_wait() syscall
                        // documentation that the epoll loop can be interrupted
                        // before any of the requested events occurred or the
                        // timeout expired. In both those cases, epoll_wait()
                        // returns an error of type EINTR, but this should not
                        // be considered as a regular error. Instead it is more
                        // appropriate to retry, by calling into epoll_wait().
                        continue;
                    }
                    return Err(Error::Epoll(e));
                }
            };

            for event in events.iter().take(num_events) {
                let dispatch_idx = event.data as usize;

                if let Some(dispatch_type) = self.epoll.dispatch_table[dispatch_idx] {
                    match dispatch_type {
                        EpollDispatch::Exit => {
                            info!("VM exit event");
                            // Consume the event.
                            self.exit_evt.read().map_err(Error::EventFdRead)?;
                            self.vmm_shutdown().map_err(Error::VmmShutdown)?;

                            break 'outer;
                        }
                        EpollDispatch::Reset => {
                            info!("VM reset event");
                            // Consume the event.
                            self.reset_evt.read().map_err(Error::EventFdRead)?;
                            self.vm_reboot().map_err(Error::VmReboot)?;
                        }
                        EpollDispatch::Stdin => {
                            if let Some(ref vm) = self.vm {
                                vm.handle_stdin().map_err(Error::Stdin)?;
                            }
                        }
                        EpollDispatch::ActivateVirtioDevices => {
                            // Always drain the eventfd, even when no VM is
                            // present. The read previously happened only
                            // inside the `if let Some(ref vm)` branch, so a
                            // pending event with self.vm == None was never
                            // consumed and the level-triggered epoll loop
                            // would spin at 100% CPU.
                            let count = self.activate_evt.read().map_err(Error::EventFdRead)?;
                            if let Some(ref vm) = self.vm {
                                info!(
                                    "Trying to activate pending virtio devices: count = {}",
                                    count
                                );
                                vm.activate_virtio_devices()
                                    .map_err(Error::ActivateVirtioDevices)?;
                            }
                        }
                        EpollDispatch::Pty => {
                            if let Some(ref vm) = self.vm {
                                vm.handle_pty().map_err(Error::Pty)?;
                            }
                        }
                        EpollDispatch::Api => {
                            // Consume the event.
                            self.api_evt.read().map_err(Error::EventFdRead)?;

                            // Read from the API receiver channel
                            let api_request = api_receiver.recv().map_err(Error::ApiRequestRecv)?;

                            info!("API request event: {:?}", api_request);
                            match api_request {
                                ApiRequest::VmCreate(config, sender) => {
                                    let response = self
                                        .vm_create(config)
                                        .map_err(ApiError::VmCreate)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmDelete(sender) => {
                                    let response = self
                                        .vm_delete()
                                        .map_err(ApiError::VmDelete)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmBoot(sender) => {
                                    let response = self
                                        .vm_boot()
                                        .map_err(ApiError::VmBoot)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmShutdown(sender) => {
                                    let response = self
                                        .vm_shutdown()
                                        .map_err(ApiError::VmShutdown)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmReboot(sender) => {
                                    let response = self
                                        .vm_reboot()
                                        .map_err(ApiError::VmReboot)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmInfo(sender) => {
                                    let response = self
                                        .vm_info()
                                        .map_err(ApiError::VmInfo)
                                        .map(ApiResponsePayload::VmInfo);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmmPing(sender) => {
                                    // Ping is infallible, hence the Ok() wrap.
                                    let response = ApiResponsePayload::VmmPing(self.vmm_ping());

                                    sender.send(Ok(response)).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmPause(sender) => {
                                    let response = self
                                        .vm_pause()
                                        .map_err(ApiError::VmPause)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmResume(sender) => {
                                    let response = self
                                        .vm_resume()
                                        .map_err(ApiError::VmResume)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmSnapshot(snapshot_data, sender) => {
                                    let response = self
                                        .vm_snapshot(&snapshot_data.destination_url)
                                        .map_err(ApiError::VmSnapshot)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmRestore(restore_data, sender) => {
                                    let response = self
                                        .vm_restore(restore_data.as_ref().clone())
                                        .map_err(ApiError::VmRestore)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmmShutdown(sender) => {
                                    let response = self
                                        .vmm_shutdown()
                                        .map_err(ApiError::VmmShutdown)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;

                                    // Shutdown was requested and acknowledged:
                                    // leave the control loop entirely.
                                    break 'outer;
                                }
                                ApiRequest::VmResize(resize_data, sender) => {
                                    let response = self
                                        .vm_resize(
                                            resize_data.desired_vcpus,
                                            resize_data.desired_ram,
                                            resize_data.desired_balloon,
                                        )
                                        .map_err(ApiError::VmResize)
                                        .map(|_| ApiResponsePayload::Empty);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmResizeZone(resize_zone_data, sender) => {
                                    let response = self
                                        .vm_resize_zone(
                                            resize_zone_data.id.clone(),
                                            resize_zone_data.desired_ram,
                                        )
                                        .map_err(ApiError::VmResizeZone)
                                        .map(|_| ApiResponsePayload::Empty);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmAddDevice(add_device_data, sender) => {
                                    let response = self
                                        .vm_add_device(add_device_data.as_ref().clone())
                                        .map_err(ApiError::VmAddDevice)
                                        .map(ApiResponsePayload::VmAction);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmRemoveDevice(remove_device_data, sender) => {
                                    let response = self
                                        .vm_remove_device(remove_device_data.id.clone())
                                        .map_err(ApiError::VmRemoveDevice)
                                        .map(|_| ApiResponsePayload::Empty);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmAddDisk(add_disk_data, sender) => {
                                    let response = self
                                        .vm_add_disk(add_disk_data.as_ref().clone())
                                        .map_err(ApiError::VmAddDisk)
                                        .map(ApiResponsePayload::VmAction);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmAddFs(add_fs_data, sender) => {
                                    let response = self
                                        .vm_add_fs(add_fs_data.as_ref().clone())
                                        .map_err(ApiError::VmAddFs)
                                        .map(ApiResponsePayload::VmAction);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmAddPmem(add_pmem_data, sender) => {
                                    let response = self
                                        .vm_add_pmem(add_pmem_data.as_ref().clone())
                                        .map_err(ApiError::VmAddPmem)
                                        .map(ApiResponsePayload::VmAction);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmAddNet(add_net_data, sender) => {
                                    let response = self
                                        .vm_add_net(add_net_data.as_ref().clone())
                                        .map_err(ApiError::VmAddNet)
                                        .map(ApiResponsePayload::VmAction);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmAddVsock(add_vsock_data, sender) => {
                                    let response = self
                                        .vm_add_vsock(add_vsock_data.as_ref().clone())
                                        .map_err(ApiError::VmAddVsock)
                                        .map(ApiResponsePayload::VmAction);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmCounters(sender) => {
                                    // NOTE(review): counters failures reuse the
                                    // VmInfo error variant (as written in the
                                    // original code).
                                    let response = self
                                        .vm_counters()
                                        .map_err(ApiError::VmInfo)
                                        .map(ApiResponsePayload::VmAction);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmReceiveMigration(receive_migration_data, sender) => {
                                    let response = self
                                        .vm_receive_migration(
                                            receive_migration_data.as_ref().clone(),
                                        )
                                        .map_err(ApiError::VmReceiveMigration)
                                        .map(|_| ApiResponsePayload::Empty);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmSendMigration(send_migration_data, sender) => {
                                    let response = self
                                        .vm_send_migration(send_migration_data.as_ref().clone())
                                        .map_err(ApiError::VmSendMigration)
                                        .map(|_| ApiResponsePayload::Empty);
                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                                ApiRequest::VmPowerButton(sender) => {
                                    let response = self
                                        .vm_power_button()
                                        .map_err(ApiError::VmPowerButton)
                                        .map(|_| ApiResponsePayload::Empty);

                                    sender.send(response).map_err(Error::ApiResponseSend)?;
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
}

const CPU_MANAGER_SNAPSHOT_ID: &str = "cpu-manager";
const MEMORY_MANAGER_SNAPSHOT_ID: &str = "memory-manager";
const DEVICE_MANAGER_SNAPSHOT_ID: &str = "device-manager";