1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use once_cell::sync::Lazy; 9 use serde_json::Value; 10 use ssh2::Session; 11 use std::env; 12 use std::ffi::OsStr; 13 use std::io; 14 use std::io::{Read, Write}; 15 use std::net::TcpListener; 16 use std::net::TcpStream; 17 use std::os::unix::fs::PermissionsExt; 18 use std::os::unix::io::{AsRawFd, FromRawFd}; 19 use std::path::Path; 20 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 21 use std::str::FromStr; 22 use std::sync::Mutex; 23 use std::thread; 24 use std::time::Duration; 25 use std::{fmt, fs}; 26 use vmm_sys_util::tempdir::TempDir; 27 use wait_timeout::ChildExt; 28 29 #[derive(Debug)] 30 pub enum WaitTimeoutError { 31 Timedout, 32 ExitStatus, 33 General(std::io::Error), 34 } 35 36 #[derive(Debug)] 37 pub enum Error { 38 Parsing(std::num::ParseIntError), 39 SshCommand(SshCommandError), 40 WaitForBoot(WaitForBootError), 41 EthrLogFile(std::io::Error), 42 EthrLogParse, 43 FioOutputParse, 44 Iperf3Parse, 45 Spawn(std::io::Error), 46 WaitTimeout(WaitTimeoutError), 47 } 48 49 impl From<SshCommandError> for Error { 50 fn from(e: SshCommandError) -> Self { 51 Self::SshCommand(e) 52 } 53 } 54 55 pub struct GuestNetworkConfig { 56 pub guest_ip: String, 57 pub l2_guest_ip1: String, 58 pub l2_guest_ip2: String, 59 pub l2_guest_ip3: String, 60 pub host_ip: String, 61 pub guest_mac: String, 62 pub l2_guest_mac1: String, 63 pub l2_guest_mac2: String, 64 pub l2_guest_mac3: String, 65 pub tcp_listener_port: u16, 66 } 67 68 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 69 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 70 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 71 72 #[derive(Debug)] 73 pub enum WaitForBootError { 74 EpollWait(std::io::Error), 75 Listen(std::io::Error), 76 EpollWaitTimeout, 77 WrongGuestAddr, 78 Accept(std::io::Error), 79 } 80 81 impl GuestNetworkConfig { 82 pub fn 
wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 83 let start = std::time::Instant::now(); 84 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 85 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 86 let expected_guest_addr = self.guest_ip.as_str(); 87 let mut s = String::new(); 88 let timeout = match custom_timeout { 89 Some(t) => t, 90 None => DEFAULT_TCP_LISTENER_TIMEOUT, 91 }; 92 93 match (|| -> Result<(), WaitForBootError> { 94 let listener = 95 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 96 listener 97 .set_nonblocking(true) 98 .expect("Cannot set non-blocking for tcp listener"); 99 100 // Reply on epoll w/ timeout to wait for guest connections faithfully 101 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 102 // Use 'File' to enforce closing on 'epoll_fd' 103 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 104 epoll::ctl( 105 epoll_fd, 106 epoll::ControlOptions::EPOLL_CTL_ADD, 107 listener.as_raw_fd(), 108 epoll::Event::new(epoll::Events::EPOLLIN, 0), 109 ) 110 .expect("Cannot add 'tcp_listener' event to epoll"); 111 let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); 1]; 112 loop { 113 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 114 Ok(num_events) => Ok(num_events), 115 Err(e) => match e.raw_os_error() { 116 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 117 _ => Err(e), 118 }, 119 } 120 .map_err(WaitForBootError::EpollWait)?; 121 if num_events == 0 { 122 return Err(WaitForBootError::EpollWaitTimeout); 123 } 124 break; 125 } 126 127 match listener.accept() { 128 Ok((_, addr)) => { 129 // Make sure the connection is from the expected 'guest_addr' 130 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 131 s = format!( 132 "Expecting the guest ip '{}' while being connected with ip '{}'", 133 
expected_guest_addr, 134 addr.ip() 135 ); 136 return Err(WaitForBootError::WrongGuestAddr); 137 } 138 139 Ok(()) 140 } 141 Err(e) => { 142 s = "TcpListener::accept() failed".to_string(); 143 Err(WaitForBootError::Accept(e)) 144 } 145 } 146 })() { 147 Err(e) => { 148 let duration = start.elapsed(); 149 eprintln!( 150 "\n\n==== Start 'wait_vm_boot' (FAILED) ====\n\n\ 151 duration =\"{duration:?}, timeout = {timeout}s\"\n\ 152 listen_addr=\"{listen_addr}\"\n\ 153 expected_guest_addr=\"{expected_guest_addr}\"\n\ 154 message=\"{s}\"\n\ 155 error=\"{e:?}\"\n\ 156 \n==== End 'wait_vm_boot' outout ====\n\n" 157 ); 158 159 Err(e) 160 } 161 Ok(_) => Ok(()), 162 } 163 } 164 } 165 166 pub enum DiskType { 167 OperatingSystem, 168 CloudInit, 169 } 170 171 pub trait DiskConfig { 172 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 173 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 174 fn disk(&self, disk_type: DiskType) -> Option<String>; 175 } 176 177 #[derive(Clone)] 178 pub struct UbuntuDiskConfig { 179 osdisk_path: String, 180 cloudinit_path: String, 181 image_name: String, 182 } 183 184 impl UbuntuDiskConfig { 185 pub fn new(image_name: String) -> Self { 186 UbuntuDiskConfig { 187 image_name, 188 osdisk_path: String::new(), 189 cloudinit_path: String::new(), 190 } 191 } 192 } 193 194 pub struct WindowsDiskConfig { 195 image_name: String, 196 osdisk_path: String, 197 loopback_device: String, 198 windows_snapshot_cow: String, 199 windows_snapshot: String, 200 } 201 202 impl WindowsDiskConfig { 203 pub fn new(image_name: String) -> Self { 204 WindowsDiskConfig { 205 image_name, 206 osdisk_path: String::new(), 207 loopback_device: String::new(), 208 windows_snapshot_cow: String::new(), 209 windows_snapshot: String::new(), 210 } 211 } 212 } 213 214 impl Drop for WindowsDiskConfig { 215 fn drop(&mut self) { 216 // dmsetup remove windows-snapshot-1 217 std::process::Command::new("dmsetup") 218 .arg("remove") 219 
.arg(self.windows_snapshot.as_str()) 220 .output() 221 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 222 223 // dmsetup remove windows-snapshot-cow-1 224 std::process::Command::new("dmsetup") 225 .arg("remove") 226 .arg(self.windows_snapshot_cow.as_str()) 227 .output() 228 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 229 230 // losetup -d <loopback_device> 231 std::process::Command::new("losetup") 232 .args(["-d", self.loopback_device.as_str()]) 233 .output() 234 .expect("Expect removing loopback device to succeed"); 235 } 236 } 237 238 impl DiskConfig for UbuntuDiskConfig { 239 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 240 let cloudinit_file_path = 241 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 242 243 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 244 245 fs::create_dir_all(&cloud_init_directory) 246 .expect("Expect creating cloud-init directory to succeed"); 247 248 let source_file_dir = std::env::current_dir() 249 .unwrap() 250 .join("test_data") 251 .join("cloud-init") 252 .join("ubuntu"); 253 254 vec!["meta-data"].iter().for_each(|x| { 255 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 256 .expect("Expect copying cloud-init meta-data to succeed"); 257 }); 258 259 let mut user_data_string = String::new(); 260 fs::File::open(source_file_dir.join("user-data")) 261 .unwrap() 262 .read_to_string(&mut user_data_string) 263 .expect("Expected reading user-data file in to succeed"); 264 user_data_string = user_data_string.replace( 265 "@DEFAULT_TCP_LISTENER_MESSAGE", 266 DEFAULT_TCP_LISTENER_MESSAGE, 267 ); 268 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 269 user_data_string = 270 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 271 272 fs::File::create(cloud_init_directory.join("user-data")) 273 .unwrap() 274 
.write_all(user_data_string.as_bytes()) 275 .expect("Expected writing out user-data to succeed"); 276 277 let mut network_config_string = String::new(); 278 279 fs::File::open(source_file_dir.join("network-config")) 280 .unwrap() 281 .read_to_string(&mut network_config_string) 282 .expect("Expected reading network-config file in to succeed"); 283 284 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 285 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 286 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 287 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 288 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 289 network_config_string = 290 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 291 network_config_string = 292 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 293 network_config_string = 294 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 295 network_config_string = 296 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 297 298 fs::File::create(cloud_init_directory.join("network-config")) 299 .unwrap() 300 .write_all(network_config_string.as_bytes()) 301 .expect("Expected writing out network-config to succeed"); 302 303 std::process::Command::new("mkdosfs") 304 .args(["-n", "cidata"]) 305 .args(["-C", cloudinit_file_path.as_str()]) 306 .arg("8192") 307 .output() 308 .expect("Expect creating disk image to succeed"); 309 310 vec!["user-data", "meta-data", "network-config"] 311 .iter() 312 .for_each(|x| { 313 std::process::Command::new("mcopy") 314 .arg("-o") 315 .args(["-i", cloudinit_file_path.as_str()]) 316 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 317 .output() 318 .expect("Expect copying files to disk image to succeed"); 319 
}); 320 321 cloudinit_file_path 322 } 323 324 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 325 let mut workload_path = dirs::home_dir().unwrap(); 326 workload_path.push("workloads"); 327 328 let mut osdisk_base_path = workload_path; 329 osdisk_base_path.push(&self.image_name); 330 331 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 332 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 333 334 rate_limited_copy(osdisk_base_path, &osdisk_path) 335 .expect("copying of OS source disk image failed"); 336 337 self.cloudinit_path = cloudinit_path; 338 self.osdisk_path = osdisk_path; 339 } 340 341 fn disk(&self, disk_type: DiskType) -> Option<String> { 342 match disk_type { 343 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 344 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 345 } 346 } 347 } 348 349 impl DiskConfig for WindowsDiskConfig { 350 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 351 String::new() 352 } 353 354 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 355 let mut workload_path = dirs::home_dir().unwrap(); 356 workload_path.push("workloads"); 357 358 let mut osdisk_path = workload_path; 359 osdisk_path.push(&self.image_name); 360 361 let osdisk_blk_size = fs::metadata(osdisk_path) 362 .expect("Expect retrieving Windows image metadata") 363 .len() 364 >> 9; 365 366 let snapshot_cow_path = 367 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 368 369 // Create and truncate CoW file for device mapper 370 let cow_file_size: u64 = 1 << 30; 371 let cow_file_blk_size = cow_file_size >> 9; 372 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 373 .expect("Expect creating CoW image to succeed"); 374 cow_file 375 .set_len(cow_file_size) 376 .expect("Expect truncating CoW image to succeed"); 377 378 // losetup --find --show /tmp/snapshot_cow 379 
let loopback_device = std::process::Command::new("losetup") 380 .arg("--find") 381 .arg("--show") 382 .arg(snapshot_cow_path.as_str()) 383 .output() 384 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 385 386 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 387 .trim() 388 .to_string(); 389 390 let random_extension = tmp_dir.as_path().file_name().unwrap(); 391 let windows_snapshot_cow = format!( 392 "windows-snapshot-cow-{}", 393 random_extension.to_str().unwrap() 394 ); 395 396 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 397 std::process::Command::new("dmsetup") 398 .arg("create") 399 .arg(windows_snapshot_cow.as_str()) 400 .args([ 401 "--table", 402 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 403 ]) 404 .output() 405 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 406 407 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 408 409 // dmsetup mknodes 410 std::process::Command::new("dmsetup") 411 .arg("mknodes") 412 .output() 413 .expect("Expect device mapper nodes to be ready"); 414 415 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 416 std::process::Command::new("dmsetup") 417 .arg("create") 418 .arg(windows_snapshot.as_str()) 419 .args([ 420 "--table", 421 format!( 422 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 423 osdisk_blk_size, 424 windows_snapshot_cow.as_str() 425 ) 426 .as_str(), 427 ]) 428 .output() 429 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 430 431 // dmsetup mknodes 432 std::process::Command::new("dmsetup") 433 .arg("mknodes") 434 .output() 435 .expect("Expect device mapper nodes to be ready"); 436 437 self.osdisk_path = format!("/dev/mapper/{windows_snapshot}"); 438 self.windows_snapshot_cow = windows_snapshot_cow; 439 
self.windows_snapshot = windows_snapshot; 440 } 441 442 fn disk(&self, disk_type: DiskType) -> Option<String> { 443 match disk_type { 444 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 445 DiskType::CloudInit => None, 446 } 447 } 448 } 449 450 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 451 for i in 0..10 { 452 let free_bytes = unsafe { 453 let mut stats = std::mem::MaybeUninit::zeroed(); 454 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 455 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 456 457 let free_blocks = stats.assume_init().f_bfree; 458 let block_size = stats.assume_init().f_bsize; 459 460 free_blocks * block_size 461 }; 462 463 // Make sure there is at least 6 GiB of space 464 if free_bytes < 6 << 30 { 465 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 466 thread::sleep(std::time::Duration::new(60, 0)); 467 continue; 468 } 469 470 match fs::copy(&from, &to) { 471 Err(e) => { 472 if let Some(errno) = e.raw_os_error() { 473 if errno == libc::ENOSPC { 474 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 475 thread::sleep(std::time::Duration::new(60, 0)); 476 continue; 477 } 478 } 479 return Err(e); 480 } 481 Ok(i) => return Ok(i), 482 } 483 } 484 Err(io::Error::last_os_error()) 485 } 486 487 pub fn handle_child_output( 488 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 489 output: &std::process::Output, 490 ) { 491 use std::os::unix::process::ExitStatusExt; 492 if r.is_ok() && output.status.success() { 493 return; 494 } 495 496 match output.status.code() { 497 None => { 498 // Don't treat child.kill() as a problem 499 if output.status.signal() == Some(9) && r.is_ok() { 500 return; 501 } 502 503 eprintln!( 504 "==== child killed by signal: {} ====", 505 output.status.signal().unwrap() 506 ); 507 } 508 Some(code) => { 509 eprintln!("\n\n==== child exit code: {code} ===="); 510 } 511 } 512 513 eprintln!( 514 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 515 String::from_utf8_lossy(&output.stdout) 516 ); 517 eprintln!( 518 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 519 String::from_utf8_lossy(&output.stderr) 520 ); 521 522 panic!("Test failed") 523 } 524 525 #[derive(Debug)] 526 pub struct PasswordAuth { 527 pub username: String, 528 pub password: String, 529 } 530 531 pub const DEFAULT_SSH_RETRIES: u8 = 6; 532 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 533 534 #[derive(Debug)] 535 pub enum SshCommandError { 536 Connection(std::io::Error), 537 Handshake(ssh2::Error), 538 Authentication(ssh2::Error), 539 ChannelSession(ssh2::Error), 540 Command(ssh2::Error), 541 ExitStatus(ssh2::Error), 542 NonZeroExitStatus(i32), 543 FileRead(std::io::Error), 544 FileMetadata(std::io::Error), 545 ScpSend(ssh2::Error), 546 WriteAll(std::io::Error), 547 SendEof(ssh2::Error), 548 WaitEof(ssh2::Error), 549 } 550 551 fn scp_to_guest_with_auth( 552 path: &Path, 553 remote_path: &Path, 554 auth: &PasswordAuth, 555 ip: &str, 556 retries: u8, 557 timeout: u8, 558 ) -> Result<(), SshCommandError> { 559 let 
mut counter = 0; 560 loop { 561 match (|| -> Result<(), SshCommandError> { 562 let tcp = 563 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 564 let mut sess = Session::new().unwrap(); 565 sess.set_tcp_stream(tcp); 566 sess.handshake().map_err(SshCommandError::Handshake)?; 567 568 sess.userauth_password(&auth.username, &auth.password) 569 .map_err(SshCommandError::Authentication)?; 570 assert!(sess.authenticated()); 571 572 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 573 let mode = fs::metadata(path) 574 .map_err(SshCommandError::FileMetadata)? 575 .permissions() 576 .mode() 577 & 0o777; 578 579 let mut channel = sess 580 .scp_send(remote_path, mode as i32, content.len() as u64, None) 581 .map_err(SshCommandError::ScpSend)?; 582 channel 583 .write_all(&content) 584 .map_err(SshCommandError::WriteAll)?; 585 channel.send_eof().map_err(SshCommandError::SendEof)?; 586 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 587 588 // Intentionally ignore these results here as their failure 589 // does not precipitate a repeat 590 let _ = channel.close(); 591 let _ = channel.wait_close(); 592 593 Ok(()) 594 })() { 595 Ok(_) => break, 596 Err(e) => { 597 counter += 1; 598 if counter >= retries { 599 eprintln!( 600 "\n\n==== Start scp command output (FAILED) ====\n\n\ 601 path =\"{path:?}\"\n\ 602 remote_path =\"{remote_path:?}\"\n\ 603 auth=\"{auth:#?}\"\n\ 604 ip=\"{ip}\"\n\ 605 error=\"{e:?}\"\n\ 606 \n==== End scp command outout ====\n\n" 607 ); 608 609 return Err(e); 610 } 611 } 612 }; 613 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 614 } 615 Ok(()) 616 } 617 618 pub fn scp_to_guest( 619 path: &Path, 620 remote_path: &Path, 621 ip: &str, 622 retries: u8, 623 timeout: u8, 624 ) -> Result<(), SshCommandError> { 625 scp_to_guest_with_auth( 626 path, 627 remote_path, 628 &PasswordAuth { 629 username: String::from("cloud"), 630 password: String::from("cloud123"), 631 }, 632 ip, 633 retries, 634 
timeout, 635 ) 636 } 637 638 pub fn ssh_command_ip_with_auth( 639 command: &str, 640 auth: &PasswordAuth, 641 ip: &str, 642 retries: u8, 643 timeout: u8, 644 ) -> Result<String, SshCommandError> { 645 let mut s = String::new(); 646 647 let mut counter = 0; 648 loop { 649 match (|| -> Result<(), SshCommandError> { 650 let tcp = 651 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 652 let mut sess = Session::new().unwrap(); 653 sess.set_tcp_stream(tcp); 654 sess.handshake().map_err(SshCommandError::Handshake)?; 655 656 sess.userauth_password(&auth.username, &auth.password) 657 .map_err(SshCommandError::Authentication)?; 658 assert!(sess.authenticated()); 659 660 let mut channel = sess 661 .channel_session() 662 .map_err(SshCommandError::ChannelSession)?; 663 channel.exec(command).map_err(SshCommandError::Command)?; 664 665 // Intentionally ignore these results here as their failure 666 // does not precipitate a repeat 667 let _ = channel.read_to_string(&mut s); 668 let _ = channel.close(); 669 let _ = channel.wait_close(); 670 671 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 672 673 if status != 0 { 674 Err(SshCommandError::NonZeroExitStatus(status)) 675 } else { 676 Ok(()) 677 } 678 })() { 679 Ok(_) => break, 680 Err(e) => { 681 counter += 1; 682 if counter >= retries { 683 eprintln!( 684 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 685 command=\"{command}\"\n\ 686 auth=\"{auth:#?}\"\n\ 687 ip=\"{ip}\"\n\ 688 output=\"{s}\"\n\ 689 error=\"{e:?}\"\n\ 690 \n==== End ssh command outout ====\n\n" 691 ); 692 693 return Err(e); 694 } 695 } 696 }; 697 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 698 } 699 Ok(s) 700 } 701 702 pub fn ssh_command_ip( 703 command: &str, 704 ip: &str, 705 retries: u8, 706 timeout: u8, 707 ) -> Result<String, SshCommandError> { 708 ssh_command_ip_with_auth( 709 command, 710 &PasswordAuth { 711 username: String::from("cloud"), 712 password: 
String::from("cloud123"), 713 }, 714 ip, 715 retries, 716 timeout, 717 ) 718 } 719 720 pub fn exec_host_command_status(command: &str) -> ExitStatus { 721 std::process::Command::new("bash") 722 .args(["-c", command]) 723 .status() 724 .unwrap_or_else(|_| panic!("Expected '{command}' to run")) 725 } 726 727 pub fn exec_host_command_output(command: &str) -> Output { 728 std::process::Command::new("bash") 729 .args(["-c", command]) 730 .output() 731 .unwrap_or_else(|_| panic!("Expected '{command}' to run")) 732 } 733 734 pub const PIPE_SIZE: i32 = 32 << 20; 735 736 static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1)); 737 738 pub struct Guest { 739 pub tmp_dir: TempDir, 740 pub disk_config: Box<dyn DiskConfig>, 741 pub network: GuestNetworkConfig, 742 } 743 744 // Safe to implement as we know we have no interior mutability 745 impl std::panic::RefUnwindSafe for Guest {} 746 747 impl Guest { 748 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 749 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 750 751 let network = GuestNetworkConfig { 752 guest_ip: format!("{class}.{id}.2"), 753 l2_guest_ip1: format!("{class}.{id}.3"), 754 l2_guest_ip2: format!("{class}.{id}.4"), 755 l2_guest_ip3: format!("{class}.{id}.5"), 756 host_ip: format!("{class}.{id}.1"), 757 guest_mac: format!("12:34:56:78:90:{id:02x}"), 758 l2_guest_mac1: format!("de:ad:be:ef:12:{id:02x}"), 759 l2_guest_mac2: format!("de:ad:be:ef:34:{id:02x}"), 760 l2_guest_mac3: format!("de:ad:be:ef:56:{id:02x}"), 761 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 762 }; 763 764 disk_config.prepare_files(&tmp_dir, &network); 765 766 Guest { 767 tmp_dir, 768 disk_config, 769 network, 770 } 771 } 772 773 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 774 let mut guard = NEXT_VM_ID.lock().unwrap(); 775 let id = *guard; 776 *guard = id + 1; 777 778 Self::new_from_ip_range(disk_config, "192.168", id) 779 } 780 781 pub fn 
default_net_string(&self) -> String { 782 format!( 783 "tap=,mac={},ip={},mask=255.255.255.0", 784 self.network.guest_mac, self.network.host_ip 785 ) 786 } 787 788 pub fn default_net_string_w_iommu(&self) -> String { 789 format!( 790 "tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 791 self.network.guest_mac, self.network.host_ip 792 ) 793 } 794 795 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 796 format!( 797 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 798 self.network.guest_mac, self.network.host_ip, mtu 799 ) 800 } 801 802 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 803 ssh_command_ip( 804 command, 805 &self.network.guest_ip, 806 DEFAULT_SSH_RETRIES, 807 DEFAULT_SSH_TIMEOUT, 808 ) 809 } 810 811 #[cfg(target_arch = "x86_64")] 812 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 813 ssh_command_ip( 814 command, 815 &self.network.guest_ip, 816 DEFAULT_SSH_RETRIES, 817 DEFAULT_SSH_TIMEOUT, 818 ) 819 } 820 821 #[cfg(target_arch = "x86_64")] 822 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 823 ssh_command_ip( 824 command, 825 &self.network.l2_guest_ip1, 826 DEFAULT_SSH_RETRIES, 827 DEFAULT_SSH_TIMEOUT, 828 ) 829 } 830 831 #[cfg(target_arch = "x86_64")] 832 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 833 ssh_command_ip( 834 command, 835 &self.network.l2_guest_ip2, 836 DEFAULT_SSH_RETRIES, 837 DEFAULT_SSH_TIMEOUT, 838 ) 839 } 840 841 #[cfg(target_arch = "x86_64")] 842 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 843 ssh_command_ip( 844 command, 845 &self.network.l2_guest_ip3, 846 DEFAULT_SSH_RETRIES, 847 DEFAULT_SSH_TIMEOUT, 848 ) 849 } 850 851 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 852 format! 
{"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 853 cpu_count, 854 cpu_count, 855 kernel_path, 856 kernel_cmd, 857 self.network.host_ip, 858 self.network.guest_mac, 859 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 860 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 861 } 862 } 863 864 pub fn get_cpu_count(&self) -> Result<u32, Error> { 865 self.ssh_command("grep -c processor /proc/cpuinfo")? 866 .trim() 867 .parse() 868 .map_err(Error::Parsing) 869 } 870 871 #[cfg(target_arch = "x86_64")] 872 pub fn get_initial_apicid(&self) -> Result<u32, Error> { 873 self.ssh_command("grep \"initial apicid\" /proc/cpuinfo | grep -o \"[0-9]*\"")? 874 .trim() 875 .parse() 876 .map_err(Error::Parsing) 877 } 878 879 pub fn get_total_memory(&self) -> Result<u32, Error> { 880 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 881 .trim() 882 .parse() 883 .map_err(Error::Parsing) 884 } 885 886 #[cfg(target_arch = "x86_64")] 887 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 888 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 889 .trim() 890 .parse() 891 .map_err(Error::Parsing) 892 } 893 894 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 895 self.ssh_command( 896 format!( 897 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 898 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 899 ) 900 .as_str(), 901 )? 
902 .trim() 903 .parse() 904 .map_err(Error::Parsing) 905 } 906 907 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> { 908 self.network 909 .wait_vm_boot(custom_timeout) 910 .map_err(Error::WaitForBoot) 911 } 912 913 pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> { 914 for cpu in cpus.iter() { 915 let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]"); 916 self.ssh_command(cmd.as_str())?; 917 } 918 919 Ok(()) 920 } 921 922 pub fn check_numa_node_distances( 923 &self, 924 node_id: usize, 925 distances: &str, 926 ) -> Result<bool, Error> { 927 let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance"); 928 if self.ssh_command(cmd.as_str())?.trim() == distances { 929 Ok(true) 930 } else { 931 Ok(false) 932 } 933 } 934 935 pub fn check_numa_common( 936 &self, 937 mem_ref: Option<&[u32]>, 938 node_ref: Option<&[Vec<usize>]>, 939 distance_ref: Option<&[&str]>, 940 ) { 941 if let Some(mem_ref) = mem_ref { 942 // Check each NUMA node has been assigned the right amount of 943 // memory. 944 for (i, &m) in mem_ref.iter().enumerate() { 945 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m); 946 } 947 } 948 949 if let Some(node_ref) = node_ref { 950 // Check each NUMA node has been assigned the right CPUs set. 951 for (i, n) in node_ref.iter().enumerate() { 952 self.check_numa_node_cpus(i, n.clone()).unwrap(); 953 } 954 } 955 956 if let Some(distance_ref) = distance_ref { 957 // Check each NUMA node has been assigned the right distances. 
958 for (i, &d) in distance_ref.iter().enumerate() { 959 assert!(self.check_numa_node_distances(i, d).unwrap()); 960 } 961 } 962 } 963 964 #[cfg(target_arch = "x86_64")] 965 pub fn check_sgx_support(&self) -> Result<(), Error> { 966 self.ssh_command( 967 "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \ 968 Software Guard Extensions supported = true'", 969 )?; 970 self.ssh_command( 971 "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \ 972 SGX launch config supported = true'", 973 )?; 974 self.ssh_command( 975 "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \ 976 supported = true'", 977 )?; 978 979 Ok(()) 980 } 981 982 pub fn get_pci_bridge_class(&self) -> Result<String, Error> { 983 Ok(self 984 .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")? 985 .trim() 986 .to_string()) 987 } 988 989 pub fn get_pci_device_ids(&self) -> Result<String, Error> { 990 Ok(self 991 .ssh_command("cat /sys/bus/pci/devices/*/device")? 992 .trim() 993 .to_string()) 994 } 995 996 pub fn get_pci_vendor_ids(&self) -> Result<String, Error> { 997 Ok(self 998 .ssh_command("cat /sys/bus/pci/devices/*/vendor")? 
.trim()
            .to_string())
    }

    /// Tells whether the guest exposes a PCI device with the given
    /// `device_id` / `vendor_id` pair.
    ///
    /// The device and vendor listings are newline separated and are paired
    /// up by position.
    ///
    /// # Errors
    ///
    /// Propagates the error of `get_pci_device_ids` / `get_pci_vendor_ids`.
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        // We are checking if console device's device id and vendor id pair matches
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        // `zip` stops at the shorter listing, i.e. a device without a vendor
        // entry at the same position never matches.
        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Checks that a message written to the host side vsock unix `socket`
    /// is received by a `socat` listener running in the guest.
    ///
    /// # Panics
    ///
    /// Panics if a guest or host command fails, or if the guest did not log
    /// the expected message.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip(
                "sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log",
                &guest_ip,
                DEFAULT_SSH_RETRIES,
                DEFAULT_SSH_TIMEOUT,
            )
            .unwrap();
        });

        // Make sure socat is listening, which might take a few seconds on slow systems
        thread::sleep(Duration::from_secs(10));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Asserts that `nvidia-smi` run in the guest reports a "Tesla T4".
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self.ssh_command("nvidia-smi").unwrap().contains("Tesla T4"));
    }

    /// Reboots the guest and asserts that its reboot counter went from
    /// `current_reboot_count + 1` to `current_reboot_count + 2`.
    ///
    /// `custom_timeout` is forwarded to `wait_vm_boot`.
    ///
    /// # Panics
    ///
    /// Panics if a command fails, the guest does not boot again, or one of
    /// the counters does not have the expected value.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        // Reboot entries reported by `last`; an unparsable answer reads as 0
        // and is then caught by the assertions below.
        let boot_count = || {
            self.ssh_command("sudo last | grep -c reboot")
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Writes `online` to the guest's
    /// `/sys/devices/system/memory/auto_online_blocks`.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Exercises the devices commonly attached to a test guest.
    ///
    /// * `socket` - host side vsock unix socket; the vsock check is skipped
    ///   when `None`.
    /// * `console_text` - text echoed to `/dev/hvc0`; skipped when `None`.
    /// * `pmem_path` - guest path of the pmem device, which is mounted on
    ///   `/mnt`, written to, remounted and read back; skipped when `None`.
    ///
    /// # Panics
    ///
    /// Panics as soon as one of the checks fails.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            let mount_cmd = format!("sudo mount {pmem_path} /mnt");

            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            assert_eq!(self.ssh_command(&mount_cmd).unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            // Mount again: the file written above must still be there
            assert_eq!(self.ssh_command(&mount_cmd).unwrap(), "");
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Verbosity requested from the spawned binary.
pub enum VerbosityLevel {
    /// No verbosity flag.
    Warn,
    /// `-v`
    Info,
    /// `-vv`
    Debug,
}

impl Default for VerbosityLevel {
    fn default() -> Self {
        Self::Warn
    }
}

/// Formats the level as the matching command line flag (empty for `Warn`).
///
/// `to_string()` keeps working through the blanket `ToString` implementation.
impl fmt::Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            Self::Warn => "",
            Self::Info => "-v",
            Self::Debug => "-vv",
        })
    }
}

