1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use std::ffi::OsStr; 9 use std::fmt::Display; 10 use std::io::{Read, Write}; 11 use std::net::{TcpListener, TcpStream}; 12 use std::os::unix::fs::PermissionsExt; 13 use std::os::unix::io::{AsRawFd, FromRawFd}; 14 use std::path::Path; 15 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 16 use std::str::FromStr; 17 use std::sync::{LazyLock, Mutex}; 18 use std::time::Duration; 19 use std::{env, fmt, fs, io, thread}; 20 21 use serde_json::Value; 22 use ssh2::Session; 23 use thiserror::Error; 24 use vmm_sys_util::tempdir::TempDir; 25 use wait_timeout::ChildExt; 26 27 #[derive(Error, Debug)] 28 pub enum WaitTimeoutError { 29 #[error("timeout")] 30 Timedout, 31 #[error("exit status indicates failure")] 32 ExitStatus, 33 #[error("general failure")] 34 General(#[source] std::io::Error), 35 } 36 37 #[derive(Error, Debug)] 38 pub enum Error { 39 #[error("Failed to parse")] 40 Parsing(#[source] std::num::ParseIntError), 41 #[error("ssh command failed")] 42 SshCommand(#[from] SshCommandError), 43 #[error("waiting for boot failed")] 44 WaitForBoot(#[source] WaitForBootError), 45 #[error("reading log file failed")] 46 EthrLogFile(#[source] std::io::Error), 47 #[error("parsing log file failed")] 48 EthrLogParse, 49 #[error("parsing fio output failed")] 50 FioOutputParse, 51 #[error("parsing iperf3 output failed")] 52 Iperf3Parse, 53 #[error("spawning process failed")] 54 Spawn(#[source] std::io::Error), 55 #[error("waiting for timeout failed")] 56 WaitTimeout(#[source] WaitTimeoutError), 57 } 58 59 pub struct GuestNetworkConfig { 60 pub guest_ip: String, 61 pub l2_guest_ip1: String, 62 pub l2_guest_ip2: String, 63 pub l2_guest_ip3: String, 64 pub host_ip: String, 65 pub guest_mac: String, 66 pub l2_guest_mac1: String, 67 pub l2_guest_mac2: String, 68 pub l2_guest_mac3: String, 69 pub tcp_listener_port: u16, 70 } 71 72 pub 
const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 73 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 74 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 75 76 #[derive(Error, Debug)] 77 pub enum WaitForBootError { 78 #[error("Failed to wait for epoll")] 79 EpollWait(#[source] std::io::Error), 80 #[error("Failed to listen for boot")] 81 Listen(#[source] std::io::Error), 82 #[error("Epoll wait timeout")] 83 EpollWaitTimeout, 84 #[error("wrong guest address")] 85 WrongGuestAddr, 86 #[error("Failed to accept a TCP request")] 87 Accept(#[source] std::io::Error), 88 } 89 90 impl GuestNetworkConfig { 91 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 92 let start = std::time::Instant::now(); 93 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 94 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 95 let expected_guest_addr = self.guest_ip.as_str(); 96 let mut s = String::new(); 97 let timeout = match custom_timeout { 98 Some(t) => t, 99 None => DEFAULT_TCP_LISTENER_TIMEOUT, 100 }; 101 102 let mut closure = || -> Result<(), WaitForBootError> { 103 let listener = 104 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 105 listener 106 .set_nonblocking(true) 107 .expect("Cannot set non-blocking for tcp listener"); 108 109 // Reply on epoll w/ timeout to wait for guest connections faithfully 110 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 111 // Use 'File' to enforce closing on 'epoll_fd' 112 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 113 epoll::ctl( 114 epoll_fd, 115 epoll::ControlOptions::EPOLL_CTL_ADD, 116 listener.as_raw_fd(), 117 epoll::Event::new(epoll::Events::EPOLLIN, 0), 118 ) 119 .expect("Cannot add 'tcp_listener' event to epoll"); 120 let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1]; 121 loop { 122 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, 
&mut events[..]) { 123 Ok(num_events) => Ok(num_events), 124 Err(e) => match e.raw_os_error() { 125 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 126 _ => Err(e), 127 }, 128 } 129 .map_err(WaitForBootError::EpollWait)?; 130 if num_events == 0 { 131 return Err(WaitForBootError::EpollWaitTimeout); 132 } 133 break; 134 } 135 136 match listener.accept() { 137 Ok((_, addr)) => { 138 // Make sure the connection is from the expected 'guest_addr' 139 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 140 s = format!( 141 "Expecting the guest ip '{}' while being connected with ip '{}'", 142 expected_guest_addr, 143 addr.ip() 144 ); 145 return Err(WaitForBootError::WrongGuestAddr); 146 } 147 148 Ok(()) 149 } 150 Err(e) => { 151 s = "TcpListener::accept() failed".to_string(); 152 Err(WaitForBootError::Accept(e)) 153 } 154 } 155 }; 156 157 match closure() { 158 Err(e) => { 159 let duration = start.elapsed(); 160 eprintln!( 161 "\n\n==== Start 'wait_vm_boot' (FAILED) ==== \ 162 \n\nduration =\"{duration:?}, timeout = {timeout}s\" \ 163 \nlisten_addr=\"{listen_addr}\" \ 164 \nexpected_guest_addr=\"{expected_guest_addr}\" \ 165 \nmessage=\"{s}\" \ 166 \nerror=\"{e:?}\" \ 167 \n\n==== End 'wait_vm_boot' outout ====\n\n" 168 ); 169 170 Err(e) 171 } 172 Ok(_) => Ok(()), 173 } 174 } 175 } 176 177 pub enum DiskType { 178 OperatingSystem, 179 CloudInit, 180 } 181 182 pub trait DiskConfig { 183 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 184 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 185 fn disk(&self, disk_type: DiskType) -> Option<String>; 186 } 187 188 #[derive(Clone)] 189 pub struct UbuntuDiskConfig { 190 osdisk_path: String, 191 cloudinit_path: String, 192 image_name: String, 193 } 194 195 impl UbuntuDiskConfig { 196 pub fn new(image_name: String) -> Self { 197 UbuntuDiskConfig { 198 image_name, 199 osdisk_path: String::new(), 200 cloudinit_path: String::new(), 201 } 202 } 
203 } 204 205 pub struct WindowsDiskConfig { 206 image_name: String, 207 osdisk_path: String, 208 loopback_device: String, 209 windows_snapshot_cow: String, 210 windows_snapshot: String, 211 } 212 213 impl WindowsDiskConfig { 214 pub fn new(image_name: String) -> Self { 215 WindowsDiskConfig { 216 image_name, 217 osdisk_path: String::new(), 218 loopback_device: String::new(), 219 windows_snapshot_cow: String::new(), 220 windows_snapshot: String::new(), 221 } 222 } 223 } 224 225 impl Drop for WindowsDiskConfig { 226 fn drop(&mut self) { 227 // dmsetup remove windows-snapshot-1 228 std::process::Command::new("dmsetup") 229 .arg("remove") 230 .arg(self.windows_snapshot.as_str()) 231 .output() 232 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 233 234 // dmsetup remove windows-snapshot-cow-1 235 std::process::Command::new("dmsetup") 236 .arg("remove") 237 .arg(self.windows_snapshot_cow.as_str()) 238 .output() 239 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 240 241 // losetup -d <loopback_device> 242 std::process::Command::new("losetup") 243 .args(["-d", self.loopback_device.as_str()]) 244 .output() 245 .expect("Expect removing loopback device to succeed"); 246 } 247 } 248 249 impl DiskConfig for UbuntuDiskConfig { 250 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 251 let cloudinit_file_path = 252 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 253 254 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 255 256 fs::create_dir_all(&cloud_init_directory) 257 .expect("Expect creating cloud-init directory to succeed"); 258 259 let source_file_dir = std::env::current_dir() 260 .unwrap() 261 .join("test_data") 262 .join("cloud-init") 263 .join("ubuntu") 264 .join("ci"); 265 266 ["meta-data"].iter().for_each(|x| { 267 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 268 .expect("Expect copying cloud-init 
meta-data to succeed"); 269 }); 270 271 let mut user_data_string = String::new(); 272 fs::File::open(source_file_dir.join("user-data")) 273 .unwrap() 274 .read_to_string(&mut user_data_string) 275 .expect("Expected reading user-data file to succeed"); 276 user_data_string = user_data_string.replace( 277 "@DEFAULT_TCP_LISTENER_MESSAGE", 278 DEFAULT_TCP_LISTENER_MESSAGE, 279 ); 280 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 281 user_data_string = 282 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 283 284 fs::File::create(cloud_init_directory.join("user-data")) 285 .unwrap() 286 .write_all(user_data_string.as_bytes()) 287 .expect("Expected writing out user-data to succeed"); 288 289 let mut network_config_string = String::new(); 290 291 fs::File::open(source_file_dir.join("network-config")) 292 .unwrap() 293 .read_to_string(&mut network_config_string) 294 .expect("Expected reading network-config file to succeed"); 295 296 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 297 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 298 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 299 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 300 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 301 network_config_string = 302 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 303 network_config_string = 304 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 305 network_config_string = 306 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 307 network_config_string = 308 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 309 310 fs::File::create(cloud_init_directory.join("network-config")) 311 .unwrap() 312 
.write_all(network_config_string.as_bytes()) 313 .expect("Expected writing out network-config to succeed"); 314 315 std::process::Command::new("mkdosfs") 316 .args(["-n", "CIDATA"]) 317 .args(["-C", cloudinit_file_path.as_str()]) 318 .arg("8192") 319 .output() 320 .expect("Expect creating disk image to succeed"); 321 322 ["user-data", "meta-data", "network-config"] 323 .iter() 324 .for_each(|x| { 325 std::process::Command::new("mcopy") 326 .arg("-o") 327 .args(["-i", cloudinit_file_path.as_str()]) 328 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 329 .output() 330 .expect("Expect copying files to disk image to succeed"); 331 }); 332 333 cloudinit_file_path 334 } 335 336 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 337 let mut workload_path = dirs::home_dir().unwrap(); 338 workload_path.push("workloads"); 339 340 let mut osdisk_base_path = workload_path; 341 osdisk_base_path.push(&self.image_name); 342 343 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 344 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 345 346 rate_limited_copy(osdisk_base_path, &osdisk_path) 347 .expect("copying of OS source disk image failed"); 348 349 self.cloudinit_path = cloudinit_path; 350 self.osdisk_path = osdisk_path; 351 } 352 353 fn disk(&self, disk_type: DiskType) -> Option<String> { 354 match disk_type { 355 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 356 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 357 } 358 } 359 } 360 361 impl DiskConfig for WindowsDiskConfig { 362 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 363 String::new() 364 } 365 366 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 367 let mut workload_path = dirs::home_dir().unwrap(); 368 workload_path.push("workloads"); 369 370 let mut osdisk_path = workload_path; 371 osdisk_path.push(&self.image_name); 372 
373 let osdisk_blk_size = fs::metadata(osdisk_path) 374 .expect("Expect retrieving Windows image metadata") 375 .len() 376 >> 9; 377 378 let snapshot_cow_path = 379 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 380 381 // Create and truncate CoW file for device mapper 382 let cow_file_size: u64 = 1 << 30; 383 let cow_file_blk_size = cow_file_size >> 9; 384 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 385 .expect("Expect creating CoW image to succeed"); 386 cow_file 387 .set_len(cow_file_size) 388 .expect("Expect truncating CoW image to succeed"); 389 390 // losetup --find --show /tmp/snapshot_cow 391 let loopback_device = std::process::Command::new("losetup") 392 .arg("--find") 393 .arg("--show") 394 .arg(snapshot_cow_path.as_str()) 395 .output() 396 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 397 398 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 399 .trim() 400 .to_string(); 401 402 let random_extension = tmp_dir.as_path().file_name().unwrap(); 403 let windows_snapshot_cow = format!( 404 "windows-snapshot-cow-{}", 405 random_extension.to_str().unwrap() 406 ); 407 408 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 409 std::process::Command::new("dmsetup") 410 .arg("create") 411 .arg(windows_snapshot_cow.as_str()) 412 .args([ 413 "--table", 414 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 415 ]) 416 .output() 417 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 418 419 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 420 421 // dmsetup mknodes 422 std::process::Command::new("dmsetup") 423 .arg("mknodes") 424 .output() 425 .expect("Expect device mapper nodes to be ready"); 426 427 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 428 
std::process::Command::new("dmsetup") 429 .arg("create") 430 .arg(windows_snapshot.as_str()) 431 .args([ 432 "--table", 433 format!( 434 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 435 osdisk_blk_size, 436 windows_snapshot_cow.as_str() 437 ) 438 .as_str(), 439 ]) 440 .output() 441 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 442 443 // dmsetup mknodes 444 std::process::Command::new("dmsetup") 445 .arg("mknodes") 446 .output() 447 .expect("Expect device mapper nodes to be ready"); 448 449 self.osdisk_path = format!("/dev/mapper/{windows_snapshot}"); 450 self.windows_snapshot_cow = windows_snapshot_cow; 451 self.windows_snapshot = windows_snapshot; 452 } 453 454 fn disk(&self, disk_type: DiskType) -> Option<String> { 455 match disk_type { 456 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 457 DiskType::CloudInit => None, 458 } 459 } 460 } 461 462 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 463 for i in 0..10 { 464 let free_bytes = unsafe { 465 let mut stats = std::mem::MaybeUninit::zeroed(); 466 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 467 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 468 469 let free_blocks = stats.assume_init().f_bfree; 470 let block_size = stats.assume_init().f_bsize; 471 472 free_blocks * block_size 473 }; 474 475 // Make sure there is at least 6 GiB of space 476 if free_bytes < 6 << 30 { 477 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 478 thread::sleep(std::time::Duration::new(60, 0)); 479 continue; 480 } 481 482 match fs::copy(&from, &to) { 483 Err(e) => { 484 if let Some(errno) = e.raw_os_error() { 485 if errno == libc::ENOSPC { 486 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 487 thread::sleep(std::time::Duration::new(60, 0)); 488 continue; 489 } 490 } 491 return Err(e); 492 } 493 Ok(i) => return Ok(i), 494 } 495 } 496 Err(io::Error::last_os_error()) 497 } 498 499 pub fn handle_child_output( 500 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 501 output: &std::process::Output, 502 ) { 503 use std::os::unix::process::ExitStatusExt; 504 if r.is_ok() && output.status.success() { 505 return; 506 } 507 508 match output.status.code() { 509 None => { 510 // Don't treat child.kill() as a problem 511 if output.status.signal() == Some(9) && r.is_ok() { 512 return; 513 } 514 515 eprintln!( 516 "==== child killed by signal: {} ====", 517 output.status.signal().unwrap() 518 ); 519 } 520 Some(code) => { 521 eprintln!("\n\n==== child exit code: {code} ===="); 522 } 523 } 524 525 eprintln!( 526 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 527 String::from_utf8_lossy(&output.stdout) 528 ); 529 eprintln!( 530 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 531 String::from_utf8_lossy(&output.stderr) 532 ); 533 534 panic!("Test failed") 535 } 536 537 #[derive(Debug)] 538 pub struct PasswordAuth { 539 pub username: String, 540 pub password: String, 541 } 542 543 pub const DEFAULT_SSH_RETRIES: u8 = 6; 544 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 545 546 #[derive(Error, Debug)] 547 pub enum SshCommandError { 548 #[error("ssh connection failed")] 549 Connection(#[source] std::io::Error), 550 #[error("ssh handshake failed")] 551 Handshake(#[source] ssh2::Error), 552 #[error("ssh authentication failed")] 553 Authentication(#[source] ssh2::Error), 554 #[error("ssh channel session failed")] 555 ChannelSession(#[source] ssh2::Error), 556 #[error("ssh command failed")] 557 Command(#[source] ssh2::Error), 558 #[error("retrieving exit status from ssh command failed")] 559 ExitStatus(#[source] ssh2::Error), 560 #[error("the exit code indicates failure: {0}")] 561 
NonZeroExitStatus(i32), 562 #[error("failed to read file")] 563 FileRead(#[source] std::io::Error), 564 #[error("failed to read metadata")] 565 FileMetadata(#[source] std::io::Error), 566 #[error("scp send failed")] 567 ScpSend(#[source] ssh2::Error), 568 #[error("scp write failed")] 569 WriteAll(#[source] std::io::Error), 570 #[error("scp send EOF failed")] 571 SendEof(#[source] ssh2::Error), 572 #[error("scp wait EOF failed")] 573 WaitEof(#[source] ssh2::Error), 574 } 575 576 fn scp_to_guest_with_auth( 577 path: &Path, 578 remote_path: &Path, 579 auth: &PasswordAuth, 580 ip: &str, 581 retries: u8, 582 timeout: u8, 583 ) -> Result<(), SshCommandError> { 584 let mut counter = 0; 585 loop { 586 let closure = || -> Result<(), SshCommandError> { 587 let tcp = 588 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 589 let mut sess = Session::new().unwrap(); 590 sess.set_tcp_stream(tcp); 591 sess.handshake().map_err(SshCommandError::Handshake)?; 592 593 sess.userauth_password(&auth.username, &auth.password) 594 .map_err(SshCommandError::Authentication)?; 595 assert!(sess.authenticated()); 596 597 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 598 let mode = fs::metadata(path) 599 .map_err(SshCommandError::FileMetadata)? 
600 .permissions() 601 .mode() 602 & 0o777; 603 604 let mut channel = sess 605 .scp_send(remote_path, mode as i32, content.len() as u64, None) 606 .map_err(SshCommandError::ScpSend)?; 607 channel 608 .write_all(&content) 609 .map_err(SshCommandError::WriteAll)?; 610 channel.send_eof().map_err(SshCommandError::SendEof)?; 611 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 612 613 // Intentionally ignore these results here as their failure 614 // does not precipitate a repeat 615 let _ = channel.close(); 616 let _ = channel.wait_close(); 617 618 Ok(()) 619 }; 620 621 match closure() { 622 Ok(_) => break, 623 Err(e) => { 624 counter += 1; 625 if counter >= retries { 626 eprintln!( 627 "\n\n==== Start scp command output (FAILED) ====\n\n\ 628 path =\"{path:?}\"\n\ 629 remote_path =\"{remote_path:?}\"\n\ 630 auth=\"{auth:#?}\"\n\ 631 ip=\"{ip}\"\n\ 632 error=\"{e:?}\"\n\ 633 \n==== End scp command outout ====\n\n" 634 ); 635 636 return Err(e); 637 } 638 } 639 }; 640 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 641 } 642 Ok(()) 643 } 644 645 pub fn scp_to_guest( 646 path: &Path, 647 remote_path: &Path, 648 ip: &str, 649 retries: u8, 650 timeout: u8, 651 ) -> Result<(), SshCommandError> { 652 scp_to_guest_with_auth( 653 path, 654 remote_path, 655 &PasswordAuth { 656 username: String::from("cloud"), 657 password: String::from("cloud123"), 658 }, 659 ip, 660 retries, 661 timeout, 662 ) 663 } 664 665 pub fn ssh_command_ip_with_auth( 666 command: &str, 667 auth: &PasswordAuth, 668 ip: &str, 669 retries: u8, 670 timeout: u8, 671 ) -> Result<String, SshCommandError> { 672 let mut s = String::new(); 673 674 let mut counter = 0; 675 loop { 676 let mut closure = || -> Result<(), SshCommandError> { 677 let tcp = 678 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 679 let mut sess = Session::new().unwrap(); 680 sess.set_tcp_stream(tcp); 681 sess.handshake().map_err(SshCommandError::Handshake)?; 682 683 
sess.userauth_password(&auth.username, &auth.password) 684 .map_err(SshCommandError::Authentication)?; 685 assert!(sess.authenticated()); 686 687 let mut channel = sess 688 .channel_session() 689 .map_err(SshCommandError::ChannelSession)?; 690 channel.exec(command).map_err(SshCommandError::Command)?; 691 692 // Intentionally ignore these results here as their failure 693 // does not precipitate a repeat 694 let _ = channel.read_to_string(&mut s); 695 let _ = channel.close(); 696 let _ = channel.wait_close(); 697 698 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 699 700 if status != 0 { 701 Err(SshCommandError::NonZeroExitStatus(status)) 702 } else { 703 Ok(()) 704 } 705 }; 706 707 match closure() { 708 Ok(_) => break, 709 Err(e) => { 710 counter += 1; 711 if counter >= retries { 712 eprintln!( 713 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 714 command=\"{command}\"\n\ 715 auth=\"{auth:#?}\"\n\ 716 ip=\"{ip}\"\n\ 717 output=\"{s}\"\n\ 718 error=\"{e:?}\"\n\ 719 \n==== End ssh command outout ====\n\n" 720 ); 721 722 return Err(e); 723 } 724 } 725 }; 726 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 727 } 728 Ok(s) 729 } 730 731 pub fn ssh_command_ip( 732 command: &str, 733 ip: &str, 734 retries: u8, 735 timeout: u8, 736 ) -> Result<String, SshCommandError> { 737 ssh_command_ip_with_auth( 738 command, 739 &PasswordAuth { 740 username: String::from("cloud"), 741 password: String::from("cloud123"), 742 }, 743 ip, 744 retries, 745 timeout, 746 ) 747 } 748 749 pub fn exec_host_command_with_retries(command: &str, retries: u32, interval: Duration) -> bool { 750 for _ in 0..retries { 751 let s = exec_host_command_output(command).status; 752 if !s.success() { 753 eprintln!("\n\n==== retrying in {interval:?} ===\n\n"); 754 thread::sleep(interval); 755 } else { 756 return true; 757 } 758 } 759 760 false 761 } 762 763 pub fn exec_host_command_status(command: &str) -> ExitStatus { 764 
exec_host_command_output(command).status 765 } 766 767 pub fn exec_host_command_output(command: &str) -> Output { 768 let output = std::process::Command::new("bash") 769 .args(["-c", command]) 770 .output() 771 .unwrap_or_else(|e| panic!("Expected '{command}' to run. Error: {e:?}")); 772 773 if !output.status.success() { 774 let stdout = String::from_utf8_lossy(&output.stdout); 775 let stderr = String::from_utf8_lossy(&output.stderr); 776 eprintln!( 777 "\n\n==== Start 'exec_host_command' failed ==== \ 778 \n\n---stdout---\n{stdout}\n---stderr---{stderr} \ 779 \n\n==== End 'exec_host_command' failed ====", 780 ); 781 } 782 783 output 784 } 785 786 pub fn check_lines_count(input: &str, line_count: usize) -> bool { 787 if input.lines().count() == line_count { 788 true 789 } else { 790 eprintln!( 791 "\n\n==== Start 'check_lines_count' failed ==== \ 792 \n\ninput = {input}\nline_count = {line_count} \ 793 \n\n==== End 'check_lines_count' failed ====", 794 ); 795 796 false 797 } 798 } 799 800 pub fn check_matched_lines_count(input: &str, keywords: Vec<&str>, line_count: usize) -> bool { 801 let mut matches = String::new(); 802 for line in input.lines() { 803 if keywords.iter().all(|k| line.contains(k)) { 804 matches += line; 805 } 806 } 807 808 if matches.lines().count() == line_count { 809 true 810 } else { 811 eprintln!( 812 "\n\n==== Start 'check_matched_lines_count' failed ==== \ 813 \nkeywords = {keywords:?}, line_count = {line_count} \ 814 \n\ninput = {input} matches = {matches} \ 815 \n\n==== End 'check_matched_lines_count' failed ====", 816 ); 817 818 false 819 } 820 } 821 822 pub fn kill_child(child: &mut Child) { 823 let r = unsafe { libc::kill(child.id() as i32, libc::SIGTERM) }; 824 if r != 0 { 825 let e = io::Error::last_os_error(); 826 if e.raw_os_error().unwrap() == libc::ESRCH { 827 return; 828 } 829 eprintln!("Failed to kill child with SIGTERM: {e:?}"); 830 } 831 832 // The timeout period elapsed without the child exiting 833 if 
child.wait_timeout(Duration::new(10, 0)).unwrap().is_none() { 834 let _ = child.kill(); 835 let rust_flags = env::var("RUSTFLAGS").unwrap_or_default(); 836 if rust_flags.contains("-Cinstrument-coverage") { 837 panic!("Wait child timeout, please check the reason.") 838 } 839 } 840 } 841 842 pub const PIPE_SIZE: i32 = 32 << 20; 843 844 static NEXT_VM_ID: LazyLock<Mutex<u8>> = LazyLock::new(|| Mutex::new(1)); 845 846 pub struct Guest { 847 pub tmp_dir: TempDir, 848 pub disk_config: Box<dyn DiskConfig>, 849 pub network: GuestNetworkConfig, 850 } 851 852 // Safe to implement as we know we have no interior mutability 853 impl std::panic::RefUnwindSafe for Guest {} 854 855 impl Guest { 856 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 857 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 858 859 let network = GuestNetworkConfig { 860 guest_ip: format!("{class}.{id}.2"), 861 l2_guest_ip1: format!("{class}.{id}.3"), 862 l2_guest_ip2: format!("{class}.{id}.4"), 863 l2_guest_ip3: format!("{class}.{id}.5"), 864 host_ip: format!("{class}.{id}.1"), 865 guest_mac: format!("12:34:56:78:90:{id:02x}"), 866 l2_guest_mac1: format!("de:ad:be:ef:12:{id:02x}"), 867 l2_guest_mac2: format!("de:ad:be:ef:34:{id:02x}"), 868 l2_guest_mac3: format!("de:ad:be:ef:56:{id:02x}"), 869 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 870 }; 871 872 disk_config.prepare_files(&tmp_dir, &network); 873 874 Guest { 875 tmp_dir, 876 disk_config, 877 network, 878 } 879 } 880 881 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 882 let mut guard = NEXT_VM_ID.lock().unwrap(); 883 let id = *guard; 884 *guard = id + 1; 885 886 Self::new_from_ip_range(disk_config, "192.168", id) 887 } 888 889 pub fn default_net_string(&self) -> String { 890 format!( 891 "tap=,mac={},ip={},mask=255.255.255.0", 892 self.network.guest_mac, self.network.host_ip 893 ) 894 } 895 896 pub fn default_net_string_w_iommu(&self) -> String { 897 format!( 898 
"tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 899 self.network.guest_mac, self.network.host_ip 900 ) 901 } 902 903 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 904 format!( 905 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 906 self.network.guest_mac, self.network.host_ip, mtu 907 ) 908 } 909 910 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 911 ssh_command_ip( 912 command, 913 &self.network.guest_ip, 914 DEFAULT_SSH_RETRIES, 915 DEFAULT_SSH_TIMEOUT, 916 ) 917 } 918 919 #[cfg(target_arch = "x86_64")] 920 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 921 ssh_command_ip( 922 command, 923 &self.network.guest_ip, 924 DEFAULT_SSH_RETRIES, 925 DEFAULT_SSH_TIMEOUT, 926 ) 927 } 928 929 #[cfg(target_arch = "x86_64")] 930 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 931 ssh_command_ip( 932 command, 933 &self.network.l2_guest_ip1, 934 DEFAULT_SSH_RETRIES, 935 DEFAULT_SSH_TIMEOUT, 936 ) 937 } 938 939 #[cfg(target_arch = "x86_64")] 940 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 941 ssh_command_ip( 942 command, 943 &self.network.l2_guest_ip2, 944 DEFAULT_SSH_RETRIES, 945 DEFAULT_SSH_TIMEOUT, 946 ) 947 } 948 949 #[cfg(target_arch = "x86_64")] 950 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 951 ssh_command_ip( 952 command, 953 &self.network.l2_guest_ip3, 954 DEFAULT_SSH_RETRIES, 955 DEFAULT_SSH_TIMEOUT, 956 ) 957 } 958 959 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 960 format! 
{"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 961 cpu_count, 962 cpu_count, 963 kernel_path, 964 kernel_cmd, 965 self.network.host_ip, 966 self.network.guest_mac, 967 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 968 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 969 } 970 } 971 972 pub fn get_cpu_count(&self) -> Result<u32, Error> { 973 self.ssh_command("grep -c processor /proc/cpuinfo")? 974 .trim() 975 .parse() 976 .map_err(Error::Parsing) 977 } 978 979 pub fn get_total_memory(&self) -> Result<u32, Error> { 980 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 981 .trim() 982 .parse() 983 .map_err(Error::Parsing) 984 } 985 986 #[cfg(target_arch = "x86_64")] 987 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 988 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 989 .trim() 990 .parse() 991 .map_err(Error::Parsing) 992 } 993 994 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 995 self.ssh_command( 996 format!( 997 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 998 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 999 ) 1000 .as_str(), 1001 )? 
.trim()
            .parse()
            .map_err(Error::Parsing)
    }

    /// Waits until the guest signals that it has finished booting.
    ///
    /// Delegates to `GuestNetworkConfig::wait_vm_boot`; `custom_timeout`
    /// (in seconds) replaces `DEFAULT_TCP_LISTENER_TIMEOUT` when set.
    pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> {
        self.network
            .wait_vm_boot(custom_timeout)
            .map_err(Error::WaitForBoot)
    }

    /// Checks that every CPU in `cpus` has a directory under NUMA node
    /// `node_id` in the guest's sysfs.
    ///
    /// Stops at the first failing `ssh_command` and returns its error
    /// (assumes `ssh_command` fails when the remote `[ -d ... ]` test is
    /// false — confirm against `ssh_command`).
    pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> {
        for cpu in &cpus {
            let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]");
            self.ssh_command(&cmd)?;
        }

        Ok(())
    }

    /// Returns whether the guest's whitespace-trimmed sysfs distance list for
    /// NUMA node `node_id` is exactly `distances`.
    pub fn check_numa_node_distances(
        &self,
        node_id: usize,
        distances: &str,
    ) -> Result<bool, Error> {
        let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance");
        Ok(self.ssh_command(&cmd)?.trim() == distances)
    }

    /// Asserts the guest's NUMA layout against the optional references. Index
    /// `i` of each slice describes NUMA node `i`:
    ///
    /// * `mem_ref`: the node's memory (from `get_numa_node_memory`) must be
    ///   strictly greater than the given value; a failed query counts as 0,
    /// * `node_ref`: the CPUs expected on the node,
    /// * `distance_ref`: the exact distance list expected for the node.
    ///
    /// # Panics
    /// Panics on any mismatch or failed guest query.
    pub fn check_numa_common(
        &self,
        mem_ref: Option<&[u32]>,
        node_ref: Option<&[Vec<usize>]>,
        distance_ref: Option<&[&str]>,
    ) {
        if let Some(mem_ref) = mem_ref {
            // Check each NUMA node has been assigned the right amount of
            // memory.
            for (i, &m) in mem_ref.iter().enumerate() {
                assert!(self.get_numa_node_memory(i).unwrap_or_default() > m);
            }
        }

        if let Some(node_ref) = node_ref {
            // Check each NUMA node has been assigned the right CPUs set.
            for (i, n) in node_ref.iter().enumerate() {
                self.check_numa_node_cpus(i, n.clone()).unwrap();
            }
        }

        if let Some(distance_ref) = distance_ref {
            // Check each NUMA node has been assigned the right distances.
            for (i, &d) in distance_ref.iter().enumerate() {
                assert!(self.check_numa_node_distances(i, d).unwrap());
            }
        }
    }

    /// Checks via `cpuid` in the guest that SGX, SGX launch control and SGX1
    /// are all reported as supported.
    ///
    /// # Errors
    /// Returns the error of the first `ssh_command` that fails.
    #[cfg(target_arch = "x86_64")]
    pub fn check_sgx_support(&self) -> Result<(), Error> {
        // `[:space:]` is quoted so the guest shell cannot glob-expand it
        // against single-character file names in its working directory.
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s '[:space:]' | grep -q 'SGX: \
             Software Guard Extensions supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s '[:space:]' | grep -q 'SGX_LC: \
             SGX launch config supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x12 -s 0 | tr -s '[:space:]' | grep -q 'SGX1 \
             supported = true'",
        )?;

        Ok(())
    }

    /// Returns the trimmed PCI class of guest device `0000:00:00.0`.
    pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
            .trim()
            .to_string())
    }

    /// Returns the device IDs of all guest PCI devices, one per line
    /// (trimmed).
    pub fn get_pci_device_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/device")?
            .trim()
            .to_string())
    }

    /// Returns the vendor IDs of all guest PCI devices, one per line
    /// (trimmed).
    pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
            .trim()
            .to_string())
    }

    /// Returns whether some guest PCI device has both `device_id` and
    /// `vendor_id`.
    ///
    /// The device and vendor lists are paired line by line, which relies on
    /// both `cat .../*/...` globs expanding in the same device order.
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        // We are checking if console device's device id and vendor id pair matches
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Checks host-to-guest vsock traffic: `socat` listens in the guest, the
    /// host writes `HelloWorld!` through the vsock Unix `socket`, and the
    /// guest-side log must contain exactly that text.
    ///
    /// # Panics
    /// Panics if any step fails or the received text differs.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip(
                "sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log",
                &guest_ip,
                DEFAULT_SSH_RETRIES,
                DEFAULT_SSH_TIMEOUT,
            )
            .unwrap();
        });

        // Make sure socat is listening, which might take a few second on slow systems
        thread::sleep(std::time::Duration::new(10, 0));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Asserts that `nvidia-smi` in the guest reports an "NVIDIA L40S" GPU.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self
            .ssh_command("nvidia-smi")
            .unwrap()
            .contains("NVIDIA L40S"));
    }

    /// Reboots the guest and asserts that the reboot count reported by
    /// `last` goes from `current_reboot_count + 1` to
    /// `current_reboot_count + 2`.
    ///
    /// A count that fails to parse is treated as 0 (and so fails the
    /// assertion). `custom_timeout` is forwarded to [`Self::wait_vm_boot`].
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        let boot_count = || {
            self.ssh_command("sudo last | grep -c reboot")
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Writes `online` to the guest's
    /// `/sys/devices/system/memory/auto_online_blocks`.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Exercises the guest's common devices: reads from `/dev/vda`,
    /// `/dev/vdb` and `/dev/hwrng`, then optionally checks vsock (`socket`),
    /// writes `console_text` to `/dev/hvc0`, and mounts the pmem device at
    /// `pmem_path` to check a file written to it survives a remount.
    ///
    /// # Panics
    /// Panics if any guest command fails or returns unexpected output.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            // Remount and check the file written above is still there.
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Logging verbosity passed to cloud-hypervisor on its command line.
///
/// The `Display` form is the command-line flag: empty for `Warn`, `-v` for
/// `Info`, `-vv` for `Debug`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum VerbosityLevel {
    #[default]
    Warn,
    Info,
    Debug,
}

