1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use std::ffi::OsStr; 9 use std::fmt::Display; 10 use std::io::{Read, Write}; 11 use std::net::{TcpListener, TcpStream}; 12 use std::os::unix::fs::PermissionsExt; 13 use std::os::unix::io::{AsRawFd, FromRawFd}; 14 use std::path::Path; 15 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 16 use std::str::FromStr; 17 use std::sync::Mutex; 18 use std::time::Duration; 19 use std::{env, fmt, fs, io, thread}; 20 21 use once_cell::sync::Lazy; 22 use serde_json::Value; 23 use ssh2::Session; 24 use thiserror::Error; 25 use vmm_sys_util::tempdir::TempDir; 26 use wait_timeout::ChildExt; 27 28 #[derive(Error, Debug)] 29 pub enum WaitTimeoutError { 30 #[error("timeout")] 31 Timedout, 32 #[error("exit status indicates failure")] 33 ExitStatus, 34 #[error("general failure")] 35 General(#[source] std::io::Error), 36 } 37 38 #[derive(Error, Debug)] 39 pub enum Error { 40 #[error("Failed to parse")] 41 Parsing(#[source] std::num::ParseIntError), 42 #[error("ssh command failed")] 43 SshCommand(#[from] SshCommandError), 44 #[error("waiting for boot failed")] 45 WaitForBoot(#[source] WaitForBootError), 46 #[error("reading log file failed")] 47 EthrLogFile(#[source] std::io::Error), 48 #[error("parsing log file failed")] 49 EthrLogParse, 50 #[error("parsing fio output failed")] 51 FioOutputParse, 52 #[error("parsing iperf3 output failed")] 53 Iperf3Parse, 54 #[error("spawning process failed")] 55 Spawn(#[source] std::io::Error), 56 #[error("waiting for timeout failed")] 57 WaitTimeout(#[source] WaitTimeoutError), 58 } 59 60 pub struct GuestNetworkConfig { 61 pub guest_ip: String, 62 pub l2_guest_ip1: String, 63 pub l2_guest_ip2: String, 64 pub l2_guest_ip3: String, 65 pub host_ip: String, 66 pub guest_mac: String, 67 pub l2_guest_mac1: String, 68 pub l2_guest_mac2: String, 69 pub l2_guest_mac3: String, 70 pub tcp_listener_port: u16, 
71 } 72 73 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 74 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 75 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 76 77 #[derive(Error, Debug)] 78 pub enum WaitForBootError { 79 #[error("Failed to wait for epoll")] 80 EpollWait(#[source] std::io::Error), 81 #[error("Failed to listen for boot")] 82 Listen(#[source] std::io::Error), 83 #[error("Epoll wait timeout")] 84 EpollWaitTimeout, 85 #[error("wrong guest address")] 86 WrongGuestAddr, 87 #[error("Failed to accept a TCP request")] 88 Accept(#[source] std::io::Error), 89 } 90 91 impl GuestNetworkConfig { 92 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 93 let start = std::time::Instant::now(); 94 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 95 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 96 let expected_guest_addr = self.guest_ip.as_str(); 97 let mut s = String::new(); 98 let timeout = match custom_timeout { 99 Some(t) => t, 100 None => DEFAULT_TCP_LISTENER_TIMEOUT, 101 }; 102 103 let mut closure = || -> Result<(), WaitForBootError> { 104 let listener = 105 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 106 listener 107 .set_nonblocking(true) 108 .expect("Cannot set non-blocking for tcp listener"); 109 110 // Reply on epoll w/ timeout to wait for guest connections faithfully 111 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 112 // Use 'File' to enforce closing on 'epoll_fd' 113 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 114 epoll::ctl( 115 epoll_fd, 116 epoll::ControlOptions::EPOLL_CTL_ADD, 117 listener.as_raw_fd(), 118 epoll::Event::new(epoll::Events::EPOLLIN, 0), 119 ) 120 .expect("Cannot add 'tcp_listener' event to epoll"); 121 let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1]; 122 loop { 123 let num_events = match epoll::wait(epoll_fd, 
timeout * 1000_i32, &mut events[..]) { 124 Ok(num_events) => Ok(num_events), 125 Err(e) => match e.raw_os_error() { 126 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 127 _ => Err(e), 128 }, 129 } 130 .map_err(WaitForBootError::EpollWait)?; 131 if num_events == 0 { 132 return Err(WaitForBootError::EpollWaitTimeout); 133 } 134 break; 135 } 136 137 match listener.accept() { 138 Ok((_, addr)) => { 139 // Make sure the connection is from the expected 'guest_addr' 140 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 141 s = format!( 142 "Expecting the guest ip '{}' while being connected with ip '{}'", 143 expected_guest_addr, 144 addr.ip() 145 ); 146 return Err(WaitForBootError::WrongGuestAddr); 147 } 148 149 Ok(()) 150 } 151 Err(e) => { 152 s = "TcpListener::accept() failed".to_string(); 153 Err(WaitForBootError::Accept(e)) 154 } 155 } 156 }; 157 158 match closure() { 159 Err(e) => { 160 let duration = start.elapsed(); 161 eprintln!( 162 "\n\n==== Start 'wait_vm_boot' (FAILED) ==== \ 163 \n\nduration =\"{duration:?}, timeout = {timeout}s\" \ 164 \nlisten_addr=\"{listen_addr}\" \ 165 \nexpected_guest_addr=\"{expected_guest_addr}\" \ 166 \nmessage=\"{s}\" \ 167 \nerror=\"{e:?}\" \ 168 \n\n==== End 'wait_vm_boot' outout ====\n\n" 169 ); 170 171 Err(e) 172 } 173 Ok(_) => Ok(()), 174 } 175 } 176 } 177 178 pub enum DiskType { 179 OperatingSystem, 180 CloudInit, 181 } 182 183 pub trait DiskConfig { 184 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 185 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 186 fn disk(&self, disk_type: DiskType) -> Option<String>; 187 } 188 189 #[derive(Clone)] 190 pub struct UbuntuDiskConfig { 191 osdisk_path: String, 192 cloudinit_path: String, 193 image_name: String, 194 } 195 196 impl UbuntuDiskConfig { 197 pub fn new(image_name: String) -> Self { 198 UbuntuDiskConfig { 199 image_name, 200 osdisk_path: String::new(), 201 cloudinit_path: 
String::new(), 202 } 203 } 204 } 205 206 pub struct WindowsDiskConfig { 207 image_name: String, 208 osdisk_path: String, 209 loopback_device: String, 210 windows_snapshot_cow: String, 211 windows_snapshot: String, 212 } 213 214 impl WindowsDiskConfig { 215 pub fn new(image_name: String) -> Self { 216 WindowsDiskConfig { 217 image_name, 218 osdisk_path: String::new(), 219 loopback_device: String::new(), 220 windows_snapshot_cow: String::new(), 221 windows_snapshot: String::new(), 222 } 223 } 224 } 225 226 impl Drop for WindowsDiskConfig { 227 fn drop(&mut self) { 228 // dmsetup remove windows-snapshot-1 229 std::process::Command::new("dmsetup") 230 .arg("remove") 231 .arg(self.windows_snapshot.as_str()) 232 .output() 233 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 234 235 // dmsetup remove windows-snapshot-cow-1 236 std::process::Command::new("dmsetup") 237 .arg("remove") 238 .arg(self.windows_snapshot_cow.as_str()) 239 .output() 240 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 241 242 // losetup -d <loopback_device> 243 std::process::Command::new("losetup") 244 .args(["-d", self.loopback_device.as_str()]) 245 .output() 246 .expect("Expect removing loopback device to succeed"); 247 } 248 } 249 250 impl DiskConfig for UbuntuDiskConfig { 251 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 252 let cloudinit_file_path = 253 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 254 255 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 256 257 fs::create_dir_all(&cloud_init_directory) 258 .expect("Expect creating cloud-init directory to succeed"); 259 260 let source_file_dir = std::env::current_dir() 261 .unwrap() 262 .join("test_data") 263 .join("cloud-init") 264 .join("ubuntu") 265 .join("ci"); 266 267 ["meta-data"].iter().for_each(|x| { 268 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 269 
.expect("Expect copying cloud-init meta-data to succeed"); 270 }); 271 272 let mut user_data_string = String::new(); 273 fs::File::open(source_file_dir.join("user-data")) 274 .unwrap() 275 .read_to_string(&mut user_data_string) 276 .expect("Expected reading user-data file to succeed"); 277 user_data_string = user_data_string.replace( 278 "@DEFAULT_TCP_LISTENER_MESSAGE", 279 DEFAULT_TCP_LISTENER_MESSAGE, 280 ); 281 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 282 user_data_string = 283 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 284 285 fs::File::create(cloud_init_directory.join("user-data")) 286 .unwrap() 287 .write_all(user_data_string.as_bytes()) 288 .expect("Expected writing out user-data to succeed"); 289 290 let mut network_config_string = String::new(); 291 292 fs::File::open(source_file_dir.join("network-config")) 293 .unwrap() 294 .read_to_string(&mut network_config_string) 295 .expect("Expected reading network-config file to succeed"); 296 297 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 298 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 299 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 300 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 301 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 302 network_config_string = 303 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 304 network_config_string = 305 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 306 network_config_string = 307 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 308 network_config_string = 309 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 310 311 
fs::File::create(cloud_init_directory.join("network-config")) 312 .unwrap() 313 .write_all(network_config_string.as_bytes()) 314 .expect("Expected writing out network-config to succeed"); 315 316 std::process::Command::new("mkdosfs") 317 .args(["-n", "CIDATA"]) 318 .args(["-C", cloudinit_file_path.as_str()]) 319 .arg("8192") 320 .output() 321 .expect("Expect creating disk image to succeed"); 322 323 ["user-data", "meta-data", "network-config"] 324 .iter() 325 .for_each(|x| { 326 std::process::Command::new("mcopy") 327 .arg("-o") 328 .args(["-i", cloudinit_file_path.as_str()]) 329 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 330 .output() 331 .expect("Expect copying files to disk image to succeed"); 332 }); 333 334 cloudinit_file_path 335 } 336 337 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 338 let mut workload_path = dirs::home_dir().unwrap(); 339 workload_path.push("workloads"); 340 341 let mut osdisk_base_path = workload_path; 342 osdisk_base_path.push(&self.image_name); 343 344 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 345 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 346 347 rate_limited_copy(osdisk_base_path, &osdisk_path) 348 .expect("copying of OS source disk image failed"); 349 350 self.cloudinit_path = cloudinit_path; 351 self.osdisk_path = osdisk_path; 352 } 353 354 fn disk(&self, disk_type: DiskType) -> Option<String> { 355 match disk_type { 356 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 357 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 358 } 359 } 360 } 361 362 impl DiskConfig for WindowsDiskConfig { 363 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 364 String::new() 365 } 366 367 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 368 let mut workload_path = dirs::home_dir().unwrap(); 369 workload_path.push("workloads"); 370 371 
let mut osdisk_path = workload_path; 372 osdisk_path.push(&self.image_name); 373 374 let osdisk_blk_size = fs::metadata(osdisk_path) 375 .expect("Expect retrieving Windows image metadata") 376 .len() 377 >> 9; 378 379 let snapshot_cow_path = 380 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 381 382 // Create and truncate CoW file for device mapper 383 let cow_file_size: u64 = 1 << 30; 384 let cow_file_blk_size = cow_file_size >> 9; 385 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 386 .expect("Expect creating CoW image to succeed"); 387 cow_file 388 .set_len(cow_file_size) 389 .expect("Expect truncating CoW image to succeed"); 390 391 // losetup --find --show /tmp/snapshot_cow 392 let loopback_device = std::process::Command::new("losetup") 393 .arg("--find") 394 .arg("--show") 395 .arg(snapshot_cow_path.as_str()) 396 .output() 397 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 398 399 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 400 .trim() 401 .to_string(); 402 403 let random_extension = tmp_dir.as_path().file_name().unwrap(); 404 let windows_snapshot_cow = format!( 405 "windows-snapshot-cow-{}", 406 random_extension.to_str().unwrap() 407 ); 408 409 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 410 std::process::Command::new("dmsetup") 411 .arg("create") 412 .arg(windows_snapshot_cow.as_str()) 413 .args([ 414 "--table", 415 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 416 ]) 417 .output() 418 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 419 420 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 421 422 // dmsetup mknodes 423 std::process::Command::new("dmsetup") 424 .arg("mknodes") 425 .output() 426 .expect("Expect device mapper nodes to be ready"); 427 428 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot 
/dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 429 std::process::Command::new("dmsetup") 430 .arg("create") 431 .arg(windows_snapshot.as_str()) 432 .args([ 433 "--table", 434 format!( 435 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 436 osdisk_blk_size, 437 windows_snapshot_cow.as_str() 438 ) 439 .as_str(), 440 ]) 441 .output() 442 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 443 444 // dmsetup mknodes 445 std::process::Command::new("dmsetup") 446 .arg("mknodes") 447 .output() 448 .expect("Expect device mapper nodes to be ready"); 449 450 self.osdisk_path = format!("/dev/mapper/{windows_snapshot}"); 451 self.windows_snapshot_cow = windows_snapshot_cow; 452 self.windows_snapshot = windows_snapshot; 453 } 454 455 fn disk(&self, disk_type: DiskType) -> Option<String> { 456 match disk_type { 457 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 458 DiskType::CloudInit => None, 459 } 460 } 461 } 462 463 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 464 for i in 0..10 { 465 let free_bytes = unsafe { 466 let mut stats = std::mem::MaybeUninit::zeroed(); 467 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 468 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 469 470 let free_blocks = stats.assume_init().f_bfree; 471 let block_size = stats.assume_init().f_bsize; 472 473 free_blocks * block_size 474 }; 475 476 // Make sure there is at least 6 GiB of space 477 if free_bytes < 6 << 30 { 478 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 479 thread::sleep(std::time::Duration::new(60, 0)); 480 continue; 481 } 482 483 match fs::copy(&from, &to) { 484 Err(e) => { 485 if let Some(errno) = e.raw_os_error() { 486 if errno == libc::ENOSPC { 487 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 488 thread::sleep(std::time::Duration::new(60, 0)); 489 continue; 490 } 491 } 492 return Err(e); 493 } 494 Ok(i) => return Ok(i), 495 } 496 } 497 Err(io::Error::last_os_error()) 498 } 499 500 pub fn handle_child_output( 501 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 502 output: &std::process::Output, 503 ) { 504 use std::os::unix::process::ExitStatusExt; 505 if r.is_ok() && output.status.success() { 506 return; 507 } 508 509 match output.status.code() { 510 None => { 511 // Don't treat child.kill() as a problem 512 if output.status.signal() == Some(9) && r.is_ok() { 513 return; 514 } 515 516 eprintln!( 517 "==== child killed by signal: {} ====", 518 output.status.signal().unwrap() 519 ); 520 } 521 Some(code) => { 522 eprintln!("\n\n==== child exit code: {code} ===="); 523 } 524 } 525 526 eprintln!( 527 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 528 String::from_utf8_lossy(&output.stdout) 529 ); 530 eprintln!( 531 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 532 String::from_utf8_lossy(&output.stderr) 533 ); 534 535 panic!("Test failed") 536 } 537 538 #[derive(Debug)] 539 pub struct PasswordAuth { 540 pub username: String, 541 pub password: String, 542 } 543 544 pub const DEFAULT_SSH_RETRIES: u8 = 6; 545 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 546 547 #[derive(Error, Debug)] 548 pub enum SshCommandError { 549 #[error("ssh connection failed")] 550 Connection(#[source] std::io::Error), 551 #[error("ssh handshake failed")] 552 Handshake(#[source] ssh2::Error), 553 #[error("ssh authentication failed")] 554 Authentication(#[source] ssh2::Error), 555 #[error("ssh channel session failed")] 556 ChannelSession(#[source] ssh2::Error), 557 #[error("ssh command failed")] 558 Command(#[source] ssh2::Error), 559 #[error("retrieving exit status from ssh command failed")] 560 ExitStatus(#[source] ssh2::Error), 561 #[error("the exit code indicates failure: {0}")] 562 
NonZeroExitStatus(i32), 563 #[error("failed to read file")] 564 FileRead(#[source] std::io::Error), 565 #[error("failed to read metadata")] 566 FileMetadata(#[source] std::io::Error), 567 #[error("scp send failed")] 568 ScpSend(#[source] ssh2::Error), 569 #[error("scp write failed")] 570 WriteAll(#[source] std::io::Error), 571 #[error("scp send EOF failed")] 572 SendEof(#[source] ssh2::Error), 573 #[error("scp wait EOF failed")] 574 WaitEof(#[source] ssh2::Error), 575 } 576 577 fn scp_to_guest_with_auth( 578 path: &Path, 579 remote_path: &Path, 580 auth: &PasswordAuth, 581 ip: &str, 582 retries: u8, 583 timeout: u8, 584 ) -> Result<(), SshCommandError> { 585 let mut counter = 0; 586 loop { 587 let closure = || -> Result<(), SshCommandError> { 588 let tcp = 589 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 590 let mut sess = Session::new().unwrap(); 591 sess.set_tcp_stream(tcp); 592 sess.handshake().map_err(SshCommandError::Handshake)?; 593 594 sess.userauth_password(&auth.username, &auth.password) 595 .map_err(SshCommandError::Authentication)?; 596 assert!(sess.authenticated()); 597 598 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 599 let mode = fs::metadata(path) 600 .map_err(SshCommandError::FileMetadata)? 
601 .permissions() 602 .mode() 603 & 0o777; 604 605 let mut channel = sess 606 .scp_send(remote_path, mode as i32, content.len() as u64, None) 607 .map_err(SshCommandError::ScpSend)?; 608 channel 609 .write_all(&content) 610 .map_err(SshCommandError::WriteAll)?; 611 channel.send_eof().map_err(SshCommandError::SendEof)?; 612 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 613 614 // Intentionally ignore these results here as their failure 615 // does not precipitate a repeat 616 let _ = channel.close(); 617 let _ = channel.wait_close(); 618 619 Ok(()) 620 }; 621 622 match closure() { 623 Ok(_) => break, 624 Err(e) => { 625 counter += 1; 626 if counter >= retries { 627 eprintln!( 628 "\n\n==== Start scp command output (FAILED) ====\n\n\ 629 path =\"{path:?}\"\n\ 630 remote_path =\"{remote_path:?}\"\n\ 631 auth=\"{auth:#?}\"\n\ 632 ip=\"{ip}\"\n\ 633 error=\"{e:?}\"\n\ 634 \n==== End scp command outout ====\n\n" 635 ); 636 637 return Err(e); 638 } 639 } 640 }; 641 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 642 } 643 Ok(()) 644 } 645 646 pub fn scp_to_guest( 647 path: &Path, 648 remote_path: &Path, 649 ip: &str, 650 retries: u8, 651 timeout: u8, 652 ) -> Result<(), SshCommandError> { 653 scp_to_guest_with_auth( 654 path, 655 remote_path, 656 &PasswordAuth { 657 username: String::from("cloud"), 658 password: String::from("cloud123"), 659 }, 660 ip, 661 retries, 662 timeout, 663 ) 664 } 665 666 pub fn ssh_command_ip_with_auth( 667 command: &str, 668 auth: &PasswordAuth, 669 ip: &str, 670 retries: u8, 671 timeout: u8, 672 ) -> Result<String, SshCommandError> { 673 let mut s = String::new(); 674 675 let mut counter = 0; 676 loop { 677 let mut closure = || -> Result<(), SshCommandError> { 678 let tcp = 679 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 680 let mut sess = Session::new().unwrap(); 681 sess.set_tcp_stream(tcp); 682 sess.handshake().map_err(SshCommandError::Handshake)?; 683 684 
sess.userauth_password(&auth.username, &auth.password) 685 .map_err(SshCommandError::Authentication)?; 686 assert!(sess.authenticated()); 687 688 let mut channel = sess 689 .channel_session() 690 .map_err(SshCommandError::ChannelSession)?; 691 channel.exec(command).map_err(SshCommandError::Command)?; 692 693 // Intentionally ignore these results here as their failure 694 // does not precipitate a repeat 695 let _ = channel.read_to_string(&mut s); 696 let _ = channel.close(); 697 let _ = channel.wait_close(); 698 699 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 700 701 if status != 0 { 702 Err(SshCommandError::NonZeroExitStatus(status)) 703 } else { 704 Ok(()) 705 } 706 }; 707 708 match closure() { 709 Ok(_) => break, 710 Err(e) => { 711 counter += 1; 712 if counter >= retries { 713 eprintln!( 714 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 715 command=\"{command}\"\n\ 716 auth=\"{auth:#?}\"\n\ 717 ip=\"{ip}\"\n\ 718 output=\"{s}\"\n\ 719 error=\"{e:?}\"\n\ 720 \n==== End ssh command outout ====\n\n" 721 ); 722 723 return Err(e); 724 } 725 } 726 }; 727 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 728 } 729 Ok(s) 730 } 731 732 pub fn ssh_command_ip( 733 command: &str, 734 ip: &str, 735 retries: u8, 736 timeout: u8, 737 ) -> Result<String, SshCommandError> { 738 ssh_command_ip_with_auth( 739 command, 740 &PasswordAuth { 741 username: String::from("cloud"), 742 password: String::from("cloud123"), 743 }, 744 ip, 745 retries, 746 timeout, 747 ) 748 } 749 750 pub fn exec_host_command_with_retries(command: &str, retries: u32, interval: Duration) -> bool { 751 for _ in 0..retries { 752 let s = exec_host_command_output(command).status; 753 if !s.success() { 754 eprintln!("\n\n==== retrying in {interval:?} ===\n\n"); 755 thread::sleep(interval); 756 } else { 757 return true; 758 } 759 } 760 761 false 762 } 763 764 pub fn exec_host_command_status(command: &str) -> ExitStatus { 765 
exec_host_command_output(command).status 766 } 767 768 pub fn exec_host_command_output(command: &str) -> Output { 769 let output = std::process::Command::new("bash") 770 .args(["-c", command]) 771 .output() 772 .unwrap_or_else(|e| panic!("Expected '{command}' to run. Error: {e:?}")); 773 774 if !output.status.success() { 775 let stdout = String::from_utf8_lossy(&output.stdout); 776 let stderr = String::from_utf8_lossy(&output.stderr); 777 eprintln!( 778 "\n\n==== Start 'exec_host_command' failed ==== \ 779 \n\n---stdout---\n{stdout}\n---stderr---{stderr} \ 780 \n\n==== End 'exec_host_command' failed ====", 781 ); 782 } 783 784 output 785 } 786 787 pub fn check_lines_count(input: &str, line_count: usize) -> bool { 788 if input.lines().count() == line_count { 789 true 790 } else { 791 eprintln!( 792 "\n\n==== Start 'check_lines_count' failed ==== \ 793 \n\ninput = {input}\nline_count = {line_count} \ 794 \n\n==== End 'check_lines_count' failed ====", 795 ); 796 797 false 798 } 799 } 800 801 pub fn check_matched_lines_count(input: &str, keywords: Vec<&str>, line_count: usize) -> bool { 802 let mut matches = String::new(); 803 for line in input.lines() { 804 if keywords.iter().all(|k| line.contains(k)) { 805 matches += line; 806 } 807 } 808 809 if matches.lines().count() == line_count { 810 true 811 } else { 812 eprintln!( 813 "\n\n==== Start 'check_matched_lines_count' failed ==== \ 814 \nkeywords = {keywords:?}, line_count = {line_count} \ 815 \n\ninput = {input} matches = {matches} \ 816 \n\n==== End 'check_matched_lines_count' failed ====", 817 ); 818 819 false 820 } 821 } 822 823 pub fn kill_child(child: &mut Child) { 824 let r = unsafe { libc::kill(child.id() as i32, libc::SIGTERM) }; 825 if r != 0 { 826 let e = io::Error::last_os_error(); 827 if e.raw_os_error().unwrap() == libc::ESRCH { 828 return; 829 } 830 eprintln!("Failed to kill child with SIGTERM: {e:?}"); 831 } 832 833 // The timeout period elapsed without the child exiting 834 if 
child.wait_timeout(Duration::new(10, 0)).unwrap().is_none() { 835 let _ = child.kill(); 836 let rust_flags = env::var("RUSTFLAGS").unwrap_or_default(); 837 if rust_flags.contains("-Cinstrument-coverage") { 838 panic!("Wait child timeout, please check the reason.") 839 } 840 } 841 } 842 843 pub const PIPE_SIZE: i32 = 32 << 20; 844 845 static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1)); 846 847 pub struct Guest { 848 pub tmp_dir: TempDir, 849 pub disk_config: Box<dyn DiskConfig>, 850 pub network: GuestNetworkConfig, 851 } 852 853 // Safe to implement as we know we have no interior mutability 854 impl std::panic::RefUnwindSafe for Guest {} 855 856 impl Guest { 857 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 858 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 859 860 let network = GuestNetworkConfig { 861 guest_ip: format!("{class}.{id}.2"), 862 l2_guest_ip1: format!("{class}.{id}.3"), 863 l2_guest_ip2: format!("{class}.{id}.4"), 864 l2_guest_ip3: format!("{class}.{id}.5"), 865 host_ip: format!("{class}.{id}.1"), 866 guest_mac: format!("12:34:56:78:90:{id:02x}"), 867 l2_guest_mac1: format!("de:ad:be:ef:12:{id:02x}"), 868 l2_guest_mac2: format!("de:ad:be:ef:34:{id:02x}"), 869 l2_guest_mac3: format!("de:ad:be:ef:56:{id:02x}"), 870 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 871 }; 872 873 disk_config.prepare_files(&tmp_dir, &network); 874 875 Guest { 876 tmp_dir, 877 disk_config, 878 network, 879 } 880 } 881 882 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 883 let mut guard = NEXT_VM_ID.lock().unwrap(); 884 let id = *guard; 885 *guard = id + 1; 886 887 Self::new_from_ip_range(disk_config, "192.168", id) 888 } 889 890 pub fn default_net_string(&self) -> String { 891 format!( 892 "tap=,mac={},ip={},mask=255.255.255.0", 893 self.network.guest_mac, self.network.host_ip 894 ) 895 } 896 897 pub fn default_net_string_w_iommu(&self) -> String { 898 format!( 899 
"tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 900 self.network.guest_mac, self.network.host_ip 901 ) 902 } 903 904 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 905 format!( 906 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 907 self.network.guest_mac, self.network.host_ip, mtu 908 ) 909 } 910 911 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 912 ssh_command_ip( 913 command, 914 &self.network.guest_ip, 915 DEFAULT_SSH_RETRIES, 916 DEFAULT_SSH_TIMEOUT, 917 ) 918 } 919 920 #[cfg(target_arch = "x86_64")] 921 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 922 ssh_command_ip( 923 command, 924 &self.network.guest_ip, 925 DEFAULT_SSH_RETRIES, 926 DEFAULT_SSH_TIMEOUT, 927 ) 928 } 929 930 #[cfg(target_arch = "x86_64")] 931 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 932 ssh_command_ip( 933 command, 934 &self.network.l2_guest_ip1, 935 DEFAULT_SSH_RETRIES, 936 DEFAULT_SSH_TIMEOUT, 937 ) 938 } 939 940 #[cfg(target_arch = "x86_64")] 941 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 942 ssh_command_ip( 943 command, 944 &self.network.l2_guest_ip2, 945 DEFAULT_SSH_RETRIES, 946 DEFAULT_SSH_TIMEOUT, 947 ) 948 } 949 950 #[cfg(target_arch = "x86_64")] 951 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 952 ssh_command_ip( 953 command, 954 &self.network.l2_guest_ip3, 955 DEFAULT_SSH_RETRIES, 956 DEFAULT_SSH_TIMEOUT, 957 ) 958 } 959 960 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 961 format! 
{"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 962 cpu_count, 963 cpu_count, 964 kernel_path, 965 kernel_cmd, 966 self.network.host_ip, 967 self.network.guest_mac, 968 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 969 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 970 } 971 } 972 973 pub fn get_cpu_count(&self) -> Result<u32, Error> { 974 self.ssh_command("grep -c processor /proc/cpuinfo")? 975 .trim() 976 .parse() 977 .map_err(Error::Parsing) 978 } 979 980 pub fn get_total_memory(&self) -> Result<u32, Error> { 981 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 982 .trim() 983 .parse() 984 .map_err(Error::Parsing) 985 } 986 987 #[cfg(target_arch = "x86_64")] 988 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 989 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 990 .trim() 991 .parse() 992 .map_err(Error::Parsing) 993 } 994 995 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 996 self.ssh_command( 997 format!( 998 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 999 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 1000 ) 1001 .as_str(), 1002 )? 
.trim()
            .parse()
            .map_err(Error::Parsing)
    }

    /// Wait for the guest to finish booting.
    ///
    /// `custom_timeout` is forwarded to the network helper; any failure is
    /// wrapped in [`Error::WaitForBoot`].
    pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> {
        self.network
            .wait_vm_boot(custom_timeout)
            .map_err(Error::WaitForBoot)
    }

    /// Check that every CPU in `cpus` appears under NUMA node `node_id` in the
    /// guest's sysfs (`/sys/devices/system/node/node{node_id}/cpu{cpu}`).
    ///
    /// Returns the first `ssh_command` error. This assumes a failing `[ -d ]`
    /// test is reported as an error by `ssh_command`; TODO confirm.
    pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> {
        for cpu in cpus.iter() {
            let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]");
            self.ssh_command(cmd.as_str())?;
        }

        Ok(())
    }

    /// Return whether the guest's `/sys/devices/system/node/node{node_id}/distance`,
    /// with surrounding whitespace trimmed, equals `distances`.
    pub fn check_numa_node_distances(
        &self,
        node_id: usize,
        distances: &str,
    ) -> Result<bool, Error> {
        let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance");
        Ok(self.ssh_command(cmd.as_str())?.trim() == distances)
    }

    /// Assert the guest's NUMA layout; each reference slice is indexed by node id.
    ///
    /// * `mem_ref`: node `i` must report strictly more memory than `mem_ref[i]`
    ///   (a failed read counts as 0 and therefore fails the assertion).
    /// * `node_ref`: node `i` must contain every CPU listed in `node_ref[i]`.
    /// * `distance_ref`: node `i`'s distance line must equal `distance_ref[i]`.
    ///
    /// Panics on any mismatch or SSH failure.
    pub fn check_numa_common(
        &self,
        mem_ref: Option<&[u32]>,
        node_ref: Option<&[Vec<usize>]>,
        distance_ref: Option<&[&str]>,
    ) {
        if let Some(mem_ref) = mem_ref {
            // Check each NUMA node has been assigned the right amount of
            // memory.
            for (i, &m) in mem_ref.iter().enumerate() {
                assert!(self.get_numa_node_memory(i).unwrap_or_default() > m);
            }
        }

        if let Some(node_ref) = node_ref {
            // Check each NUMA node has been assigned the right CPUs set.
            for (i, n) in node_ref.iter().enumerate() {
                self.check_numa_node_cpus(i, n.clone()).unwrap();
            }
        }

        if let Some(distance_ref) = distance_ref {
            // Check each NUMA node has been assigned the right distances.
            for (i, &d) in distance_ref.iter().enumerate() {
                assert!(self.check_numa_node_distances(i, d).unwrap());
            }
        }
    }

    /// Check, via `cpuid` in the guest, that SGX, SGX launch control and SGX1
    /// are reported as supported. Returns the first failing check's error.
    #[cfg(target_arch = "x86_64")]
    pub fn check_sgx_support(&self) -> Result<(), Error> {
        // `[:space:]` is single-quoted so the remote shell cannot glob-expand
        // it against one-character file names in the working directory.
        const SGX_CHECKS: [&str; 3] = [
            "cpuid -l 0x7 -s 0 | tr -s '[:space:]' | grep -q 'SGX: \
             Software Guard Extensions supported = true'",
            "cpuid -l 0x7 -s 0 | tr -s '[:space:]' | grep -q 'SGX_LC: \
             SGX launch config supported = true'",
            "cpuid -l 0x12 -s 0 | tr -s '[:space:]' | grep -q 'SGX1 \
             supported = true'",
        ];

        for cmd in &SGX_CHECKS {
            self.ssh_command(cmd)?;
        }

        Ok(())
    }

    /// Return the trimmed PCI class of guest device `0000:00:00.0`.
    pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
            .trim()
            .to_string())
    }

    /// Return the guest's PCI device ids, one per line, in the order the
    /// `/sys/bus/pci/devices/*` glob expands.
    pub fn get_pci_device_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/device")?
            .trim()
            .to_string())
    }

    /// Return the guest's PCI vendor ids, one per line, in the order the
    /// `/sys/bus/pci/devices/*` glob expands.
    pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
            .trim()
            .to_string())
    }

    /// Return whether some guest PCI device has both `device_id` and
    /// `vendor_id` (e.g. `"0x1043"` / `"0x1af4"` as printed by sysfs).
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        // We are checking if console device's device id and vendor id pair matches
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        // Both listings use the same sysfs glob, so line N of each is assumed
        // to describe the same device.
        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Exercise vsock end to end.
    ///
    /// A `socat` listener in the guest (CID 3, port 16) writes what it
    /// receives to `vsock_log`. The host sends `CONNECT 16` followed by
    /// `HelloWorld!` through the Unix socket `socket`. The guest file must then
    /// contain `HelloWorld!`. Panics on any failure.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip(
                "sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log",
                &guest_ip,
                DEFAULT_SSH_RETRIES,
                DEFAULT_SSH_TIMEOUT,
            )
            .unwrap();
        });

        // Make sure socat is listening, which might take a few seconds on slow systems
        thread::sleep(Duration::new(10, 0));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Assert that `nvidia-smi` in the guest reports an "NVIDIA L40S" GPU.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self
            .ssh_command("nvidia-smi")
            .unwrap()
            .contains("NVIDIA L40S"));
    }

    /// Reboot a Linux guest and verify it comes back.
    ///
    /// The number of `reboot` records listed by `last` must equal
    /// `current_reboot_count + 1` before the reboot and
    /// `current_reboot_count + 2` after it. An unparsable count is treated as 0.
    /// Panics on any mismatch or SSH failure.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        let list_boots_cmd = "sudo last | grep -c reboot";
        let boot_count = || {
            self.ssh_command(list_boots_cmd)
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Tell the guest kernel to online hot-plugged memory blocks automatically.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Smoke-test the guest's common devices; panics on any failure.
    ///
    /// Always reads from `/dev/vda`, `/dev/vdb` and `/dev/hwrng`. When
    /// provided, it also:
    /// * `socket`: runs [`Self::check_vsock`] against that host socket;
    /// * `console_text`: writes the text to `/dev/hvc0`;
    /// * `pmem_path`: mounts the device at `/mnt`, writes a file, remounts it
    ///   and checks the file persisted, then cleans up.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Logging verbosity for the spawned VMM binary.
///
/// The `Display` form is the command-line flag for the level. `Warn` renders
/// as the empty string, meaning no flag is passed.
#[derive(Default)]
pub enum VerbosityLevel {
    #[default]
    Warn,
    Info,
    Debug,
}

impl Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let flag = match self {
            VerbosityLevel::Warn => "",
            VerbosityLevel::Info => "-v",
            VerbosityLevel::Debug => "-vv",
        };
        f.write_str(flag)
    }
}