/// Builder around [`Command`] used to launch a binary for a given [`Guest`].
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    capture_output: bool,
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Creates a command running the `cloud-hypervisor` binary resolved by
    /// [`clh_command`].
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Creates a command running `binary_path`, with output capture disabled,
    /// command printing enabled and `Info` verbosity.
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Sets the verbosity flag added by [`Self::spawn`].
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Makes [`Self::spawn`] pipe the child's stdout and stderr.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Chooses whether [`Self::spawn`] prints the command line.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Spawns the configured command.
    ///
    /// The verbosity flag is appended first, then the command line is printed
    /// when enabled. With output capture enabled, stdout and stderr are piped
    /// and both pipes are resized to `PIPE_SIZE`.
    ///
    /// # Errors
    ///
    /// Returns the spawn error, or the pipe resize error. In the latter case
    /// the freshly spawned child is killed and reaped first so that no
    /// process is left running behind a failed call.
    pub fn spawn(&mut self) -> io::Result<Child> {
        let verbosity_flag = self.verbosity.to_string();
        if !verbosity_flag.is_empty() {
            self.command.arg(verbosity_flag);
        }

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if !self.capture_output {
            return self.command.spawn();
        }

        let mut child = self
            .command
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;

        match Self::grow_captured_pipes(&child) {
            Ok(()) => Ok(child),
            Err(e) => {
                // `Child` neither kills nor waits on drop
                let _ = child.kill();
                let _ = child.wait();
                Err(e)
            }
        }
    }

    /// Resizes the stdout and stderr pipes of `child` to `PIPE_SIZE`.
    fn grow_captured_pipes(child: &Child) -> io::Result<()> {
        let resize = |fd: std::os::unix::io::RawFd| -> io::Result<i32> {
            // SAFETY: `fd` comes from a pipe end owned by `child`, which
            // outlives this call; F_SETPIPE_SZ takes a plain integer argument.
            let size = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
            if size == -1 {
                Err(io::Error::last_os_error())
            } else {
                Ok(size)
            }
        };

        let pipesize = resize(
            child
                .stdout
                .as_ref()
                .expect("stdout was configured as piped")
                .as_raw_fd(),
        )?;
        let pipesize1 = resize(
            child
                .stderr
                .as_ref()
                .expect("stderr was configured as piped")
                .as_raw_fd(),
        )?;

        if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE {
            Ok(())
        } else {
            Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "resizing pipe w/ 'fnctl' failed: stdout pipesize {pipesize}, stderr pipesize {pipesize1}"
                ),
            ))
        }
    }

    /// Appends `args` to the command line.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Appends `--disk` with the operating system disk, followed by the
    /// cloud-init disk when the guest has one.
    ///
    /// # Panics
    ///
    /// Panics if the guest has no operating system disk.
    pub fn default_disks(&mut self) -> &mut Self {
        let os_disk = format!(
            "path={}",
            self.guest
                .disk_config
                .disk(DiskType::OperatingSystem)
                .unwrap()
        );
        let cloud_init_disk = self
            .guest
            .disk_config
            .disk(DiskType::CloudInit)
            .map(|path| format!("path={path}"));

        self.args(["--disk", os_disk.as_str()]);
        if let Some(cloud_init_disk) = cloud_init_disk {
            self.args([cloud_init_disk.as_str()]);
        }
        self
    }

    /// Appends `--net` with the guest's default network string.
    pub fn default_net(&mut self) -> &mut Self {
        let net = self.guest.default_net_string();
        self.args(["--net", net.as_str()])
    }
}