impl Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            VerbosityLevel::Warn => Ok(()),
            VerbosityLevel::Info => write!(f, "-v"),
            VerbosityLevel::Debug => write!(f, "-vv"),
        }
    }
}

/// Builder for a cloud-hypervisor command line targeting a given [`Guest`].
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    capture_output: bool,
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Creates a command running the `cloud-hypervisor` binary located by
    /// [`clh_command`].
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Creates a command running `binary_path`, with `Info` verbosity,
    /// command-line printing enabled and output capture disabled.
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Sets the verbosity flag appended when the command is spawned.
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Pipes the child's stdout and stderr (with enlarged pipes) on spawn.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Enables or disables printing the command line on spawn.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Spawns the configured command.
    ///
    /// Appends the verbosity flag (nothing for `Warn`) on every call, prints
    /// the command line when enabled, and, when `capture_output` was
    /// requested, pipes stdout/stderr and requests at least `PIPE_SIZE`
    /// bytes of capacity for both pipes.
    ///
    /// # Errors
    /// Returns the spawn error or the pipe-resizing error. In the latter case
    /// the already-started child is killed and reaped before returning.
    pub fn spawn(&mut self) -> io::Result<Child> {
        let verbosity_flag = self.verbosity.to_string();
        if !verbosity_flag.is_empty() {
            self.command.arg(verbosity_flag);
        }

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if self.capture_output {
            // The caller should call .wait() on the returned child
            #[allow(unknown_lints)]
            #[allow(clippy::zombie_processes)]
            let mut child = self
                .command
                .stderr(Stdio::piped())
                .stdout(Stdio::piped())
                .spawn()?;

            if let Err(e) = Self::resize_output_pipes(&child) {
                // The caller never receives this child, so terminate and reap
                // it here rather than leaving the process running.
                let _ = child.kill();
                let _ = child.wait();
                return Err(e);
            }

            Ok(child)
        } else {
            // The caller should call .wait() on the returned child
            #[allow(unknown_lints)]
            #[allow(clippy::zombie_processes)]
            self.command.spawn()
        }
    }

    /// Requests a capacity of at least `PIPE_SIZE` bytes for the child's
    /// stdout and stderr pipes (both must be piped). Fails if either
    /// `fcntl(F_SETPIPE_SZ)` call fails or returns a smaller size.
    fn resize_output_pipes(child: &Child) -> io::Result<()> {
        let stdout_fd = child.stdout.as_ref().expect("stdout is piped").as_raw_fd();
        // SAFETY: `stdout_fd` is an open pipe owned by `child`, and
        // F_SETPIPE_SZ only changes the pipe's capacity.
        let pipesize = unsafe { libc::fcntl(stdout_fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
        if pipesize == -1 {
            return Err(io::Error::last_os_error());
        }

        let stderr_fd = child.stderr.as_ref().expect("stderr is piped").as_raw_fd();
        // SAFETY: `stderr_fd` is an open pipe owned by `child`, and
        // F_SETPIPE_SZ only changes the pipe's capacity.
        let pipesize1 = unsafe { libc::fcntl(stderr_fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
        if pipesize1 == -1 {
            return Err(io::Error::last_os_error());
        }

        if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE {
            Ok(())
        } else {
            Err(io::Error::other(format!(
                "resizing pipe w/ 'fcntl' failed: stdout pipesize {pipesize}, stderr pipesize {pipesize1}"
            )))
        }
    }

    /// Appends raw arguments to the command.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Adds a `--disk` argument for the guest's OS disk, followed by its
    /// cloud-init disk when the disk config has one.
    ///
    /// # Panics
    /// Panics if the disk config has no operating-system disk.
    pub fn default_disks(&mut self) -> &mut Self {
        let disk_config = &self.guest.disk_config;
        let os_disk = format!(
            "path={}",
            disk_config.disk(DiskType::OperatingSystem).unwrap()
        );
        let cloud_init_disk = disk_config
            .disk(DiskType::CloudInit)
            .map(|disk| format!("path={disk}"));

        match cloud_init_disk {
            Some(cloud_init_disk) => self.args([
                "--disk",
                os_disk.as_str(),
                cloud_init_disk.as_str(),
            ]),
            None => self.args(["--disk", os_disk.as_str()]),
        }
    }

    /// Adds `--net` with the guest's default network configuration string.
    pub fn default_net(&mut self) -> &mut Self {
        self.args(["--net", self.guest.default_net_string().as_str()])
    }
}

/// Returns the release build path of binary `cmd`, for the target triple in
/// the `BUILD_TARGET` environment variable (default:
/// `x86_64-unknown-linux-gnu`).
pub fn clh_command(cmd: &str) -> String {
    env::var("BUILD_TARGET").map_or_else(
        |_| format!("target/x86_64-unknown-linux-gnu/release/{cmd}"),
        |target| format!("target/{target}/release/{cmd}"),
    )
}

/// Extracts a result from `iperf3 -J` JSON `output`.
///
/// With `bandwidth`, returns the sender-side (`sender`) or receiver-side
/// bits per second from `end.sum_sent` / `end.sum_received`. Otherwise
/// returns received packets per second, computed from `end.sum` as
/// `(packets - lost_packets) / seconds`.
///
/// # Errors
/// Returns [`Error::Iperf3Parse`] (after dumping `output` to stderr) if the
/// JSON is invalid or a required entry is missing.
pub fn parse_iperf3_output(output: &[u8], sender: bool, bandwidth: bool) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");

        if bandwidth {
            if sender {
                v["end"]["sum_sent"]["bits_per_second"]
                    .as_f64()
                    .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
            } else {
                v["end"]["sum_received"]["bits_per_second"].as_f64().expect(
                    "'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'",
                )
            }
        } else {
            // iperf does not distinguish sent vs received in this case.

            let lost_packets = v["end"]["sum"]["lost_packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.lost_packets'");
            let packets = v["end"]["sum"]["packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.packets'");
            let seconds = v["end"]["sum"]["seconds"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.seconds'");

            (packets - lost_packets) / seconds
        }
    })
    .map_err(|_| {
        eprintln!(
            "==== Start iperf3 output ===\n\n{}\n\n=== End iperf3 output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::Iperf3Parse
    })
}

/// `fio` I/O pattern; its `Display` form is the value of fio's `--rw`
/// option.
#[derive(Clone)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            FioOps::Read => write!(f, "read"),
            FioOps::RandomRead => write!(f, "randread"),
            FioOps::Write => write!(f, "write"),
            FioOps::RandomWrite => write!(f, "randwrite"),
            FioOps::ReadWrite => write!(f, "rw"),
            FioOps::RandRW => write!(f, "randrw"),
        }
    }
}

