1 // Copyright © 2021 Intel Corporation 2 // 3 // SPDX-License-Identifier: Apache-2.0 4 // 5 6 #![allow(clippy::undocumented_unsafe_blocks)] 7 8 use std::ffi::OsStr; 9 use std::fmt::Display; 10 use std::io::{Read, Write}; 11 use std::net::{TcpListener, TcpStream}; 12 use std::os::unix::fs::PermissionsExt; 13 use std::os::unix::io::{AsRawFd, FromRawFd}; 14 use std::path::Path; 15 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 16 use std::str::FromStr; 17 use std::sync::Mutex; 18 use std::time::Duration; 19 use std::{env, fmt, fs, io, thread}; 20 21 use once_cell::sync::Lazy; 22 use serde_json::Value; 23 use ssh2::Session; 24 use vmm_sys_util::tempdir::TempDir; 25 use wait_timeout::ChildExt; 26 27 #[derive(Debug)] 28 pub enum WaitTimeoutError { 29 Timedout, 30 ExitStatus, 31 General(std::io::Error), 32 } 33 34 #[derive(Debug)] 35 pub enum Error { 36 Parsing(std::num::ParseIntError), 37 SshCommand(SshCommandError), 38 WaitForBoot(WaitForBootError), 39 EthrLogFile(std::io::Error), 40 EthrLogParse, 41 FioOutputParse, 42 Iperf3Parse, 43 Spawn(std::io::Error), 44 WaitTimeout(WaitTimeoutError), 45 } 46 47 impl From<SshCommandError> for Error { 48 fn from(e: SshCommandError) -> Self { 49 Self::SshCommand(e) 50 } 51 } 52 53 pub struct GuestNetworkConfig { 54 pub guest_ip: String, 55 pub l2_guest_ip1: String, 56 pub l2_guest_ip2: String, 57 pub l2_guest_ip3: String, 58 pub host_ip: String, 59 pub guest_mac: String, 60 pub l2_guest_mac1: String, 61 pub l2_guest_mac2: String, 62 pub l2_guest_mac3: String, 63 pub tcp_listener_port: u16, 64 } 65 66 pub const DEFAULT_TCP_LISTENER_MESSAGE: &str = "booted"; 67 pub const DEFAULT_TCP_LISTENER_PORT: u16 = 8000; 68 pub const DEFAULT_TCP_LISTENER_TIMEOUT: i32 = 120; 69 70 #[derive(Debug)] 71 pub enum WaitForBootError { 72 EpollWait(std::io::Error), 73 Listen(std::io::Error), 74 EpollWaitTimeout, 75 WrongGuestAddr, 76 Accept(std::io::Error), 77 } 78 79 impl GuestNetworkConfig { 80 pub fn wait_vm_boot(&self, custom_timeout: 
Option<i32>) -> Result<(), WaitForBootError> { 81 let start = std::time::Instant::now(); 82 // The 'port' is unique per 'GUEST' and listening to wild-card ip avoids retrying on 'TcpListener::bind()' 83 let listen_addr = format!("0.0.0.0:{}", self.tcp_listener_port); 84 let expected_guest_addr = self.guest_ip.as_str(); 85 let mut s = String::new(); 86 let timeout = match custom_timeout { 87 Some(t) => t, 88 None => DEFAULT_TCP_LISTENER_TIMEOUT, 89 }; 90 91 let mut closure = || -> Result<(), WaitForBootError> { 92 let listener = 93 TcpListener::bind(listen_addr.as_str()).map_err(WaitForBootError::Listen)?; 94 listener 95 .set_nonblocking(true) 96 .expect("Cannot set non-blocking for tcp listener"); 97 98 // Reply on epoll w/ timeout to wait for guest connections faithfully 99 let epoll_fd = epoll::create(true).expect("Cannot create epoll fd"); 100 // Use 'File' to enforce closing on 'epoll_fd' 101 let _epoll_file = unsafe { fs::File::from_raw_fd(epoll_fd) }; 102 epoll::ctl( 103 epoll_fd, 104 epoll::ControlOptions::EPOLL_CTL_ADD, 105 listener.as_raw_fd(), 106 epoll::Event::new(epoll::Events::EPOLLIN, 0), 107 ) 108 .expect("Cannot add 'tcp_listener' event to epoll"); 109 let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1]; 110 loop { 111 let num_events = match epoll::wait(epoll_fd, timeout * 1000_i32, &mut events[..]) { 112 Ok(num_events) => Ok(num_events), 113 Err(e) => match e.raw_os_error() { 114 Some(libc::EAGAIN) | Some(libc::EINTR) => continue, 115 _ => Err(e), 116 }, 117 } 118 .map_err(WaitForBootError::EpollWait)?; 119 if num_events == 0 { 120 return Err(WaitForBootError::EpollWaitTimeout); 121 } 122 break; 123 } 124 125 match listener.accept() { 126 Ok((_, addr)) => { 127 // Make sure the connection is from the expected 'guest_addr' 128 if addr.ip() != std::net::IpAddr::from_str(expected_guest_addr).unwrap() { 129 s = format!( 130 "Expecting the guest ip '{}' while being connected with ip '{}'", 131 expected_guest_addr, 132 addr.ip() 133 ); 134 
return Err(WaitForBootError::WrongGuestAddr); 135 } 136 137 Ok(()) 138 } 139 Err(e) => { 140 s = "TcpListener::accept() failed".to_string(); 141 Err(WaitForBootError::Accept(e)) 142 } 143 } 144 }; 145 146 match closure() { 147 Err(e) => { 148 let duration = start.elapsed(); 149 eprintln!( 150 "\n\n==== Start 'wait_vm_boot' (FAILED) ==== \ 151 \n\nduration =\"{duration:?}, timeout = {timeout}s\" \ 152 \nlisten_addr=\"{listen_addr}\" \ 153 \nexpected_guest_addr=\"{expected_guest_addr}\" \ 154 \nmessage=\"{s}\" \ 155 \nerror=\"{e:?}\" \ 156 \n\n==== End 'wait_vm_boot' outout ====\n\n" 157 ); 158 159 Err(e) 160 } 161 Ok(_) => Ok(()), 162 } 163 } 164 } 165 166 pub enum DiskType { 167 OperatingSystem, 168 CloudInit, 169 } 170 171 pub trait DiskConfig { 172 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig); 173 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String; 174 fn disk(&self, disk_type: DiskType) -> Option<String>; 175 } 176 177 #[derive(Clone)] 178 pub struct UbuntuDiskConfig { 179 osdisk_path: String, 180 cloudinit_path: String, 181 image_name: String, 182 } 183 184 impl UbuntuDiskConfig { 185 pub fn new(image_name: String) -> Self { 186 UbuntuDiskConfig { 187 image_name, 188 osdisk_path: String::new(), 189 cloudinit_path: String::new(), 190 } 191 } 192 } 193 194 pub struct WindowsDiskConfig { 195 image_name: String, 196 osdisk_path: String, 197 loopback_device: String, 198 windows_snapshot_cow: String, 199 windows_snapshot: String, 200 } 201 202 impl WindowsDiskConfig { 203 pub fn new(image_name: String) -> Self { 204 WindowsDiskConfig { 205 image_name, 206 osdisk_path: String::new(), 207 loopback_device: String::new(), 208 windows_snapshot_cow: String::new(), 209 windows_snapshot: String::new(), 210 } 211 } 212 } 213 214 impl Drop for WindowsDiskConfig { 215 fn drop(&mut self) { 216 // dmsetup remove windows-snapshot-1 217 std::process::Command::new("dmsetup") 218 .arg("remove") 219 
.arg(self.windows_snapshot.as_str()) 220 .output() 221 .expect("Expect removing Windows snapshot with 'dmsetup' to succeed"); 222 223 // dmsetup remove windows-snapshot-cow-1 224 std::process::Command::new("dmsetup") 225 .arg("remove") 226 .arg(self.windows_snapshot_cow.as_str()) 227 .output() 228 .expect("Expect removing Windows snapshot CoW with 'dmsetup' to succeed"); 229 230 // losetup -d <loopback_device> 231 std::process::Command::new("losetup") 232 .args(["-d", self.loopback_device.as_str()]) 233 .output() 234 .expect("Expect removing loopback device to succeed"); 235 } 236 } 237 238 impl DiskConfig for UbuntuDiskConfig { 239 fn prepare_cloudinit(&self, tmp_dir: &TempDir, network: &GuestNetworkConfig) -> String { 240 let cloudinit_file_path = 241 String::from(tmp_dir.as_path().join("cloudinit").to_str().unwrap()); 242 243 let cloud_init_directory = tmp_dir.as_path().join("cloud-init").join("ubuntu"); 244 245 fs::create_dir_all(&cloud_init_directory) 246 .expect("Expect creating cloud-init directory to succeed"); 247 248 let source_file_dir = std::env::current_dir() 249 .unwrap() 250 .join("test_data") 251 .join("cloud-init") 252 .join("ubuntu") 253 .join("ci"); 254 255 ["meta-data"].iter().for_each(|x| { 256 rate_limited_copy(source_file_dir.join(x), cloud_init_directory.join(x)) 257 .expect("Expect copying cloud-init meta-data to succeed"); 258 }); 259 260 let mut user_data_string = String::new(); 261 fs::File::open(source_file_dir.join("user-data")) 262 .unwrap() 263 .read_to_string(&mut user_data_string) 264 .expect("Expected reading user-data file to succeed"); 265 user_data_string = user_data_string.replace( 266 "@DEFAULT_TCP_LISTENER_MESSAGE", 267 DEFAULT_TCP_LISTENER_MESSAGE, 268 ); 269 user_data_string = user_data_string.replace("@HOST_IP", &network.host_ip); 270 user_data_string = 271 user_data_string.replace("@TCP_LISTENER_PORT", &network.tcp_listener_port.to_string()); 272 273 fs::File::create(cloud_init_directory.join("user-data")) 274 .unwrap() 
275 .write_all(user_data_string.as_bytes()) 276 .expect("Expected writing out user-data to succeed"); 277 278 let mut network_config_string = String::new(); 279 280 fs::File::open(source_file_dir.join("network-config")) 281 .unwrap() 282 .read_to_string(&mut network_config_string) 283 .expect("Expected reading network-config file to succeed"); 284 285 network_config_string = network_config_string.replace("192.168.2.1", &network.host_ip); 286 network_config_string = network_config_string.replace("192.168.2.2", &network.guest_ip); 287 network_config_string = network_config_string.replace("192.168.2.3", &network.l2_guest_ip1); 288 network_config_string = network_config_string.replace("192.168.2.4", &network.l2_guest_ip2); 289 network_config_string = network_config_string.replace("192.168.2.5", &network.l2_guest_ip3); 290 network_config_string = 291 network_config_string.replace("12:34:56:78:90:ab", &network.guest_mac); 292 network_config_string = 293 network_config_string.replace("de:ad:be:ef:12:34", &network.l2_guest_mac1); 294 network_config_string = 295 network_config_string.replace("de:ad:be:ef:34:56", &network.l2_guest_mac2); 296 network_config_string = 297 network_config_string.replace("de:ad:be:ef:56:78", &network.l2_guest_mac3); 298 299 fs::File::create(cloud_init_directory.join("network-config")) 300 .unwrap() 301 .write_all(network_config_string.as_bytes()) 302 .expect("Expected writing out network-config to succeed"); 303 304 std::process::Command::new("mkdosfs") 305 .args(["-n", "CIDATA"]) 306 .args(["-C", cloudinit_file_path.as_str()]) 307 .arg("8192") 308 .output() 309 .expect("Expect creating disk image to succeed"); 310 311 ["user-data", "meta-data", "network-config"] 312 .iter() 313 .for_each(|x| { 314 std::process::Command::new("mcopy") 315 .arg("-o") 316 .args(["-i", cloudinit_file_path.as_str()]) 317 .args(["-s", cloud_init_directory.join(x).to_str().unwrap(), "::"]) 318 .output() 319 .expect("Expect copying files to disk image to succeed"); 320 
}); 321 322 cloudinit_file_path 323 } 324 325 fn prepare_files(&mut self, tmp_dir: &TempDir, network: &GuestNetworkConfig) { 326 let mut workload_path = dirs::home_dir().unwrap(); 327 workload_path.push("workloads"); 328 329 let mut osdisk_base_path = workload_path; 330 osdisk_base_path.push(&self.image_name); 331 332 let osdisk_path = String::from(tmp_dir.as_path().join("osdisk.img").to_str().unwrap()); 333 let cloudinit_path = self.prepare_cloudinit(tmp_dir, network); 334 335 rate_limited_copy(osdisk_base_path, &osdisk_path) 336 .expect("copying of OS source disk image failed"); 337 338 self.cloudinit_path = cloudinit_path; 339 self.osdisk_path = osdisk_path; 340 } 341 342 fn disk(&self, disk_type: DiskType) -> Option<String> { 343 match disk_type { 344 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 345 DiskType::CloudInit => Some(self.cloudinit_path.clone()), 346 } 347 } 348 } 349 350 impl DiskConfig for WindowsDiskConfig { 351 fn prepare_cloudinit(&self, _tmp_dir: &TempDir, _network: &GuestNetworkConfig) -> String { 352 String::new() 353 } 354 355 fn prepare_files(&mut self, tmp_dir: &TempDir, _network: &GuestNetworkConfig) { 356 let mut workload_path = dirs::home_dir().unwrap(); 357 workload_path.push("workloads"); 358 359 let mut osdisk_path = workload_path; 360 osdisk_path.push(&self.image_name); 361 362 let osdisk_blk_size = fs::metadata(osdisk_path) 363 .expect("Expect retrieving Windows image metadata") 364 .len() 365 >> 9; 366 367 let snapshot_cow_path = 368 String::from(tmp_dir.as_path().join("snapshot_cow").to_str().unwrap()); 369 370 // Create and truncate CoW file for device mapper 371 let cow_file_size: u64 = 1 << 30; 372 let cow_file_blk_size = cow_file_size >> 9; 373 let cow_file = std::fs::File::create(snapshot_cow_path.as_str()) 374 .expect("Expect creating CoW image to succeed"); 375 cow_file 376 .set_len(cow_file_size) 377 .expect("Expect truncating CoW image to succeed"); 378 379 // losetup --find --show /tmp/snapshot_cow 380 
let loopback_device = std::process::Command::new("losetup") 381 .arg("--find") 382 .arg("--show") 383 .arg(snapshot_cow_path.as_str()) 384 .output() 385 .expect("Expect creating loopback device from snapshot CoW image to succeed"); 386 387 self.loopback_device = String::from_utf8_lossy(&loopback_device.stdout) 388 .trim() 389 .to_string(); 390 391 let random_extension = tmp_dir.as_path().file_name().unwrap(); 392 let windows_snapshot_cow = format!( 393 "windows-snapshot-cow-{}", 394 random_extension.to_str().unwrap() 395 ); 396 397 // dmsetup create windows-snapshot-cow-1 --table '0 2097152 linear /dev/loop1 0' 398 std::process::Command::new("dmsetup") 399 .arg("create") 400 .arg(windows_snapshot_cow.as_str()) 401 .args([ 402 "--table", 403 format!("0 {} linear {} 0", cow_file_blk_size, self.loopback_device).as_str(), 404 ]) 405 .output() 406 .expect("Expect creating Windows snapshot CoW with 'dmsetup' to succeed"); 407 408 let windows_snapshot = format!("windows-snapshot-{}", random_extension.to_str().unwrap()); 409 410 // dmsetup mknodes 411 std::process::Command::new("dmsetup") 412 .arg("mknodes") 413 .output() 414 .expect("Expect device mapper nodes to be ready"); 415 416 // dmsetup create windows-snapshot-1 --table '0 41943040 snapshot /dev/mapper/windows-base /dev/mapper/windows-snapshot-cow-1 P 8' 417 std::process::Command::new("dmsetup") 418 .arg("create") 419 .arg(windows_snapshot.as_str()) 420 .args([ 421 "--table", 422 format!( 423 "0 {} snapshot /dev/mapper/windows-base /dev/mapper/{} P 8", 424 osdisk_blk_size, 425 windows_snapshot_cow.as_str() 426 ) 427 .as_str(), 428 ]) 429 .output() 430 .expect("Expect creating Windows snapshot with 'dmsetup' to succeed"); 431 432 // dmsetup mknodes 433 std::process::Command::new("dmsetup") 434 .arg("mknodes") 435 .output() 436 .expect("Expect device mapper nodes to be ready"); 437 438 self.osdisk_path = format!("/dev/mapper/{windows_snapshot}"); 439 self.windows_snapshot_cow = windows_snapshot_cow; 440 
self.windows_snapshot = windows_snapshot; 441 } 442 443 fn disk(&self, disk_type: DiskType) -> Option<String> { 444 match disk_type { 445 DiskType::OperatingSystem => Some(self.osdisk_path.clone()), 446 DiskType::CloudInit => None, 447 } 448 } 449 } 450 451 pub fn rate_limited_copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { 452 for i in 0..10 { 453 let free_bytes = unsafe { 454 let mut stats = std::mem::MaybeUninit::zeroed(); 455 let fs_name = std::ffi::CString::new("/tmp").unwrap(); 456 libc::statvfs(fs_name.as_ptr(), stats.as_mut_ptr()); 457 458 let free_blocks = stats.assume_init().f_bfree; 459 let block_size = stats.assume_init().f_bsize; 460 461 free_blocks * block_size 462 }; 463 464 // Make sure there is at least 6 GiB of space 465 if free_bytes < 6 << 30 { 466 eprintln!("Not enough space on disk ({free_bytes}). Attempt {i} of 10. Sleeping."); 467 thread::sleep(std::time::Duration::new(60, 0)); 468 continue; 469 } 470 471 match fs::copy(&from, &to) { 472 Err(e) => { 473 if let Some(errno) = e.raw_os_error() { 474 if errno == libc::ENOSPC { 475 eprintln!("Copy returned ENOSPC. Attempt {i} of 10. 
Sleeping."); 476 thread::sleep(std::time::Duration::new(60, 0)); 477 continue; 478 } 479 } 480 return Err(e); 481 } 482 Ok(i) => return Ok(i), 483 } 484 } 485 Err(io::Error::last_os_error()) 486 } 487 488 pub fn handle_child_output( 489 r: Result<(), std::boxed::Box<dyn std::any::Any + std::marker::Send>>, 490 output: &std::process::Output, 491 ) { 492 use std::os::unix::process::ExitStatusExt; 493 if r.is_ok() && output.status.success() { 494 return; 495 } 496 497 match output.status.code() { 498 None => { 499 // Don't treat child.kill() as a problem 500 if output.status.signal() == Some(9) && r.is_ok() { 501 return; 502 } 503 504 eprintln!( 505 "==== child killed by signal: {} ====", 506 output.status.signal().unwrap() 507 ); 508 } 509 Some(code) => { 510 eprintln!("\n\n==== child exit code: {code} ===="); 511 } 512 } 513 514 eprintln!( 515 "\n\n==== Start child stdout ====\n\n{}\n\n==== End child stdout ====", 516 String::from_utf8_lossy(&output.stdout) 517 ); 518 eprintln!( 519 "\n\n==== Start child stderr ====\n\n{}\n\n==== End child stderr ====", 520 String::from_utf8_lossy(&output.stderr) 521 ); 522 523 panic!("Test failed") 524 } 525 526 #[derive(Debug)] 527 pub struct PasswordAuth { 528 pub username: String, 529 pub password: String, 530 } 531 532 pub const DEFAULT_SSH_RETRIES: u8 = 6; 533 pub const DEFAULT_SSH_TIMEOUT: u8 = 10; 534 535 #[derive(Debug)] 536 pub enum SshCommandError { 537 Connection(std::io::Error), 538 Handshake(ssh2::Error), 539 Authentication(ssh2::Error), 540 ChannelSession(ssh2::Error), 541 Command(ssh2::Error), 542 ExitStatus(ssh2::Error), 543 NonZeroExitStatus(i32), 544 FileRead(std::io::Error), 545 FileMetadata(std::io::Error), 546 ScpSend(ssh2::Error), 547 WriteAll(std::io::Error), 548 SendEof(ssh2::Error), 549 WaitEof(ssh2::Error), 550 } 551 552 fn scp_to_guest_with_auth( 553 path: &Path, 554 remote_path: &Path, 555 auth: &PasswordAuth, 556 ip: &str, 557 retries: u8, 558 timeout: u8, 559 ) -> Result<(), SshCommandError> { 560 let 
mut counter = 0; 561 loop { 562 let closure = || -> Result<(), SshCommandError> { 563 let tcp = 564 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 565 let mut sess = Session::new().unwrap(); 566 sess.set_tcp_stream(tcp); 567 sess.handshake().map_err(SshCommandError::Handshake)?; 568 569 sess.userauth_password(&auth.username, &auth.password) 570 .map_err(SshCommandError::Authentication)?; 571 assert!(sess.authenticated()); 572 573 let content = fs::read(path).map_err(SshCommandError::FileRead)?; 574 let mode = fs::metadata(path) 575 .map_err(SshCommandError::FileMetadata)? 576 .permissions() 577 .mode() 578 & 0o777; 579 580 let mut channel = sess 581 .scp_send(remote_path, mode as i32, content.len() as u64, None) 582 .map_err(SshCommandError::ScpSend)?; 583 channel 584 .write_all(&content) 585 .map_err(SshCommandError::WriteAll)?; 586 channel.send_eof().map_err(SshCommandError::SendEof)?; 587 channel.wait_eof().map_err(SshCommandError::WaitEof)?; 588 589 // Intentionally ignore these results here as their failure 590 // does not precipitate a repeat 591 let _ = channel.close(); 592 let _ = channel.wait_close(); 593 594 Ok(()) 595 }; 596 597 match closure() { 598 Ok(_) => break, 599 Err(e) => { 600 counter += 1; 601 if counter >= retries { 602 eprintln!( 603 "\n\n==== Start scp command output (FAILED) ====\n\n\ 604 path =\"{path:?}\"\n\ 605 remote_path =\"{remote_path:?}\"\n\ 606 auth=\"{auth:#?}\"\n\ 607 ip=\"{ip}\"\n\ 608 error=\"{e:?}\"\n\ 609 \n==== End scp command outout ====\n\n" 610 ); 611 612 return Err(e); 613 } 614 } 615 }; 616 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 617 } 618 Ok(()) 619 } 620 621 pub fn scp_to_guest( 622 path: &Path, 623 remote_path: &Path, 624 ip: &str, 625 retries: u8, 626 timeout: u8, 627 ) -> Result<(), SshCommandError> { 628 scp_to_guest_with_auth( 629 path, 630 remote_path, 631 &PasswordAuth { 632 username: String::from("cloud"), 633 password: String::from("cloud123"), 
634 }, 635 ip, 636 retries, 637 timeout, 638 ) 639 } 640 641 pub fn ssh_command_ip_with_auth( 642 command: &str, 643 auth: &PasswordAuth, 644 ip: &str, 645 retries: u8, 646 timeout: u8, 647 ) -> Result<String, SshCommandError> { 648 let mut s = String::new(); 649 650 let mut counter = 0; 651 loop { 652 let mut closure = || -> Result<(), SshCommandError> { 653 let tcp = 654 TcpStream::connect(format!("{ip}:22")).map_err(SshCommandError::Connection)?; 655 let mut sess = Session::new().unwrap(); 656 sess.set_tcp_stream(tcp); 657 sess.handshake().map_err(SshCommandError::Handshake)?; 658 659 sess.userauth_password(&auth.username, &auth.password) 660 .map_err(SshCommandError::Authentication)?; 661 assert!(sess.authenticated()); 662 663 let mut channel = sess 664 .channel_session() 665 .map_err(SshCommandError::ChannelSession)?; 666 channel.exec(command).map_err(SshCommandError::Command)?; 667 668 // Intentionally ignore these results here as their failure 669 // does not precipitate a repeat 670 let _ = channel.read_to_string(&mut s); 671 let _ = channel.close(); 672 let _ = channel.wait_close(); 673 674 let status = channel.exit_status().map_err(SshCommandError::ExitStatus)?; 675 676 if status != 0 { 677 Err(SshCommandError::NonZeroExitStatus(status)) 678 } else { 679 Ok(()) 680 } 681 }; 682 683 match closure() { 684 Ok(_) => break, 685 Err(e) => { 686 counter += 1; 687 if counter >= retries { 688 eprintln!( 689 "\n\n==== Start ssh command output (FAILED) ====\n\n\ 690 command=\"{command}\"\n\ 691 auth=\"{auth:#?}\"\n\ 692 ip=\"{ip}\"\n\ 693 output=\"{s}\"\n\ 694 error=\"{e:?}\"\n\ 695 \n==== End ssh command outout ====\n\n" 696 ); 697 698 return Err(e); 699 } 700 } 701 }; 702 thread::sleep(std::time::Duration::new((timeout * counter).into(), 0)); 703 } 704 Ok(s) 705 } 706 707 pub fn ssh_command_ip( 708 command: &str, 709 ip: &str, 710 retries: u8, 711 timeout: u8, 712 ) -> Result<String, SshCommandError> { 713 ssh_command_ip_with_auth( 714 command, 715 &PasswordAuth { 
/// Runs `command` on the host until it succeeds, at most `retries` times.
///
/// Sleeps for `interval` after every failed attempt. Returns `true` as soon as
/// one attempt exits successfully and `false` if none did.
pub fn exec_host_command_with_retries(command: &str, retries: u32, interval: Duration) -> bool {
    for _ in 0..retries {
        let s = exec_host_command_output(command).status;
        if !s.success() {
            eprintln!("\n\n==== retrying in {:?} ===\n\n", interval);
            thread::sleep(interval);
        } else {
            return true;
        }
    }

    false
}

/// Runs `command` on the host through `bash -c` and returns its exit status.
pub fn exec_host_command_status(command: &str) -> ExitStatus {
    exec_host_command_output(command).status
}

/// Runs `command` on the host through `bash -c` and returns the captured output.
///
/// When the command exits unsuccessfully, its stdout and stderr are also
/// printed to stderr.
///
/// # Panics
///
/// Panics if `bash` cannot be started.
pub fn exec_host_command_output(command: &str) -> Output {
    let output = std::process::Command::new("bash")
        .args(["-c", command])
        .output()
        .unwrap_or_else(|e| panic!("Expected '{command}' to run. Error: {:?}", e));

    if !output.status.success() {
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);
        eprintln!(
            "\n\n==== Start 'exec_host_command' failed ==== \
            \n\n---stdout---\n{stdout}\n---stderr---\n{stderr} \
            \n\n==== End 'exec_host_command' failed ====",
        );
    }

    output
}

/// Returns whether `input` has exactly `line_count` lines, printing a
/// diagnostic to stderr when it does not.
pub fn check_lines_count(input: &str, line_count: usize) -> bool {
    if input.lines().count() == line_count {
        true
    } else {
        eprintln!(
            "\n\n==== Start 'check_lines_count' failed ==== \
            \n\ninput = {input}\nline_count = {line_count} \
            \n\n==== End 'check_lines_count' failed ====",
        );

        false
    }
}

/// Returns whether exactly `line_count` lines of `input` contain every string
/// in `keywords`, printing a diagnostic to stderr when they do not.
pub fn check_matched_lines_count(input: &str, keywords: Vec<&str>, line_count: usize) -> bool {
    // Keep the matches as separate lines. Gluing them into one string without
    // a separator would make any number of matches count as a single line.
    let matched: Vec<&str> = input
        .lines()
        .filter(|line| keywords.iter().all(|k| line.contains(k)))
        .collect();

    if matched.len() == line_count {
        true
    } else {
        let matches = matched.join("\n");
        eprintln!(
            "\n\n==== Start 'check_matched_lines_count' failed ==== \
            \nkeywords = {keywords:?}, line_count = {line_count} \
            \n\ninput = {input} matches = {matches} \
            \n\n==== End 'check_matched_lines_count' failed ====",
        );

        false
    }
}
{keywords:?}, line_count = {line_count} \ 790 \n\ninput = {input} matches = {matches} \ 791 \n\n==== End 'check_matched_lines_count' failed ====", 792 ); 793 794 false 795 } 796 } 797 798 pub fn kill_child(child: &mut Child) { 799 let r = unsafe { libc::kill(child.id() as i32, libc::SIGTERM) }; 800 if r != 0 { 801 let e = io::Error::last_os_error(); 802 if e.raw_os_error().unwrap() == libc::ESRCH { 803 return; 804 } 805 eprintln!("Failed to kill child with SIGTERM: {e:?}"); 806 } 807 808 // The timeout period elapsed without the child exiting 809 if child.wait_timeout(Duration::new(10, 0)).unwrap().is_none() { 810 let _ = child.kill(); 811 let rust_flags = env::var("RUSTFLAGS").unwrap_or_default(); 812 if rust_flags.contains("-Cinstrument-coverage") { 813 panic!("Wait child timeout, please check the reason.") 814 } 815 } 816 } 817 818 pub const PIPE_SIZE: i32 = 32 << 20; 819 820 static NEXT_VM_ID: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(1)); 821 822 pub struct Guest { 823 pub tmp_dir: TempDir, 824 pub disk_config: Box<dyn DiskConfig>, 825 pub network: GuestNetworkConfig, 826 } 827 828 // Safe to implement as we know we have no interior mutability 829 impl std::panic::RefUnwindSafe for Guest {} 830 831 impl Guest { 832 pub fn new_from_ip_range(mut disk_config: Box<dyn DiskConfig>, class: &str, id: u8) -> Self { 833 let tmp_dir = TempDir::new_with_prefix("/tmp/ch").unwrap(); 834 835 let network = GuestNetworkConfig { 836 guest_ip: format!("{class}.{id}.2"), 837 l2_guest_ip1: format!("{class}.{id}.3"), 838 l2_guest_ip2: format!("{class}.{id}.4"), 839 l2_guest_ip3: format!("{class}.{id}.5"), 840 host_ip: format!("{class}.{id}.1"), 841 guest_mac: format!("12:34:56:78:90:{id:02x}"), 842 l2_guest_mac1: format!("de:ad:be:ef:12:{id:02x}"), 843 l2_guest_mac2: format!("de:ad:be:ef:34:{id:02x}"), 844 l2_guest_mac3: format!("de:ad:be:ef:56:{id:02x}"), 845 tcp_listener_port: DEFAULT_TCP_LISTENER_PORT + id as u16, 846 }; 847 848 disk_config.prepare_files(&tmp_dir, &network); 
849 850 Guest { 851 tmp_dir, 852 disk_config, 853 network, 854 } 855 } 856 857 pub fn new(disk_config: Box<dyn DiskConfig>) -> Self { 858 let mut guard = NEXT_VM_ID.lock().unwrap(); 859 let id = *guard; 860 *guard = id + 1; 861 862 Self::new_from_ip_range(disk_config, "192.168", id) 863 } 864 865 pub fn default_net_string(&self) -> String { 866 format!( 867 "tap=,mac={},ip={},mask=255.255.255.0", 868 self.network.guest_mac, self.network.host_ip 869 ) 870 } 871 872 pub fn default_net_string_w_iommu(&self) -> String { 873 format!( 874 "tap=,mac={},ip={},mask=255.255.255.0,iommu=on", 875 self.network.guest_mac, self.network.host_ip 876 ) 877 } 878 879 pub fn default_net_string_w_mtu(&self, mtu: u16) -> String { 880 format!( 881 "tap=,mac={},ip={},mask=255.255.255.0,mtu={}", 882 self.network.guest_mac, self.network.host_ip, mtu 883 ) 884 } 885 886 pub fn ssh_command(&self, command: &str) -> Result<String, SshCommandError> { 887 ssh_command_ip( 888 command, 889 &self.network.guest_ip, 890 DEFAULT_SSH_RETRIES, 891 DEFAULT_SSH_TIMEOUT, 892 ) 893 } 894 895 #[cfg(target_arch = "x86_64")] 896 pub fn ssh_command_l1(&self, command: &str) -> Result<String, SshCommandError> { 897 ssh_command_ip( 898 command, 899 &self.network.guest_ip, 900 DEFAULT_SSH_RETRIES, 901 DEFAULT_SSH_TIMEOUT, 902 ) 903 } 904 905 #[cfg(target_arch = "x86_64")] 906 pub fn ssh_command_l2_1(&self, command: &str) -> Result<String, SshCommandError> { 907 ssh_command_ip( 908 command, 909 &self.network.l2_guest_ip1, 910 DEFAULT_SSH_RETRIES, 911 DEFAULT_SSH_TIMEOUT, 912 ) 913 } 914 915 #[cfg(target_arch = "x86_64")] 916 pub fn ssh_command_l2_2(&self, command: &str) -> Result<String, SshCommandError> { 917 ssh_command_ip( 918 command, 919 &self.network.l2_guest_ip2, 920 DEFAULT_SSH_RETRIES, 921 DEFAULT_SSH_TIMEOUT, 922 ) 923 } 924 925 #[cfg(target_arch = "x86_64")] 926 pub fn ssh_command_l2_3(&self, command: &str) -> Result<String, SshCommandError> { 927 ssh_command_ip( 928 command, 929 
&self.network.l2_guest_ip3, 930 DEFAULT_SSH_RETRIES, 931 DEFAULT_SSH_TIMEOUT, 932 ) 933 } 934 935 pub fn api_create_body(&self, cpu_count: u8, kernel_path: &str, kernel_cmd: &str) -> String { 936 format! {"{{\"cpus\":{{\"boot_vcpus\":{},\"max_vcpus\":{}}},\"payload\":{{\"kernel\":\"{}\",\"cmdline\": \"{}\"}},\"net\":[{{\"ip\":\"{}\", \"mask\":\"255.255.255.0\", \"mac\":\"{}\"}}], \"disks\":[{{\"path\":\"{}\"}}, {{\"path\":\"{}\"}}]}}", 937 cpu_count, 938 cpu_count, 939 kernel_path, 940 kernel_cmd, 941 self.network.host_ip, 942 self.network.guest_mac, 943 self.disk_config.disk(DiskType::OperatingSystem).unwrap().as_str(), 944 self.disk_config.disk(DiskType::CloudInit).unwrap().as_str(), 945 } 946 } 947 948 pub fn get_cpu_count(&self) -> Result<u32, Error> { 949 self.ssh_command("grep -c processor /proc/cpuinfo")? 950 .trim() 951 .parse() 952 .map_err(Error::Parsing) 953 } 954 955 pub fn get_total_memory(&self) -> Result<u32, Error> { 956 self.ssh_command("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 957 .trim() 958 .parse() 959 .map_err(Error::Parsing) 960 } 961 962 #[cfg(target_arch = "x86_64")] 963 pub fn get_total_memory_l2(&self) -> Result<u32, Error> { 964 self.ssh_command_l2_1("grep MemTotal /proc/meminfo | grep -o \"[0-9]*\"")? 965 .trim() 966 .parse() 967 .map_err(Error::Parsing) 968 } 969 970 pub fn get_numa_node_memory(&self, node_id: usize) -> Result<u32, Error> { 971 self.ssh_command( 972 format!( 973 "grep MemTotal /sys/devices/system/node/node{node_id}/meminfo \ 974 | cut -d \":\" -f 2 | grep -o \"[0-9]*\"" 975 ) 976 .as_str(), 977 )? 
978 .trim() 979 .parse() 980 .map_err(Error::Parsing) 981 } 982 983 pub fn wait_vm_boot(&self, custom_timeout: Option<i32>) -> Result<(), Error> { 984 self.network 985 .wait_vm_boot(custom_timeout) 986 .map_err(Error::WaitForBoot) 987 } 988 989 pub fn check_numa_node_cpus(&self, node_id: usize, cpus: Vec<usize>) -> Result<(), Error> { 990 for cpu in cpus.iter() { 991 let cmd = format!("[ -d \"/sys/devices/system/node/node{node_id}/cpu{cpu}\" ]"); 992 self.ssh_command(cmd.as_str())?; 993 } 994 995 Ok(()) 996 } 997 998 pub fn check_numa_node_distances( 999 &self, 1000 node_id: usize, 1001 distances: &str, 1002 ) -> Result<bool, Error> { 1003 let cmd = format!("cat /sys/devices/system/node/node{node_id}/distance"); 1004 if self.ssh_command(cmd.as_str())?.trim() == distances { 1005 Ok(true) 1006 } else { 1007 Ok(false) 1008 } 1009 } 1010 1011 pub fn check_numa_common( 1012 &self, 1013 mem_ref: Option<&[u32]>, 1014 node_ref: Option<&[Vec<usize>]>, 1015 distance_ref: Option<&[&str]>, 1016 ) { 1017 if let Some(mem_ref) = mem_ref { 1018 // Check each NUMA node has been assigned the right amount of 1019 // memory. 1020 for (i, &m) in mem_ref.iter().enumerate() { 1021 assert!(self.get_numa_node_memory(i).unwrap_or_default() > m); 1022 } 1023 } 1024 1025 if let Some(node_ref) = node_ref { 1026 // Check each NUMA node has been assigned the right CPUs set. 1027 for (i, n) in node_ref.iter().enumerate() { 1028 self.check_numa_node_cpus(i, n.clone()).unwrap(); 1029 } 1030 } 1031 1032 if let Some(distance_ref) = distance_ref { 1033 // Check each NUMA node has been assigned the right distances. 
// Every entry of `distance_ref` must pass `check_numa_node_distances` for its index.
            for (i, &d) in distance_ref.iter().enumerate() {
                assert!(self.check_numa_node_distances(i, d).unwrap());
            }
        }
    }

    /// Runs three `cpuid | grep -q` probes in the guest, looking for the SGX,
    /// SGX launch config and SGX1 "supported = true" lines.
    ///
    /// # Errors
    /// Propagates the first error returned by `ssh_command`.
    #[cfg(target_arch = "x86_64")]
    pub fn check_sgx_support(&self) -> Result<(), Error> {
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX: \
                        Software Guard Extensions supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x7 -s 0 | tr -s [:space:] | grep -q 'SGX_LC: \
                        SGX launch config supported = true'",
        )?;
        self.ssh_command(
            "cpuid -l 0x12 -s 0 | tr -s [:space:] | grep -q 'SGX1 \
                        supported = true'",
        )?;

        Ok(())
    }

    /// Returns the trimmed content of the `class` sysfs attribute of the
    /// guest PCI device `0000:00:00.0`.
    ///
    /// # Errors
    /// Propagates any error returned by `ssh_command`.
    pub fn get_pci_bridge_class(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/0000:00:00.0/class")?
            .trim()
            .to_string())
    }

    /// Returns the trimmed, newline-separated `device` sysfs attributes of all
    /// guest PCI devices.
    ///
    /// # Errors
    /// Propagates any error returned by `ssh_command`.
    pub fn get_pci_device_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/device")?
            .trim()
            .to_string())
    }

    /// Returns the trimmed, newline-separated `vendor` sysfs attributes of all
    /// guest PCI devices.
    ///
    /// # Errors
    /// Propagates any error returned by `ssh_command`.
    pub fn get_pci_vendor_ids(&self) -> Result<String, Error> {
        Ok(self
            .ssh_command("cat /sys/bus/pci/devices/*/vendor")?
            .trim()
            .to_string())
    }

    /// Tells whether the guest exposes a PCI device whose device id is
    /// `device_id` and whose vendor id is `vendor_id`.
    ///
    /// The two id listings are matched line by line: line `n` of the device
    /// listing is paired with line `n` of the vendor listing.
    ///
    /// # Errors
    /// Propagates any error from `get_pci_device_ids` / `get_pci_vendor_ids`.
    pub fn does_device_vendor_pair_match(
        &self,
        device_id: &str,
        vendor_id: &str,
    ) -> Result<bool, Error> {
        let devices = self.get_pci_device_ids()?;
        let vendors = self.get_pci_vendor_ids()?;

        // `zip` stops at the shorter listing, so a device line without a
        // vendor counterpart can never match.
        Ok(devices
            .split('\n')
            .zip(vendors.split('\n'))
            .any(|(d_id, v_id)| d_id == device_id && v_id == vendor_id))
    }

    /// Sends "HelloWorld!" from the host through the vsock UNIX socket
    /// `socket` and asserts that a `socat` listener in the guest logged it.
    ///
    /// # Panics
    /// Panics when any of the guest/host commands fails or when the logged
    /// text differs from what was sent.
    pub fn check_vsock(&self, socket: &str) {
        // Listen from guest on vsock CID=3 PORT=16
        // SOCKET-LISTEN:<domain>:<protocol>:<local-address>
        let guest_ip = self.network.guest_ip.clone();
        let listen_socat = thread::spawn(move || {
            ssh_command_ip(
                "sudo socat - SOCKET-LISTEN:40:0:x00x00x10x00x00x00x03x00x00x00x00x00x00x00 > vsock_log",
                &guest_ip,
                DEFAULT_SSH_RETRIES,
                DEFAULT_SSH_TIMEOUT,
            )
            .unwrap();
        });

        // Make sure socat is listening, which might take a few seconds on slow systems
        thread::sleep(Duration::from_secs(10));

        // Write something to vsock from the host
        assert!(exec_host_command_status(&format!(
            "echo -e \"CONNECT 16\\nHelloWorld!\" | socat - UNIX-CONNECT:{socket}"
        ))
        .success());

        // Wait for the thread to terminate.
        listen_socat.join().unwrap();

        assert_eq!(
            self.ssh_command("cat vsock_log").unwrap().trim(),
            "HelloWorld!"
        );
    }

    /// Asserts that the output of `nvidia-smi` in the guest mentions
    /// "NVIDIA L40S".
    ///
    /// # Panics
    /// Panics when the command fails or the string is absent.
    #[cfg(target_arch = "x86_64")]
    pub fn check_nvidia_gpu(&self) {
        assert!(self
            .ssh_command("nvidia-smi")
            .unwrap()
            .contains("NVIDIA L40S"));
    }

    /// Reboots the guest and asserts, before and after, that the number of
    /// "reboot" entries listed by `last` is `current_reboot_count + 1` and
    /// `current_reboot_count + 2` respectively.
    ///
    /// # Panics
    /// Panics when a guest command fails, when the guest does not come back
    /// (`wait_vm_boot` fails) or when a boot count is not the expected one.
    pub fn reboot_linux(&self, current_reboot_count: u32, custom_timeout: Option<i32>) {
        // An unparsable count is treated as 0, which makes the assertions fail.
        let boot_count = || {
            self.ssh_command("sudo last | grep -c reboot")
                .unwrap()
                .trim()
                .parse::<u32>()
                .unwrap_or_default()
        };

        assert_eq!(boot_count(), current_reboot_count + 1);
        self.ssh_command("sudo reboot").unwrap();

        self.wait_vm_boot(custom_timeout).unwrap();
        assert_eq!(boot_count(), current_reboot_count + 2);
    }

    /// Writes `online` to `/sys/devices/system/memory/auto_online_blocks` in
    /// the guest.
    ///
    /// # Panics
    /// Panics when the guest command fails.
    pub fn enable_memory_hotplug(&self) {
        self.ssh_command("echo online | sudo tee /sys/devices/system/memory/auto_online_blocks")
            .unwrap();
    }

    /// Exercises the devices commonly attached to a test guest.
    ///
    /// Always reads from `/dev/vda`, `/dev/vdb` and `/dev/hwrng`. Then,
    /// depending on the optional arguments:
    /// * `socket`: runs [`Self::check_vsock`] on that socket;
    /// * `console_text`: echoes the text to `/dev/hvc0`;
    /// * `pmem_path`: mounts the device on `/mnt`, writes a file, remounts and
    ///   checks that the file content survived, then removes the file.
    ///
    /// # Panics
    /// Panics when a guest command fails or an output is not the expected one.
    pub fn check_devices_common(
        &self,
        socket: Option<&String>,
        console_text: Option<&String>,
        pmem_path: Option<&String>,
    ) {
        // Check block devices are readable
        self.ssh_command("sudo dd if=/dev/vda of=/dev/null bs=1M iflag=direct count=1024")
            .unwrap();
        self.ssh_command("sudo dd if=/dev/vdb of=/dev/null bs=1M iflag=direct count=8")
            .unwrap();
        // Check if the rng device is readable
        self.ssh_command("sudo head -c 1000 /dev/hwrng > /dev/null")
            .unwrap();
        // Check vsock
        if let Some(socket) = socket {
            self.check_vsock(socket.as_str());
        }
        // Check if the console is usable
        if let Some(console_text) = console_text {
            let console_cmd = format!("echo {console_text} | sudo tee /dev/hvc0");
            self.ssh_command(&console_cmd).unwrap();
        }
        // The net device is 'automatically' exercised through the above 'ssh' commands

        // Check if the pmem device is usable
        if let Some(pmem_path) = pmem_path {
            assert_eq!(
                self.ssh_command(&format!("ls {pmem_path}")).unwrap().trim(),
                pmem_path
            );
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "lost+found\n");
            self.ssh_command("echo test123 | sudo tee /mnt/test")
                .unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
            assert_eq!(self.ssh_command("ls /mnt").unwrap(), "");

            // Mount again: the file written before the unmount must still be there.
            assert_eq!(
                self.ssh_command(&format!("sudo mount {pmem_path} /mnt"))
                    .unwrap(),
                ""
            );
            assert_eq!(
                self.ssh_command("sudo cat /mnt/test || true")
                    .unwrap()
                    .trim(),
                "test123"
            );
            self.ssh_command("sudo rm /mnt/test").unwrap();
            assert_eq!(self.ssh_command("sudo umount /mnt").unwrap(), "");
        }
    }
}

