1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use once_cell::sync::Lazy; 9 use serde_json::Value; 10 use ssh2::Session; 11 use std::env; 12 use std::ffi::OsStr; 13 use std::io; 14 use std::io::{Read, Write}; 15 use std::net::TcpListener; 16 use std::net::TcpStream; 17 use std::os::unix::fs::PermissionsExt; 18 use std::os::unix::io::{AsRawFd, FromRawFd}; 19 use std::path::Path; 20 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 21 use std::str::FromStr; 22 use std::sync::Mutex; 23 use std::thread; 24 use std::time::Duration; 25 use std::{fmt, fs}; 26 use vmm_sys_util::tempdir::TempDir; 27 use wait_timeout::ChildExt; 28 29 #[derive(Debug)] 30 pub enum WaitTimeoutError { 31 Timedout, 32 ExitStatus, 33 General(std::io::Error), 34 } 35 36 #[derive(Debug)] 37 pub enum Error { 38 Parsing(std::num::ParseIntError), 39 SshCommand(SshCommandError), 40 WaitForBoot(WaitForBootError), 41 EthrLogFile(std::io::Error), 42 EthrLogParse, 43 FioOutputParse, 44 Iperf3Parse, 45 Spawn(std::io::Error), 46 WaitTimeout(WaitTimeoutError), 47 } 48 49 impl From<SshCommandError> for Error { 50 fn from(e: SshCommandError) -> Self { 51 Self::SshCommand(e) 52 } 53 } 54 55 pub struct GuestNetworkConfig { 56 pub guest_ip: String, 57 pub l2_guest_ip1: String, 58 pub l2_guest_ip2: String, 59 pub l2_guest_ip3: String, 60 pub host_ip: String, 61 pub guest_mac: String, 62 pub l2_guest_mac1: String, 63 pub l2_guest_mac2: String, 64 pub l2_guest_mac3: String, 65 pub tcp_listener_port: u16, 66 } 67 68 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 69 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 70 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 71 72 #[derive(Debug)] 73 pub enum WaitForBootError { 74 EpollWait(std::io::Error), 75 Listen(std::io::Error), 76 EpollWaitTimeout, 77 WrongGuestAddr, 78 Accept(std::io::Error), 79 } 80 81 impl GuestNetworkConfig { 82 pub fn 
wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), WaitForBootError> { 83 let start = std::time::Instant::now(); 84 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 85 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 86 let expected_guest_addr = self.guest_ip.as_str(); 87 let mut s = String::new(); 88 let timeout = match custom_timeout { 89 Some(t) => t, 90 None => DEFAULT_TCP_LISTENER_TIMEOUT, 91 }; 92 93 match (|| -> Result<(), WaitForBootError> { 94 let listener = 95 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 96 listener 97 .set_nonblocking(true) 98 .expect("Cannot set non-blocking for tcp listener"); 99 100 // Reply on epoll w/ timeout to wait for guest connections faithfully 101 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 102 // Use 'File' to enforce closing on 'epoll_fd' 103 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 104 epoll::ctl( 105 epoll_fd, 106 epoll::ControlOptions::EPOLL_CTL_ADD, 107 listener.as_raw_fd(), 108 epoll::Event::new(epoll::Events::EPOLLIN, 0), 109 ) 110 .expect("Cannot add 'tcp_listener' event to epoll"); 111 let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); 1]; 112 loop { 113 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 114 Ok(num_events) => Ok(num_events), 115 Err(e) => match e.raw_os_error() { 116 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 117 _ => Err(e), 118 }, 119 } 120 .map_err(WaitForBootError::EpollWait)?; 121 if num_events == 0 { 122 return Err(WaitForBootError::EpollWaitTimeout); 123 } 124 break; 125 } 126 127 match listener.accept() { 128 Ok((_, addr)) => { 129 // Make sure the connection is from the expected 'guest_addr' 130 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 131 s = format!( 132 "Expecting the guest ip '{}' while being connected with ip '{}'", 133 
expected_guest_addr, 134 addr.ip() 135 ); 136 return Err(WaitForBootError::WrongGuestAddr); 137 } 138 139 Ok(()) 140 } 141 Err(e) => { 142 s = "TcpListener::accept() failed".to_string(); 143 Err(WaitForBootError::Accept(e)) 144 } 145 } 146 })() { 147 Err(e) => { 148 let duration = start.elapsed(); 149 eprintln!( 150 "\n\n==== Start 'wait_vm_boot' (FAILED) ====\n\n\ 151 duration =\"{:?}, timeout = {}s\"\n\ 152 listen_addr=\"{}\"\n\ 153 expected_guest_addr=\"{}\"\n\ 154 message=\"{}\"\n\ 155 error=\"{:?}\"\n\ 156 \n==== End 'wait_vm_boot' outout ====\n\n", 157 duration, timeout, listen_addr, expected_guest_addr, s, e 158 ); 159 160 Err(e) 161 } 162 Ok(_) => Ok(()), 163 } 164 } 165 } 166 167 pub enum DiskType { 168 OperatingSystem, 169 CloudInit, 170 } 171 172 pub trait DiskConfig { 173 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 174 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 175 fn disk(&self, disk_type: DiskType) -> Option<String>; 176 } 177 178 #[derive(Clone)] 179 pub struct UbuntuDiskConfig { 180 osdisk_path: String, 181 cloudinit_path: String, 182 image_name: String, 183 } 184 185 impl UbuntuDiskConfig { 186 pub fn new(image_name: String) -> Self { 187 UbuntuDiskConfig { 188 image_name, 189 osdisk_path: String::new(), 190 cloudinit_path: String::new(), 191 } 192 } 193 } 194 195 pub struct WindowsDiskConfig { 196 image_name: String, 197 osdisk_path: String, 198 loopback_device: String, 199 windows_snapshot_cow: String, 200 windows_snapshot: String, 201 } 202 203 impl WindowsDiskConfig { 204 pub fn new(image_name: String) -> Self { 205 WindowsDiskConfig { 206 image_name, 207 osdisk_path: String::new(), 208 loopback_device: String::new(), 209 windows_snapshot_cow: String::new(), 210 windows_snapshot: String::new(), 211 } 212 } 213 } 214 215 impl Drop for WindowsDiskConfig { 216 fn drop(&mut self) { 217 // dmsetup remove windows-snapshot-1 218 std::process::Command::new("dmsetup") 219 
.arg("remove")
            .arg(self.windows_snapshot.as_str())
            .output()
            .expect("Expect removing Windows snapshot with 'dmsetup' to succeed");

        // Teardown runs in the reverse order of creation in `prepare_files`:
        // snapshot, then the CoW linear target, then the loop device.
        // NOTE: `output()` only errors when the tool cannot be spawned; a
        // non-zero exit status from dmsetup/losetup is not checked here.

        // dmsetup remove windows-snapshot-cow-1
        std::process::Command::new("dmsetup")
            .arg("remove")
            .arg(self.windows_snapshot_cow.as_str())
            .output()
            .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed");

        // losetup -d <loopback_device>
        std::process::Command::new("losetup")
            .args(["-d", self.loopback_device.as_str()])
            .output()
            .expect("Expect removing loopback device to succeed");
    }
}