/// Shared implementation of [`parse_fio_output`] and
/// [`parse_fio_output_iops`].
///
/// Parses `fio`'s JSON `output`, asserts it contains `num_jobs` jobs, and
/// sums `counter / runtime` over every job and every direction (`read`,
/// `write`) exercised by `fio_ops`. Runtime is divided by 1000, i.e. fio's
/// `runtime` is treated as milliseconds.
fn sum_fio_job_rates(
    output: &str,
    fio_ops: &FioOps,
    num_jobs: u32,
    counter: &str,
) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let v: Value =
            serde_json::from_str(output).expect("'fio' parse error: invalid json output");
        let jobs = v["jobs"]
            .as_array()
            .expect("'fio' parse error: missing entry 'jobs'");
        assert_eq!(
            jobs.len(),
            num_jobs as usize,
            "'fio' parse error: Unexpected number of 'fio' jobs."
        );

        let directions: &[&str] = match fio_ops {
            FioOps::Read | FioOps::RandomRead => &["read"],
            FioOps::Write | FioOps::RandomWrite => &["write"],
            FioOps::ReadWrite | FioOps::RandRW => &["read", "write"],
        };

        let mut total = 0_f64;
        for job in jobs {
            for &dir in directions {
                let count = job[dir][counter].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{dir}.{counter}'")
                });
                let runtime = job[dir]["runtime"]
                    .as_u64()
                    .unwrap_or_else(|| panic!("'fio' parse error: missing entry '{dir}.runtime'"))
                    as f64
                    / 1000_f64;
                total += count as f64 / runtime;
            }
        }

        total
    })
    .map_err(|_| {
        eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n");
        Error::FioOutputParse
    })
}