/// Verbosity flag handed to the spawned binary: `Warn` adds nothing, `Info`
/// adds `-v` and `Debug` adds `-vv`.
#[derive(Default)]
pub enum VerbosityLevel {
    #[default]
    Warn,
    Info,
    Debug,
}

impl Display for VerbosityLevel {
    /// Writes the command-line flag of the level (nothing for `Warn`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Warn => Ok(()),
            Self::Info => write!(f, "-v"),
            Self::Debug => write!(f, "-vv"),
        }
    }
}

/// Builder around [`Command`] for launching a binary on behalf of a [`Guest`].
pub struct GuestCommand<'a> {
    command: Command,
    guest: &'a Guest,
    capture_output: bool,
    print_cmd: bool,
    verbosity: VerbosityLevel,
}

impl<'a> GuestCommand<'a> {
    /// Creates a command for the `cloud-hypervisor` binary located through
    /// [`clh_command`].
    pub fn new(guest: &'a Guest) -> Self {
        Self::new_with_binary_path(guest, &clh_command("cloud-hypervisor"))
    }

    /// Creates a command for `binary_path`.
    ///
    /// Output capture is off, command printing is on and the verbosity is
    /// `Info` (note: not `VerbosityLevel::default()`, which is `Warn`).
    pub fn new_with_binary_path(guest: &'a Guest, binary_path: &str) -> Self {
        Self {
            command: Command::new(binary_path),
            guest,
            capture_output: false,
            print_cmd: true,
            verbosity: VerbosityLevel::Info,
        }
    }

