xref: /cloud-hypervisor/test_infra/src/lib.rs (revision 5e52729453cb62edbe4fb3a4aa24f8cca31e667e)
1 // Copyright © 2021 Intel Corporation
2 //
3 // SPDX-License-Identifier: Apache-2.0
4 //
5 
6 #![allow(clippy::undocumented_unsafe_blocks)]
7 
8 use once_cell::sync::Lazy;
9 use serde_json::Value;
10 use ssh2::Session;
11 use std::env;
12 use std::ffi::OsStr;
13 use std::io;
14 use std::io::{Read, Write};
15 use std::net::TcpListener;
16 use std::net::TcpStream;
17 use std::os::unix::fs::PermissionsExt;
18 use std::os::unix::io::{AsRawFd, FromRawFd};
19 use std::path::Path;
20 use std::process::{Child, Command, ExitStatus, Output, Stdio};
21 use std::str::FromStr;
22 use std::sync::Mutex;
23 use std::thread;
24 use std::time::Duration;
25 use std::{fmt, fs};
26 use vmm_sys_util::tempdir::TempDir;
27 use wait_timeout::ChildExt;
28 
/// Error payload carried by [`Error::WaitTimeout`].
///
/// NOTE(review): the code that produces these values is not visible in this
/// part of the file; the per-variant notes below are inferred from the names
/// and payload types and should be confirmed against the producer.
#[derive(Debug)]
pub enum WaitTimeoutError {
    /// Presumably: the wait did not complete before its deadline.
    Timedout,
    /// Presumably: the waited-on process finished with an unexpected exit status.
    ExitStatus,
    /// An underlying I/O error.
    General(std::io::Error),
}
35 
/// Top-level error type returned by the helpers in this crate.
#[derive(Debug)]
pub enum Error {
    /// A command's textual output could not be parsed as an integer.
    Parsing(std::num::ParseIntError),
    /// An SSH/SCP operation against a guest failed.
    SshCommand(SshCommandError),
    /// Waiting for the guest boot notification failed.
    WaitForBoot(WaitForBootError),
    /// I/O error related to an ethr log file (producer not visible here).
    EthrLogFile(std::io::Error),
    /// An ethr log could not be parsed (producer not visible here).
    EthrLogParse,
    /// fio output could not be parsed (producer not visible here).
    FioOutputParse,
    /// iperf3 output could not be parsed (producer not visible here).
    Iperf3Parse,
    /// Spawning a process failed (producer not visible here).
    Spawn(std::io::Error),
    /// Waiting on a process with a timeout failed.
    WaitTimeout(WaitTimeoutError),
}
48 
49 impl From<SshCommandError> for Error {
50     fn from(e: SshCommandError) -> Self {
51         Self::SshCommand(e)
52     }
53 }
54 
/// Addresses and ports used to reach a guest (and its nested "L2" guests)
/// from the host.
pub struct GuestNetworkConfig {
    /// IP address of the guest; `wait_vm_boot` expects the boot notification
    /// to come from this address and SSH helpers connect to it.
    pub guest_ip: String,
    /// IP address of the first nested (L2) guest.
    pub l2_guest_ip1: String,
    /// IP address of the second nested (L2) guest.
    pub l2_guest_ip2: String,
    /// IP address of the third nested (L2) guest.
    pub l2_guest_ip3: String,
    /// IP address on the host side; substituted for `@HOST_IP` in the
    /// cloud-init user-data.
    pub host_ip: String,
    /// MAC address of the guest network interface.
    pub guest_mac: String,
    /// MAC address of the first nested (L2) guest.
    pub l2_guest_mac1: String,
    /// MAC address of the second nested (L2) guest.
    pub l2_guest_mac2: String,
    /// MAC address of the third nested (L2) guest.
    pub l2_guest_mac3: String,
    /// Host TCP port on which `wait_vm_boot` listens for the guest's
    /// boot notification.
    pub tcp_listener_port: u16,
}
67 
/// Text substituted for `@DEFAULT_TCP_LISTENER_MESSAGE` in the cloud-init
/// user-data template.
pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted";
/// Base TCP port for boot notifications; `Guest::new_from_ip_range` adds the
/// guest id to it to obtain a per-guest port.
pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000;
/// Default time, in seconds, that `GuestNetworkConfig::wait_vm_boot` waits for
/// a guest connection.
pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120;
71 
/// Reasons why `GuestNetworkConfig::wait_vm_boot` can fail.
#[derive(Debug)]
pub enum WaitForBootError {
    /// `epoll::wait` failed with an error other than `EAGAIN`/`EINTR`.
    EpollWait(std::io::Error),
    /// Binding the TCP listener failed.
    Listen(std::io::Error),
    /// No connection arrived before the timeout expired.
    EpollWaitTimeout,
    /// A connection arrived, but not from the expected guest IP address.
    WrongGuestAddr,
    /// Accepting the pending connection failed.
    Accept(std::io::Error),
}
80 
81 impl GuestNetworkConfig {
82     pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> {
83         let start = std::time::Instant::now();
84         // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()'
85         let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port);
86         let expected_guest_addr = self.guest_ip.as_str();
87         let mut s = String::new();
88         let timeout = match custom_timeout {
89             Some(t) => t,
90             None => DEFAULT_TCP_LISTENER_TIMEOUT,
91         };
92 
93         match (|| -> Result<(), WaitForBootError> {
94             let listener =
95                 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?;
96             listener
97                 .set_nonblocking(true)
98                 .expect("Cannot set non-blocking for tcp listener");
99 
100             // Reply on epoll w/ timeout to wait for guest connections faithfully
101             let epoll_fd = epoll::create(true).expect("Cannot create epoll fd");
102             // Use 'File' to enforce closing on 'epoll_fd'
103             let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) };
104             epoll::ctl(
105                 epoll_fd,
106                 epoll::ControlOptions::EPOLL_CTL_ADD,
107                 listener.as_raw_fd(),
108                 epoll::Event::new(epoll::Events::EPOLLIN, 0),
109             )
110             .expect("Cannot add 'tcp_listener' event to epoll");
111             let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); 1];
112             loop {
113                 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) {
114                     Ok(num_events) => Ok(num_events),
115                     Err(e) => match e.raw_os_error() {
116                         Some(libc::EAGAIN) | Some(libc::EINTR) => continue,
117                         _ => Err(e),
118                     },
119                 }
120                 .map_err(WaitForBootError::EpollWait)?;
121                 if num_events == 0 {
122                     return Err(WaitForBootError::EpollWaitTimeout);
123                 }
124                 break;
125             }
126 
127             match listener.accept() {
128                 Ok((_, addr)) => {
129                     // Make sure the connection is from the expected 'guest_addr'
130                     if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() {
131                         s = format!(
132                             "Expecting the guest ip '{}' while being connected with ip '{}'",
133                             expected_guest_addr,
134                             addr.ip()
135                         );
136                         return Err(WaitForBootError::WrongGuestAddr);
137                     }
138 
139                     Ok(())
140                 }
141                 Err(e) => {
142                     s = "TcpListener::accept() failed".to_string();
143                     Err(WaitForBootError::Accept(e))
144                 }
145             }
146         })() {
147             Err(e) => {
148                 let duration = start.elapsed();
149                 eprintln!(
150                     "\n\n==== Start 'wait_vm_boot' (FAILED) ====\n\n\
151                  duration =\"{duration:?}, timeout = {timeout}s\"\n\
152                  listen_addr=\"{listen_addr}\"\n\
153                  expected_guest_addr=\"{expected_guest_addr}\"\n\
154                  message=\"{s}\"\n\
155                  error=\"{e:?}\"\n\
156                  \n==== End 'wait_vm_boot' outout ====\n\n"
157                 );
158 
159                 Err(e)
160             }
161             Ok(_) => Ok(()),
162         }
163     }
164 }
165 
/// Selects which of a guest's disks [`DiskConfig::disk`] should return.
pub enum DiskType {
    /// The disk holding the guest operating system.
    OperatingSystem,
    /// The cloud-init configuration disk.
    CloudInit,
}
170 
/// Prepares and exposes the disks a guest is started with.
pub trait DiskConfig {
    /// Prepares the guest's disks, using `tmp_dir` for per-guest files.
    ///
    /// `Guest::new_from_ip_range` calls this once while building a `Guest`.
    fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig);
    /// Prepares the cloud-init disk and returns its path; implementations
    /// without cloud-init support return an empty string.
    fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String;
    /// Returns the path of the requested disk, or `None` when the
    /// configuration has no disk of that type.
    fn disk(&self, disk_type: DiskType) -> Option<String>;
}
176 
/// Disk configuration for an Ubuntu guest: an OS image plus a cloud-init disk.
#[derive(Clone)]
pub struct UbuntuDiskConfig {
    // Path of the per-guest OS disk copy; empty until `prepare_files` runs.
    osdisk_path: String,
    // Path of the cloud-init disk image; empty until `prepare_files` runs.
    cloudinit_path: String,
    // File name of the source image, resolved under `~/workloads`.
    image_name: String,
}