/// Returns the aggregate throughput (bytes per second, from `io_bytes`)
/// across all `num_jobs` jobs in `fio`'s JSON `output`.
///
/// # Errors
/// Returns [`Error::FioOutputParse`] (after dumping `output` to stderr) if
/// the JSON is invalid, a required entry is missing, or the job count
/// differs.
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    sum_fio_job_rates(output, fio_ops, num_jobs, "io_bytes")
}

/// Returns the aggregate IOPS (from `total_ios`) across all `num_jobs` jobs
/// in `fio`'s JSON `output`.
///
/// # Errors
/// Same as [`parse_fio_output`].
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    sum_fio_job_rates(output, fio_ops, num_jobs, "total_ios")
}

/// Waits up to `timeout` seconds for `child` to exit successfully.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    match child.wait_timeout(Duration::from_secs(timeout)) {
        Err(e) => Err(WaitTimeoutError::General(e)),
        Ok(None) => Err(WaitTimeoutError::Timedout),
        Ok(Some(status)) if !status.success() => Err(WaitTimeoutError::ExitStatus),
        Ok(Some(_)) => Ok(()),
    }
}

/// Measures virtio-net performance with `queue_pairs` parallel iperf3 streams
/// (servers in the guest on ports 5201.., clients on the host) and returns
/// the sum over all streams.
///
/// `receive` selects the guest-receive direction (otherwise the host client
/// runs with `-R`). `bandwidth` selects TCP bits per second; otherwise UDP
/// packets per second are measured.
///
/// # Errors
/// Returns the first failure among starting servers or clients, waiting for
/// clients, or parsing their output. Every client is still waited on (and
/// killed if it failed) so none is left running.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
    bandwidth: bool,
) -> Result<f64, Error> {
    let default_port: u32 = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure RX through-put
    let mut clients = Vec::new();
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &(default_port + n).to_string(),
            "-t",
            &test_timeout.to_string(),
            "-i",
            "0",
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.arg("-R");
        }
        // Use UDP stream to measure packets per second. The bitrate is set to
        // 1T to make sure it saturates the link.
        if !bandwidth {
            cmd.args(["-u", "-b", "1T"]);
        }
        let client = cmd
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()
            .map_err(Error::Spawn)?;

        clients.push(client);
    }

    let mut first_err: Option<Error> = None;
    let mut results = Vec::with_capacity(clients.len());
    for mut client in clients {
        if let Err(e) = child_wait_timeout(&mut client, u64::from(test_timeout) + 5) {
            if first_err.is_none() {
                first_err = Some(Error::WaitTimeout(e));
            }
        }

        if first_err.is_none() {
            // Safe to unwrap as we know the child has terminated successfully
            let output = client.wait_with_output().unwrap();
            // Record a parse failure instead of returning early, so the
            // remaining clients are still reaped below.
            match parse_iperf3_output(&output.stdout, receive, bandwidth) {
                Ok(value) => results.push(value),
                Err(e) => first_err = Some(e),
            }
        } else {
            let _ = client.kill();
            let output = client.wait_with_output().unwrap();
            println!(
                "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                String::from_utf8_lossy(&output.stdout)
            );
        }
    }

    match first_err {
        Some(e) => Err(e),
        None => Ok(results.iter().sum()),
    }
}