    /// Sets the verbosity flag appended by [`Self::spawn`].
    pub fn verbosity(&mut self, verbosity: VerbosityLevel) -> &mut Self {
        self.verbosity = verbosity;
        self
    }

    /// Makes [`Self::spawn`] pipe the child's stdout and stderr.
    pub fn capture_output(&mut self) -> &mut Self {
        self.capture_output = true;
        self
    }

    /// Chooses whether [`Self::spawn`] prints the command line to stdout.
    pub fn set_print_cmd(&mut self, print_cmd: bool) -> &mut Self {
        self.print_cmd = print_cmd;
        self
    }

    /// Spawns the command.
    ///
    /// The verbosity flag is appended first and, when enabled, the command
    /// line is printed. With output capture enabled, stdout and stderr are
    /// piped and both pipes are resized to `PIPE_SIZE`.
    ///
    /// The caller is expected to `wait()` on the returned child.
    ///
    /// # Errors
    /// Returns the spawn error, or the pipe resizing error. In the latter
    /// case the child that was just started is killed and reaped before
    /// returning, since the caller gets no handle to it.
    pub fn spawn(&mut self) -> io::Result<Child> {
        use VerbosityLevel::*;
        match &self.verbosity {
            Warn => {}
            Info => {
                self.command.arg("-v");
            }
            Debug => {
                self.command.arg("-vv");
            }
        };

        if self.print_cmd {
            println!(
                "\n\n==== Start cloud-hypervisor command-line ====\n\n\
                 {:?}\n\
                 \n==== End cloud-hypervisor command-line ====\n\n",
                self.command
            );
        }

        if self.capture_output {
            // The caller should call .wait() on the returned child
            #[allow(unknown_lints)]
            #[allow(clippy::zombie_processes)]
            let mut child = self
                .command
                .stderr(Stdio::piped())
                .stdout(Stdio::piped())
                .spawn()?;

            match Self::resize_output_pipes(&child) {
                Ok(()) => Ok(child),
                Err(e) => {
                    // Nobody else can stop this process once we return the error.
                    let _ = child.kill();
                    let _ = child.wait();
                    Err(e)
                }
            }
        } else {
            // The caller should call .wait() on the returned child
            #[allow(unknown_lints)]
            #[allow(clippy::zombie_processes)]
            self.command.spawn()
        }
    }

