1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use std::env; 9 use std::ffi::OsStr; 10 use std::fmt::Display; 11 use std::io; 12 use std::io::{Read, Write}; 13 use std::net::TcpListener; 14 use std::net::TcpStream; 15 use std::os::unix::fs::PermissionsExt; 16 use std::os::unix::io::{AsRawFd, FromRawFd}; 17 use std::path::Path; 18 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 19 use std::str::FromStr; 20 use std::sync::Mutex; 21 use std::thread; 22 use std::time::Duration; 23 use std::{fmt, fs}; 24 25 use once_cell::sync::Lazy; 26 use serde_json::Value; 27 use ssh2::Session; 28 use vmm_sys_util::tempdir::TempDir; 29 use wait_timeout::ChildExt; 30 31 #[derive(Debug)] 32 pub enum WaitTimeoutError { 33 Timedout, 34 ExitStatus, 35 General(std::io::Error), 36 } 37 38 #[derive(Debug)] 39 pub enum Error { 40 Parsing(std::num::ParseIntError), 41 SshCommand(SshCommandError), 42 WaitForBoot(WaitForBootError), 43 EthrLogFile(std::io::Error), 44 EthrLogParse, 45 FioOutputParse, 46 Iperf3Parse, 47 Spawn(std::io::Error), 48 WaitTimeout(WaitTimeoutError), 49 } 50 51 impl From<SshCommandError> for Error { 52 fn from(e: SshCommandError) -> Self { 53 Self::SshCommand(e) 54 } 55 } 56 57 pub struct GuestNetworkConfig { 58 pub guest_ip: String, 59 pub l2_guest_ip1: String, 60 pub l2_guest_ip2: String, 61 pub l2_guest_ip3: String, 62 pub host_ip: String, 63 pub guest_mac: String, 64 pub l2_guest_mac1: String, 65 pub l2_guest_mac2: String, 66 pub l2_guest_mac3: String, 67 pub tcp_listener_port: u16, 68 } 69 70 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 71 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 72 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 73 74 #[derive(Debug)] 75 pub enum WaitForBootError { 76 EpollWait(std::io::Error), 77 Listen(std::io::Error), 78 EpollWaitTimeout, 79 WrongGuestAddr, 80 Accept(std::io::Error), 81 } 82 83 impl 
GuestNetworkConfig { 84 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 85 let start = std::time::Instant::now(); 86 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 87 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 88 let expected_guest_addr = self.guest_ip.as_str(); 89 let mut s = String::new(); 90 let timeout = match custom_timeout { 91 Some(t) => t, 92 None => DEFAULT_TCP_LISTENER_TIMEOUT, 93 }; 94 95 let mut closure = || -> Result<(), WaitForBootError> { 96 let listener = 97 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 98 listener 99 .set_nonblocking(true) 100 .expect("Cannot set non-blocking for tcp listener"); 101 102 // Reply on epoll w/ timeout to wait for guest connections faithfully 103 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 104 // Use 'File' to enforce closing on 'epoll_fd' 105 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 106 epoll::ctl( 107 epoll_fd, 108 epoll::ControlOptions::EPOLL_CTL_ADD, 109 listener.as_raw_fd(), 110 epoll::Event::new(epoll::Events::EPOLLIN, 0), 111 ) 112 .expect("Cannot add 'tcp_listener' event to epoll"); 113 let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1]; 114 loop { 115 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 116 Ok(num_events) => Ok(num_events), 117 Err(e) => match e.raw_os_error() { 118 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 119 _ => Err(e), 120 }, 121 } 122 .map_err(WaitForBootError::EpollWait)?; 123 if num_events == 0 { 124 return Err(WaitForBootError::EpollWaitTimeout); 125 } 126 break; 127 } 128 129 match listener.accept() { 130 Ok((_, addr)) => { 131 // Make sure the connection is from the expected 'guest_addr' 132 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 133 s = format!( 134 "Expecting the guest ip '{}' while being 
connected with ip '{}'", 135 expected_guest_addr, 136 addr.ip() 137 ); 138 return Err(WaitForBootError::WrongGuestAddr); 139 } 140 141 Ok(()) 142 } 143 Err(e) => { 144 s = "TcpListener::accept() failed".to_string(); 145 Err(WaitForBootError::Accept(e)) 146 } 147 } 148 }; 149 150 match closure() { 151 Err(e) => { 152 let duration = start.elapsed(); 153 eprintln!( 154 "\n\n==== Start 'wait_vm_boot' (FAILED) ==== \ 155 \n\nduration =\"{duration:?}, timeout = {timeout}s\" \ 156 \nlisten_addr=\"{listen_addr}\" \ 157 \nexpected_guest_addr=\"{expected_guest_addr}\" \ 158 \nmessage=\"{s}\" \ 159 \nerror=\"{e:?}\" \ 160 \n\n==== End 'wait_vm_boot' outout ====\n\n" 161 ); 162 163 Err(e) 164 } 165 Ok(_) => Ok(()), 166 } 167 } 168 } 169 170 pub enum DiskType { 171 OperatingSystem, 172 CloudInit, 173 } 174 175 pub trait DiskConfig { 176 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 177 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 178 fn disk(&self, disk_type: DiskType) -> Option<String>; 179 } 180 181 #[derive(Clone)] 182 pub struct UbuntuDiskConfig { 183 osdisk_path: String, 184 cloudinit_path: String, 185 image_name: String, 186 } 187 188 impl UbuntuDiskConfig { 189 pub fn new(image_name: String) -> Self { 190 UbuntuDiskConfig { 191 image_name, 192 osdisk_path: String::new(), 193 cloudinit_path: String::new(), 194 } 195 } 196 } 197 198 pub struct WindowsDiskConfig { 199 image_name: String, 200 osdisk_path: String, 201 loopback_device: String, 202 windows_snapshot_cow: String, 203 windows_snapshot: String, 204 } 205 206 impl WindowsDiskConfig { 207 pub fn new(image_name: String) -> Self { 208 WindowsDiskConfig { 209 image_name, 210 osdisk_path: String::new(), 211 loopback_device: String::new(), 212 windows_snapshot_cow: String::new(), 213 windows_snapshot: String::new(), 214 } 215 } 216 } 217 218 impl Drop for WindowsDiskConfig { 219 fn drop(&mut self) { 220 // dmsetup remove windows-snapshot-1 221 
std::process::Command::new("dmsetup") 222 .arg("remove") 223 .arg(self.windows_snapshot.as_str()) 224 .output() 225 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 226 227 // dmsetup remove windows-snapshot-cow-1 228 std::process::Command::new("dmsetup") 229 .arg("remove") 230 .arg(self.windows_snapshot_cow.as_str()) 231 .output() 232 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 233 234 // losetup -d <loopback_device> 235 std::process::Command::new("losetup") 236 .args(["-d", self.loopback_device.as_str()]) 237 .output() 238 .expect("Expect removing loopback device to succeed"); 239 } 240 } 241 242 impl DiskConfig for UbuntuDiskConfig { 243 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 244 let cloudinit_file_path = 245 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 246 247 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 248 249 fs::create_dir_all(&cloud_init_directory) 250 .expect("Expect creating cloud-init directory to succeed"); 251 252 let source_file_dir = std::env::current_dir() 253 .unwrap() 254 .join("test_data") 255 .join("cloud-init") 256 .join("ubuntu") 257 .join("ci"); 258 259 ["meta-data"].iter().for_each(|x| { 260 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 261 .expect("Expect copying cloud-init meta-data to succeed"); 262 }); 263 264 let mut user_data_string = String::new(); 265 fs::File::open(source_file_dir.join("user-data")) 266 .unwrap() 267 .read_to_string(&mut user_data_string) 268 .expect("Expected reading user-data file to succeed"); 269 user_data_string = user_data_string.replace( 270 "@DEFAULT_TCP_LISTENER_MESSAGE", 271 DEFAULT_TCP_LISTENER_MESSAGE, 272 ); 273 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 274 user_data_string = 275 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 276 277 
fs::File::create(cloud_init_directory.join("user-data")) 278 .unwrap() 279 .write_all(user_data_string.as_bytes()) 280 .expect("Expected writing out user-data to succeed"); 281 282 let mut network_config_string = String::new(); 283 284 fs::File::open(source_file_dir.join("network-config")) 285 .unwrap() 286 .read_to_string(&mut network_config_string) 287 .expect("Expected reading network-config file to succeed"); 288 289 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 290 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 291 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 292 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 293 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 294 network_config_string = 295 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 296 network_config_string = 297 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 298 network_config_string = 299 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 300 network_config_string = 301 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 302 303 fs::File::create(cloud_init_directory.join("network-config")) 304 .unwrap() 305 .write_all(network_config_string.as_bytes()) 306 .expect("Expected writing out network-config to succeed"); 307 308 std::process::Command::new("mkdosfs") 309 .args(["-n", "CIDATA"]) 310 .args(["-C", cloudinit_file_path.as_str()]) 311 .arg("8192") 312 .output() 313 .expect("Expect creating disk image to succeed"); 314 315 ["user-data", "meta-data", "network-config"] 316 .iter() 317 .for_each(|x| { 318 std::process::Command::new("mcopy") 319 .arg("-o") 320 .args(["-i", cloudinit_file_path.as_str()]) 321 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 322 
.output() 323 .expect("Expect copying files to disk image to succeed"); 324 }); 325 326 cloudinit_file_path 327 } 328 329 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 330 let mut workload_path = dirs::home_dir().unwrap(); 331 workload_path.push("workloads"); 332 333 let mut osdisk_base_path = workload_path; 334 osdisk_base_path.push(&self.image_name); 335 336 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 337 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 338 339 rate_limited_copy(osdisk_base_path, &osdisk_path) 340 .expect("copying of OS source disk image failed"); 341 342 self.cloudinit_path = cloudinit_path; 343 self.osdisk_path = osdisk_path; 344 } 345 346 fn disk(&self, disk_type: DiskType) -> Option<String> { 347 match disk_type { 348 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 349 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 350 } 351 } 352 } 353 354 impl DiskConfig for WindowsDiskConfig { 355 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 356 String::new() 357 } 358 359 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 360 let mut workload_path = dirs::home_dir().unwrap(); 361 workload_path.push("workloads"); 362 363 let mut osdisk_path = workload_path; 364 osdisk_path.push(&self.image_name); 365 366 let osdisk_blk_size = fs::metadata(osdisk_path) 367 .expect("Expect retrieving Windows image metadata") 368 .len() 369 >> 9; 370 371 let snapshot_cow_path = 372 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 373 374 // Create and truncate CoW file for device mapper 375 let cow_file_size: u64 = 1 << 30; 376 let cow_file_blk_size = cow_file_size >> 9; 377 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 378 .expect("Expect creating CoW image to succeed"); 379 cow_file 380 .set_len(cow_file_size) 381 .expect("Expect truncating CoW 
image to succeed"); 382 383 // losetup --find --show /tmp/snapshot_cow 384 let loopback_device = std::process::Command::new("losetup") 385 .arg("--find") 386 .arg("--show") 387 .arg(snapshot_cow_path.as_str()) 388 .output() 389 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 390 391 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 392 .trim() 393 .to_string(); 394 395 let random_extension = tmp_dir.as_path().file_name().unwrap(); 396 let windows_snapshot_cow = format!( 397 "windows-snapshot-cow-{}", 398 random_extension.to_str().unwrap() 399 ); 400 401 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 402 std::process::Command::new("dmsetup") 403 .arg("create") 404 .arg(windows_snapshot_cow.as_str()) 405 .args([ 406 "--table", 407 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 408 ]) 409 .output() 410 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 411 412 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 413 414 // dmsetup mknodes 415 std::process::Command::new("dmsetup") 416 .arg("mknodes") 417 .output() 418 .expect("Expect device mapper nodes to be ready"); 419 420 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 421 std::process::Command::new("dmsetup") 422 .arg("create") 423 .arg(windows_snapshot.as_str()) 424 .args([ 425 "--table", 426 format!( 427 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 428 osdisk_blk_size, 429 windows_snapshot_cow.as_str() 430 ) 431 .as_str(), 432 ]) 433 .output() 434 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 435 436 // dmsetup mknodes 437 std::process::Command::new("dmsetup") 438 .arg("mknodes") 439 .output() 440 .expect("Expect device mapper nodes to be ready"); 441 442 self.osdisk_path = 
format!("/dev/mapper/{windows_snapshot}"); 443 self.windows_snapshot_cow = windows_snapshot_cow; 444 self.windows_snapshot = windows_snapshot; 445 } 446 447 fn disk(&self, disk_type: DiskType) -> Option<String> { 448 match disk_type { 449 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 450 DiskType::CloudInit => None, 451 } 452 } 453 } 454 455 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 456 for i in 0..10 { 457 let free_bytes = unsafe { 458 let mut stats = std::mem::MaybeUninit::zeroed(); 459 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 460 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 461 462 let free_blocks = stats.assume_init().f_bfree; 463 let block_size = stats.assume_init().f_bsize; 464 465 free_blocks * block_size 466 }; 467 468 // Make sure there is at least 6 GiB of space 469 if free_bytes < 6 << 30 { 470 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 471 thread::sleep(std::time::Duration::new(60, 0)); 472 continue; 473 } 474 475 match fs::copy(&from, &to) { 476 Err(e) => { 477 if let Some(errno) = e.raw_os_error() { 478 if errno == libc::ENOSPC { 479 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 480 thread::sleep(std::time::Duration::new(60, 0)); 481 continue; 482 } 483 } 484 return Err(e); 485 } 486 Ok(i) => return Ok(i), 487 } 488 } 489 Err(io::Error::last_os_error()) 490 } 491 492 pub fn handle_child_output( 493 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 494 output: &std::process::Output, 495 ) { 496 use std::os::unix::process::ExitStatusExt; 497 if r.is_ok() && output.status.success() { 498 return; 499 } 500 501 match output.status.code() { 502 None => { 503 // Don't treat child.kill() as a problem 504 if output.status.signal() == Some(9) && r.is_ok() { 505 return; 506 } 507 508 eprintln!( 509 "==== child killed by signal: {} ====", 510 output.status.signal().unwrap() 511 ); 512 } 513 Some(code) => { 514 eprintln!("\n\n==== child exit code: {code} ===="); 515 } 516 } 517 518 eprintln!( 519 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 520 String::from_utf8_lossy(&output.stdout) 521 ); 522 eprintln!( 523 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 524 String::from_utf8_lossy(&output.stderr) 525 ); 526 527 panic!("Test failed") 528 } 529 530 #[derive(Debug)] 531 pub struct PasswordAuth { 532 pub username: String, 533 pub password: String, 534 } 535 536 pub const DEFAULT_SSH_RETRIES: u8 = 6; 537 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 538 539 #[derive(Debug)] 540 pub enum SshCommandError { 541 Connection(std::io::Error), 542 Handshake(ssh2::Error), 543 Authentication(ssh2::Error), 544 ChannelSession(ssh2::Error), 545 Command(ssh2::Error), 546 ExitStatus(ssh2::Error), 547 NonZeroExitStatus(i32), 548 FileRead(std::io::Error), 549 FileMetadata(std::io::Error), 550 ScpSend(ssh2::Error), 551 WriteAll(std::io::Error), 552 SendEof(ssh2::Error), 553 WaitEof(ssh2::Error), 554 } 555 556 fn scp_to_guest_with_auth( 557 path: &Path, 558 remote_path: &Path, 559 auth: &PasswordAuth, 560 ip: &str, 561 retries: u8, 562 timeout: u8, 563 ) -> Result<(), SshCommandError> { 564 let 
mut counter = 0; 565 loop { 566 let closure = || -> Result<(), SshCommandError> { 567 let tcp = 568 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 569 let mut sess = Session::new().unwrap(); 570 sess.set_tcp_stream(tcp); 571 sess.handshake().map_err(SshCommandError::Handshake)?; 572 573 sess.userauth_password(&auth.username, &auth.password) 574 .map_err(SshCommandError::Authentication)?; 575 assert!(sess.authenticated()); 576 577 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 578 let mode = fs::metadata(path) 579 .map_err(SshCommandError::FileMetadata)? 580 .permissions() 581 .mode() 582 & 0o777; 583 584 let mut channel = sess 585 .scp_send(remote_path, mode as i32, content.len() as u64, None) 586 .map_err(SshCommandError::ScpSend)?; 587 channel 588 .write_all(&content) 589 .map_err(SshCommandError::WriteAll)?; 590 channel.send_eof().map_err(SshCommandError::SendEof)?; 591 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 592 593 // Intentionally ignore these results here as their failure 594 // does not precipitate a repeat 595 let _ = channel.close(); 596 let _ = channel.wait_close(); 597 598 Ok(()) 599 }; 600 601 match closure() { 602 Ok(_) => break, 603 Err(e) => { 604 counter += 1; 605 if counter >= retries { 606 eprintln!( 607 "\n\n==== Start scp command output (FAILED) ====\n\n\ 608 path =\"{path:?}\"\n\ 609 remote_path =\"{remote_path:?}\"\n\ 610 auth=\"{auth:#?}\"\n\ 611 ip=\"{ip}\"\n\ 612 error=\"{e:?}\"\n\ 613 \n==== End scp command outout ====\n\n" 614 ); 615 616 return Err(e); 617 } 618 } 619 }; 620 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 621 } 622 Ok(()) 623 } 624 625 pub fn scp_to_guest( 626 path: &Path, 627 remote_path: &Path, 628 ip: &str, 629 retries: u8, 630 timeout: u8, 631 ) -> Result<(), SshCommandError> { 632 scp_to_guest_with_auth( 633 path, 634 remote_path, 635 &PasswordAuth { 636 username: String::from("cloud"), 637 password: String::from("cloud123"), 
638 }, 639 ip, 640 retries, 641 timeout, 642 ) 643 } 644 645 pub fn ssh_command_ip_with_auth( 646 command: &str, 647 auth: &PasswordAuth, 648 ip: &str, 649 retries: u8, 650 timeout: u8, 651 ) -> Result<String, SshCommandError> { 652 let mut s = String::new(); 653 654 let mut counter = 0; 655 loop { 656 let mut closure = || -> Result<(), SshCommandError> { 657 let tcp = 658 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 659 let mut sess = Session::new().unwrap(); 660 sess.set_tcp_stream(tcp); 661 sess.handshake().map_err(SshCommandError::Handshake)?; 662 663 sess.userauth_password(&auth.username, &auth.password) 664 .map_err(SshCommandError::Authentication)?; 665 assert!(sess.authenticated()); 666 667 let mut channel = sess 668 .channel_session() 669 .map_err(SshCommandError::ChannelSession)?; 670 channel.exec(command).map_err(SshCommandError::Command)?; 671 672 // Intentionally ignore these results here as their failure 673 // does not precipitate a repeat 674 let _ = channel.read_to_string(&mut s); 675 let _ = channel.close(); 676 let _ = channel.wait_close(); 677 678 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 679 680 if status != 0 { 681 Err(SshCommandError::NonZeroExitStatus(status)) 682 } else { 683 Ok(()) 684 } 685 }; 686 687 match closure() { 688 Ok(_) => break, 689 Err(e) => { 690 counter += 1; 691 if counter >= retries { 692 eprintln!( 693 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 694 command=\"{command}\"\n\ 695 auth=\"{auth:#?}\"\n\ 696 ip=\"{ip}\"\n\ 697 output=\"{s}\"\n\ 698 error=\"{e:?}\"\n\ 699 \n==== End ssh command outout ====\n\n" 700 ); 701 702 return Err(e); 703 } 704 } 705 }; 706 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 707 } 708 Ok(s) 709 } 710 711 pub fn ssh_command_ip( 712 command: &str, 713 ip: &str, 714 retries: u8, 715 timeout: u8, 716 ) -> Result<String, SshCommandError> { 717 ssh_command_ip_with_auth( 718 command, 719 &PasswordAuth { 
720 username: String::from("cloud"), 721 password: String::from("cloud123"), 722 }, 723 ip, 724 retries, 725 timeout, 726 ) 727 } 728 729 pub fn exec_host_command_with_retries(command: &str, retries: u32, interval: Duration) -> bool { 730 for _ in 0..retries { 731 let s = exec_host_command_output(command).status; 732 if !s.success() { 733 eprintln!("\n\n==== retrying in {:?} ===\n\n", interval); 734 thread::sleep(interval); 735 } else { 736 return true; 737 } 738 } 739 740 false 741 } 742 743 pub fn exec_host_command_status(command: &str) -> ExitStatus { 744 exec_host_command_output(command).status 745 } 746 747 pub fn exec_host_command_output(command: &str) -> Output { 748 let output = std::process::Command::new("bash") 749 .args(["-c", command]) 750 .output() 751 .unwrap_or_else(|e| panic!("Expected '{command}' to run. Error: {:?}", e)); 752 753 if !output.status.success() { 754 let stdout = String::from_utf8_lossy(&output.stdout); 755 let stderr = String::from_utf8_lossy(&output.stderr); 756 eprintln!( 757 "\n\n==== Start 'exec_host_command' failed ==== \ 758 \n\n---stdout---\n{stdout}\n---stderr---{stderr} \ 759 \n\n==== End 'exec_host_command' failed ====", 760 ); 761 } 762 763 output 764 } 765 766 pub fn check_lines_count(input: &str, line_count: usize) -> bool { 767 if input.lines().count() == line_count { 768 true 769 } else { 770 eprintln!( 771 "\n\n==== Start 'check_lines_count' failed ==== \ 772 \n\ninput = {input}\nline_count = {line_count} \ 773 \n\n==== End 'check_lines_count' failed ====", 774 ); 775 776 false 777 } 778 } 779 780 pub fn check_matched_lines_count(input: &str, keywords: Vec<&str>, line_count: usize) -> bool { 781 let mut matches = String::new(); 782 for line in input.lines() { 783 if keywords.iter().all(|k| line.contains(k)) { 784 matches += line; 785 } 786 } 787 788 if matches.lines().count() == line_count { 789 true 790 } else { 791 eprintln!( 792 "\n\n==== Start 'check_matched_lines_count' failed ==== \ 793 \nkeywords = 
{keywords:?}, line_count = {line_count} \ 794 \n\ninput = {input} matches = {matches} \ 795 \n\n==== End 'check_matched_lines_count' failed ====", 796 ); 797 798 false 799 } 800 } 801 802 pub fn kill_child(child: &mut Child) { 803 let r = unsafe { libc::kill(child.id() as i32, libc::SIGTERM) }; 804 if r != 0 { 805 let e = io::Error::last_os_error(); 806 if e.raw_os_error().unwrap() == libc::ESRCH { 807 return; 808 } 809 eprintln!("Failed to kill child with SIGTERM: {e:?}"); 810 } 811 812 // The timeout period elapsed without the child exiting 813 if child.wait_timeout(Duration::new(10, 0)).unwrap().is_none() { 814 let _ = child.kill(); 815 let rust_flags = env::var("RUSTFLAGS").unwrap_or_default(); 816 if rust_flags.contains("-Cinstrument-coverage") { 817 panic!("Wait child timeout, please check the reason.") 818 } 819 } 820 } 821 822 pub const PIPE_SIZE: i32 = 32 << 20; 823 824 static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1)); 825 826 pub struct Guest { 827 pub tmp_dir: TempDir, 828 pub disk_config: Box<dyn DiskConfig>, 829 pub network: GuestNetworkConfig, 830 } 831 832 // Safe to implement as we know we have no interior mutability 833 impl std::panic::RefUnwindSafe for Guest {} 834 835 impl Guest { 836 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 837 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 838 839 let network = GuestNetworkConfig { 840 guest_ip: format!("{class}.{id}.2"), 841 l2_guest_ip1: format!("{class}.{id}.3"), 842 l2_guest_ip2: format!("{class}.{id}.4"), 843 l2_guest_ip3: format!("{class}.{id}.5"), 844 host_ip: format!("{class}.{id}.1"), 845 guest_mac: format!("12:34:56:78:90:{id:02x}"), 846 l2_guest_mac1: format!("de:ad:be:ef:12:{id:02x}"), 847 l2_guest_mac2: format!("de:ad:be:ef:34:{id:02x}"), 848 l2_guest_mac3: format!("de:ad:be:ef:56:{id:02x}"), 849 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 850 }; 851 852 disk_config.prepare_files(&tmp_dir, &network); 
853 854 Guest { 855 tmp_dir, 856 disk_config, 857 network, 858 } 859 } 860 861 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 862 let mut guard = NEXT_VM_ID.lock().unwrap(); 863 let id = *guard; 864 *guard = id + 1; 865 866 Self::new_from_ip_range(disk_config, "192.168", id) 867 } 868 869 pub fn default_net_string(&self) -> String { 870 format!( 871 "tap=,mac={},ip={},mask=255.255.255.0", 872 self.network.guest_mac, self.network.host_ip 873 ) 874 } 875 876 pub fn default_net_string_w_iommu(&self) -> String { 877 format!( 878 "tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 879 self.network.guest_mac, self.network.host_ip 880 ) 881 } 882 883 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 884 format!( 885 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 886 self.network.guest_mac, self.network.host_ip, mtu 887 ) 888 } 889 890 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 891 ssh_command_ip( 892 command, 893 &self.network.guest_ip, 894 DEFAULT_SSH_RETRIES, 895 DEFAULT_SSH_TIMEOUT, 896 ) 897 } 898 899 #[cfg(target_arch = "x86_64")] 900 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 901 ssh_command_ip( 902 command, 903 &self.network.guest_ip, 904 DEFAULT_SSH_RETRIES, 905 DEFAULT_SSH_TIMEOUT, 906 ) 907 } 908 909 #[cfg(target_arch = "x86_64")] 910 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 911 ssh_command_ip( 912 command, 913 &self.network.l2_guest_ip1, 914 DEFAULT_SSH_RETRIES, 915 DEFAULT_SSH_TIMEOUT, 916 ) 917 } 918 919 #[cfg(target_arch = "x86_64")] 920 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 921 ssh_command_ip( 922 command, 923 &self.network.l2_guest_ip2, 924 DEFAULT_SSH_RETRIES, 925 DEFAULT_SSH_TIMEOUT, 926 ) 927 } 928 929 #[cfg(target_arch = "x86_64")] 930 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 931 ssh_command_ip( 932 command, 933 
&self.network.l2_guest_ip3, 934 DEFAULT_SSH_RETRIES, 935 DEFAULT_SSH_TIMEOUT, 936 ) 937 } 938 939 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 940 format! {"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 941 cpu_count, 942 cpu_count, 943 kernel_path, 944 kernel_cmd, 945 self.network.host_ip, 946 self.network.guest_mac, 947 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 948 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 949 } 950 } 951 952 pub fn get_cpu_count(&self) -> Result<u32, Error> { 953 self.ssh_command("grep -c processor /proc/cpuinfo")? 954 .trim() 955 .parse() 956 .map_err(Error::Parsing) 957 } 958 959 pub fn get_total_memory(&self) -> Result<u32, Error> { 960 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 961 .trim() 962 .parse() 963 .map_err(Error::Parsing) 964 } 965 966 #[cfg(target_arch = "x86_64")] 967 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 968 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 969 .trim() 970 .parse() 971 .map_err(Error::Parsing) 972 } 973 974 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 975 self.ssh_command( 976 format!( 977 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 978 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 979 ) 980 .as_str(), 981 )? 
982 .trim() 983 .parse() 984 .map_err(Error::Parsing) 985 } 986 987 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> { 988 self.network 989 .wait_vm_boot(custom_timeout) 990 .map_err(Error::WaitForBoot) 991 } 992 993 pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> { 994 for cpu in cpus.iter() { 995 let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]"); 996 self.ssh_command(cmd.as_str())?; 997 } 998 999 Ok(()) 1000 } 1001 1002 pub fn check_numa_node_distances( 1003 &self, 1004 node_id: usize, 1005 distances: &str, 1006 ) -> Result<bool, Error> { 1007 let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance"); 1008 if self.ssh_command(cmd.as_str())?.trim() == distances { 1009 Ok(true) 1010 } else { 1011 Ok(false) 1012 } 1013 } 1014 1015 pub fn check_numa_common( 1016 &self, 1017 mem_ref: Option<&[u32]>, 1018 node_ref: Option<&[Vec<usize>]>, 1019 distance_ref: Option<&[&str]>, 1020 ) { 1021 if let Some(mem_ref) = mem_ref { 1022 // Check each NUMA node has been assigned the right amount of 1023 // memory. 1024 for (i, &m) in mem_ref.iter().enumerate() { 1025 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m); 1026 } 1027 } 1028 1029 if let Some(node_ref) = node_ref { 1030 // Check each NUMA node has been assigned the right CPUs set. 1031 for (i, n) in node_ref.iter().enumerate() { 1032 self.check_numa_node_cpus(i, n.clone()).unwrap(); 1033 } 1034 } 1035 1036 if let Some(distance_ref) = distance_ref { 1037 // Check each NUMA node has been assigned the right distances. 
for (i, &d) in distance_ref.iter().enumerate() {
                assert!(self.check_numa_node_distances(i, d).unwrap());
            }
        }
    }

    /// Checks over SSH that the guest's `cpuid` output reports SGX, SGX launch
    /// configuration (`SGX_LC`) and SGX1 support.
    ///
    /// # Errors
    ///
    /// Returns `Error::SshCommand` if any of the three `cpuid | grep -q` commands
    /// fails (assumes `ssh_command` reports a non-zero exit status as an error).
    #[cfg(target_arch = "x86_64")]
    pub fn check_sgx_support(&self) -> Result<(), Error> {
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \
            Software Guard Extensions supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \
            SGX launch config supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \
            supported = true'",
        )?;

        Ok(())
    }

    /// Returns the trimmed content of the guest's
    /// `/sys/bus/pci/devices/0000:00:00.0/class` file.
    pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
            .trim()
            .to_string())
    }

    /// Returns the guest's PCI device ids, one per line (trimmed output of
    /// `cat /sys/bus/pci/devices/*/device`).
    pub fn get_pci_device_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/device")?
            .trim()
            .to_string())
    }

    /// Returns the guest's PCI vendor ids, one per line (trimmed output of
    /// `cat /sys/bus/pci/devices/*/vendor`).
    pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
            .trim()
            .to_string())
    }

    /// Returns `true` if some guest PCI device has exactly `device_id` together with
    /// `vendor_id` (both compared as the literal sysfs strings, e.g. `0x1af4`).
    ///
    /// # Errors
    ///
    /// Propagates SSH failures from reading the device or vendor ids.
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        // The N-th device id is paired with the N-th vendor id; this relies on both
        // `cat` globs expanding the device directories in the same order.
        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Sends `HelloWorld!` from the host over the vsock UNIX socket `socket` (after a
    /// `CONNECT 16` line) and asserts the guest, listening on vsock port 16 with
    /// `socat`, received it.
    ///
    /// # Panics
    ///
    /// Panics if any SSH or host command fails or the guest received other data.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address> (domain 40 is AF_VSOCK)
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip(
                "sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log",
                &guest_ip,
                DEFAULT_SSH_RETRIES,
                DEFAULT_SSH_TIMEOUT,
            )
            .unwrap();
        });

        // Make sure socat is listening, which might take a few seconds on slow systems
        thread::sleep(Duration::new(10, 0));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Asserts that `nvidia-smi` in the guest mentions a "Tesla T4" GPU.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self.ssh_command("nvidia-smi").unwrap().contains("Tesla T4"));
    }

    /// Reboots the guest and checks the reboot was recorded.
    ///
    /// Asserts that `last` shows `current_reboot_count + 1` reboot entries, issues
    /// `sudo reboot`, waits for the guest to boot again (`custom_timeout` is passed
    /// to `wait_vm_boot`), then asserts the count is `current_reboot_count + 2`.
    /// An unparsable count is treated as 0.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        let list_boots_cmd = "sudo last | grep -c reboot";
        let boot_count = || {
            self.ssh_command(list_boots_cmd)
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Writes `online` to the guest's `/sys/devices/system/memory/auto_online_blocks`
    /// so newly hot-added memory blocks are onlined automatically.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Exercises the common virtio devices of the guest.
    ///
    /// Always reads from `/dev/vda`, `/dev/vdb` and `/dev/hwrng`. Optionally:
    /// checks vsock through `socket`, writes `console_text` to `/dev/hvc0`, and
    /// mounts `pmem_path` on `/mnt` to verify a file written there survives a
    /// remount. The network device is exercised implicitly by the SSH commands.
    ///
    /// # Panics
    ///
    /// Panics on any failed command or unexpected output.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Log verbosity passed to the cloud-hypervisor binary.
///
/// `Warn` adds no flag, `Info` adds `-v`, `Debug` adds `-vv`.
pub enum VerbosityLevel {
    Warn,
    Info,
    Debug,
}

impl Default for VerbosityLevel {
    fn default() -> Self {
        Self::Warn
    }
}

impl Display for VerbosityLevel {
    /// Formats the level as its command-line flag (empty for `Warn`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use VerbosityLevel::*;
        match self {
            Warn => (),
            Info => write!(f, "-v")?,
            Debug => write!(f, "-vv")?,
        }
        Ok(())
    }
}

