xref: /cloud-hypervisor/test_infra/src/lib.rs (revision eea9bcea38e0c5649f444c829f3a4f9c22aa486c)
1 // Copyright © 2021 Intel Corporation
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 
6 use once_cell::sync::Lazy;
7 use serde_json::Value;
8 use ssh2::Session;
9 use std::env;
10 use std::ffi::OsStr;
11 use std::io;
12 use std::io::{Read, Write};
13 use std::net::TcpListener;
14 use std::net::TcpStream;
15 use std::os::unix::fs::PermissionsExt;
16 use std::os::unix::io::{AsRawFd, FromRawFd};
17 use std::path::Path;
18 use std::process::{Child, Command, ExitStatus, Output, Stdio};
19 use std::str::FromStr;
20 use std::sync::Mutex;
21 use std::thread;
22 use std::time::Duration;
23 use std::{fmt, fs};
24 use vmm_sys_util::tempdir::TempDir;
25 use wait_timeout::ChildExt;
26 
/// Error payload carried by [`Error::WaitTimeout`].
///
/// NOTE(review): none of these variants is constructed in the visible part of
/// this file, so the per-variant notes below are inferred from the names and
/// payload types only — confirm against the code that produces them.
#[derive(Debug)]
pub enum WaitTimeoutError {
    /// Presumably: the wait ran out of time — TODO confirm.
    Timedout,
    /// Presumably: the awaited process ended with an unexpected status — TODO confirm.
    ExitStatus,
    /// Wraps an underlying I/O error.
    General(std::io::Error),
}
33 
/// Top-level error type returned by the guest helper methods in this file.
#[derive(Debug)]
pub enum Error {
    /// Output of a guest command could not be parsed as an integer.
    Parsing(std::num::ParseIntError),
    /// An SSH/SCP operation against the guest failed
    /// (also produced through `From<SshCommandError>`).
    SshCommand(SshCommandError),
    /// Wraps a [`WaitForBootError`].
    WaitForBoot(WaitForBootError),
    /// Wraps an I/O error.
    /// NOTE(review): not constructed in the visible part of this file;
    /// presumably raised while handling an ethr log file — confirm.
    EthrLogFile(std::io::Error),
    /// NOTE(review): not constructed in the visible part of this file;
    /// presumably an ethr log could not be parsed — confirm.
    EthrLogParse,
    /// NOTE(review): not constructed in the visible part of this file;
    /// presumably fio output could not be parsed — confirm.
    FioOutputParse,
    /// NOTE(review): not constructed in the visible part of this file;
    /// presumably iperf3 output could not be parsed — confirm.
    Iperf3Parse,
    /// Wraps an I/O error.
    /// NOTE(review): presumably from spawning a child process — confirm.
    Spawn(std::io::Error),
    /// Wraps a [`WaitTimeoutError`].
    WaitTimeout(WaitTimeoutError),
}
46 
47 impl From<SshCommandError> for Error {
48     fn from(e: SshCommandError) -> Self {
49         Self::SshCommand(e)
50     }
51 }
52 
/// Addresses used to reach one guest (and up to three nested "L2" guests)
/// plus the port the host listens on for the guest's boot notification.
///
/// All values are stored as already-formatted strings.
pub struct GuestNetworkConfig {
    /// IP address of the guest; SSH commands and the boot notification check use it.
    pub guest_ip: String,
    /// IP address of the first nested (L2) guest.
    pub l2_guest_ip1: String,
    /// IP address of the second nested (L2) guest.
    pub l2_guest_ip2: String,
    /// IP address of the third nested (L2) guest.
    pub l2_guest_ip3: String,
    /// IP address on the host side; substituted into the cloud-init templates
    /// and into the default `--net` strings.
    pub host_ip: String,
    /// MAC address assigned to the guest's network device.
    pub guest_mac: String,
    /// MAC address of the first nested (L2) guest.
    pub l2_guest_mac1: String,
    /// MAC address of the second nested (L2) guest.
    pub l2_guest_mac2: String,
    /// MAC address of the third nested (L2) guest.
    pub l2_guest_mac3: String,
    /// TCP port the host binds in `wait_vm_boot` to receive the boot notification.
    pub tcp_listener_port: u16,
}
65 
/// Text substituted for `@DEFAULT_TCP_LISTENER_MESSAGE` in the cloud-init `user-data` template.
pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted";
/// Base TCP port for boot notifications; the guest id is added to it to get a per-guest port.
pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000;
/// Default boot-wait timeout in seconds (converted to milliseconds for `epoll`).
pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120;
69 
/// Reasons `GuestNetworkConfig::wait_vm_boot` can fail.
#[derive(Debug)]
pub enum WaitForBootError {
    /// `epoll::wait` failed with an error other than `EAGAIN`/`EINTR`.
    EpollWait(std::io::Error),
    /// Binding the TCP listener failed.
    Listen(std::io::Error),
    /// No connection arrived before the timeout expired.
    EpollWaitTimeout,
    /// A connection arrived, but not from the expected guest IP.
    WrongGuestAddr,
    /// `TcpListener::accept` failed.
    Accept(std::io::Error),
}
78 
79 impl GuestNetworkConfig {
80     pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> {
81         let start = std::time::Instant::now();
82         // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()'
83         let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port);
84         let expected_guest_addr = self.guest_ip.as_str();
85         let mut s = String::new();
86         let timeout = match custom_timeout {
87             Some(t) => t,
88             None => DEFAULT_TCP_LISTENER_TIMEOUT,
89         };
90 
91         match (|| -> Result<(), WaitForBootError> {
92             let listener =
93                 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?;
94             listener
95                 .set_nonblocking(true)
96                 .expect("Cannot set non-blocking for tcp listener");
97 
98             // Reply on epoll w/ timeout to wait for guest connections faithfully
99             let epoll_fd = epoll::create(true).expect("Cannot create epoll fd");
100             // Use 'File' to enforce closing on 'epoll_fd'
101             let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) };
102             epoll::ctl(
103                 epoll_fd,
104                 epoll::ControlOptions::EPOLL_CTL_ADD,
105                 listener.as_raw_fd(),
106                 epoll::Event::new(epoll::Events::EPOLLIN, 0),
107             )
108             .expect("Cannot add 'tcp_listener' event to epoll");
109             let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); 1];
110             loop {
111                 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) {
112                     Ok(num_events) => Ok(num_events),
113                     Err(e) => match e.raw_os_error() {
114                         Some(libc::EAGAIN) | Some(libc::EINTR) => continue,
115                         _ => Err(e),
116                     },
117                 }
118                 .map_err(WaitForBootError::EpollWait)?;
119                 if num_events == 0 {
120                     return Err(WaitForBootError::EpollWaitTimeout);
121                 }
122                 break;
123             }
124 
125             match listener.accept() {
126                 Ok((_, addr)) => {
127                     // Make sure the connection is from the expected 'guest_addr'
128                     if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() {
129                         s = format!(
130                             "Expecting the guest ip '{}' while being connected with ip '{}'",
131                             expected_guest_addr,
132                             addr.ip()
133                         );
134                         return Err(WaitForBootError::WrongGuestAddr);
135                     }
136 
137                     Ok(())
138                 }
139                 Err(e) => {
140                     s = "TcpListener::accept() failed".to_string();
141                     Err(WaitForBootError::Accept(e))
142                 }
143             }
144         })() {
145             Err(e) => {
146                 let duration = start.elapsed();
147                 eprintln!(
148                     "\n\n==== Start 'wait_vm_boot' (FAILED) ====\n\n\
149                  duration =\"{:?}, timeout = {}s\"\n\
150                  listen_addr=\"{}\"\n\
151                  expected_guest_addr=\"{}\"\n\
152                  message=\"{}\"\n\
153                  error=\"{:?}\"\n\
154                  \n==== End 'wait_vm_boot' outout ====\n\n",
155                     duration, timeout, listen_addr, expected_guest_addr, s, e
156                 );
157 
158                 Err(e)
159             }
160             Ok(_) => Ok(()),
161         }
162     }
163 }
164 
/// Selects which of a guest's disks is being asked for in `DiskConfig::disk`.
pub enum DiskType {
    /// The disk holding the guest operating system.
    OperatingSystem,
    /// The cloud-init configuration disk (not every disk config has one).
    CloudInit,
}
169 
/// Describes how the disks for a guest are prepared and where they end up.
pub trait DiskConfig {
    /// Prepares the guest's disks, using `tmp_dir` for any files created, and
    /// records the resulting paths for later retrieval through `disk`.
    fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig);
    /// Builds the cloud-init disk and returns its path; implementations
    /// without cloud-init support return an empty string.
    fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String;
    /// Returns the path of the requested disk, or `None` when this
    /// configuration has no disk of that type.
    fn disk(&self, disk_type: DiskType) -> Option<String>;
}
175 
/// Disk configuration for an Ubuntu guest: an OS image copied from
/// `~/workloads/<image_name>` plus a generated cloud-init image.
#[derive(Clone)]
pub struct UbuntuDiskConfig {
    /// Path of the per-guest OS image copy; empty until `prepare_files` runs.
    osdisk_path: String,
    /// Path of the generated cloud-init image; empty until `prepare_files` runs.
    cloudinit_path: String,
    /// File name of the source image inside `~/workloads`.
    image_name: String,
}
182 
183 impl UbuntuDiskConfig {
184     pub fn new(image_name: String) -> Self {
185         UbuntuDiskConfig {
186             image_name,
187             osdisk_path: String::new(),
188             cloudinit_path: String::new(),
189         }
190     }
191 }
192 
/// Disk configuration for a Windows guest, backed by a device-mapper snapshot
/// whose copy-on-write store lives on a loopback device.
pub struct WindowsDiskConfig {
    /// File name of the source image inside `~/workloads`.
    image_name: String,
    /// Path of the snapshot device handed to the guest; empty until `prepare_files` runs.
    osdisk_path: String,
    /// Loopback device backing the CoW file, as printed by `losetup`.
    loopback_device: String,
    /// Device-mapper name of the linear CoW target.
    windows_snapshot_cow: String,
    /// Device-mapper name of the snapshot target.
    windows_snapshot: String,
}
200 
201 impl WindowsDiskConfig {
202     pub fn new(image_name: String) -> Self {
203         WindowsDiskConfig {
204             image_name,
205             osdisk_path: String::new(),
206             loopback_device: String::new(),
207             windows_snapshot_cow: String::new(),
208             windows_snapshot: String::new(),
209         }
210     }
211 }
212 
213 impl Drop for WindowsDiskConfig {
214     fn drop(&mut self) {
215         // dmsetup remove windows-snapshot-1
216         std::process::Command::new("dmsetup")
217             .arg("remove")
218             .arg(self.windows_snapshot.as_str())
219             .output()
220             .expect("Expect removing Windows snapshot with 'dmsetup' to succeed");
221 
222         // dmsetup remove windows-snapshot-cow-1
223         std::process::Command::new("dmsetup")
224             .arg("remove")
225             .arg(self.windows_snapshot_cow.as_str())
226             .output()
227             .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed");
228 
229         // losetup -d <loopback_device>
230         std::process::Command::new("losetup")
231             .args(["-d", self.loopback_device.as_str()])
232             .output()
233             .expect("Expect removing loopback device to succeed");
234     }
235 }
236 
237 impl DiskConfig for UbuntuDiskConfig {
238     fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String {
239         let cloudinit_file_path =
240             String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap());
241 
242         let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu");
243 
244         fs::create_dir_all(&cloud_init_directory)
245             .expect("Expect creating cloud-init directory to succeed");
246 
247         let source_file_dir = std::env::current_dir()
248             .unwrap()
249             .join("test_data")
250             .join("cloud-init")
251             .join("ubuntu");
252 
253         vec!["meta-data"].iter().for_each(|x| {
254             rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x))
255                 .expect("Expect copying cloud-init meta-data to succeed");
256         });
257 
258         let mut user_data_string = String::new();
259         fs::File::open(source_file_dir.join("user-data"))
260             .unwrap()
261             .read_to_string(&mut user_data_string)
262             .expect("Expected reading user-data file in to succeed");
263         user_data_string = user_data_string.replace(
264             "@DEFAULT_TCP_LISTENER_MESSAGE",
265             DEFAULT_TCP_LISTENER_MESSAGE,
266         );
267         user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip);
268         user_data_string =
269             user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string());
270 
271         fs::File::create(cloud_init_directory.join("user-data"))
272             .unwrap()
273             .write_all(user_data_string.as_bytes())
274             .expect("Expected writing out user-data to succeed");
275 
276         let mut network_config_string = String::new();
277 
278         fs::File::open(source_file_dir.join("network-config"))
279             .unwrap()
280             .read_to_string(&mut network_config_string)
281             .expect("Expected reading network-config file in to succeed");
282 
283         network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip);
284         network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip);
285         network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1);
286         network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2);
287         network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3);
288         network_config_string =
289             network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac);
290         network_config_string =
291             network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1);
292         network_config_string =
293             network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2);
294         network_config_string =
295             network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3);
296 
297         fs::File::create(cloud_init_directory.join("network-config"))
298             .unwrap()
299             .write_all(network_config_string.as_bytes())
300             .expect("Expected writing out network-config to succeed");
301 
302         std::process::Command::new("mkdosfs")
303             .args(["-n", "cidata"])
304             .args(["-C", cloudinit_file_path.as_str()])
305             .arg("8192")
306             .output()
307             .expect("Expect creating disk image to succeed");
308 
309         vec!["user-data", "meta-data", "network-config"]
310             .iter()
311             .for_each(|x| {
312                 std::process::Command::new("mcopy")
313                     .arg("-o")
314                     .args(["-i", cloudinit_file_path.as_str()])
315                     .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"])
316                     .output()
317                     .expect("Expect copying files to disk image to succeed");
318             });
319 
320         cloudinit_file_path
321     }
322 
323     fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) {
324         let mut workload_path = dirs::home_dir().unwrap();
325         workload_path.push("workloads");
326 
327         let mut osdisk_base_path = workload_path;
328         osdisk_base_path.push(&self.image_name);
329 
330         let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap());
331         let cloudinit_path = self.prepare_cloudinit(tmp_dir, network);
332 
333         rate_limited_copy(osdisk_base_path, &osdisk_path)
334             .expect("copying of OS source disk image failed");
335 
336         self.cloudinit_path = cloudinit_path;
337         self.osdisk_path = osdisk_path;
338     }
339 
340     fn disk(&self, disk_type: DiskType) -> Option<String> {
341         match disk_type {
342             DiskType::OperatingSystem => Some(self.osdisk_path.clone()),
343             DiskType::CloudInit => Some(self.cloudinit_path.clone()),
344         }
345     }
346 }
347 
348 impl DiskConfig for WindowsDiskConfig {
349     fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String {
350         String::new()
351     }
352 
353     fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) {
354         let mut workload_path = dirs::home_dir().unwrap();
355         workload_path.push("workloads");
356 
357         let mut osdisk_path = workload_path;
358         osdisk_path.push(&self.image_name);
359 
360         let osdisk_blk_size = fs::metadata(osdisk_path)
361             .expect("Expect retrieving Windows image metadata")
362             .len()
363             >> 9;
364 
365         let snapshot_cow_path =
366             String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap());
367 
368         // Create and truncate CoW file for device mapper
369         let cow_file_size: u64 = 1 << 30;
370         let cow_file_blk_size = cow_file_size >> 9;
371         let cow_file = std::fs::File::create(snapshot_cow_path.as_str())
372             .expect("Expect creating CoW image to succeed");
373         cow_file
374             .set_len(cow_file_size)
375             .expect("Expect truncating CoW image to succeed");
376 
377         // losetup --find --show /tmp/snapshot_cow
378         let loopback_device = std::process::Command::new("losetup")
379             .arg("--find")
380             .arg("--show")
381             .arg(snapshot_cow_path.as_str())
382             .output()
383             .expect("Expect creating loopback device from snapshot CoW image to succeed");
384 
385         self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout)
386             .trim()
387             .to_string();
388 
389         let random_extension = tmp_dir.as_path().file_name().unwrap();
390         let windows_snapshot_cow = format!(
391             "windows-snapshot-cow-{}",
392             random_extension.to_str().unwrap()
393         );
394 
395         // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0'
396         std::process::Command::new("dmsetup")
397             .arg("create")
398             .arg(windows_snapshot_cow.as_str())
399             .args([
400                 "--table",
401                 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(),
402             ])
403             .output()
404             .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed");
405 
406         let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap());
407 
408         // dmsetup mknodes
409         std::process::Command::new("dmsetup")
410             .arg("mknodes")
411             .output()
412             .expect("Expect device mapper nodes to be ready");
413 
414         // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8'
415         std::process::Command::new("dmsetup")
416             .arg("create")
417             .arg(windows_snapshot.as_str())
418             .args([
419                 "--table",
420                 format!(
421                     "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8",
422                     osdisk_blk_size,
423                     windows_snapshot_cow.as_str()
424                 )
425                 .as_str(),
426             ])
427             .output()
428             .expect("Expect creating Windows snapshot with 'dmsetup' to succeed");
429 
430         // dmsetup mknodes
431         std::process::Command::new("dmsetup")
432             .arg("mknodes")
433             .output()
434             .expect("Expect device mapper nodes to be ready");
435 
436         self.osdisk_path = format!("/dev/mapper/{}", windows_snapshot);
437         self.windows_snapshot_cow = windows_snapshot_cow;
438         self.windows_snapshot = windows_snapshot;
439     }
440 
441     fn disk(&self, disk_type: DiskType) -> Option<String> {
442         match disk_type {
443             DiskType::OperatingSystem => Some(self.osdisk_path.clone()),
444             DiskType::CloudInit => None,
445         }
446     }
447 }
448 
449 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> {
450     for i in 0..10 {
451         let free_bytes = unsafe {
452             let mut stats = std::mem::MaybeUninit::zeroed();
453             let fs_name = std::ffi::CString::new("/tmp").unwrap();
454             libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr());
455 
456             let free_blocks = stats.assume_init().f_bfree;
457             let block_size = stats.assume_init().f_bsize;
458 
459             free_blocks * block_size
460         };
461 
462         // Make sure there is at least 6 GiB of space
463         if free_bytes < 6 << 30 {
464             eprintln!(
465                 "Not enough space on disk ({}). Attempt {} of 10. Sleeping.",
466                 free_bytes, i
467             );
468             thread::sleep(std::time::Duration::new(60, 0));
469             continue;
470         }
471 
472         match fs::copy(&from, &to) {
473             Err(e) => {
474                 if let Some(errno) = e.raw_os_error() {
475                     if errno == libc::ENOSPC {
476                         eprintln!("Copy returned ENOSPC. Attempt {} of 10. Sleeping.", i);
477                         thread::sleep(std::time::Duration::new(60, 0));
478                         continue;
479                     }
480                 }
481                 return Err(e);
482             }
483             Ok(i) => return Ok(i),
484         }
485     }
486     Err(io::Error::last_os_error())
487 }
488 
/// Decides whether a finished child process counts as a test failure.
///
/// Returns quietly when the test body succeeded (`r` is `Ok`) and the child
/// either exited successfully or was terminated by signal 9 (the result of
/// `child.kill()`). In every other case the exit code or signal and the
/// captured stdout/stderr are printed to stderr.
///
/// # Panics
///
/// Panics with "Test failed" after printing the diagnostics.
pub fn handle_child_output(
    r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>,
    output: &std::process::Output,
) {
    use std::os::unix::process::ExitStatusExt;

    let body_ok = r.is_ok();
    let status = &output.status;

    if body_ok && status.success() {
        return;
    }

    if let Some(code) = status.code() {
        eprintln!("\n\n==== child exit code: {} ====", code);
    } else {
        let signal = status.signal();

        // Don't treat child.kill() as a problem
        if body_ok && signal == Some(9) {
            return;
        }

        eprintln!("==== child killed by signal: {} ====", signal.unwrap());
    }

    let captured_stdout = String::from_utf8_lossy(&output.stdout);
    eprintln!(
        "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====",
        captured_stdout
    );
    let captured_stderr = String::from_utf8_lossy(&output.stderr);
    eprintln!(
        "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====",
        captured_stderr
    );

    panic!("Test failed")
}
526 
/// Username/password pair used to authenticate SSH sessions to a guest.
///
/// Note that the derived `Debug` output includes the password in clear text;
/// the SSH/SCP helpers print it when reporting a failure.
#[derive(Debug)]
pub struct PasswordAuth {
    /// Account name to log in as.
    pub username: String,
    /// Password for `username`.
    pub password: String,
}
532 
/// Default number of failed attempts after which an SSH command gives up.
pub const DEFAULT_SSH_RETRIES: u8 = 6;
/// Default back-off unit in seconds: the pause after the n-th failed attempt
/// is n times this value.
pub const DEFAULT_SSH_TIMEOUT: u8 = 10;
535 
/// Failure points of the SSH and SCP helpers in this file.
#[derive(Debug)]
pub enum SshCommandError {
    /// The TCP connection to port 22 of the guest could not be established.
    Connection(std::io::Error),
    /// The SSH handshake failed.
    Handshake(ssh2::Error),
    /// Password authentication failed.
    Authentication(ssh2::Error),
    /// Opening a session channel failed.
    ChannelSession(ssh2::Error),
    /// Starting the remote command failed.
    Command(ssh2::Error),
    /// The remote command's exit status could not be retrieved.
    ExitStatus(ssh2::Error),
    /// The remote command finished with the given non-zero exit status.
    NonZeroExitStatus(i32),
    /// The local file to upload could not be read.
    FileRead(std::io::Error),
    /// The local file's metadata (for its permission bits) could not be read.
    FileMetadata(std::io::Error),
    /// Starting the SCP transfer failed.
    ScpSend(ssh2::Error),
    /// Writing the file content to the SCP channel failed.
    WriteAll(std::io::Error),
    /// Sending EOF on the SCP channel failed.
    SendEof(ssh2::Error),
    /// Waiting for the remote EOF on the SCP channel failed.
    WaitEof(ssh2::Error),
}
552 
553 fn scp_to_guest_with_auth(
554     path: &Path,
555     remote_path: &Path,
556     auth: &PasswordAuth,
557     ip: &str,
558     retries: u8,
559     timeout: u8,
560 ) -> Result<(), SshCommandError> {
561     let mut counter = 0;
562     loop {
563         match (|| -> Result<(), SshCommandError> {
564             let tcp =
565                 TcpStream::connect(format!("{}:22", ip)).map_err(SshCommandError::Connection)?;
566             let mut sess = Session::new().unwrap();
567             sess.set_tcp_stream(tcp);
568             sess.handshake().map_err(SshCommandError::Handshake)?;
569 
570             sess.userauth_password(&auth.username, &auth.password)
571                 .map_err(SshCommandError::Authentication)?;
572             assert!(sess.authenticated());
573 
574             let content = fs::read(path).map_err(SshCommandError::FileRead)?;
575             let mode = fs::metadata(path)
576                 .map_err(SshCommandError::FileMetadata)?
577                 .permissions()
578                 .mode()
579                 & 0o777;
580 
581             let mut channel = sess
582                 .scp_send(remote_path, mode as i32, content.len() as u64, None)
583                 .map_err(SshCommandError::ScpSend)?;
584             channel
585                 .write_all(&content)
586                 .map_err(SshCommandError::WriteAll)?;
587             channel.send_eof().map_err(SshCommandError::SendEof)?;
588             channel.wait_eof().map_err(SshCommandError::WaitEof)?;
589 
590             // Intentionally ignore these results here as their failure
591             // does not precipitate a repeat
592             let _ = channel.close();
593             let _ = channel.wait_close();
594 
595             Ok(())
596         })() {
597             Ok(_) => break,
598             Err(e) => {
599                 counter += 1;
600                 if counter >= retries {
601                     eprintln!(
602                         "\n\n==== Start scp command output (FAILED) ====\n\n\
603                          path =\"{:?}\"\n\
604                          remote_path =\"{:?}\"\n\
605                          auth=\"{:#?}\"\n\
606                          ip=\"{}\"\n\
607                          error=\"{:?}\"\n\
608                          \n==== End scp command outout ====\n\n",
609                         path, remote_path, auth, ip, e
610                     );
611 
612                     return Err(e);
613                 }
614             }
615         };
616         thread::sleep(std::time::Duration::new((timeout * counter).into(), 0));
617     }
618     Ok(())
619 }
620 
621 pub fn scp_to_guest(
622     path: &Path,
623     remote_path: &Path,
624     ip: &str,
625     retries: u8,
626     timeout: u8,
627 ) -> Result<(), SshCommandError> {
628     scp_to_guest_with_auth(
629         path,
630         remote_path,
631         &PasswordAuth {
632             username: String::from("cloud"),
633             password: String::from("cloud123"),
634         },
635         ip,
636         retries,
637         timeout,
638     )
639 }
640 
641 pub fn ssh_command_ip_with_auth(
642     command: &str,
643     auth: &PasswordAuth,
644     ip: &str,
645     retries: u8,
646     timeout: u8,
647 ) -> Result<String, SshCommandError> {
648     let mut s = String::new();
649 
650     let mut counter = 0;
651     loop {
652         match (|| -> Result<(), SshCommandError> {
653             let tcp =
654                 TcpStream::connect(format!("{}:22", ip)).map_err(SshCommandError::Connection)?;
655             let mut sess = Session::new().unwrap();
656             sess.set_tcp_stream(tcp);
657             sess.handshake().map_err(SshCommandError::Handshake)?;
658 
659             sess.userauth_password(&auth.username, &auth.password)
660                 .map_err(SshCommandError::Authentication)?;
661             assert!(sess.authenticated());
662 
663             let mut channel = sess
664                 .channel_session()
665                 .map_err(SshCommandError::ChannelSession)?;
666             channel.exec(command).map_err(SshCommandError::Command)?;
667 
668             // Intentionally ignore these results here as their failure
669             // does not precipitate a repeat
670             let _ = channel.read_to_string(&mut s);
671             let _ = channel.close();
672             let _ = channel.wait_close();
673 
674             let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?;
675 
676             if status != 0 {
677                 Err(SshCommandError::NonZeroExitStatus(status))
678             } else {
679                 Ok(())
680             }
681         })() {
682             Ok(_) => break,
683             Err(e) => {
684                 counter += 1;
685                 if counter >= retries {
686                     eprintln!(
687                         "\n\n==== Start ssh command output (FAILED) ====\n\n\
688                          command=\"{}\"\n\
689                          auth=\"{:#?}\"\n\
690                          ip=\"{}\"\n\
691                          output=\"{}\"\n\
692                          error=\"{:?}\"\n\
693                          \n==== End ssh command outout ====\n\n",
694                         command, auth, ip, s, e
695                     );
696 
697                     return Err(e);
698                 }
699             }
700         };
701         thread::sleep(std::time::Duration::new((timeout * counter).into(), 0));
702     }
703     Ok(s)
704 }
705 
706 pub fn ssh_command_ip(
707     command: &str,
708     ip: &str,
709     retries: u8,
710     timeout: u8,
711 ) -> Result<String, SshCommandError> {
712     ssh_command_ip_with_auth(
713         command,
714         &PasswordAuth {
715             username: String::from("cloud"),
716             password: String::from("cloud123"),
717         },
718         ip,
719         retries,
720         timeout,
721     )
722 }
723 
/// Runs `command` through `bash -c` on the host and returns its exit status.
///
/// The child inherits the caller's standard streams.
///
/// # Panics
///
/// Panics if `bash` cannot be launched.
pub fn exec_host_command_status(command: &str) -> ExitStatus {
    let mut shell = Command::new("bash");
    shell.arg("-c").arg(command);

    match shell.status() {
        Ok(status) => status,
        Err(_) => panic!("Expected '{}' to run", command),
    }
}
730 
/// Runs `command` through `bash -c` on the host and returns its exit status
/// together with the captured stdout and stderr.
///
/// # Panics
///
/// Panics if `bash` cannot be launched.
pub fn exec_host_command_output(command: &str) -> Output {
    let mut shell = Command::new("bash");
    shell.arg("-c").arg(command);

    match shell.output() {
        Ok(captured) => captured,
        Err(_) => panic!("Expected '{}' to run", command),
    }
}
737 
/// 32 MiB, expressed in bytes.
/// NOTE(review): not used in the visible part of this file; presumably the
/// buffer size requested for pipes to child processes — confirm.
pub const PIPE_SIZE: i32 = 32 << 20;