/// Builder for the VMM command line used to boot a [`Guest`].
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    // When set, stdout/stderr are piped and their pipe buffers enlarged.
    capture_output: bool,
    // When set, the full command line is printed before spawning.
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Start a command for the `cloud-hypervisor` binary from the build tree
    /// (see [`clh_command`]).
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Start a command for an explicit binary path.
    ///
    /// Defaults: output not captured, command line printed, `Info` verbosity.
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Set the verbosity flag appended when the command is spawned.
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Pipe stdout/stderr of the spawned process instead of inheriting them.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Choose whether the command line is printed before spawning.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Append the verbosity flag and spawn the process.
    ///
    /// With output capture, both pipes are grown to at least `PIPE_SIZE`
    /// bytes. If that fails, the child is killed and reaped and the error is
    /// returned. The caller owns the returned child and must `.wait()` on it.
    pub fn spawn(&mut self) -> io::Result<Child> {
        // The `Display` impl holds the level -> flag mapping; `Warn` adds nothing.
        let verbosity_flag = self.verbosity.to_string();
        if !verbosity_flag.is_empty() {
            self.command.arg(verbosity_flag);
        }

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if self.capture_output {
            // The caller should call .wait() on the returned child
            #[allow(unknown_lints)]
            #[allow(clippy::zombie_processes)]
            let mut child = self
                .command
                .stderr(Stdio::piped())
                .stdout(Stdio::piped())
                .spawn()?;

            match Self::resize_output_pipes(&child) {
                Ok(()) => Ok(child),
                Err(e) => {
                    // The caller never receives this child, so don't leave it running.
                    let _ = child.kill();
                    let _ = child.wait();
                    Err(e)
                }
            }
        } else {
            // The caller should call .wait() on the returned child
            #[allow(unknown_lints)]
            #[allow(clippy::zombie_processes)]
            self.command.spawn()
        }
    }

    /// Grow the stdout and stderr pipes of `child` to at least `PIPE_SIZE` bytes.
    fn resize_output_pipes(child: &Child) -> io::Result<()> {
        // Both handles exist: `spawn` requested `Stdio::piped()` for them.
        let stdout_fd = child.stdout.as_ref().unwrap().as_raw_fd();
        // SAFETY: `stdout_fd` is an open pipe fd owned by `child` for this call.
        let pipesize = unsafe { libc::fcntl(stdout_fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
        if pipesize == -1 {
            return Err(io::Error::last_os_error());
        }
        let stderr_fd = child.stderr.as_ref().unwrap().as_raw_fd();
        // SAFETY: `stderr_fd` is an open pipe fd owned by `child` for this call.
        let pipesize1 = unsafe { libc::fcntl(stderr_fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
        if pipesize1 == -1 {
            return Err(io::Error::last_os_error());
        }

        if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE {
            Ok(())
        } else {
            Err(io::Error::other(format!(
                "resizing pipe w/ 'fcntl' failed: stdout pipesize {pipesize}, stderr pipesize {pipesize1}"
            )))
        }
    }

    /// Append raw arguments to the command.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Add `--disk` for the guest's OS image, plus its cloud-init image when
    /// one is configured. Panics if no OS disk is configured.
    pub fn default_disks(&mut self) -> &mut Self {
        // Copy the `&'a Guest` so nothing borrowed from it pins `self`.
        let guest = self.guest;
        let os_disk = format!(
            "path={}",
            guest.disk_config.disk(DiskType::OperatingSystem).unwrap()
        );

        match guest.disk_config.disk(DiskType::CloudInit) {
            Some(cloud_init) => {
                let cloud_init_disk = format!("path={cloud_init}");
                self.args(["--disk", os_disk.as_str(), cloud_init_disk.as_str()])
            }
            None => self.args(["--disk", os_disk.as_str()]),
        }
    }

    /// Add `--net` with the guest's default network configuration.
    pub fn default_net(&mut self) -> &mut Self {
        self.args(["--net", self.guest.default_net_string().as_str()])
    }
}

/// Path to the release build of `cmd`.
///
/// The target triple comes from `BUILD_TARGET` and falls back to
/// `x86_64-unknown-linux-gnu` when that variable is not set.
pub fn clh_command(cmd: &str) -> String {
    let target =
        env::var("BUILD_TARGET").unwrap_or_else(|_| "x86_64-unknown-linux-gnu".to_string());
    format!("target/{target}/release/{cmd}")
}

/// Extract a result from `iperf3 -J` output.
///
/// * `bandwidth == true`: bits per second from `end.sum_sent` when `sender`
///   is set, otherwise from `end.sum_received`.
/// * `bandwidth == false`: delivered packets per second,
///   `(end.sum.packets - end.sum.lost_packets) / end.sum.seconds`.
///
/// On malformed output the raw output is printed to stderr and
/// [`Error::Iperf3Parse`] is returned.
pub fn parse_iperf3_output(output: &[u8], sender: bool, bandwidth: bool) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");

        if bandwidth {
            if sender {
                v["end"]["sum_sent"]["bits_per_second"]
                    .as_f64()
                    .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
            } else {
                v["end"]["sum_received"]["bits_per_second"].as_f64().expect(
                    "'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'",
                )
            }
        } else {
            // iperf does not distinguish sent vs received in this case.

            let lost_packets = v["end"]["sum"]["lost_packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.lost_packets'");
            let packets = v["end"]["sum"]["packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.packets'");
            let seconds = v["end"]["sum"]["seconds"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.seconds'");

            (packets - lost_packets) / seconds
        }
    })
    .map_err(|_| {
        eprintln!(
            "==== Start iperf3 output ===\n\n{}\n\n=== End iperf3 output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::Iperf3Parse
    })
}

/// `fio` workload kinds; `Display` yields the matching `--rw=` value.
#[derive(Clone)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            FioOps::Read => write!(f, "read"),
            FioOps::RandomRead => write!(f, "randread"),
            FioOps::Write => write!(f, "write"),
            FioOps::RandomWrite => write!(f, "randwrite"),
            FioOps::ReadWrite => write!(f, "rw"),
            FioOps::RandRW => write!(f, "randrw"),
        }
    }
}

