// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//

/// `VsockMuxer` is the device-facing component of the Unix domain sockets vsock backend. By
/// implementing the `VsockBackend` trait, it abstracts away the gory details of translating
/// between AF_VSOCK and AF_UNIX, and presents a clean interface to the rest of the vsock
/// device model.
///
/// The vsock muxer has two main roles:
/// 1. Vsock connection multiplexer:
///    It's the muxer's job to create, manage, and terminate `VsockConnection` objects. The
///    muxer also routes packets to their owning connections. It does so via a connection
///    `HashMap`, keyed by what is basically a (host_port, guest_port) tuple.
///    Vsock packet traffic needs to be inspected, in order to detect connection request
///    packets (leading to the creation of a new connection), and connection reset packets
///    (leading to the termination of an existing connection). All other packets, though, must
///    belong to an existing connection and, as such, the muxer simply forwards them.
/// 2. Event dispatcher:
///    There are three event categories that the vsock backend is interested in:
///    1. A new host-initiated connection is ready to be accepted from the listening host Unix
///       socket;
///    2. Data is available for reading from a newly-accepted host-initiated connection (i.e.
///       the host is ready to issue a vsock connection request, informing us of the
///       destination port to which it wants to connect);
///    3. Some event was triggered for a connected Unix socket, that belongs to a
///       `VsockConnection`.
///    The muxer gets notified about all of these events, because, as a `VsockEpollListener`
///    implementor, it gets to register a nested epoll FD into the main VMM epolling loop. All
///    other pollable FDs are then registered under this nested epoll FD.
///    To route all these events to their handlers, the muxer uses another `HashMap` object,
///    mapping `RawFd`s to `EpollListener`s.
///
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use super::super::csm::ConnState;
use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{
    Result as VsockResult, VsockBackend, VsockChannel, VsockEpollListener, VsockError,
};
use super::defs;
use super::muxer_killq::MuxerKillQ;
use super::muxer_rxq::MuxerRxQ;
use super::MuxerConnection;
use super::{Error, Result};

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    local_port: u32,
    peer_port: u32,
}
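
// A note on naming, to aid reading the code below: throughout this module, `local_port`
// refers to the host-side vsock port and `peer_port` to the guest-side port, regardless of
// which end initiated the connection.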

/// A muxer RX queue item.
///
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The packet must be fetched from the connection identified by `ConnMapKey`.
    ConnRx(ConnMapKey),
    /// The muxer must produce an RST packet.
    RstPkt { local_port: u32, peer_port: u32 },
}

/// An epoll listener, registered under the muxer's nested epoll FD.
///
enum EpollListener {
    /// The listener is a `MuxerConnection`, identified by `key`, and interested in the events
    /// in `evset`. Since `MuxerConnection` implements `VsockEpollListener`, notifications will
    /// be forwarded to the listener via `VsockEpollListener::notify()`.
    Connection {
        key: ConnMapKey,
        evset: epoll::Events,
    },
    /// A listener interested in new host-initiated connections.
    HostSock,
    /// A listener interested in reading host "connect <port>" commands from a freshly
    /// connected host socket.
    LocalStream(UnixStream),
}

/// The vsock connection multiplexer.
///
pub struct VsockMuxer {
    /// Guest CID.
    cid: u64,
    /// A hash map used to store the active connections.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// A hash map used to store epoll event listeners / handlers.
    listener_map: HashMap<RawFd, EpollListener>,
    /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and
    /// produced
    /// - by `VsockMuxer::send_pkt()` (e.g. RST in response to a connection request packet);
    ///   and
    /// - in response to EPOLLIN events (e.g. data available to be read from an AF_UNIX
    ///   socket).
    rxq: MuxerRxQ,
    /// A queue used for terminating connections that are taking too long to shut down.
    killq: MuxerKillQ,
    /// The Unix socket, through which host-initiated connections are accepted.
    host_sock: UnixListener,
    /// The file system path of the host-side Unix socket. This is used to figure out the path
    /// to Unix sockets listening on specific ports. I.e. "<this path>_<port number>".
    host_sock_path: String,
    /// The nested epoll File, used to register epoll listeners.
    epoll_file: File,
    /// A hash set used to keep track of used host-side (local) ports, in order to assign local
    /// ports to host-initiated connections.
    local_port_set: HashSet<u32>,
    /// The last used host-side port.
    local_port_last: u32,
}

impl VsockChannel for VsockMuxer {
    /// Deliver a vsock packet to the guest vsock driver.
    ///
    /// Returns:
    /// - `Ok(())`: `pkt` has been successfully filled in; or
    /// - `Err(VsockError::NoData)`: there was no available data with which to fill in the
    ///   packet.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // We'll look for instructions on how to build the RX packet in the RX queue. If the
        // queue is empty, that doesn't necessarily mean we don't have any pending RX, since
        // the queue might be out-of-sync. If that's the case, we'll attempt to sync it first,
        // and then try to pop something out again.
        if self.rxq.is_empty() && !self.rxq.is_synced() {
            self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
        }
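
        // (A note on the above: the RX queue is a bounded, fixed-size structure, so a push
        // onto a full queue can fail and leave it out-of-sync. The rebuild restores it from
        // the authoritative source: the connections in `conn_map` that report pending RX.)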

        while let Some(rx) = self.rxq.peek() {
            let res = match rx {
                // We need to build an RST packet, going from `local_port` to `peer_port`.
                MuxerRx::RstPkt {
                    local_port,
                    peer_port,
                } => {
                    pkt.set_op(uapi::VSOCK_OP_RST)
                        .set_src_cid(uapi::VSOCK_HOST_CID)
                        .set_dst_cid(self.cid)
                        .set_src_port(local_port)
                        .set_dst_port(peer_port)
                        .set_len(0)
                        .set_type(uapi::VSOCK_TYPE_STREAM)
                        .set_flags(0)
                        .set_buf_alloc(0)
                        .set_fwd_cnt(0);
                    self.rxq.pop().unwrap();
                    return Ok(());
                }

                // We'll defer building the packet to this connection, since it has something
                // to say.
                MuxerRx::ConnRx(key) => {
                    let mut conn_res = Err(VsockError::NoData);
                    let mut do_pop = true;
                    self.apply_conn_mutation(key, |conn| {
                        conn_res = conn.recv_pkt(pkt);
                        do_pop = !conn.has_pending_rx();
                    });
                    if do_pop {
                        self.rxq.pop().unwrap();
                    }
                    conn_res
                }
            };

            if res.is_ok() {
                // Inspect traffic, looking for RST packets, since that means we have to
                // terminate and remove this connection from the active connection pool.
                //
                if pkt.op() == uapi::VSOCK_OP_RST {
                    self.remove_connection(ConnMapKey {
                        local_port: pkt.src_port(),
                        peer_port: pkt.dst_port(),
                    });
                }

                debug!("vsock muxer: RX pkt: {:?}", pkt.hdr());
                return Ok(());
            }
        }

        Err(VsockError::NoData)
    }

    /// Deliver a guest-generated packet to its destination in the vsock backend.
    ///
    /// This absorbs unexpected packets, handles RSTs (by dropping connections), and forwards
    /// all the rest to their owning `MuxerConnection`.
    ///
    /// Returns:
    /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be
    /// returned to the guest vsock driver.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        let conn_key = ConnMapKey {
            local_port: pkt.dst_port(),
            peer_port: pkt.src_port(),
        };

        debug!(
            "vsock: muxer.send[rxq.len={}]: {:?}",
            self.rxq.len(),
            pkt.hdr()
        );

        // If this packet has an unsupported type (!=stream), we must send back an RST.
        //
        if pkt.type_() != uapi::VSOCK_TYPE_STREAM {
            self.enq_rst(pkt.dst_port(), pkt.src_port());
            return Ok(());
        }

        // We don't know how to handle packets addressed to other CIDs. We only handle the
        // host side of the guest <-> host communication here.
        if pkt.dst_cid() != uapi::VSOCK_HOST_CID {
            info!(
                "vsock: dropping guest packet for unknown CID: {:?}",
                pkt.hdr()
            );
            return Ok(());
        }

        if !self.conn_map.contains_key(&conn_key) {
            // This packet can't be routed to any active connection (based on its src and dst
            // ports). The only orphan / unroutable packets we know how to handle are
            // connection requests.
            if pkt.op() == uapi::VSOCK_OP_REQUEST {
                // Oh, this is a connection request!
                self.handle_peer_request_pkt(pkt);
            } else {
                // Send back an RST, to let the driver know we weren't expecting this packet.
                self.enq_rst(pkt.dst_port(), pkt.src_port());
            }
            return Ok(());
        }

        // Right, we know where to send this packet, then (to `conn_key`).
        // However, if this is an RST, we have to forcefully terminate the connection, so
        // there's no point in forwarding the packet to it.
        if pkt.op() == uapi::VSOCK_OP_RST {
            self.remove_connection(conn_key);
            return Ok(());
        }

        // Alright, everything looks in order - forward this packet to its owning connection.
        let mut res: VsockResult<()> = Ok(());
        self.apply_conn_mutation(conn_key, |conn| {
            res = conn.send_pkt(pkt);
        });

        res
    }

    /// Check if the muxer has any pending RX data, with which to fill a guest-provided RX
    /// buffer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.rxq.is_empty() || !self.rxq.is_synced()
    }
}
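
// To summarize the `VsockChannel` implementation above, data crosses the muxer along two
// paths:
//
//     guest TX: virtio TX queue -> send_pkt() -> MuxerConnection -> AF_UNIX socket
//     guest RX: AF_UNIX socket -> EPOLLIN -> rxq -> recv_pkt() -> virtio RX queue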

impl VsockEpollListener for VsockMuxer {
    /// Get the FD to be registered for polling upstream (in the main VMM epoll loop, in this
    /// case).
    ///
    /// This will be the muxer's nested epoll FD.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.epoll_file.as_raw_fd()
    }

    /// Get the epoll events to be polled upstream.
    ///
    /// Since the polled FD is a nested epoll FD, we're only interested in EPOLLIN events (i.e.
    /// some event occurred on one of the FDs registered under our epoll FD).
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        epoll::Events::EPOLLIN
    }

    /// Notify the muxer about a pending event having occurred under its nested epoll FD.
    ///
    fn notify(&mut self, _: epoll::Events) {
        debug!("vsock: muxer received kick");

        let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
        'epoll: loop {
            match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
                Ok(ev_cnt) => {
                    for evt in epoll_events.iter().take(ev_cnt) {
                        self.handle_event(
                            evt.data as RawFd,
                            // It's ok to unwrap here, since `evt.events` is filled in by
                            // `epoll::wait()`, and therefore contains only valid epoll
                            // flags.
                            epoll::Events::from_bits(evt.events).unwrap(),
                        );
                    }
                }
                Err(e) => {
                    if e.kind() == io::ErrorKind::Interrupted {
                        // The epoll_wait() man page states that the call can be interrupted
                        // by a signal before any of the requested events occur or the
                        // timeout expires. In that case it fails with EINTR, which should
                        // not be treated as a regular error; the appropriate response is
                        // simply to retry the wait.
                        continue;
                    }
                    warn!("vsock: failed to consume muxer epoll event: {}", e);
                }
            }
            break 'epoll;
        }
    }
}

impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
    /// Muxer constructor.
    ///
    pub fn new(cid: u64, host_sock_path: String) -> Result<Self> {
        // Create the nested epoll FD. This FD will be added to the VMM `EpollContext`, at
        // device activation time.
        let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
        // Use `File` to enforce closing of `epoll_fd`.
        // SAFETY: `epoll_fd` is a valid FD.
        let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

        // Open/bind/listen on the host Unix socket, so we can accept host-initiated
        // connections.
        let host_sock = UnixListener::bind(&host_sock_path)
            .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
            .map_err(Error::UnixBind)?;

        let mut muxer = Self {
            cid,
            host_sock,
            host_sock_path,
            epoll_file,
            rxq: MuxerRxQ::new(),
            conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
            listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
            killq: MuxerKillQ::new(),
            local_port_last: (1u32 << 30) - 1,
            local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
        };

        muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
        Ok(muxer)
    }
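
    // Usage sketch (not part of the device model): building a muxer for a guest with CID 3,
    // rooting host-side sockets at the hypothetical path "/tmp/vsock.sock", so that guest
    // port 52 maps to the Unix socket "/tmp/vsock.sock_52":
    //
    //     let muxer = VsockMuxer::new(3, String::from("/tmp/vsock.sock"))?;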

    /// Handle/dispatch an epoll event to its listener.
    ///
    fn handle_event(&mut self, fd: RawFd, event_set: epoll::Events) {
        debug!(
            "vsock: muxer processing event: fd={}, event_set={:?}",
            fd, event_set
        );

        match self.listener_map.get_mut(&fd) {
            // This event needs to be forwarded to a `MuxerConnection` that is listening for
            // it.
            //
            Some(EpollListener::Connection { key, evset: _ }) => {
                let key_copy = *key;
                // The handling of this event will most probably mutate the state of the
                // receiving connection. We'll need to check for new pending RX, event set
                // mutation, and all that, so we're wrapping the event delivery inside those
                // checks.
                self.apply_conn_mutation(key_copy, |conn| {
                    conn.notify(event_set);
                });
            }

            // A new host-initiated connection is ready to be accepted.
            //
            Some(EpollListener::HostSock) => {
                if self.conn_map.len() == defs::MAX_CONNECTIONS {
                    // If we're already maxed-out on connections, we'll just accept and
                    // immediately discard this potentially new one.
                    warn!("vsock: connection limit reached; refusing new host connection");
                    self.host_sock.accept().map(|_| 0).unwrap_or(0);
                    return;
                }
                self.host_sock
                    .accept()
                    .map_err(Error::UnixAccept)
                    .and_then(|(stream, _)| {
                        stream
                            .set_nonblocking(true)
                            .map(|_| stream)
                            .map_err(Error::UnixAccept)
                    })
                    .and_then(|stream| {
                        // Before forwarding this connection to a listening AF_VSOCK socket on
                        // the guest side, we need to know the destination port. We'll read
                        // that port from a "connect" command received on this socket, so the
                        // next step is to ask to be notified the moment we can read from it.
                        self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
                    })
                    .unwrap_or_else(|err| {
                        warn!("vsock: unable to accept local connection: {:?}", err);
                    });
            }

            // Data is ready to be read from a host-initiated connection. That would be the
            // "connect" command that we're expecting.
            Some(EpollListener::LocalStream(_)) => {
                if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
                    Self::read_local_stream_port(&mut stream)
                        .map(|peer_port| (self.allocate_local_port(), peer_port))
                        .and_then(|(local_port, peer_port)| {
                            self.add_connection(
                                ConnMapKey {
                                    local_port,
                                    peer_port,
                                },
                                MuxerConnection::new_local_init(
                                    stream,
                                    uapi::VSOCK_HOST_CID,
                                    self.cid,
                                    local_port,
                                    peer_port,
                                ),
                            )
                        })
                        .unwrap_or_else(|err| {
                            info!("vsock: error adding local-init connection: {:?}", err);
                        })
                }
            }

            _ => {
                info!(
                    "vsock: unexpected event: fd={:?}, event_set={:?}",
                    fd, event_set
                );
            }
        }
    }
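
    // For reference, the host-initiated handshake carried out by the `HostSock` and
    // `LocalStream` arms above, together with `read_local_stream_port()` below, looks like
    // this on the Unix socket (assuming the guest listens on vsock port 52, and that this is
    // the very first host-initiated connection, so the first allocated local port is 1 << 30):
    //
    //     host  -> muxer: "connect 52\n"
    //     muxer -> host:  "OK 1073741824\n"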

    /// Parse a host "connect" command, and extract the destination vsock port.
    ///
    fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
        let mut buf = [0u8; 32];

        // This is the minimum number of bytes that we should be able to read, when parsing a
        // valid connection request. I.e. `b"connect 0\n".len()`.
        const MIN_READ_LEN: usize = 10;

        // Bring in the minimum number of bytes that we should be able to read.
        stream
            .read_exact(&mut buf[..MIN_READ_LEN])
            .map_err(Error::UnixRead)?;

        // Now, finish reading the destination port number, by bringing in one byte at a
        // time, until we reach an EOL terminator (or our buffer space runs out). Yeah, not
        // particularly proud of this approach, but it will have to do for now.
        let mut blen = MIN_READ_LEN;
        while buf[blen - 1] != b'\n' && blen < buf.len() {
            stream
                .read_exact(&mut buf[blen..=blen])
                .map_err(Error::UnixRead)?;
            blen += 1;
        }

        let mut word_iter = std::str::from_utf8(&buf[..blen])
            .map_err(Error::ConvertFromUtf8)?
            .split_whitespace();

        word_iter
            .next()
            .ok_or(Error::InvalidPortRequest)
            .and_then(|word| {
                if word.to_lowercase() == "connect" {
                    Ok(())
                } else {
                    Err(Error::InvalidPortRequest)
                }
            })
            .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
            .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
            .map_err(|e| Error::ReadStreamPort(Box::new(e)))
    }

    /// Add a new connection to the active connection pool.
    ///
    fn add_connection(&mut self, key: ConnMapKey, conn: MuxerConnection) -> Result<()> {
        // We might need to make room for this new connection, so let's sweep the kill queue
        // first. It's fine to do this here because:
        // - unless the kill queue is out of sync, this is a pretty inexpensive operation;
        //   and
        // - we are under no pressure to respect any accurate timing for connection
        //   termination.
        self.sweep_killq();

        if self.conn_map.len() >= defs::MAX_CONNECTIONS {
            info!(
                "vsock: muxer connection limit reached ({})",
                defs::MAX_CONNECTIONS
            );
            return Err(Error::TooManyConnections);
        }

        self.add_listener(
            conn.get_polled_fd(),
            EpollListener::Connection {
                key,
                evset: conn.get_polled_evset(),
            },
        )
        .map(|_| {
            if conn.has_pending_rx() {
                // We can safely ignore any error in adding a connection RX indication.
                // Worst case scenario, the RX queue will get desynchronized, but we'll
                // handle that the next time we need to yield an RX packet.
                self.rxq.push(MuxerRx::ConnRx(key));
            }
            self.conn_map.insert(key, conn);
        })
    }

    /// Remove a connection from the active connection pool.
    ///
    fn remove_connection(&mut self, key: ConnMapKey) {
        if let Some(conn) = self.conn_map.remove(&key) {
            self.remove_listener(conn.get_polled_fd());
        }
        self.free_local_port(key.local_port);
    }

    /// Schedule a connection for immediate termination.
    /// I.e. as soon as we can also let our peer know we're dropping the connection, by
    /// sending it an RST packet.
    ///
    fn kill_connection(&mut self, key: ConnMapKey) {
        let mut had_rx = false;
        self.conn_map.entry(key).and_modify(|conn| {
            had_rx = conn.has_pending_rx();
            conn.kill();
        });
        // This connection will now have an RST packet to yield, so we need to add it to the
        // RX queue. However, there's no point in doing that if it was already in the queue.
        if !had_rx {
            // We can safely ignore any error in adding a connection RX indication. Worst
            // case scenario, the RX queue will get desynchronized, but we'll handle that the
            // next time we need to yield an RX packet.
            self.rxq.push(MuxerRx::ConnRx(key));
        }
    }

    /// Register a new epoll listener under the muxer's nested epoll FD.
    ///
    fn add_listener(&mut self, fd: RawFd, listener: EpollListener) -> Result<()> {
        let evset = match listener {
            EpollListener::Connection { evset, .. } => evset,
            EpollListener::LocalStream(_) => epoll::Events::EPOLLIN,
            EpollListener::HostSock => epoll::Events::EPOLLIN,
        };

        epoll::ctl(
            self.epoll_file.as_raw_fd(),
            epoll::ControlOptions::EPOLL_CTL_ADD,
            fd,
            epoll::Event::new(evset, fd as u64),
        )
        .map(|_| {
            self.listener_map.insert(fd, listener);
        })
        .map_err(Error::EpollAdd)?;

        Ok(())
    }
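
    // Note that a listener's fd doubles as its epoll user data (`epoll::Event::new(evset,
    // fd as u64)` above), which is what allows `handle_event()` to map a ready event back to
    // the right `EpollListener`.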

    /// Remove (and return) a previously registered epoll listener.
    ///
    fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
        let maybe_listener = self.listener_map.remove(&fd);

        if maybe_listener.is_some() {
            epoll::ctl(
                self.epoll_file.as_raw_fd(),
                epoll::ControlOptions::EPOLL_CTL_DEL,
                fd,
                epoll::Event::new(epoll::Events::empty(), 0),
            )
            .unwrap_or_else(|err| {
                warn!(
                    "vsock muxer: error removing epoll listener for fd {:?}: {:?}",
                    fd, err
                );
            });
        }

        maybe_listener
    }

    /// Allocate a host-side port to be assigned to a new host-initiated connection.
    ///
    fn allocate_local_port(&mut self) -> u32 {
        // TODO: this doesn't seem very space-efficient.
        // Maybe rewrite this to limit port range and use a bitmap?
        //
        loop {
            self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
            if self.local_port_set.insert(self.local_port_last) {
                break;
            }
        }
        self.local_port_last
    }
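
    // A worked example for the update expression above: `local_port_last` starts out at
    // (1 << 30) - 1, so the first allocation yields exactly 1 << 30. Clearing bit 31 and
    // setting bit 30 confines all allocated ports to the range [2^30, 2^31), presumably to
    // keep them well away from the (typically low-numbered) ports that guest-initiated
    // connections target.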

    /// Mark a previously used host-side port as free.
    ///
    fn free_local_port(&mut self, port: u32) {
        self.local_port_set.remove(&port);
    }

    /// Handle a new connection request coming from our peer (the guest vsock driver).
    ///
    /// This will attempt to connect to a host-side Unix socket, expected to be listening at
    /// the file system path corresponding to the destination port. If successful, a new
    /// connection object will be created and added to the connection pool. On failure, a new
    /// RST packet will be scheduled for delivery to the guest.
    ///
    fn handle_peer_request_pkt(&mut self, pkt: &VsockPacket) {
        let port_path = format!("{}_{}", self.host_sock_path, pkt.dst_port());

        UnixStream::connect(port_path)
            .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
            .map_err(Error::UnixConnect)
            .and_then(|stream| {
                self.add_connection(
                    ConnMapKey {
                        local_port: pkt.dst_port(),
                        peer_port: pkt.src_port(),
                    },
                    MuxerConnection::new_peer_init(
                        stream,
                        uapi::VSOCK_HOST_CID,
                        self.cid,
                        pkt.dst_port(),
                        pkt.src_port(),
                        pkt.buf_alloc(),
                    ),
                )
            })
            .unwrap_or_else(|_| self.enq_rst(pkt.dst_port(), pkt.src_port()));
    }

    /// Perform an action that might mutate a connection's state.
    ///
    /// This is used as shorthand for repetitive tasks that need to be performed after a
    /// connection object mutates. E.g.
    /// - update the connection's epoll listener;
    /// - schedule the connection to be queried for RX data;
    /// - kill the connection if an unrecoverable error occurs.
    ///
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll
            // have to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection, something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed
                // that `conn.expiry()` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.get_polled_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    epoll::ctl(
                        self.epoll_file.as_raw_fd(),
                        epoll::ControlOptions::EPOLL_CTL_MOD,
                        fd,
                        epoll::Event::new(new_evset, fd as u64),
                    )
                    .unwrap_or_else(|err| {
                        // This really shouldn't happen, like, ever. However, "famous last
                        // words" and all that, so let's just kill it with fire, and walk
                        // away.
                        self.kill_connection(key);
                        error!(
                            "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                            key.local_port, key.peer_port, err
                        );
                    });
                }
            } else {
                // The connection had previously asked to be removed from the listener map
                // (by returning an empty event set via `get_polled_evset()`), but now wants
                // back in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                });
            }
        }
    }

    /// Check if any connections have timed out, and if so, schedule them for immediate
    /// termination.
    ///
    fn sweep_killq(&mut self) {
        while let Some(key) = self.killq.pop() {
            // Connections don't get removed from the kill queue when their kill timer is
            // disarmed, since that would be a costly operation. This means we must check if
            // the connection has indeed expired, prior to killing it.
            let mut kill = false;
            self.conn_map
                .entry(key)
                .and_modify(|conn| kill = conn.has_expired());
            if kill {
                self.kill_connection(key);
            }
        }

        if self.killq.is_empty() && !self.killq.is_synced() {
            self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
            // If we've just re-created the kill queue, we can sweep it again; maybe there's
            // more to kill.
            self.sweep_killq();
        }
    }
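
    // Note: like the RX queue, the kill queue is a bounded, fixed-size structure
    // (`defs::MUXER_KILLQ_SIZE` entries). Pushing onto a full queue just marks it
    // out-of-sync, and `sweep_killq()` above rebuilds it wholesale from `conn_map` once it
    // has drained.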

    /// Enqueue an RST packet into `self.rxq`.
    ///
    /// Enqueue errors aren't propagated up the call chain, since there is nothing we can do
    /// to handle them. We do, however, log a warning, since not being able to enqueue an RST
    /// packet means we have to drop it, which is not normal operation.
    ///
    fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
        let pushed = self.rxq.push(MuxerRx::RstPkt {
            local_port,
            peer_port,
        });
        if !pushed {
            warn!(
                "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
                local_port, peer_port
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::ops::Drop;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::{Path, PathBuf};

    use virtio_queue::QueueOwnedT;

    use super::super::super::csm::defs as csm_defs;
    use super::super::super::tests::TestContext as VsockTestContext;
    use super::*;

    const PEER_CID: u64 = 3;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    struct MuxerTestContext {
        _vsock_test_ctx: VsockTestContext,
        pkt: VsockPacket,
        muxer: VsockMuxer,
    }

    impl Drop for MuxerTestContext {
        fn drop(&mut self) {
            std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
        }
    }

    impl MuxerTestContext {
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let uds_path = format!("test_vsock_{}.sock", name);
            let muxer = VsockMuxer::new(PEER_CID, uds_path).unwrap();

            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                muxer,
            }
        }

        fn init_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacket {
            for b in self.pkt.hdr_mut() {
                *b = 0;
            }
            self.pkt
                .set_type(uapi::VSOCK_TYPE_STREAM)
                .set_src_cid(PEER_CID)
                .set_dst_cid(uapi::VSOCK_HOST_CID)
                .set_src_port(peer_port)
                .set_dst_port(local_port)
                .set_op(op)
                .set_buf_alloc(PEER_BUF_ALLOC)
        }

        fn init_data_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            data: &[u8],
        ) -> &mut VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RW)
                .set_len(data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &mut self.pkt
        }

        fn send(&mut self) {
            self.muxer.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.muxer.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_muxer(&mut self) {
            self.muxer.notify(epoll::Events::EPOLLIN);
        }

        fn count_epoll_listeners(&self) -> (usize, usize) {
            let mut local_lsn_count = 0usize;
            let mut conn_lsn_count = 0usize;
            for key in self.muxer.listener_map.values() {
                match key {
                    EpollListener::LocalStream(_) => local_lsn_count += 1,
                    EpollListener::Connection { .. } => conn_lsn_count += 1,
                    _ => (),
                };
            }
            (local_lsn_count, conn_lsn_count)
        }

        fn create_local_listener(&self, port: u32) -> LocalListener {
            LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
        }

        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            // The muxer would now get notified of a new connection having arrived at its
            // Unix socket, so it can accept it.
            self.notify_muxer();

            // Just after having accepted a new local connection, the muxer should've added a
            // new `LocalStream` listener to its `listener_map`.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            let buf = format!("CONNECT {}\n", peer_port);
            stream.write_all(buf.as_bytes()).unwrap();
            // The muxer would now get notified that data is available for reading from the
            // locally initiated connection.
            self.notify_muxer();

            // Successfully reading and parsing the connection request should have removed
            // the LocalStream epoll listener and added a Connection epoll listener.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // A LocalInit connection should've been added to the muxer connection map. A new
            // local port should also have been allocated for the new LocalInit connection.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));
            // A connection request for the peer should now be available from the muxer.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.pkt.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.pkt.dst_port(), peer_port);
            assert_eq!(self.pkt.src_port(), local_port);

            self.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            let mut buf = vec![0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {}\n", local_port).as_bytes());

            (stream, local_port)
        }
    }
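
    // `LocalListener` emulates a host-side server: a Unix socket listening at
    // "<host_sock_path>_<port>", i.e. the path the muxer connects to when the guest sends a
    // connection request for <port>.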
    struct LocalListener {
        path: PathBuf,
        sock: UnixListener,
    }
    impl LocalListener {
        fn new<P: AsRef<Path> + Clone>(path: P) -> Self {
            let path_buf = path.as_ref().to_path_buf();
            let sock = UnixListener::bind(path).unwrap();
            sock.set_nonblocking(true).unwrap();
            Self {
                path: path_buf,
                sock,
            }
        }
        fn accept(&mut self) -> UnixStream {
            let (stream, _) = self.sock.accept().unwrap();
            stream.set_nonblocking(true).unwrap();
            stream
        }
    }
    impl Drop for LocalListener {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).unwrap();
        }
    }

    #[test]
    fn test_muxer_epoll_listener() {
        let ctx = MuxerTestContext::new("muxer_epoll_listener");
        assert_eq!(ctx.muxer.get_polled_fd(), ctx.muxer.epoll_file.as_raw_fd());
        assert_eq!(ctx.muxer.get_polled_evset(), epoll::Events::EPOLLIN);
    }

    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_type(SOCK_DGRAM);
        ctx.send();

        // The guest sent a SOCK_DGRAM packet. Per the vsock spec, we need to reply with an
        // RST packet, since vsock only supports stream sockets.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Any orphan (i.e. without a connection), non-RST packet should be replied to with
        // an RST.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
            assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        }

        // Any packet addressed to anything other than VSOCK_HOST_CID should get dropped.
        assert!(!ctx.muxer.has_pending_rx());
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST)
            .set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Test peer connection refused.
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        // Test peer connection accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.len(), 0);
        assert_eq!(ctx.pkt.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Test host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // When data is available on the local stream, an EPOLLIN event would normally be
        // delivered to the muxer's nested epoll FD. For testing only, we can fake that event
        // notification here.
        ctx.notify_muxer();
        // After being notified, the muxer should've figured out that RX data was available
        // for one of its connections, so it should now be reporting that it can fill in an
        // RX packet.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Test guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Test host -> guest data flow.
        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        assert_eq!(ctx.pkt.buf().unwrap()[..data.len()], data);
    }

    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Local var `_stream` has now been dropped, thus closing the local stream. After the
        // muxer gets notified via EPOLLIN, it should attempt to gracefully shutdown the
        // connection, issuing a VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv
        // indications set.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);

        // The connection should get removed (and its local port freed), after the peer
        // replies with an RST.
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }

    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Emulate a full shutdown from the peer (no-more-send + no-more-recv).
        ctx.init_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND)
            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // Now, the muxer should remove the connection from its map, and reply with an RST.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.pkt.src_port(), local_port);
        assert_eq!(ctx.pkt.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // The muxer should also drop / close the local Unix socket for this connection.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }

    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        let mut streams: Vec<UnixStream> = Vec::new();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // The muxer RX queue should now be full (with connection responses), but still
        // synchronized.
        assert!(ctx.muxer.rxq.is_synced());

        // One more queued reply should desync the RX queue.
        ctx.init_pkt(
            local_port,
            (peer_port_first + defs::MUXER_RXQ_SIZE) as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // With an out-of-sync queue, an RST should evict any non-RST packet from the queue,
        // and take its place. We'll check that by making sure that the last packet popped
        // from the queue is an RST.
        ctx.init_pkt(
            local_port + 1,
            peer_port_first as u32,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            // The response order should hold. The evicted response should have been the last
            // enqueued.
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
        }
        // There should be one more packet in the queue: the RST.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);

        // The queue should now be empty, but out-of-sync, so the muxer should report it has
        // some pending RX.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next recv should sync the queue back up. It should also yield one of the two
        // responses that are still left:
        // - the one that desynchronized the queue; and
        // - the one that got evicted by the RST.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    }

    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_pkt(local_port, peer_port as u32, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            {
                let _stream = listener.accept();
            }
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.pkt.src_port(), local_port);
            assert_eq!(ctx.pkt.dst_port(), peer_port as u32);
            // The kill queue should be synchronized, up until the `defs::MUXER_KILLQ_SIZE`th
            // connection we schedule for termination.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the kill timers to expire.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Trigger a kill queue sweep, by requesting a new connection.
        ctx.init_pkt(
            local_port,
            peer_port_last as u32 + 1,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();

        // After sweeping the kill queue, it should now be synced (assuming the RX queue is
        // larger than the kill queue, since an RST packet will be queued for each killed
        // connection).
        assert!(ctx.muxer.killq.is_synced());
        assert!(ctx.muxer.has_pending_rx());
        // There should be `defs::MUXER_KILLQ_SIZE` RSTs in the RX queue, from terminating
        // the dying connections in the recent killq sweep.
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.pkt.src_port(), local_port);
        }

        // There should be one more packet in the RX queue: the connection response to the
        // request that triggered the kill queue sweep.
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.dst_port(), peer_port_last as u32 + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }

    #[test]
    fn test_regression_handshake() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that the handshake message is not accounted for in the
        // connection's `fwd_cnt`.
        let mut ctx = MuxerTestContext::new("regression_handshake");
        let peer_port = 1025;

        // Create a local connection.
        let (_, local_port) = ctx.local_connect(peer_port);

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Check that fwd_cnt is 0 - the "OK ..." handshake message was not accounted for.
        assert_eq!(conn.fwd_cnt().0, 0);
    }

    #[test]
    fn test_regression_rxq_pop() {
        // Address one of the issues found while fixing the following issue:
        // https://github.com/firecracker-microvm/firecracker/issues/1751
        // This test checks that a connection is not popped out of the muxer rxq when
        // multiple flags are set.
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Send some data.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        // Get the connection from the connection map.
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // Forcefully insert another flag.
        conn.insert_credit_update();

        // Call recv twice in order to check that the connection is still in the rxq.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        // Since initially the connection had two flags set, now there should not be any
        // pending RX in the muxer.
        assert!(!ctx.muxer.has_pending_rx());
    }
}