1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 use once_cell::sync::Lazy; 7 use serde_json::Value; 8 use ssh2::Session; 9 use std::env; 10 use std::ffi::OsStr; 11 use std::io; 12 use std::io::{Read, Write}; 13 use std::net::TcpListener; 14 use std::net::TcpStream; 15 use std::os::unix::fs::PermissionsExt; 16 use std::os::unix::io::{AsRawFd, FromRawFd}; 17 use std::path::Path; 18 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 19 use std::str::FromStr; 20 use std::sync::Mutex; 21 use std::thread; 22 use std::time::Duration; 23 use std::{fmt, fs}; 24 use vmm_sys_util::tempdir::TempDir; 25 use wait_timeout::ChildExt; 26 27 #[derive(Debug)] 28 pub enum WaitTimeoutError { 29 Timedout, 30 ExitStatus, 31 General(std::io::Error), 32 } 33 34 #[derive(Debug)] 35 pub enum Error { 36 Parsing(std::num::ParseIntError), 37 SshCommand(SshCommandError), 38 WaitForBoot(WaitForBootError), 39 EthrLogFile(std::io::Error), 40 EthrLogParse, 41 FioOutputParse, 42 Iperf3Parse, 43 Spawn(std::io::Error), 44 WaitTimeout(WaitTimeoutError), 45 } 46 47 impl From<SshCommandError> for Error { 48 fn from(e: SshCommandError) -> Self { 49 Self::SshCommand(e) 50 } 51 } 52 53 pub struct GuestNetworkConfig { 54 pub guest_ip: String, 55 pub l2_guest_ip1: String, 56 pub l2_guest_ip2: String, 57 pub l2_guest_ip3: String, 58 pub host_ip: String, 59 pub guest_mac: String, 60 pub l2_guest_mac1: String, 61 pub l2_guest_mac2: String, 62 pub l2_guest_mac3: String, 63 pub tcp_listener_port: u16, 64 } 65 66 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 67 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 68 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 69 70 #[derive(Debug)] 71 pub enum WaitForBootError { 72 EpollWait(std::io::Error), 73 Listen(std::io::Error), 74 EpollWaitTimeout, 75 WrongGuestAddr, 76 Accept(std::io::Error), 77 } 78 79 impl GuestNetworkConfig { 80 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), 
WaitForBootError> { 81 let start = std::time::Instant::now(); 82 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 83 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 84 let expected_guest_addr = self.guest_ip.as_str(); 85 let mut s = String::new(); 86 let timeout = match custom_timeout { 87 Some(t) => t, 88 None => DEFAULT_TCP_LISTENER_TIMEOUT, 89 }; 90 91 match (|| -> Result<(), WaitForBootError> { 92 let listener = 93 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 94 listener 95 .set_nonblocking(true) 96 .expect("Cannot set non-blocking for tcp listener"); 97 98 // Reply on epoll w/ timeout to wait for guest connections faithfully 99 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 100 // Use 'File' to enforce closing on 'epoll_fd' 101 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 102 epoll::ctl( 103 epoll_fd, 104 epoll::ControlOptions::EPOLL_CTL_ADD, 105 listener.as_raw_fd(), 106 epoll::Event::new(epoll::Events::EPOLLIN, 0), 107 ) 108 .expect("Cannot add 'tcp_listener' event to epoll"); 109 let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); 1]; 110 loop { 111 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 112 Ok(num_events) => Ok(num_events), 113 Err(e) => match e.raw_os_error() { 114 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 115 _ => Err(e), 116 }, 117 } 118 .map_err(WaitForBootError::EpollWait)?; 119 if num_events == 0 { 120 return Err(WaitForBootError::EpollWaitTimeout); 121 } 122 break; 123 } 124 125 match listener.accept() { 126 Ok((_, addr)) => { 127 // Make sure the connection is from the expected 'guest_addr' 128 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 129 s = format!( 130 "Expecting the guest ip '{}' while being connected with ip '{}'", 131 expected_guest_addr, 132 addr.ip() 133 ); 134 return 
Err(WaitForBootError::WrongGuestAddr); 135 } 136 137 Ok(()) 138 } 139 Err(e) => { 140 s = "TcpListener::accept() failed".to_string(); 141 Err(WaitForBootError::Accept(e)) 142 } 143 } 144 })() { 145 Err(e) => { 146 let duration = start.elapsed(); 147 eprintln!( 148 "\n\n==== Start 'wait_vm_boot' (FAILED) ====\n\n\ 149 duration =\"{:?}, timeout = {}s\"\n\ 150 listen_addr=\"{}\"\n\ 151 expected_guest_addr=\"{}\"\n\ 152 message=\"{}\"\n\ 153 error=\"{:?}\"\n\ 154 \n==== End 'wait_vm_boot' outout ====\n\n", 155 duration, timeout, listen_addr, expected_guest_addr, s, e 156 ); 157 158 Err(e) 159 } 160 Ok(_) => Ok(()), 161 } 162 } 163 } 164 165 pub enum DiskType { 166 OperatingSystem, 167 CloudInit, 168 } 169 170 pub trait DiskConfig { 171 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 172 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 173 fn disk(&self, disk_type: DiskType) -> Option<String>; 174 } 175 176 #[derive(Clone)] 177 pub struct UbuntuDiskConfig { 178 osdisk_path: String, 179 cloudinit_path: String, 180 image_name: String, 181 } 182 183 impl UbuntuDiskConfig { 184 pub fn new(image_name: String) -> Self { 185 UbuntuDiskConfig { 186 image_name, 187 osdisk_path: String::new(), 188 cloudinit_path: String::new(), 189 } 190 } 191 } 192 193 pub struct WindowsDiskConfig { 194 image_name: String, 195 osdisk_path: String, 196 loopback_device: String, 197 windows_snapshot_cow: String, 198 windows_snapshot: String, 199 } 200 201 impl WindowsDiskConfig { 202 pub fn new(image_name: String) -> Self { 203 WindowsDiskConfig { 204 image_name, 205 osdisk_path: String::new(), 206 loopback_device: String::new(), 207 windows_snapshot_cow: String::new(), 208 windows_snapshot: String::new(), 209 } 210 } 211 } 212 213 impl Drop for WindowsDiskConfig { 214 fn drop(&mut self) { 215 // dmsetup remove windows-snapshot-1 216 std::process::Command::new("dmsetup") 217 .arg("remove") 218 .arg(self.windows_snapshot.as_str()) 
219 .output() 220 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 221 222 // dmsetup remove windows-snapshot-cow-1 223 std::process::Command::new("dmsetup") 224 .arg("remove") 225 .arg(self.windows_snapshot_cow.as_str()) 226 .output() 227 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 228 229 // losetup -d <loopback_device> 230 std::process::Command::new("losetup") 231 .args(["-d", self.loopback_device.as_str()]) 232 .output() 233 .expect("Expect removing loopback device to succeed"); 234 } 235 } 236 237 impl DiskConfig for UbuntuDiskConfig { 238 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 239 let cloudinit_file_path = 240 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 241 242 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 243 244 fs::create_dir_all(&cloud_init_directory) 245 .expect("Expect creating cloud-init directory to succeed"); 246 247 let source_file_dir = std::env::current_dir() 248 .unwrap() 249 .join("test_data") 250 .join("cloud-init") 251 .join("ubuntu"); 252 253 vec!["meta-data"].iter().for_each(|x| { 254 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 255 .expect("Expect copying cloud-init meta-data to succeed"); 256 }); 257 258 let mut user_data_string = String::new(); 259 fs::File::open(source_file_dir.join("user-data")) 260 .unwrap() 261 .read_to_string(&mut user_data_string) 262 .expect("Expected reading user-data file in to succeed"); 263 user_data_string = user_data_string.replace( 264 "@DEFAULT_TCP_LISTENER_MESSAGE", 265 DEFAULT_TCP_LISTENER_MESSAGE, 266 ); 267 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 268 user_data_string = 269 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 270 271 fs::File::create(cloud_init_directory.join("user-data")) 272 .unwrap() 273 .write_all(user_data_string.as_bytes()) 
274 .expect("Expected writing out user-data to succeed"); 275 276 let mut network_config_string = String::new(); 277 278 fs::File::open(source_file_dir.join("network-config")) 279 .unwrap() 280 .read_to_string(&mut network_config_string) 281 .expect("Expected reading network-config file in to succeed"); 282 283 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 284 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 285 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 286 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 287 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 288 network_config_string = 289 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 290 network_config_string = 291 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 292 network_config_string = 293 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 294 network_config_string = 295 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 296 297 fs::File::create(cloud_init_directory.join("network-config")) 298 .unwrap() 299 .write_all(network_config_string.as_bytes()) 300 .expect("Expected writing out network-config to succeed"); 301 302 std::process::Command::new("mkdosfs") 303 .args(["-n", "cidata"]) 304 .args(["-C", cloudinit_file_path.as_str()]) 305 .arg("8192") 306 .output() 307 .expect("Expect creating disk image to succeed"); 308 309 vec!["user-data", "meta-data", "network-config"] 310 .iter() 311 .for_each(|x| { 312 std::process::Command::new("mcopy") 313 .arg("-o") 314 .args(["-i", cloudinit_file_path.as_str()]) 315 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 316 .output() 317 .expect("Expect copying files to disk image to succeed"); 318 }); 319 320 cloudinit_file_path 321 } 
322 323 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 324 let mut workload_path = dirs::home_dir().unwrap(); 325 workload_path.push("workloads"); 326 327 let mut osdisk_base_path = workload_path; 328 osdisk_base_path.push(&self.image_name); 329 330 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 331 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 332 333 rate_limited_copy(osdisk_base_path, &osdisk_path) 334 .expect("copying of OS source disk image failed"); 335 336 self.cloudinit_path = cloudinit_path; 337 self.osdisk_path = osdisk_path; 338 } 339 340 fn disk(&self, disk_type: DiskType) -> Option<String> { 341 match disk_type { 342 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 343 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 344 } 345 } 346 } 347 348 impl DiskConfig for WindowsDiskConfig { 349 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 350 String::new() 351 } 352 353 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 354 let mut workload_path = dirs::home_dir().unwrap(); 355 workload_path.push("workloads"); 356 357 let mut osdisk_path = workload_path; 358 osdisk_path.push(&self.image_name); 359 360 let osdisk_blk_size = fs::metadata(osdisk_path) 361 .expect("Expect retrieving Windows image metadata") 362 .len() 363 >> 9; 364 365 let snapshot_cow_path = 366 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 367 368 // Create and truncate CoW file for device mapper 369 let cow_file_size: u64 = 1 << 30; 370 let cow_file_blk_size = cow_file_size >> 9; 371 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 372 .expect("Expect creating CoW image to succeed"); 373 cow_file 374 .set_len(cow_file_size) 375 .expect("Expect truncating CoW image to succeed"); 376 377 // losetup --find --show /tmp/snapshot_cow 378 let loopback_device = 
std::process::Command::new("losetup") 379 .arg("--find") 380 .arg("--show") 381 .arg(snapshot_cow_path.as_str()) 382 .output() 383 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 384 385 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 386 .trim() 387 .to_string(); 388 389 let random_extension = tmp_dir.as_path().file_name().unwrap(); 390 let windows_snapshot_cow = format!( 391 "windows-snapshot-cow-{}", 392 random_extension.to_str().unwrap() 393 ); 394 395 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 396 std::process::Command::new("dmsetup") 397 .arg("create") 398 .arg(windows_snapshot_cow.as_str()) 399 .args([ 400 "--table", 401 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 402 ]) 403 .output() 404 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 405 406 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 407 408 // dmsetup mknodes 409 std::process::Command::new("dmsetup") 410 .arg("mknodes") 411 .output() 412 .expect("Expect device mapper nodes to be ready"); 413 414 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 415 std::process::Command::new("dmsetup") 416 .arg("create") 417 .arg(windows_snapshot.as_str()) 418 .args([ 419 "--table", 420 format!( 421 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 422 osdisk_blk_size, 423 windows_snapshot_cow.as_str() 424 ) 425 .as_str(), 426 ]) 427 .output() 428 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 429 430 // dmsetup mknodes 431 std::process::Command::new("dmsetup") 432 .arg("mknodes") 433 .output() 434 .expect("Expect device mapper nodes to be ready"); 435 436 self.osdisk_path = format!("/dev/mapper/{}", windows_snapshot); 437 self.windows_snapshot_cow = windows_snapshot_cow; 438 self.windows_snapshot = 
windows_snapshot; 439 } 440 441 fn disk(&self, disk_type: DiskType) -> Option<String> { 442 match disk_type { 443 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 444 DiskType::CloudInit => None, 445 } 446 } 447 } 448 449 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 450 for i in 0..10 { 451 let free_bytes = unsafe { 452 let mut stats = std::mem::MaybeUninit::zeroed(); 453 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 454 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 455 456 let free_blocks = stats.assume_init().f_bfree; 457 let block_size = stats.assume_init().f_bsize; 458 459 free_blocks * block_size 460 }; 461 462 // Make sure there is at least 6 GiB of space 463 if free_bytes < 6 << 30 { 464 eprintln!( 465 "Not enough space on disk ({}). Attempt {} of 10. Sleeping.", 466 free_bytes, i 467 ); 468 thread::sleep(std::time::Duration::new(60, 0)); 469 continue; 470 } 471 472 match fs::copy(&from, &to) { 473 Err(e) => { 474 if let Some(errno) = e.raw_os_error() { 475 if errno == libc::ENOSPC { 476 eprintln!("Copy returned ENOSPC. Attempt {} of 10. 
Sleeping.", i); 477 thread::sleep(std::time::Duration::new(60, 0)); 478 continue; 479 } 480 } 481 return Err(e); 482 } 483 Ok(i) => return Ok(i), 484 } 485 } 486 Err(io::Error::last_os_error()) 487 } 488 489 pub fn handle_child_output( 490 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 491 output: &std::process::Output, 492 ) { 493 use std::os::unix::process::ExitStatusExt; 494 if r.is_ok() && output.status.success() { 495 return; 496 } 497 498 match output.status.code() { 499 None => { 500 // Don't treat child.kill() as a problem 501 if output.status.signal() == Some(9) && r.is_ok() { 502 return; 503 } 504 505 eprintln!( 506 "==== child killed by signal: {} ====", 507 output.status.signal().unwrap() 508 ); 509 } 510 Some(code) => { 511 eprintln!("\n\n==== child exit code: {} ====", code); 512 } 513 } 514 515 eprintln!( 516 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 517 String::from_utf8_lossy(&output.stdout) 518 ); 519 eprintln!( 520 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 521 String::from_utf8_lossy(&output.stderr) 522 ); 523 524 panic!("Test failed") 525 } 526 527 #[derive(Debug)] 528 pub struct PasswordAuth { 529 pub username: String, 530 pub password: String, 531 } 532 533 pub const DEFAULT_SSH_RETRIES: u8 = 6; 534 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 535 536 #[derive(Debug)] 537 pub enum SshCommandError { 538 Connection(std::io::Error), 539 Handshake(ssh2::Error), 540 Authentication(ssh2::Error), 541 ChannelSession(ssh2::Error), 542 Command(ssh2::Error), 543 ExitStatus(ssh2::Error), 544 NonZeroExitStatus(i32), 545 FileRead(std::io::Error), 546 FileMetadata(std::io::Error), 547 ScpSend(ssh2::Error), 548 WriteAll(std::io::Error), 549 SendEof(ssh2::Error), 550 WaitEof(ssh2::Error), 551 } 552 553 fn scp_to_guest_with_auth( 554 path: &Path, 555 remote_path: &Path, 556 auth: &PasswordAuth, 557 ip: &str, 558 retries: u8, 559 timeout: u8, 560 ) -> Result<(), SshCommandError> { 561 
let mut counter = 0; 562 loop { 563 match (|| -> Result<(), SshCommandError> { 564 let tcp = 565 TcpStream::connect(format!("{}:22", ip)).map_err(SshCommandError::Connection)?; 566 let mut sess = Session::new().unwrap(); 567 sess.set_tcp_stream(tcp); 568 sess.handshake().map_err(SshCommandError::Handshake)?; 569 570 sess.userauth_password(&auth.username, &auth.password) 571 .map_err(SshCommandError::Authentication)?; 572 assert!(sess.authenticated()); 573 574 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 575 let mode = fs::metadata(path) 576 .map_err(SshCommandError::FileMetadata)? 577 .permissions() 578 .mode() 579 & 0o777; 580 581 let mut channel = sess 582 .scp_send(remote_path, mode as i32, content.len() as u64, None) 583 .map_err(SshCommandError::ScpSend)?; 584 channel 585 .write_all(&content) 586 .map_err(SshCommandError::WriteAll)?; 587 channel.send_eof().map_err(SshCommandError::SendEof)?; 588 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 589 590 // Intentionally ignore these results here as their failure 591 // does not precipitate a repeat 592 let _ = channel.close(); 593 let _ = channel.wait_close(); 594 595 Ok(()) 596 })() { 597 Ok(_) => break, 598 Err(e) => { 599 counter += 1; 600 if counter >= retries { 601 eprintln!( 602 "\n\n==== Start scp command output (FAILED) ====\n\n\ 603 path =\"{:?}\"\n\ 604 remote_path =\"{:?}\"\n\ 605 auth=\"{:#?}\"\n\ 606 ip=\"{}\"\n\ 607 error=\"{:?}\"\n\ 608 \n==== End scp command outout ====\n\n", 609 path, remote_path, auth, ip, e 610 ); 611 612 return Err(e); 613 } 614 } 615 }; 616 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 617 } 618 Ok(()) 619 } 620 621 pub fn scp_to_guest( 622 path: &Path, 623 remote_path: &Path, 624 ip: &str, 625 retries: u8, 626 timeout: u8, 627 ) -> Result<(), SshCommandError> { 628 scp_to_guest_with_auth( 629 path, 630 remote_path, 631 &PasswordAuth { 632 username: String::from("cloud"), 633 password: String::from("cloud123"), 634 }, 635 
ip, 636 retries, 637 timeout, 638 ) 639 } 640 641 pub fn ssh_command_ip_with_auth( 642 command: &str, 643 auth: &PasswordAuth, 644 ip: &str, 645 retries: u8, 646 timeout: u8, 647 ) -> Result<String, SshCommandError> { 648 let mut s = String::new(); 649 650 let mut counter = 0; 651 loop { 652 match (|| -> Result<(), SshCommandError> { 653 let tcp = 654 TcpStream::connect(format!("{}:22", ip)).map_err(SshCommandError::Connection)?; 655 let mut sess = Session::new().unwrap(); 656 sess.set_tcp_stream(tcp); 657 sess.handshake().map_err(SshCommandError::Handshake)?; 658 659 sess.userauth_password(&auth.username, &auth.password) 660 .map_err(SshCommandError::Authentication)?; 661 assert!(sess.authenticated()); 662 663 let mut channel = sess 664 .channel_session() 665 .map_err(SshCommandError::ChannelSession)?; 666 channel.exec(command).map_err(SshCommandError::Command)?; 667 668 // Intentionally ignore these results here as their failure 669 // does not precipitate a repeat 670 let _ = channel.read_to_string(&mut s); 671 let _ = channel.close(); 672 let _ = channel.wait_close(); 673 674 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 675 676 if status != 0 { 677 Err(SshCommandError::NonZeroExitStatus(status)) 678 } else { 679 Ok(()) 680 } 681 })() { 682 Ok(_) => break, 683 Err(e) => { 684 counter += 1; 685 if counter >= retries { 686 eprintln!( 687 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 688 command=\"{}\"\n\ 689 auth=\"{:#?}\"\n\ 690 ip=\"{}\"\n\ 691 output=\"{}\"\n\ 692 error=\"{:?}\"\n\ 693 \n==== End ssh command outout ====\n\n", 694 command, auth, ip, s, e 695 ); 696 697 return Err(e); 698 } 699 } 700 }; 701 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 702 } 703 Ok(s) 704 } 705 706 pub fn ssh_command_ip( 707 command: &str, 708 ip: &str, 709 retries: u8, 710 timeout: u8, 711 ) -> Result<String, SshCommandError> { 712 ssh_command_ip_with_auth( 713 command, 714 &PasswordAuth { 715 username: 
String::from("cloud"), 716 password: String::from("cloud123"), 717 }, 718 ip, 719 retries, 720 timeout, 721 ) 722 } 723 724 pub fn exec_host_command_status(command: &str) -> ExitStatus { 725 std::process::Command::new("bash") 726 .args(["-c", command]) 727 .status() 728 .unwrap_or_else(|_| panic!("Expected '{}' to run", command)) 729 } 730 731 pub fn exec_host_command_output(command: &str) -> Output { 732 std::process::Command::new("bash") 733 .args(["-c", command]) 734 .output() 735 .unwrap_or_else(|_| panic!("Expected '{}' to run", command)) 736 } 737 738 pub const PIPE_SIZE: i32 = 32 << 20; 739 740 static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1)); 741 742 pub struct Guest { 743 pub tmp_dir: TempDir, 744 pub disk_config: Box<dyn DiskConfig>, 745 pub network: GuestNetworkConfig, 746 } 747 748 // Safe to implement as we know we have no interior mutability 749 impl std::panic::RefUnwindSafe for Guest {} 750 751 impl Guest { 752 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 753 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 754 755 let network = GuestNetworkConfig { 756 guest_ip: format!("{}.{}.2", class, id), 757 l2_guest_ip1: format!("{}.{}.3", class, id), 758 l2_guest_ip2: format!("{}.{}.4", class, id), 759 l2_guest_ip3: format!("{}.{}.5", class, id), 760 host_ip: format!("{}.{}.1", class, id), 761 guest_mac: format!("12:34:56:78:90:{:02x}", id), 762 l2_guest_mac1: format!("de:ad:be:ef:12:{:02x}", id), 763 l2_guest_mac2: format!("de:ad:be:ef:34:{:02x}", id), 764 l2_guest_mac3: format!("de:ad:be:ef:56:{:02x}", id), 765 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 766 }; 767 768 disk_config.prepare_files(&tmp_dir, &network); 769 770 Guest { 771 tmp_dir, 772 disk_config, 773 network, 774 } 775 } 776 777 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 778 let mut guard = NEXT_VM_ID.lock().unwrap(); 779 let id = *guard; 780 *guard = id + 1; 781 782 
Self::new_from_ip_range(disk_config, "192.168", id) 783 } 784 785 pub fn default_net_string(&self) -> String { 786 format!( 787 "tap=,mac={},ip={},mask=255.255.255.0", 788 self.network.guest_mac, self.network.host_ip 789 ) 790 } 791 792 pub fn default_net_string_w_iommu(&self) -> String { 793 format!( 794 "tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 795 self.network.guest_mac, self.network.host_ip 796 ) 797 } 798 799 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 800 format!( 801 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 802 self.network.guest_mac, self.network.host_ip, mtu 803 ) 804 } 805 806 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 807 ssh_command_ip( 808 command, 809 &self.network.guest_ip, 810 DEFAULT_SSH_RETRIES, 811 DEFAULT_SSH_TIMEOUT, 812 ) 813 } 814 815 #[cfg(target_arch = "x86_64")] 816 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 817 ssh_command_ip( 818 command, 819 &self.network.guest_ip, 820 DEFAULT_SSH_RETRIES, 821 DEFAULT_SSH_TIMEOUT, 822 ) 823 } 824 825 #[cfg(target_arch = "x86_64")] 826 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 827 ssh_command_ip( 828 command, 829 &self.network.l2_guest_ip1, 830 DEFAULT_SSH_RETRIES, 831 DEFAULT_SSH_TIMEOUT, 832 ) 833 } 834 835 #[cfg(target_arch = "x86_64")] 836 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 837 ssh_command_ip( 838 command, 839 &self.network.l2_guest_ip2, 840 DEFAULT_SSH_RETRIES, 841 DEFAULT_SSH_TIMEOUT, 842 ) 843 } 844 845 #[cfg(target_arch = "x86_64")] 846 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 847 ssh_command_ip( 848 command, 849 &self.network.l2_guest_ip3, 850 DEFAULT_SSH_RETRIES, 851 DEFAULT_SSH_TIMEOUT, 852 ) 853 } 854 855 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 856 format! 
{"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 857 cpu_count, 858 cpu_count, 859 kernel_path, 860 kernel_cmd, 861 self.network.host_ip, 862 self.network.guest_mac, 863 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 864 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 865 } 866 } 867 868 pub fn get_cpu_count(&self) -> Result<u32, Error> { 869 self.ssh_command("grep -c processor /proc/cpuinfo")? 870 .trim() 871 .parse() 872 .map_err(Error::Parsing) 873 } 874 875 #[cfg(target_arch = "x86_64")] 876 pub fn get_initial_apicid(&self) -> Result<u32, Error> { 877 self.ssh_command("grep \"initial apicid\" /proc/cpuinfo | grep -o \"[0-9]*\"")? 878 .trim() 879 .parse() 880 .map_err(Error::Parsing) 881 } 882 883 pub fn get_total_memory(&self) -> Result<u32, Error> { 884 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 885 .trim() 886 .parse() 887 .map_err(Error::Parsing) 888 } 889 890 #[cfg(target_arch = "x86_64")] 891 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 892 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 893 .trim() 894 .parse() 895 .map_err(Error::Parsing) 896 } 897 898 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 899 self.ssh_command( 900 format!( 901 "grep MemTotal /sys/devices/system/node/node{}/meminfo \ 902 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"", 903 node_id 904 ) 905 .as_str(), 906 )? 
907 .trim() 908 .parse() 909 .map_err(Error::Parsing) 910 } 911 912 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> { 913 self.network 914 .wait_vm_boot(custom_timeout) 915 .map_err(Error::WaitForBoot) 916 } 917 918 pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> { 919 for cpu in cpus.iter() { 920 let cmd = format!( 921 "[ -d \"/sys/devices/system/node/node{}/cpu{}\" ]", 922 node_id, cpu 923 ); 924 self.ssh_command(cmd.as_str())?; 925 } 926 927 Ok(()) 928 } 929 930 pub fn check_numa_node_distances( 931 &self, 932 node_id: usize, 933 distances: &str, 934 ) -> Result<bool, Error> { 935 let cmd = format!("cat /sys/devices/system/node/node{}/distance", node_id); 936 if self.ssh_command(cmd.as_str())?.trim() == distances { 937 Ok(true) 938 } else { 939 Ok(false) 940 } 941 } 942 943 pub fn check_numa_common( 944 &self, 945 mem_ref: Option<&[u32]>, 946 node_ref: Option<&[Vec<usize>]>, 947 distance_ref: Option<&[&str]>, 948 ) { 949 if let Some(mem_ref) = mem_ref { 950 // Check each NUMA node has been assigned the right amount of 951 // memory. 952 for (i, &m) in mem_ref.iter().enumerate() { 953 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m); 954 } 955 } 956 957 if let Some(node_ref) = node_ref { 958 // Check each NUMA node has been assigned the right CPUs set. 959 for (i, n) in node_ref.iter().enumerate() { 960 self.check_numa_node_cpus(i, n.clone()).unwrap(); 961 } 962 } 963 964 if let Some(distance_ref) = distance_ref { 965 // Check each NUMA node has been assigned the right distances. 
966 for (i, &d) in distance_ref.iter().enumerate() { 967 assert!(self.check_numa_node_distances(i, d).unwrap()); 968 } 969 } 970 } 971 972 #[cfg(target_arch = "x86_64")] 973 pub fn check_sgx_support(&self) -> Result<(), Error> { 974 self.ssh_command( 975 "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \ 976 Software Guard Extensions supported = true'", 977 )?; 978 self.ssh_command( 979 "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \ 980 SGX launch config supported = true'", 981 )?; 982 self.ssh_command( 983 "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \ 984 supported = true'", 985 )?; 986 987 Ok(()) 988 } 989 990 pub fn get_pci_bridge_class(&self) -> Result<String, Error> { 991 Ok(self 992 .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")? 993 .trim() 994 .to_string()) 995 } 996 997 pub fn get_pci_device_ids(&self) -> Result<String, Error> { 998 Ok(self 999 .ssh_command("cat /sys/bus/pci/devices/*/device")? 1000 .trim() 1001 .to_string()) 1002 } 1003 1004 pub fn get_pci_vendor_ids(&self) -> Result<String, Error> { 1005 Ok(self 1006 .ssh_command("cat /sys/bus/pci/devices/*/vendor")? 
1007 .trim() 1008 .to_string()) 1009 } 1010 1011 pub fn does_device_vendor_pair_match( 1012 &self, 1013 device_id: &str, 1014 vendor_id: &str, 1015 ) -> Result<bool, Error> { 1016 // We are checking if console device's device id and vendor id pair matches 1017 let devices = self.get_pci_device_ids()?; 1018 let devices: Vec<&str> = devices.split('\n').collect(); 1019 let vendors = self.get_pci_vendor_ids()?; 1020 let vendors: Vec<&str> = vendors.split('\n').collect(); 1021 1022 for (index, d_id) in devices.iter().enumerate() { 1023 if *d_id == device_id { 1024 if let Some(v_id) = vendors.get(index) { 1025 if *v_id == vendor_id { 1026 return Ok(true); 1027 } 1028 } 1029 } 1030 } 1031 1032 Ok(false) 1033 } 1034 1035 pub fn check_vsock(&self, socket: &str) { 1036 // Listen from guest on vsock CID=3 PORT=16 1037 // SOCKET-LISTEN:<domain>:<protocol>:<local-address> 1038 let guest_ip = self.network.guest_ip.clone(); 1039 let listen_socat = thread::spawn(move || { 1040 ssh_command_ip("sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log", &guest_ip, DEFAULT_SSH_RETRIES, DEFAULT_SSH_TIMEOUT).unwrap(); 1041 }); 1042 1043 // Make sure socat is listening, which might take a few second on slow systems 1044 thread::sleep(std::time::Duration::new(10, 0)); 1045 1046 // Write something to vsock from the host 1047 assert!(exec_host_command_status(&format!( 1048 "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{}", 1049 socket 1050 )) 1051 .success()); 1052 1053 // Wait for the thread to terminate. 1054 listen_socat.join().unwrap(); 1055 1056 assert_eq!( 1057 self.ssh_command("cat vsock_log").unwrap().trim(), 1058 "HelloWorld!" 
);
    }

    /// Validates NVIDIA GPU passthrough inside the guest.
    ///
    /// Runs the CUDA `deviceQuery` sample and asserts it reports exactly one
    /// "NVIDIA Tesla T4" with `Result = PASS`. It then starts `nv-hostengine`
    /// and asserts that DCGM discovers the same device and that the
    /// `diagnostic` run prints exactly 10 lines containing `Pass`.
    ///
    /// # Panics
    /// On any SSH failure or failed assertion.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        // Run CUDA sample to validate it can find the device
        let device_query_result = self
            .ssh_command("sudo /root/NVIDIA_CUDA-11.3_Samples/bin/x86_64/linux/release/deviceQuery")
            .unwrap();
        assert!(device_query_result.contains("Detected 1 CUDA Capable device"));
        assert!(device_query_result.contains("Device 0: \"NVIDIA Tesla T4\""));
        assert!(device_query_result.contains("Result = PASS"));

        // Run NVIDIA DCGM Diagnostics to validate the device is functional
        self.ssh_command("sudo nv-hostengine").unwrap();

        assert!(self
            .ssh_command("sudo dcgmi discovery -l")
            .unwrap()
            .contains("Name: NVIDIA Tesla T4"));
        assert_eq!(
            self.ssh_command("sudo dcgmi diag -r 'diagnostic' | grep Pass | wc -l")
                .unwrap()
                .trim(),
            "10"
        );
    }

    /// Reboots the guest and checks that exactly one new boot was recorded.
    ///
    /// `current_reboot_count` is the number of reboots performed so far. The
    /// boot list is therefore expected to hold `current_reboot_count + 1`
    /// entries before the reboot and `current_reboot_count + 2` after it.
    /// `custom_timeout` is forwarded to `wait_vm_boot`.
    ///
    /// # Panics
    /// On SSH failure, boot timeout, or an unexpected boot count.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        // Counts lines of `journalctl --list-boots`; an unparsable answer counts as 0.
        let boot_count = || {
            self.ssh_command("sudo journalctl --list-boots | wc -l")
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Writes `online` to `/sys/devices/system/memory/auto_online_blocks` in
    /// the guest. This asks the guest kernel to online hot-plugged memory
    /// blocks automatically.
    ///
    /// # Panics
    /// If the SSH command fails.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Exercises the common set of guest devices.
    ///
    /// Always reads from `/dev/vda`, `/dev/vdb` and `/dev/hwrng`. Optionally:
    /// - runs `check_vsock` on `socket`;
    /// - writes `console_text` to `/dev/hvc0`;
    /// - mounts `pmem_path` on `/mnt` and checks that a file written there
    ///   survives an unmount/remount cycle.
    ///
    /// # Panics
    /// On any SSH failure or failed assertion.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {} | sudo tee /dev/hvc0", console_text);
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            assert_eq!(
                self.ssh_command(&format!("ls {}", pmem_path))
                    .unwrap()
                    .trim(),
                pmem_path
            );
            assert_eq!(
                self.ssh_command(&format!("sudo mount {} /mnt", pmem_path))
                    .unwrap(),
                ""
            );
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            // Remount and confirm the data written before the unmount persisted
            assert_eq!(
                self.ssh_command(&format!("sudo mount {} /mnt", pmem_path))
                    .unwrap(),
                ""
            );
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Logging verbosity requested from the VMM command line.
pub enum VerbosityLevel {
    Warn,
    Info,
    Debug,
}

impl VerbosityLevel {
    /// Flag appended to the VMM command line for this level (empty for `Warn`).
    fn flag(&self) -> &'static str {
        match self {
            VerbosityLevel::Warn => "",
            VerbosityLevel::Info => "-v",
            VerbosityLevel::Debug => "-vv",
        }
    }
}