/// Returns the path of the release binary `cmd` under `target/`.
///
/// The target triple is read from the `BUILD_TARGET` environment variable
/// and falls back to `x86_64-unknown-linux-gnu` when it cannot be read.
pub fn clh_command(cmd: &str) -> String {
    let target =
        env::var("BUILD_TARGET").unwrap_or_else(|_| "x86_64-unknown-linux-gnu".to_string());
    format!("target/{target}/release/{cmd}")
}

/// Extracts the throughput in bits per second from the JSON `output` of iperf3.
///
/// Reads `end.sum_sent.bits_per_second` when `sender` is true and
/// `end.sum_received.bits_per_second` otherwise.
///
/// # Errors
///
/// Returns [`Error::Iperf3Parse`] (after dumping the reason and the raw
/// output to stderr) when the output is not valid JSON or the entry is missing.
pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> {
    let text = String::from_utf8_lossy(output);
    let summary = if sender { "sum_sent" } else { "sum_received" };

    serde_json::from_str::<Value>(&text)
        .map_err(|e| format!("invalid json output: {e}"))
        .and_then(|v| {
            // Indexing a `Value` yields `Null` for missing keys, it never panics
            v["end"][summary]["bits_per_second"]
                .as_f64()
                .ok_or_else(|| format!("missing entry 'end.{summary}.bits_per_second'"))
        })
        .map_err(|reason| {
            eprintln!("'iperf3' parse error: {reason}");
            eprintln!(
                "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n",
                text
            );
            Error::Iperf3Parse
        })
}

