1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use once_cell::sync::Lazy; 9 use serde_json::Value; 10 use ssh2::Session; 11 use std::env; 12 use std::ffi::OsStr; 13 use std::io; 14 use std::io::{Read, Write}; 15 use std::net::TcpListener; 16 use std::net::TcpStream; 17 use std::os::unix::fs::PermissionsExt; 18 use std::os::unix::io::{AsRawFd, FromRawFd}; 19 use std::path::Path; 20 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 21 use std::str::FromStr; 22 use std::sync::Mutex; 23 use std::thread; 24 use std::time::Duration; 25 use std::{fmt, fs}; 26 use vmm_sys_util::tempdir::TempDir; 27 use wait_timeout::ChildExt; 28 29 #[derive(Debug)] 30 pub enum WaitTimeoutError { 31 Timedout, 32 ExitStatus, 33 General(std::io::Error), 34 } 35 36 #[derive(Debug)] 37 pub enum Error { 38 Parsing(std::num::ParseIntError), 39 SshCommand(SshCommandError), 40 WaitForBoot(WaitForBootError), 41 EthrLogFile(std::io::Error), 42 EthrLogParse, 43 FioOutputParse, 44 Iperf3Parse, 45 Spawn(std::io::Error), 46 WaitTimeout(WaitTimeoutError), 47 } 48 49 impl From<SshCommandError> for Error { 50 fn from(e: SshCommandError) -> Self { 51 Self::SshCommand(e) 52 } 53 } 54 55 pub struct GuestNetworkConfig { 56 pub guest_ip: String, 57 pub l2_guest_ip1: String, 58 pub l2_guest_ip2: String, 59 pub l2_guest_ip3: String, 60 pub host_ip: String, 61 pub guest_mac: String, 62 pub l2_guest_mac1: String, 63 pub l2_guest_mac2: String, 64 pub l2_guest_mac3: String, 65 pub tcp_listener_port: u16, 66 } 67 68 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 69 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 70 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 71 72 #[derive(Debug)] 73 pub enum WaitForBootError { 74 EpollWait(std::io::Error), 75 Listen(std::io::Error), 76 EpollWaitTimeout, 77 WrongGuestAddr, 78 Accept(std::io::Error), 79 } 80 81 impl GuestNetworkConfig { 82 pub fn 
wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 83 let start = std::time::Instant::now(); 84 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 85 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 86 let expected_guest_addr = self.guest_ip.as_str(); 87 let mut s = String::new(); 88 let timeout = match custom_timeout { 89 Some(t) => t, 90 None => DEFAULT_TCP_LISTENER_TIMEOUT, 91 }; 92 93 let mut closure = || -> Result<(), WaitForBootError> { 94 let listener = 95 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 96 listener 97 .set_nonblocking(true) 98 .expect("Cannot set non-blocking for tcp listener"); 99 100 // Reply on epoll w/ timeout to wait for guest connections faithfully 101 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 102 // Use 'File' to enforce closing on 'epoll_fd' 103 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 104 epoll::ctl( 105 epoll_fd, 106 epoll::ControlOptions::EPOLL_CTL_ADD, 107 listener.as_raw_fd(), 108 epoll::Event::new(epoll::Events::EPOLLIN, 0), 109 ) 110 .expect("Cannot add 'tcp_listener' event to epoll"); 111 let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1]; 112 loop { 113 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 114 Ok(num_events) => Ok(num_events), 115 Err(e) => match e.raw_os_error() { 116 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 117 _ => Err(e), 118 }, 119 } 120 .map_err(WaitForBootError::EpollWait)?; 121 if num_events == 0 { 122 return Err(WaitForBootError::EpollWaitTimeout); 123 } 124 break; 125 } 126 127 match listener.accept() { 128 Ok((_, addr)) => { 129 // Make sure the connection is from the expected 'guest_addr' 130 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 131 s = format!( 132 "Expecting the guest ip '{}' while being connected with ip '{}'", 133 
expected_guest_addr, 134 addr.ip() 135 ); 136 return Err(WaitForBootError::WrongGuestAddr); 137 } 138 139 Ok(()) 140 } 141 Err(e) => { 142 s = "TcpListener::accept() failed".to_string(); 143 Err(WaitForBootError::Accept(e)) 144 } 145 } 146 }; 147 148 match closure() { 149 Err(e) => { 150 let duration = start.elapsed(); 151 eprintln!( 152 "\n\n==== Start 'wait_vm_boot' (FAILED) ==== \ 153 \n\nduration =\"{duration:?}, timeout = {timeout}s\" \ 154 \nlisten_addr=\"{listen_addr}\" \ 155 \nexpected_guest_addr=\"{expected_guest_addr}\" \ 156 \nmessage=\"{s}\" \ 157 \nerror=\"{e:?}\" \ 158 \n\n==== End 'wait_vm_boot' outout ====\n\n" 159 ); 160 161 Err(e) 162 } 163 Ok(_) => Ok(()), 164 } 165 } 166 } 167 168 pub enum DiskType { 169 OperatingSystem, 170 CloudInit, 171 } 172 173 pub trait DiskConfig { 174 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 175 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 176 fn disk(&self, disk_type: DiskType) -> Option<String>; 177 } 178 179 #[derive(Clone)] 180 pub struct UbuntuDiskConfig { 181 osdisk_path: String, 182 cloudinit_path: String, 183 image_name: String, 184 } 185 186 impl UbuntuDiskConfig { 187 pub fn new(image_name: String) -> Self { 188 UbuntuDiskConfig { 189 image_name, 190 osdisk_path: String::new(), 191 cloudinit_path: String::new(), 192 } 193 } 194 } 195 196 pub struct WindowsDiskConfig { 197 image_name: String, 198 osdisk_path: String, 199 loopback_device: String, 200 windows_snapshot_cow: String, 201 windows_snapshot: String, 202 } 203 204 impl WindowsDiskConfig { 205 pub fn new(image_name: String) -> Self { 206 WindowsDiskConfig { 207 image_name, 208 osdisk_path: String::new(), 209 loopback_device: String::new(), 210 windows_snapshot_cow: String::new(), 211 windows_snapshot: String::new(), 212 } 213 } 214 } 215 216 impl Drop for WindowsDiskConfig { 217 fn drop(&mut self) { 218 // dmsetup remove windows-snapshot-1 219 
std::process::Command::new("dmsetup") 220 .arg("remove") 221 .arg(self.windows_snapshot.as_str()) 222 .output() 223 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 224 225 // dmsetup remove windows-snapshot-cow-1 226 std::process::Command::new("dmsetup") 227 .arg("remove") 228 .arg(self.windows_snapshot_cow.as_str()) 229 .output() 230 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 231 232 // losetup -d <loopback_device> 233 std::process::Command::new("losetup") 234 .args(["-d", self.loopback_device.as_str()]) 235 .output() 236 .expect("Expect removing loopback device to succeed"); 237 } 238 } 239 240 impl DiskConfig for UbuntuDiskConfig { 241 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 242 let cloudinit_file_path = 243 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 244 245 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 246 247 fs::create_dir_all(&cloud_init_directory) 248 .expect("Expect creating cloud-init directory to succeed"); 249 250 let source_file_dir = std::env::current_dir() 251 .unwrap() 252 .join("test_data") 253 .join("cloud-init") 254 .join("ubuntu") 255 .join("ci"); 256 257 ["meta-data"].iter().for_each(|x| { 258 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 259 .expect("Expect copying cloud-init meta-data to succeed"); 260 }); 261 262 let mut user_data_string = String::new(); 263 fs::File::open(source_file_dir.join("user-data")) 264 .unwrap() 265 .read_to_string(&mut user_data_string) 266 .expect("Expected reading user-data file in to succeed"); 267 user_data_string = user_data_string.replace( 268 "@DEFAULT_TCP_LISTENER_MESSAGE", 269 DEFAULT_TCP_LISTENER_MESSAGE, 270 ); 271 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 272 user_data_string = 273 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 274 275 
fs::File::create(cloud_init_directory.join("user-data")) 276 .unwrap() 277 .write_all(user_data_string.as_bytes()) 278 .expect("Expected writing out user-data to succeed"); 279 280 let mut network_config_string = String::new(); 281 282 fs::File::open(source_file_dir.join("network-config")) 283 .unwrap() 284 .read_to_string(&mut network_config_string) 285 .expect("Expected reading network-config file in to succeed"); 286 287 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 288 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 289 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 290 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 291 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 292 network_config_string = 293 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 294 network_config_string = 295 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 296 network_config_string = 297 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 298 network_config_string = 299 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 300 301 fs::File::create(cloud_init_directory.join("network-config")) 302 .unwrap() 303 .write_all(network_config_string.as_bytes()) 304 .expect("Expected writing out network-config to succeed"); 305 306 std::process::Command::new("mkdosfs") 307 .args(["-n", "CIDATA"]) 308 .args(["-C", cloudinit_file_path.as_str()]) 309 .arg("8192") 310 .output() 311 .expect("Expect creating disk image to succeed"); 312 313 ["user-data", "meta-data", "network-config"] 314 .iter() 315 .for_each(|x| { 316 std::process::Command::new("mcopy") 317 .arg("-o") 318 .args(["-i", cloudinit_file_path.as_str()]) 319 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 320 
.output() 321 .expect("Expect copying files to disk image to succeed"); 322 }); 323 324 cloudinit_file_path 325 } 326 327 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 328 let mut workload_path = dirs::home_dir().unwrap(); 329 workload_path.push("workloads"); 330 331 let mut osdisk_base_path = workload_path; 332 osdisk_base_path.push(&self.image_name); 333 334 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 335 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 336 337 rate_limited_copy(osdisk_base_path, &osdisk_path) 338 .expect("copying of OS source disk image failed"); 339 340 self.cloudinit_path = cloudinit_path; 341 self.osdisk_path = osdisk_path; 342 } 343 344 fn disk(&self, disk_type: DiskType) -> Option<String> { 345 match disk_type { 346 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 347 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 348 } 349 } 350 } 351 352 impl DiskConfig for WindowsDiskConfig { 353 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 354 String::new() 355 } 356 357 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 358 let mut workload_path = dirs::home_dir().unwrap(); 359 workload_path.push("workloads"); 360 361 let mut osdisk_path = workload_path; 362 osdisk_path.push(&self.image_name); 363 364 let osdisk_blk_size = fs::metadata(osdisk_path) 365 .expect("Expect retrieving Windows image metadata") 366 .len() 367 >> 9; 368 369 let snapshot_cow_path = 370 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 371 372 // Create and truncate CoW file for device mapper 373 let cow_file_size: u64 = 1 << 30; 374 let cow_file_blk_size = cow_file_size >> 9; 375 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 376 .expect("Expect creating CoW image to succeed"); 377 cow_file 378 .set_len(cow_file_size) 379 .expect("Expect truncating CoW 
image to succeed"); 380 381 // losetup --find --show /tmp/snapshot_cow 382 let loopback_device = std::process::Command::new("losetup") 383 .arg("--find") 384 .arg("--show") 385 .arg(snapshot_cow_path.as_str()) 386 .output() 387 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 388 389 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 390 .trim() 391 .to_string(); 392 393 let random_extension = tmp_dir.as_path().file_name().unwrap(); 394 let windows_snapshot_cow = format!( 395 "windows-snapshot-cow-{}", 396 random_extension.to_str().unwrap() 397 ); 398 399 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 400 std::process::Command::new("dmsetup") 401 .arg("create") 402 .arg(windows_snapshot_cow.as_str()) 403 .args([ 404 "--table", 405 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 406 ]) 407 .output() 408 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 409 410 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 411 412 // dmsetup mknodes 413 std::process::Command::new("dmsetup") 414 .arg("mknodes") 415 .output() 416 .expect("Expect device mapper nodes to be ready"); 417 418 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 419 std::process::Command::new("dmsetup") 420 .arg("create") 421 .arg(windows_snapshot.as_str()) 422 .args([ 423 "--table", 424 format!( 425 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 426 osdisk_blk_size, 427 windows_snapshot_cow.as_str() 428 ) 429 .as_str(), 430 ]) 431 .output() 432 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 433 434 // dmsetup mknodes 435 std::process::Command::new("dmsetup") 436 .arg("mknodes") 437 .output() 438 .expect("Expect device mapper nodes to be ready"); 439 440 self.osdisk_path = 
format!("/dev/mapper/{windows_snapshot}"); 441 self.windows_snapshot_cow = windows_snapshot_cow; 442 self.windows_snapshot = windows_snapshot; 443 } 444 445 fn disk(&self, disk_type: DiskType) -> Option<String> { 446 match disk_type { 447 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 448 DiskType::CloudInit => None, 449 } 450 } 451 } 452 453 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 454 for i in 0..10 { 455 let free_bytes = unsafe { 456 let mut stats = std::mem::MaybeUninit::zeroed(); 457 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 458 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 459 460 let free_blocks = stats.assume_init().f_bfree; 461 let block_size = stats.assume_init().f_bsize; 462 463 free_blocks * block_size 464 }; 465 466 // Make sure there is at least 6 GiB of space 467 if free_bytes < 6 << 30 { 468 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 469 thread::sleep(std::time::Duration::new(60, 0)); 470 continue; 471 } 472 473 match fs::copy(&from, &to) { 474 Err(e) => { 475 if let Some(errno) = e.raw_os_error() { 476 if errno == libc::ENOSPC { 477 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 478 thread::sleep(std::time::Duration::new(60, 0)); 479 continue; 480 } 481 } 482 return Err(e); 483 } 484 Ok(i) => return Ok(i), 485 } 486 } 487 Err(io::Error::last_os_error()) 488 } 489 490 pub fn handle_child_output( 491 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 492 output: &std::process::Output, 493 ) { 494 use std::os::unix::process::ExitStatusExt; 495 if r.is_ok() && output.status.success() { 496 return; 497 } 498 499 match output.status.code() { 500 None => { 501 // Don't treat child.kill() as a problem 502 if output.status.signal() == Some(9) && r.is_ok() { 503 return; 504 } 505 506 eprintln!( 507 "==== child killed by signal: {} ====", 508 output.status.signal().unwrap() 509 ); 510 } 511 Some(code) => { 512 eprintln!("\n\n==== child exit code: {code} ===="); 513 } 514 } 515 516 eprintln!( 517 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 518 String::from_utf8_lossy(&output.stdout) 519 ); 520 eprintln!( 521 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 522 String::from_utf8_lossy(&output.stderr) 523 ); 524 525 panic!("Test failed") 526 } 527 528 #[derive(Debug)] 529 pub struct PasswordAuth { 530 pub username: String, 531 pub password: String, 532 } 533 534 pub const DEFAULT_SSH_RETRIES: u8 = 6; 535 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 536 537 #[derive(Debug)] 538 pub enum SshCommandError { 539 Connection(std::io::Error), 540 Handshake(ssh2::Error), 541 Authentication(ssh2::Error), 542 ChannelSession(ssh2::Error), 543 Command(ssh2::Error), 544 ExitStatus(ssh2::Error), 545 NonZeroExitStatus(i32), 546 FileRead(std::io::Error), 547 FileMetadata(std::io::Error), 548 ScpSend(ssh2::Error), 549 WriteAll(std::io::Error), 550 SendEof(ssh2::Error), 551 WaitEof(ssh2::Error), 552 } 553 554 fn scp_to_guest_with_auth( 555 path: &Path, 556 remote_path: &Path, 557 auth: &PasswordAuth, 558 ip: &str, 559 retries: u8, 560 timeout: u8, 561 ) -> Result<(), SshCommandError> { 562 let 
mut counter = 0; 563 loop { 564 let closure = || -> Result<(), SshCommandError> { 565 let tcp = 566 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 567 let mut sess = Session::new().unwrap(); 568 sess.set_tcp_stream(tcp); 569 sess.handshake().map_err(SshCommandError::Handshake)?; 570 571 sess.userauth_password(&auth.username, &auth.password) 572 .map_err(SshCommandError::Authentication)?; 573 assert!(sess.authenticated()); 574 575 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 576 let mode = fs::metadata(path) 577 .map_err(SshCommandError::FileMetadata)? 578 .permissions() 579 .mode() 580 & 0o777; 581 582 let mut channel = sess 583 .scp_send(remote_path, mode as i32, content.len() as u64, None) 584 .map_err(SshCommandError::ScpSend)?; 585 channel 586 .write_all(&content) 587 .map_err(SshCommandError::WriteAll)?; 588 channel.send_eof().map_err(SshCommandError::SendEof)?; 589 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 590 591 // Intentionally ignore these results here as their failure 592 // does not precipitate a repeat 593 let _ = channel.close(); 594 let _ = channel.wait_close(); 595 596 Ok(()) 597 }; 598 599 match closure() { 600 Ok(_) => break, 601 Err(e) => { 602 counter += 1; 603 if counter >= retries { 604 eprintln!( 605 "\n\n==== Start scp command output (FAILED) ====\n\n\ 606 path =\"{path:?}\"\n\ 607 remote_path =\"{remote_path:?}\"\n\ 608 auth=\"{auth:#?}\"\n\ 609 ip=\"{ip}\"\n\ 610 error=\"{e:?}\"\n\ 611 \n==== End scp command outout ====\n\n" 612 ); 613 614 return Err(e); 615 } 616 } 617 }; 618 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 619 } 620 Ok(()) 621 } 622 623 pub fn scp_to_guest( 624 path: &Path, 625 remote_path: &Path, 626 ip: &str, 627 retries: u8, 628 timeout: u8, 629 ) -> Result<(), SshCommandError> { 630 scp_to_guest_with_auth( 631 path, 632 remote_path, 633 &PasswordAuth { 634 username: String::from("cloud"), 635 password: String::from("cloud123"), 
636 }, 637 ip, 638 retries, 639 timeout, 640 ) 641 } 642 643 pub fn ssh_command_ip_with_auth( 644 command: &str, 645 auth: &PasswordAuth, 646 ip: &str, 647 retries: u8, 648 timeout: u8, 649 ) -> Result<String, SshCommandError> { 650 let mut s = String::new(); 651 652 let mut counter = 0; 653 loop { 654 let mut closure = || -> Result<(), SshCommandError> { 655 let tcp = 656 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 657 let mut sess = Session::new().unwrap(); 658 sess.set_tcp_stream(tcp); 659 sess.handshake().map_err(SshCommandError::Handshake)?; 660 661 sess.userauth_password(&auth.username, &auth.password) 662 .map_err(SshCommandError::Authentication)?; 663 assert!(sess.authenticated()); 664 665 let mut channel = sess 666 .channel_session() 667 .map_err(SshCommandError::ChannelSession)?; 668 channel.exec(command).map_err(SshCommandError::Command)?; 669 670 // Intentionally ignore these results here as their failure 671 // does not precipitate a repeat 672 let _ = channel.read_to_string(&mut s); 673 let _ = channel.close(); 674 let _ = channel.wait_close(); 675 676 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 677 678 if status != 0 { 679 Err(SshCommandError::NonZeroExitStatus(status)) 680 } else { 681 Ok(()) 682 } 683 }; 684 685 match closure() { 686 Ok(_) => break, 687 Err(e) => { 688 counter += 1; 689 if counter >= retries { 690 eprintln!( 691 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 692 command=\"{command}\"\n\ 693 auth=\"{auth:#?}\"\n\ 694 ip=\"{ip}\"\n\ 695 output=\"{s}\"\n\ 696 error=\"{e:?}\"\n\ 697 \n==== End ssh command outout ====\n\n" 698 ); 699 700 return Err(e); 701 } 702 } 703 }; 704 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 705 } 706 Ok(s) 707 } 708 709 pub fn ssh_command_ip( 710 command: &str, 711 ip: &str, 712 retries: u8, 713 timeout: u8, 714 ) -> Result<String, SshCommandError> { 715 ssh_command_ip_with_auth( 716 command, 717 &PasswordAuth { 
/// Checks that exactly `line_count` lines of `input` contain every keyword.
///
/// A line matches when each entry of `keywords` occurs in it as a substring; with an
/// empty `keywords` list every line matches. Returns `true` when the number of matching
/// lines equals `line_count`. Otherwise a diagnostic block listing the input and the
/// matching lines is printed to stderr and `false` is returned.
pub fn check_matched_lines_count(input: &str, keywords: Vec<&str>, line_count: usize) -> bool {
    // Keep the matching lines separate: concatenating them would collapse several
    // matches into a single line and make the count meaningless.
    let matched_lines: Vec<&str> = input
        .lines()
        .filter(|line| keywords.iter().all(|k| line.contains(k)))
        .collect();

    if matched_lines.len() == line_count {
        true
    } else {
        let matches = matched_lines.join("\n");
        eprintln!(
            "\n\n==== Start 'check_matched_lines_count' failed ==== \
             \nkeywords = {keywords:?}, line_count = {line_count} \
             \n\ninput = {input} matches = {matches} \
             \n\n==== End 'check_matched_lines_count' failed ====",
        );

        false
    }
}
fn default_net_string_w_mtu(&self, mtu: u16) -> String { 862 format!( 863 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 864 self.network.guest_mac, self.network.host_ip, mtu 865 ) 866 } 867 868 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 869 ssh_command_ip( 870 command, 871 &self.network.guest_ip, 872 DEFAULT_SSH_RETRIES, 873 DEFAULT_SSH_TIMEOUT, 874 ) 875 } 876 877 #[cfg(target_arch = "x86_64")] 878 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 879 ssh_command_ip( 880 command, 881 &self.network.guest_ip, 882 DEFAULT_SSH_RETRIES, 883 DEFAULT_SSH_TIMEOUT, 884 ) 885 } 886 887 #[cfg(target_arch = "x86_64")] 888 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 889 ssh_command_ip( 890 command, 891 &self.network.l2_guest_ip1, 892 DEFAULT_SSH_RETRIES, 893 DEFAULT_SSH_TIMEOUT, 894 ) 895 } 896 897 #[cfg(target_arch = "x86_64")] 898 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 899 ssh_command_ip( 900 command, 901 &self.network.l2_guest_ip2, 902 DEFAULT_SSH_RETRIES, 903 DEFAULT_SSH_TIMEOUT, 904 ) 905 } 906 907 #[cfg(target_arch = "x86_64")] 908 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 909 ssh_command_ip( 910 command, 911 &self.network.l2_guest_ip3, 912 DEFAULT_SSH_RETRIES, 913 DEFAULT_SSH_TIMEOUT, 914 ) 915 } 916 917 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 918 format! 
{"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 919 cpu_count, 920 cpu_count, 921 kernel_path, 922 kernel_cmd, 923 self.network.host_ip, 924 self.network.guest_mac, 925 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 926 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 927 } 928 } 929 930 pub fn get_cpu_count(&self) -> Result<u32, Error> { 931 self.ssh_command("grep -c processor /proc/cpuinfo")? 932 .trim() 933 .parse() 934 .map_err(Error::Parsing) 935 } 936 937 pub fn get_total_memory(&self) -> Result<u32, Error> { 938 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 939 .trim() 940 .parse() 941 .map_err(Error::Parsing) 942 } 943 944 #[cfg(target_arch = "x86_64")] 945 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 946 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 947 .trim() 948 .parse() 949 .map_err(Error::Parsing) 950 } 951 952 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 953 self.ssh_command( 954 format!( 955 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 956 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 957 ) 958 .as_str(), 959 )? 
960 .trim() 961 .parse() 962 .map_err(Error::Parsing) 963 } 964 965 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> { 966 self.network 967 .wait_vm_boot(custom_timeout) 968 .map_err(Error::WaitForBoot) 969 } 970 971 pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> { 972 for cpu in cpus.iter() { 973 let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]"); 974 self.ssh_command(cmd.as_str())?; 975 } 976 977 Ok(()) 978 } 979 980 pub fn check_numa_node_distances( 981 &self, 982 node_id: usize, 983 distances: &str, 984 ) -> Result<bool, Error> { 985 let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance"); 986 if self.ssh_command(cmd.as_str())?.trim() == distances { 987 Ok(true) 988 } else { 989 Ok(false) 990 } 991 } 992 993 pub fn check_numa_common( 994 &self, 995 mem_ref: Option<&[u32]>, 996 node_ref: Option<&[Vec<usize>]>, 997 distance_ref: Option<&[&str]>, 998 ) { 999 if let Some(mem_ref) = mem_ref { 1000 // Check each NUMA node has been assigned the right amount of 1001 // memory. 1002 for (i, &m) in mem_ref.iter().enumerate() { 1003 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m); 1004 } 1005 } 1006 1007 if let Some(node_ref) = node_ref { 1008 // Check each NUMA node has been assigned the right CPUs set. 1009 for (i, n) in node_ref.iter().enumerate() { 1010 self.check_numa_node_cpus(i, n.clone()).unwrap(); 1011 } 1012 } 1013 1014 if let Some(distance_ref) = distance_ref { 1015 // Check each NUMA node has been assigned the right distances. 
for (i, &d) in distance_ref.iter().enumerate() {
                assert!(self.check_numa_node_distances(i, d).unwrap());
            }
        }
    }

    /// Checks from inside the guest that `cpuid` reports SGX, SGX launch
    /// configuration and SGX1 as supported.
    ///
    /// Each probe is a `cpuid | tr | grep -q` pipeline executed through
    /// `ssh_command`; the first error returned by `ssh_command` is propagated.
    // NOTE(review): presumably `ssh_command` fails on a non-zero remote exit
    // status (i.e. when `grep -q` finds no match) - confirm against its definition.
    #[cfg(target_arch = "x86_64")]
    pub fn check_sgx_support(&self) -> Result<(), Error> {
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \
             Software Guard Extensions supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \
             SGX launch config supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \
             supported = true'",
        )?;

        Ok(())
    }

    /// Returns the trimmed content of the guest's
    /// `/sys/bus/pci/devices/0000:00:00.0/class` file.
    pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
            .trim()
            .to_string())
    }

    /// Returns the trimmed, newline-separated output of
    /// `cat /sys/bus/pci/devices/*/device` run in the guest.
    pub fn get_pci_device_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/device")?
            .trim()
            .to_string())
    }

    /// Returns the trimmed, newline-separated output of
    /// `cat /sys/bus/pci/devices/*/vendor` run in the guest.
    pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
            .trim()
            .to_string())
    }

    /// Reports whether some PCI device in the guest has both the given
    /// `device_id` and the given `vendor_id`.
    ///
    /// The device and vendor listings are matched line by line: the n-th
    /// device id is paired with the n-th vendor id.
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        // We are checking if console device's device id and vendor id pair matches
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        // `zip` stops at the shorter listing, so a device without a
        // corresponding vendor line can never match.
        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Exercises the vsock device: starts a `socat` listener in the guest,
    /// sends `HelloWorld!` from the host through the unix socket at `socket`,
    /// and asserts that the guest received it.
    ///
    /// # Panics
    ///
    /// Panics if any ssh/host command fails or if the received text differs.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip(
                "sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log",
                &guest_ip,
                DEFAULT_SSH_RETRIES,
                DEFAULT_SSH_TIMEOUT,
            )
            .unwrap();
        });

        // Make sure socat is listening, which might take a few seconds on slow systems
        thread::sleep(std::time::Duration::new(10, 0));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Asserts that `nvidia-smi` run in the guest mentions a "Tesla T4".
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self.ssh_command("nvidia-smi").unwrap().contains("Tesla T4"));
    }

    /// Reboots the guest and verifies the reboot count went up by one.
    ///
    /// `current_reboot_count` is the number of reboots performed so far; the
    /// guest is expected to report `current_reboot_count + 1` boots before and
    /// `current_reboot_count + 2` boots after the reboot. `custom_timeout` is
    /// forwarded to `wait_vm_boot`.
    ///
    /// # Panics
    ///
    /// Panics if an ssh command fails, the guest does not come back, or a
    /// boot count does not match.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        let list_boots_cmd = "sudo last | grep -c reboot";
        // An unparsable count is reported as 0, which then fails the assertion below.
        let boot_count = || {
            self.ssh_command(list_boots_cmd)
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Writes `online` to the guest's
    /// `/sys/devices/system/memory/auto_online_blocks`.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Runs a common set of device sanity checks inside the guest.
    ///
    /// * `socket` - when set, the vsock device is checked via [`Self::check_vsock`].
    /// * `console_text` - when set, the text is echoed to `/dev/hvc0`.
    /// * `pmem_path` - when set, the device is mounted on `/mnt`, written to,
    ///   unmounted, remounted and read back.
    ///
    /// Block devices (`/dev/vda`, `/dev/vdb`) and `/dev/hwrng` are always read.
    ///
    /// # Panics
    ///
    /// Panics on the first failing command or unexpected output.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            // Remount and make sure the data written before the unmount persisted.
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Verbosity passed to the hypervisor binary on its command line.
pub enum VerbosityLevel {
    Warn,
    Info,
    Debug,
}

impl Default for VerbosityLevel {
    fn default() -> Self {
        Self::Warn
    }
}

/// Formats the level as the command-line flag it maps to: nothing for
/// `Warn`, `-v` for `Info` and `-vv` for `Debug`.
///
/// Implementing `Display` (rather than `ToString` directly) keeps
/// `to_string()` available through the standard blanket implementation.
impl fmt::Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            Self::Warn => "",
            Self::Info => "-v",
            Self::Debug => "-vv",
        })
    }
}