/// Shared implementation of [`parse_fio_output`] and [`parse_fio_output_iops`].
///
/// For every job in the `fio` JSON `output`, adds `<dir>.<counter>` divided by
/// `<dir>.runtime / 1000` (fio's millisecond runtime converted to seconds).
/// This is done for each direction (`read`, `write`) that `fio_ops` exercises.
/// Panics with a descriptive message on malformed output or when the job
/// count is not `num_jobs`; callers turn that panic into an error.
fn sum_fio_job_rates(output: &str, fio_ops: &FioOps, num_jobs: u32, counter: &str) -> f64 {
    let v: Value = serde_json::from_str(output).expect("'fio' parse error: invalid json output");
    let jobs = v["jobs"]
        .as_array()
        .expect("'fio' parse error: missing entry 'jobs'");
    assert_eq!(
        jobs.len(),
        num_jobs as usize,
        "'fio' parse error: Unexpected number of 'fio' jobs."
    );

    let (read, write) = match fio_ops {
        FioOps::Read | FioOps::RandomRead => (true, false),
        FioOps::Write | FioOps::RandomWrite => (false, true),
        FioOps::ReadWrite | FioOps::RandRW => (true, true),
    };
    let directions = [("read", read), ("write", write)];

    let mut total = 0_f64;
    for j in jobs {
        for &(dir, enabled) in &directions {
            if !enabled {
                continue;
            }
            let amount = j[dir][counter]
                .as_u64()
                .unwrap_or_else(|| panic!("'fio' parse error: missing entry '{dir}.{counter}'"));
            let runtime = j[dir]["runtime"]
                .as_u64()
                .unwrap_or_else(|| panic!("'fio' parse error: missing entry '{dir}.runtime'"))
                as f64
                / 1000_f64;
            total += amount as f64 / runtime;
        }
    }

    total
}

