xref: /cloud-hypervisor/vmm/src/lib.rs (revision 9af2968a7dc47b89bf07ea9dc5e735084efcfa3a)
1 // Copyright © 2019 Intel Corporation
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 
6 #[macro_use]
7 extern crate event_monitor;
8 #[macro_use]
9 extern crate lazy_static;
10 #[macro_use]
11 extern crate log;
12 #[macro_use]
13 extern crate serde_derive;
14 #[cfg(test)]
15 #[macro_use]
16 extern crate credibility;
17 
18 use crate::api::{
19     ApiError, ApiRequest, ApiResponse, ApiResponsePayload, VmInfo, VmReceiveMigrationData,
20     VmSendMigrationData, VmmPingResponse,
21 };
22 use crate::config::{
23     DeviceConfig, DiskConfig, FsConfig, NetConfig, PmemConfig, RestoreConfig, VmConfig, VsockConfig,
24 };
25 use crate::migration::{get_vm_snapshot, recv_vm_snapshot};
26 use crate::seccomp_filters::{get_seccomp_filter, Thread};
27 use crate::vm::{Error as VmError, Vm, VmState};
28 use anyhow::anyhow;
29 use libc::EFD_NONBLOCK;
30 use seccomp::{SeccompAction, SeccompFilter};
31 use serde::ser::{Serialize, SerializeStruct, Serializer};
32 use std::fs::File;
33 use std::io;
34 use std::io::{Read, Write};
35 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
36 use std::os::unix::net::UnixListener;
37 use std::os::unix::net::UnixStream;
38 use std::path::PathBuf;
39 use std::sync::mpsc::{Receiver, RecvError, SendError, Sender};
40 use std::sync::{Arc, Mutex};
41 use std::{result, thread};
42 use thiserror::Error;
43 use vm_memory::bitmap::AtomicBitmap;
44 use vm_migration::protocol::*;
45 use vm_migration::{MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
46 use vmm_sys_util::eventfd::EventFd;
47 
48 pub mod api;
49 pub mod config;
50 pub mod cpu;
51 pub mod device_manager;
52 pub mod device_tree;
53 pub mod interrupt;
54 pub mod memory_manager;
55 pub mod migration;
56 pub mod seccomp_filters;
57 pub mod vm;
58 
59 #[cfg(feature = "acpi")]
60 mod acpi;
61 
62 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;
63 type GuestRegionMmap = vm_memory::GuestRegionMmap<AtomicBitmap>;
64 
65 /// Errors associated with VMM management
66 #[derive(Debug, Error)]
67 #[allow(clippy::large_enum_variant)]
68 pub enum Error {
69     /// API request receive error
70     #[error("Error receiving API request: {0}")]
71     ApiRequestRecv(#[source] RecvError),
72 
73     /// API response send error
74     #[error("Error sending API request: {0}")]
75     ApiResponseSend(#[source] SendError<ApiResponse>),
76 
77     /// Cannot bind to the UNIX domain socket path
78     #[error("Error binding to UNIX domain socket: {0}")]
79     Bind(#[source] io::Error),
80 
81     /// Cannot clone EventFd.
82     #[error("Error cloning EventFd: {0}")]
83     EventFdClone(#[source] io::Error),
84 
85     /// Cannot create EventFd.
86     #[error("Error creating EventFd: {0}")]
87     EventFdCreate(#[source] io::Error),
88 
89     /// Cannot read from EventFd.
90     #[error("Error reading from EventFd: {0}")]
91     EventFdRead(#[source] io::Error),
92 
93     /// Cannot create epoll context.
94     #[error("Error creating epoll context: {0}")]
95     Epoll(#[source] io::Error),
96 
97     /// Cannot create HTTP thread
98     #[error("Error spawning HTTP thread: {0}")]
99     HttpThreadSpawn(#[source] io::Error),
100 
101     /// Cannot handle the VM STDIN stream
102     #[error("Error handling VM stdin: {0:?}")]
103     Stdin(VmError),
104 
105     /// Cannot handle the VM pty stream
106     #[error("Error handling VM pty: {0:?}")]
107     Pty(VmError),
108 
109     /// Cannot reboot the VM
110     #[error("Error rebooting VM: {0:?}")]
111     VmReboot(VmError),
112 
113     /// Cannot create VMM thread
114     #[error("Error spawning VMM thread {0:?}")]
115     VmmThreadSpawn(#[source] io::Error),
116 
117     /// Cannot shut the VMM down
118     #[error("Error shutting down VMM: {0:?}")]
119     VmmShutdown(VmError),
120 
121     /// Cannot create seccomp filter
122     #[error("Error creating seccomp filter: {0}")]
123     CreateSeccompFilter(seccomp::SeccompError),
124 
125     /// Cannot apply seccomp filter
126     #[error("Error applying seccomp filter: {0}")]
127     ApplySeccompFilter(seccomp::Error),
128 
129     /// Error activating virtio devices
130     #[error("Error activating virtio devices: {0:?}")]
131     ActivateVirtioDevices(VmError),
132 
133     /// Error creating API server
134     #[error("Error creating API server {0:?}")]
135     CreateApiServer(micro_http::ServerError),
136 
137     /// Error binding API server socket
138     #[error("Error creation API server's socket {0:?}")]
139     CreateApiServerSocket(#[source] io::Error),
140 }
141 pub type Result<T> = result::Result<T, Error>;
142 
143 #[derive(Debug, Clone, Copy, PartialEq)]
144 pub enum EpollDispatch {
145     Exit,
146     Reset,
147     Stdin,
148     Api,
149     ActivateVirtioDevices,
150     Pty,
151 }
152 
153 pub struct EpollContext {
154     epoll_file: File,
155     dispatch_table: Vec<Option<EpollDispatch>>,
156 }
157 
158 impl EpollContext {
159     pub fn new() -> result::Result<EpollContext, io::Error> {
160         let epoll_fd = epoll::create(true)?;
161         // Use 'File' to enforce closing on 'epoll_fd'
162         let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
163 
164         // Initial capacity needs to be large enough to hold:
165         // * 1 exit event
166         // * 1 reset event
167         // * 1 stdin event
168         // * 1 API event
169         let mut dispatch_table = Vec::with_capacity(5);
170         dispatch_table.push(None);
171 
172         Ok(EpollContext {
173             epoll_file,
174             dispatch_table,
175         })
176     }
177 
178     pub fn add_stdin(&mut self) -> result::Result<(), io::Error> {
179         let dispatch_index = self.dispatch_table.len() as u64;
180         epoll::ctl(
181             self.epoll_file.as_raw_fd(),
182             epoll::ControlOptions::EPOLL_CTL_ADD,
183             libc::STDIN_FILENO,
184             epoll::Event::new(epoll::Events::EPOLLIN, dispatch_index),
185         )?;
186 
187         self.dispatch_table.push(Some(EpollDispatch::Stdin));
188 
189         Ok(())
190     }
191 
192     fn add_event<T>(&mut self, fd: &T, token: EpollDispatch) -> result::Result<(), io::Error>
193     where
194         T: AsRawFd,
195     {
196         let dispatch_index = self.dispatch_table.len() as u64;
197         epoll::ctl(
198             self.epoll_file.as_raw_fd(),
199             epoll::ControlOptions::EPOLL_CTL_ADD,
200             fd.as_raw_fd(),
201             epoll::Event::new(epoll::Events::EPOLLIN, dispatch_index),
202         )?;
203         self.dispatch_table.push(Some(token));
204 
205         Ok(())
206     }
207 }
208 
209 impl AsRawFd for EpollContext {
210     fn as_raw_fd(&self) -> RawFd {
211         self.epoll_file.as_raw_fd()
212     }
213 }
214 
215 pub struct PciDeviceInfo {
216     pub id: String,
217     pub bdf: u32,
218 }
219 
220 impl Serialize for PciDeviceInfo {
221     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
222     where
223         S: Serializer,
224     {
225         // Transform the PCI b/d/f into a standardized string.
226         let segment = (self.bdf >> 16) & 0xffff;
227         let bus = (self.bdf >> 8) & 0xff;
228         let device = (self.bdf >> 3) & 0x1f;
229         let function = self.bdf & 0x7;
230         let bdf_str = format!(
231             "{:04x}:{:02x}:{:02x}.{:01x}",
232             segment, bus, device, function
233         );
234 
235         // Serialize the structure.
236         let mut state = serializer.serialize_struct("PciDeviceInfo", 2)?;
237         state.serialize_field("id", &self.id)?;
238         state.serialize_field("bdf", &bdf_str)?;
239         state.end()
240     }
241 }
242 
243 #[allow(clippy::too_many_arguments)]
244 pub fn start_vmm_thread(
245     vmm_version: String,
246     http_path: &Option<String>,
247     http_fd: Option<RawFd>,
248     api_event: EventFd,
249     api_sender: Sender<ApiRequest>,
250     api_receiver: Receiver<ApiRequest>,
251     seccomp_action: &SeccompAction,
252     hypervisor: Arc<dyn hypervisor::Hypervisor>,
253 ) -> Result<thread::JoinHandle<Result<()>>> {
254     let http_api_event = api_event.try_clone().map_err(Error::EventFdClone)?;
255 
256     // Retrieve seccomp filter
257     let vmm_seccomp_filter =
258         get_seccomp_filter(seccomp_action, Thread::Vmm).map_err(Error::CreateSeccompFilter)?;
259 
260     let vmm_seccomp_action = seccomp_action.clone();
261     let thread = thread::Builder::new()
262         .name("vmm".to_string())
263         .spawn(move || {
264             // Apply seccomp filter for VMM thread.
265             SeccompFilter::apply(vmm_seccomp_filter).map_err(Error::ApplySeccompFilter)?;
266 
267             let mut vmm = Vmm::new(
268                 vmm_version.to_string(),
269                 api_event,
270                 vmm_seccomp_action,
271                 hypervisor,
272             )?;
273 
274             vmm.control_loop(Arc::new(api_receiver))
275         })
276         .map_err(Error::VmmThreadSpawn)?;
277 
278     // The VMM thread is started, we can start serving HTTP requests
279     if let Some(http_path) = http_path {
280         api::start_http_path_thread(http_path, http_api_event, api_sender, seccomp_action)?;
281     } else if let Some(http_fd) = http_fd {
282         api::start_http_fd_thread(http_fd, http_api_event, api_sender, seccomp_action)?;
283     }
284     Ok(thread)
285 }
286 
287 pub struct Vmm {
288     epoll: EpollContext,
289     exit_evt: EventFd,
290     reset_evt: EventFd,
291     api_evt: EventFd,
292     version: String,
293     vm: Option<Vm>,
294     vm_config: Option<Arc<Mutex<VmConfig>>>,
295     seccomp_action: SeccompAction,
296     hypervisor: Arc<dyn hypervisor::Hypervisor>,
297     activate_evt: EventFd,
298 }
299 
300 impl Vmm {
301     fn new(
302         vmm_version: String,
303         api_evt: EventFd,
304         seccomp_action: SeccompAction,
305         hypervisor: Arc<dyn hypervisor::Hypervisor>,
306     ) -> Result<Self> {
307         let mut epoll = EpollContext::new().map_err(Error::Epoll)?;
308         let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
309         let reset_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
310         let activate_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
311 
312         if unsafe { libc::isatty(libc::STDIN_FILENO as i32) } != 0 {
313             epoll.add_stdin().map_err(Error::Epoll)?;
314         }
315 
316         epoll
317             .add_event(&exit_evt, EpollDispatch::Exit)
318             .map_err(Error::Epoll)?;
319 
320         epoll
321             .add_event(&reset_evt, EpollDispatch::Reset)
322             .map_err(Error::Epoll)?;
323 
324         epoll
325             .add_event(&activate_evt, EpollDispatch::ActivateVirtioDevices)
326             .map_err(Error::Epoll)?;
327 
328         epoll
329             .add_event(&api_evt, EpollDispatch::Api)
330             .map_err(Error::Epoll)?;
331 
332         Ok(Vmm {
333             epoll,
334             exit_evt,
335             reset_evt,
336             api_evt,
337             version: vmm_version,
338             vm: None,
339             vm_config: None,
340             seccomp_action,
341             hypervisor,
342             activate_evt,
343         })
344     }
345 
346     fn vm_create(&mut self, config: Arc<Mutex<VmConfig>>) -> result::Result<(), VmError> {
347         // We only store the passed VM config.
348         // The VM will be created when being asked to boot it.
349         if self.vm_config.is_none() {
350             self.vm_config = Some(config);
351             Ok(())
352         } else {
353             Err(VmError::VmAlreadyCreated)
354         }
355     }
356 
357     fn vm_boot(&mut self) -> result::Result<(), VmError> {
358         // If we don't have a config, we can not boot a VM.
359         if self.vm_config.is_none() {
360             return Err(VmError::VmMissingConfig);
361         };
362 
363         // Create a new VM if we don't have one yet.
364         if self.vm.is_none() {
365             let exit_evt = self.exit_evt.try_clone().map_err(VmError::EventFdClone)?;
366             let reset_evt = self.reset_evt.try_clone().map_err(VmError::EventFdClone)?;
367             let activate_evt = self
368                 .activate_evt
369                 .try_clone()
370                 .map_err(VmError::EventFdClone)?;
371 
372             if let Some(ref vm_config) = self.vm_config {
373                 let vm = Vm::new(
374                     Arc::clone(vm_config),
375                     exit_evt,
376                     reset_evt,
377                     &self.seccomp_action,
378                     self.hypervisor.clone(),
379                     activate_evt,
380                     None,
381                     None,
382                 )?;
383                 if let Some(serial_pty) = vm.serial_pty() {
384                     self.epoll
385                         .add_event(&serial_pty.main, EpollDispatch::Pty)
386                         .map_err(VmError::EventfdError)?;
387                 };
388                 if let Some(console_pty) = vm.console_pty() {
389                     self.epoll
390                         .add_event(&console_pty.main, EpollDispatch::Pty)
391                         .map_err(VmError::EventfdError)?;
392                 };
393                 self.vm = Some(vm);
394             }
395         }
396 
397         // Now we can boot the VM.
398         if let Some(ref mut vm) = self.vm {
399             vm.boot()
400         } else {
401             Err(VmError::VmNotCreated)
402         }
403     }
404 
405     fn vm_pause(&mut self) -> result::Result<(), VmError> {
406         if let Some(ref mut vm) = self.vm {
407             vm.pause().map_err(VmError::Pause)
408         } else {
409             Err(VmError::VmNotRunning)
410         }
411     }
412 
413     fn vm_resume(&mut self) -> result::Result<(), VmError> {
414         if let Some(ref mut vm) = self.vm {
415             vm.resume().map_err(VmError::Resume)
416         } else {
417             Err(VmError::VmNotRunning)
418         }
419     }
420 
421     fn vm_snapshot(&mut self, destination_url: &str) -> result::Result<(), VmError> {
422         if let Some(ref mut vm) = self.vm {
423             vm.snapshot()
424                 .map_err(VmError::Snapshot)
425                 .and_then(|snapshot| {
426                     vm.send(&snapshot, destination_url)
427                         .map_err(VmError::SnapshotSend)
428                 })
429         } else {
430             Err(VmError::VmNotRunning)
431         }
432     }
433 
434     fn vm_restore(&mut self, restore_cfg: RestoreConfig) -> result::Result<(), VmError> {
435         if self.vm.is_some() || self.vm_config.is_some() {
436             return Err(VmError::VmAlreadyCreated);
437         }
438 
439         let source_url = restore_cfg.source_url.as_path().to_str();
440         if source_url.is_none() {
441             return Err(VmError::RestoreSourceUrlPathToStr);
442         }
443         // Safe to unwrap as we checked it was Some(&str).
444         let source_url = source_url.unwrap();
445 
446         let snapshot = recv_vm_snapshot(source_url).map_err(VmError::Restore)?;
447         let vm_snapshot = get_vm_snapshot(&snapshot).map_err(VmError::Restore)?;
448 
449         self.vm_config = Some(Arc::clone(&vm_snapshot.config));
450 
451         let exit_evt = self.exit_evt.try_clone().map_err(VmError::EventFdClone)?;
452         let reset_evt = self.reset_evt.try_clone().map_err(VmError::EventFdClone)?;
453         let activate_evt = self
454             .activate_evt
455             .try_clone()
456             .map_err(VmError::EventFdClone)?;
457 
458         let vm = Vm::new_from_snapshot(
459             &snapshot,
460             exit_evt,
461             reset_evt,
462             Some(source_url),
463             restore_cfg.prefault,
464             &self.seccomp_action,
465             self.hypervisor.clone(),
466             activate_evt,
467         )?;
468         self.vm = Some(vm);
469 
470         // Now we can restore the rest of the VM.
471         if let Some(ref mut vm) = self.vm {
472             vm.restore(snapshot).map_err(VmError::Restore)
473         } else {
474             Err(VmError::VmNotCreated)
475         }
476     }
477 
478     fn vm_shutdown(&mut self) -> result::Result<(), VmError> {
479         if let Some(ref mut vm) = self.vm.take() {
480             vm.shutdown()
481         } else {
482             Err(VmError::VmNotRunning)
483         }
484     }
485 
486     fn vm_reboot(&mut self) -> result::Result<(), VmError> {
487         // Without ACPI, a reset is equivalent to a shutdown
488         // On AArch64, before ACPI is supported, we simply jump over this check and continue to reset.
489         #[cfg(all(target_arch = "x86_64", not(feature = "acpi")))]
490         {
491             if self.vm.is_some() {
492                 self.exit_evt.write(1).unwrap();
493                 return Ok(());
494             }
495         }
496 
497         // First we stop the current VM and create a new one.
498         if let Some(ref mut vm) = self.vm {
499             let config = vm.get_config();
500             let serial_pty = vm.serial_pty();
501             let console_pty = vm.console_pty();
502             self.vm_shutdown()?;
503 
504             let exit_evt = self.exit_evt.try_clone().map_err(VmError::EventFdClone)?;
505             let reset_evt = self.reset_evt.try_clone().map_err(VmError::EventFdClone)?;
506             let activate_evt = self
507                 .activate_evt
508                 .try_clone()
509                 .map_err(VmError::EventFdClone)?;
510 
511             // The Linux kernel fires off an i8042 reset after doing the ACPI reset so there may be
512             // an event sitting in the shared reset_evt. Without doing this we get very early reboots
513             // during the boot process.
514             if self.reset_evt.read().is_ok() {
515                 warn!("Spurious second reset event received. Ignoring.");
516             }
517             self.vm = Some(Vm::new(
518                 config,
519                 exit_evt,
520                 reset_evt,
521                 &self.seccomp_action,
522                 self.hypervisor.clone(),
523                 activate_evt,
524                 serial_pty,
525                 console_pty,
526             )?);
527         }
528 
529         // Then we start the new VM.
530         if let Some(ref mut vm) = self.vm {
531             vm.boot()
532         } else {
533             Err(VmError::VmNotCreated)
534         }
535     }
536 
537     fn vm_info(&self) -> result::Result<VmInfo, VmError> {
538         match &self.vm_config {
539             Some(config) => {
540                 let state = match &self.vm {
541                     Some(vm) => vm.get_state()?,
542                     None => VmState::Created,
543                 };
544 
545                 let config = Arc::clone(config);
546 
547                 let mut memory_actual_size = config.lock().unwrap().memory.total_size();
548                 if let Some(vm) = &self.vm {
549                     memory_actual_size -= vm.balloon_size();
550                 }
551 
552                 let device_tree = self.vm.as_ref().map(|vm| vm.device_tree());
553 
554                 Ok(VmInfo {
555                     config,
556                     state,
557                     memory_actual_size,
558                     device_tree,
559                 })
560             }
561             None => Err(VmError::VmNotCreated),
562         }
563     }
564 
565     fn vmm_ping(&self) -> VmmPingResponse {
566         VmmPingResponse {
567             version: self.version.clone(),
568         }
569     }
570 
571     fn vm_delete(&mut self) -> result::Result<(), VmError> {
572         if self.vm_config.is_none() {
573             return Ok(());
574         }
575 
576         // If a VM is booted, we first try to shut it down.
577         if self.vm.is_some() {
578             self.vm_shutdown()?;
579         }
580 
581         self.vm_config = None;
582 
583         event!("vm", "deleted");
584 
585         Ok(())
586     }
587 
588     fn vmm_shutdown(&mut self) -> result::Result<(), VmError> {
589         self.vm_delete()?;
590         event!("vmm", "shutdown");
591         Ok(())
592     }
593 
594     fn vm_resize(
595         &mut self,
596         desired_vcpus: Option<u8>,
597         desired_ram: Option<u64>,
598         desired_balloon: Option<u64>,
599     ) -> result::Result<(), VmError> {
600         if let Some(ref mut vm) = self.vm {
601             if let Err(e) = vm.resize(desired_vcpus, desired_ram, desired_balloon) {
602                 error!("Error when resizing VM: {:?}", e);
603                 Err(e)
604             } else {
605                 Ok(())
606             }
607         } else {
608             Err(VmError::VmNotRunning)
609         }
610     }
611 
612     fn vm_resize_zone(&mut self, id: String, desired_ram: u64) -> result::Result<(), VmError> {
613         if let Some(ref mut vm) = self.vm {
614             if let Err(e) = vm.resize_zone(id, desired_ram) {
615                 error!("Error when resizing VM: {:?}", e);
616                 Err(e)
617             } else {
618                 Ok(())
619             }
620         } else {
621             Err(VmError::VmNotRunning)
622         }
623     }
624 
625     fn vm_add_device(&mut self, device_cfg: DeviceConfig) -> result::Result<Vec<u8>, VmError> {
626         if let Some(ref mut vm) = self.vm {
627             let info = vm.add_device(device_cfg).map_err(|e| {
628                 error!("Error when adding new device to the VM: {:?}", e);
629                 e
630             })?;
631             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
632         } else {
633             Err(VmError::VmNotRunning)
634         }
635     }
636 
637     fn vm_remove_device(&mut self, id: String) -> result::Result<(), VmError> {
638         if let Some(ref mut vm) = self.vm {
639             if let Err(e) = vm.remove_device(id) {
640                 error!("Error when removing new device to the VM: {:?}", e);
641                 Err(e)
642             } else {
643                 Ok(())
644             }
645         } else {
646             Err(VmError::VmNotRunning)
647         }
648     }
649 
650     fn vm_add_disk(&mut self, disk_cfg: DiskConfig) -> result::Result<Vec<u8>, VmError> {
651         if let Some(ref mut vm) = self.vm {
652             let info = vm.add_disk(disk_cfg).map_err(|e| {
653                 error!("Error when adding new disk to the VM: {:?}", e);
654                 e
655             })?;
656             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
657         } else {
658             Err(VmError::VmNotRunning)
659         }
660     }
661 
662     fn vm_add_fs(&mut self, fs_cfg: FsConfig) -> result::Result<Vec<u8>, VmError> {
663         if let Some(ref mut vm) = self.vm {
664             let info = vm.add_fs(fs_cfg).map_err(|e| {
665                 error!("Error when adding new fs to the VM: {:?}", e);
666                 e
667             })?;
668             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
669         } else {
670             Err(VmError::VmNotRunning)
671         }
672     }
673 
674     fn vm_add_pmem(&mut self, pmem_cfg: PmemConfig) -> result::Result<Vec<u8>, VmError> {
675         if let Some(ref mut vm) = self.vm {
676             let info = vm.add_pmem(pmem_cfg).map_err(|e| {
677                 error!("Error when adding new pmem device to the VM: {:?}", e);
678                 e
679             })?;
680             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
681         } else {
682             Err(VmError::VmNotRunning)
683         }
684     }
685 
686     fn vm_add_net(&mut self, net_cfg: NetConfig) -> result::Result<Vec<u8>, VmError> {
687         if let Some(ref mut vm) = self.vm {
688             let info = vm.add_net(net_cfg).map_err(|e| {
689                 error!("Error when adding new network device to the VM: {:?}", e);
690                 e
691             })?;
692             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
693         } else {
694             Err(VmError::VmNotRunning)
695         }
696     }
697 
698     fn vm_add_vsock(&mut self, vsock_cfg: VsockConfig) -> result::Result<Vec<u8>, VmError> {
699         if let Some(ref mut vm) = self.vm {
700             let info = vm.add_vsock(vsock_cfg).map_err(|e| {
701                 error!("Error when adding new vsock device to the VM: {:?}", e);
702                 e
703             })?;
704             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
705         } else {
706             Err(VmError::VmNotRunning)
707         }
708     }
709 
710     fn vm_counters(&mut self) -> result::Result<Vec<u8>, VmError> {
711         if let Some(ref mut vm) = self.vm {
712             let info = vm.counters().map_err(|e| {
713                 error!("Error when getting counters from the VM: {:?}", e);
714                 e
715             })?;
716             serde_json::to_vec(&info).map_err(VmError::SerializeJson)
717         } else {
718             Err(VmError::VmNotRunning)
719         }
720     }
721 
722     fn vm_power_button(&mut self) -> result::Result<(), VmError> {
723         if let Some(ref mut vm) = self.vm {
724             vm.power_button()
725         } else {
726             Err(VmError::VmNotRunning)
727         }
728     }
729 
730     fn vm_receive_config<T>(
731         &mut self,
732         req: &Request,
733         socket: &mut T,
734     ) -> std::result::Result<Vm, MigratableError>
735     where
736         T: Read + Write,
737     {
738         // Read in config data
739         let mut data = Vec::with_capacity(req.length() as usize);
740         unsafe {
741             data.set_len(req.length() as usize);
742         }
743         socket
744             .read_exact(&mut data)
745             .map_err(MigratableError::MigrateSocket)?;
746         let config: VmConfig = serde_json::from_slice(&data).map_err(|e| {
747             MigratableError::MigrateReceive(anyhow!("Error deserialising config: {}", e))
748         })?;
749 
750         let exit_evt = self.exit_evt.try_clone().map_err(|e| {
751             MigratableError::MigrateReceive(anyhow!("Error cloning exit EventFd: {}", e))
752         })?;
753         let reset_evt = self.reset_evt.try_clone().map_err(|e| {
754             MigratableError::MigrateReceive(anyhow!("Error cloning reset EventFd: {}", e))
755         })?;
756         let activate_evt = self.activate_evt.try_clone().map_err(|e| {
757             MigratableError::MigrateReceive(anyhow!("Error cloning activate EventFd: {}", e))
758         })?;
759 
760         self.vm_config = Some(Arc::new(Mutex::new(config)));
761         let vm = Vm::new_from_migration(
762             self.vm_config.clone().unwrap(),
763             exit_evt,
764             reset_evt,
765             &self.seccomp_action,
766             self.hypervisor.clone(),
767             activate_evt,
768         )
769         .map_err(|e| {
770             MigratableError::MigrateReceive(anyhow!("Error creating VM from snapshot: {:?}", e))
771         })?;
772 
773         Response::ok().write_to(socket)?;
774 
775         Ok(vm)
776     }
777 
778     fn vm_receive_state<T>(
779         &mut self,
780         req: &Request,
781         socket: &mut T,
782         mut vm: Vm,
783     ) -> std::result::Result<(), MigratableError>
784     where
785         T: Read + Write,
786     {
787         // Read in state data
788         let mut data = Vec::with_capacity(req.length() as usize);
789         unsafe {
790             data.set_len(req.length() as usize);
791         }
792         socket
793             .read_exact(&mut data)
794             .map_err(MigratableError::MigrateSocket)?;
795         let snapshot: Snapshot = serde_json::from_slice(&data).map_err(|e| {
796             MigratableError::MigrateReceive(anyhow!("Error deserialising snapshot: {}", e))
797         })?;
798 
799         #[cfg(all(feature = "kvm", target_arch = "x86_64"))]
800         vm.load_clock_from_snapshot(&snapshot)
801             .map_err(|e| MigratableError::MigrateReceive(anyhow!("Error resume clock: {:?}", e)))?;
802 
803         // Create VM
804         vm.restore(snapshot).map_err(|e| {
805             Response::error().write_to(socket).ok();
806             e
807         })?;
808         self.vm = Some(vm);
809 
810         Response::ok().write_to(socket)?;
811 
812         Ok(())
813     }
814 
815     fn vm_receive_memory<T>(
816         &mut self,
817         req: &Request,
818         socket: &mut T,
819         vm: &mut Vm,
820     ) -> std::result::Result<(), MigratableError>
821     where
822         T: Read + Write,
823     {
824         // Read table
825         let table = MemoryRangeTable::read_from(socket, req.length())?;
826 
827         // And then read the memory itself
828         vm.receive_memory_regions(&table, socket).map_err(|e| {
829             Response::error().write_to(socket).ok();
830             e
831         })?;
832         Response::ok().write_to(socket)?;
833         Ok(())
834     }
835 
836     fn socket_url_to_path(url: &str) -> result::Result<PathBuf, MigratableError> {
837         url.strip_prefix("unix:")
838             .ok_or_else(|| {
839                 MigratableError::MigrateSend(anyhow!("Could not extract path from URL: {}", url))
840             })
841             .map(|s| s.into())
842     }
843 
844     fn vm_receive_migration(
845         &mut self,
846         receive_data_migration: VmReceiveMigrationData,
847     ) -> result::Result<(), MigratableError> {
848         info!(
849             "Receiving migration: receiver_url = {}",
850             receive_data_migration.receiver_url
851         );
852 
853         let path = Self::socket_url_to_path(&receive_data_migration.receiver_url)?;
854         let listener = UnixListener::bind(&path).map_err(|e| {
855             MigratableError::MigrateReceive(anyhow!("Error binding to UNIX socket: {}", e))
856         })?;
857         let (mut socket, _addr) = listener.accept().map_err(|e| {
858             MigratableError::MigrateReceive(anyhow!("Error accepting on UNIX socket: {}", e))
859         })?;
860         std::fs::remove_file(&path).map_err(|e| {
861             MigratableError::MigrateReceive(anyhow!("Error unlinking UNIX socket: {}", e))
862         })?;
863 
864         let mut started = false;
865         let mut vm: Option<Vm> = None;
866 
867         loop {
868             let req = Request::read_from(&mut socket)?;
869             match req.command() {
870                 Command::Invalid => info!("Invalid Command Received"),
871                 Command::Start => {
872                     info!("Start Command Received");
873                     started = true;
874 
875                     Response::ok().write_to(&mut socket)?;
876                 }
877                 Command::Config => {
878                     info!("Config Command Received");
879 
880                     if !started {
881                         warn!("Migration not started yet");
882                         Response::error().write_to(&mut socket)?;
883                         continue;
884                     }
885                     vm = Some(self.vm_receive_config(&req, &mut socket)?);
886                 }
887                 Command::State => {
888                     info!("State Command Received");
889 
890                     if !started {
891                         warn!("Migration not started yet");
892                         Response::error().write_to(&mut socket)?;
893                         continue;
894                     }
895                     if let Some(vm) = vm.take() {
896                         self.vm_receive_state(&req, &mut socket, vm)?;
897                     } else {
898                         warn!("Configuration not sent yet");
899                         Response::error().write_to(&mut socket)?;
900                     }
901                 }
902                 Command::Memory => {
903                     info!("Memory Command Received");
904 
905                     if !started {
906                         warn!("Migration not started yet");
907                         Response::error().write_to(&mut socket)?;
908                         continue;
909                     }
910                     if let Some(ref mut vm) = vm.as_mut() {
911                         self.vm_receive_memory(&req, &mut socket, vm)?;
912                     } else {
913                         warn!("Configuration not sent yet");
914                         Response::error().write_to(&mut socket)?;
915                     }
916                 }
917                 Command::Complete => {
918                     info!("Complete Command Received");
919                     if let Some(ref mut vm) = self.vm.as_mut() {
920                         vm.resume()?;
921                         Response::ok().write_to(&mut socket)?;
922                     } else {
923                         warn!("VM not created yet");
924                         Response::error().write_to(&mut socket)?;
925                     }
926                     break;
927                 }
928                 Command::Abandon => {
929                     info!("Abandon Command Received");
930                     self.vm = None;
931                     self.vm_config = None;
932                     Response::ok().write_to(&mut socket).ok();
933                     break;
934                 }
935             }
936         }
937 
938         Ok(())
939     }
940 
941     // Returns true if there were dirty pages to send
942     fn vm_maybe_send_dirty_pages<T>(
943         vm: &mut Vm,
944         socket: &mut T,
945     ) -> result::Result<bool, MigratableError>
946     where
947         T: Read + Write,
948     {
949         // Send (dirty) memory table
950         let table = vm.dirty_memory_range_table()?;
951 
952         // But if there are no regions go straight to pause
953         if table.regions().is_empty() {
954             return Ok(false);
955         }
956 
957         Request::memory(table.length()).write_to(socket).unwrap();
958         table.write_to(socket)?;
959         // And then the memory itself
960         vm.send_memory_regions(&table, socket)?;
961         let res = Response::read_from(socket)?;
962         if res.status() != Status::Ok {
963             warn!("Error during dirty memory migration");
964             Request::abandon().write_to(socket)?;
965             Response::read_from(socket).ok();
966             return Err(MigratableError::MigrateSend(anyhow!(
967                 "Error during dirty memory migration"
968             )));
969         }
970 
971         Ok(true)
972     }
973 
974     fn vm_send_migration(
975         &mut self,
976         send_data_migration: VmSendMigrationData,
977     ) -> result::Result<(), MigratableError> {
978         info!(
979             "Sending migration: destination_url = {}",
980             send_data_migration.destination_url
981         );
982         if let Some(ref mut vm) = self.vm {
983             let path = Self::socket_url_to_path(&send_data_migration.destination_url)?;
984             let mut socket = UnixStream::connect(&path).map_err(|e| {
985                 MigratableError::MigrateSend(anyhow!("Error connecting to UNIX socket: {}", e))
986             })?;
987 
988             // Start the migration
989             Request::start().write_to(&mut socket)?;
990             let res = Response::read_from(&mut socket)?;
991             if res.status() != Status::Ok {
992                 warn!("Error starting migration");
993                 Request::abandon().write_to(&mut socket)?;
994                 Response::read_from(&mut socket).ok();
995                 return Err(MigratableError::MigrateSend(anyhow!(
996                     "Error starting migration"
997                 )));
998             }
999 
1000             // Send config
1001             let config_data = serde_json::to_vec(&vm.get_config()).unwrap();
1002             Request::config(config_data.len() as u64).write_to(&mut socket)?;
1003             socket
1004                 .write_all(&config_data)
1005                 .map_err(MigratableError::MigrateSocket)?;
1006             let res = Response::read_from(&mut socket)?;
1007             if res.status() != Status::Ok {
1008                 warn!("Error during config migration");
1009                 Request::abandon().write_to(&mut socket)?;
1010                 Response::read_from(&mut socket).ok();
1011                 return Err(MigratableError::MigrateSend(anyhow!(
1012                     "Error during config migration"
1013                 )));
1014             }
1015 
1016             // Start logging dirty pages
1017             vm.start_memory_dirty_log()?;
1018 
1019             // Send memory table
1020             let table = vm.memory_range_table()?;
1021             Request::memory(table.length())
1022                 .write_to(&mut socket)
1023                 .unwrap();
1024             table.write_to(&mut socket)?;
1025             // And then the memory itself
1026             vm.send_memory_regions(&table, &mut socket)?;
1027             let res = Response::read_from(&mut socket)?;
1028             if res.status() != Status::Ok {
1029                 warn!("Error during memory migration");
1030                 Request::abandon().write_to(&mut socket)?;
1031                 Response::read_from(&mut socket).ok();
1032                 return Err(MigratableError::MigrateSend(anyhow!(
1033                     "Error during memory migration"
1034                 )));
1035             }
1036 
1037             // Try at most 5 passes of dirty memory sending
1038             const MAX_DIRTY_MIGRATIONS: usize = 5;
1039             for i in 0..MAX_DIRTY_MIGRATIONS {
1040                 info!("Dirty memory migration {} of {}", i, MAX_DIRTY_MIGRATIONS);
1041                 if !Self::vm_maybe_send_dirty_pages(vm, &mut socket)? {
1042                     break;
1043                 }
1044             }
1045 
1046             // Now pause VM
1047             vm.pause()?;
1048 
1049             // Send last batch of dirty pages
1050             Self::vm_maybe_send_dirty_pages(vm, &mut socket)?;
1051 
1052             // Capture snapshot and send it
1053             let vm_snapshot = vm.snapshot()?;
1054             let snapshot_data = serde_json::to_vec(&vm_snapshot).unwrap();
1055             Request::state(snapshot_data.len() as u64).write_to(&mut socket)?;
1056             socket
1057                 .write_all(&snapshot_data)
1058                 .map_err(MigratableError::MigrateSocket)?;
1059             let res = Response::read_from(&mut socket)?;
1060             if res.status() != Status::Ok {
1061                 warn!("Error during state migration");
1062                 Request::abandon().write_to(&mut socket)?;
1063                 Response::read_from(&mut socket).ok();
1064                 return Err(MigratableError::MigrateSend(anyhow!(
1065                     "Error during state migration"
1066                 )));
1067             }
1068 
1069             // Complete the migration
1070             Request::complete().write_to(&mut socket)?;
1071             let res = Response::read_from(&mut socket)?;
1072             if res.status() != Status::Ok {
1073                 warn!("Error completing migration");
1074                 Request::abandon().write_to(&mut socket)?;
1075                 Response::read_from(&mut socket).ok();
1076                 return Err(MigratableError::MigrateSend(anyhow!(
1077                     "Error completing migration"
1078                 )));
1079             }
1080             info!("Migration complete");
1081             Ok(())
1082         } else {
1083             Err(MigratableError::MigrateSend(anyhow!("VM is not running")))
1084         }
1085     }
1086 
1087     fn control_loop(&mut self, api_receiver: Arc<Receiver<ApiRequest>>) -> Result<()> {
1088         const EPOLL_EVENTS_LEN: usize = 100;
1089 
1090         let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
1091         let epoll_fd = self.epoll.as_raw_fd();
1092 
1093         'outer: loop {
1094             let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) {
1095                 Ok(res) => res,
1096                 Err(e) => {
1097                     if e.kind() == io::ErrorKind::Interrupted {
1098                         // It's well defined from the epoll_wait() syscall
1099                         // documentation that the epoll loop can be interrupted
1100                         // before any of the requested events occurred or the
1101                         // timeout expired. In both those cases, epoll_wait()
1102                         // returns an error of type EINTR, but this should not
1103                         // be considered as a regular error. Instead it is more
1104                         // appropriate to retry, by calling into epoll_wait().
1105                         continue;
1106                     }
1107                     return Err(Error::Epoll(e));
1108                 }
1109             };
1110 
1111             for event in events.iter().take(num_events) {
1112                 let dispatch_idx = event.data as usize;
1113 
1114                 if let Some(dispatch_type) = self.epoll.dispatch_table[dispatch_idx] {
1115                     match dispatch_type {
1116                         EpollDispatch::Exit => {
1117                             info!("VM exit event");
1118                             // Consume the event.
1119                             self.exit_evt.read().map_err(Error::EventFdRead)?;
1120                             self.vmm_shutdown().map_err(Error::VmmShutdown)?;
1121 
1122                             break 'outer;
1123                         }
1124                         EpollDispatch::Reset => {
1125                             info!("VM reset event");
1126                             // Consume the event.
1127                             self.reset_evt.read().map_err(Error::EventFdRead)?;
1128                             self.vm_reboot().map_err(Error::VmReboot)?;
1129                         }
1130                         EpollDispatch::Stdin => {
1131                             if let Some(ref vm) = self.vm {
1132                                 vm.handle_stdin().map_err(Error::Stdin)?;
1133                             }
1134                         }
1135                         EpollDispatch::ActivateVirtioDevices => {
1136                             if let Some(ref vm) = self.vm {
1137                                 let count = self.activate_evt.read().map_err(Error::EventFdRead)?;
1138                                 info!(
1139                                     "Trying to activate pending virtio devices: count = {}",
1140                                     count
1141                                 );
1142                                 vm.activate_virtio_devices()
1143                                     .map_err(Error::ActivateVirtioDevices)?;
1144                             }
1145                         }
1146                         EpollDispatch::Pty => {
1147                             if let Some(ref vm) = self.vm {
1148                                 vm.handle_pty().map_err(Error::Pty)?;
1149                             }
1150                         }
1151                         EpollDispatch::Api => {
1152                             // Consume the event.
1153                             self.api_evt.read().map_err(Error::EventFdRead)?;
1154 
1155                             // Read from the API receiver channel
1156                             let api_request = api_receiver.recv().map_err(Error::ApiRequestRecv)?;
1157 
1158                             info!("API request event: {:?}", api_request);
1159                             match api_request {
1160                                 ApiRequest::VmCreate(config, sender) => {
1161                                     let response = self
1162                                         .vm_create(config)
1163                                         .map_err(ApiError::VmCreate)
1164                                         .map(|_| ApiResponsePayload::Empty);
1165 
1166                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1167                                 }
1168                                 ApiRequest::VmDelete(sender) => {
1169                                     let response = self
1170                                         .vm_delete()
1171                                         .map_err(ApiError::VmDelete)
1172                                         .map(|_| ApiResponsePayload::Empty);
1173 
1174                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1175                                 }
1176                                 ApiRequest::VmBoot(sender) => {
1177                                     let response = self
1178                                         .vm_boot()
1179                                         .map_err(ApiError::VmBoot)
1180                                         .map(|_| ApiResponsePayload::Empty);
1181 
1182                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1183                                 }
1184                                 ApiRequest::VmShutdown(sender) => {
1185                                     let response = self
1186                                         .vm_shutdown()
1187                                         .map_err(ApiError::VmShutdown)
1188                                         .map(|_| ApiResponsePayload::Empty);
1189 
1190                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1191                                 }
1192                                 ApiRequest::VmReboot(sender) => {
1193                                     let response = self
1194                                         .vm_reboot()
1195                                         .map_err(ApiError::VmReboot)
1196                                         .map(|_| ApiResponsePayload::Empty);
1197 
1198                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1199                                 }
1200                                 ApiRequest::VmInfo(sender) => {
1201                                     let response = self
1202                                         .vm_info()
1203                                         .map_err(ApiError::VmInfo)
1204                                         .map(ApiResponsePayload::VmInfo);
1205 
1206                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1207                                 }
1208                                 ApiRequest::VmmPing(sender) => {
1209                                     let response = ApiResponsePayload::VmmPing(self.vmm_ping());
1210 
1211                                     sender.send(Ok(response)).map_err(Error::ApiResponseSend)?;
1212                                 }
1213                                 ApiRequest::VmPause(sender) => {
1214                                     let response = self
1215                                         .vm_pause()
1216                                         .map_err(ApiError::VmPause)
1217                                         .map(|_| ApiResponsePayload::Empty);
1218 
1219                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1220                                 }
1221                                 ApiRequest::VmResume(sender) => {
1222                                     let response = self
1223                                         .vm_resume()
1224                                         .map_err(ApiError::VmResume)
1225                                         .map(|_| ApiResponsePayload::Empty);
1226 
1227                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1228                                 }
1229                                 ApiRequest::VmSnapshot(snapshot_data, sender) => {
1230                                     let response = self
1231                                         .vm_snapshot(&snapshot_data.destination_url)
1232                                         .map_err(ApiError::VmSnapshot)
1233                                         .map(|_| ApiResponsePayload::Empty);
1234 
1235                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1236                                 }
1237                                 ApiRequest::VmRestore(restore_data, sender) => {
1238                                     let response = self
1239                                         .vm_restore(restore_data.as_ref().clone())
1240                                         .map_err(ApiError::VmRestore)
1241                                         .map(|_| ApiResponsePayload::Empty);
1242 
1243                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1244                                 }
1245                                 ApiRequest::VmmShutdown(sender) => {
1246                                     let response = self
1247                                         .vmm_shutdown()
1248                                         .map_err(ApiError::VmmShutdown)
1249                                         .map(|_| ApiResponsePayload::Empty);
1250 
1251                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1252 
1253                                     break 'outer;
1254                                 }
1255                                 ApiRequest::VmResize(resize_data, sender) => {
1256                                     let response = self
1257                                         .vm_resize(
1258                                             resize_data.desired_vcpus,
1259                                             resize_data.desired_ram,
1260                                             resize_data.desired_balloon,
1261                                         )
1262                                         .map_err(ApiError::VmResize)
1263                                         .map(|_| ApiResponsePayload::Empty);
1264                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1265                                 }
1266                                 ApiRequest::VmResizeZone(resize_zone_data, sender) => {
1267                                     let response = self
1268                                         .vm_resize_zone(
1269                                             resize_zone_data.id.clone(),
1270                                             resize_zone_data.desired_ram,
1271                                         )
1272                                         .map_err(ApiError::VmResizeZone)
1273                                         .map(|_| ApiResponsePayload::Empty);
1274                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1275                                 }
1276                                 ApiRequest::VmAddDevice(add_device_data, sender) => {
1277                                     let response = self
1278                                         .vm_add_device(add_device_data.as_ref().clone())
1279                                         .map_err(ApiError::VmAddDevice)
1280                                         .map(ApiResponsePayload::VmAction);
1281                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1282                                 }
1283                                 ApiRequest::VmRemoveDevice(remove_device_data, sender) => {
1284                                     let response = self
1285                                         .vm_remove_device(remove_device_data.id.clone())
1286                                         .map_err(ApiError::VmRemoveDevice)
1287                                         .map(|_| ApiResponsePayload::Empty);
1288                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1289                                 }
1290                                 ApiRequest::VmAddDisk(add_disk_data, sender) => {
1291                                     let response = self
1292                                         .vm_add_disk(add_disk_data.as_ref().clone())
1293                                         .map_err(ApiError::VmAddDisk)
1294                                         .map(ApiResponsePayload::VmAction);
1295                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1296                                 }
1297                                 ApiRequest::VmAddFs(add_fs_data, sender) => {
1298                                     let response = self
1299                                         .vm_add_fs(add_fs_data.as_ref().clone())
1300                                         .map_err(ApiError::VmAddFs)
1301                                         .map(ApiResponsePayload::VmAction);
1302                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1303                                 }
1304                                 ApiRequest::VmAddPmem(add_pmem_data, sender) => {
1305                                     let response = self
1306                                         .vm_add_pmem(add_pmem_data.as_ref().clone())
1307                                         .map_err(ApiError::VmAddPmem)
1308                                         .map(ApiResponsePayload::VmAction);
1309                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1310                                 }
1311                                 ApiRequest::VmAddNet(add_net_data, sender) => {
1312                                     let response = self
1313                                         .vm_add_net(add_net_data.as_ref().clone())
1314                                         .map_err(ApiError::VmAddNet)
1315                                         .map(ApiResponsePayload::VmAction);
1316                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1317                                 }
1318                                 ApiRequest::VmAddVsock(add_vsock_data, sender) => {
1319                                     let response = self
1320                                         .vm_add_vsock(add_vsock_data.as_ref().clone())
1321                                         .map_err(ApiError::VmAddVsock)
1322                                         .map(ApiResponsePayload::VmAction);
1323                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1324                                 }
1325                                 ApiRequest::VmCounters(sender) => {
1326                                     let response = self
1327                                         .vm_counters()
1328                                         .map_err(ApiError::VmInfo)
1329                                         .map(ApiResponsePayload::VmAction);
1330 
1331                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1332                                 }
1333                                 ApiRequest::VmReceiveMigration(receive_migration_data, sender) => {
1334                                     let response = self
1335                                         .vm_receive_migration(
1336                                             receive_migration_data.as_ref().clone(),
1337                                         )
1338                                         .map_err(ApiError::VmReceiveMigration)
1339                                         .map(|_| ApiResponsePayload::Empty);
1340                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1341                                 }
1342                                 ApiRequest::VmSendMigration(send_migration_data, sender) => {
1343                                     let response = self
1344                                         .vm_send_migration(send_migration_data.as_ref().clone())
1345                                         .map_err(ApiError::VmSendMigration)
1346                                         .map(|_| ApiResponsePayload::Empty);
1347                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1348                                 }
1349                                 ApiRequest::VmPowerButton(sender) => {
1350                                     let response = self
1351                                         .vm_power_button()
1352                                         .map_err(ApiError::VmPowerButton)
1353                                         .map(|_| ApiResponsePayload::Empty);
1354 
1355                                     sender.send(response).map_err(Error::ApiResponseSend)?;
1356                                 }
1357                             }
1358                         }
1359                     }
1360                 }
1361             }
1362         }
1363 
1364         Ok(())
1365     }
1366 }
1367 
1368 const CPU_MANAGER_SNAPSHOT_ID: &str = "cpu-manager";
1369 const MEMORY_MANAGER_SNAPSHOT_ID: &str = "memory-manager";
1370 const DEVICE_MANAGER_SNAPSHOT_ID: &str = "device-manager";
1371