// Ubuntu guests boot from a private copy of the workload image plus a
// cloud-init disk generated from `test_data/cloud-init/ubuntu`.
impl DiskConfig for UbuntuDiskConfig {
    // Renders the cloud-init templates for `network`, packs them into a
    // DOS-filesystem image labelled `cidata` (via mkdosfs/mcopy) and
    // returns the image path `<tmp_dir>/cloudinit`.
    fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String {
        let cloudinit_file_path =
            String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap());

        let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu");

        fs::create_dir_all(&cloud_init_directory)
            .expect("Expect creating cloud-init directory to succeed");

        // Templates are read relative to the current working directory.
        let source_file_dir = std::env::current_dir()
            .unwrap()
            .join("test_data")
            .join("cloud-init")
            .join("ubuntu");

        // meta-data needs no substitution and is copied verbatim.
        vec!["meta-data"].iter().for_each(|x| {
            rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x))
                .expect("Expect copying cloud-init meta-data to succeed");
        });

        // user-data: replace the `@…` placeholders with this guest's values.
        let mut user_data_string = String::new();
        fs::File::open(source_file_dir.join("user-data"))
            .unwrap()
            .read_to_string(&mut user_data_string)
            .expect("Expected reading user-data file in to succeed");
        user_data_string = user_data_string.replace(
            "@DEFAULT_TCP_LISTENER_MESSAGE",
            DEFAULT_TCP_LISTENER_MESSAGE,
        );
        user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip);
        user_data_string =
            user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string());

        fs::File::create(cloud_init_directory.join("user-data"))
            .unwrap()
            .write_all(user_data_string.as_bytes())
            .expect("Expected writing out user-data to succeed");

        // network-config: the template hard-codes 192.168.2.x addresses and
        // sample MACs; rewrite each one to this guest's value.
        let mut network_config_string = String::new();

        fs::File::open(source_file_dir.join("network-config"))
            .unwrap()
            .read_to_string(&mut network_config_string)
            .expect("Expected reading network-config file in to succeed");

        network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip);
        network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip);
        network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1);
        network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2);
        network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3);
        network_config_string =
            network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac);
        network_config_string =
            network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1);
        network_config_string =
            network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2);
        network_config_string =
            network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3);

        fs::File::create(cloud_init_directory.join("network-config"))
            .unwrap()
            .write_all(network_config_string.as_bytes())
            .expect("Expected writing out network-config to succeed");

        // Create the image; NOTE: only spawn failures are detected here, the
        // tool's exit status is not checked.
        std::process::Command::new("mkdosfs")
            .args(["-n", "cidata"])
            .args(["-C", cloudinit_file_path.as_str()])
            .arg("8192")
            .output()
            .expect("Expect creating disk image to succeed");

        // Copy the three rendered files into the image root (`::`).
        vec!["user-data", "meta-data", "network-config"]
            .iter()
            .for_each(|x| {
                std::process::Command::new("mcopy")
                    .arg("-o")
                    .args(["-i", cloudinit_file_path.as_str()])
                    .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"])
                    .output()
                    .expect("Expect copying files to disk image to succeed");
            });

        cloudinit_file_path
    }

    // Copies `~/workloads/<image_name>` to `<tmp_dir>/osdisk.img` (waiting
    // for free space via `rate_limited_copy`) and builds the cloud-init disk.
    fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) {
        let mut workload_path = dirs::home_dir().unwrap();
        workload_path.push("workloads");

        let mut osdisk_base_path = workload_path;
        osdisk_base_path.push(&self.image_name);

        let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap());
        let cloudinit_path = self.prepare_cloudinit(tmp_dir, network);

        rate_limited_copy(osdisk_base_path, &osdisk_path)
            .expect("copying of OS source disk image failed");

        self.cloudinit_path = cloudinit_path;
        self.osdisk_path = osdisk_path;
    }

    // Both disks always exist for Ubuntu guests (empty strings before
    // `prepare_files` has run).
    fn disk(&self, disk_type: DiskType) -> Option<String> {
        match disk_type {
            DiskType::OperatingSystem => Some(self.osdisk_path.clone()),
            DiskType::CloudInit => Some(self.cloudinit_path.clone()),
        }
    }
}

// Windows guests have no cloud-init disk; the OS disk is a device-mapper
// snapshot so the base image is never written to.
impl DiskConfig for WindowsDiskConfig {
    fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String {
        String::new()
    }

    // Sets up: a 1 GiB CoW file -> loop device -> dm `linear` target -> dm
    // `snapshot` over `/dev/mapper/windows-base`.
    // NOTE(review): assumes a `windows-base` device-mapper target already
    // exists; it is not created in this function.
    fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) {
        let mut workload_path = dirs::home_dir().unwrap();
        workload_path.push("workloads");

        let mut osdisk_path = workload_path;
        osdisk_path.push(&self.image_name);

        // `>> 9` converts bytes to 512-byte sectors, the unit of dm tables.
        let osdisk_blk_size = fs::metadata(osdisk_path)
            .expect("Expect retrieving Windows image metadata")
            .len()
            >> 9;

        let snapshot_cow_path =
            String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap());

        // Create and truncate CoW file for device mapper
        let cow_file_size: u64 = 1 << 30;
        let cow_file_blk_size = cow_file_size >> 9;
        let cow_file = std::fs::File::create(snapshot_cow_path.as_str())
            .expect("Expect creating CoW image to succeed");
        cow_file
            .set_len(cow_file_size)
            .expect("Expect truncating CoW image to succeed");

        // losetup --find --show /tmp/snapshot_cow
        let loopback_device = std::process::Command::new("losetup")
            .arg("--find")
            .arg("--show")
            .arg(snapshot_cow_path.as_str())
            .output()
            .expect("Expect creating loopback device from snapshot CoW image to succeed");

        // `--show` prints the allocated device path (e.g. /dev/loopN) on stdout.
        self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout)
            .trim()
            .to_string();

        // Suffix the dm names with the temp dir's name, presumably so that
        // several guests can coexist without name clashes.
        let random_extension = tmp_dir.as_path().file_name().unwrap();
        let windows_snapshot_cow = format!(
            "windows-snapshot-cow-{}",
            random_extension.to_str().unwrap()
        );

        // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0'
        std::process::Command::new("dmsetup")
            .arg("create")
            .arg(windows_snapshot_cow.as_str())
            .args([
                "--table",
                format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(),
            ])
            .output()
            .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed");

        let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap());

        // dmsetup mknodes
        std::process::Command::new("dmsetup")
            .arg("mknodes")
            .output()
            .expect("Expect device mapper nodes to be ready");

        // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8'
        std::process::Command::new("dmsetup")
            .arg("create")
            .arg(windows_snapshot.as_str())
            .args([
                "--table",
                format!(
                    "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8",
                    osdisk_blk_size,
                    windows_snapshot_cow.as_str()
                )
                .as_str(),
            ])
            .output()
            .expect("Expect creating Windows snapshot with 'dmsetup' to succeed");

        // dmsetup mknodes
        std::process::Command::new("dmsetup")
            .arg("mknodes")
            .output()
            .expect("Expect device mapper nodes to be ready");

        // Remember the names so `Drop` can tear the devices down.
        self.osdisk_path = format!("/dev/mapper/{}", windows_snapshot);
        self.windows_snapshot_cow =