impl Default for VerbosityLevel {
    fn default() -> Self {
        Self::Warn
    }
}

// `Display` (rather than a direct `ToString` impl) still provides `to_string()`
// through the std blanket impl, and also makes the type usable with `{}`.
impl fmt::Display for VerbosityLevel {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.flag())
    }
}

/// Builder for a VMM `Command` bound to a `Guest`.
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    capture_output: bool,
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Creates a command for the `cloud-hypervisor` binary located by `clh_command`.
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Creates a command for `binary_path`.
    ///
    /// Defaults: output not captured, command line printed, `Info` verbosity.
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Sets the verbosity flag that `spawn` appends.
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Pipes stdout/stderr of the spawned process and enlarges both pipes.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Controls whether `spawn` prints the full command line first.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Appends the verbosity flag and spawns the command.
    ///
    /// With `capture_output`, stdout and stderr are piped and each pipe is
    /// resized to at least `PIPE_SIZE` bytes via `fcntl(F_SETPIPE_SZ)`.
    ///
    /// # Errors
    /// - Spawn failures are returned as is.
    /// - When capturing, a failed or insufficient pipe resize is returned as
    ///   an error. In that case the child is killed and reaped first, so no
    ///   orphaned process is left behind.
    pub fn spawn(&mut self) -> io::Result<Child> {
        let flag = self.verbosity.flag();
        if !flag.is_empty() {
            self.command.arg(flag);
        }

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if !self.capture_output {
            return self.command.spawn();
        }

        let mut child = self
            .command
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;

        match Self::resize_capture_pipes(&child) {
            Ok(()) => Ok(child),
            Err(e) => {
                // Best-effort cleanup: the caller never receives this child, so
                // nobody else could kill or reap it.
                let _ = child.kill();
                let _ = child.wait();
                Err(e)
            }
        }
    }

    /// Resizes the child's piped stdout and stderr to at least `PIPE_SIZE`.
    fn resize_capture_pipes(child: &Child) -> io::Result<()> {
        let resize = |fd: libc::c_int| -> io::Result<libc::c_int> {
            // SAFETY: `fd` is a pipe end owned by `child`, which outlives this
            // call; F_SETPIPE_SZ only changes the pipe capacity.
            let size = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
            if size == -1 {
                // Read errno immediately, before any other call can clobber it
                Err(io::Error::last_os_error())
            } else {
                Ok(size)
            }
        };

        let stdout_size = resize(
            child
                .stdout
                .as_ref()
                .expect("stdout is piped when capturing output")
                .as_raw_fd(),
        )?;
        let stderr_size = resize(
            child
                .stderr
                .as_ref()
                .expect("stderr is piped when capturing output")
                .as_raw_fd(),
        )?;

        if stdout_size >= PIPE_SIZE && stderr_size >= PIPE_SIZE {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "resizing pipe w/ 'fcntl' failed: stdout pipesize {}, stderr pipesize {}",
                    stdout_size, stderr_size
                ),
            ))
        }
    }

    /// Appends raw arguments to the command.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Adds `--disk` with the guest's OS disk, plus its cloud-init disk when
    /// one is configured.
    ///
    /// # Panics
    /// If the guest has no OS disk.
    pub fn default_disks(&mut self) -> &mut Self {
        let cloud_init_disk = self
            .guest
            .disk_config
            .disk(DiskType::CloudInit)
            .map(|disk| format!("path={}", disk));
        let os_disk = format!(
            "path={}",
            self.guest
                .disk_config
                .disk(DiskType::OperatingSystem)
                .unwrap()
        );

        let mut disk_args = vec!["--disk".to_string(), os_disk];
        disk_args.extend(cloud_init_disk);
        self.args(disk_args)
    }

    /// Adds `--net` with the guest's default network configuration string.
    pub fn default_net(&mut self) -> &mut Self {
        self.args(["--net", self.guest.default_net_string().as_str()])
    }
}