/// Id handed to the next guest created through `Guest::new`; starts at 1 and
/// is incremented for every guest.
static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1));
741 
/// A test guest: its scratch directory, its prepared disks and its network
/// identity.
pub struct Guest {
    /// Scratch directory that holds the files prepared for this guest.
    pub tmp_dir: TempDir,
    /// Disk configuration whose files were prepared inside `tmp_dir`.
    pub disk_config: Box<dyn DiskConfig>,
    /// Addresses and port derived from the guest id.
    pub network: GuestNetworkConfig,
}
747 
// Safe to implement as we know we have no interior mutability
// NOTE(review): `disk_config` is a `Box<dyn DiskConfig>`, so this relies on
// every implementor of the trait being free of interior mutability — confirm
// whenever a new implementor is added.
impl std::panic::RefUnwindSafe for Guest {}
750 
751 impl Guest {
752     pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self {
753         let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap();
754 
755         let network = GuestNetworkConfig {
756             guest_ip: format!("{}.{}.2", class, id),
757             l2_guest_ip1: format!("{}.{}.3", class, id),
758             l2_guest_ip2: format!("{}.{}.4", class, id),
759             l2_guest_ip3: format!("{}.{}.5", class, id),
760             host_ip: format!("{}.{}.1", class, id),
761             guest_mac: format!("12:34:56:78:90:{:02x}", id),
762             l2_guest_mac1: format!("de:ad:be:ef:12:{:02x}", id),
763             l2_guest_mac2: format!("de:ad:be:ef:34:{:02x}", id),
764             l2_guest_mac3: format!("de:ad:be:ef:56:{:02x}", id),
765             tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16,
766         };
767 
768         disk_config.prepare_files(&tmp_dir, &network);
769 
770         Guest {
771             tmp_dir,
772             disk_config,
773             network,
774         }
775     }
776 
777     pub fn new(disk_config: Box<dyn DiskConfig>) -> Self {
778         let mut guard = NEXT_VM_ID.lock().unwrap();
779         let id = *guard;
780         *guard = id + 1;
781 
782         Self::new_from_ip_range(disk_config, "192.168", id)
783     }
784 
785     pub fn default_net_string(&self) -> String {
786         format!(
787             "tap=,mac={},ip={},mask=255.255.255.0",
788             self.network.guest_mac, self.network.host_ip
789         )
790     }
791 
792     pub fn default_net_string_w_iommu(&self) -> String {
793         format!(
794             "tap=,mac={},ip={},mask=255.255.255.0,iommu=on",
795             self.network.guest_mac, self.network.host_ip
796         )
797     }
798 
799     pub fn default_net_string_w_mtu(&self, mtu: u16) -> String {
800         format!(
801             "tap=,mac={},ip={},mask=255.255.255.0,mtu={}",
802             self.network.guest_mac, self.network.host_ip, mtu
803         )
804     }
805 
806     pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> {
807         ssh_command_ip(
808             command,
809             &self.network.guest_ip,
810             DEFAULT_SSH_RETRIES,
811             DEFAULT_SSH_TIMEOUT,
812         )
813     }
814 
815     #[cfg(target_arch = "x86_64")]
816     pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> {
817         ssh_command_ip(
818             command,
819             &self.network.guest_ip,
820             DEFAULT_SSH_RETRIES,
821             DEFAULT_SSH_TIMEOUT,
822         )
823     }
824 
825     #[cfg(target_arch = "x86_64")]
826     pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> {
827         ssh_command_ip(
828             command,
829             &self.network.l2_guest_ip1,
830             DEFAULT_SSH_RETRIES,
831             DEFAULT_SSH_TIMEOUT,
832         )
833     }
834 
835     #[cfg(target_arch = "x86_64")]
836     pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> {
837         ssh_command_ip(
838             command,
839             &self.network.l2_guest_ip2,
840             DEFAULT_SSH_RETRIES,
841             DEFAULT_SSH_TIMEOUT,
842         )
843     }
844 
845     #[cfg(target_arch = "x86_64")]
846     pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> {
847         ssh_command_ip(
848             command,
849             &self.network.l2_guest_ip3,
850             DEFAULT_SSH_RETRIES,
851             DEFAULT_SSH_TIMEOUT,
852         )
853     }
854 
855     pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String {
856         format! {"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}",
857                  cpu_count,
858                  cpu_count,
859                  kernel_path,
860                  kernel_cmd,
861                  self.network.host_ip,
862                  self.network.guest_mac,
863                  self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(),
864                  self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(),
865         }
866     }
867 
868     pub fn get_cpu_count(&self) -> Result<u32, Error> {
869         self.ssh_command("grep -c processor /proc/cpuinfo")?
870             .trim()
871             .parse()
872             .map_err(Error::Parsing)
873     }
874 
875     #[cfg(target_arch = "x86_64")]
876     pub fn get_initial_apicid(&self) -> Result<u32, Error> {
877         self.ssh_command("grep \"initial apicid\" /proc/cpuinfo | grep -o \"[0-9]*\"")?
878             .trim()
879             .parse()
880             .map_err(Error::Parsing)
881     }
882 
883     pub fn get_total_memory(&self) -> Result<u32, Error> {
884         self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")?
885             .trim()
886             .parse()
887             .map_err(Error::Parsing)
888     }
889 
890     #[cfg(target_arch = "x86_64")]
891     pub fn get_total_memory_l2(&self) -> Result<u32, Error> {
892         self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")?
893             .trim()
894             .parse()
895             .map_err(Error::Parsing)
896     }
897 
898     pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> {
899         self.ssh_command(
900             format!(
901                 "grep MemTotal /sys/devices/system/node/node{}/meminfo \
902                         | cut -d \":\" -f 2 | grep -o \"[0-9]*\"",
903                 node_id
904             )
905             .as_str(),
906         )?
907         .trim()
908         .parse()
909         .map_err(Error::Parsing)
910     }
911 
912     pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> {
913         self.network
914             .wait_vm_boot(custom_timeout)
915             .map_err(Error::WaitForBoot)
916     }
917 
918     pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> {
919         for cpu in cpus.iter() {
920             let cmd = format!(
921                 "[ -d \"/sys/devices/system/node/node{}/cpu{}\" ]",
922                 node_id, cpu
923             );
924             self.ssh_command(cmd.as_str())?;
925         }
926 
927         Ok(())
928     }
929 
930     pub fn check_numa_node_distances(
931         &self,
932         node_id: usize,
933         distances: &str,
934     ) -> Result<bool, Error> {
935         let cmd = format!("cat /sys/devices/system/node/node{}/distance", node_id);
936         if self.ssh_command(cmd.as_str())?.trim() == distances {
937             Ok(true)
938         } else {
939             Ok(false)
940         }
941     }
942 
943     pub fn check_numa_common(
944         &self,
945         mem_ref: Option<&[u32]>,
946         node_ref: Option<&[Vec<usize>]>,
947         distance_ref: Option<&[&str]>,
948     ) {
949         if let Some(mem_ref) = mem_ref {
950             // Check each NUMA node has been assigned the right amount of
951             // memory.
952             for (i, &m) in mem_ref.iter().enumerate() {
953                 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m);
954             }
955         }
956 
957         if let Some(node_ref) = node_ref {
958             // Check each NUMA node has been assigned the right CPUs set.
959             for (i, n) in node_ref.iter().enumerate() {
960                 self.check_numa_node_cpus(i, n.clone()).unwrap();
961             }
962         }
963 
964         if let Some(distance_ref) = distance_ref {
965             // Check each NUMA node has been assigned the right distances.
966             for (i, &d) in distance_ref.iter().enumerate() {
967                 assert!(self.check_numa_node_distances(i, d).unwrap());
968             }
969         }
970     }
971 
972     #[cfg(target_arch = "x86_64")]
973     pub fn check_sgx_support(&self) -> Result<(), Error> {
974         self.ssh_command(
975             "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \
976                     Software Guard Extensions supported = true'",
977         )?;
978         self.ssh_command(
979             "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \
980                     SGX launch config supported = true'",
981         )?;
982         self.ssh_command(
983             "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \
984                     supported = true'",
985         )?;
986 
987         Ok(())
988     }
989 
990     pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
991         Ok(self
992             .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
993             .trim()
994             .to_string())
995     }
996 
997     pub fn get_pci_device_ids(&self) -> Result<String, Error> {
998         Ok(self
999             .ssh_command("cat /sys/bus/pci/devices/*/device")?
1000             .trim()
1001             .to_string())
1002     }
1003 
1004     pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
1005         Ok(self
1006             .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
1007             .trim()
1008             .to_string())
1009     }
1010 
1011     pub fn does_device_vendor_pair_match(
1012         &self,
1013         device_id: &str,
1014         vendor_id: &str,
1015     ) -> Result<bool, Error> {
1016         // We are checking if console device's device id and vendor id pair matches
1017         let devices = self.get_pci_device_ids()?;
1018         let devices: Vec<&str> = devices.split('\n').collect();
1019         let vendors = self.get_pci_vendor_ids()?;
1020         let vendors: Vec<&str> = vendors.split('\n').collect();
1021 
1022         for (index, d_id) in devices.iter().enumerate() {
1023             if *d_id == device_id {
1024                 if let Some(v_id) = vendors.get(index) {
1025                     if *v_id == vendor_id {
1026                         return Ok(true);
1027                     }
1028                 }
1029             }
1030         }
1031 
1032         Ok(false)
1033     }
1034 
1035     pub fn check_vsock(&self, socket: &str) {
1036         // Listen from guest on vsock CID=3 PORT=16
1037         // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
1038         let guest_ip = self.network.guest_ip.clone();
1039         let listen_socat = thread::spawn(move || {
1040             ssh_command_ip("sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log", &guest_ip, DEFAULT_SSH_RETRIES, DEFAULT_SSH_TIMEOUT).unwrap();
1041         });
1042 
1043         // Make sure socat is listening, which might take a few second on slow systems
1044         thread::sleep(std::time::Duration::new(10, 0));
1045 
1046         // Write something to vsock from the host
1047         assert!(exec_host_command_status(&format!(
1048             "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{}",
1049             socket
1050         ))
1051         .success());
1052 
1053         // Wait for the thread to terminate.
1054         listen_socat.join().unwrap();
1055 
1056         assert_eq!(
1057             self.ssh_command("cat vsock_log").unwrap().trim(),
1058             "HelloWorld!"
1059         );
1060     }
1061 
1062     #[cfg(target_arch = "x86_64")]
1063     pub fn check_nvidia_gpu(&self) {
1064         // Run CUDA sample to validate it can find the device
1065         let device_query_result = self
1066             .ssh_command("sudo /root/NVIDIA_CUDA-11.3_Samples/bin/x86_64/linux/release/deviceQuery")
1067             .unwrap();
1068         assert!(device_query_result.contains("Detected 1 CUDA Capable device"));
1069         assert!(device_query_result.contains("Device 0: \"NVIDIA Tesla T4\""));
1070         assert!(device_query_result.contains("Result = PASS"));
1071 
1072         // Run NVIDIA DCGM Diagnostics to validate the device is functional
1073         self.ssh_command("sudo nv-hostengine").unwrap();
1074 
1075         assert!(self
1076             .ssh_command("sudo dcgmi discovery -l")
1077             .unwrap()
1078             .contains("Name: NVIDIA Tesla T4"));
1079         assert_eq!(
1080             self.ssh_command("sudo dcgmi diag -r 'diagnostic' | grep Pass | wc -l")
1081                 .unwrap()
1082                 .trim(),
1083             "10"
1084         );
1085     }
1086 
1087     pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
1088         let list_boots_cmd = "sudo journalctl --list-boots | wc -l";
1089         let boot_count = self
1090             .ssh_command(list_boots_cmd)
1091             .unwrap()
1092             .trim()
1093             .parse::<u32>()
1094             .unwrap_or_default();
1095 
1096         assert_eq!(boot_count, current_reboot_count + 1);
1097         self.ssh_command("sudo reboot").unwrap();
1098 
1099         self.wait_vm_boot(custom_timeout).unwrap();
1100         let boot_count = self
1101             .ssh_command(list_boots_cmd)
1102             .unwrap()
1103             .trim()
1104             .parse::<u32>()
1105             .unwrap_or_default();
1106         assert_eq!(boot_count, current_reboot_count + 2);
1107     }
1108 
1109     pub fn enable_memory_hotplug(&self) {
1110         self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
1111             .unwrap();
1112     }
1113 
1114     pub fn check_devices_common(
1115         &self,
1116         socket: Option<&String>,
1117         console_text: Option<&String>,
1118         pmem_path: Option<&String>,
1119     ) {
1120         // Check block devices are readable
1121         self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
1122             .unwrap();
1123         self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
1124             .unwrap();
1125         // Check if the rng device is readable
1126         self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
1127             .unwrap();
1128         // Check vsock
1129         if let Some(socket) = socket {
1130             self.check_vsock(socket.as_str());
1131         }
1132         // Check if the console is usable
1133         if let Some(console_text) = console_text {
1134             let console_cmd = format!("echo {} | sudo tee /dev/hvc0", console_text);
1135             self.ssh_command(&console_cmd).unwrap();
1136         }
1137         // The net device is 'automatically' exercised through the above 'ssh' commands
1138 
1139         // Check if the pmem device is usable
1140         if let Some(pmem_path) = pmem_path {
1141             assert_eq!(
1142                 self.ssh_command(&format!("ls {}", pmem_path))
1143                     .unwrap()
1144                     .trim(),
1145                 pmem_path
1146             );
1147             assert_eq!(
1148                 self.ssh_command(&format!("sudo mount {} /mnt", pmem_path))
1149                     .unwrap(),
1150                 ""
1151             );
1152             assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
1153             self.ssh_command("echo test123 | sudo tee /mnt/test")
1154                 .unwrap();
1155             assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
1156             assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");
1157 
1158             assert_eq!(
1159                 self.ssh_command(&format!("sudo mount {} /mnt", pmem_path))
1160                     .unwrap(),
1161                 ""
1162             );
1163             assert_eq!(
1164                 self.ssh_command("sudo cat /mnt/test || true")
1165                     .unwrap()
1166                     .trim(),
1167                 "test123"
1168             );
1169             self.ssh_command("sudo rm /mnt/test").unwrap();
1170             assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
1171         }
1172     }
1173 }
1174 
/// Logging verbosity requested from the launched binary.
///
/// Its textual form is the matching command-line flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerbosityLevel {
    /// No verbosity flag.
    Warn,
    /// `-v`
    Info,
    /// `-vv`
    Debug,
}