impl UbuntuDiskConfig {
    /// Creates a configuration for `image_name` with both disk paths still
    /// unset (empty).
    pub fn new(image_name: String) -> Self {
        let (osdisk_path, cloudinit_path) = (String::new(), String::new());
        Self {
            osdisk_path,
            cloudinit_path,
            image_name,
        }
    }
}
193 
/// Disk configuration for a Windows guest backed by a device-mapper snapshot.
pub struct WindowsDiskConfig {
    // File name of the source image, resolved under `~/workloads`.
    image_name: String,
    // Path of the snapshot device handed to the guest; empty until prepared.
    osdisk_path: String,
    // Loopback device backing the copy-on-write file; empty until prepared.
    loopback_device: String,
    // Name of the device-mapper target wrapping the copy-on-write file.
    windows_snapshot_cow: String,
    // Name of the device-mapper snapshot target.
    windows_snapshot: String,
}

impl WindowsDiskConfig {
    /// Creates a configuration for `image_name`; every other field starts
    /// empty and is filled in by `prepare_files`.
    pub fn new(image_name: String) -> Self {
        let unset = String::new;
        Self {
            image_name,
            osdisk_path: unset(),
            loopback_device: unset(),
            windows_snapshot_cow: unset(),
            windows_snapshot: unset(),
        }
    }
}
213 
214 impl Drop for WindowsDiskConfig {
215     fn drop(&mut self) {
216         // dmsetup remove windows-snapshot-1
217         std::process::Command::new("dmsetup")
218             .arg("remove")
219             .arg(self.windows_snapshot.as_str())
220             .output()
221             .expect("Expect removing Windows snapshot with 'dmsetup' to succeed");
222 
223         // dmsetup remove windows-snapshot-cow-1
224         std::process::Command::new("dmsetup")
225             .arg("remove")
226             .arg(self.windows_snapshot_cow.as_str())
227             .output()
228             .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed");
229 
230         // losetup -d <loopback_device>
231         std::process::Command::new("losetup")
232             .args(["-d", self.loopback_device.as_str()])
233             .output()
234             .expect("Expect removing loopback device to succeed");
235     }
236 }
237 
238 impl DiskConfig for UbuntuDiskConfig {
239     fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String {
240         let cloudinit_file_path =
241             String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap());
242 
243         let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu");
244 
245         fs::create_dir_all(&cloud_init_directory)
246             .expect("Expect creating cloud-init directory to succeed");
247 
248         let source_file_dir = std::env::current_dir()
249             .unwrap()
250             .join("test_data")
251             .join("cloud-init")
252             .join("ubuntu");
253 
254         vec!["meta-data"].iter().for_each(|x| {
255             rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x))
256                 .expect("Expect copying cloud-init meta-data to succeed");
257         });
258 
259         let mut user_data_string = String::new();
260         fs::File::open(source_file_dir.join("user-data"))
261             .unwrap()
262             .read_to_string(&mut user_data_string)
263             .expect("Expected reading user-data file in to succeed");
264         user_data_string = user_data_string.replace(
265             "@DEFAULT_TCP_LISTENER_MESSAGE",
266             DEFAULT_TCP_LISTENER_MESSAGE,
267         );
268         user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip);
269         user_data_string =
270             user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string());
271 
272         fs::File::create(cloud_init_directory.join("user-data"))
273             .unwrap()
274             .write_all(user_data_string.as_bytes())
275             .expect("Expected writing out user-data to succeed");
276 
277         let mut network_config_string = String::new();
278 
279         fs::File::open(source_file_dir.join("network-config"))
280             .unwrap()
281             .read_to_string(&mut network_config_string)
282             .expect("Expected reading network-config file in to succeed");
283 
284         network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip);
285         network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip);
286         network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1);
287         network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2);
288         network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3);
289         network_config_string =
290             network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac);
291         network_config_string =
292             network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1);
293         network_config_string =
294             network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2);
295         network_config_string =
296             network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3);
297 
298         fs::File::create(cloud_init_directory.join("network-config"))
299             .unwrap()
300             .write_all(network_config_string.as_bytes())
301             .expect("Expected writing out network-config to succeed");
302 
303         std::process::Command::new("mkdosfs")
304             .args(["-n", "cidata"])
305             .args(["-C", cloudinit_file_path.as_str()])
306             .arg("8192")
307             .output()
308             .expect("Expect creating disk image to succeed");
309 
310         vec!["user-data", "meta-data", "network-config"]
311             .iter()
312             .for_each(|x| {
313                 std::process::Command::new("mcopy")
314                     .arg("-o")
315                     .args(["-i", cloudinit_file_path.as_str()])
316                     .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"])
317                     .output()
318                     .expect("Expect copying files to disk image to succeed");
319             });
320 
321         cloudinit_file_path
322     }
323 
324     fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) {
325         let mut workload_path = dirs::home_dir().unwrap();
326         workload_path.push("workloads");
327 
328         let mut osdisk_base_path = workload_path;
329         osdisk_base_path.push(&self.image_name);
330 
331         let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap());
332         let cloudinit_path = self.prepare_cloudinit(tmp_dir, network);
333 
334         rate_limited_copy(osdisk_base_path, &osdisk_path)
335             .expect("copying of OS source disk image failed");
336 
337         self.cloudinit_path = cloudinit_path;
338         self.osdisk_path = osdisk_path;
339     }
340 
341     fn disk(&self, disk_type: DiskType) -> Option<String> {
342         match disk_type {
343             DiskType::OperatingSystem => Some(self.osdisk_path.clone()),
344             DiskType::CloudInit => Some(self.cloudinit_path.clone()),
345         }
346     }
347 }
348 
impl DiskConfig for WindowsDiskConfig {
    /// Windows guests have no cloud-init disk; always returns an empty string.
    fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String {
        String::new()
    }