/// Path of build artefact `cmd` under `target/<triple>/release/`.
///
/// The triple comes from the `BUILD_TARGET` environment variable. When that
/// is unset, it falls back to `<host arch>-unknown-linux-gnu`, which is
/// `x86_64-unknown-linux-gnu` on x86_64 hosts.
pub fn clh_command(cmd: &str) -> String {
    env::var("BUILD_TARGET").map_or_else(
        |_| format!("target/{}-unknown-linux-gnu/release/{}", env::consts::ARCH, cmd),
        |target| format!("target/{}/release/{}", target, cmd),
    )
}

/// Extracts `end.sum_sent.bits_per_second` (when `sender`) or
/// `end.sum_received.bits_per_second` from `iperf3 -J` output.
///
/// # Errors
/// `Error::Iperf3Parse` if the output is not JSON or lacks the entry. The raw
/// output is dumped to stderr in that case.
pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");

        let summary = if sender { "sum_sent" } else { "sum_received" };
        v["end"][summary]["bits_per_second"]
            .as_f64()
            .unwrap_or_else(|| {
                panic!(
                    "'iperf3' parse error: missing entry 'end.{}.bits_per_second'",
                    summary
                )
            })
    })
    .map_err(|_| {
        eprintln!(
            "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n",
            String::from_utf8_lossy(output)
        );
        Error::Iperf3Parse
    })
}