/// Builder around [`Command`] used to launch a hypervisor binary for a [`Guest`].
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    capture_output: bool,
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Creates a command for the `cloud-hypervisor` binary resolved by [`clh_command`].
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Creates a command for the binary at `binary_path`.
    ///
    /// Output is not captured, the command line is printed on spawn and the
    /// verbosity is `Info`.
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Sets the verbosity flag added to the command line by [`Self::spawn`].
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Makes [`Self::spawn`] pipe the child's stdout and stderr.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Controls whether [`Self::spawn`] prints the command line before launching.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Spawns the configured command.
    ///
    /// The verbosity flag is appended to the arguments first. When output
    /// capture is enabled, stdout and stderr are piped and both pipes are
    /// resized to `PIPE_SIZE`.
    ///
    /// # Errors
    ///
    /// Returns the spawn error, or the error from resizing the pipes. In the
    /// latter case the already-started child is killed and reaped so that no
    /// stray process is left behind.
    pub fn spawn(&mut self) -> io::Result<Child> {
        match self.verbosity {
            VerbosityLevel::Warn => {}
            VerbosityLevel::Info => {
                self.command.arg("-v");
            }
            VerbosityLevel::Debug => {
                self.command.arg("-vv");
            }
        }

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if !self.capture_output {
            return self.command.spawn();
        }

        // Propagate spawn failures instead of panicking: callers already
        // handle an `io::Result`.
        let mut child = self
            .command
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;

        match Self::enlarge_output_pipes(&child) {
            Ok(()) => Ok(child),
            Err(e) => {
                // `Child` has no `Drop` implementation: without this the
                // process would keep running after the handle is dropped.
                let _ = child.kill();
                let _ = child.wait();
                Err(e)
            }
        }
    }

    /// Resizes the stdout and stderr pipes of `child` to `PIPE_SIZE`.
    fn enlarge_output_pipes(child: &Child) -> io::Result<()> {
        let resize = |fd: libc::c_int| -> io::Result<libc::c_int> {
            // SAFETY: `fd` is a pipe descriptor owned by `child`, which
            // outlives this call; F_SETPIPE_SZ takes a plain integer argument.
            let size = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
            if size == -1 {
                Err(io::Error::last_os_error())
            } else {
                Ok(size)
            }
        };

        // Both streams were configured as piped by the caller.
        let pipesize = resize(child.stdout.as_ref().unwrap().as_raw_fd())?;
        let pipesize1 = resize(child.stderr.as_ref().unwrap().as_raw_fd())?;

        if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "resizing pipe w/ 'fnctl' failed: stdout pipesize {pipesize}, stderr pipesize {pipesize1}"
                ),
            ))
        }
    }

    /// Appends `args` to the underlying command.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Adds a `--disk` option for the guest's operating-system disk, followed
    /// by the cloud-init disk when the guest has one.
    ///
    /// # Panics
    ///
    /// Panics if the guest has no operating-system disk.
    pub fn default_disks(&mut self) -> &mut Self {
        let os_disk = format!(
            "path={}",
            self.guest
                .disk_config
                .disk(DiskType::OperatingSystem)
                .unwrap()
        );
        let mut disk_args = vec!["--disk".to_string(), os_disk];
        if let Some(cloud_init) = self.guest.disk_config.disk(DiskType::CloudInit) {
            disk_args.push(format!("path={cloud_init}"));
        }

        self.args(disk_args)
    }

    /// Adds a `--net` option built from the guest's `default_net_string()`.
    pub fn default_net(&mut self) -> &mut Self {
        self.args(["--net", self.guest.default_net_string().as_str()])
    }
}