    /// Creates a per-guest device-mapper snapshot and records its device path
    /// as the OS disk.
    ///
    /// Steps, in order: create a 1 GiB copy-on-write file in `tmp_dir`, attach
    /// it to a loopback device, wrap that device in a `linear` device-mapper
    /// target, then create a `snapshot` target of `/dev/mapper/windows-base`
    /// that uses it as copy-on-write store. The created resources are released
    /// by this type's `Drop` implementation.
    ///
    /// NOTE(review): `/dev/mapper/windows-base` is not created here; it
    /// presumably has to exist before this is called.
    fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) {
        let mut workload_path = dirs::home_dir().unwrap();
        workload_path.push("workloads");

        let mut osdisk_path = workload_path;
        osdisk_path.push(&self.image_name);

        // Image size in 512-byte units (bytes >> 9), as used in the snapshot
        // table below.
        let osdisk_blk_size = fs::metadata(osdisk_path)
            .expect("Expect retrieving Windows image metadata")
            .len()
            >> 9;

        let snapshot_cow_path =
            String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap());

        // Create and truncate CoW file for device mapper
        // (1 GiB, also expressed in 512-byte units for the linear table).
        let cow_file_size: u64 = 1 << 30;
        let cow_file_blk_size = cow_file_size >> 9;
        let cow_file = std::fs::File::create(snapshot_cow_path.as_str())
            .expect("Expect creating CoW image to succeed");
        cow_file
            .set_len(cow_file_size)
            .expect("Expect truncating CoW image to succeed");

        // losetup --find --show /tmp/snapshot_cow
        let loopback_device = std::process::Command::new("losetup")
            .arg("--find")
            .arg("--show")
            .arg(snapshot_cow_path.as_str())
            .output()
            .expect("Expect creating loopback device from snapshot CoW image to succeed");

        // The command's stdout (trimmed) is taken as the loopback device path.
        self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout)
            .trim()
            .to_string();

        // Reuse the temporary directory's name as a suffix for the
        // device-mapper target names.
        let random_extension = tmp_dir.as_path().file_name().unwrap();
        let windows_snapshot_cow = format!(
            "windows-snapshot-cow-{}",
            random_extension.to_str().unwrap()
        );

        // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0'
        std::process::Command::new("dmsetup")
            .arg("create")
            .arg(windows_snapshot_cow.as_str())
            .args([
                "--table",
                format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(),
            ])
            .output()
            .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed");

        let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap());

        // dmsetup mknodes
        std::process::Command::new("dmsetup")
            .arg("mknodes")
            .output()
            .expect("Expect device mapper nodes to be ready");

        // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8'
        std::process::Command::new("dmsetup")
            .arg("create")
            .arg(windows_snapshot.as_str())
            .args([
                "--table",
                format!(
                    "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8",
                    osdisk_blk_size,
                    windows_snapshot_cow.as_str()
                )
                .as_str(),
            ])
            .output()
            .expect("Expect creating Windows snapshot with 'dmsetup' to succeed");

        // dmsetup mknodes
        std::process::Command::new("dmsetup")
            .arg("mknodes")
            .output()
            .expect("Expect device mapper nodes to be ready");

        // Record the results only now that every step has run; `Drop` uses
        // these names for cleanup.
        self.osdisk_path = format!("/dev/mapper/{windows_snapshot}");
        self.windows_snapshot_cow = windows_snapshot_cow;
        self.windows_snapshot = windows_snapshot;
    }

    /// Returns the snapshot device path for the OS disk and `None` for the
    /// cloud-init disk, which Windows guests do not have.
    fn disk(&self, disk_type: DiskType) -> Option<String> {
        match disk_type {
            DiskType::OperatingSystem => Some(self.osdisk_path.clone()),
            DiskType::CloudInit => None,
        }
    }
}
449 
450 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> {
451     for i in 0..10 {
452         let free_bytes = unsafe {
453             let mut stats = std::mem::MaybeUninit::zeroed();
454             let fs_name = std::ffi::CString::new("/tmp").unwrap();
455             libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr());
456 
457             let free_blocks = stats.assume_init().f_bfree;
458             let block_size = stats.assume_init().f_bsize;
459 
460             free_blocks * block_size
461         };
462 
463         // Make sure there is at least 6 GiB of space
464         if free_bytes < 6 << 30 {
465             eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping.");
466             thread::sleep(std::time::Duration::new(60, 0));
467             continue;
468         }
469 
470         match fs::copy(&from, &to) {
471             Err(e) => {
472                 if let Some(errno) = e.raw_os_error() {
473                     if errno == libc::ENOSPC {
474                         eprintln!("Copy returned ENOSPC. Attempt {i} of 10. Sleeping.");
475                         thread::sleep(std::time::Duration::new(60, 0));
476                         continue;
477                     }
478                 }
479                 return Err(e);
480             }
481             Ok(i) => return Ok(i),
482         }
483     }
484     Err(io::Error::last_os_error())
485 }
486 
/// Checks the outcome of a test body and of the child process it drove, and
/// panics with diagnostics if either failed.
///
/// `r` is the result of the test body (as returned by `catch_unwind` or a
/// thread join) and `output` is the collected output of the child process.
///
/// Returns normally when `r` is `Ok` and the child either exited successfully
/// or was terminated by signal 9 (the result of `child.kill()`).
///
/// # Panics
///
/// In every other case the child's exit code or signal, stdout and stderr are
/// written to stderr and the function panics with "Test failed".
pub fn handle_child_output(
    r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>,
    output: &std::process::Output,
) {
    use std::os::unix::process::ExitStatusExt;

    let status = &output.status;
    let body_ok = r.is_ok();
    if body_ok && status.success() {
        return;
    }

    if let Some(code) = status.code() {
        eprintln!("\n\n==== child exit code: {code} ====");
    } else {
        let signal = status.signal();
        // Don't treat child.kill() as a problem
        if body_ok && signal == Some(9) {
            return;
        }

        eprintln!("==== child killed by signal: {} ====", signal.unwrap());
    }

    let child_stdout = String::from_utf8_lossy(&output.stdout);
    eprintln!("\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", child_stdout);
    let child_stderr = String::from_utf8_lossy(&output.stderr);
    eprintln!("\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", child_stderr);

    panic!("Test failed")
}
524 
/// Username/password credentials used for SSH password authentication.
#[derive(Debug)]
pub struct PasswordAuth {
    /// Login name on the guest.
    pub username: String,
    /// Password for `username`.
    pub password: String,
}
530 
/// Default number of attempts made by the SSH helpers before giving up.
pub const DEFAULT_SSH_RETRIES: u8 = 6;
/// Default base delay, in seconds, between SSH attempts; the helpers multiply
/// it by the number of failed attempts so far.
pub const DEFAULT_SSH_TIMEOUT: u8 = 10;
533 
/// Failures that can occur while running a command on, or copying a file to,
/// a guest over SSH.
#[derive(Debug)]
pub enum SshCommandError {
    /// The TCP connection to port 22 could not be established.
    Connection(std::io::Error),
    /// The SSH handshake failed.
    Handshake(ssh2::Error),
    /// Password authentication failed.
    Authentication(ssh2::Error),
    /// Opening a session channel failed.
    ChannelSession(ssh2::Error),
    /// Starting the command on the channel failed.
    Command(ssh2::Error),
    /// The command's exit status could not be retrieved.
    ExitStatus(ssh2::Error),
    /// The command ran but exited with the contained non-zero status.
    NonZeroExitStatus(i32),
    /// The local file to copy could not be read.
    FileRead(std::io::Error),
    /// The local file's metadata could not be read.
    FileMetadata(std::io::Error),
    /// Starting the SCP transfer failed.
    ScpSend(ssh2::Error),
    /// Writing the file content to the SCP channel failed.
    WriteAll(std::io::Error),
    /// Sending EOF on the SCP channel failed.
    SendEof(ssh2::Error),
    /// Waiting for the remote EOF on the SCP channel failed.
    WaitEof(ssh2::Error),
}
550 
551 fn scp_to_guest_with_auth(
552     path: &Path,
553     remote_path: &Path,
554     auth: &PasswordAuth,
555     ip: &str,
556     retries: u8,
557     timeout: u8,
558 ) -> Result<(), SshCommandError> {
559     let mut counter = 0;
560     loop {
561         match (|| -> Result<(), SshCommandError> {
562             let tcp =
563                 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?;
564             let mut sess = Session::new().unwrap();
565             sess.set_tcp_stream(tcp);
566             sess.handshake().map_err(SshCommandError::Handshake)?;
567 
568             sess.userauth_password(&auth.username, &auth.password)
569                 .map_err(SshCommandError::Authentication)?;
570             assert!(sess.authenticated());
571 
572             let content = fs::read(path).map_err(SshCommandError::FileRead)?;
573             let mode = fs::metadata(path)
574                 .map_err(SshCommandError::FileMetadata)?
575                 .permissions()
576                 .mode()
577                 & 0o777;
578 
579             let mut channel = sess
580                 .scp_send(remote_path, mode as i32, content.len() as u64, None)
581                 .map_err(SshCommandError::ScpSend)?;
582             channel
583                 .write_all(&content)
584                 .map_err(SshCommandError::WriteAll)?;
585             channel.send_eof().map_err(SshCommandError::SendEof)?;
586             channel.wait_eof().map_err(SshCommandError::WaitEof)?;
587 
588             // Intentionally ignore these results here as their failure
589             // does not precipitate a repeat
590             let _ = channel.close();
591             let _ = channel.wait_close();
592 
593             Ok(())
594         })() {
595             Ok(_) => break,
596             Err(e) => {
597                 counter += 1;
598                 if counter >= retries {
599                     eprintln!(
600                         "\n\n==== Start scp command output (FAILED) ====\n\n\
601                          path =\"{path:?}\"\n\
602                          remote_path =\"{remote_path:?}\"\n\
603                          auth=\"{auth:#?}\"\n\
604                          ip=\"{ip}\"\n\
605                          error=\"{e:?}\"\n\
606                          \n==== End scp command outout ====\n\n"
607                     );
608 
609                     return Err(e);
610                 }
611             }
612         };
613         thread::sleep(std::time::Duration::new((timeout * counter).into(), 0));
614     }
615     Ok(())
616 }
617 
618 pub fn scp_to_guest(
619     path: &Path,
620     remote_path: &Path,
621     ip: &str,
622     retries: u8,
623     timeout: u8,
624 ) -> Result<(), SshCommandError> {
625     scp_to_guest_with_auth(
626         path,
627         remote_path,
628         &PasswordAuth {
629             username: String::from("cloud"),
630             password: String::from("cloud123"),
631         },
632         ip,
633         retries,
634         timeout,
635     )
636 }
637 
638 pub fn ssh_command_ip_with_auth(
639     command: &str,
640     auth: &PasswordAuth,
641     ip: &str,
642     retries: u8,
643     timeout: u8,
644 ) -> Result<String, SshCommandError> {
645     let mut s = String::new();
646 
647     let mut counter = 0;
648     loop {
649         match (|| -> Result<(), SshCommandError> {
650             let tcp =
651                 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?;
652             let mut sess = Session::new().unwrap();
653             sess.set_tcp_stream(tcp);
654             sess.handshake().map_err(SshCommandError::Handshake)?;
655 
656             sess.userauth_password(&auth.username, &auth.password)
657                 .map_err(SshCommandError::Authentication)?;
658             assert!(sess.authenticated());
659 
660             let mut channel = sess
661                 .channel_session()
662                 .map_err(SshCommandError::ChannelSession)?;
663             channel.exec(command).map_err(SshCommandError::Command)?;
664 
665             // Intentionally ignore these results here as their failure
666             // does not precipitate a repeat
667             let _ = channel.read_to_string(&mut s);
668             let _ = channel.close();
669             let _ = channel.wait_close();
670 
671             let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?;
672 
673             if status != 0 {
674                 Err(SshCommandError::NonZeroExitStatus(status))
675             } else {
676                 Ok(())
677             }
678         })() {
679             Ok(_) => break,
680             Err(e) => {
681                 counter += 1;
682                 if counter >= retries {
683                     eprintln!(
684                         "\n\n==== Start ssh command output (FAILED) ====\n\n\
685                          command=\"{command}\"\n\
686                          auth=\"{auth:#?}\"\n\
687                          ip=\"{ip}\"\n\
688                          output=\"{s}\"\n\
689                          error=\"{e:?}\"\n\
690                          \n==== End ssh command outout ====\n\n"
691                     );
692 
693                     return Err(e);
694                 }
695             }
696         };
697         thread::sleep(std::time::Duration::new((timeout * counter).into(), 0));
698     }
699     Ok(s)
700 }
701 
702 pub fn ssh_command_ip(
703     command: &str,
704     ip: &str,
705     retries: u8,
706     timeout: u8,
707 ) -> Result<String, SshCommandError> {
708     ssh_command_ip_with_auth(
709         command,
710         &PasswordAuth {
711             username: String::from("cloud"),
712             password: String::from("cloud123"),
713         },
714         ip,
715         retries,
716         timeout,
717     )
718 }
719 
/// Runs `command` on the host through `bash -c` and returns its exit status.
///
/// The child inherits the caller's standard streams.
///
/// # Panics
///
/// Panics if `bash` cannot be started.
pub fn exec_host_command_status(command: &str) -> ExitStatus {
    let mut shell = std::process::Command::new("bash");
    shell.arg("-c").arg(command);

    match shell.status() {
        Ok(status) => status,
        Err(_) => panic!("Expected '{command}' to run"),
    }
}
726 
/// Runs `command` on the host through `bash -c` and returns its exit status
/// together with the captured stdout and stderr.
///
/// # Panics
///
/// Panics if `bash` cannot be started.
pub fn exec_host_command_output(command: &str) -> Output {
    let mut shell = std::process::Command::new("bash");
    shell.arg("-c").arg(command);

    match shell.output() {
        Ok(output) => output,
        Err(_) => panic!("Expected '{command}' to run"),
    }
}
733 
/// 32 MiB (`32 << 20` bytes).
///
/// NOTE(review): presumably the buffer size requested for pipes attached to
/// child processes; the code using it is not visible in this part of the file.
pub const PIPE_SIZE: i32 = 32 << 20;