/// fio workload kinds; `Display` yields the matching `--rw=` value.
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl FioOps {
    /// Which fio result sections (`read`, `write`) this workload populates.
    fn directions(&self) -> (bool, bool) {
        match self {
            FioOps::Read | FioOps::RandomRead => (true, false),
            FioOps::Write | FioOps::RandomWrite => (false, true),
            FioOps::ReadWrite | FioOps::RandRW => (true, true),
        }
    }
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            FioOps::Read => write!(f, "read"),
            FioOps::RandomRead => write!(f, "randread"),
            FioOps::Write => write!(f, "write"),
            FioOps::RandomWrite => write!(f, "randwrite"),
            FioOps::ReadWrite => write!(f, "rw"),
            FioOps::RandRW => write!(f, "randrw"),
        }
    }
}

/// Sums, over all `jobs`, `<dir>.<counter> / (<dir>.runtime / 1000)` for each
/// direction `fio_ops` exercises. `runtime` is treated as milliseconds.
///
/// Any malformed input panics inside `catch_unwind`. This includes invalid
/// JSON, a job count other than `num_jobs`, a missing entry, or a zero runtime
/// (which would otherwise yield inf/NaN). The panic is mapped to
/// `Error::FioOutputParse` after dumping `output` to stderr.
fn parse_fio_rate(
    output: &str,
    fio_ops: &FioOps,
    num_jobs: u32,
    counter: &str,
) -> Result<f64, Error> {
    std::panic::catch_unwind(|| {
        let v: Value =
            serde_json::from_str(output).expect("'fio' parse error: invalid json output");
        let jobs = v["jobs"]
            .as_array()
            .expect("'fio' parse error: missing entry 'jobs'");
        assert_eq!(
            jobs.len(),
            num_jobs as usize,
            "'fio' parse error: Unexpected number of 'fio' jobs."
        );

        let (read, write) = fio_ops.directions();
        let mut total = 0_f64;
        for job in jobs {
            for (direction, enabled) in [("read", read), ("write", write)] {
                if !enabled {
                    continue;
                }
                let section = &job[direction];
                let count = section[counter].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{}.{}'", direction, counter)
                });
                let runtime_ms = section["runtime"].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{}.runtime'", direction)
                });
                // A zero runtime would make the rate inf/NaN and silently poison
                // the sum; report it as a parse failure instead.
                assert!(
                    runtime_ms > 0,
                    "'fio' parse error: zero '{}.runtime'",
                    direction
                );
                total += count as f64 / (runtime_ms as f64 / 1000_f64);
            }
        }

        total
    })
    .map_err(|_| {
        eprintln!(
            "=============== Fio output ===============\n\n{}\n\n===========end============\n\n",
            output
        );
        Error::FioOutputParse
    })
}