/// Returns the path of the release binary `cmd` under `target/`.
///
/// The target triple is read from the `BUILD_TARGET` environment variable and
/// falls back to `x86_64-unknown-linux-gnu` when the variable cannot be read.
pub fn clh_command(cmd: &str) -> String {
    env::var("BUILD_TARGET").map_or_else(
        // Build the fallback lazily; it is only needed when the variable is unset.
        |_| format!("target/x86_64-unknown-linux-gnu/release/{cmd}"),
        |target| format!("target/{target}/release/{cmd}"),
    )
}

/// Extracts a single figure from the JSON report of an `iperf3` client.
///
/// * `bandwidth == true`: returns `end.sum_sent.bits_per_second` when `sender`
///   is true, `end.sum_received.bits_per_second` otherwise.
/// * `bandwidth == false`: returns `(packets - lost_packets) / seconds` taken
///   from `end.sum`; `sender` is ignored.
///
/// # Errors
///
/// Returns [`Error::Iperf3Parse`] (after dumping the raw output to stderr)
/// when the output is not valid JSON or an expected entry is missing.
pub fn parse_iperf3_output(output: &[u8], sender: bool, bandwidth: bool) -> Result<f64, Error> {
    // Parsing failures are raised as panics with a descriptive message and
    // converted into an error below.
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");

        if bandwidth {
            if sender {
                v["end"]["sum_sent"]["bits_per_second"]
                    .as_f64()
                    .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
            } else {
                v["end"]["sum_received"]["bits_per_second"].as_f64().expect(
                    "'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'",
                )
            }
        } else {
            // iperf does not distinguish sent vs received in this case.
            let summary = &v["end"]["sum"];

            let lost_packets = summary["lost_packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.lost_packets'");
            let packets = summary["packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.packets'");
            let seconds = summary["seconds"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.seconds'");

            (packets - lost_packets) / seconds
        }
    })
    .map_err(|_| {
        eprintln!(
            "==== Start iperf3 output ===\n\n{}\n\n=== End iperf3 output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::Iperf3Parse
    })
}