    /// Resizes the stdout and stderr pipes of `child` to `PIPE_SIZE` and
    /// checks that both ended up at least that large.
    fn resize_output_pipes(child: &Child) -> io::Result<()> {
        let resize = |fd: libc::c_int| -> io::Result<libc::c_int> {
            // SAFETY: `fd` is an open pipe descriptor owned by `child`, and
            // F_SETPIPE_SZ only takes an integer argument.
            let size = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, PIPE_SIZE) };
            if size == -1 {
                Err(io::Error::last_os_error())
            } else {
                Ok(size)
            }
        };

        let stdout_size = resize(
            child
                .stdout
                .as_ref()
                .expect("stdout was configured as piped")
                .as_raw_fd(),
        )?;
        let stderr_size = resize(
            child
                .stderr
                .as_ref()
                .expect("stderr was configured as piped")
                .as_raw_fd(),
        )?;

        if stdout_size >= PIPE_SIZE && stderr_size >= PIPE_SIZE {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "resizing pipe w/ 'fcntl' failed: stdout pipesize {stdout_size}, stderr pipesize {stderr_size}"
                ),
            ))
        }
    }

    /// Appends `args` to the command line.
    pub fn args<I, S>(&mut self, args: I) -> &mut Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.command.args(args);
        self
    }

    /// Appends `--disk path=<os disk>` followed, when the guest has one, by
    /// `path=<cloud-init disk>`.
    ///
    /// # Panics
    /// Panics when the guest has no operating system disk.
    pub fn default_disks(&mut self) -> &mut Self {
        let os_disk = self
            .guest
            .disk_config
            .disk(DiskType::OperatingSystem)
            .unwrap();

        let mut disk_args = vec!["--disk".to_string(), format!("path={os_disk}")];
        if let Some(cloud_init) = self.guest.disk_config.disk(DiskType::CloudInit) {
            disk_args.push(format!("path={cloud_init}"));
        }

        self.args(disk_args)
    }

    /// Appends `--net` with the guest's default network string.
    pub fn default_net(&mut self) -> &mut Self {
        self.args(["--net", self.guest.default_net_string().as_str()])
    }
}