/// Aggregate fio throughput in bytes per second across all jobs.
///
/// # Errors
/// `Error::FioOutputParse` on malformed output (see `parse_fio_rate`).
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    parse_fio_rate(output, fio_ops, num_jobs, "io_bytes")
}

/// Aggregate fio IOPS across all jobs.
///
/// # Errors
/// `Error::FioOutputParse` on malformed output (see `parse_fio_rate`).
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    parse_fio_rate(output, fio_ops, num_jobs, "total_ios")
}

/// Waits up to `timeout` seconds for `child` to exit successfully.
///
/// A child that is still running on timeout is NOT killed here; the caller
/// must do that.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    match child.wait_timeout(Duration::from_secs(timeout)) {
        Err(e) => Err(WaitTimeoutError::General(e)),
        Ok(None) => Err(WaitTimeoutError::Timedout),
        Ok(Some(status)) if !status.success() => Err(WaitTimeoutError::ExitStatus),
        Ok(Some(_)) => Ok(()),
    }
}

/// Measures virtio-net throughput with one iperf3 stream per queue pair.
///
/// Starts `queue_pairs` iperf3 servers in the guest on ports starting at 5201.
/// Then runs one host client per server for `test_timeout` seconds. When
/// `!receive`, the clients use reverse mode (`-R`). Returns the sum of the
/// per-client rates from `parse_iperf3_output`.
///
/// # Errors
/// Returns the first failure among: SSH, spawn, client wait/timeout, or
/// parse. Every spawned client is killed and reaped before returning, so
/// none are left running.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
) -> Result<f64, Error> {
    let default_port = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure RX through-put
    let mut clients: Vec<Child> = Vec::new();
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &format!("{}", default_port + n),
            "-t",
            &format!("{}", test_timeout),
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.args(["-R"]);
        }
        match cmd.stderr(Stdio::piped()).stdout(Stdio::piped()).spawn() {
            Ok(client) => clients.push(client),
            Err(e) => {
                // Dropping a `Child` does not kill it: stop the clients already started
                for mut client in clients {
                    let _ = client.kill();
                    let _ = client.wait();
                }
                return Err(Error::Spawn(e));
            }
        }
    }

    // 3. collect the results; once anything has failed, the remaining clients
    //    are killed and their output is dumped instead of being parsed
    let mut err: Option<Error> = None;
    let mut bps = Vec::new();
    for mut client in clients {
        if let Err(e) = child_wait_timeout(&mut client, u64::from(test_timeout) + 5) {
            if err.is_none() {
                err = Some(Error::WaitTimeout(e));
            }
        }

        if err.is_none() {
            // Safe to unwrap as we know the child has terminated successfully
            let output = client.wait_with_output().unwrap();
            match parse_iperf3_output(&output.stdout, receive) {
                Ok(rate) => bps.push(rate),
                Err(e) => err = Some(e),
            }
        } else {
            let _ = client.kill();
            let output = client.wait_with_output().unwrap();
            println!(
                "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                String::from_utf8_lossy(&output.stdout)
            );
        }
    }

    match err {
        Some(e) => Err(e),
        None => Ok(bps.iter().sum()),
    }
}