/// Total bandwidth in bytes per second, summed over all `num_jobs` fio jobs.
///
/// On malformed output the raw output is printed to stderr and
/// [`Error::FioOutputParse`] is returned.
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    std::panic::catch_unwind(|| sum_fio_job_rates(output, fio_ops, num_jobs, "io_bytes")).map_err(
        |_| {
            eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n");
            Error::FioOutputParse
        },
    )
}

/// Total IOPS, summed over all `num_jobs` fio jobs.
///
/// On malformed output the raw output is printed to stderr and
/// [`Error::FioOutputParse`] is returned.
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    std::panic::catch_unwind(|| sum_fio_job_rates(output, fio_ops, num_jobs, "total_ios")).map_err(
        |_| {
            eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n");
            Error::FioOutputParse
        },
    )
}

// Wait for `child` to exit within `timeout` seconds. Timing out, a failing
// exit status, and a wait error each map to their own `WaitTimeoutError`.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    match child.wait_timeout(Duration::from_secs(timeout)) {
        Err(e) => Err(WaitTimeoutError::General(e)),
        Ok(None) => Err(WaitTimeoutError::Timedout),
        Ok(Some(status)) if !status.success() => Err(WaitTimeoutError::ExitStatus),
        Ok(Some(_)) => Ok(()),
    }
}

