// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

/// `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
/// by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
/// between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
/// device model.
///
/// The vsock muxer has two main roles:
/// 1. Vsock connection multiplexer:
///    It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
///    muxer also routes packets to their owning connections. It does so via a connection
///    `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
///    Vsock packet traffic needs to be inspected, in order to detect connection request
///    packets (leading to the creation of a new connection), and connection reset packets
///    (leading to the termination of an existing connection). All other packets, though, must
///    belong to an existing connection and, as such, the muxer simply forwards them.
/// 2. Event dispatcher:
///    There are three event categories that the vsock backend is interested in:
///    1. A new host-initiated connection is ready to be accepted from the listening host Unix
///       socket;
///    2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
///       the host is ready to issue a vsock connection request, informing us of the
///       destination port to which it wants to connect);
///    3. Some event was triggered for a connected Unix socket that belongs to a
///       `VsockConnection`.
///    The muxer gets notified about all of these events, because, as a `VsockEpollListener`
///    implementor, it gets to register a nested epoll FD into the main VMM epolling loop. All
///    other pollable FDs are then registered under this nested epoll FD.
///    To route all these events to their handlers, the muxer uses another `HashMap` object,
///    mapping `RawFd`s to `EpollListener`s.
///
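// For illustration only (a sketch under assumed names, not part of the device model): a host
// process reaches a guest vsock port by connecting to the muxer's Unix socket and driving the
// "connect <port>" handshake parsed by `read_local_stream_port()` below. The socket path and
// guest port here are hypothetical.
//
//   use std::io::{Read, Write};
//   use std::os::unix::net::UnixStream;
//
//   let mut sock = UnixStream::connect("/tmp/vm.vsock")?; // hypothetical muxer socket path
//   sock.write_all(b"connect 52\n")?;                     // ask for guest port 52
//   let mut ack = [0u8; 32];
//   let len = sock.read(&mut ack)?; // expect b"OK <local_port>\n", once the guest accepts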
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect <port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "<this path>_<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}
impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the host
        // part of the guest - host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports). The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, we know where to send this packet, then (to `conn_key`).
        // However, if this is an RST, we have to forcefully terminate the connection, so
        // there's no point in forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}
impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled in by
                            // `epoll::wait()`, and therefore contains only valid epoll flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // The epoll_wait() syscall documentation states that the wait can be
                        // interrupted by a signal before any of the requested events occur or
                        // the timeout expires. In that case, epoll_wait() returns EINTR, which
                        // should not be treated as a regular error; the appropriate response
                        // is to retry, by calling into epoll_wait() again.
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}
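// Illustration only (a sketch with hypothetical names, not code from this crate): the VMM's
// main event loop would register the muxer's nested epoll FD upstream, then kick the muxer
// whenever that FD reports EPOLLIN. `main_epoll_fd` and `MUXER_TOKEN` are assumptions.
//
//   epoll::ctl(
//       main_epoll_fd,
//       epoll::ControlOptions::EPOLL_CTL_ADD,
//       muxer.get_polled_fd(),
//       epoll::Event::new(muxer.get_polled_evset(), MUXER_TOKEN),
//   )?;
//   // ... later, when an event tagged `MUXER_TOKEN` fires upstream:
//   muxer.notify(epoll::Events::EPOLLIN);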
impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
    pub fn new(cid: u64, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Wrap the FD in a `File`, to ensure `epoll_fd` gets closed on drop.
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid,
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset: _ }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
                    Self::read_local_stream_port(&mut stream)
                        .map(|peer_port| (self.allocate_local_port(), peer_port))
                        .and_then(|(local_port, peer_port)| {
                            self.add_connection(
                                ConnMapKey {
                                    local_port,
                                    peer_port,
                                },
                                MuxerConnection::new_local_init(
                                    stream,
                                    uapi::VSOCK_HOST_CID,
                                    self.cid,
                                    local_port,
                                    peer_port,
                                ),
                            )
                        })
                        .unwrap_or_else(|err| {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        })
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }
    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
    fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
        let mut buf = [0u8; 32];

        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_READ_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut buf[..MIN_READ_LEN])
            .map_err(Error::UnixRead)?;

        // Now, finish reading the destination port number, by bringing in one byte at a time,
        // until we reach an EOL terminator (or our buffer space runs out). Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        let mut blen = MIN_READ_LEN;
        while buf[blen - 1] != b'\n' && blen < buf.len() {
            stream
                .read_exact(&mut buf[blen..=blen])
                .map_err(Error::UnixRead)?;
            blen += 1;
        }

        let mut word_iter = std::str::from_utf8(&buf[..blen])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }
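    // Worked examples for the parser above: b"connect 1025\n" yields `Ok(1025)`, and so does
    // b"CONNECT 1025\n", since the command word is matched case-insensitively. A non-numeric
    // port, or a command word other than "connect", is rejected with the underlying error
    // wrapped in `Error::ReadStreamPort`.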
    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first. It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication. Worst
                // case scenario, the RX queue will get desynchronized, but we'll handle that
                // the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
    /// it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the RX
        // queue. However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst case
            // scenario, the RX queue will get desynchronized, but we'll handle that the next
            // time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
        //

        loop {
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }
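    // Worked example of the wrap-around above, assuming `local_port_last` has reached
    // `(1 << 31) - 1` (the top of the range): `(x + 1)` carries into bit 31, `& !(1 << 31)`
    // clears that bit, and `| (1 << 30)` forces bit 30 back on, yielding `1 << 30` again.
    // Allocated local ports therefore always stay within [2^30, 2^31).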
    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }
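    // Illustration only (a sketch with a hypothetical path, not code from this module): for a
    // guest connection request to vsock port 52, with the muxer bound to "/tmp/vm.vsock", a
    // host process must already be listening on the derived path "/tmp/vm.vsock_52":
    //
    //   use std::os::unix::net::UnixListener;
    //
    //   let listener = UnixListener::bind("/tmp/vm.vsock_52")?;
    //   let (stream, _) = listener.accept()?; // completes when the guest connects to port 52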
    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll
            // have to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection, something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed
                // that `conn.expiry()` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_evset()`), but now wants back
                // in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do to
    /// handle them. We do, however, log a warning, since not being able to enqueue an RST
    /// packet means we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::ops::Drop;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::{Path, PathBuf};

    use virtio_queue::QueueOwnedT;

    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;

    const PEER_CID: u64 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let uds_path = format!("test_vsock_{}.sock", name);
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID)
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len() as usize);
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }
        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its Unix
            // socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a
            // new `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {}\n", peer_port);
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the
            // locally initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed the
            // LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map. A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = vec![0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {}\n", local_port).as_bytes());

            (stream, local_port)
        }
    }

    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an RST
        // packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet, should be replied to with an
        // RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }
    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be
        // delivered to the muxer's nested epoll FD. For testing only, we can fake that event
        // notification here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available
        // for one of its connections, so it should now be reporting that it can fill in an RX
        // packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }
    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Local var `_stream` has now been dropped, closing the local stream. After the muxer
        // gets notified via EPOLLIN, it should attempt to gracefully shutdown the connection,
        // issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer
        // replies with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }
    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue,
        // and take its place. We'll check that by making sure that the last packet popped
        // from the queue is an RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has
        // some pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is
        // larger than the kill queue, since an RST packet will be queued for each killed
        // connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the
        // dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the connection response to the
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Addresses one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for in the
        // connection's `fwd_cnt`.
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - the "OK ..." handshake message was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Addresses one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer RX queue when it
        // has multiple pending-RX indications (flags) set.
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice, in order to check that the connection is still in the RX queue
        // after the first call.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should not be any
        // pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}