/// Collects the `Avg` latency of every JSON line in an ethr log that has one.
///
/// The value is taken as the text before the first `"us"`, i.e. the unit is
/// assumed to be microseconds. Lines without `Avg` are skipped.
///
/// # Errors
/// `Error::EthrLogParse` on a non-JSON line, an unparsable `Avg`, or when no
/// latency sample is found. The raw output is dumped to stderr in that case.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let mut latency = Vec::new();
        for l in s.lines() {
            let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
            // Skip header/summary lines
            if let Some(avg) = v["Avg"].as_str() {
                // Assume the latency unit is always "us"
                let value = avg.split("us").next().unwrap_or(avg);
                latency.push(
                    value
                        .parse::<f64>()
                        .expect("'ethr' parse error: invalid 'Avg' entry"),
                );
            }
        }

        assert!(
            !latency.is_empty(),
            "'ethr' parse error: no valid latency data found"
        );

        latency
    })
    .map_err(|_| {
        eprintln!(
            "=============== ethr output ===============\n\n{}\n\n===========end============\n\n",
            String::from_utf8_lossy(output)
        );
        Error::EthrLogParse
    })
}

/// Measures virtio-net latency with the `ethr` tool.
///
/// Copies `/usr/local/bin/ethr` to the guest and starts it there as a server.
/// Then runs a host client for `test_timeout` seconds, logging to
/// `ethr.client.log` in the guest's temp dir, and parses that log with
/// `parse_ethr_latency_output`.
///
/// # Errors
/// SSH/scp failures, spawn failure, client timeout (the client is then
/// killed and reaped), an unreadable log file, or a log parse failure.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        // A single attempt rather than DEFAULT_SSH_RETRIES.
        // NOTE(review): the reason retries were dropped here is not recorded — confirm.
        1,
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host; its '-o' log file is JSON, one object per line.
    // Keep the path as a `PathBuf` so non-UTF-8 temp dirs don't panic.
    let log_file = guest.tmp_dir.as_path().join("ethr.client.log");
    let mut c = Command::new(ethr_path)
        .args(["-c", &guest.network.guest_ip, "-t", "l", "-o"])
        .arg(&log_file)
        .args(["-d", &format!("{}s", test_timeout)])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
        let _ = c.kill();
        // Reap the killed client so it does not linger as a zombie
        let _ = c.wait();
        return Err(Error::WaitTimeout(e));
    }

    // Parse the ethr latency test output
    let content = fs::read(&log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}