/// Builder for a cloud-hypervisor command line that launches `guest`.
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    capture_output: bool,
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Creates a command for the `cloud-hypervisor` binary resolved by
    /// [`clh_command`].
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Creates a command for the binary at `binary_path`.
    ///
    /// Defaults: output not captured, command line printed on spawn, `Info`
    /// verbosity.
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Sets the verbosity flag appended when the command is spawned.
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Pipes the child's stdout and stderr (enlarged to `PIPE_SIZE`) on spawn.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Controls whether the full command line is printed before spawning.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Appends the verbosity flag, optionally prints the command line, and spawns
    /// the process.
    ///
    /// # Errors
    ///
    /// Returns the spawn error. With output capture enabled, it also fails if
    /// either pipe cannot be resized to at least `PIPE_SIZE`. In that case the
    /// freshly spawned child is killed and reaped before the error is returned.
    pub fn spawn(&mut self) -> io::Result<Child> {
        use VerbosityLevel::*;
        match &self.verbosity {
            Warn => {}
            Info => {
                self.command.arg("-v");
            }
            Debug => {
                self.command.arg("-vv");
            }
        };

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if !self.capture_output {
            return self.command.spawn();
        }

        let mut child = self
            .command
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;

        if let Err(e) = Self::resize_output_pipes(&child) {
            // The caller never receives `child`, so stop and reap it here rather
            // than leaving the process running unsupervised.
            let _ = child.kill();
            let _ = child.wait();
            return Err(e);
        }

        Ok(child)
    }

    /// Resizes the piped stdout and stderr of `child` with `F_SETPIPE_SZ` and
    /// checks that both ended up at least `PIPE_SIZE` bytes large.
    fn resize_output_pipes(child: &Child) -> io::Result<()> {
        let set_pipe_size = |fd: libc::c_int| -> io::Result<libc::c_int> {
            // SAFETY: `fd` is a pipe descriptor owned by `child`, which outlives this
            // call; F_SETPIPE_SZ only changes the pipe's capacity.
            let size = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
            if size == -1 {
                Err(io::Error::last_os_error())
            } else {
                Ok(size)
            }
        };

        let stdout_size = set_pipe_size(
            child
                .stdout
                .as_ref()
                .expect("stdout was configured as piped")
                .as_raw_fd(),
        )?;
        let stderr_size = set_pipe_size(
            child
                .stderr
                .as_ref()
                .expect("stderr was configured as piped")
                .as_raw_fd(),
        )?;

        if stdout_size >= PIPE_SIZE && stderr_size >= PIPE_SIZE {
            Ok(())
        } else {
            Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "resizing pipe w/ 'fcntl' failed: stdout pipesize {stdout_size}, stderr pipesize {stderr_size}"
                ),
            ))
        }
    }

    /// Appends raw arguments to the command line.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Adds a `--disk` argument for the guest's OS disk, plus its cloud-init disk
    /// when the guest has one.
    ///
    /// # Panics
    ///
    /// Panics if the guest's disk config has no operating-system disk.
    pub fn default_disks(&mut self) -> &mut Self {
        let cloud_init_disk = self
            .guest
            .disk_config
            .disk(DiskType::CloudInit)
            .map(|path| format!("path={path}"));
        let os_disk = format!(
            "path={}",
            self.guest
                .disk_config
                .disk(DiskType::OperatingSystem)
                .unwrap()
        );

        match cloud_init_disk {
            Some(cloud_init_disk) => {
                self.args(["--disk", os_disk.as_str(), cloud_init_disk.as_str()])
            }
            None => self.args(["--disk", os_disk.as_str()]),
        }
    }

    /// Adds a `--net` argument built from the guest's default network string.
    pub fn default_net(&mut self) -> &mut Self {
        self.args(["--net", self.guest.default_net_string().as_str()])
    }
}