/// Id that the next call to `Guest::new` will use; starts at 1 and is
/// incremented under the lock for every guest created that way.
static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1));
737 
/// A test guest: its private temporary directory, its disks and its network
/// settings.
pub struct Guest {
    /// Per-guest temporary directory (created with the `/tmp/ch` prefix) that
    /// holds the guest's disk files.
    pub tmp_dir: TempDir,
    /// Disk configuration, already prepared by the constructor.
    pub disk_config: Box<dyn DiskConfig>,
    /// Addresses and ports used to reach this guest.
    pub network: GuestNetworkConfig,
}
743 
// Safe to implement as we know we have no interior mutability
// NOTE(review): this also covers whatever `Box<dyn DiskConfig>` holds; the two
// implementations visible in this file only store `String` fields.
impl std::panic::RefUnwindSafe for Guest {}
746 
747 impl Guest {
748     pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self {
749         let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap();
750 
751         let network = GuestNetworkConfig {
752             guest_ip: format!("{class}.{id}.2"),
753             l2_guest_ip1: format!("{class}.{id}.3"),
754             l2_guest_ip2: format!("{class}.{id}.4"),
755             l2_guest_ip3: format!("{class}.{id}.5"),
756             host_ip: format!("{class}.{id}.1"),
757             guest_mac: format!("12:34:56:78:90:{id:02x}"),
758             l2_guest_mac1: format!("de:ad:be:ef:12:{id:02x}"),
759             l2_guest_mac2: format!("de:ad:be:ef:34:{id:02x}"),
760             l2_guest_mac3: format!("de:ad:be:ef:56:{id:02x}"),
761             tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16,
762         };
763 
764         disk_config.prepare_files(&tmp_dir, &network);
765 
766         Guest {
767             tmp_dir,
768             disk_config,
769             network,
770         }
771     }
772 
773     pub fn new(disk_config: Box<dyn DiskConfig>) -> Self {
774         let mut guard = NEXT_VM_ID.lock().unwrap();
775         let id = *guard;
776         *guard = id + 1;
777 
778         Self::new_from_ip_range(disk_config, "192.168", id)
779     }
780 
781     pub fn default_net_string(&self) -> String {
782         format!(
783             "tap=,mac={},ip={},mask=255.255.255.0",
784             self.network.guest_mac, self.network.host_ip
785         )
786     }
787 
788     pub fn default_net_string_w_iommu(&self) -> String {
789         format!(
790             "tap=,mac={},ip={},mask=255.255.255.0,iommu=on",
791             self.network.guest_mac, self.network.host_ip
792         )
793     }
794 
795     pub fn default_net_string_w_mtu(&self, mtu: u16) -> String {
796         format!(
797             "tap=,mac={},ip={},mask=255.255.255.0,mtu={}",
798             self.network.guest_mac, self.network.host_ip, mtu
799         )
800     }
801 
802     pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> {
803         ssh_command_ip(
804             command,
805             &self.network.guest_ip,
806             DEFAULT_SSH_RETRIES,
807             DEFAULT_SSH_TIMEOUT,
808         )
809     }
810 
811     #[cfg(target_arch = "x86_64")]
812     pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> {
813         ssh_command_ip(
814             command,
815             &self.network.guest_ip,
816             DEFAULT_SSH_RETRIES,
817             DEFAULT_SSH_TIMEOUT,
818         )
819     }
820 
821     #[cfg(target_arch = "x86_64")]
822     pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> {
823         ssh_command_ip(
824             command,
825             &self.network.l2_guest_ip1,
826             DEFAULT_SSH_RETRIES,
827             DEFAULT_SSH_TIMEOUT,
828         )
829     }
830 
831     #[cfg(target_arch = "x86_64")]
832     pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> {
833         ssh_command_ip(
834             command,
835             &self.network.l2_guest_ip2,
836             DEFAULT_SSH_RETRIES,
837             DEFAULT_SSH_TIMEOUT,
838         )
839     }
840 
841     #[cfg(target_arch = "x86_64")]
842     pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> {
843         ssh_command_ip(
844             command,
845             &self.network.l2_guest_ip3,
846             DEFAULT_SSH_RETRIES,
847             DEFAULT_SSH_TIMEOUT,
848         )
849     }
850 
851     pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String {
852         format! {"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}",
853                  cpu_count,
854                  cpu_count,
855                  kernel_path,
856                  kernel_cmd,
857                  self.network.host_ip,
858                  self.network.guest_mac,
859                  self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(),
860                  self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(),
861         }
862     }
863 
864     pub fn get_cpu_count(&self) -> Result<u32, Error> {
865         self.ssh_command("grep -c processor /proc/cpuinfo")?
866             .trim()
867             .parse()
868             .map_err(Error::Parsing)
869     }
870 
871     #[cfg(target_arch = "x86_64")]
872     pub fn get_initial_apicid(&self) -> Result<u32, Error> {
873         self.ssh_command("grep \"initial apicid\" /proc/cpuinfo | grep -o \"[0-9]*\"")?
874             .trim()
875             .parse()
876             .map_err(Error::Parsing)
877     }
878 
879     pub fn get_total_memory(&self) -> Result<u32, Error> {
880         self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")?
881             .trim()
882             .parse()
883             .map_err(Error::Parsing)
884     }
885 
886     #[cfg(target_arch = "x86_64")]
887     pub fn get_total_memory_l2(&self) -> Result<u32, Error> {
888         self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")?
889             .trim()
890             .parse()
891             .map_err(Error::Parsing)
892     }
893 
894     pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> {
895         self.ssh_command(
896             format!(
897                 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \
898                         | cut -d \":\" -f 2 | grep -o \"[0-9]*\""
899             )
900             .as_str(),
901         )?
902         .trim()
903         .parse()
904         .map_err(Error::Parsing)
905     }
906 
907     pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> {
908         self.network
909             .wait_vm_boot(custom_timeout)
910             .map_err(Error::WaitForBoot)
911     }
912 
913     pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> {
914         for cpu in cpus.iter() {
915             let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]");
916             self.ssh_command(cmd.as_str())?;
917         }
918 
919         Ok(())
920     }
921 
922     pub fn check_numa_node_distances(
923         &self,
924         node_id: usize,
925         distances: &str,
926     ) -> Result<bool, Error> {
927         let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance");
928         if self.ssh_command(cmd.as_str())?.trim() == distances {
929             Ok(true)
930         } else {
931             Ok(false)
932         }
933     }
934 
935     pub fn check_numa_common(
936         &self,
937         mem_ref: Option<&[u32]>,
938         node_ref: Option<&[Vec<usize>]>,
939         distance_ref: Option<&[&str]>,
940     ) {
941         if let Some(mem_ref) = mem_ref {
942             // Check each NUMA node has been assigned the right amount of
943             // memory.
944             for (i, &m) in mem_ref.iter().enumerate() {
945                 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m);
946             }
947         }
948 
949         if let Some(node_ref) = node_ref {
950             // Check each NUMA node has been assigned the right CPUs set.
951             for (i, n) in node_ref.iter().enumerate() {
952                 self.check_numa_node_cpus(i, n.clone()).unwrap();
953             }
954         }
955 
956         if let Some(distance_ref) = distance_ref {
957             // Check each NUMA node has been assigned the right distances.
958             for (i, &d) in distance_ref.iter().enumerate() {
959                 assert!(self.check_numa_node_distances(i, d).unwrap());
960             }
961         }
962     }
963 
964     #[cfg(target_arch = "x86_64")]
965     pub fn check_sgx_support(&self) -> Result<(), Error> {
966         self.ssh_command(
967             "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \
968                     Software Guard Extensions supported = true'",
969         )?;
970         self.ssh_command(
971             "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \
972                     SGX launch config supported = true'",
973         )?;
974         self.ssh_command(
975             "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \
976                     supported = true'",
977         )?;
978 
979         Ok(())
980     }
981 
982     pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
983         Ok(self
984             .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
985             .trim()
986             .to_string())
987     }
988 
989     pub fn get_pci_device_ids(&self) -> Result<String, Error> {
990         Ok(self
991             .ssh_command("cat /sys/bus/pci/devices/*/device")?
992             .trim()
993             .to_string())
994     }
995 
996     pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
997         Ok(self
998             .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
999             .trim()
1000             .to_string())
1001     }
1002 
1003     pub fn does_device_vendor_pair_match(
1004         &self,
1005         device_id: &str,
1006         vendor_id: &str,
1007     ) -> Result<bool, Error> {
1008         // We are checking if console device's device id and vendor id pair matches
1009         let devices = self.get_pci_device_ids()?;
1010         let devices: Vec<&str> = devices.split('\n').collect();
1011         let vendors = self.get_pci_vendor_ids()?;
1012         let vendors: Vec<&str> = vendors.split('\n').collect();
1013 
1014         for (index, d_id) in devices.iter().enumerate() {
1015             if *d_id == device_id {
1016                 if let Some(v_id) = vendors.get(index) {
1017                     if *v_id == vendor_id {
1018                         return Ok(true);
1019                     }
1020                 }
1021             }
1022         }
1023 
1024         Ok(false)
1025     }
1026 
1027     pub fn check_vsock(&self, socket: &str) {
1028         // Listen from guest on vsock CID=3 PORT=16
1029         // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
1030         let guest_ip = self.network.guest_ip.clone();
1031         let listen_socat = thread::spawn(move || {
1032             ssh_command_ip("sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log", &guest_ip, DEFAULT_SSH_RETRIES, DEFAULT_SSH_TIMEOUT).unwrap();
1033         });
1034 
1035         // Make sure socat is listening, which might take a few second on slow systems
1036         thread::sleep(std::time::Duration::new(10, 0));
1037 
1038         // Write something to vsock from the host
1039         assert!(exec_host_command_status(&format!(
1040             "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
1041         ))
1042         .success());
1043 
1044         // Wait for the thread to terminate.
1045         listen_socat.join().unwrap();
1046 
1047         assert_eq!(
1048             self.ssh_command("cat vsock_log").unwrap().trim(),
1049             "HelloWorld!"
1050         );
1051     }
1052 
    /// Runs `nvidia-smi` in the guest and asserts that its output mentions
    /// `Tesla T4`.
    ///
    /// # Panics
    ///
    /// Panics if the SSH command fails or the text is not found.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self.ssh_command("nvidia-smi").unwrap().contains("Tesla T4"));
    }
