// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

//! `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
//! by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
//! between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
//! device model.
//!
//! The vsock muxer has two main roles:
//!
//! ## Vsock connection multiplexer
//!
//! It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
//! muxer also routes packets to their owning connections. It does so via a connection
//! `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
//!
//! Vsock packet traffic needs to be inspected, in order to detect connection request
//! packets (leading to the creation of a new connection), and connection reset packets
//! (leading to the termination of an existing connection). All other packets, though, must
//! belong to an existing connection and, as such, the muxer simply forwards them.
//!
//! ## Event dispatcher
//!
//! There are three event categories that the vsock backend is interested in:
//! 1. A new host-initiated connection is ready to be accepted from the listening host Unix
//!    socket;
//! 2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
//!    the host is ready to issue a vsock connection request, informing us of the
//!    destination port to which it wants to connect);
//! 3. Some event was triggered for a connected Unix socket, that belongs to a
//!    `VsockConnection`.
//!
//! The muxer gets notified about all of these events, because, as a `VsockEpollListener`
//! implementor, it gets to register a nested epoll FD into the main VMM epoll()ing loop. All
//! other pollable FDs are then registered under this nested epoll FD.
//!
//! To route all these events to their handlers, the muxer uses another `HashMap` object,
//! mapping `RawFd`s to `EpollListener`s.

use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, ErrorKind, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`.
    /// Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect \<port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// A partially read "CONNECT" command.
#[derive(Default)]
struct PartiallyReadCommand {
    /// The bytes of the command that have been read so far.
    buf: [u8; 32],
    /// How much of `buf` has been used.
    len: usize,
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// A hash map used to store partially read "connect" commands.
    partial_command_map: HashMap<RawFd, PartiallyReadCommand>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "\<this path>_\<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}

impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
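                // An RST carries no payload and no credit information, so, apart from the
                // op/type fields and the addressing, every header field below is zeroed out.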
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the host
        // part of the guest - host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports). The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, then: we know where to send this packet (to `conn_key`).
        // However, if this is an RST, we have to forcefully terminate the connection, so
        // there's no point in forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
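        // `apply_conn_mutation()` (defined below) wraps the delivery, so that any state
        // change caused by `conn.send_pkt()` is followed by the usual bookkeeping: re-syncing
        // the RX queue, the kill queue, and the connection's epoll interest set.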
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}

impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled
                            // in by `epoll::wait()`, and therefore contains only valid epoll
                            // flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // The epoll_wait() documentation states that the call can be
                        // interrupted before any of the requested events occur or the
                        // timeout expires. In that case it fails with EINTR, which should
                        // not be treated as a regular error; the appropriate response is
                        // to retry the wait.
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
    pub fn new(cid: u32, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use `File` to enforce closing of `epoll_fd`.
        // SAFETY: epoll_fd is a valid FD.
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
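        // For illustration only (hypothetical socket path; any Unix socket client, e.g.
        // socat, would do), a host process initiates a guest connection like this:
        //   $ socat - UNIX-CONNECT:/tmp/vm.vsock
        //   CONNECT 52
        //   OK 1073741824
        // after which the stream carries the raw connection data.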
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid: cid.into(),
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            partial_command_map: Default::default(),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset: _ }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
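            // As implemented by `read_local_stream_port()` below, the command is a single
            // "connect <port>\n" line (the keyword is matched case-insensitively). Once the
            // guest accepts the connection, the muxer acks with "OK <local_port>\n", sent
            // from `apply_conn_mutation()` when the connection becomes `Established`.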
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(stream)) = self.listener_map.get_mut(&fd) {
                    let port = Self::read_local_stream_port(&mut self.partial_command_map, stream);

                    if let Err(Error::UnixRead(ref e)) = port {
                        if e.kind() == ErrorKind::WouldBlock {
                            return;
                        }
                    }

                    let stream = match self.remove_listener(fd) {
                        Some(EpollListener::LocalStream(s)) => s,
                        _ => unreachable!(),
                    };

                    port.and_then(|peer_port| {
                        let local_port = self.allocate_local_port();

                        self.add_connection(
                            ConnMapKey {
                                local_port,
                                peer_port,
                            },
                            MuxerConnection::new_local_init(
                                stream,
                                uapi::VSOCK_HOST_CID,
                                self.cid,
                                local_port,
                                peer_port,
                            ),
                        )
                    })
                    .unwrap_or_else(|err| {
                        info!("vsock: error adding local-init connection: {:?}", err);
                    })
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
    fn read_local_stream_port(
        partial_command_map: &mut HashMap<RawFd, PartiallyReadCommand>,
        stream: &mut UnixStream,
    ) -> Result<u32> {
        let command = partial_command_map.entry(stream.as_raw_fd()).or_default();

        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_COMMAND_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut command.buf[command.len..MIN_COMMAND_LEN])
            .map_err(Error::UnixRead)?;
        command.len = MIN_COMMAND_LEN;

        // Now, finish reading the destination port number, by bringing in one byte at a time,
        // until we reach an EOL terminator (or our buffer space runs out). Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        while command.buf[command.len - 1] != b'\n' && command.len < command.buf.len() {
            command.len += stream
                .read(&mut command.buf[command.len..=command.len])
                .map_err(Error::UnixRead)?;
        }

        let command = partial_command_map.remove(&stream.as_raw_fd()).unwrap();

        let mut word_iter = std::str::from_utf8(&command.buf[..command.len])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }

    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first. It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication. Worst
                // case scenario, the RX queue will get desynchronized, but we'll handle that
                // the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
    /// it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the RX
        // queue. However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst case
            // scenario, the RX queue will get desynchronized, but we'll handle that the next
            // time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
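        //
        // The expression below keeps every allocated port within [1 << 30, 1 << 31):
        // bit 31 is always cleared and bit 30 always set. E.g., starting from the initial
        // `local_port_last` value of (1 << 30) - 1, the first allocation yields exactly
        // 1 << 30, and incrementing past 0x7fff_ffff wraps back around to 1 << 30.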

        loop {
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }

    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }

    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll
            // have to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection, something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed
                // that a `conn.expiry` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_evset()`), but now wants back
                // in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do
    /// to handle them. We do, however, log a warning, since not being able to enqueue an RST
    /// packet means we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;
    use std::io::Write;
    use std::path::{Path, PathBuf};
    use virtio_queue::QueueOwnedT;

    const PEER_CID: u32 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let uds_path = format!("test_vsock_{name}.sock");
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID.into())
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }

        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its
            // Unix socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a
            // new `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {peer_port}\n");
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the
            // locally initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed the
            // LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map. A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = [0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {local_port}\n").as_bytes());

            (stream, local_port)
        }
    }

    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an
        // RST packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet should be replied to with
        // an RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID as u64);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be
        // delivered to the muxer's nested epoll FD. For testing only, we can fake that event
        // notification here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available
        // for one of its connections, so it should now be reporting that it can fill in an
        // RX packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }

    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // The local var `_stream` has now been dropped, closing the local stream. After the
        // muxer gets notified via EPOLLIN, it should attempt to gracefully shut down the
        // connection, issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv
        // indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer
        // replies with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }

    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue,
        // and take its place. We'll check that by making sure that the last packet popped
        // from the queue is an RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has
        // some pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is
        // larger than the kill queue, since an RST packet will be queued for each killed
        // connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating
        // the dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the response to the connection
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for.
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - the "OK ..." handshake message was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer
        // rxq when multiple flags are set.
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice in order to check that the connection is still
        // in the rxq.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should
        // not be any pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}