/// I/O patterns; the `Display` form is the matching `fio` mode name
/// (`read`, `randread`, `write`, `randwrite`, `rw`, `randrw`).
#[derive(Clone)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mode = match self {
            FioOps::Read => "read",
            FioOps::RandomRead => "randread",
            FioOps::Write => "write",
            FioOps::RandomWrite => "randwrite",
            FioOps::ReadWrite => "rw",
            FioOps::RandRW => "randrw",
        };
        write!(f, "{mode}")
    }
}

/// Sums `<direction>.<quantity> / (<direction>.runtime / 1000)` over all jobs
/// of a `fio` JSON report, for every direction (`read` and/or `write`)
/// selected by `fio_ops`.
///
/// Shared implementation of [`parse_fio_output`] and [`parse_fio_output_iops`].
fn sum_fio_rate(
    output: &str,
    fio_ops: &FioOps,
    num_jobs: u32,
    quantity: &str,
) -> Result<f64, Error> {
    // Parsing failures are raised as panics with a descriptive message and
    // converted into an error below.
    std::panic::catch_unwind(|| {
        let v: Value =
            serde_json::from_str(output).expect("'fio' parse error: invalid json output");
        let jobs = v["jobs"]
            .as_array()
            .expect("'fio' parse error: missing entry 'jobs'");
        assert_eq!(
            jobs.len(),
            num_jobs as usize,
            "'fio' parse error: Unexpected number of 'fio' jobs."
        );

        let directions: &[&str] = match fio_ops {
            FioOps::Read | FioOps::RandomRead => &["read"],
            FioOps::Write | FioOps::RandomWrite => &["write"],
            FioOps::ReadWrite | FioOps::RandRW => &["read", "write"],
        };

        let mut total = 0_f64;
        for j in jobs {
            for &dir in directions {
                let amount = j[dir][quantity].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{}.{}'", dir, quantity)
                });
                // 'runtime' is divided by 1000 before being used as the time base.
                let runtime = j[dir]["runtime"].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{}.runtime'", dir)
                }) as f64
                    / 1000_f64;
                total += amount as f64 / runtime;
            }
        }

        total
    })
    .map_err(|_| {
        eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n");
        Error::FioOutputParse
    })
}