/// I/O pattern of a fio run; displayed as the matching fio keyword.
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            FioOps::Read => "read",
            FioOps::RandomRead => "randread",
            FioOps::Write => "write",
            FioOps::RandomWrite => "randwrite",
            FioOps::ReadWrite => "rw",
            FioOps::RandRW => "randrw",
        })
    }
}

/// Sums `<direction>.<quantity> / (<direction>.runtime / 1000)` over all jobs
/// of a fio JSON `output`, for the directions implied by `fio_ops`.
///
/// The error string describes what could not be parsed. A `runtime` of 0 is
/// not special-cased and yields an infinite or NaN contribution.
fn fio_total_rate(
    output: &str,
    fio_ops: &FioOps,
    num_jobs: u32,
    quantity: &str,
) -> Result<f64, String> {
    let v: Value =
        serde_json::from_str(output).map_err(|e| format!("invalid json output: {e}"))?;
    let jobs = v["jobs"]
        .as_array()
        .ok_or_else(|| "missing entry 'jobs'".to_string())?;
    if jobs.len() != num_jobs as usize {
        return Err(format!(
            "Unexpected number of 'fio' jobs: expected {num_jobs}, found {}",
            jobs.len()
        ));
    }

    let directions: &[&str] = match fio_ops {
        FioOps::Read | FioOps::RandomRead => &["read"],
        FioOps::Write | FioOps::RandomWrite => &["write"],
        FioOps::ReadWrite | FioOps::RandRW => &["read", "write"],
    };

    let mut total = 0_f64;
    for job in jobs {
        for direction in directions {
            let stats = &job[*direction];
            let amount = stats[quantity]
                .as_u64()
                .ok_or_else(|| format!("missing entry '{direction}.{quantity}'"))?;
            let runtime = stats["runtime"]
                .as_u64()
                .ok_or_else(|| format!("missing entry '{direction}.runtime'"))?
                as f64
                / 1000_f64;
            total += amount as f64 / runtime;
        }
    }

    Ok(total)
}

