// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//
//! The main job of `VsockConnection` is to forward data traffic, back and forth, between a
//! guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while
//! also managing its internal state.
//! To that end, `VsockConnection` implements:
//! - `VsockChannel` for:
//!   - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`; and
//!   - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and
//!   - updating its internal state, by absorbing control packets (anything other than
//!     VSOCK_OP_RW).
//! - `VsockEpollListener` for getting notified about the availability of data or free buffer
//!   space at the host stream.
//!
//! Note: there is a certain asymmetry to the RX and TX data flows:
//! - RX transfers do not need any data buffering, since data is read straight from the
//!   host stream and into the guest-provided RX buffer;
//! - TX transfers may require some data to be buffered by `VsockConnection`, if the host
//!   peer can't keep up with reading the data that we're writing. This is because, once
//!   the guest driver provides some data in a virtio TX buffer, the vsock device must
//!   consume it. If that data can't be forwarded straight to the host stream, we'll
//!   have to store it in a buffer (and flush it at a later time). Vsock flow control
//!   ensures that our TX buffer doesn't overflow.
//
// The code in this file is best read with a fresh memory of the vsock protocol inner workings.
// To help with that, here is a
//
// Short primer on the vsock protocol
// ----------------------------------
//
// 1. Establishing a connection
//    A vsock connection is considered established after a two-way handshake:
//    - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST);
//      then
//    - the listening peer sends back a connection response packet (`hdr.op` ==
//      VSOCK_OP_RESPONSE).
//
// 2. Terminating a connection
//    When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN
//    packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's
//    intention:
//    - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and
//    - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection.
//    After a shutdown packet, the receiving peer will have some protocol-undefined time to
//    flush its buffers, and then forcefully terminate the connection by sending back an RST
//    packet. If the shutdown-initiating peer doesn't receive this RST packet during a timeout
//    period, it will send one itself, thus terminating the connection.
//    Note: a peer can send more than one VSOCK_OP_SHUTDOWN packet. However, read/write
//          indications cannot be undone. E.g. once a "no-more-sending" promise has been made,
//          it cannot be taken back. That is, `hdr.flags` will be ORed between subsequent
//          VSOCK_OP_SHUTDOWN packets.
//
// 3. Flow control
//    Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver
//    has enough free buffer space to store that data. If this condition is not respected, the
//    receiving peer's behavior is undefined. In this implementation, we forcefully terminate
//    the connection by sending back a VSOCK_OP_RST packet.
//    Note: all buffer space information is computed and stored on a per-connection basis.
//    Peers keep each other informed about the free buffer space they have by filling in two
//    packet header members with each packet they send:
//    - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and
//    - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its
//      buffer.
//    Either peer can figure out how much space the other has available in its buffer by
//    inspecting the difference between how much it has sent to the peer and how much the peer
//    has flushed out (i.e. "forwarded", in the vsock spec terminology):
//    `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`.
//    Note: the above requires that peers constantly keep each other informed on their buffer
//          space situation. However, since there are no receipt acknowledgement packets
//          defined for the vsock protocol, packet flow can often be unidirectional (just one
//          peer sending data to another), so the sender's information about the receiver's
//          buffer space can get quickly outdated. The vsock protocol defines two solutions to
//          this problem:
//          1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its
//             peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a
//             VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit
//             information must be included in any packet);
//          2. The receiver can be proactive, and send a VSOCK_OP_CREDIT_UPDATE packet whenever
//             it thinks its peer's information is out of date.
//          Our implementation uses the proactive approach.
//
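// For a concrete (hypothetical) worked example of the credit computation above: say the peer
// has advertised `buf_alloc = 65536` bytes, we have sent it 8192 bytes in total, and its last
// reported `fwd_cnt` is 4096. Then:
//    `peer_free = 65536 - (8192 - 4096) = 61440`
// i.e. 4096 bytes are still in flight (or buffered) at the peer, and we may send at most
// 61440 more bytes before having to wait for an updated `fwd_cnt`.
//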
use std::io::{ErrorKind, Read, Write};
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::{Duration, Instant};

use super::super::defs::uapi;
use super::super::packet::VsockPacket;
use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError};
use super::txbuf::TxBuf;
use super::{defs, ConnState, Error, PendingRx, PendingRxSet, Result};

/// A self-managing connection object that handles communication between a guest-side AF_VSOCK
/// socket and a host-side `Read + Write + AsRawFd` stream.
///
pub struct VsockConnection<S: Read + Write + AsRawFd> {
    /// The current connection state.
    state: ConnState,
    /// The local CID. Most of the time this will be the constant `2` (the vsock host CID).
    local_cid: u64,
    /// The peer (guest) CID.
    peer_cid: u64,
    /// The local (host) port.
    local_port: u32,
    /// The peer (guest) port.
    peer_port: u32,
    /// The (connected) host-side stream.
    stream: S,
    /// The TX buffer for this connection.
    tx_buf: TxBuf,
    /// Total number of bytes that have been successfully written to `self.stream`, either
    /// directly, or flushed from `self.tx_buf`.
    fwd_cnt: Wrapping<u32>,
    /// The amount of buffer space that the peer (guest) has allocated for this connection.
    peer_buf_alloc: u32,
    /// The total number of bytes that the peer has forwarded away.
    peer_fwd_cnt: Wrapping<u32>,
    /// The total number of bytes sent to the peer (guest vsock driver).
    rx_cnt: Wrapping<u32>,
    /// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit
    /// updates, and let the peer know it's OK to send more data.
    last_fwd_cnt_to_peer: Wrapping<u32>,
    /// The set of pending RX packet indications that `recv_pkt()` will use to fill in a
    /// packet for the peer (guest).
    pending_rx: PendingRxSet,
    /// Instant when this connection should be scheduled for immediate termination, due to some
    /// timeout condition having been fulfilled.
    expiry: Option<Instant>,
}
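
// A rough sketch of the connection lifecycle, as implemented by the state transitions in this
// file (states are `ConnState` variants; the exact RST / timeout handling lives in the code
// below):
//
//   guest-initiated: PeerInit --(we yield VSOCK_OP_RESPONSE)--> Established
//   host-initiated:  LocalInit --(we yield VSOCK_OP_REQUEST, arm expiry)
//                              --(peer replies VSOCK_OP_RESPONSE)--> Established
//   shutdown paths:  Established --(peer VSOCK_OP_SHUTDOWN)--> PeerClosed(recv_off, send_off)
//                    Established --(host stream EOF)--> LocalClosed
//   any state:       --(fatal error or peer misbehavior, via `kill()`)--> Killed (RST pending)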

impl<S> VsockChannel for VsockConnection<S>
where
    S: Read + Write + AsRawFd,
{
    /// Fill in a vsock packet, to be delivered to our peer (the guest driver).
    ///
    /// As per the `VsockChannel` trait, this should only be called when there is data to be
    /// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error
    /// out with `VsockError::NoData`.
    /// Pending RX indications are set by other mutable actions performed on the channel. For
    /// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN
    /// packet, or `notify()` could set an Rw indication (a data packet can be fetched from the
    /// channel), if data was ready to be read from the host stream.
    ///
    /// Returns:
    /// - `Ok(())`: the packet has been successfully filled in and is ready for delivery;
    /// - `Err(VsockError::NoData)`: there was no data available with which to fill in the
    ///   packet;
    /// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but
    ///   it is missing the data buffer.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // Perform some generic initialization that is the same for any packet operation (e.g.
        // source, destination, credit, etc).
        self.init_pkt(pkt);

        // If forceful termination is pending, there's no point in checking for anything else.
        // It's dead, Jim.
        if self.pending_rx.remove(PendingRx::Rst) {
            pkt.set_op(uapi::VSOCK_OP_RST);
            return Ok(());
        }

        // Next up: if we're due a connection confirmation, that's all we need to know to fill
        // in this packet.
        if self.pending_rx.remove(PendingRx::Response) {
            self.state = ConnState::Established;
            pkt.set_op(uapi::VSOCK_OP_RESPONSE);
            return Ok(());
        }

        // Same thing goes for locally-initiated connections that need to yield a connection
        // request.
        if self.pending_rx.remove(PendingRx::Request) {
            self.expiry =
                Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS));
            pkt.set_op(uapi::VSOCK_OP_REQUEST);
            return Ok(());
        }

        if self.pending_rx.remove(PendingRx::Rw) {
            // We're due to produce a data packet, by reading the data from the host-side
            // Unix socket.

            match self.state {
                // A data packet is only valid for established connections, and connections for
                // which our peer has initiated a graceful shutdown, but can still receive data.
                ConnState::Established | ConnState::PeerClosed(false, _) => (),
                _ => {
                    // Any other connection state is invalid at this point, and we need to kill
                    // it with fire.
                    pkt.set_op(uapi::VSOCK_OP_RST);
                    return Ok(());
                }
            }

            // Oh wait, before we start bringing in the big data, can our peer handle receiving
            // so much byte goodness?
            if self.need_credit_update_from_peer() {
                self.last_fwd_cnt_to_peer = self.fwd_cnt;
                pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST);
                return Ok(());
            }

            let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?;

            // The maximum amount of data we can read in is limited by both the RX buffer size
            // and the peer available buffer space.
            let max_len = std::cmp::min(buf.len(), self.peer_avail_credit());

            // Read data from the stream straight to the RX buffer, for maximum throughput.
            match self.stream.read(&mut buf[..max_len]) {
                Ok(read_cnt) => {
                    if read_cnt == 0 {
                        // A 0-length read means the host stream was closed down. In that case,
                        // we'll ask our peer to shut down the connection. We can neither send
                        // nor receive any more data.
                        self.state = ConnState::LocalClosed;
                        self.expiry = Some(
                            Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
                        );
                        pkt.set_op(uapi::VSOCK_OP_SHUTDOWN)
                            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV)
                            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
                    } else {
                        // On a successful data read, we fill in the packet with the RW op, and
                        // the length of the read data.
                        pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32);
                    }
                    self.rx_cnt += Wrapping(pkt.len());
                    self.last_fwd_cnt_to_peer = self.fwd_cnt;
                    return Ok(());
                }
                Err(err) if err.kind() == ErrorKind::WouldBlock => {
                    // This shouldn't actually happen (receiving EWOULDBLOCK after EPOLLIN), but
                    // apparently it does, so we need to handle it gracefully.
                    warn!(
                        "vsock: unexpected EWOULDBLOCK while reading from backing stream: \
                         lp={}, pp={}, err={:?}",
                        self.local_port, self.peer_port, err
                    );
                }
                Err(err) => {
                    // We are not expecting any other errors when reading from the underlying
                    // stream. If any show up, we'll immediately kill this connection.
                    error!(
                        "vsock: error reading from backing stream: lp={}, pp={}, err={:?}",
                        self.local_port, self.peer_port, err
                    );
                    pkt.set_op(uapi::VSOCK_OP_RST);
                    self.last_fwd_cnt_to_peer = self.fwd_cnt;
                    return Ok(());
                }
            };
        }

        // A credit update is basically a no-op, so we should only waste a perfectly fine RX
        // buffer on it if we really have nothing else to say, hence we check for this RX
        // indication last.
        if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() {
            pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE);
            self.last_fwd_cnt_to_peer = self.fwd_cnt;
            return Ok(());
        }

        // We've already checked for all conditions that would have produced a packet, so
        // if we got to here, we don't know how to yield one.
        Err(VsockError::NoData)
    }
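
    // Note: the order of the checks in `recv_pkt()` above effectively defines the RX yield
    // priority: a pending Rst trumps everything, followed by the connection handshake packets
    // (Response / Request), then actual data (Rw), and, only when there is nothing else to
    // say, a lone CreditUpdate.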

    /// Deliver a guest-generated packet to this connection.
    ///
    /// This forwards the data in RW packets to the host stream, and absorbs control packets,
    /// using them to manage the internal connection state.
    ///
    /// Returns: always `Ok(())`; the packet has been consumed.
    ///
    fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
        // Update the peer credit information.
        self.peer_buf_alloc = pkt.buf_alloc();
        self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());

        match self.state {
            // Most frequent case: this is an established connection that needs to forward some
            // data to the host stream. Also works for a connection that has begun shutting
            // down, but the peer still has some data to send.
            ConnState::Established | ConnState::PeerClosed(_, false)
                if pkt.op() == uapi::VSOCK_OP_RW =>
            {
                if pkt.buf().is_none() {
                    info!(
                        "vsock: dropping empty data packet from guest (lp={}, pp={})",
                        self.local_port, self.peer_port
                    );
                    return Ok(());
                }

                // Unwrapping here is safe, since we just checked `pkt.buf()` above.
                let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)];
                if let Err(err) = self.send_bytes(buf_slice) {
                    // If we can't write to the host stream, that's an unrecoverable error, so
                    // we'll terminate this connection.
                    warn!(
                        "vsock: error writing to local stream (lp={}, pp={}): {:?}",
                        self.local_port, self.peer_port, err
                    );
                    self.kill();
                    return Ok(());
                }

                // We might've just consumed some data. If that's the case, we might need to
                // update the peer on our buffer space situation, so that it can keep sending
                // data packets our way.
                if self.peer_needs_credit_update() {
                    self.pending_rx.insert(PendingRx::CreditUpdate);
                }
            }

            // Next up: receiving a response / confirmation for a host-initiated connection.
            // We'll move to an Established state, and pass on the good news through the host
            // stream.
            ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => {
                self.expiry = None;
                self.state = ConnState::Established;
            }

            // The peer wants to shut down an established connection. If they have nothing
            // more to send nor receive, and we don't have to wait to drain our TX buffer, we
            // can schedule an RST packet (to terminate the connection on the next recv call).
            // Otherwise, we'll arm the kill timer.
            ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => {
                let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0;
                let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0;
                self.state = ConnState::PeerClosed(recv_off, send_off);
                if recv_off && send_off {
                    if self.tx_buf.is_empty() {
                        self.pending_rx.insert(PendingRx::Rst);
                    } else {
                        self.expiry = Some(
                            Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
                        );
                    }
                }
            }

            // The peer wants to update a shutdown request, with more receive/send indications.
            // The same logic as above applies.
            ConnState::PeerClosed(ref mut recv_off, ref mut send_off)
                if pkt.op() == uapi::VSOCK_OP_SHUTDOWN =>
            {
                *recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0);
                *send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0);
                if *recv_off && *send_off && self.tx_buf.is_empty() {
                    self.pending_rx.insert(PendingRx::Rst);
                }
            }

            // A credit update from our peer is valid only in a state which allows data
            // transfer towards the peer.
            ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _)
                if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE =>
            {
                // Nothing to do here; we've already updated peer credit.
            }

            // A credit request from our peer is valid only in a state which allows data
            // transfer from the peer. We'll respond with a credit update packet.
            ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false)
                if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST =>
            {
                self.pending_rx.insert(PendingRx::CreditUpdate);
            }

            _ => {
                debug!(
                    "vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}",
                    self.state,
                    pkt.hdr()
                );
            }
        };

        Ok(())
    }

    /// Check if the connection has any pending packet addressed to the peer.
    ///
    fn has_pending_rx(&self) -> bool {
        !self.pending_rx.is_empty()
    }
}

impl<S> VsockEpollListener for VsockConnection<S>
where
    S: Read + Write + AsRawFd,
{
    /// Get the file descriptor that this connection wants polled.
    ///
    /// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the
    /// host stream.
    ///
    fn get_polled_fd(&self) -> RawFd {
        self.stream.as_raw_fd()
    }

    /// Get the event set that this connection is interested in.
    ///
    /// A connection will want to be notified when:
    /// - data is available to be read from the host stream, so that it can store an RW pending
    ///   RX indication; and
    /// - data can be written to the host stream, and the TX buffer needs to be flushed.
    ///
    fn get_polled_evset(&self) -> epoll::Events {
        let mut evset = epoll::Events::empty();
        if !self.tx_buf.is_empty() {
            // There's data waiting in the TX buffer, so we are interested in being notified
            // when writing to the host stream wouldn't block.
            evset.insert(epoll::Events::EPOLLOUT);
        }
        // We're generally interested in being notified when data can be read from the host
        // stream, unless we're in a state which doesn't allow moving data from host to guest.
        match self.state {
            ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (),
            _ if self.need_credit_update_from_peer() => (),
            _ => evset.insert(epoll::Events::EPOLLIN),
        }
        evset
    }

    /// Notify the connection about an event (or set of events) that it was interested in.
    ///
    fn notify(&mut self, evset: epoll::Events) {
        if evset.contains(epoll::Events::EPOLLIN) {
            // Data can be read from the host stream. Set an Rw pending indication, so that
            // the muxer will know to call `recv_pkt()` later.
            self.pending_rx.insert(PendingRx::Rw);
        }

        if evset.contains(epoll::Events::EPOLLOUT) {
            // Data can be written to the host stream. Time to flush out the TX buffer.
            //
            if self.tx_buf.is_empty() {
                info!("vsock: connection received unexpected EPOLLOUT event");
                return;
            }
            let flushed = self
                .tx_buf
                .flush_to(&mut self.stream)
                .unwrap_or_else(|err| {
                    warn!(
                        "vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
                        self.local_port, self.peer_port, err
                    );
                    match err {
                        Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
                            // This should never happen (EWOULDBLOCK after EPOLLOUT), but
                            // it does, so let's absorb it.
                        }
                        _ => self.kill(),
                    };
                    0
                });
            self.fwd_cnt += Wrapping(flushed as u32);

            // If this connection was shutting down, but is waiting to drain the TX buffer
            // before forceful termination, the wait might be over.
            if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() {
                self.pending_rx.insert(PendingRx::Rst);
            } else if self.peer_needs_credit_update() {
                // If we've freed up some more buffer space, we may need to let the peer know
                // it can safely send more data our way.
                self.pending_rx.insert(PendingRx::CreditUpdate);
            }
        }
    }
}

impl<S> VsockConnection<S>
where
    S: Read + Write + AsRawFd,
{
    /// Create a new guest-initiated connection object.
    ///
    pub fn new_peer_init(
        stream: S,
        local_cid: u64,
        peer_cid: u64,
        local_port: u32,
        peer_port: u32,
        peer_buf_alloc: u32,
    ) -> Self {
        Self {
            local_cid,
            peer_cid,
            local_port,
            peer_port,
            stream,
            state: ConnState::PeerInit,
            tx_buf: TxBuf::new(),
            fwd_cnt: Wrapping(0),
            peer_buf_alloc,
            peer_fwd_cnt: Wrapping(0),
            rx_cnt: Wrapping(0),
            last_fwd_cnt_to_peer: Wrapping(0),
            pending_rx: PendingRxSet::from(PendingRx::Response),
            expiry: None,
        }
    }

    /// Create a new host-initiated connection object.
    ///
    pub fn new_local_init(
        stream: S,
        local_cid: u64,
        peer_cid: u64,
        local_port: u32,
        peer_port: u32,
    ) -> Self {
        Self {
            local_cid,
            peer_cid,
            local_port,
            peer_port,
            stream,
            state: ConnState::LocalInit,
            tx_buf: TxBuf::new(),
            fwd_cnt: Wrapping(0),
            peer_buf_alloc: 0,
            peer_fwd_cnt: Wrapping(0),
            rx_cnt: Wrapping(0),
            last_fwd_cnt_to_peer: Wrapping(0),
            pending_rx: PendingRxSet::from(PendingRx::Request),
            expiry: None,
        }
    }

    /// Check if there is an expiry (kill) timer set for this connection, sometime in the
    /// future.
    ///
    pub fn will_expire(&self) -> bool {
        match self.expiry {
            None => false,
            Some(t) => t > Instant::now(),
        }
    }

    /// Check if this connection needs to be scheduled for forceful termination, due to its
    /// kill timer having expired.
    ///
    pub fn has_expired(&self) -> bool {
        match self.expiry {
            None => false,
            Some(t) => t <= Instant::now(),
        }
    }

    /// Get the kill timer value, if one is set.
    ///
    pub fn expiry(&self) -> Option<Instant> {
        self.expiry
    }

    /// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the
    /// connection is asked to yield a packet, via `recv_pkt()`).
    ///
    pub fn kill(&mut self) {
        self.state = ConnState::Killed;
        self.pending_rx.insert(PendingRx::Rst);
    }

    /// Return the connection's state.
    ///
    pub fn state(&self) -> ConnState {
        self.state
    }

    /// Send some raw, untracked, data straight to the underlying connected stream.
    /// Returns: number of bytes written, or the error describing the write failure.
    ///
    /// Warning: this will bypass the connection state machine and write directly to the
    /// underlying stream. No account of this write is kept, which includes bypassing
    /// vsock flow control.
    ///
    pub fn send_bytes_raw(&mut self, buf: &[u8]) -> Result<usize> {
        self.stream.write(buf).map_err(Error::StreamWrite)
    }

    /// Send some raw data (a byte-slice) to the host stream.
    ///
    /// Raw data can either be sent straight to the host stream, or to our TX buffer, if the
    /// former fails.
    ///
    fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
        // If there is data in the TX buffer, that means we're already registered for EPOLLOUT
        // events on the underlying stream. Therefore, there's no point in attempting a write
        // at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will
        // attempt to drain the TX buffer then.
        if !self.tx_buf.is_empty() {
            return self.tx_buf.push(buf);
        }

        // The TX buffer is empty, so we can try to write straight to the host stream.
        let written = match self.stream.write(buf) {
            Ok(cnt) => cnt,
            Err(e) => {
                // Absorb any would-block errors, since we can always try again later.
                if e.kind() == ErrorKind::WouldBlock {
                    0
                } else {
                    // We don't know how to handle any other write error, so we'll send it up
                    // the call chain.
                    return Err(Error::StreamWrite(e));
                }
            }
        };
        // Move the "forwarded bytes" counter ahead by how much we were able to send out.
        self.fwd_cnt += Wrapping(written as u32);

        // If we couldn't write the whole slice, we'll need to push the remaining data to our
        // buffer.
        if written < buf.len() {
            self.tx_buf.push(&buf[written..])?;
        }

        Ok(())
    }

    /// Check if the credit information the peer has last received from us is outdated.
    ///
    fn peer_needs_credit_update(&self) -> bool {
        let peer_seen_free_buf =
            Wrapping(defs::CONN_TX_BUF_SIZE) - (self.fwd_cnt - self.last_fwd_cnt_to_peer);
        peer_seen_free_buf < Wrapping(defs::CONN_CREDIT_UPDATE_THRESHOLD)
    }

    /// Check if we need to ask the peer for a credit update before sending any more data its
    /// way.
    ///
    fn need_credit_update_from_peer(&self) -> bool {
        self.peer_avail_credit() == 0
    }

    /// Get the maximum number of bytes that we can send to our peer, without overflowing its
    /// buffer.
    ///
    fn peer_avail_credit(&self) -> usize {
        (Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
    }
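
    // Note on the `Wrapping<u32>` arithmetic above: the vsock counters (`fwd_cnt`, `rx_cnt`,
    // `peer_fwd_cnt`) are free-running and expected to wrap around `u32::MAX`. Modular
    // arithmetic keeps the credit math correct across a rollover. As a (hypothetical) worked
    // example: if `rx_cnt` has wrapped around to 100 while `peer_fwd_cnt` is still at
    // 4_294_967_290 (i.e. `u32::MAX - 5`), then `rx_cnt - peer_fwd_cnt` yields 106 (the true
    // number of in-flight bytes) rather than an underflowed value.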

    /// Prepare a packet header for transmission to our peer.
    ///
    fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket {
        // Make sure the header is zeroed-out first.
        // This looks sub-optimal, but it is actually optimized-out in the compiled code to be
        // faster than a memset().
        for b in pkt.hdr_mut() {
            *b = 0;
        }

        pkt.set_src_cid(self.local_cid)
            .set_dst_cid(self.peer_cid)
            .set_src_port(self.local_port)
            .set_dst_port(self.peer_port)
            .set_type(uapi::VSOCK_TYPE_STREAM)
            .set_buf_alloc(defs::CONN_TX_BUF_SIZE)
            .set_fwd_cnt(self.fwd_cnt.0)
    }
}
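
// A rough sketch of how a caller (e.g. the vsock muxer in this crate) might drive one of
// these connection objects from its event loop. This is illustrative only; everything other
// than the `VsockConnection` / `VsockChannel` / `VsockEpollListener` methods used below
// (the epoll registration step, `events`, `pkt`, `guest_pkt`) is hypothetical:
//
//     let mut conn = VsockConnection::new_peer_init(
//         stream, local_cid, peer_cid, local_port, peer_port, peer_buf_alloc,
//     );
//     // Register `conn.get_polled_fd()` for `conn.get_polled_evset()` with epoll, then:
//     conn.notify(events); // when the host stream becomes readable / writable
//     while conn.has_pending_rx() {
//         conn.recv_pkt(&mut pkt)?; // fill in a guest-provided RX buffer
//     }
//     conn.send_pkt(&guest_pkt)?; // absorb a guest-generated TX packet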

#[cfg(test)]
mod tests {
    use std::io::{Error as IoError, Result as IoResult};

    use libc::EFD_NONBLOCK;
    use virtio_queue::QueueOwnedT;
    use vmm_sys_util::eventfd::EventFd;

    use super::super::super::tests::TestContext;
    use super::super::defs as csm_defs;
    use super::*;

    const LOCAL_CID: u64 = 2;
    const PEER_CID: u64 = 3;
    const LOCAL_PORT: u32 = 1002;
    const PEER_PORT: u32 = 1003;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    enum StreamState {
        Closed,
        Error(ErrorKind),
        Ready,
        WouldBlock,
    }

    struct TestStream {
        fd: EventFd,
        read_buf: Vec<u8>,
        read_state: StreamState,
        write_buf: Vec<u8>,
        write_state: StreamState,
    }
    impl TestStream {
        fn new() -> Self {
            Self {
                fd: EventFd::new(EFD_NONBLOCK).unwrap(),
                read_state: StreamState::Ready,
                write_state: StreamState::Ready,
                read_buf: Vec::new(),
                write_buf: Vec::new(),
            }
        }
        fn new_with_read_buf(buf: &[u8]) -> Self {
            let mut stream = Self::new();
            stream.read_buf = buf.to_vec();
            stream
        }
    }

    impl AsRawFd for TestStream {
        fn as_raw_fd(&self) -> RawFd {
            self.fd.as_raw_fd()
        }
    }

    impl Read for TestStream {
        fn read(&mut self, data: &mut [u8]) -> IoResult<usize> {
            match self.read_state {
                StreamState::Closed => Ok(0),
                StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
                StreamState::Ready => {
                    if self.read_buf.is_empty() {
                        return Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN"));
                    }
                    let len = std::cmp::min(data.len(), self.read_buf.len());
                    assert_ne!(len, 0);
                    data[..len].copy_from_slice(&self.read_buf[..len]);
                    self.read_buf = self.read_buf.split_off(len);
                    Ok(len)
                }
                StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
            }
        }
    }

    impl Write for TestStream {
        fn write(&mut self, data: &[u8]) -> IoResult<usize> {
            match self.write_state {
                StreamState::Closed => Err(IoError::new(ErrorKind::BrokenPipe, "EPIPE")),
                StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
                StreamState::Ready => {
                    self.write_buf.extend_from_slice(data);
                    Ok(data.len())
                }
                StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
            }
        }
        fn flush(&mut self) -> IoResult<()> {
            Ok(())
        }
    }

    impl<S> VsockConnection<S>
    where
        S: Read + Write + AsRawFd,
    {
        /// Get the fwd_cnt value from the connection.
        pub(crate) fn fwd_cnt(&self) -> Wrapping<u32> {
            self.fwd_cnt
        }

        /// Forcefully insert a credit update flag.
        pub(crate) fn insert_credit_update(&mut self) {
            self.pending_rx.insert(PendingRx::CreditUpdate);
        }
    }

    fn init_pkt(pkt: &mut VsockPacket, op: u16, len: u32) -> &mut VsockPacket {
        for b in pkt.hdr_mut() {
            *b = 0;
        }
        pkt.set_src_cid(PEER_CID)
            .set_dst_cid(LOCAL_CID)
            .set_src_port(PEER_PORT)
            .set_dst_port(LOCAL_PORT)
            .set_type(uapi::VSOCK_TYPE_STREAM)
            .set_buf_alloc(PEER_BUF_ALLOC)
            .set_op(op)
            .set_len(len)
    }

    // This is the connection state machine test context: a helper struct to provide CSM
    // testing primitives. A single `VsockPacket` object will be enough for our testing needs.
    // We'll be using it for simulating both packet sends and packet receives. We need to keep
    // the vsock testing context alive, since `VsockPacket` is just a pointer-wrapper over some
    // data that resides in guest memory. The vsock test context owns the `GuestMemory` object,
    // so we'll make it a member here, in order to make sure that guest memory outlives our
    // testing packet. A single `VsockConnection` object will also suffice for our testing
    // needs. We'll be using a specially crafted `Read + Write + AsRawFd` object as a backing
    // stream, so that we can control the various error conditions that might arise.
    struct CsmTestContext {
        _vsock_test_ctx: TestContext,
        pkt: VsockPacket,
        conn: VsockConnection<TestStream>,
    }

    impl CsmTestContext {
        fn new_established() -> Self {
            Self::new(ConnState::Established)
        }

        fn new(conn_state: ConnState) -> Self {
            let vsock_test_ctx = TestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let stream = TestStream::new();
            let mut pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter(&vsock_test_ctx.mem)
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let conn = match conn_state {
                ConnState::PeerInit => VsockConnection::<TestStream>::new_peer_init(
                    stream,
                    LOCAL_CID,
                    PEER_CID,
                    LOCAL_PORT,
                    PEER_PORT,
                    PEER_BUF_ALLOC,
                ),
                ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
                    stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
                ),
                ConnState::Established => {
                    let mut conn = VsockConnection::<TestStream>::new_peer_init(
                        stream,
                        LOCAL_CID,
                        PEER_CID,
                        LOCAL_PORT,
                        PEER_PORT,
                        PEER_BUF_ALLOC,
                    );
                    assert!(conn.has_pending_rx());
                    conn.recv_pkt(&mut pkt).unwrap();
                    assert_eq!(pkt.op(), uapi::VSOCK_OP_RESPONSE);
                    conn
                }
                other => panic!("invalid ctx state: {other:?}"),
            };
            assert_eq!(conn.state, conn_state);
            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                conn,
            }
        }

        fn set_stream(&mut self, stream: TestStream) {
            self.conn.stream = stream;
        }

        fn set_peer_credit(&mut self, credit: u32) {
            assert!(credit < self.conn.peer_buf_alloc);
            self.conn.peer_fwd_cnt = Wrapping(0);
            self.conn.rx_cnt = Wrapping(self.conn.peer_buf_alloc - credit);
            assert_eq!(self.conn.peer_avail_credit(), credit as usize);
        }

        fn send(&mut self) {
            self.conn.send_pkt(&self.pkt).unwrap();
        }

        fn recv(&mut self) {
            self.conn.recv_pkt(&mut self.pkt).unwrap();
        }

        fn notify_epollin(&mut self) {
            self.conn.notify(epoll::Events::EPOLLIN);
            assert!(self.conn.has_pending_rx());
        }

        fn notify_epollout(&mut self) {
            self.conn.notify(epoll::Events::EPOLLOUT);
        }

        fn init_pkt(&mut self, op: u16, len: u32) -> &mut VsockPacket {
            init_pkt(&mut self.pkt, op, len)
        }

        fn init_data_pkt(&mut self, data: &[u8]) -> &VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(uapi::VSOCK_OP_RW, data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &self.pkt
        }
    }

    #[test]
    fn test_peer_request() {
        let mut ctx = CsmTestContext::new(ConnState::PeerInit);
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        // For peer-initiated requests, our connection should always yield a vsock response
        // packet, in order to establish the connection.
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_cid(), LOCAL_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        assert_eq!(ctx.pkt.type_(), uapi::VSOCK_TYPE_STREAM);
        assert_eq!(ctx.pkt.len(), 0);
        // After yielding the response packet, the connection should have transitioned to the
        // established state.
        assert_eq!(ctx.conn.state, ConnState::Established);
    }

    #[test]
    fn test_local_request() {
        let mut ctx = CsmTestContext::new(ConnState::LocalInit);
        // Host-initiated connections should first yield a connection request packet.
        assert!(ctx.conn.has_pending_rx());
        // Before yielding the connection request packet, the timeout kill timer shouldn't be
        // armed.
        assert!(!ctx.conn.will_expire());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
        // Since the request might time out, the kill timer should now be armed.
        assert!(ctx.conn.will_expire());
        assert!(!ctx.conn.has_expired());
        ctx.init_pkt(uapi::VSOCK_OP_RESPONSE, 0);
        ctx.send();
        // Upon receiving a connection response, the connection should have transitioned to the
        // established state, and the kill timer should've been disarmed.
        assert_eq!(ctx.conn.state, ConnState::Established);
        assert!(!ctx.conn.will_expire());
    }

    #[test]
    fn test_local_request_timeout() {
        let mut ctx = CsmTestContext::new(ConnState::LocalInit);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
        assert!(ctx.conn.will_expire());
        assert!(!ctx.conn.has_expired());
        std::thread::sleep(std::time::Duration::from_millis(
            defs::CONN_REQUEST_TIMEOUT_MS,
        ));
        assert!(ctx.conn.has_expired());
    }

    #[test]
    fn test_rx_data() {
        let mut ctx = CsmTestContext::new_established();
        let data = &[1, 2, 3, 4];
        ctx.set_stream(TestStream::new_with_read_buf(data));
        assert_eq!(ctx.conn.get_polled_fd(), ctx.conn.stream.as_raw_fd());
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.len() as usize, data.len());
        assert_eq!(ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], *data);

        // There's no more data in the stream, so `recv_pkt` should yield `VsockError::NoData`.
        match ctx.conn.recv_pkt(&mut ctx.pkt) {
            Err(VsockError::NoData) => (),
            other => panic!("{other:?}"),
        }

        // A recv attempt in an invalid state should yield an instant reset packet.
        ctx.conn.state = ConnState::LocalClosed;
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }

    #[test]
    fn test_local_close() {
        let mut ctx = CsmTestContext::new_established();
        let mut stream = TestStream::new();
        stream.read_state = StreamState::Closed;
        ctx.set_stream(stream);
        ctx.notify_epollin();
        ctx.recv();
        // When the host-side stream is closed, we can neither send nor receive any more data.
        // Therefore, the vsock shutdown packet that we'll deliver to the guest must contain
        // both the no-more-send and the no-more-recv indications.
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);

        // The kill timer should now be armed.
        assert!(ctx.conn.will_expire());
        assert!(
            ctx.conn.expiry().unwrap()
                < Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS)
        );
    }

    #[test]
    fn test_peer_close() {
        // Test that send/recv shutdown indications are handled correctly.
        // I.e. once set, an indication cannot be reset.
        {
            let mut ctx = CsmTestContext::new_established();

            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
            ctx.send();
            assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, false));

            // Attempting to reset the no-more-recv indication should not work
            // (we are only setting the no-more-send indication here).
            ctx.pkt.set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
            ctx.send();
            assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, true));
        }

        // Test case:
        // - reading data from a no-more-send connection should work; and
        // - writing data should have no effect.
        {
            let data = &[1, 2, 3, 4];
            let mut ctx = CsmTestContext::new_established();
            ctx.set_stream(TestStream::new_with_read_buf(data));
            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
            ctx.send();
            ctx.notify_epollin();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
            assert_eq!(&ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], data);

            ctx.init_data_pkt(data);
            ctx.send();
            assert_eq!(ctx.conn.stream.write_buf.len(), 0);
            assert!(ctx.conn.tx_buf.is_empty());
        }

        // Test case:
        // - writing data to a no-more-recv connection should work; and
        // - attempting to read data from it should yield an RST packet.
        {
            let mut ctx = CsmTestContext::new_established();
            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
            ctx.send();
            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();
            assert_eq!(ctx.conn.stream.write_buf, data.to_vec());

            ctx.notify_epollin();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        }

        // Test case: setting both no-more-send and no-more-recv indications should have the
        // connection confirm termination (i.e. yield an RST).
        {
            let mut ctx = CsmTestContext::new_established();
            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV | uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
            ctx.send();
            assert!(ctx.conn.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        }
    }

    #[test]
    fn test_local_read_error() {
        let mut ctx = CsmTestContext::new_established();
        let mut stream = TestStream::new();
        stream.read_state = StreamState::Error(ErrorKind::PermissionDenied);
        ctx.set_stream(stream);
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }

    #[test]
    fn test_credit_request_to_peer() {
        let mut ctx = CsmTestContext::new_established();
        ctx.set_peer_credit(0);
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_REQUEST);
    }

    #[test]
    fn test_credit_request_from_peer() {
        let mut ctx = CsmTestContext::new_established();
        ctx.init_pkt(uapi::VSOCK_OP_CREDIT_REQUEST, 0);
        ctx.send();
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
        assert_eq!(ctx.pkt.buf_alloc(), csm_defs::CONN_TX_BUF_SIZE);
        assert_eq!(ctx.pkt.fwd_cnt(), ctx.conn.fwd_cnt.0);
    }

    #[test]
    fn test_credit_update_to_peer() {
        let mut ctx = CsmTestContext::new_established();

        // Force a stale state, where the peer hasn't been updated on our credit situation.
        ctx.conn.last_fwd_cnt_to_peer = Wrapping(0);

        // Since a credit update token is sent when the fwd_cnt value exceeds
        // CONN_TX_BUF_SIZE - CONN_CREDIT_UPDATE_THRESHOLD, we initialize
        // fwd_cnt at 6 bytes below the threshold.
        let initial_fwd_cnt =
            csm_defs::CONN_TX_BUF_SIZE - csm_defs::CONN_CREDIT_UPDATE_THRESHOLD - 6;
        ctx.conn.fwd_cnt = Wrapping(initial_fwd_cnt);

        // Use a 4-byte packet for triggering the credit update threshold.
        let data = &[1, 2, 3, 4];

        // Check that there is no pending RX.
        ctx.init_data_pkt(data);
        ctx.send();
        assert!(!ctx.conn.has_pending_rx());

        // Send a packet again.
        ctx.init_data_pkt(data);
        ctx.send();

        // The CSM should now have a credit update available for the peer.
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
        assert_eq!(ctx.pkt.fwd_cnt(), initial_fwd_cnt + data.len() as u32 * 2);
        assert_eq!(ctx.conn.fwd_cnt, ctx.conn.last_fwd_cnt_to_peer);
    }

    #[test]
    fn test_tx_buffering() {
        // Test case:
        // - when writing to the backing stream would block, TX data should end up in the TX
        //   buf; and
        // - when the CSM is notified that it can write to the backing stream, it should flush
        //   the TX buf.
        {
            let mut ctx = CsmTestContext::new_established();

            let mut stream = TestStream::new();
            stream.write_state = StreamState::WouldBlock;
            ctx.set_stream(stream);

            // Send some data through the connection. The backing stream is set to reject
            // writes, so the data should end up in the TX buffer.
            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();

            // When there's data in the TX buffer, the connection should ask to be notified
            // when it can write to its backing stream.
            assert!(ctx
                .conn
                .get_polled_evset()
                .contains(epoll::Events::EPOLLOUT));
            assert_eq!(ctx.conn.tx_buf.len(), data.len());

            // Unlock the write stream and notify the connection it can now write its buffered
            // data.
            ctx.set_stream(TestStream::new());
            ctx.conn.notify(epoll::Events::EPOLLOUT);
            assert!(ctx.conn.tx_buf.is_empty());
            assert_eq!(ctx.conn.stream.write_buf, data);
        }
    }

    #[test]
    fn test_stream_write_error() {
        // Test case: sending a data packet to a broken / closed backing stream should kill it.
        {
            let mut ctx = CsmTestContext::new_established();
            let mut stream = TestStream::new();
            stream.write_state = StreamState::Closed;
            ctx.set_stream(stream);

            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();

            assert_eq!(ctx.conn.state, ConnState::Killed);
            assert!(ctx.conn.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        }

        // Test case: notifying a connection that it can flush its TX buffer to a broken
        // stream should kill the connection.
        {
            let mut ctx = CsmTestContext::new_established();

            let mut stream = TestStream::new();
            stream.write_state = StreamState::WouldBlock;
            ctx.set_stream(stream);

            // Send some data through the connection. The backing stream is set to reject
            // writes, so the data should end up in the TX buffer.
            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();

            // Set the backing stream to error out on write.
            let mut stream = TestStream::new();
            stream.write_state = StreamState::Closed;
            ctx.set_stream(stream);

            assert!(ctx
                .conn
                .get_polled_evset()
                .contains(epoll::Events::EPOLLOUT));
            ctx.notify_epollout();
            assert_eq!(ctx.conn.state, ConnState::Killed);
        }
    }

    #[test]
    fn test_peer_credit_misbehavior() {
        let mut ctx = CsmTestContext::new_established();

        let mut stream = TestStream::new();
        stream.write_state = StreamState::WouldBlock;
        ctx.set_stream(stream);

        // Fill up the TX buffer.
        let data = vec![0u8; ctx.pkt.buf().unwrap().len()];
        ctx.init_data_pkt(data.as_slice());
        for _i in 0..(csm_defs::CONN_TX_BUF_SIZE / data.len() as u32) {
            ctx.send();
        }

        // Then try to send more data.
        ctx.send();

        // The connection should've committed suicide.
        assert_eq!(ctx.conn.state, ConnState::Killed);
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }
}