/// Measure virtio-net throughput with one iperf3 server/client pair per queue pair.
///
/// Servers run in the guest on ports 5201.. and clients run on the host for
/// `test_timeout` seconds.
/// * `receive`: measure guest RX; when false, the host client runs with
///   `-R` to measure guest TX.
/// * `bandwidth`: TCP bits per second; when false, UDP packets per second.
///
/// Returns the sum over all clients. If any client fails, the first error is
/// returned. The remaining clients are still waited for, then killed and
/// reaped, and their output is printed.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
    bandwidth: bool,
) -> Result<f64, Error> {
    let default_port = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure the throughput
    let mut clients = Vec::with_capacity(queue_pairs as usize);
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &(default_port + n).to_string(),
            "-t",
            &test_timeout.to_string(),
            "-i",
            "0",
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.arg("-R");
        }
        // Use UDP stream to measure packets per second. The bitrate is set to
        // 1T to make sure it saturates the link.
        if !bandwidth {
            cmd.args(["-u", "-b", "1T"]);
        }
        match cmd.stderr(Stdio::piped()).stdout(Stdio::piped()).spawn() {
            Ok(client) => clients.push(client),
            Err(e) => {
                // Don't leave the clients already started running unattended.
                for mut client in clients {
                    let _ = client.kill();
                    let _ = client.wait();
                }
                return Err(Error::Spawn(e));
            }
        }
    }

    // 3. collect every client, even after a failure, so none is leaked.
    let mut err: Option<Error> = None;
    let mut results = Vec::with_capacity(clients.len());
    for mut c in clients {
        if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
            if err.is_none() {
                err = Some(Error::WaitTimeout(e));
            }
        }

        if err.is_none() {
            // Safe to unwrap as we know the child has terminated successfully
            let output = c.wait_with_output().unwrap();
            match parse_iperf3_output(&output.stdout, receive, bandwidth) {
                Ok(result) => results.push(result),
                Err(e) => err = Some(e),
            }
        } else {
            let _ = c.kill();
            let output = c.wait_with_output().unwrap();
            println!(
                "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                String::from_utf8_lossy(&output.stdout)
            );
        }
    }

    match err {
        Some(e) => Err(e),
        None => Ok(results.iter().sum()),
    }
}