/// Returns the total of `io_bytes / (runtime / 1000)` over all jobs of a
/// `fio` JSON report, counting the directions selected by `fio_ops`.
///
/// # Errors
///
/// Returns [`Error::FioOutputParse`] (after dumping the raw output to stderr)
/// when the output is not valid JSON, an entry is missing, or the number of
/// jobs differs from `num_jobs`.
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    sum_fio_rate(output, fio_ops, num_jobs, "io_bytes")
}

/// Returns the total of `total_ios / (runtime / 1000)` over all jobs of a
/// `fio` JSON report, counting the directions selected by `fio_ops`.
///
/// # Errors
///
/// Returns [`Error::FioOutputParse`] (after dumping the raw output to stderr)
/// when the output is not valid JSON, an entry is missing, or the number of
/// jobs differs from `num_jobs`.
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    sum_fio_rate(output, fio_ops, num_jobs, "total_ios")
}

/// Waits up to `timeout` seconds for `child` to exit successfully.
///
/// Fails with `Timedout` when the child is still running after the timeout,
/// `ExitStatus` when it exited unsuccessfully and `General` when waiting
/// itself failed.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    match child.wait_timeout(Duration::from_secs(timeout)) {
        Err(e) => Err(WaitTimeoutError::General(e)),
        Ok(None) => Err(WaitTimeoutError::Timedout),
        Ok(Some(status)) if !status.success() => Err(WaitTimeoutError::ExitStatus),
        Ok(Some(_)) => Ok(()),
    }
}

