1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use once_cell::sync::Lazy; 9 use serde_json::Value; 10 use ssh2::Session; 11 use std::env; 12 use std::ffi::OsStr; 13 use std::fmt::Display; 14 use std::io; 15 use std::io::{Read, Write}; 16 use std::net::TcpListener; 17 use std::net::TcpStream; 18 use std::os::unix::fs::PermissionsExt; 19 use std::os::unix::io::{AsRawFd, FromRawFd}; 20 use std::path::Path; 21 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 22 use std::str::FromStr; 23 use std::sync::Mutex; 24 use std::thread; 25 use std::time::Duration; 26 use std::{fmt, fs}; 27 use vmm_sys_util::tempdir::TempDir; 28 use wait_timeout::ChildExt; 29 30 #[derive(Debug)] 31 pub enum WaitTimeoutError { 32 Timedout, 33 ExitStatus, 34 General(std::io::Error), 35 } 36 37 #[derive(Debug)] 38 pub enum Error { 39 Parsing(std::num::ParseIntError), 40 SshCommand(SshCommandError), 41 WaitForBoot(WaitForBootError), 42 EthrLogFile(std::io::Error), 43 EthrLogParse, 44 FioOutputParse, 45 Iperf3Parse, 46 Spawn(std::io::Error), 47 WaitTimeout(WaitTimeoutError), 48 } 49 50 impl From<SshCommandError> for Error { 51 fn from(e: SshCommandError) -> Self { 52 Self::SshCommand(e) 53 } 54 } 55 56 pub struct GuestNetworkConfig { 57 pub guest_ip: String, 58 pub l2_guest_ip1: String, 59 pub l2_guest_ip2: String, 60 pub l2_guest_ip3: String, 61 pub host_ip: String, 62 pub guest_mac: String, 63 pub l2_guest_mac1: String, 64 pub l2_guest_mac2: String, 65 pub l2_guest_mac3: String, 66 pub tcp_listener_port: u16, 67 } 68 69 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 70 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 71 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 72 73 #[derive(Debug)] 74 pub enum WaitForBootError { 75 EpollWait(std::io::Error), 76 Listen(std::io::Error), 77 EpollWaitTimeout, 78 WrongGuestAddr, 79 Accept(std::io::Error), 80 } 81 82 impl GuestNetworkConfig 
{ 83 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 84 let start = std::time::Instant::now(); 85 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 86 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 87 let expected_guest_addr = self.guest_ip.as_str(); 88 let mut s = String::new(); 89 let timeout = match custom_timeout { 90 Some(t) => t, 91 None => DEFAULT_TCP_LISTENER_TIMEOUT, 92 }; 93 94 let mut closure = || -> Result<(), WaitForBootError> { 95 let listener = 96 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 97 listener 98 .set_nonblocking(true) 99 .expect("Cannot set non-blocking for tcp listener"); 100 101 // Reply on epoll w/ timeout to wait for guest connections faithfully 102 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 103 // Use 'File' to enforce closing on 'epoll_fd' 104 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 105 epoll::ctl( 106 epoll_fd, 107 epoll::ControlOptions::EPOLL_CTL_ADD, 108 listener.as_raw_fd(), 109 epoll::Event::new(epoll::Events::EPOLLIN, 0), 110 ) 111 .expect("Cannot add 'tcp_listener' event to epoll"); 112 let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1]; 113 loop { 114 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 115 Ok(num_events) => Ok(num_events), 116 Err(e) => match e.raw_os_error() { 117 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 118 _ => Err(e), 119 }, 120 } 121 .map_err(WaitForBootError::EpollWait)?; 122 if num_events == 0 { 123 return Err(WaitForBootError::EpollWaitTimeout); 124 } 125 break; 126 } 127 128 match listener.accept() { 129 Ok((_, addr)) => { 130 // Make sure the connection is from the expected 'guest_addr' 131 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 132 s = format!( 133 "Expecting the guest ip '{}' while being connected with ip '{}'", 
134 expected_guest_addr, 135 addr.ip() 136 ); 137 return Err(WaitForBootError::WrongGuestAddr); 138 } 139 140 Ok(()) 141 } 142 Err(e) => { 143 s = "TcpListener::accept() failed".to_string(); 144 Err(WaitForBootError::Accept(e)) 145 } 146 } 147 }; 148 149 match closure() { 150 Err(e) => { 151 let duration = start.elapsed(); 152 eprintln!( 153 "\n\n==== Start 'wait_vm_boot' (FAILED) ==== \ 154 \n\nduration =\"{duration:?}, timeout = {timeout}s\" \ 155 \nlisten_addr=\"{listen_addr}\" \ 156 \nexpected_guest_addr=\"{expected_guest_addr}\" \ 157 \nmessage=\"{s}\" \ 158 \nerror=\"{e:?}\" \ 159 \n\n==== End 'wait_vm_boot' outout ====\n\n" 160 ); 161 162 Err(e) 163 } 164 Ok(_) => Ok(()), 165 } 166 } 167 } 168 169 pub enum DiskType { 170 OperatingSystem, 171 CloudInit, 172 } 173 174 pub trait DiskConfig { 175 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 176 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 177 fn disk(&self, disk_type: DiskType) -> Option<String>; 178 } 179 180 #[derive(Clone)] 181 pub struct UbuntuDiskConfig { 182 osdisk_path: String, 183 cloudinit_path: String, 184 image_name: String, 185 } 186 187 impl UbuntuDiskConfig { 188 pub fn new(image_name: String) -> Self { 189 UbuntuDiskConfig { 190 image_name, 191 osdisk_path: String::new(), 192 cloudinit_path: String::new(), 193 } 194 } 195 } 196 197 pub struct WindowsDiskConfig { 198 image_name: String, 199 osdisk_path: String, 200 loopback_device: String, 201 windows_snapshot_cow: String, 202 windows_snapshot: String, 203 } 204 205 impl WindowsDiskConfig { 206 pub fn new(image_name: String) -> Self { 207 WindowsDiskConfig { 208 image_name, 209 osdisk_path: String::new(), 210 loopback_device: String::new(), 211 windows_snapshot_cow: String::new(), 212 windows_snapshot: String::new(), 213 } 214 } 215 } 216 217 impl Drop for WindowsDiskConfig { 218 fn drop(&mut self) { 219 // dmsetup remove windows-snapshot-1 220 
std::process::Command::new("dmsetup") 221 .arg("remove") 222 .arg(self.windows_snapshot.as_str()) 223 .output() 224 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 225 226 // dmsetup remove windows-snapshot-cow-1 227 std::process::Command::new("dmsetup") 228 .arg("remove") 229 .arg(self.windows_snapshot_cow.as_str()) 230 .output() 231 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 232 233 // losetup -d <loopback_device> 234 std::process::Command::new("losetup") 235 .args(["-d", self.loopback_device.as_str()]) 236 .output() 237 .expect("Expect removing loopback device to succeed"); 238 } 239 } 240 241 impl DiskConfig for UbuntuDiskConfig { 242 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 243 let cloudinit_file_path = 244 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 245 246 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 247 248 fs::create_dir_all(&cloud_init_directory) 249 .expect("Expect creating cloud-init directory to succeed"); 250 251 let source_file_dir = std::env::current_dir() 252 .unwrap() 253 .join("test_data") 254 .join("cloud-init") 255 .join("ubuntu") 256 .join("ci"); 257 258 ["meta-data"].iter().for_each(|x| { 259 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 260 .expect("Expect copying cloud-init meta-data to succeed"); 261 }); 262 263 let mut user_data_string = String::new(); 264 fs::File::open(source_file_dir.join("user-data")) 265 .unwrap() 266 .read_to_string(&mut user_data_string) 267 .expect("Expected reading user-data file to succeed"); 268 user_data_string = user_data_string.replace( 269 "@DEFAULT_TCP_LISTENER_MESSAGE", 270 DEFAULT_TCP_LISTENER_MESSAGE, 271 ); 272 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 273 user_data_string = 274 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 275 276 
fs::File::create(cloud_init_directory.join("user-data")) 277 .unwrap() 278 .write_all(user_data_string.as_bytes()) 279 .expect("Expected writing out user-data to succeed"); 280 281 let mut network_config_string = String::new(); 282 283 fs::File::open(source_file_dir.join("network-config")) 284 .unwrap() 285 .read_to_string(&mut network_config_string) 286 .expect("Expected reading network-config file to succeed"); 287 288 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 289 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 290 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 291 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 292 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 293 network_config_string = 294 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 295 network_config_string = 296 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 297 network_config_string = 298 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 299 network_config_string = 300 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 301 302 fs::File::create(cloud_init_directory.join("network-config")) 303 .unwrap() 304 .write_all(network_config_string.as_bytes()) 305 .expect("Expected writing out network-config to succeed"); 306 307 std::process::Command::new("mkdosfs") 308 .args(["-n", "CIDATA"]) 309 .args(["-C", cloudinit_file_path.as_str()]) 310 .arg("8192") 311 .output() 312 .expect("Expect creating disk image to succeed"); 313 314 ["user-data", "meta-data", "network-config"] 315 .iter() 316 .for_each(|x| { 317 std::process::Command::new("mcopy") 318 .arg("-o") 319 .args(["-i", cloudinit_file_path.as_str()]) 320 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 321 
.output() 322 .expect("Expect copying files to disk image to succeed"); 323 }); 324 325 cloudinit_file_path 326 } 327 328 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 329 let mut workload_path = dirs::home_dir().unwrap(); 330 workload_path.push("workloads"); 331 332 let mut osdisk_base_path = workload_path; 333 osdisk_base_path.push(&self.image_name); 334 335 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 336 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 337 338 rate_limited_copy(osdisk_base_path, &osdisk_path) 339 .expect("copying of OS source disk image failed"); 340 341 self.cloudinit_path = cloudinit_path; 342 self.osdisk_path = osdisk_path; 343 } 344 345 fn disk(&self, disk_type: DiskType) -> Option<String> { 346 match disk_type { 347 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 348 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 349 } 350 } 351 } 352 353 impl DiskConfig for WindowsDiskConfig { 354 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 355 String::new() 356 } 357 358 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 359 let mut workload_path = dirs::home_dir().unwrap(); 360 workload_path.push("workloads"); 361 362 let mut osdisk_path = workload_path; 363 osdisk_path.push(&self.image_name); 364 365 let osdisk_blk_size = fs::metadata(osdisk_path) 366 .expect("Expect retrieving Windows image metadata") 367 .len() 368 >> 9; 369 370 let snapshot_cow_path = 371 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 372 373 // Create and truncate CoW file for device mapper 374 let cow_file_size: u64 = 1 << 30; 375 let cow_file_blk_size = cow_file_size >> 9; 376 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 377 .expect("Expect creating CoW image to succeed"); 378 cow_file 379 .set_len(cow_file_size) 380 .expect("Expect truncating CoW 
image to succeed"); 381 382 // losetup --find --show /tmp/snapshot_cow 383 let loopback_device = std::process::Command::new("losetup") 384 .arg("--find") 385 .arg("--show") 386 .arg(snapshot_cow_path.as_str()) 387 .output() 388 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 389 390 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 391 .trim() 392 .to_string(); 393 394 let random_extension = tmp_dir.as_path().file_name().unwrap(); 395 let windows_snapshot_cow = format!( 396 "windows-snapshot-cow-{}", 397 random_extension.to_str().unwrap() 398 ); 399 400 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 401 std::process::Command::new("dmsetup") 402 .arg("create") 403 .arg(windows_snapshot_cow.as_str()) 404 .args([ 405 "--table", 406 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 407 ]) 408 .output() 409 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 410 411 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 412 413 // dmsetup mknodes 414 std::process::Command::new("dmsetup") 415 .arg("mknodes") 416 .output() 417 .expect("Expect device mapper nodes to be ready"); 418 419 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 420 std::process::Command::new("dmsetup") 421 .arg("create") 422 .arg(windows_snapshot.as_str()) 423 .args([ 424 "--table", 425 format!( 426 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 427 osdisk_blk_size, 428 windows_snapshot_cow.as_str() 429 ) 430 .as_str(), 431 ]) 432 .output() 433 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 434 435 // dmsetup mknodes 436 std::process::Command::new("dmsetup") 437 .arg("mknodes") 438 .output() 439 .expect("Expect device mapper nodes to be ready"); 440 441 self.osdisk_path = 
format!("/dev/mapper/{windows_snapshot}"); 442 self.windows_snapshot_cow = windows_snapshot_cow; 443 self.windows_snapshot = windows_snapshot; 444 } 445 446 fn disk(&self, disk_type: DiskType) -> Option<String> { 447 match disk_type { 448 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 449 DiskType::CloudInit => None, 450 } 451 } 452 } 453 454 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 455 for i in 0..10 { 456 let free_bytes = unsafe { 457 let mut stats = std::mem::MaybeUninit::zeroed(); 458 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 459 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 460 461 let free_blocks = stats.assume_init().f_bfree; 462 let block_size = stats.assume_init().f_bsize; 463 464 free_blocks * block_size 465 }; 466 467 // Make sure there is at least 6 GiB of space 468 if free_bytes < 6 << 30 { 469 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 470 thread::sleep(std::time::Duration::new(60, 0)); 471 continue; 472 } 473 474 match fs::copy(&from, &to) { 475 Err(e) => { 476 if let Some(errno) = e.raw_os_error() { 477 if errno == libc::ENOSPC { 478 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 479 thread::sleep(std::time::Duration::new(60, 0)); 480 continue; 481 } 482 } 483 return Err(e); 484 } 485 Ok(i) => return Ok(i), 486 } 487 } 488 Err(io::Error::last_os_error()) 489 } 490 491 pub fn handle_child_output( 492 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 493 output: &std::process::Output, 494 ) { 495 use std::os::unix::process::ExitStatusExt; 496 if r.is_ok() && output.status.success() { 497 return; 498 } 499 500 match output.status.code() { 501 None => { 502 // Don't treat child.kill() as a problem 503 if output.status.signal() == Some(9) && r.is_ok() { 504 return; 505 } 506 507 eprintln!( 508 "==== child killed by signal: {} ====", 509 output.status.signal().unwrap() 510 ); 511 } 512 Some(code) => { 513 eprintln!("\n\n==== child exit code: {code} ===="); 514 } 515 } 516 517 eprintln!( 518 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 519 String::from_utf8_lossy(&output.stdout) 520 ); 521 eprintln!( 522 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 523 String::from_utf8_lossy(&output.stderr) 524 ); 525 526 panic!("Test failed") 527 } 528 529 #[derive(Debug)] 530 pub struct PasswordAuth { 531 pub username: String, 532 pub password: String, 533 } 534 535 pub const DEFAULT_SSH_RETRIES: u8 = 6; 536 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 537 538 #[derive(Debug)] 539 pub enum SshCommandError { 540 Connection(std::io::Error), 541 Handshake(ssh2::Error), 542 Authentication(ssh2::Error), 543 ChannelSession(ssh2::Error), 544 Command(ssh2::Error), 545 ExitStatus(ssh2::Error), 546 NonZeroExitStatus(i32), 547 FileRead(std::io::Error), 548 FileMetadata(std::io::Error), 549 ScpSend(ssh2::Error), 550 WriteAll(std::io::Error), 551 SendEof(ssh2::Error), 552 WaitEof(ssh2::Error), 553 } 554 555 fn scp_to_guest_with_auth( 556 path: &Path, 557 remote_path: &Path, 558 auth: &PasswordAuth, 559 ip: &str, 560 retries: u8, 561 timeout: u8, 562 ) -> Result<(), SshCommandError> { 563 let 
mut counter = 0; 564 loop { 565 let closure = || -> Result<(), SshCommandError> { 566 let tcp = 567 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 568 let mut sess = Session::new().unwrap(); 569 sess.set_tcp_stream(tcp); 570 sess.handshake().map_err(SshCommandError::Handshake)?; 571 572 sess.userauth_password(&auth.username, &auth.password) 573 .map_err(SshCommandError::Authentication)?; 574 assert!(sess.authenticated()); 575 576 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 577 let mode = fs::metadata(path) 578 .map_err(SshCommandError::FileMetadata)? 579 .permissions() 580 .mode() 581 & 0o777; 582 583 let mut channel = sess 584 .scp_send(remote_path, mode as i32, content.len() as u64, None) 585 .map_err(SshCommandError::ScpSend)?; 586 channel 587 .write_all(&content) 588 .map_err(SshCommandError::WriteAll)?; 589 channel.send_eof().map_err(SshCommandError::SendEof)?; 590 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 591 592 // Intentionally ignore these results here as their failure 593 // does not precipitate a repeat 594 let _ = channel.close(); 595 let _ = channel.wait_close(); 596 597 Ok(()) 598 }; 599 600 match closure() { 601 Ok(_) => break, 602 Err(e) => { 603 counter += 1; 604 if counter >= retries { 605 eprintln!( 606 "\n\n==== Start scp command output (FAILED) ====\n\n\ 607 path =\"{path:?}\"\n\ 608 remote_path =\"{remote_path:?}\"\n\ 609 auth=\"{auth:#?}\"\n\ 610 ip=\"{ip}\"\n\ 611 error=\"{e:?}\"\n\ 612 \n==== End scp command outout ====\n\n" 613 ); 614 615 return Err(e); 616 } 617 } 618 }; 619 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 620 } 621 Ok(()) 622 } 623 624 pub fn scp_to_guest( 625 path: &Path, 626 remote_path: &Path, 627 ip: &str, 628 retries: u8, 629 timeout: u8, 630 ) -> Result<(), SshCommandError> { 631 scp_to_guest_with_auth( 632 path, 633 remote_path, 634 &PasswordAuth { 635 username: String::from("cloud"), 636 password: String::from("cloud123"), 
637 }, 638 ip, 639 retries, 640 timeout, 641 ) 642 } 643 644 pub fn ssh_command_ip_with_auth( 645 command: &str, 646 auth: &PasswordAuth, 647 ip: &str, 648 retries: u8, 649 timeout: u8, 650 ) -> Result<String, SshCommandError> { 651 let mut s = String::new(); 652 653 let mut counter = 0; 654 loop { 655 let mut closure = || -> Result<(), SshCommandError> { 656 let tcp = 657 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 658 let mut sess = Session::new().unwrap(); 659 sess.set_tcp_stream(tcp); 660 sess.handshake().map_err(SshCommandError::Handshake)?; 661 662 sess.userauth_password(&auth.username, &auth.password) 663 .map_err(SshCommandError::Authentication)?; 664 assert!(sess.authenticated()); 665 666 let mut channel = sess 667 .channel_session() 668 .map_err(SshCommandError::ChannelSession)?; 669 channel.exec(command).map_err(SshCommandError::Command)?; 670 671 // Intentionally ignore these results here as their failure 672 // does not precipitate a repeat 673 let _ = channel.read_to_string(&mut s); 674 let _ = channel.close(); 675 let _ = channel.wait_close(); 676 677 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 678 679 if status != 0 { 680 Err(SshCommandError::NonZeroExitStatus(status)) 681 } else { 682 Ok(()) 683 } 684 }; 685 686 match closure() { 687 Ok(_) => break, 688 Err(e) => { 689 counter += 1; 690 if counter >= retries { 691 eprintln!( 692 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 693 command=\"{command}\"\n\ 694 auth=\"{auth:#?}\"\n\ 695 ip=\"{ip}\"\n\ 696 output=\"{s}\"\n\ 697 error=\"{e:?}\"\n\ 698 \n==== End ssh command outout ====\n\n" 699 ); 700 701 return Err(e); 702 } 703 } 704 }; 705 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 706 } 707 Ok(s) 708 } 709 710 pub fn ssh_command_ip( 711 command: &str, 712 ip: &str, 713 retries: u8, 714 timeout: u8, 715 ) -> Result<String, SshCommandError> { 716 ssh_command_ip_with_auth( 717 command, 718 &PasswordAuth { 
/// Checks that exactly `line_count` lines of `input` contain every string in `keywords`.
///
/// A line matches when it contains all keywords (an empty `keywords` list matches every
/// line). Returns `true` when the number of matching lines equals `line_count`; otherwise
/// a diagnostic listing the keywords, the input and the matching lines is written to
/// stderr and `false` is returned.
pub fn check_matched_lines_count(input: &str, keywords: Vec<&str>, line_count: usize) -> bool {
    // Keep the matching lines separate: concatenating them into one string would make
    // any number of matches look like a single line.
    let matched_lines: Vec<&str> = input
        .lines()
        .filter(|line| keywords.iter().all(|k| line.contains(k)))
        .collect();

    if matched_lines.len() == line_count {
        true
    } else {
        let matches = matched_lines.join("\n");
        eprintln!(
            "\n\n==== Start 'check_matched_lines_count' failed ==== \
             \nkeywords = {keywords:?}, line_count = {line_count} \
             \n\ninput = {input} matches = {matches} \
             \n\n==== End 'check_matched_lines_count' failed ====",
        );

        false
    }
}
let id = *guard; 859 *guard = id + 1; 860 861 Self::new_from_ip_range(disk_config, "192.168", id) 862 } 863 864 pub fn default_net_string(&self) -> String { 865 format!( 866 "tap=,mac={},ip={},mask=255.255.255.0", 867 self.network.guest_mac, self.network.host_ip 868 ) 869 } 870 871 pub fn default_net_string_w_iommu(&self) -> String { 872 format!( 873 "tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 874 self.network.guest_mac, self.network.host_ip 875 ) 876 } 877 878 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 879 format!( 880 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 881 self.network.guest_mac, self.network.host_ip, mtu 882 ) 883 } 884 885 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 886 ssh_command_ip( 887 command, 888 &self.network.guest_ip, 889 DEFAULT_SSH_RETRIES, 890 DEFAULT_SSH_TIMEOUT, 891 ) 892 } 893 894 #[cfg(target_arch = "x86_64")] 895 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 896 ssh_command_ip( 897 command, 898 &self.network.guest_ip, 899 DEFAULT_SSH_RETRIES, 900 DEFAULT_SSH_TIMEOUT, 901 ) 902 } 903 904 #[cfg(target_arch = "x86_64")] 905 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 906 ssh_command_ip( 907 command, 908 &self.network.l2_guest_ip1, 909 DEFAULT_SSH_RETRIES, 910 DEFAULT_SSH_TIMEOUT, 911 ) 912 } 913 914 #[cfg(target_arch = "x86_64")] 915 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 916 ssh_command_ip( 917 command, 918 &self.network.l2_guest_ip2, 919 DEFAULT_SSH_RETRIES, 920 DEFAULT_SSH_TIMEOUT, 921 ) 922 } 923 924 #[cfg(target_arch = "x86_64")] 925 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 926 ssh_command_ip( 927 command, 928 &self.network.l2_guest_ip3, 929 DEFAULT_SSH_RETRIES, 930 DEFAULT_SSH_TIMEOUT, 931 ) 932 } 933 934 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 935 format! 
{"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 936 cpu_count, 937 cpu_count, 938 kernel_path, 939 kernel_cmd, 940 self.network.host_ip, 941 self.network.guest_mac, 942 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 943 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 944 } 945 } 946 947 pub fn get_cpu_count(&self) -> Result<u32, Error> { 948 self.ssh_command("grep -c processor /proc/cpuinfo")? 949 .trim() 950 .parse() 951 .map_err(Error::Parsing) 952 } 953 954 pub fn get_total_memory(&self) -> Result<u32, Error> { 955 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 956 .trim() 957 .parse() 958 .map_err(Error::Parsing) 959 } 960 961 #[cfg(target_arch = "x86_64")] 962 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 963 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 964 .trim() 965 .parse() 966 .map_err(Error::Parsing) 967 } 968 969 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 970 self.ssh_command( 971 format!( 972 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 973 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 974 ) 975 .as_str(), 976 )? 
977 .trim() 978 .parse() 979 .map_err(Error::Parsing) 980 } 981 982 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> { 983 self.network 984 .wait_vm_boot(custom_timeout) 985 .map_err(Error::WaitForBoot) 986 } 987 988 pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> { 989 for cpu in cpus.iter() { 990 let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]"); 991 self.ssh_command(cmd.as_str())?; 992 } 993 994 Ok(()) 995 } 996 997 pub fn check_numa_node_distances( 998 &self, 999 node_id: usize, 1000 distances: &str, 1001 ) -> Result<bool, Error> { 1002 let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance"); 1003 if self.ssh_command(cmd.as_str())?.trim() == distances { 1004 Ok(true) 1005 } else { 1006 Ok(false) 1007 } 1008 } 1009 1010 pub fn check_numa_common( 1011 &self, 1012 mem_ref: Option<&[u32]>, 1013 node_ref: Option<&[Vec<usize>]>, 1014 distance_ref: Option<&[&str]>, 1015 ) { 1016 if let Some(mem_ref) = mem_ref { 1017 // Check each NUMA node has been assigned the right amount of 1018 // memory. 1019 for (i, &m) in mem_ref.iter().enumerate() { 1020 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m); 1021 } 1022 } 1023 1024 if let Some(node_ref) = node_ref { 1025 // Check each NUMA node has been assigned the right CPUs set. 1026 for (i, n) in node_ref.iter().enumerate() { 1027 self.check_numa_node_cpus(i, n.clone()).unwrap(); 1028 } 1029 } 1030 1031 if let Some(distance_ref) = distance_ref { 1032 // Check each NUMA node has been assigned the right distances. 
1033 for (i, &d) in distance_ref.iter().enumerate() { 1034 assert!(self.check_numa_node_distances(i, d).unwrap()); 1035 } 1036 } 1037 } 1038 1039 #[cfg(target_arch = "x86_64")] 1040 pub fn check_sgx_support(&self) -> Result<(), Error> { 1041 self.ssh_command( 1042 "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \ 1043 Software Guard Extensions supported = true'", 1044 )?; 1045 self.ssh_command( 1046 "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \ 1047 SGX launch config supported = true'", 1048 )?; 1049 self.ssh_command( 1050 "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \ 1051 supported = true'", 1052 )?; 1053 1054 Ok(()) 1055 } 1056 1057 pub fn get_pci_bridge_class(&self) -> Result<String, Error> { 1058 Ok(self 1059 .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")? 1060 .trim() 1061 .to_string()) 1062 } 1063 1064 pub fn get_pci_device_ids(&self) -> Result<String, Error> { 1065 Ok(self 1066 .ssh_command("cat /sys/bus/pci/devices/*/device")? 1067 .trim() 1068 .to_string()) 1069 } 1070 1071 pub fn get_pci_vendor_ids(&self) -> Result<String, Error> { 1072 Ok(self 1073 .ssh_command("cat /sys/bus/pci/devices/*/vendor")? 
1074 .trim() 1075 .to_string()) 1076 } 1077 1078 pub fn does_device_vendor_pair_match( 1079 &self, 1080 device_id: &str, 1081 vendor_id: &str, 1082 ) -> Result<bool, Error> { 1083 // We are checking if console device's device id and vendor id pair matches 1084 let devices = self.get_pci_device_ids()?; 1085 let devices: Vec<&str> = devices.split('\n').collect(); 1086 let vendors = self.get_pci_vendor_ids()?; 1087 let vendors: Vec<&str> = vendors.split('\n').collect(); 1088 1089 for (index, d_id) in devices.iter().enumerate() { 1090 if *d_id == device_id { 1091 if let Some(v_id) = vendors.get(index) { 1092 if *v_id == vendor_id { 1093 return Ok(true); 1094 } 1095 } 1096 } 1097 } 1098 1099 Ok(false) 1100 } 1101 1102 pub fn check_vsock(&self, socket: &str) { 1103 // Listen from guest on vsock CID=3 PORT=16 1104 // SOCKET-LISTEN:<domain>:<protocol>:<local-address> 1105 let guest_ip = self.network.guest_ip.clone(); 1106 let listen_socat = thread::spawn(move || { 1107 ssh_command_ip("sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log", &guest_ip, DEFAULT_SSH_RETRIES, DEFAULT_SSH_TIMEOUT).unwrap(); 1108 }); 1109 1110 // Make sure socat is listening, which might take a few second on slow systems 1111 thread::sleep(std::time::Duration::new(10, 0)); 1112 1113 // Write something to vsock from the host 1114 assert!(exec_host_command_status(&format!( 1115 "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}" 1116 )) 1117 .success()); 1118 1119 // Wait for the thread to terminate. 1120 listen_socat.join().unwrap(); 1121 1122 assert_eq!( 1123 self.ssh_command("cat vsock_log").unwrap().trim(), 1124 "HelloWorld!" 
1125 ); 1126 } 1127 1128 #[cfg(target_arch = "x86_64")] 1129 pub fn check_nvidia_gpu(&self) { 1130 assert!(self.ssh_command("nvidia-smi").unwrap().contains("Tesla T4")); 1131 } 1132 1133 pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) { 1134 let list_boots_cmd = "sudo last | grep -c reboot"; 1135 let boot_count = self 1136 .ssh_command(list_boots_cmd) 1137 .unwrap() 1138 .trim() 1139 .parse::<u32>() 1140 .unwrap_or_default(); 1141 1142 assert_eq!(boot_count, current_reboot_count + 1); 1143 self.ssh_command("sudo reboot").unwrap(); 1144 1145 self.wait_vm_boot(custom_timeout).unwrap(); 1146 let boot_count = self 1147 .ssh_command(list_boots_cmd) 1148 .unwrap() 1149 .trim() 1150 .parse::<u32>() 1151 .unwrap_or_default(); 1152 assert_eq!(boot_count, current_reboot_count + 2); 1153 } 1154 1155 pub fn enable_memory_hotplug(&self) { 1156 self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks") 1157 .unwrap(); 1158 } 1159 1160 pub fn check_devices_common( 1161 &self, 1162 socket: Option<&String>, 1163 console_text: Option<&String>, 1164 pmem_path: Option<&String>, 1165 ) { 1166 // Check block devices are readable 1167 self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024") 1168 .unwrap(); 1169 self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8") 1170 .unwrap(); 1171 // Check if the rng device is readable 1172 self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null") 1173 .unwrap(); 1174 // Check vsock 1175 if let Some(socket) = socket { 1176 self.check_vsock(socket.as_str()); 1177 } 1178 // Check if the console is usable 1179 if let Some(console_text) = console_text { 1180 let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0"); 1181 self.ssh_command(&console_cmd).unwrap(); 1182 } 1183 // The net device is 'automatically' exercised through the above 'ssh' commands 1184 1185 // Check if the pmem device is usable 1186 if let 
/// Verbosity selected for a launched command.
///
/// Its `Display` output is the matching command-line flag: nothing for
/// `Warn`, `-v` for `Info` and `-vv` for `Debug`.
pub enum VerbosityLevel {
    Warn,
    Info,
    Debug,
}