/// Returns the path of the release binary `cmd` under `target/`.
///
/// The target triple comes from the `BUILD_TARGET` environment variable and
/// falls back to `x86_64-unknown-linux-gnu` when the variable cannot be read.
pub fn clh_command(cmd: &str) -> String {
    // `map_or_else` keeps the fallback string from being built when unused.
    env::var("BUILD_TARGET").map_or_else(
        |_| format!("target/x86_64-unknown-linux-gnu/release/{cmd}"),
        |target| format!("target/{target}/release/{cmd}"),
    )
}

/// Extracts one figure from the JSON report of an `iperf3` client.
///
/// * `bandwidth == true`: `end.sum_sent.bits_per_second` when `sender` is
///   set, `end.sum_received.bits_per_second` otherwise.
/// * `bandwidth == false`: `(packets - lost_packets) / seconds` computed from
///   `end.sum`; `sender` is ignored.
///
/// # Errors
/// Returns `Error::Iperf3Parse` (after dumping `output` to stderr) when the
/// report is not valid JSON or lacks one of the entries above.
pub fn parse_iperf3_output(output: &[u8], sender: bool, bandwidth: bool) -> Result<f64, Error> {
    // Parsing failures are raised as panics carrying a precise message and
    // converted into an error below.
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");

        if bandwidth {
            if sender {
                v["end"]["sum_sent"]["bits_per_second"]
                    .as_f64()
                    .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
            } else {
                v["end"]["sum_received"]["bits_per_second"].as_f64().expect(
                    "'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'",
                )
            }
        } else {
            // iperf does not distinguish sent vs received in this case.

            let lost_packets = v["end"]["sum"]["lost_packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.lost_packets'");
            let packets = v["end"]["sum"]["packets"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.packets'");
            let seconds = v["end"]["sum"]["seconds"]
                .as_f64()
                .expect("'iperf3' parse error: missing entry 'end.sum.seconds'");

            (packets - lost_packets) / seconds
        }
    })
    .map_err(|_| {
        eprintln!(
            "==== Start iperf3 output ===\n\n{}\n\n=== End iperf3 output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::Iperf3Parse
    })
}