/// Returns the path of the release build of binary `cmd`.
///
/// The path is `target/<BUILD_TARGET>/release/<cmd>`. When `BUILD_TARGET` is
/// unset (or not valid Unicode), it falls back to `x86_64-unknown-linux-gnu`.
pub fn clh_command(cmd: &str) -> String {
    env::var("BUILD_TARGET").map_or_else(
        |_| format!("target/x86_64-unknown-linux-gnu/release/{cmd}"),
        |target| format!("target/{target}/release/{cmd}"),
    )
}

/// Extracts a throughput figure from `iperf3 -J` output.
///
/// With `bandwidth`, returns `end.sum_sent.bits_per_second` when `sender` is true,
/// else `end.sum_received.bits_per_second`. Without `bandwidth`, returns packets
/// per second from `end.sum`: `(packets - lost_packets) / seconds`.
///
/// # Errors
///
/// Returns `Error::Iperf3Parse` (after dumping the output to stderr) if the JSON
/// is invalid or an entry is missing.
pub fn parse_iperf3_output(output: &[u8], sender: bool, bandwidth: bool) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");

        if bandwidth {
            if sender {
                v["end"]["sum_sent"]["bits_per_second"]
                    .as_f64()
                    .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
            } else {
                v["end"]["sum_received"]["bits_per_second"].as_f64().expect(
                    "'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'",
                )
            }
        } else {
            // iperf does not distinguish sent vs received in this case.

            let lost_packets = v["end"]["sum"]["lost_packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.lost_packets'");
            let packets = v["end"]["sum"]["packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.packets'");
            let seconds = v["end"]["sum"]["seconds"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.seconds'");

            (packets - lost_packets) / seconds
        }
    })
    .map_err(|_| {
        eprintln!(
            "==== Start iperf3 output ===\n\n{}\n\n=== End iperf3 output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::Iperf3Parse
    })
}

