// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

//! `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
//! by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
//! between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
//! device model.
//!
//! The vsock muxer has two main roles:
//!
//! ## Vsock connection multiplexer
//!
//! It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
//! muxer also routes packets to their owning connections. It does so via a connection
//! `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
//!
//! Vsock packet traffic needs to be inspected, in order to detect connection request
//! packets (leading to the creation of a new connection), and connection reset packets
//! (leading to the termination of an existing connection). All other packets, though, must
//! belong to an existing connection and, as such, the muxer simply forwards them.
//!
//! ## Event dispatcher
//!
//! There are three event categories that the vsock backend is interested in:
//! 1. A new host-initiated connection is ready to be accepted from the listening host Unix
//!    socket;
//! 2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
//!    the host is ready to issue a vsock connection request, informing us of the
//!    destination port to which it wants to connect);
//! 3. Some event was triggered for a connected Unix socket that belongs to a
//!    `VsockConnection`.
//!
//! The muxer gets notified about all of these events, because, as a `VsockEpollListener`
//! implementor, it gets to register a nested epoll FD into the main VMM epolling loop. All
//! other pollable FDs are then registered under this nested epoll FD.
//!
//! To route all these events to their handlers, the muxer uses another `HashMap` object,
//! mapping `RawFd`s to `EpollListener`s.
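//!
//! As a quick illustration of the host-initiated handshake described above (a sketch; the
//! `socat` invocation, socket path, and port numbers are hypothetical), a host application
//! would do something like:
//!
//! ```text
//! $ socat - UNIX-CONNECT:/tmp/vm1_vsock.sock
//! connect 1025
//! OK 1073741824
//! <connection data flows from here on>
//! ```
//!
//! where `connect 1025` requests a connection to guest port 1025, and `OK <port>` is the
//! muxer's ack, reporting the host-side port it assigned to the new connection.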

use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object; `local_port` is the host-side vsock port and `peer_port`
/// is the guest-side one.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect \<port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "\<this path>_\<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}

impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the host
        // part of the guest - host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports). The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, so we know where to send this packet: to `conn_key`. However, if this is an
        // RST, we have to forcefully terminate the connection, so there's no point in
        // forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}
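
// A rough sketch of how the device model is expected to drive the `VsockChannel`
// implementation above (assumed usage, inferred from the trait contract; `pkt` wraps
// guest-provided virtio buffers):
//
//     // TX path: guest-generated packets are always absorbed.
//     muxer.send_pkt(&pkt).unwrap();
//     // RX path: drain pending RX into guest buffers.
//     while muxer.has_pending_rx() {
//         if muxer.recv_pkt(&mut pkt).is_err() {
//             break; // no data could be produced; try again on the next kick
//         }
//         // hand `pkt` over to the guest driver
//     }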

impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled
                            // in by `epoll::wait()`, and therefore contains only valid epoll
                            // flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // The epoll_wait() syscall documentation states that the wait can be
                        // interrupted before any of the requested events occur or the
                        // timeout expires. In both cases, epoll_wait() returns EINTR, which
                        // should not be treated as a regular error; instead, it is more
                        // appropriate to retry, by calling epoll_wait() again.
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
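    /// A minimal usage sketch (illustrative; the guest CID and socket path below are
    /// hypothetical):
    ///
    /// ```ignore
    /// let muxer = VsockMuxer::new(3, String::from("/tmp/vm1_vsock.sock")).unwrap();
    /// ```
    ///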
    pub fn new(cid: u64, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use 'File' to enforce closing on 'epoll_fd'
        // SAFETY: epoll_fd is a valid fd
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid,
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset: _ }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
                    Self::read_local_stream_port(&mut stream)
                        .map(|peer_port| (self.allocate_local_port(), peer_port))
                        .and_then(|(local_port, peer_port)| {
                            self.add_connection(
                                ConnMapKey {
                                    local_port,
                                    peer_port,
                                },
                                MuxerConnection::new_local_init(
                                    stream,
                                    uapi::VSOCK_HOST_CID,
                                    self.cid,
                                    local_port,
                                    peer_port,
                                ),
                            )
                        })
                        .unwrap_or_else(|err| {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        })
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
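    /// The accepted input is a single text line of the form `connect <port>` (the keyword is
    /// matched case-insensitively), terminated by a newline. For instance (illustrative port
    /// number):
    ///
    /// ```text
    /// connect 1025\n
    /// ```
    ///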
    fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
        let mut buf = [0u8; 32];

        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_READ_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut buf[..MIN_READ_LEN])
            .map_err(Error::UnixRead)?;

        // Now, finish reading the destination port number, by bringing in one byte at a time,
        // until we reach an EOL terminator (or our buffer space runs out). Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        let mut blen = MIN_READ_LEN;
        while buf[blen - 1] != b'\n' && blen < buf.len() {
            stream
                .read_exact(&mut buf[blen..=blen])
                .map_err(Error::UnixRead)?;
            blen += 1;
        }

        let mut word_iter = std::str::from_utf8(&buf[..blen])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }

    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first. It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication. Worst
                // case scenario, the RX queue will get desynchronized, but we'll handle that
                // the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
    /// it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the RX
        // queue. However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst case
            // scenario, the RX queue will get desynchronized, but we'll handle that the next
            // time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
        //

        loop {
            // Clearing bit 31 and setting bit 30 keeps the candidate port within the range
            // [1 << 30, 1 << 31), wrapping around at the top of that range.
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }

    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
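    /// For example (hypothetical socket path), with `host_sock_path` set to
    /// `/tmp/vm1_vsock.sock`, a guest connection request for port 1025 maps to the host-side
    /// listening socket:
    ///
    /// ```text
    /// /tmp/vm1_vsock.sock_1025
    /// ```
    ///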
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }

    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll
            // have to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection, something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed that
                // a `conn.expiry` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_evset()`), but now wants back
                // in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do
    /// to handle them.
    /// We do, however, log a warning, since not being able to enqueue an RST packet means
    /// we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::ops::Drop;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::{Path, PathBuf};

    use virtio_queue::QueueOwnedT;

    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;

    const PEER_CID: u64 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let uds_path = format!("test_vsock_{name}.sock");
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID)
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }

        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its Unix
            // socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a
            // new `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {peer_port}\n");
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the
            // locally initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed the
            // LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map. A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = [0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {local_port}\n").as_bytes());

            (stream, local_port)
        }
    }

    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an RST
        // packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet, should be replied to with an
        // RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be
        // delivered to the muxer's nested epoll FD. For testing only, we can fake that event
        // notification here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available
        // for one of its connections, so it should now be reporting that it can fill in an RX
        // packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }

    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Local var `_stream` was now dropped, thus closing the local stream.
        // After the muxer gets notified via EPOLLIN, it should attempt to gracefully shutdown
        // the connection, issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv
        // indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer replies
        // with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }

    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue,
        // and take its place.
        // We'll check that by making sure that the last packet popped from the queue is an
        // RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has
        // some pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is
        // larger than the kill queue, since an RST packet will be queued for each killed
        // connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the
        // dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the connection response to the
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for.
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - "OK ..." was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer
        // rxq when multiple flags are set.
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice in order to check that the connection is still
        // in the rxq.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should
        // not be any pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}