// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

/// `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. I.e.
/// by implementing the `VsockBackend` trait, it abstracts away the gory details of translating
/// between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
/// device model.
///
/// The vsock muxer has two main roles:
/// 1. Vsock connection multiplexer:
///    It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
///    muxer also routes packets to their owning connections. It does so via a connection
///    `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
///    Vsock packet traffic needs to be inspected, in order to detect connection request
///    packets (leading to the creation of a new connection), and connection reset packets
///    (leading to the termination of an existing connection). All other packets, though, must
///    belong to an existing connection and, as such, the muxer simply forwards them.
/// 2. Event dispatcher
///    There are three event categories that the vsock backend is interested in:
///    1. A new host-initiated connection is ready to be accepted from the listening host Unix
///       socket;
///    2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
///       the host is ready to issue a vsock connection request, informing us of the
///       destination port to which it wants to connect);
///    3. Some event was triggered for a connected Unix socket, that belongs to a
///       `VsockConnection`.
///    The muxer gets notified about all of these events, because, as a `VsockEpollListener`
///    implementor, it gets to register a nested epoll FD into the main VMM epolling loop. All
///    other pollable FDs are then registered under this nested epoll FD.
///    To route all these events to their handlers, the muxer uses another `HashMap` object,
///    mapping `RawFd`s to `EpollListener`s.
///
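/// As a sketch of the host-side handshake for role 2 (the socket path below is hypothetical,
/// and `socat` is just one way to talk to a Unix socket), a host-initiated connection to
/// guest port 52 looks roughly like this:
///   (host)  $ socat - UNIX-CONNECT:/tmp/v.sock
///   (host)  CONNECT 52
///   (muxer) OK 1073741824
/// where 1073741824 (i.e. 1 << 30) is the first host-side port that
/// `VsockMuxer::allocate_local_port()` will hand out, and the "OK" ack is sent once the guest
/// accepts the connection.
///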
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect <port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "<this path>_<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}

impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the host
        // part of the guest - host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports). The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, we know where to send this packet, then (to `conn_key`).
        // However, if this is an RST, we have to forcefully terminate the connection, so
        // there's no point in forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}
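
// As an illustration of the `send_pkt()` routing rules above (a sketch, with made-up port
// numbers): a guest packet with src_port=1025 and dst_port=52 is looked up under
// `ConnMapKey { local_port: 52, peer_port: 1025 }`. If no such connection exists, only a
// VSOCK_OP_REQUEST packet will create one; any other orphan packet is answered with an RST.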

impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    #[allow(clippy::needless_range_loop)]
                    for i in 0..ev_cnt {
                        self.handle_event(
                            epoll_events[i].data as RawFd,
                            // It's ok to unwrap here, since `epoll_events[i].events` is
                            // filled in by `epoll::wait()`, and therefore contains only
                            // valid epoll flags.
                            epoll::Events::from_bits(epoll_events[i].events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // It's well defined in the epoll_wait() syscall documentation that
                        // the epoll loop can be interrupted before any of the requested
                        // events occurred or the timeout expired. In both those cases,
                        // epoll_wait() returns an error of type EINTR, but this should not
                        // be considered a regular error. Instead, it is more appropriate to
                        // retry, by calling into epoll_wait().
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
    pub fn new(cid: u64, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use `File` to enforce closing on `epoll_fd`.
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid,
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, evset: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, evset={:?}",
            fd, evset
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset }) => {
                let key_copy = *key;
                let evset_copy = *evset;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(evset_copy);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
                    Self::read_local_stream_port(&mut stream)
                        .map(|peer_port| (self.allocate_local_port(), peer_port))
                        .and_then(|(local_port, peer_port)| {
                            self.add_connection(
                                ConnMapKey {
                                    local_port,
                                    peer_port,
                                },
                                MuxerConnection::new_local_init(
                                    stream,
                                    uapi::VSOCK_HOST_CID,
                                    self.cid,
                                    local_port,
                                    peer_port,
                                ),
                            )
                        })
                        .unwrap_or_else(|err| {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        })
                }
            }

            _ => {
                info!("vsock: unexpected event: fd={:?}, evset={:?}", fd, evset);
            }
        }
    }

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
    fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
        let mut buf = [0u8; 32];

        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_READ_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut buf[..MIN_READ_LEN])
            .map_err(Error::UnixRead)?;

        // Now, finish reading the destination port number, by bringing in one byte at a time,
        // until we reach an EOL terminator (or our buffer space runs out). Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        let mut blen = MIN_READ_LEN;
        while buf[blen - 1] != b'\n' && blen < buf.len() {
            stream
                .read_exact(&mut buf[blen..=blen])
                .map_err(Error::UnixRead)?;
            blen += 1;
        }

        let mut word_iter = std::str::from_utf8(&buf[..blen])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }
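
    // A sketch of inputs accepted by `read_local_stream_port()` above (the command match is
    // case-insensitive, courtesy of `to_lowercase()`):
    //   b"connect 52\n" -> Ok(52)
    //   b"CONNECT 52\n" -> Ok(52)
    // A missing or non-numeric port yields `Error::ReadStreamPort(..)`, wrapping the inner
    // parse error.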

    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first. It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation; and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication. Worst
                // case scenario, the RX queue will get desynchronized, but we'll handle that
                // the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by sending
    /// it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the RX
        // queue. However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst case
            // scenario, the RX queue will get desynchronized, but we'll handle that the next
            // time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
        //

        loop {
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }
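
    // Worked example for the port formula in `allocate_local_port()` above: `!(1 << 31)` is
    // 0x7fff_ffff, so the expression clears bit 31 and forces bit 30, keeping every allocated
    // port inside [1 << 30, 1 << 31). Starting from `local_port_last = (1 << 30) - 1`, the
    // first port handed out is exactly 1 << 30 (1073741824), and after reaching
    // (1 << 31) - 1 the counter wraps back around to 1 << 30.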

    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }
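
    // To illustrate the path convention used by `handle_peer_request_pkt()` above (the socket
    // path here is hypothetical): with `host_sock_path = "/tmp/v.sock"`, a guest connection
    // request for port 52 makes the muxer dial the host Unix socket at "/tmp/v.sock_52".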

    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll
            // have to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection, something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed
                // that a `conn.expiry` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk
                        // away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_evset()`), but now wants back
                // in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do to
    /// handle them. We do, however, log a warning, since not being able to enqueue an RST
    /// packet means we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}
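
// A note on RX queue synchronization (an illustrative summary of the logic above): when
// `rxq.push()` fails because the queue is full, the queue is marked out-of-sync;
// `has_pending_rx()` then keeps returning true, and the next `recv_pkt()` call rebuilds the
// queue from `conn_map` via `MuxerRxQ::from_conn_map()` before popping again.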

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::ops::Drop;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::{Path, PathBuf};

    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;

    const PEER_CID: u64 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .next()
                    .unwrap(),
            )
            .unwrap();
            let uds_path = format!("test_vsock_{}.sock", name);
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID)
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len() as usize);
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }

        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its Unix
            // socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a
            // new `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {}\n", peer_port);
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the
            // locally initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed the
            // LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map. A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = vec![0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {}\n", local_port).as_bytes());

            (stream, local_port)
        }
    }

    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an
        // RST packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet should be replied to with an
        // RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be
        // delivered to the muxer's nested epoll FD. For testing only, we can fake that event
        // notification here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available
        // for one of its connections, so it should now be reporting that it can fill in an
        // RX packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }

    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Local var `_stream` has now been dropped, thus closing the local stream. After the
        // muxer gets notified via EPOLLIN, it should attempt to gracefully shutdown the
        // connection, issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv
        // indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer
        // replies with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }

    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue,
        // and take its place. We'll check that by making sure that the last packet popped
        // from the queue is an RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has
        // some pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is
        // larger than the kill queue, since an RST packet will be queued for each killed
        // connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating the
        // dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the connection response to our
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for.
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - the "OK ..." handshake message was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer
        // rxq when multiple flags are set.
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice, in order to check that the connection is still in the rxq.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should not be any
        // pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}