/// `fio` I/O patterns; `Display` yields the matching `--rw` value.
#[derive(Clone)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            FioOps::Read => write!(f, "read"),
            FioOps::RandomRead => write!(f, "randread"),
            FioOps::Write => write!(f, "write"),
            FioOps::RandomWrite => write!(f, "randwrite"),
            FioOps::ReadWrite => write!(f, "rw"),
            FioOps::RandRW => write!(f, "randrw"),
        }
    }
}

/// Sums, over all jobs of `fio` JSON `output`, the per-second rate of `field`.
///
/// For each job and each direction (`read` and/or `write`) exercised by `fio_ops`,
/// the rate is `<dir>.<field> / (<dir>.runtime / 1000)`. `runtime` is divided by
/// 1000, i.e. it is treated as milliseconds.
///
/// # Errors
///
/// Returns `Error::FioOutputParse` (after dumping the output to stderr) if the JSON
/// is invalid, an entry is missing, or the job count differs from `num_jobs`.
fn parse_fio_output_rate(
    output: &str,
    fio_ops: &FioOps,
    num_jobs: u32,
    field: &str,
) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let v: Value =
            serde_json::from_str(output).expect("'fio' parse error: invalid json output");
        let jobs = v["jobs"]
            .as_array()
            .expect("'fio' parse error: missing entry 'jobs'");
        assert_eq!(
            jobs.len(),
            num_jobs as usize,
            "'fio' parse error: Unexpected number of 'fio' jobs."
        );

        let directions: &[&str] = match fio_ops {
            FioOps::Read | FioOps::RandomRead => &["read"],
            FioOps::Write | FioOps::RandomWrite => &["write"],
            FioOps::ReadWrite | FioOps::RandRW => &["read", "write"],
        };

        let entry = |job: &Value, dir: &str, key: &str| -> u64 {
            job[dir][key].as_u64().unwrap_or_else(|| {
                panic!("'fio' parse error: missing entry '{}.{}'", dir, key)
            })
        };

        let mut total = 0_f64;
        for job in jobs {
            for &dir in directions {
                let amount = entry(job, dir, field);
                let runtime = entry(job, dir, "runtime") as f64 / 1000_f64;
                total += amount as f64 / runtime;
            }
        }

        total
    })
    .map_err(|_| {
        eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n");
        Error::FioOutputParse
    })
}