/// I/O patterns understood by the fio helpers; `Display` yields the matching
/// fio keyword.
#[derive(Clone)]
pub enum FioOps {
    Read,
    RandomRead,
    Write,
    RandomWrite,
    ReadWrite,
    RandRW,
}

impl fmt::Display for FioOps {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let keyword = match self {
            FioOps::Read => "read",
            FioOps::RandomRead => "randread",
            FioOps::Write => "write",
            FioOps::RandomWrite => "randwrite",
            FioOps::ReadWrite => "rw",
            FioOps::RandRW => "randrw",
        };
        write!(f, "{keyword}")
    }
}

/// Sums `<counter> / runtime` over all jobs of a fio JSON report.
///
/// For each job the `read` and/or `write` section is used, depending on
/// `fio_ops`; `runtime` is divided by 1000 before use.
fn parse_fio_rate(
    output: &str,
    fio_ops: &FioOps,
    num_jobs: u32,
    counter: &str,
) -> Result<f64, Error> {
    // Parsing failures are raised as panics carrying a precise message and
    // converted into an error below.
    std::panic::catch_unwind(|| {
        let v: Value =
            serde_json::from_str(output).expect("'fio' parse error: invalid json output");
        let jobs = v["jobs"]
            .as_array()
            .expect("'fio' parse error: missing entry 'jobs'");
        assert_eq!(
            jobs.len(),
            num_jobs as usize,
            "'fio' parse error: Unexpected number of 'fio' jobs."
        );

        let sections: &[&str] = match fio_ops {
            FioOps::Read | FioOps::RandomRead => &["read"],
            FioOps::Write | FioOps::RandomWrite => &["write"],
            FioOps::ReadWrite | FioOps::RandRW => &["read", "write"],
        };

        let mut total = 0_f64;
        for job in jobs {
            for &section in sections {
                let amount = job[section][counter].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{section}.{counter}'")
                });
                let runtime = job[section]["runtime"].as_u64().unwrap_or_else(|| {
                    panic!("'fio' parse error: missing entry '{section}.runtime'")
                }) as f64
                    / 1000_f64;
                total += amount as f64 / runtime;
            }
        }

        total
    })
    .map_err(|_| {
        eprintln!("=== Start Fio output ===\n\n{output}\n\n=== End Fio output ===\n\n");
        Error::FioOutputParse
    })
}