windows_snapshot_cow;
        self.windows_snapshot = windows_snapshot;
    }

    fn disk(&self, disk_type: DiskType) -> Option<String> {
        match disk_type {
            DiskType::OperatingSystem => Some(self.osdisk_path.clone()),
            DiskType::CloudInit => None,
        }
    }
}

/// Copies `from` to `to` once `/tmp` has at least 6 GiB of free space.
///
/// Makes up to 10 attempts. Before each attempt, `statvfs(3)` is used to check
/// the free space of the filesystem holding `/tmp`. The function sleeps 60
/// seconds and retries when that space is below 6 GiB or when the copy fails
/// with `ENOSPC`. On success it returns the number of bytes copied.
///
/// # Errors
///
/// - Returns the OS error if `statvfs` fails.
/// - Returns any copy error other than `ENOSPC` immediately.
/// - After all attempts are exhausted, returns the last `ENOSPC` error, or an
///   `ErrorKind::Other` error if every attempt was skipped for lack of space.
pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> {
    // Previously the function ended with `io::Error::last_os_error()`, which
    // reports whatever errno happened to be set (often "Success"); keep the
    // real cause instead.
    let mut last_err: Option<io::Error> = None;

    for i in 0..10 {
        let free_bytes = {
            let fs_name = std::ffi::CString::new("/tmp").unwrap();
            let mut stats = std::mem::MaybeUninit::<libc::statvfs>::zeroed();
            // SAFETY: `fs_name` is a valid NUL-terminated C string and `stats`
            // provides writable storage for exactly one `statvfs` struct.
            if unsafe { libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()) } != 0 {
                return Err(io::Error::last_os_error());
            }
            // SAFETY: statvfs() returned 0, so it initialized `stats`.
            let stats = unsafe { stats.assume_init() };
            stats.f_bfree * stats.f_bsize
        };

        // Make sure there is at least 6 GiB of space
        if free_bytes < 6 << 30 {
            eprintln!(
                "Not enough space on disk ({}). Attempt {} of 10. Sleeping.",
                free_bytes,
                i + 1
            );
            thread::sleep(std::time::Duration::new(60, 0));
            continue;
        }

        match fs::copy(&from, &to) {
            Ok(copied) => return Ok(copied),
            Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => {
                eprintln!("Copy returned ENOSPC. Attempt {} of 10. Sleeping.", i + 1);
                last_err = Some(e);
                thread::sleep(std::time::Duration::new(60, 0));
            }
            Err(e) => return Err(e),
        }
    }

    Err(last_err.unwrap_or_else(|| {
        io::Error::new(
            io::ErrorKind::Other,
            format!(
                "not enough free space on /tmp to copy {:?} to {:?}",
                from.as_ref(),
                to.as_ref()
            ),
        )
    }))
}

/// Reports a failed child process and fails the test.
///
/// `r` has the shape returned by `std::panic::catch_unwind` (presumably the
/// outcome of the test body). Returns silently when `r` is `Ok` and the
/// child either exited successfully or was killed by signal 9 (SIGKILL,
/// which `child.kill()` sends). Otherwise, prints the exit code or signal and
/// the child's stdout/stderr to stderr.
///
/// # Panics
///
/// Panics with "Test failed" whenever it does not return early.
pub fn handle_child_output(
    r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>,
    output: &std::process::Output,
) {
    use std::os::unix::process::ExitStatusExt;
    if r.is_ok() && output.status.success() {
        return;
    }

    match output.status.code() {
        None => {
            // Don't treat child.kill() as a problem
            if output.status.signal() == Some(9) && r.is_ok() {
                return;
            }

            eprintln!(
                "==== child killed by signal: {} ====",
                output.status.signal().unwrap()
            );
        }
        Some(code) => {
            eprintln!("\n\n==== child exit code: {} ====", code);
        }
    }

    eprintln!(
        "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====",
        String::from_utf8_lossy(&output.stdout)
    );
    eprintln!(
        "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====",
        String::from_utf8_lossy(&output.stderr)
    );

    panic!("Test failed")
}

/// Username/password pair used for SSH and SCP into a guest.
#[derive(Debug)]
pub struct PasswordAuth {
    pub username: String,
    pub password: String,
}

/// Default number of SSH/SCP attempts before giving up.
pub const DEFAULT_SSH_RETRIES: u8 = 6;
/// Default back-off unit, in seconds; attempt `n` sleeps `n * timeout` seconds.
pub const DEFAULT_SSH_TIMEOUT: u8 = 10;

/// Failure at a specific step of an SSH command or SCP transfer.
#[derive(Debug)]
pub enum SshCommandError {
    Connection(std::io::Error),
    Handshake(ssh2::Error),
    Authentication(ssh2::Error),
    ChannelSession(ssh2::Error),
    Command(ssh2::Error),
    ExitStatus(ssh2::Error),
    NonZeroExitStatus(i32),
    FileRead(std::io::Error),
    FileMetadata(std::io::Error),
    ScpSend(ssh2::Error),
    WriteAll(std::io::Error),
    SendEof(ssh2::Error),
    WaitEof(ssh2::Error),
}