/// Kills and reaps every child in `children`, ignoring failures.
///
/// `Child` has no `Drop` implementation, so dropping a handle would leave the
/// process running and, once it exits, unreaped.
fn kill_and_reap<I: IntoIterator<Item = Child>>(children: I) {
    for mut child in children {
        let _ = child.kill();
        let _ = child.wait();
    }
}

/// Measures virtio-net throughput between the host and `guest` with `iperf3`.
///
/// One `iperf3` server per queue pair is started in the guest (ports 5201 and
/// up) and one host client is run against each for `test_timeout` seconds.
///
/// * `receive` - when false, the host clients run in reverse mode (`-R`) so
///   that the guest is the sender.
/// * `bandwidth` - when false, a UDP stream (`-u -b 1T`) is used and the
///   per-client figure is the one [`parse_iperf3_output`] computes for that mode.
///
/// Returns the sum of the per-client results.
///
/// # Errors
///
/// Returns an error when starting a server or client fails, when a client
/// does not finish successfully within `test_timeout + 5` seconds, or when
/// its output cannot be parsed. Clients that are still pending at that point
/// are killed and reaped.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
    bandwidth: bool,
) -> Result<f64, Error> {
    let default_port = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure RX through-put
    let mut clients = Vec::new();
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &format!("{}", default_port + n),
            "-t",
            &format!("{test_timeout}"),
            "-i",
            "0",
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.args(["-R"]);
        }
        // Use UDP stream to measure packets per second. The bitrate is set to
        // 1T to make sure it saturates the link.
        if !bandwidth {
            cmd.args(["-u", "-b", "1T"]);
        }

        match cmd.stderr(Stdio::piped()).stdout(Stdio::piped()).spawn() {
            Ok(client) => clients.push(client),
            Err(e) => {
                // Do not leave the clients started so far running.
                kill_and_reap(clients);
                return Err(Error::Spawn(e));
            }
        }
    }

    let mut err: Option<Error> = None;
    let mut results = Vec::new();
    let mut pending = clients.into_iter();
    while let Some(mut c) = pending.next() {
        if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) {
            err = Some(Error::WaitTimeout(e));
        }

        // Once one client has failed, every later client is treated as failed too.
        if err.is_none() {
            // Safe to unwrap as we know the child has terminated successfully
            let output = c.wait_with_output().unwrap();
            match parse_iperf3_output(&output.stdout, receive, bandwidth) {
                Ok(result) => results.push(result),
                Err(e) => {
                    // Same error as before, but the remaining clients are
                    // cleaned up instead of being dropped while running.
                    kill_and_reap(pending);
                    return Err(e);
                }
            }
        } else {
            let _ = c.kill();
            let output = c.wait_with_output().unwrap();
            println!(
                "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                String::from_utf8_lossy(&output.stdout)
            );
        }
    }

    match err {
        Some(e) => Err(e),
        None => Ok(results.iter().sum()),
    }
}