/// Parse `ethr` JSON-lines latency output into the per-report `Avg` values.
///
/// Values are assumed to be in microseconds. Blank lines and lines without an
/// `Avg` string field are skipped. On malformed output, or when no latency
/// value is found, the raw output is printed to stderr and
/// [`Error::EthrLogParse`] is returned.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let mut latency = Vec::new();
        for l in s.lines() {
            if l.trim().is_empty() {
                continue;
            }
            let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
            // Skip header/summary lines
            if let Some(avg) = v["Avg"].as_str() {
                // Assume the latency unit is always "us"
                let value = avg.split("us").next().unwrap_or_default();
                latency.push(
                    value
                        .parse::<f64>()
                        .expect("'ethr' parse error: invalid 'Avg' entry"),
                );
            }
        }

        assert!(
            !latency.is_empty(),
            "'ethr' parse error: no valid latency data found"
        );

        latency
    })
    .map_err(|_| {
        eprintln!(
            "=== Start ethr output ===\n\n{}\n\n=== End ethr output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::EthrLogParse
    })
}

/// Measure virtio-net latency with `ethr`.
///
/// The host's `/usr/local/bin/ethr` is copied to the guest and started there
/// as a server. A host client then runs a latency test for `test_timeout`
/// seconds, writing JSON to `ethr.client.log` in the guest's temp dir. If the
/// client does not finish successfully within `test_timeout + 5` seconds it
/// is killed and reaped, and the wait error is returned.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        // A single attempt rather than DEFAULT_SSH_RETRIES.
        1,
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host
    let log_file = guest
        .tmp_dir
        .as_path()
        .join("ethr.client.log")
        .to_str()
        .unwrap()
        .to_string();
    let mut c = Command::new(ethr_path)
        .args([
            "-c",
            &guest.network.guest_ip,
            "-t",
            "l",
            "-o",
            &log_file, // file output is JSON format
            "-d",
            &format!("{test_timeout}s"),
        ])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
        // Kill and reap the client so it doesn't linger as a zombie.
        let _ = c.kill();
        let _ = c.wait();
        return Err(Error::WaitTimeout(e));
    }

    // Parse the ethr latency test output
    let content = fs::read(&log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}