/// Dumps the parse failure `reason` and the raw fio `output` to stderr.
fn fio_parse_error(output: &str, reason: &str) -> Error {
    eprintln!("'fio' parse error: {reason}");
    eprintln!(
        "=============== Fio output ===============\n\n{output}\n\n===========end============\n\n"
    );
    Error::FioOutputParse
}

/// Computes the total bandwidth of a fio run from its JSON `output`, as the
/// sum over all jobs of `io_bytes / (runtime / 1000)` for the read and/or
/// write side selected by `fio_ops`.
///
/// # Errors
///
/// Returns [`Error::FioOutputParse`] when the output is not valid JSON, does
/// not contain exactly `num_jobs` jobs, or lacks one of the entries.
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    fio_total_rate(output, fio_ops, num_jobs, "io_bytes")
        .map_err(|reason| fio_parse_error(output, &reason))
}

/// Same as [`parse_fio_output`], but based on `total_ios` instead of
/// `io_bytes`.
///
/// # Errors
///
/// Returns [`Error::FioOutputParse`] when the output is not valid JSON, does
/// not contain exactly `num_jobs` jobs, or lacks one of the entries.
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    fio_total_rate(output, fio_ops, num_jobs, "total_ios")
        .map_err(|reason| fio_parse_error(output, &reason))
}