/// Parses `ethr` JSON-lines `output` and returns the `Avg` latency of every
/// line that has one, assuming each value is expressed in microseconds
/// (`"<number>us"`).
///
/// # Errors
/// Returns [`Error::EthrLogParse`] (after dumping `output` to stderr) if a
/// line is not valid JSON, an `Avg` value is not a number, or no latency
/// value is found.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let mut latency = Vec::new();
        for l in s.lines() {
            let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
            // Skip header/summary lines
            if let Some(avg) = v["Avg"].as_str() {
                // Assume the latency unit is always "us"; keep the text before it.
                let value = avg.split_once("us").map_or(avg, |(number, _)| number);
                latency.push(
                    value
                        .parse::<f64>()
                        .expect("'ethr' parse error: invalid 'Avg' entry"),
                );
            }
        }

        assert!(
            !latency.is_empty(),
            "'ethr' parse error: no valid latency data found"
        );

        latency
    })
    .map_err(|_| {
        eprintln!(
            "=== Start ethr output ===\n\n{}\n\n=== End ethr output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::EthrLogParse
    })
}

/// Measures virtio-net latency with `ethr`.
///
/// Copies the host's `ethr` binary into the guest, starts it as a server
/// there, runs an `ethr` latency client on the host for `test_timeout`
/// seconds, and parses the client's JSON log file.
///
/// # Errors
/// Returns copy/SSH/spawn errors, [`Error::WaitTimeout`] if the client does
/// not exit successfully within `test_timeout + 5` seconds (the client is
/// then killed and reaped), and log read/parse errors.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        // A single attempt is made here rather than DEFAULT_SSH_RETRIES.
        1,
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host
    let log_file = guest.tmp_dir.as_path().join("ethr.client.log");
    let mut c = Command::new(ethr_path)
        .args(["-c", &guest.network.guest_ip, "-t", "l", "-o"])
        .arg(&log_file) // file output is JSON format
        .args(["-d", &format!("{test_timeout}s")])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) =
        child_wait_timeout(&mut c, u64::from(test_timeout) + 5).map_err(Error::WaitTimeout)
    {
        let _ = c.kill();
        // Reap the killed client so it does not linger as a zombie.
        let _ = c.wait();
        return Err(e);
    }

    // Parse the ethr latency test output
    let content = fs::read(&log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}