/// Copies the local file `path` to `remote_path` on `ip` over SCP, using
/// password authentication and preserving the permission bits (`mode & 0o777`).
/// Retries up to `retries` times with a linear back-off of `timeout` seconds.
fn scp_to_guest_with_auth(
    path: &Path,
    remote_path: &Path,
    auth: &PasswordAuth,
    ip: &str,
    retries: u8,
    timeout: u8,
) -> Result<(), SshCommandError> {
let mut counter = 0; 564 loop { 565 match (|| -> Result<(), SshCommandError> { 566 let tcp = 567 TcpStream::connect(format!("{}:22", ip)).map_err(SshCommandError::Connection)?; 568 let mut sess = Session::new().unwrap(); 569 sess.set_tcp_stream(tcp); 570 sess.handshake().map_err(SshCommandError::Handshake)?; 571 572 sess.userauth_password(&auth.username, &auth.password) 573 .map_err(SshCommandError::Authentication)?; 574 assert!(sess.authenticated()); 575 576 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 577 let mode = fs::metadata(path) 578 .map_err(SshCommandError::FileMetadata)? 579 .permissions() 580 .mode() 581 & 0o777; 582 583 let mut channel = sess 584 .scp_send(remote_path, mode as i32, content.len() as u64, None) 585 .map_err(SshCommandError::ScpSend)?; 586 channel 587 .write_all(&content) 588 .map_err(SshCommandError::WriteAll)?; 589 channel.send_eof().map_err(SshCommandError::SendEof)?; 590 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 591 592 // Intentionally ignore these results here as their failure 593 // does not precipitate a repeat 594 let _ = channel.close(); 595 let _ = channel.wait_close(); 596 597 Ok(()) 598 })() { 599 Ok(_) => break, 600 Err(e) => { 601 counter += 1; 602 if counter >= retries { 603 eprintln!( 604 "\n\n==== Start scp command output (FAILED) ====\n\n\ 605 path =\"{:?}\"\n\ 606 remote_path =\"{:?}\"\n\ 607 auth=\"{:#?}\"\n\ 608 ip=\"{}\"\n\ 609 error=\"{:?}\"\n\ 610 \n==== End scp command outout ====\n\n", 611 path, remote_path, auth, ip, e 612 ); 613 614 return Err(e); 615 } 616 } 617 }; 618 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 619 } 620 Ok(()) 621 } 622 623 pub fn scp_to_guest( 624 path: &Path, 625 remote_path: &Path, 626 ip: &str, 627 retries: u8, 628 timeout: u8, 629 ) -> Result<(), SshCommandError> { 630 scp_to_guest_with_auth( 631 path, 632 remote_path, 633 &PasswordAuth { 634 username: String::from("cloud"), 635 password: String::from("cloud123"), 636 }, 637 
ip, 638 retries, 639 timeout, 640 ) 641 } 642 643 pub fn ssh_command_ip_with_auth( 644 command: &str, 645 auth: &PasswordAuth, 646 ip: &str, 647 retries: u8, 648 timeout: u8, 649 ) -> Result<String, SshCommandError> { 650 let mut s = String::new(); 651 652 let mut counter = 0; 653 loop { 654 match (|| -> Result<(), SshCommandError> { 655 let tcp = 656 TcpStream::connect(format!("{}:22", ip)).map_err(SshCommandError::Connection)?; 657 let mut sess = Session::new().unwrap(); 658 sess.set_tcp_stream(tcp); 659 sess.handshake().map_err(SshCommandError::Handshake)?; 660 661 sess.userauth_password(&auth.username, &auth.password) 662 .map_err(SshCommandError::Authentication)?; 663 assert!(sess.authenticated()); 664 665 let mut channel = sess 666 .channel_session() 667 .map_err(SshCommandError::ChannelSession)?; 668 channel.exec(command).map_err(SshCommandError::Command)?; 669 670 // Intentionally ignore these results here as their failure 671 // does not precipitate a repeat 672 let _ = channel.read_to_string(&mut s); 673 let _ = channel.close(); 674 let _ = channel.wait_close(); 675 676 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 677 678 if status != 0 { 679 Err(SshCommandError::NonZeroExitStatus(status)) 680 } else { 681 Ok(()) 682 } 683 })() { 684 Ok(_) => break, 685 Err(e) => { 686 counter += 1; 687 if counter >= retries { 688 eprintln!( 689 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 690 command=\"{}\"\n\ 691 auth=\"{:#?}\"\n\ 692 ip=\"{}\"\n\ 693 output=\"{}\"\n\ 694 error=\"{:?}\"\n\ 695 \n==== End ssh command outout ====\n\n", 696 command, auth, ip, s, e 697 ); 698 699 return Err(e); 700 } 701 } 702 }; 703 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 704 } 705 Ok(s) 706 } 707 708 pub fn ssh_command_ip( 709 command: &str, 710 ip: &str, 711 retries: u8, 712 timeout: u8, 713 ) -> Result<String, SshCommandError> { 714 ssh_command_ip_with_auth( 715 command, 716 &PasswordAuth { 717 username: 
String::from("cloud"), 718 password: String::from("cloud123"), 719 }, 720 ip, 721 retries, 722 timeout, 723 ) 724 } 725 726 pub fn exec_host_command_status(command: &str) -> ExitStatus { 727 std::process::Command::new("bash") 728 .args(["-c", command]) 729 .status() 730 .unwrap_or_else(|_| panic!("Expected '{}' to run", command)) 731 } 732 733 pub fn exec_host_command_output(command: &str) -> Output { 734 std::process::Command::new("bash") 735 .args(["-c", command]) 736 .output() 737 .unwrap_or_else(|_| panic!("Expected '{}' to run", command)) 738 } 739 740 pub const PIPE_SIZE: i32 = 32 << 20; 741 742 static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1)); 743 744 pub struct Guest { 745 pub tmp_dir: TempDir, 746 pub disk_config: Box<dyn DiskConfig>, 747 pub network: GuestNetworkConfig, 748 } 749 750 // Safe to implement as we know we have no interior mutability 751 impl std::panic::RefUnwindSafe for Guest {} 752 753 impl Guest { 754 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 755 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 756 757 let network = GuestNetworkConfig { 758 guest_ip: format!("{}.{}.2", class, id), 759 l2_guest_ip1: format!("{}.{}.3", class, id), 760 l2_guest_ip2: format!("{}.{}.4", class, id), 761 l2_guest_ip3: format!("{}.{}.5", class, id), 762 host_ip: format!("{}.{}.1", class, id), 763 guest_mac: format!("12:34:56:78:90:{:02x}", id), 764 l2_guest_mac1: format!("de:ad:be:ef:12:{:02x}", id), 765 l2_guest_mac2: format!("de:ad:be:ef:34:{:02x}", id), 766 l2_guest_mac3: format!("de:ad:be:ef:56:{:02x}", id), 767 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 768 }; 769 770 disk_config.prepare_files(&tmp_dir, &network); 771 772 Guest { 773 tmp_dir, 774 disk_config, 775 network, 776 } 777 } 778 779 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 780 let mut guard = NEXT_VM_ID.lock().unwrap(); 781 let id = *guard; 782 *guard = id + 1; 783 784 
Self::new_from_ip_range(disk_config, "192.168", id)
    }

    /// Network device string with this guest's MAC and the host-side IP
    /// (`host_ip`) on a /24 mask; presumably passed to cloud-hypervisor `--net`.
    pub fn default_net_string(&self) -> String {
        format!(
            "tap=,mac={},ip={},mask=255.255.255.0",
            self.network.guest_mac, self.network.host_ip
        )
    }

    /// Same as `default_net_string` with `iommu=on` appended.
    pub fn default_net_string_w_iommu(&self) -> String {
        format!(
            "tap=,mac={},ip={},mask=255.255.255.0,iommu=on",
            self.network.guest_mac, self.network.host_ip
        )
    }

    /// Same as `default_net_string` with an explicit `mtu`.
    pub fn default_net_string_w_mtu(&self, mtu: u16) -> String {
        format!(
            "tap=,mac={},ip={},mask=255.255.255.0,mtu={}",
            self.network.guest_mac, self.network.host_ip, mtu
        )
    }

    /// Runs `command` on this guest over SSH with the default retry settings.
    pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> {
        ssh_command_ip(
            command,
            &self.network.guest_ip,
            DEFAULT_SSH_RETRIES,
            DEFAULT_SSH_TIMEOUT,
        )
    }

    /// Alias of `ssh_command`: the L1 guest is this guest.
    #[cfg(target_arch = "x86_64")]
    pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> {
        self.ssh_command(command)
    }

    /// Runs `command` over SSH on `l2_guest_ip1`.
    #[cfg(target_arch = "x86_64")]
    pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> {
        ssh_command_ip(
            command,
            &self.network.l2_guest_ip1,
            DEFAULT_SSH_RETRIES,
            DEFAULT_SSH_TIMEOUT,
        )
    }

    /// Runs `command` over SSH on `l2_guest_ip2`.
    #[cfg(target_arch = "x86_64")]
    pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> {
        ssh_command_ip(
            command,
            &self.network.l2_guest_ip2,
            DEFAULT_SSH_RETRIES,
            DEFAULT_SSH_TIMEOUT,
        )
    }

    /// Runs `command` over SSH on `l2_guest_ip3`.
    #[cfg(target_arch = "x86_64")]
    pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> {
        ssh_command_ip(
            command,
            &self.network.l2_guest_ip3,
            DEFAULT_SSH_RETRIES,
            DEFAULT_SSH_TIMEOUT,
        )
    }

    /// JSON body for a VM-create API request.
    ///
    /// The body uses `cpu_count` boot/max vCPUs, the given kernel and
    /// command line, one NIC, and the OS and cloud-init disks.
    ///
    /// NOTE(review): values are interpolated without JSON escaping, so a
    /// `kernel_cmd` containing `"` or `\` produces invalid JSON.
    ///
    /// # Panics
    ///
    /// Panics if the disk configuration has no cloud-init disk.
    pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String {
        format!(
            "{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}",
            cpu_count,
            cpu_count,
            kernel_path,
            kernel_cmd,
            self.network.host_ip,
            self.network.guest_mac,
            self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(),
            self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(),
        )
    }

    /// Number of `processor` entries in the guest's `/proc/cpuinfo`.
    pub fn get_cpu_count(&self) -> Result<u32, Error> {
        self.ssh_command("grep -c processor /proc/cpuinfo")?
            .trim()
            .parse()
            .map_err(Error::Parsing)
    }

    /// The guest's "initial apicid" value parsed from `/proc/cpuinfo`.
    #[cfg(target_arch = "x86_64")]
    pub fn get_initial_apicid(&self) -> Result<u32, Error> {
        self.ssh_command("grep \"initial apicid\" /proc/cpuinfo | grep -o \"[0-9]*\"")?
            .trim()
            .parse()
            .map_err(Error::Parsing)
    }

    /// `MemTotal` (kB, as reported by `/proc/meminfo`) of the guest.
    pub fn get_total_memory(&self) -> Result<u32, Error> {
        self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")?
            .trim()
            .parse()
            .map_err(Error::Parsing)
    }

    /// `MemTotal` of the first L2 guest.
    #[cfg(target_arch = "x86_64")]
    pub fn get_total_memory_l2(&self) -> Result<u32, Error> {
        self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")?
            .trim()
            .parse()
            .map_err(Error::Parsing)
    }

    /// `MemTotal` of NUMA node `node_id` from sysfs.
    pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> {
        self.ssh_command(&format!(
            "grep MemTotal /sys/devices/system/node/node{}/meminfo \
            | cut -d \":\" -f 2 | grep -o \"[0-9]*\"",
            node_id
        ))?
        .trim()
        .parse()
        .map_err(Error::Parsing)
    }

    /// Waits for this guest to connect back; see `GuestNetworkConfig::wait_vm_boot`.
    pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> {
        self.network
            .wait_vm_boot(custom_timeout)
            .map_err(Error::WaitForBoot)
    }

    /// Checks that every CPU in `cpus` appears under NUMA node `node_id`.
    pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> {
        for cpu in &cpus {
            let cmd = format!(
                "[ -d \"/sys/devices/system/node/node{}/cpu{}\" ]",
                node_id, cpu
            );
            self.ssh_command(cmd.as_str())?;
        }

        Ok(())
    }

    /// Whether node `node_id`'s distance row equals `distances` (trimmed).
    pub fn check_numa_node_distances(
        &self,
        node_id: usize,
        distances: &str,
    ) -> Result<bool, Error> {
        let cmd = format!("cat /sys/devices/system/node/node{}/distance", node_id);
        Ok(self.ssh_command(cmd.as_str())?.trim() == distances)
    }

    /// Asserts NUMA memory, CPU and distance layout; index `i` of each
    /// slice describes node `i`. Memory is checked as a strict lower bound.
    pub fn check_numa_common(
        &self,
        mem_ref: Option<&[u32]>,
        node_ref: Option<&[Vec<usize>]>,
        distance_ref: Option<&[&str]>,
    ) {
        if let Some(mem_ref) = mem_ref {
            // Check each NUMA node has been assigned the right amount of
            // memory.
            for (i, &m) in mem_ref.iter().enumerate() {
                assert!(self.get_numa_node_memory(i).unwrap_or_default() > m);
            }
        }

        if let Some(node_ref) = node_ref {
            // Check each NUMA node has been assigned the right CPUs set.
            for (i, n) in node_ref.iter().enumerate() {
                self.check_numa_node_cpus(i, n.clone()).unwrap();
            }
        }

        if let Some(distance_ref) = distance_ref {
            // Check each NUMA node has been assigned the right distances.
            for (i, &d) in distance_ref.iter().enumerate() {
                assert!(self.check_numa_node_distances(i, d).unwrap());
            }
        }
    }

    /// Checks via `cpuid` that SGX, SGX launch control and SGX1 are reported.
    #[cfg(target_arch = "x86_64")]
    pub fn check_sgx_support(&self) -> Result<(), Error> {
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \
            Software Guard Extensions supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \
            SGX launch config supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \
            supported = true'",
        )?;

        Ok(())
    }

    /// PCI class of device `0000:00:00.0`.
    pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
            .trim()
            .to_string())
    }

    /// Newline-separated device ids of all PCI devices (sysfs glob order).
    pub fn get_pci_device_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/device")?
            .trim()
            .to_string())
    }

    /// Newline-separated vendor ids of all PCI devices (sysfs glob order).
    pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
            .trim()
            .to_string())
    }

    /// Whether some PCI device has exactly this device/vendor id pair.
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        // We are checking if console device's device id and vendor id pair matches
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        // Both listings come from the same sysfs glob, so line N of each
        // describes the same device.
        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Sends "HelloWorld!" from the host through the vsock UNIX `socket`
    /// and asserts that a socat listener in the guest (port 16) received it.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip("sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log", &guest_ip, DEFAULT_SSH_RETRIES, DEFAULT_SSH_TIMEOUT).unwrap();
        });

        // Make sure socat is listening, which might take a few second on slow systems
        thread::sleep(std::time::Duration::new(10, 0));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{}",
            socket
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Asserts that `nvidia-smi` in the guest reports a Tesla T4.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self.ssh_command("nvidia-smi").unwrap().contains("Tesla T4"));
    }

    /// Reboots the guest and checks the `last` reboot count goes from
    /// `current_reboot_count + 1` to `current_reboot_count + 2`.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        let list_boots_cmd = "sudo last | grep -c reboot";
        let boot_count = self
            .ssh_command(list_boots_cmd)
            .unwrap()
            .trim()
            .parse::<u32>()
            .unwrap_or_default();

        assert_eq!(boot_count, current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        let boot_count = self
            .ssh_command(list_boots_cmd)
            .unwrap()
            .trim()
            .parse::<u32>()
            .unwrap_or_default();
        assert_eq!(boot_count, current_reboot_count + 2);
    }

    /// Makes the guest kernel online hot-plugged memory blocks automatically.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Exercises block, rng, and (optionally) vsock, console and pmem devices.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {} | sudo tee /dev/hvc0", console_text);
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let
Some(pmem_path) = pmem_path { 1123 assert_eq!( 1124 self.ssh_command(&format!("ls {}", pmem_path)) 1125 .unwrap() 1126 .trim(), 1127 pmem_path 1128 ); 1129 assert_eq!( 1130 self.ssh_command(&format!("sudo mount {} /mnt", pmem_path)) 1131 .unwrap(), 1132 "" 1133 ); 1134 assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n"); 1135 self.ssh_command("echo test123 | sudo tee /mnt/test") 1136 .unwrap(); 1137 assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), ""); 1138 assert_eq!(self.ssh_command("ls /mnt").unwrap(), ""); 1139 1140 assert_eq!( 1141 self.ssh_command(&format!("sudo mount {} /mnt", pmem_path)) 1142 .unwrap(), 1143 "" 1144 ); 1145 assert_eq!( 1146 self.ssh_command("sudo cat /mnt/test || true") 1147 .unwrap() 1148 .trim(), 1149 "test123" 1150 ); 1151 self.ssh_command("sudo rm /mnt/test").unwrap(); 1152 assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), ""); 1153 } 1154 } 1155 } 1156 1157 pub enum VerbosityLevel { 1158 Warn, 1159 Info, 1160 Debug, 1161 } 1162 1163 impl Default for VerbosityLevel { 1164 fn default() -> Self { 1165 Self::Warn 1166 } 1167 } 1168 1169 impl ToString for VerbosityLevel { 1170 fn to_string(&self) -> String { 1171 use VerbosityLevel::*; 1172 match self { 1173 Warn => "".to_string(), 1174 Info => "-v".to_string(), 1175 Debug => "-vv".to_string(), 1176 } 1177 } 1178 } 1179 1180 pub struct GuestCommand<'a> { 1181 command: Command, 1182 guest: &'a Guest, 1183 capture_output: bool, 1184 print_cmd: bool, 1185 verbosity: VerbosityLevel, 1186 } 1187 1188 impl<'a> GuestCommand<'a> { 1189 pub fn new(guest: &'a Guest) -> Self { 1190 Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor")) 1191 } 1192 1193 pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self { 1194 Self { 1195 command: Command::new(binary_path), 1196 guest, 1197 capture_output: false, 1198 print_cmd: true, 1199 verbosity: VerbosityLevel::Info, 1200 } 1201 } 1202 1203 pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> 
&mut Self { 1204 self.verbosity = verbosity; 1205 self 1206 } 1207 1208 pub fn capture_output(&mut self) -> &mut Self { 1209 self.capture_output = true; 1210 self 1211 } 1212 1213 pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self { 1214 self.print_cmd = print_cmd; 1215 self 1216 } 1217 1218 pub fn spawn(&mut self) -> io::Result<Child> { 1219 use VerbosityLevel::*; 1220 match &self.verbosity { 1221 Warn => {} 1222 Info => { 1223 self.command.arg("-v"); 1224 } 1225 Debug => { 1226 self.command.arg("-vv"); 1227 } 1228 }; 1229 1230 if self.print_cmd { 1231 println!( 1232 "\n\n==== Start cloud-hypervisor command-line ====\n\n\ 1233 {:?}\n\ 1234 \n==== End cloud-hypervisor command-line ====\n\n", 1235 self.command 1236 ); 1237 } 1238 1239 if self.capture_output { 1240 let child = self 1241 .command 1242 .stderr(Stdio::piped()) 1243 .stdout(Stdio::piped()) 1244 .spawn() 1245 .unwrap(); 1246 1247 let fd = child.stdout.as_ref().unwrap().as_raw_fd(); 1248 let pipesize = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) }; 1249 if pipesize == -1 { 1250 return Err(io::Error::last_os_error()); 1251 } 1252 let fd = child.stderr.as_ref().unwrap().as_raw_fd(); 1253 let pipesize1 = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) }; 1254 if pipesize1 == -1 { 1255 return Err(io::Error::last_os_error()); 1256 } 1257 1258 if pipesize >= PIPE_SIZE && pipesize1 >= PIPE_SIZE { 1259 Ok(child) 1260 } else { 1261 Err(std::io::Error::new( 1262 std::io::ErrorKind::Other, 1263 format!( 1264 "resizing pipe w/ 'fnctl' failed: stdout pipesize {}, stderr pipesize {}", 1265 pipesize, pipesize1 1266 ), 1267 )) 1268 } 1269 } else { 1270 self.command.spawn() 1271 } 1272 } 1273 1274 pub fn args<I, S>(&mut self, args: I) -> &mut Self 1275 where 1276 I: IntoIterator<Item = S>, 1277 S: AsRef<OsStr>, 1278 { 1279 self.command.args(args); 1280 self 1281 } 1282 1283 pub fn default_disks(&mut self) -> &mut Self { 1284 if self.guest.disk_config.disk(DiskType::CloudInit).is_some() { 1285 
self.args([ 1286 "--disk", 1287 format!( 1288 "path={}", 1289 self.guest 1290 .disk_config 1291 .disk(DiskType::OperatingSystem) 1292 .unwrap() 1293 ) 1294 .as_str(), 1295 format!( 1296 "path={}", 1297 self.guest.disk_config.disk(DiskType::CloudInit).unwrap() 1298 ) 1299 .as_str(), 1300 ]) 1301 } else { 1302 self.args([ 1303 "--disk", 1304 format!( 1305 "path={}", 1306 self.guest 1307 .disk_config 1308 .disk(DiskType::OperatingSystem) 1309 .unwrap() 1310 ) 1311 .as_str(), 1312 ]) 1313 } 1314 } 1315 1316 pub fn default_net(&mut self) -> &mut Self { 1317 self.args(["--net", self.guest.default_net_string().as_str()]) 1318 } 1319 } 1320 1321 pub fn clh_command(cmd: &str) -> String { 1322 env::var("BUILD_TARGET").map_or( 1323 format!("target/x86_64-unknown-linux-gnu/release/{}", cmd), 1324 |target| format!("target/{}/release/{}", target, cmd), 1325 ) 1326 } 1327 1328 pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> { 1329 std::panic::catch_unwind(|| { 1330 let s = String::from_utf8_lossy(output); 1331 let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output"); 1332 1333 let bps: f64 = if sender { 1334 v["end"]["sum_sent"]["bits_per_second"] 1335 .as_f64() 1336 .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'") 1337 } else { 1338 v["end"]["sum_received"]["bits_per_second"] 1339 .as_f64() 1340 .expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'") 1341 }; 1342 1343 bps 1344 }) 1345 .map_err(|_| { 1346 eprintln!( 1347 "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n", 1348 String::from_utf8_lossy(output) 1349 ); 1350 Error::Iperf3Parse 1351 }) 1352 } 1353 1354 pub enum FioOps { 1355 Read, 1356 RandomRead, 1357 Write, 1358 RandomWrite, 1359 ReadWrite, 1360 RandRW, 1361 } 1362 1363 impl fmt::Display for FioOps { 1364 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 1365 match self { 1366 FioOps::Read => write!(f, 
"read"), 1367 FioOps::RandomRead => write!(f, "randread"), 1368 FioOps::Write => write!(f, "write"), 1369 FioOps::RandomWrite => write!(f, "randwrite"), 1370 FioOps::ReadWrite => write!(f, "rw"), 1371 FioOps::RandRW => write!(f, "randrw"), 1372 } 1373 } 1374 } 1375 1376 pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> { 1377 std::panic::catch_unwind(|| { 1378 let v: Value = 1379 serde_json::from_str(output).expect("'fio' parse error: invalid json output"); 1380 let jobs = v["jobs"] 1381 .as_array() 1382 .expect("'fio' parse error: missing entry 'jobs'"); 1383 assert_eq!( 1384 jobs.len(), 1385 num_jobs as usize, 1386 "'fio' parse error: Unexpected number of 'fio' jobs." 1387 ); 1388 1389 let (read, write) = match fio_ops { 1390 FioOps::Read | FioOps::RandomRead => (true, false), 1391 FioOps::Write | FioOps::RandomWrite => (false, true), 1392 FioOps::ReadWrite | FioOps::RandRW => (true, true), 1393 }; 1394 1395 let mut total_bps = 0_f64; 1396 for j in jobs { 1397 if read { 1398 let bytes = j["read"]["io_bytes"] 1399 .as_u64() 1400 .expect("'fio' parse error: missing entry 'read.io_bytes'"); 1401 let runtime = j["read"]["runtime"] 1402 .as_u64() 1403 .expect("'fio' parse error: missing entry 'read.runtime'") 1404 as f64 1405 / 1000_f64; 1406 total_bps += bytes as f64 / runtime; 1407 } 1408 if write { 1409 let bytes = j["write"]["io_bytes"] 1410 .as_u64() 1411 .expect("'fio' parse error: missing entry 'write.io_bytes'"); 1412 let runtime = j["write"]["runtime"] 1413 .as_u64() 1414 .expect("'fio' parse error: missing entry 'write.runtime'") 1415 as f64 1416 / 1000_f64; 1417 total_bps += bytes as f64 / runtime; 1418 } 1419 } 1420 1421 total_bps 1422 }) 1423 .map_err(|_| { 1424 eprintln!( 1425 "=============== Fio output ===============\n\n{}\n\n===========end============\n\n", 1426 output 1427 ); 1428 Error::FioOutputParse 1429 }) 1430 } 1431 1432 pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> 
Result<f64, Error> { 1433 std::panic::catch_unwind(|| { 1434 let v: Value = 1435 serde_json::from_str(output).expect("'fio' parse error: invalid json output"); 1436 let jobs = v["jobs"] 1437 .as_array() 1438 .expect("'fio' parse error: missing entry 'jobs'"); 1439 assert_eq!( 1440 jobs.len(), 1441 num_jobs as usize, 1442 "'fio' parse error: Unexpected number of 'fio' jobs." 1443 ); 1444 1445 let (read, write) = match fio_ops { 1446 FioOps::Read | FioOps::RandomRead => (true, false), 1447 FioOps::Write | FioOps::RandomWrite => (false, true), 1448 FioOps::ReadWrite | FioOps::RandRW => (true, true), 1449 }; 1450 1451 let mut total_iops = 0_f64; 1452 for j in jobs { 1453 if read { 1454 let ios = j["read"]["total_ios"] 1455 .as_u64() 1456 .expect("'fio' parse error: missing entry 'read.total_ios'"); 1457 let runtime = j["read"]["runtime"] 1458 .as_u64() 1459 .expect("'fio' parse error: missing entry 'read.runtime'") 1460 as f64 1461 / 1000_f64; 1462 total_iops += ios as f64 / runtime; 1463 } 1464 if write { 1465 let ios = j["write"]["total_ios"] 1466 .as_u64() 1467 .expect("'fio' parse error: missing entry 'write.total_ios'"); 1468 let runtime = j["write"]["runtime"] 1469 .as_u64() 1470 .expect("'fio' parse error: missing entry 'write.runtime'") 1471 as f64 1472 / 1000_f64; 1473 total_iops += ios as f64 / runtime; 1474 } 1475 } 1476 1477 total_iops 1478 }) 1479 .map_err(|_| { 1480 eprintln!( 1481 "=============== Fio output ===============\n\n{}\n\n===========end============\n\n", 1482 output 1483 ); 1484 Error::FioOutputParse 1485 }) 1486 } 1487 1488 // Wait the child process for a given timeout 1489 fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> { 1490 match child.wait_timeout(Duration::from_secs(timeout)) { 1491 Err(e) => { 1492 return Err(WaitTimeoutError::General(e)); 1493 } 1494 Ok(s) => match s { 1495 None => { 1496 return Err(WaitTimeoutError::Timedout); 1497 } 1498 Some(s) => { 1499 if !s.success() { 1500 return 
Err(WaitTimeoutError::ExitStatus); 1501 } 1502 } 1503 }, 1504 } 1505 1506 Ok(()) 1507 } 1508 1509 pub fn measure_virtio_net_throughput( 1510 test_timeout: u32, 1511 queue_pairs: u32, 1512 guest: &Guest, 1513 receive: bool, 1514 ) -> Result<f64, Error> { 1515 let default_port = 5201; 1516 1517 // 1. start the iperf3 server on the guest 1518 for n in 0..queue_pairs { 1519 guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?; 1520 } 1521 1522 thread::sleep(Duration::new(1, 0)); 1523 1524 // 2. start the iperf3 client on host to measure RX through-put 1525 let mut clients = Vec::new(); 1526 for n in 0..queue_pairs { 1527 let mut cmd = Command::new("iperf3"); 1528 cmd.args([ 1529 "-J", // Output in JSON format 1530 "-c", 1531 &guest.network.guest_ip, 1532 "-p", 1533 &format!("{}", default_port + n), 1534 "-t", 1535 &format!("{}", test_timeout), 1536 ]); 1537 // For measuring the guest transmit throughput (as a sender), 1538 // use reverse mode of the iperf3 client on the host 1539 if !receive { 1540 cmd.args(["-R"]); 1541 } 1542 let client = cmd 1543 .stderr(Stdio::piped()) 1544 .stdout(Stdio::piped()) 1545 .spawn() 1546 .map_err(Error::Spawn)?; 1547 1548 clients.push(client); 1549 } 1550 1551 let mut err: Option<Error> = None; 1552 let mut bps = Vec::new(); 1553 let mut failed = false; 1554 for c in clients { 1555 let mut c = c; 1556 if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) { 1557 err = Some(Error::WaitTimeout(e)); 1558 failed = true; 1559 } 1560 1561 if !failed { 1562 // Safe to unwrap as we know the child has terminated succesffully 1563 let output = c.wait_with_output().unwrap(); 1564 bps.push(parse_iperf3_output(&output.stdout, receive)?); 1565 } else { 1566 let _ = c.kill(); 1567 let output = c.wait_with_output().unwrap(); 1568 println!( 1569 "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n", 1570 String::from_utf8_lossy(&output.stdout) 1571 ); 1572 } 1573 } 1574 1575 if let 
Some(e) = err { 1576 Err(e) 1577 } else { 1578 Ok(bps.iter().sum()) 1579 } 1580 } 1581 1582 pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> { 1583 std::panic::catch_unwind(|| { 1584 let s = String::from_utf8_lossy(output); 1585 let mut latency = Vec::new(); 1586 for l in s.lines() { 1587 let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line"); 1588 // Skip header/summary lines 1589 if let Some(avg) = v["Avg"].as_str() { 1590 // Assume the latency unit is always "us" 1591 latency.push( 1592 avg.split("us").collect::<Vec<&str>>()[0] 1593 .parse::<f64>() 1594 .expect("'ethr' parse error: invalid 'Avg' entry"), 1595 ); 1596 } 1597 } 1598 1599 assert!( 1600 !latency.is_empty(), 1601 "'ethr' parse error: no valid latency data found" 1602 ); 1603 1604 latency 1605 }) 1606 .map_err(|_| { 1607 eprintln!( 1608 "=============== ethr output ===============\n\n{}\n\n===========end============\n\n", 1609 String::from_utf8_lossy(output) 1610 ); 1611 Error::EthrLogParse 1612 }) 1613 } 1614 1615 pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> { 1616 // copy the 'ethr' tool to the guest image 1617 let ethr_path = "/usr/local/bin/ethr"; 1618 let ethr_remote_path = "/tmp/ethr"; 1619 scp_to_guest( 1620 Path::new(ethr_path), 1621 Path::new(ethr_remote_path), 1622 &guest.network.guest_ip, 1623 //DEFAULT_SSH_RETRIES, 1624 1, 1625 DEFAULT_SSH_TIMEOUT, 1626 )?; 1627 1628 // Start the ethr server on the guest 1629 guest.ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path))?; 1630 1631 thread::sleep(Duration::new(10, 0)); 1632 1633 // Start the ethr client on the host 1634 let log_file = guest 1635 .tmp_dir 1636 .as_path() 1637 .join("ethr.client.log") 1638 .to_str() 1639 .unwrap() 1640 .to_string(); 1641 let mut c = Command::new(ethr_path) 1642 .args([ 1643 "-c", 1644 &guest.network.guest_ip, 1645 "-t", 1646 "l", 1647 "-o", 1648 &log_file, // file output is JSON format 1649 
"-d", 1650 &format!("{}s", test_timeout), 1651 ]) 1652 .stderr(Stdio::piped()) 1653 .stdout(Stdio::piped()) 1654 .spawn() 1655 .map_err(Error::Spawn)?; 1656 1657 if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout) 1658 { 1659 let _ = c.kill(); 1660 return Err(e); 1661 } 1662 1663 // Parse the ethr latency test output 1664 let content = fs::read(log_file).map_err(Error::EthrLogFile)?; 1665 parse_ethr_latency_output(&content) 1666 } 1667