/// Collects the `Avg` latency values from the log written by an `ethr` client.
///
/// Every line must be a JSON document; lines without a string `Avg` entry are
/// skipped. The numeric part before the `us` unit suffix is parsed.
///
/// # Errors
///
/// Returns [`Error::EthrLogParse`] (after dumping the raw output to stderr)
/// when a line is not valid JSON, an `Avg` value is not a number, or no
/// latency value was found at all.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    // Parsing failures are raised as panics with a descriptive message and
    // converted into an error below.
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let mut latency = Vec::new();
        for l in s.lines() {
            let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
            // Skip header/summary lines
            if let Some(avg) = v["Avg"].as_str() {
                // Assume the latency unit is always "us"
                let value = avg.split("us").next().unwrap_or(avg);
                latency.push(
                    value
                        .parse::<f64>()
                        .expect("'ethr' parse error: invalid 'Avg' entry"),
                );
            }
        }

        assert!(
            !latency.is_empty(),
            "'ethr' parse error: no valid latency data found"
        );

        latency
    })
    .map_err(|_| {
        eprintln!(
            "=== Start ethr output ===\n\n{}\n\n=== End ethr output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::EthrLogParse
    })
}

/// Measures virtio-net latency between the host and `guest` with `ethr`.
///
/// The host's `/usr/local/bin/ethr` is copied to the guest and started there
/// as a server; a host client then runs a latency test for `test_timeout`
/// seconds, logging to `ethr.client.log` in the guest's temporary directory.
///
/// Returns the latency values extracted by [`parse_ethr_latency_output`].
///
/// # Errors
///
/// Returns an error when copying or starting `ethr` fails, when the client
/// does not finish successfully within `test_timeout + 5` seconds (it is then
/// killed and reaped), or when the log cannot be read or parsed.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        //DEFAULT_SSH_RETRIES,
        1,
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host
    let log_file = guest
        .tmp_dir
        .as_path()
        .join("ethr.client.log")
        .to_str()
        .unwrap()
        .to_string();
    let mut c = Command::new(ethr_path)
        .args([
            "-c",
            &guest.network.guest_ip,
            "-t",
            "l",
            "-o",
            &log_file, // file output is JSON format
            "-d",
            &format!("{test_timeout}s"),
        ])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout)
    {
        let _ = c.kill();
        // Reap the killed client so it does not linger as a zombie.
        let _ = c.wait();
        return Err(e);
    }

    // Parse the ethr latency test output
    let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}