/// Returns the summed `io_bytes / runtime` of all jobs of a fio JSON report,
/// using the `read` and/or `write` sections selected by `fio_ops`.
///
/// # Errors
/// Returns `Error::FioOutputParse` (after dumping `output` to stderr) when the
/// report is not valid JSON, misses an entry or does not contain exactly
/// `num_jobs` jobs.
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    parse_fio_rate(output, fio_ops, num_jobs, "io_bytes")
}

/// Returns the summed `total_ios / runtime` of all jobs of a fio JSON report,
/// using the `read` and/or `write` sections selected by `fio_ops`.
///
/// # Errors
/// Returns `Error::FioOutputParse` (after dumping `output` to stderr) when the
/// report is not valid JSON, misses an entry or does not contain exactly
/// `num_jobs` jobs.
pub fn parse_fio_output_iops(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
    parse_fio_rate(output, fio_ops, num_jobs, "total_ios")
}

/// Waits up to `timeout` seconds for `child` to exit successfully.
///
/// # Errors
/// * `WaitTimeoutError::General` when waiting itself fails;
/// * `WaitTimeoutError::Timedout` when the child is still running;
/// * `WaitTimeoutError::ExitStatus` when it exited unsuccessfully.
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
    match child.wait_timeout(Duration::from_secs(timeout)) {
        Err(e) => Err(WaitTimeoutError::General(e)),
        Ok(None) => Err(WaitTimeoutError::Timedout),
        Ok(Some(status)) if !status.success() => Err(WaitTimeoutError::ExitStatus),
        Ok(Some(_)) => Ok(()),
    }
}