/// Returns the total bandwidth (bytes per second, from `io_bytes`) over all jobs
/// of `fio` JSON `output`.
///
/// # Errors
///
/// Returns `Error::FioOutputParse` on malformed output or a job count other than
/// `num_jobs`.
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    parse_fio_output_rate(output, fio_ops, num_jobs, "io_bytes")
}

/// Returns the total IOPS (from `total_ios`) over all jobs of `fio` JSON `output`.
///
/// # Errors
///
/// Returns `Error::FioOutputParse` on malformed output or a job count other than
/// `num_jobs`.
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    parse_fio_output_rate(output, fio_ops, num_jobs, "total_ios")
}

/// Waits up to `timeout` seconds for `child` to exit successfully.
///
/// The child is not killed on timeout; callers are responsible for that.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    match child.wait_timeout(Duration::from_secs(timeout)) {
        Err(e) => Err(WaitTimeoutError::General(e)),
        Ok(None) => Err(WaitTimeoutError::Timedout),
        Ok(Some(status)) if !status.success() => Err(WaitTimeoutError::ExitStatus),
        Ok(Some(_)) => Ok(()),
    }
}

/// Measures virtio-net throughput between host and guest with iperf3.
///
/// Starts `queue_pairs` iperf3 servers in the guest (ports 5201..) and one host
/// client per server for `test_timeout` seconds. With `receive` the host sends
/// (guest RX); otherwise `-R` makes the guest send. With `bandwidth` the result is
/// bits/s, otherwise UDP packets/s. The per-client results are summed.
///
/// # Errors
///
/// Returns the SSH/spawn error, or the last wait/parse error. After the first
/// failure, the remaining clients are killed and reaped with their output printed.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
    bandwidth: bool,
) -> Result<f64, Error> {
    let default_port = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure RX through-put
    let mut clients = Vec::new();
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &format!("{}", default_port + n),
            "-t",
            &format!("{test_timeout}"),
            "-i",
            "0",
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.args(["-R"]);
        }
        // Use UDP stream to measure packets per second. The bitrate is set to
        // 1T to make sure it saturates the link.
        if !bandwidth {
            cmd.args(["-u", "-b", "1T"]);
        }
        let client = cmd
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()
            .map_err(Error::Spawn)?;

        clients.push(client);
    }

    // 3. collect the results; every client is waited on (and reaped) even after a
    // failure, so no iperf3 process is left behind.
    let mut err: Option<Error> = None;
    let mut results = Vec::new();
    let mut failed = false;
    for mut c in clients {
        if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) {
            err = Some(Error::WaitTimeout(e));
            failed = true;
        }

        if !failed {
            // Safe to unwrap as we know the child has terminated successfully
            let output = c.wait_with_output().unwrap();
            match parse_iperf3_output(&output.stdout, receive, bandwidth) {
                Ok(result) => results.push(result),
                Err(e) => {
                    err = Some(e);
                    failed = true;
                }
            }
        } else {
            let _ = c.kill();
            let output = c.wait_with_output().unwrap();
            println!(
                "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                String::from_utf8_lossy(&output.stdout)
            );
        }
    }

    if let Some(e) = err {
        Err(e)
    } else {
        Ok(results.iter().sum())
    }
}