// Wait the child process for a given timeout (in seconds); anything but a
// successful exit within that time is an error.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    let status = child
        .wait_timeout(Duration::from_secs(timeout))
        .map_err(WaitTimeoutError::General)?;

    match status {
        None => Err(WaitTimeoutError::Timedout),
        Some(s) if !s.success() => Err(WaitTimeoutError::ExitStatus),
        Some(_) => Ok(()),
    }
}

/// Measures the virtio-net throughput of `guest` with iperf3, in bits per second.
///
/// One iperf3 server is started in the guest and one client on the host per
/// queue pair, on consecutive ports starting at 5201. Each client runs for
/// `test_timeout` seconds. When `receive` is false the clients run in
/// reverse mode (`-R`), so that the guest is the sender.
///
/// # Errors
///
/// Returns the first error met while starting the servers, spawning the
/// clients, waiting for them or parsing their output. All clients already
/// spawned are waited for, or killed and reaped, before the error is returned.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
) -> Result<f64, Error> {
    let default_port = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure RX through-put
    let mut clients = Vec::new();
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &format!("{}", default_port + n),
            "-t",
            &format!("{test_timeout}"),
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.args(["-R"]);
        }

        match cmd.stderr(Stdio::piped()).stdout(Stdio::piped()).spawn() {
            Ok(client) => clients.push(client),
            Err(e) => {
                // Do not leave the clients started so far running
                for mut started in clients {
                    let _ = started.kill();
                    let _ = started.wait();
                }
                return Err(Error::Spawn(e));
            }
        }
    }

    // 3. collect the results; after the first failure the remaining clients
    // are only waited for, killed and dumped.
    let mut err: Option<Error> = None;
    let mut bps = Vec::new();
    for mut c in clients {
        if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
            err.get_or_insert(Error::WaitTimeout(e));
        }

        if err.is_none() {
            // The child has terminated successfully, only its output is left to read
            let parsed = c
                .wait_with_output()
                .map_err(|e| Error::WaitTimeout(WaitTimeoutError::General(e)))
                .and_then(|output| parse_iperf3_output(&output.stdout, receive));
            match parsed {
                Ok(value) => bps.push(value),
                Err(e) => err = Some(e),
            }
        } else {
            let _ = c.kill();
            if let Ok(output) = c.wait_with_output() {
                println!(
                    "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                    String::from_utf8_lossy(&output.stdout)
                );
            }
        }
    }

    match err {
        Some(e) => Err(e),
        None => Ok(bps.iter().sum()),
    }
}