/// Measures virtio-net throughput with one `iperf3` server/client pair per
/// queue pair (ports 5201, 5202, ...) and returns the sum of the per-client
/// figures produced by [`parse_iperf3_output`].
///
/// * `test_timeout`: duration handed to `iperf3 -t`; each client is given 5
///   more seconds to exit.
/// * `receive`: when `false` the clients run in reverse mode (`-R`).
/// * `bandwidth`: when `false` the clients use UDP (`-u -b 1T`).
///
/// # Errors
/// Returns the ssh error of a failed server start, `Error::Spawn` when a
/// client cannot be started, `Error::WaitTimeout` when a client does not exit
/// successfully in time, or the parse error of a client report. All started
/// clients are stopped and reaped before a client-side error is returned.
pub fn measure_virtio_net_throughput(
    test_timeout: u32,
    queue_pairs: u32,
    guest: &Guest,
    receive: bool,
    bandwidth: bool,
) -> Result<f64, Error> {
    let default_port = 5201;

    // 1. start the iperf3 server on the guest
    for n in 0..queue_pairs {
        guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
    }

    thread::sleep(Duration::new(1, 0));

    // 2. start the iperf3 client on host to measure RX through-put
    let mut clients: Vec<Child> = Vec::new();
    for n in 0..queue_pairs {
        let mut cmd = Command::new("iperf3");
        cmd.args([
            "-J", // Output in JSON format
            "-c",
            &guest.network.guest_ip,
            "-p",
            &format!("{}", default_port + n),
            "-t",
            &format!("{test_timeout}"),
            "-i",
            "0",
        ]);
        // For measuring the guest transmit throughput (as a sender),
        // use reverse mode of the iperf3 client on the host
        if !receive {
            cmd.arg("-R");
        }
        // Use UDP stream to measure packets per second. The bitrate is set to
        // 1T to make sure it saturates the link.
        if !bandwidth {
            cmd.args(["-u", "-b", "1T"]);
        }

        match cmd.stderr(Stdio::piped()).stdout(Stdio::piped()).spawn() {
            Ok(client) => clients.push(client),
            Err(e) => {
                // Do not leave the clients that did start running unattended.
                for mut started in clients {
                    let _ = started.kill();
                    let _ = started.wait();
                }
                return Err(Error::Spawn(e));
            }
        }
    }

    let mut err: Option<Error> = None;
    let mut results = Vec::new();
    let mut failed = false;
    for mut c in clients {
        if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
            err = Some(Error::WaitTimeout(e));
            failed = true;
        }

        if !failed {
            // Safe to unwrap as we know the child has terminated successfully
            let output = c.wait_with_output().unwrap();
            match parse_iperf3_output(&output.stdout, receive, bandwidth) {
                Ok(value) => results.push(value),
                Err(e) => {
                    // Keep iterating so that the remaining clients still get
                    // stopped and reaped instead of being abandoned.
                    err = Some(e);
                    failed = true;
                }
            }
        } else {
            let _ = c.kill();
            let output = c.wait_with_output().unwrap();
            println!(
                "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
                String::from_utf8_lossy(&output.stdout)
            );
        }
    }

    match err {
        Some(e) => Err(e),
        None => Ok(results.iter().sum()),
    }
}

/// Collects the `Avg` latencies from an `ethr` log made of one JSON document
/// per line.
///
/// Lines without a string `Avg` entry are skipped; for the others the text
/// preceding the first "us" is parsed as a number.
///
/// # Errors
/// Returns `Error::EthrLogParse` (after dumping `output` to stderr) when a
/// line is not valid JSON, an `Avg` value cannot be parsed or no latency was
/// found at all.
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
    // Parsing failures are raised as panics carrying a precise message and
    // converted into an error below.
    std::panic::catch_unwind(|| {
        let s = String::from_utf8_lossy(output);
        let mut latency = Vec::new();
        for l in s.lines() {
            let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
            // Skip header/summary lines
            if let Some(avg) = v["Avg"].as_str() {
                // Assume the latency unit is always "us"
                let value = avg.split("us").next().unwrap_or_default();
                latency.push(
                    value
                        .parse::<f64>()
                        .expect("'ethr' parse error: invalid 'Avg' entry"),
                );
            }
        }

        assert!(
            !latency.is_empty(),
            "'ethr' parse error: no valid latency data found"
        );

        latency
    })
    .map_err(|_| {
        eprintln!(
            "=== Start ethr output ===\n\n{}\n\n=== End ethr output ===\n\n",
            String::from_utf8_lossy(output)
        );
        Error::EthrLogParse
    })
}

/// Measures virtio-net latency with `ethr`: the host binary
/// `/usr/local/bin/ethr` is copied to `/tmp/ethr` in the guest and started
/// there as a server, then a host client runs a latency test for
/// `test_timeout` seconds and its log file is parsed with
/// [`parse_ethr_latency_output`].
///
/// # Errors
/// Returns the scp/ssh error, `Error::Spawn` when the client cannot be
/// started, `Error::WaitTimeout` when it does not exit successfully within
/// `test_timeout + 5` seconds, `Error::EthrLogFile` when the log cannot be
/// read, or the parse error.
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
    // copy the 'ethr' tool to the guest image
    let ethr_path = "/usr/local/bin/ethr";
    let ethr_remote_path = "/tmp/ethr";
    scp_to_guest(
        Path::new(ethr_path),
        Path::new(ethr_remote_path),
        &guest.network.guest_ip,
        1, // deliberately not DEFAULT_SSH_RETRIES
        DEFAULT_SSH_TIMEOUT,
    )?;

    // Start the ethr server on the guest
    guest.ssh_command(&format!("{ethr_remote_path} -s &> /dev/null &"))?;

    thread::sleep(Duration::new(10, 0));

    // Start the ethr client on the host
    let log_file = guest
        .tmp_dir
        .as_path()
        .join("ethr.client.log")
        .to_str()
        .unwrap()
        .to_string();
    let mut c = Command::new(ethr_path)
        .args([
            "-c",
            &guest.network.guest_ip,
            "-t",
            "l",
            "-o",
            &log_file, // file output is JSON format
            "-d",
            &format!("{test_timeout}s"),
        ])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(Error::Spawn)?;

    if let Err(e) = child_wait_timeout(&mut c, u64::from(test_timeout) + 5) {
        let _ = c.kill();
        // Reap the client so that it does not linger as a zombie.
        let _ = c.wait();
        return Err(Error::WaitTimeout(e));
    }

    // Parse the ethr latency test output
    let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
    parse_ethr_latency_output(&content)
}