/// Parses `ethr` JSON-lines output and returns every `Avg` latency value.
///
/// Lines without a string `Avg` field are skipped. The unit is assumed to be `us`
/// and is stripped before parsing.
///
/// # Errors
///
/// Returns `Error::EthrLogParse` (after dumping the output to stderr) on invalid
/// JSON, an unparsable `Avg`, or when no latency sample is found.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let mut latency = Vec::new();
        for l in s.lines() {
            let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
            // Skip header/summary lines
            if let Some(avg) = v["Avg"].as_str() {
                // Assume the latency unit is always "us"
                latency.push(
                    avg.split("us")
                        .next()
                        .unwrap_or(avg)
                        .parse::<f64>()
                        .expect("'ethr' parse error: invalid 'Avg' entry"),
                );
            }
        }

        assert!(
            !latency.is_empty(),
            "'ethr' parse error: no valid latency data found"
        );

        latency
    })
    .map_err(|_| {
        eprintln!(
            "=== Start ethr output ===\n\n{}\n\n=== End ethr output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::EthrLogParse
    })
}

/// Measures virtio-net latency with `ethr`.
///
/// Copies `/usr/local/bin/ethr` to the guest, starts it there as a server, and
/// runs a host latency client (`-t l`) for `test_timeout` seconds. The client
/// logs JSON to `<guest tmp dir>/ethr.client.log`, which is then parsed.
///
/// # Errors
///
/// Returns SCP/SSH/spawn errors, `Error::WaitTimeout` (the client is killed and
/// reaped first), `Error::EthrLogFile` or `Error::EthrLogParse`.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        1, // a single attempt rather than DEFAULT_SSH_RETRIES
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host
    let log_file = guest
        .tmp_dir
        .as_path()
        .join("ethr.client.log")
        .to_str()
        .unwrap()
        .to_string();
    let mut c = Command::new(ethr_path)
        .args([
            "-c",
            &guest.network.guest_ip,
            "-t",
            "l",
            "-o",
            &log_file, // file output is JSON format
            "-d",
            &format!("{test_timeout}s"),
        ])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout)
    {
        let _ = c.kill();
        // Reap the client so it does not linger as a zombie process.
        let _ = c.wait();
        return Err(e);
    }

    // Parse the ethr latency test output
    let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}