/// Collects the `Avg` latency of every JSON line of `text` that has one.
fn ethr_avg_latencies(text: &str) -> Result<Vec<f64>, String> {
    let mut latency = Vec::new();
    for line in text.lines() {
        let v: Value =
            serde_json::from_str(line).map_err(|e| format!("invalid json line: {e}"))?;
        // Skip header/summary lines
        if let Some(avg) = v["Avg"].as_str() {
            // Assume the latency unit is always "us"
            let value = avg.split("us").next().unwrap_or(avg);
            latency.push(
                value
                    .parse::<f64>()
                    .map_err(|e| format!("invalid 'Avg' entry '{avg}': {e}"))?,
            );
        }
    }

    if latency.is_empty() {
        return Err("no valid latency data found".to_string());
    }

    Ok(latency)
}

/// Extracts the average latencies from an ethr log, which holds one JSON
/// document per line.
///
/// # Errors
///
/// Returns [`Error::EthrLogParse`] (after dumping the reason and the raw
/// output to stderr) when a line is not valid JSON, an `Avg` entry is not a
/// number followed by "us", or no latency was found at all.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    let text = String::from_utf8_lossy(output);

    ethr_avg_latencies(&text).map_err(|reason| {
        eprintln!("'ethr' parse error: {reason}");
        eprintln!(
            "=============== ethr output ===============\n\n{}\n\n===========end============\n\n",
            text
        );
        Error::EthrLogParse
    })
}

/// Measures the virtio-net latency of `guest` with the `ethr` tool and
/// returns the average latencies it logged.
///
/// The host's `/usr/local/bin/ethr` is copied to the guest and started there
/// as a server, then the host client runs for `test_timeout` seconds and
/// logs to `ethr.client.log` in the guest's temporary directory.
///
/// # Errors
///
/// Returns an error when the copy, a guest command, the client spawn, the
/// wait for the client, the log read or the log parsing fails.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        1, // a single attempt rather than DEFAULT_SSH_RETRIES
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host
    let log_file = guest.tmp_dir.as_path().join("ethr.client.log");
    let mut c = Command::new(ethr_path)
        .args(["-c", &guest.network.guest_ip, "-t", "l", "-o"])
        .arg(&log_file) // file output is JSON format
        .args(["-d", &format!("{test_timeout}s")])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
        let _ = c.kill();
        // Reap the client so that it does not linger as a zombie
        let _ = c.wait();
        return Err(Error::WaitTimeout(e));
    }

    // Parse the ethr latency test output
    let content = fs::read(&log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}