impl Default for VerbosityLevel {
    /// The default level is `Warn`.
    fn default() -> Self {
        VerbosityLevel::Warn
    }
}

impl Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // `Warn` has no flag, so nothing is written.
            Self::Warn => Ok(()),
            Self::Info => f.write_str("-v"),
            Self::Debug => f.write_str("-vv"),
        }
    }
}
VerbosityLevel) -> &mut Self { 1267 self.verbosity = verbosity; 1268 self 1269 } 1270 1271 pub fn capture_output(&mut self) -> &mut Self { 1272 self.capture_output = true; 1273 self 1274 } 1275 1276 pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self { 1277 self.print_cmd = print_cmd; 1278 self 1279 } 1280 1281 pub fn spawn(&mut self) -> io::Result<Child> { 1282 use VerbosityLevel::*; 1283 match &self.verbosity { 1284 Warn => {} 1285 Info => { 1286 self.command.arg("-v"); 1287 } 1288 Debug => { 1289 self.command.args(["-vv"]); 1290 } 1291 }; 1292 1293 if self.print_cmd { 1294 println!( 1295 "\n\n==== Start cloud-hypervisor command-line ====\n\n\ 1296 {:?}\n\ 1297 \n==== End cloud-hypervisor command-line ====\n\n", 1298 self.command 1299 ); 1300 } 1301 1302 if self.capture_output { 1303 let child = self 1304 .command 1305 .stderr(Stdio::piped()) 1306 .stdout(Stdio::piped()) 1307 .spawn() 1308 .unwrap(); 1309 1310 let fd = child.stdout.as_ref().unwrap().as_raw_fd(); 1311 let pipesize = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) }; 1312 if pipesize == -1 { 1313 return Err(io::Error::last_os_error()); 1314 } 1315 let fd = child.stderr.as_ref().unwrap().as_raw_fd(); 1316 let pipesize1 = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) }; 1317 if pipesize1 == -1 { 1318 return Err(io::Error::last_os_error()); 1319 } 1320 1321 if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE { 1322 Ok(child) 1323 } else { 1324 Err(std::io::Error::new( 1325 std::io::ErrorKind::Other, 1326 format!( 1327 "resizing pipe w/ 'fnctl' failed: stdout pipesize {pipesize}, stderr pipesize {pipesize1}" 1328 ), 1329 )) 1330 } 1331 } else { 1332 self.command.spawn() 1333 } 1334 } 1335 1336 pub fn args<I, S>(&mut self, args: I) -> &mut Self 1337 where 1338 I: IntoIterator<Item = S>, 1339 S: AsRef<OsStr>, 1340 { 1341 self.command.args(args); 1342 self 1343 } 1344 1345 pub fn default_disks(&mut self) -> &mut Self { 1346 if 
/// Returns the relative path of the release build of `cmd`.
///
/// The target triple is taken from the `BUILD_TARGET` environment variable.
/// When the variable cannot be read (unset or not valid Unicode), the triple
/// `x86_64-unknown-linux-gnu` is used.
pub fn clh_command(cmd: &str) -> String {
    // Resolve the triple first so the path is formatted exactly once.
    let target =
        env::var("BUILD_TARGET").unwrap_or_else(|_| "x86_64-unknown-linux-gnu".to_string());
    format!("target/{target}/release/{cmd}")
}
/// I/O patterns distinguished by the fio helpers.
///
/// The `Display` implementation yields the lowercase keyword of each
/// pattern: `read`, `randread`, `write`, `randwrite`, `rw` or `randrw`.
#[derive(Clone)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let keyword = match self {
            Self::Read => "read",
            Self::RandomRead => "randread",
            Self::Write => "write",
            Self::RandomWrite => "randwrite",
            Self::ReadWrite => "rw",
            Self::RandRW => "randrw",
        };
        f.write_str(keyword)
    }
}
1464 ); 1465 1466 let (read, write) = match fio_ops { 1467 FioOps::Read | FioOps::RandomRead => (true, false), 1468 FioOps::Write | FioOps::RandomWrite => (false, true), 1469 FioOps::ReadWrite | FioOps::RandRW => (true, true), 1470 }; 1471 1472 let mut total_bps = 0_f64; 1473 for j in jobs { 1474 if read { 1475 let bytes = j["read"]["io_bytes"] 1476 .as_u64() 1477 .expect("'fio' parse error: missing entry 'read.io_bytes'"); 1478 let runtime = j["read"]["runtime"] 1479 .as_u64() 1480 .expect("'fio' parse error: missing entry 'read.runtime'") 1481 as f64 1482 / 1000_f64; 1483 total_bps += bytes as f64 / runtime; 1484 } 1485 if write { 1486 let bytes = j["write"]["io_bytes"] 1487 .as_u64() 1488 .expect("'fio' parse error: missing entry 'write.io_bytes'"); 1489 let runtime = j["write"]["runtime"] 1490 .as_u64() 1491 .expect("'fio' parse error: missing entry 'write.runtime'") 1492 as f64 1493 / 1000_f64; 1494 total_bps += bytes as f64 / runtime; 1495 } 1496 } 1497 1498 total_bps 1499 }) 1500 .map_err(|_| { 1501 eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n"); 1502 Error::FioOutputParse 1503 }) 1504 } 1505 1506 pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> { 1507 std::panic::catch_unwind(|| { 1508 let v: Value = 1509 serde_json::from_str(output).expect("'fio' parse error: invalid json output"); 1510 let jobs = v["jobs"] 1511 .as_array() 1512 .expect("'fio' parse error: missing entry 'jobs'"); 1513 assert_eq!( 1514 jobs.len(), 1515 num_jobs as usize, 1516 "'fio' parse error: Unexpected number of 'fio' jobs." 
1517 ); 1518 1519 let (read, write) = match fio_ops { 1520 FioOps::Read | FioOps::RandomRead => (true, false), 1521 FioOps::Write | FioOps::RandomWrite => (false, true), 1522 FioOps::ReadWrite | FioOps::RandRW => (true, true), 1523 }; 1524 1525 let mut total_iops = 0_f64; 1526 for j in jobs { 1527 if read { 1528 let ios = j["read"]["total_ios"] 1529 .as_u64() 1530 .expect("'fio' parse error: missing entry 'read.total_ios'"); 1531 let runtime = j["read"]["runtime"] 1532 .as_u64() 1533 .expect("'fio' parse error: missing entry 'read.runtime'") 1534 as f64 1535 / 1000_f64; 1536 total_iops += ios as f64 / runtime; 1537 } 1538 if write { 1539 let ios = j["write"]["total_ios"] 1540 .as_u64() 1541 .expect("'fio' parse error: missing entry 'write.total_ios'"); 1542 let runtime = j["write"]["runtime"] 1543 .as_u64() 1544 .expect("'fio' parse error: missing entry 'write.runtime'") 1545 as f64 1546 / 1000_f64; 1547 total_iops += ios as f64 / runtime; 1548 } 1549 } 1550 1551 total_iops 1552 }) 1553 .map_err(|_| { 1554 eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n"); 1555 Error::FioOutputParse 1556 }) 1557 } 1558 1559 // Wait the child process for a given timeout 1560 fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> { 1561 match child.wait_timeout(Duration::from_secs(timeout)) { 1562 Err(e) => { 1563 return Err(WaitTimeoutError::General(e)); 1564 } 1565 Ok(s) => match s { 1566 None => { 1567 return Err(WaitTimeoutError::Timedout); 1568 } 1569 Some(s) => { 1570 if !s.success() { 1571 return Err(WaitTimeoutError::ExitStatus); 1572 } 1573 } 1574 }, 1575 } 1576 1577 Ok(()) 1578 } 1579 1580 pub fn measure_virtio_net_throughput( 1581 test_timeout: u32, 1582 queue_pairs: u32, 1583 guest: &Guest, 1584 receive: bool, 1585 bandwidth: bool, 1586 ) -> Result<f64, Error> { 1587 let default_port = 5201; 1588 1589 // 1. 
start the iperf3 server on the guest 1590 for n in 0..queue_pairs { 1591 guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?; 1592 } 1593 1594 thread::sleep(Duration::new(1, 0)); 1595 1596 // 2. start the iperf3 client on host to measure RX through-put 1597 let mut clients = Vec::new(); 1598 for n in 0..queue_pairs { 1599 let mut cmd = Command::new("iperf3"); 1600 cmd.args([ 1601 "-J", // Output in JSON format 1602 "-c", 1603 &guest.network.guest_ip, 1604 "-p", 1605 &format!("{}", default_port + n), 1606 "-t", 1607 &format!("{test_timeout}"), 1608 "-i", 1609 "0", 1610 ]); 1611 // For measuring the guest transmit throughput (as a sender), 1612 // use reverse mode of the iperf3 client on the host 1613 if !receive { 1614 cmd.args(["-R"]); 1615 } 1616 // Use UDP stream to measure packets per second. The bitrate is set to 1617 // 1T to make sure it saturates the link. 1618 if !bandwidth { 1619 cmd.args(["-u", "-b", "1T"]); 1620 } 1621 let client = cmd 1622 .stderr(Stdio::piped()) 1623 .stdout(Stdio::piped()) 1624 .spawn() 1625 .map_err(Error::Spawn)?; 1626 1627 clients.push(client); 1628 } 1629 1630 let mut err: Option<Error> = None; 1631 let mut results = Vec::new(); 1632 let mut failed = false; 1633 for c in clients { 1634 let mut c = c; 1635 if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) { 1636 err = Some(Error::WaitTimeout(e)); 1637 failed = true; 1638 } 1639 1640 if !failed { 1641 // Safe to unwrap as we know the child has terminated successfully 1642 let output = c.wait_with_output().unwrap(); 1643 results.push(parse_iperf3_output(&output.stdout, receive, bandwidth)?); 1644 } else { 1645 let _ = c.kill(); 1646 let output = c.wait_with_output().unwrap(); 1647 println!( 1648 "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n", 1649 String::from_utf8_lossy(&output.stdout) 1650 ); 1651 } 1652 } 1653 1654 if let Some(e) = err { 1655 Err(e) 1656 } else { 1657 Ok(results.iter().sum()) 1658 
} 1659 } 1660 1661 pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> { 1662 std::panic::catch_unwind(|| { 1663 let s = String::from_utf8_lossy(output); 1664 let mut latency = Vec::new(); 1665 for l in s.lines() { 1666 let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line"); 1667 // Skip header/summary lines 1668 if let Some(avg) = v["Avg"].as_str() { 1669 // Assume the latency unit is always "us" 1670 latency.push( 1671 avg.split("us").collect::<Vec<&str>>()[0] 1672 .parse::<f64>() 1673 .expect("'ethr' parse error: invalid 'Avg' entry"), 1674 ); 1675 } 1676 } 1677 1678 assert!( 1679 !latency.is_empty(), 1680 "'ethr' parse error: no valid latency data found" 1681 ); 1682 1683 latency 1684 }) 1685 .map_err(|_| { 1686 eprintln!( 1687 "=== Start ethr output ===\n\n{}\n\n=== End ethr output ===\n\n", 1688 String::from_utf8_lossy(output) 1689 ); 1690 Error::EthrLogParse 1691 }) 1692 } 1693 1694 pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> { 1695 // copy the 'ethr' tool to the guest image 1696 let ethr_path = "/usr/local/bin/ethr"; 1697 let ethr_remote_path = "/tmp/ethr"; 1698 scp_to_guest( 1699 Path::new(ethr_path), 1700 Path::new(ethr_remote_path), 1701 &guest.network.guest_ip, 1702 //DEFAULT_SSH_RETRIES, 1703 1, 1704 DEFAULT_SSH_TIMEOUT, 1705 )?; 1706 1707 // Start the ethr server on the guest 1708 guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?; 1709 1710 thread::sleep(Duration::new(10, 0)); 1711 1712 // Start the ethr client on the host 1713 let log_file = guest 1714 .tmp_dir 1715 .as_path() 1716 .join("ethr.client.log") 1717 .to_str() 1718 .unwrap() 1719 .to_string(); 1720 let mut c = Command::new(ethr_path) 1721 .args([ 1722 "-c", 1723 &guest.network.guest_ip, 1724 "-t", 1725 "l", 1726 "-o", 1727 &log_file, // file output is JSON format 1728 "-d", 1729 &format!("{test_timeout}s"), 1730 ]) 1731 .stderr(Stdio::piped()) 1732 
.stdout(Stdio::piped()) 1733 .spawn() 1734 .map_err(Error::Spawn)?; 1735 1736 if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout) 1737 { 1738 let _ = c.kill(); 1739 return Err(e); 1740 } 1741 1742 // Parse the ethr latency test output 1743 let content = fs::read(log_file).map_err(Error::EthrLogFile)?; 1744 parse_ethr_latency_output(&content) 1745 } 1746