1057 
1058     pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
1059         let list_boots_cmd = "sudo last | grep -c reboot";
1060         let boot_count = self
1061             .ssh_command(list_boots_cmd)
1062             .unwrap()
1063             .trim()
1064             .parse::<u32>()
1065             .unwrap_or_default();
1066 
1067         assert_eq!(boot_count, current_reboot_count + 1);
1068         self.ssh_command("sudo reboot").unwrap();
1069 
1070         self.wait_vm_boot(custom_timeout).unwrap();
1071         let boot_count = self
1072             .ssh_command(list_boots_cmd)
1073             .unwrap()
1074             .trim()
1075             .parse::<u32>()
1076             .unwrap_or_default();
1077         assert_eq!(boot_count, current_reboot_count + 2);
1078     }
1079 
1080     pub fn enable_memory_hotplug(&self) {
1081         self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
1082             .unwrap();
1083     }
1084 
    /// Exercises the guest's common devices and panics on the first failure.
    ///
    /// The block devices `/dev/vda` and `/dev/vdb` and the `/dev/hwrng`
    /// device are always read. The remaining checks run only when the
    /// matching argument is `Some`:
    /// * `socket`: host-side UNIX socket path handed to [`Self::check_vsock`].
    /// * `console_text`: text echoed into `/dev/hvc0`.
    /// * `pmem_path`: device path that is listed, mounted on `/mnt`, written
    ///   to, unmounted, re-mounted to verify the data is still there, cleaned
    ///   up and unmounted again.
    ///
    /// # Panics
    ///
    /// Panics if any SSH command fails or any output differs from the
    /// expected value.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            // The device node must exist under exactly the given path.
            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            // First mount: the mount command must produce no output.
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            // Only `lost+found` is expected on the mounted filesystem.
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            // After unmounting, the mount point itself must be empty.
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            // Second mount: the file written above must still be readable.
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            // `|| true` keeps the command successful so that a missing file
            // surfaces as an assertion mismatch rather than an SSH error.
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            // Remove the test file again before the final unmount.
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
1142 }
1143 
/// Logging verbosity requested from the launched VMM binary.
///
/// Each level corresponds to a command-line flag: none for `Warn`, `-v` for
/// `Info` and `-vv` for `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerbosityLevel {
    /// No verbosity flag is passed.
    Warn,
    /// The `-v` flag is passed.
    Info,
    /// The `-vv` flag is passed.
    Debug,
}
1149 
1150 impl Default for VerbosityLevel {
1151     fn default() -> Self {
1152         Self::Warn
1153     }
1154 }
1155 
1156 impl ToString for VerbosityLevel {
1157     fn to_string(&self) -> String {
1158         use VerbosityLevel::*;
1159         match self {
1160             Warn => "".to_string(),
1161             Info => "-v".to_string(),
1162             Debug => "-vv".to_string(),
1163         }
1164     }
1165 }
1166 
/// Builder around a [`Command`] that launches a VMM binary for a [`Guest`].
pub struct GuestCommand<'a> {
    // The process invocation being assembled.
    command: Command,
    // Guest whose disk and network settings feed the default arguments.
    guest: &'a Guest,
    // When set, `spawn` pipes the child's stdout and stderr.
    capture_output: bool,
    // When set, `spawn` prints the full command line before launching.
    print_cmd: bool,
    // Selects which verbosity flag (if any) `spawn` appends.
    verbosity: VerbosityLevel,
}
1174 
1175 impl<'a> GuestCommand<'a> {
1176     pub fn new(guest: &'a Guest) -> Self {
1177         Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
1178     }
1179 
1180     pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
1181         Self {
1182             command: Command::new(binary_path),
1183             guest,
1184             capture_output: false,
1185             print_cmd: true,
1186             verbosity: VerbosityLevel::Info,
1187         }
1188     }
1189 
1190     pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
1191         self.verbosity = verbosity;
1192         self
1193     }
1194 
1195     pub fn capture_output(&mut self) -> &mut Self {
1196         self.capture_output = true;
1197         self
1198     }
1199 
1200     pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
1201         self.print_cmd = print_cmd;
1202         self
1203     }
1204 
1205     pub fn spawn(&mut self) -> io::Result<Child> {
1206         use VerbosityLevel::*;
1207         match &self.verbosity {
1208             Warn => {}
1209             Info => {
1210                 self.command.arg("-v");
1211             }
1212             Debug => {
1213                 self.command.arg("-vv");
1214             }
1215         };
1216 
1217         if self.print_cmd {
1218             println!(
1219                 "\n\n==== Start cloud-hypervisor command-line ====\n\n\
1220                      {:?}\n\
1221                      \n==== End cloud-hypervisor command-line ====\n\n",
1222                 self.command
1223             );
1224         }
1225 
1226         if self.capture_output {
1227             let child = self
1228                 .command
1229                 .stderr(Stdio::piped())
1230                 .stdout(Stdio::piped())
1231                 .spawn()
1232                 .unwrap();
1233 
1234             let fd = child.stdout.as_ref().unwrap().as_raw_fd();
1235             let pipesize = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
1236             if pipesize == -1 {
1237                 return Err(io::Error::last_os_error());
1238             }
1239             let fd = child.stderr.as_ref().unwrap().as_raw_fd();
1240             let pipesize1 = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
1241             if pipesize1 == -1 {
1242                 return Err(io::Error::last_os_error());
1243             }
1244 
1245             if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE {
1246                 Ok(child)
1247             } else {
1248                 Err(std::io::Error::new(
1249                     std::io::ErrorKind::Other,
1250                     format!(
1251                         "resizing pipe w/ 'fnctl' failed: stdout pipesize {pipesize}, stderr pipesize {pipesize1}"
1252                     ),
1253                 ))
1254             }
1255         } else {
1256             self.command.spawn()
1257         }
1258     }
1259 
1260     pub fn args<I, S>(&mut self, args: I) -> &mut Self
1261     where
1262         I: IntoIterator<Item = S>,
1263         S: AsRef<OsStr>,
1264     {
1265         self.command.args(args);
1266         self
1267     }
1268 
1269     pub fn default_disks(&mut self) -> &mut Self {
1270         if self.guest.disk_config.disk(DiskType::CloudInit).is_some() {
1271             self.args([
1272                 "--disk",
1273                 format!(
1274                     "path={}",
1275                     self.guest
1276                         .disk_config
1277                         .disk(DiskType::OperatingSystem)
1278                         .unwrap()
1279                 )
1280                 .as_str(),
1281                 format!(
1282                     "path={}",
1283                     self.guest.disk_config.disk(DiskType::CloudInit).unwrap()
1284                 )
1285                 .as_str(),
1286             ])
1287         } else {
1288             self.args([
1289                 "--disk",
1290                 format!(
1291                     "path={}",
1292                     self.guest
1293                         .disk_config
1294                         .disk(DiskType::OperatingSystem)
1295                         .unwrap()
1296                 )
1297                 .as_str(),
1298             ])
1299         }
1300     }
1301 
1302     pub fn default_net(&mut self) -> &mut Self {
1303         self.args(["--net", self.guest.default_net_string().as_str()])
1304     }
1305 }
1306 
/// Returns the relative path of the release build of `cmd`.
///
/// The target triple is taken from the `BUILD_TARGET` environment variable.
/// When the variable is unset (or not valid Unicode) the triple
/// `x86_64-unknown-linux-gnu` is used.
pub fn clh_command(cmd: &str) -> String {
    // `map_or_else` builds the fallback path only when it is actually needed.
    env::var("BUILD_TARGET").map_or_else(
        |_| format!("target/x86_64-unknown-linux-gnu/release/{cmd}"),
        |target| format!("target/{target}/release/{cmd}"),
    )
}
1313 
1314 pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> {
1315     std::panic::catch_unwind(|| {
1316         let s = String::from_utf8_lossy(output);
1317         let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");
1318 
1319         let bps: f64 = if sender {
1320             v["end"]["sum_sent"]["bits_per_second"]
1321                 .as_f64()
1322                 .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
1323         } else {
1324             v["end"]["sum_received"]["bits_per_second"]
1325                 .as_f64()
1326                 .expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'")
1327         };
1328 
1329         bps
1330     })
1331     .map_err(|_| {
1332         eprintln!(
1333             "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n",
1334             String::from_utf8_lossy(output)
1335         );
1336         Error::Iperf3Parse
1337     })
1338 }
1339 
/// I/O patterns that can be requested from `fio`.
///
/// The `Display` implementation of this type yields the corresponding `fio`
/// keyword for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FioOps {
    /// Rendered as `read`.
    Read,
    /// Rendered as `randread`.
    RandomRead,
    /// Rendered as `write`.
    Write,
    /// Rendered as `randwrite`.
    RandomWrite,
    /// Rendered as `rw`.
    ReadWrite,
    /// Rendered as `randrw`.
    RandRW,
}
1348 
1349 impl fmt::Display for FioOps {
1350     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1351         match self {
1352             FioOps::Read => write!(f, "read"),
1353             FioOps::RandomRead => write!(f, "randread"),
1354             FioOps::Write => write!(f, "write"),
1355             FioOps::RandomWrite => write!(f, "randwrite"),
1356             FioOps::ReadWrite => write!(f, "rw"),
1357             FioOps::RandRW => write!(f, "randrw"),
1358         }
1359     }
1360 }
1361 
1362 pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
1363     std::panic::catch_unwind(|| {
1364         let v: Value =
1365             serde_json::from_str(output).expect("'fio' parse error: invalid json output");
1366         let jobs = v["jobs"]
1367             .as_array()
1368             .expect("'fio' parse error: missing entry 'jobs'");
1369         assert_eq!(
1370             jobs.len(),
1371             num_jobs as usize,
1372             "'fio' parse error: Unexpected number of 'fio' jobs."
1373         );
1374 
1375         let (read, write) = match fio_ops {
1376             FioOps::Read | FioOps::RandomRead => (true, false),
1377             FioOps::Write | FioOps::RandomWrite => (false, true),
1378             FioOps::ReadWrite | FioOps::RandRW => (true, true),
1379         };
1380 
1381         let mut total_bps = 0_f64;
1382         for j in jobs {
1383             if read {
1384                 let bytes = j["read"]["io_bytes"]
1385                     .as_u64()
1386                     .expect("'fio' parse error: missing entry 'read.io_bytes'");
1387                 let runtime = j["read"]["runtime"]
1388                     .as_u64()
1389                     .expect("'fio' parse error: missing entry 'read.runtime'")
1390                     as f64
1391                     / 1000_f64;
1392                 total_bps += bytes as f64 / runtime;
1393             }
1394             if write {
1395                 let bytes = j["write"]["io_bytes"]
1396                     .as_u64()
1397                     .expect("'fio' parse error: missing entry 'write.io_bytes'");
1398                 let runtime = j["write"]["runtime"]
1399                     .as_u64()
1400                     .expect("'fio' parse error: missing entry 'write.runtime'")
1401                     as f64
1402                     / 1000_f64;
1403                 total_bps += bytes as f64 / runtime;
1404             }
1405         }
1406 
1407         total_bps
1408     })
1409     .map_err(|_| {
1410         eprintln!(
1411             "=============== Fio output ===============\n\n{output}\n\n===========end============\n\n"
1412         );
1413         Error::FioOutputParse
1414     })
1415 }
1416 
1417 pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
1418     std::panic::catch_unwind(|| {
1419         let v: Value =
1420             serde_json::from_str(output).expect("'fio' parse error: invalid json output");
1421         let jobs = v["jobs"]
1422             .as_array()
1423             .expect("'fio' parse error: missing entry 'jobs'");
1424         assert_eq!(
1425             jobs.len(),
1426             num_jobs as usize,
1427             "'fio' parse error: Unexpected number of 'fio' jobs."
1428         );
1429 
1430         let (read, write) = match fio_ops {
1431             FioOps::Read | FioOps::RandomRead => (true, false),
1432             FioOps::Write | FioOps::RandomWrite => (false, true),
1433             FioOps::ReadWrite | FioOps::RandRW => (true, true),
1434         };
1435 
1436         let mut total_iops = 0_f64;
1437         for j in jobs {
1438             if read {
1439                 let ios = j["read"]["total_ios"]
1440                     .as_u64()
1441                     .expect("'fio' parse error: missing entry 'read.total_ios'");
1442                 let runtime = j["read"]["runtime"]
1443                     .as_u64()
1444                     .expect("'fio' parse error: missing entry 'read.runtime'")
1445                     as f64
1446                     / 1000_f64;
1447                 total_iops += ios as f64 / runtime;
1448             }
1449             if write {
1450                 let ios = j["write"]["total_ios"]
1451                     .as_u64()
1452                     .expect("'fio' parse error: missing entry 'write.total_ios'");
1453                 let runtime = j["write"]["runtime"]
1454                     .as_u64()
1455                     .expect("'fio' parse error: missing entry 'write.runtime'")
1456                     as f64
1457                     / 1000_f64;
1458                 total_iops += ios as f64 / runtime;
1459             }
1460         }
1461 
1462         total_iops
1463     })
1464     .map_err(|_| {
1465         eprintln!(
1466             "=============== Fio output ===============\n\n{output}\n\n===========end============\n\n"
1467         );
1468         Error::FioOutputParse
1469     })
1470 }
1471 
1472 // Wait the child process for a given timeout
1473 fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
1474     match child.wait_timeout(Duration::from_secs(timeout)) {
1475         Err(e) => {
1476             return Err(WaitTimeoutError::General(e));
1477         }
1478         Ok(s) => match s {
1479             None => {
1480                 return Err(WaitTimeoutError::Timedout);
1481             }
1482             Some(s) => {
1483                 if !s.success() {
1484                     return Err(WaitTimeoutError::ExitStatus);
1485                 }
1486             }
1487         },
1488     }
1489 
1490     Ok(())
1491 }
1492 
1493 pub fn measure_virtio_net_throughput(
1494     test_timeout: u32,
1495     queue_pairs: u32,
1496     guest: &Guest,
1497     receive: bool,
1498 ) -> Result<f64, Error> {
1499     let default_port = 5201;
1500 
1501     // 1. start the iperf3 server on the guest
1502     for n in 0..queue_pairs {
1503         guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
1504     }
1505 
1506     thread::sleep(Duration::new(1, 0));
1507 
1508     // 2. start the iperf3 client on host to measure RX through-put
1509     let mut clients = Vec::new();
1510     for n in 0..queue_pairs {
1511         let mut cmd = Command::new("iperf3");
1512         cmd.args([
1513             "-J", // Output in JSON format
1514             "-c",
1515             &guest.network.guest_ip,
1516             "-p",
1517             &format!("{}", default_port + n),
1518             "-t",
1519             &format!("{test_timeout}"),
1520         ]);
1521         // For measuring the guest transmit throughput (as a sender),
1522         // use reverse mode of the iperf3 client on the host
1523         if !receive {
1524             cmd.args(["-R"]);
1525         }
1526         let client = cmd
1527             .stderr(Stdio::piped())
1528             .stdout(Stdio::piped())
1529             .spawn()
1530             .map_err(Error::Spawn)?;
1531 
1532         clients.push(client);
1533     }
1534 
1535     let mut err: Option<Error> = None;
1536     let mut bps = Vec::new();
1537     let mut failed = false;
1538     for c in clients {
1539         let mut c = c;
1540         if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) {
1541             err = Some(Error::WaitTimeout(e));
1542             failed = true;
1543         }
1544 
1545         if !failed {
1546             // Safe to unwrap as we know the child has terminated succesffully
1547             let output = c.wait_with_output().unwrap();
1548             bps.push(parse_iperf3_output(&output.stdout, receive)?);
1549         } else {
1550             let _ = c.kill();
1551             let output = c.wait_with_output().unwrap();
1552             println!(
1553                 "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
1554                 String::from_utf8_lossy(&output.stdout)
1555             );
1556         }
1557     }
1558 
1559     if let Some(e) = err {
1560         Err(e)
1561     } else {
1562         Ok(bps.iter().sum())
1563     }
1564 }
1565 
1566 pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
1567     std::panic::catch_unwind(|| {
1568         let s = String::from_utf8_lossy(output);
1569         let mut latency = Vec::new();
1570         for l in s.lines() {
1571             let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
1572             // Skip header/summary lines
1573             if let Some(avg) = v["Avg"].as_str() {
1574                 // Assume the latency unit is always "us"
1575                 latency.push(
1576                     avg.split("us").collect::<Vec<&str>>()[0]
1577                         .parse::<f64>()
1578                         .expect("'ethr' parse error: invalid 'Avg' entry"),
1579                 );
1580             }
1581         }
1582 
1583         assert!(
1584             !latency.is_empty(),
1585             "'ethr' parse error: no valid latency data found"
1586         );
1587 
1588         latency
1589     })
1590     .map_err(|_| {
1591         eprintln!(
1592             "=============== ethr output ===============\n\n{}\n\n===========end============\n\n",
1593             String::from_utf8_lossy(output)
1594         );
1595         Error::EthrLogParse
1596     })
1597 }
1598 
1599 pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
1600     // copy the 'ethr' tool to the guest image
1601     let ethr_path = "/usr/local/bin/ethr";
1602     let ethr_remote_path = "/tmp/ethr";
1603     scp_to_guest(
1604         Path::new(ethr_path),
1605         Path::new(ethr_remote_path),
1606         &guest.network.guest_ip,
1607         //DEFAULT_SSH_RETRIES,
1608         1,
1609         DEFAULT_SSH_TIMEOUT,
1610     )?;
1611 
1612     // Start the ethr server on the guest
1613     guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;
1614 
1615     thread::sleep(Duration::new(10, 0));
1616 
1617     // Start the ethr client on the host
1618     let log_file = guest
1619         .tmp_dir
1620         .as_path()
1621         .join("ethr.client.log")
1622         .to_str()
1623         .unwrap()
1624         .to_string();
1625     let mut c = Command::new(ethr_path)
1626         .args([
1627             "-c",
1628             &guest.network.guest_ip,
1629             "-t",
1630             "l",
1631             "-o",
1632             &log_file, // file output is JSON format
1633             "-d",
1634             &format!("{test_timeout}s"),
1635         ])
1636         .stderr(Stdio::piped())
1637         .stdout(Stdio::piped())
1638         .spawn()
1639         .map_err(Error::Spawn)?;
1640 
1641     if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout)
1642     {
1643         let _ = c.kill();
1644         return Err(e);
1645     }
1646 
1647     // Parse the ethr latency test output
1648     let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
1649     parse_ethr_latency_output(&content)
1650 }
1651