impl Default for VerbosityLevel {
    fn default() -> Self {
        Self::Warn
    }
}

// Implementing `Display` (rather than `ToString` directly) keeps
// `to_string()` available through the standard blanket impl and additionally
// allows the level to be used in format strings.
impl fmt::Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            VerbosityLevel::Warn => "",
            VerbosityLevel::Info => "-v",
            VerbosityLevel::Debug => "-vv",
        })
    }
}
1197 
/// Builder around a [`Command`] that launches a binary (by default
/// `cloud-hypervisor`) for a given [`Guest`].
pub struct GuestCommand<'a> {
    // The process command line being assembled.
    command: Command,
    // Guest whose disk and network settings feed the default arguments.
    guest: &'a Guest,
    // When true, `spawn` pipes the child's stdout/stderr.
    capture_output: bool,
    // When true, `spawn` prints the full command line before launching.
    print_cmd: bool,
    // Verbosity flag appended to the command line by `spawn`.
    verbosity: VerbosityLevel,
}
1205 
1206 impl<'a> GuestCommand<'a> {
1207     pub fn new(guest: &'a Guest) -> Self {
1208         Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
1209     }
1210 
1211     pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
1212         Self {
1213             command: Command::new(binary_path),
1214             guest,
1215             capture_output: false,
1216             print_cmd: true,
1217             verbosity: VerbosityLevel::Info,
1218         }
1219     }
1220 
1221     pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
1222         self.verbosity = verbosity;
1223         self
1224     }
1225 
1226     pub fn capture_output(&mut self) -> &mut Self {
1227         self.capture_output = true;
1228         self
1229     }
1230 
1231     pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
1232         self.print_cmd = print_cmd;
1233         self
1234     }
1235 
1236     pub fn spawn(&mut self) -> io::Result<Child> {
1237         use VerbosityLevel::*;
1238         match &self.verbosity {
1239             Warn => {}
1240             Info => {
1241                 self.command.arg("-v");
1242             }
1243             Debug => {
1244                 self.command.arg("-vv");
1245             }
1246         };
1247 
1248         if self.print_cmd {
1249             println!(
1250                 "\n\n==== Start cloud-hypervisor command-line ====\n\n\
1251                      {:?}\n\
1252                      \n==== End cloud-hypervisor command-line ====\n\n",
1253                 self.command
1254             );
1255         }
1256 
1257         if self.capture_output {
1258             let child = self
1259                 .command
1260                 .stderr(Stdio::piped())
1261                 .stdout(Stdio::piped())
1262                 .spawn()
1263                 .unwrap();
1264 
1265             let fd = child.stdout.as_ref().unwrap().as_raw_fd();
1266             let pipesize = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
1267             if pipesize == -1 {
1268                 return Err(io::Error::last_os_error());
1269             }
1270             let fd = child.stderr.as_ref().unwrap().as_raw_fd();
1271             let pipesize1 = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
1272             if pipesize1 == -1 {
1273                 return Err(io::Error::last_os_error());
1274             }
1275 
1276             if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE {
1277                 Ok(child)
1278             } else {
1279                 Err(std::io::Error::new(
1280                     std::io::ErrorKind::Other,
1281                     format!(
1282                         "resizing pipe w/ 'fnctl' failed: stdout pipesize {}, stderr pipesize {}",
1283                         pipesize, pipesize1
1284                     ),
1285                 ))
1286             }
1287         } else {
1288             self.command.spawn()
1289         }
1290     }
1291 
1292     pub fn args<I, S>(&mut self, args: I) -> &mut Self
1293     where
1294         I: IntoIterator<Item = S>,
1295         S: AsRef<OsStr>,
1296     {
1297         self.command.args(args);
1298         self
1299     }
1300 
1301     pub fn default_disks(&mut self) -> &mut Self {
1302         if self.guest.disk_config.disk(DiskType::CloudInit).is_some() {
1303             self.args([
1304                 "--disk",
1305                 format!(
1306                     "path={}",
1307                     self.guest
1308                         .disk_config
1309                         .disk(DiskType::OperatingSystem)
1310                         .unwrap()
1311                 )
1312                 .as_str(),
1313                 format!(
1314                     "path={}",
1315                     self.guest.disk_config.disk(DiskType::CloudInit).unwrap()
1316                 )
1317                 .as_str(),
1318             ])
1319         } else {
1320             self.args([
1321                 "--disk",
1322                 format!(
1323                     "path={}",
1324                     self.guest
1325                         .disk_config
1326                         .disk(DiskType::OperatingSystem)
1327                         .unwrap()
1328                 )
1329                 .as_str(),
1330             ])
1331         }
1332     }
1333 
1334     pub fn default_net(&mut self) -> &mut Self {
1335         self.args(["--net", self.guest.default_net_string().as_str()])
1336     }
1337 }
1338 
/// Returns the path of the release binary `cmd` inside the `target`
/// directory.
///
/// The target triple is taken from the `BUILD_TARGET` environment variable;
/// when it is unset (or not valid Unicode) `x86_64-unknown-linux-gnu` is used.
pub fn clh_command(cmd: &str) -> String {
    // `map_or_else` only builds the fallback path when it is needed.
    env::var("BUILD_TARGET").map_or_else(
        |_| format!("target/x86_64-unknown-linux-gnu/release/{}", cmd),
        |target| format!("target/{}/release/{}", target, cmd),
    )
}
1345 
1346 pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> {
1347     std::panic::catch_unwind(|| {
1348         let s = String::from_utf8_lossy(output);
1349         let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");
1350 
1351         let bps: f64 = if sender {
1352             v["end"]["sum_sent"]["bits_per_second"]
1353                 .as_f64()
1354                 .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
1355         } else {
1356             v["end"]["sum_received"]["bits_per_second"]
1357                 .as_f64()
1358                 .expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'")
1359         };
1360 
1361         bps
1362     })
1363     .map_err(|_| {
1364         eprintln!(
1365             "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n",
1366             String::from_utf8_lossy(output)
1367         );
1368         Error::Iperf3Parse
1369     })
1370 }
1371 
/// I/O patterns supported by the `fio` helpers; the `Display` form is the
/// value rendered for each pattern (`read`, `randread`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            FioOps::Read => "read",
            FioOps::RandomRead => "randread",
            FioOps::Write => "write",
            FioOps::RandomWrite => "randwrite",
            FioOps::ReadWrite => "rw",
            FioOps::RandRW => "randrw",
        })
    }
}
1393 
1394 pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
1395     std::panic::catch_unwind(|| {
1396         let v: Value =
1397             serde_json::from_str(output).expect("'fio' parse error: invalid json output");
1398         let jobs = v["jobs"]
1399             .as_array()
1400             .expect("'fio' parse error: missing entry 'jobs'");
1401         assert_eq!(
1402             jobs.len(),
1403             num_jobs as usize,
1404             "'fio' parse error: Unexpected number of 'fio' jobs."
1405         );
1406 
1407         let (read, write) = match fio_ops {
1408             FioOps::Read | FioOps::RandomRead => (true, false),
1409             FioOps::Write | FioOps::RandomWrite => (false, true),
1410             FioOps::ReadWrite | FioOps::RandRW => (true, true),
1411         };
1412 
1413         let mut total_bps = 0_f64;
1414         for j in jobs {
1415             if read {
1416                 let bytes = j["read"]["io_bytes"]
1417                     .as_u64()
1418                     .expect("'fio' parse error: missing entry 'read.io_bytes'");
1419                 let runtime = j["read"]["runtime"]
1420                     .as_u64()
1421                     .expect("'fio' parse error: missing entry 'read.runtime'")
1422                     as f64
1423                     / 1000_f64;
1424                 total_bps += bytes as f64 / runtime as f64;
1425             }
1426             if write {
1427                 let bytes = j["write"]["io_bytes"]
1428                     .as_u64()
1429                     .expect("'fio' parse error: missing entry 'write.io_bytes'");
1430                 let runtime = j["write"]["runtime"]
1431                     .as_u64()
1432                     .expect("'fio' parse error: missing entry 'write.runtime'")
1433                     as f64
1434                     / 1000_f64;
1435                 total_bps += bytes as f64 / runtime as f64;
1436             }
1437         }
1438 
1439         total_bps
1440     })
1441     .map_err(|_| {
1442         eprintln!(
1443             "=============== Fio output ===============\n\n{}\n\n===========end============\n\n",
1444             output
1445         );
1446         Error::FioOutputParse
1447     })
1448 }
1449 
1450 pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
1451     std::panic::catch_unwind(|| {
1452         let v: Value =
1453             serde_json::from_str(output).expect("'fio' parse error: invalid json output");
1454         let jobs = v["jobs"]
1455             .as_array()
1456             .expect("'fio' parse error: missing entry 'jobs'");
1457         assert_eq!(
1458             jobs.len(),
1459             num_jobs as usize,
1460             "'fio' parse error: Unexpected number of 'fio' jobs."
1461         );
1462 
1463         let (read, write) = match fio_ops {
1464             FioOps::Read | FioOps::RandomRead => (true, false),
1465             FioOps::Write | FioOps::RandomWrite => (false, true),
1466             FioOps::ReadWrite | FioOps::RandRW => (true, true),
1467         };
1468 
1469         let mut total_iops = 0_f64;
1470         for j in jobs {
1471             if read {
1472                 let ios = j["read"]["total_ios"]
1473                     .as_u64()
1474                     .expect("'fio' parse error: missing entry 'read.total_ios'");
1475                 let runtime = j["read"]["runtime"]
1476                     .as_u64()
1477                     .expect("'fio' parse error: missing entry 'read.runtime'")
1478                     as f64
1479                     / 1000_f64;
1480                 total_iops += ios as f64 / runtime as f64;
1481             }
1482             if write {
1483                 let ios = j["write"]["total_ios"]
1484                     .as_u64()
1485                     .expect("'fio' parse error: missing entry 'write.total_ios'");
1486                 let runtime = j["write"]["runtime"]
1487                     .as_u64()
1488                     .expect("'fio' parse error: missing entry 'write.runtime'")
1489                     as f64
1490                     / 1000_f64;
1491                 total_iops += ios as f64 / runtime as f64;
1492             }
1493         }
1494 
1495         total_iops
1496     })
1497     .map_err(|_| {
1498         eprintln!(
1499             "=============== Fio output ===============\n\n{}\n\n===========end============\n\n",
1500             output
1501         );
1502         Error::FioOutputParse
1503     })
1504 }
1505 
1506 // Wait the child process for a given timeout
1507 fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
1508     match child.wait_timeout(Duration::from_secs(timeout)) {
1509         Err(e) => {
1510             return Err(WaitTimeoutError::General(e));
1511         }
1512         Ok(s) => match s {
1513             None => {
1514                 return Err(WaitTimeoutError::Timedout);
1515             }
1516             Some(s) => {
1517                 if !s.success() {
1518                     return Err(WaitTimeoutError::ExitStatus);
1519                 }
1520             }
1521         },
1522     }
1523 
1524     Ok(())
1525 }
1526 
1527 pub fn measure_virtio_net_throughput(
1528     test_timeout: u32,
1529     queue_pairs: u32,
1530     guest: &Guest,
1531     receive: bool,
1532 ) -> Result<f64, Error> {
1533     let default_port = 5201;
1534 
1535     // 1. start the iperf3 server on the guest
1536     for n in 0..queue_pairs {
1537         guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
1538     }
1539 
1540     thread::sleep(Duration::new(1, 0));
1541 
1542     // 2. start the iperf3 client on host to measure RX through-put
1543     let mut clients = Vec::new();
1544     for n in 0..queue_pairs {
1545         let mut cmd = Command::new("iperf3");
1546         cmd.args([
1547             "-J", // Output in JSON format
1548             "-c",
1549             &guest.network.guest_ip,
1550             "-p",
1551             &format!("{}", default_port + n),
1552             "-t",
1553             &format!("{}", test_timeout),
1554         ]);
1555         // For measuring the guest transmit throughput (as a sender),
1556         // use reverse mode of the iperf3 client on the host
1557         if !receive {
1558             cmd.args(["-R"]);
1559         }
1560         let client = cmd
1561             .stderr(Stdio::piped())
1562             .stdout(Stdio::piped())
1563             .spawn()
1564             .map_err(Error::Spawn)?;
1565 
1566         clients.push(client);
1567     }
1568 
1569     let mut err: Option<Error> = None;
1570     let mut bps = Vec::new();
1571     let mut failed = false;
1572     for c in clients {
1573         let mut c = c;
1574         if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) {
1575             err = Some(Error::WaitTimeout(e));
1576             failed = true;
1577         }
1578 
1579         if !failed {
1580             // Safe to unwrap as we know the child has terminated succesffully
1581             let output = c.wait_with_output().unwrap();
1582             bps.push(parse_iperf3_output(&output.stdout, receive)?);
1583         } else {
1584             let _ = c.kill();
1585             let output = c.wait_with_output().unwrap();
1586             println!(
1587                 "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
1588                 String::from_utf8_lossy(&output.stdout)
1589             );
1590         }
1591     }
1592 
1593     if let Some(e) = err {
1594         Err(e)
1595     } else {
1596         Ok(bps.iter().sum())
1597     }
1598 }
1599 
1600 pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
1601     std::panic::catch_unwind(|| {
1602         let s = String::from_utf8_lossy(output);
1603         let mut latency = Vec::new();
1604         for l in s.lines() {
1605             let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
1606             // Skip header/summary lines
1607             if let Some(avg) = v["Avg"].as_str() {
1608                 // Assume the latency unit is always "us"
1609                 latency.push(
1610                     avg.split("us").collect::<Vec<&str>>()[0]
1611                         .parse::<f64>()
1612                         .expect("'ethr' parse error: invalid 'Avg' entry"),
1613                 );
1614             }
1615         }
1616 
1617         assert!(
1618             !latency.is_empty(),
1619             "'ethr' parse error: no valid latency data found"
1620         );
1621 
1622         latency
1623     })
1624     .map_err(|_| {
1625         eprintln!(
1626             "=============== ethr output ===============\n\n{}\n\n===========end============\n\n",
1627             String::from_utf8_lossy(output)
1628         );
1629         Error::EthrLogParse
1630     })
1631 }
1632 
1633 pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
1634     // copy the 'ethr' tool to the guest image
1635     let ethr_path = "/usr/local/bin/ethr";
1636     let ethr_remote_path = "/tmp/ethr";
1637     scp_to_guest(
1638         Path::new(ethr_path),
1639         Path::new(ethr_remote_path),
1640         &guest.network.guest_ip,
1641         //DEFAULT_SSH_RETRIES,
1642         1,
1643         DEFAULT_SSH_TIMEOUT,
1644     )?;
1645 
1646     // Start the ethr server on the guest
1647     guest.ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path))?;
1648 
1649     thread::sleep(Duration::new(10, 0));
1650 
1651     // Start the ethr client on the host
1652     let log_file = guest
1653         .tmp_dir
1654         .as_path()
1655         .join("ethr.client.log")
1656         .to_str()
1657         .unwrap()
1658         .to_string();
1659     let mut c = Command::new(ethr_path)
1660         .args([
1661             "-c",
1662             &guest.network.guest_ip,
1663             "-t",
1664             "l",
1665             "-o",
1666             &log_file, // file output is JSON format
1667             "-d",
1668             &format!("{}s", test_timeout),
1669         ])
1670         .stderr(Stdio::piped())
1671         .stdout(Stdio::piped())
1672         .spawn()
1673         .map_err(Error::Spawn)?;
1674 
1675     if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout)
1676     {
1677         let _ = c.kill();
1678         return Err(e);
1679     }
1680 
1681     // Parse the ethr latency test output
1682     let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
1683     parse_ethr_latency_output(&content)
1684 }
1685