1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 /// The main job of `VsockConnection` is to forward data traffic, back and forth, between a 5 /// guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while 6 /// also managing its internal state. 7 /// To that end, `VsockConnection` implements: 8 /// - `VsockChannel` for: 9 /// - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`; and 10 /// - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and 11 /// - updating its internal state, by absorbing control packets (anything other than 12 /// VSOCK_OP_RW). 13 /// - `VsockEpollListener` for getting notified about the availability of data or free buffer 14 /// space at the host stream. 15 /// 16 /// Note: there is a certain asymmetry to the RX and TX data flows: 17 /// - RX transfers do not need any data buffering, since data is read straight from the 18 /// host stream and into the guest-provided RX buffer; 19 /// - TX transfers may require some data to be buffered by `VsockConnection`, if the host 20 /// peer can't keep up with reading the data that we're writing. This is because, once 21 /// the guest driver provides some data in a virtio TX buffer, the vsock device must 22 /// consume it. If that data can't be forwarded straight to the host stream, we'll 23 /// have to store it in a buffer (and flush it at a later time). Vsock flow control 24 /// ensures that our TX buffer doesn't overflow. 25 /// 26 // The code in this file is best read with a fresh memory of the vsock protocol inner-workings. 27 // To help with that, here is a 28 // 29 // Short primer on the vsock protocol 30 // ---------------------------------- 31 // 32 // 1. 
Establishing a connection 33 // A vsock connection is considered established after a two-way handshake: 34 // - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST); 35 // then 36 // - the listening peer sends back a connection response packet (`hdr.op` == 37 // VSOCK_OP_RESPONSE). 38 // 39 // 2. Terminating a connection 40 // When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN 41 // packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's 42 // intention: 43 // - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and 44 // - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection. 45 // After a shutdown packet, the receiving peer will have some protocol-undefined time to 46 // flush its buffers, and then forcefully terminate the connection by sending back an RST 47 // packet. If the shutdown-initiating peer doesn't receive this RST packet within a timeout 48 // period, it will send one itself, thus terminating the connection. 49 // Note: a peer can send more than one VSOCK_OP_SHUTDOWN packet. However, read/write 50 // indications cannot be undone. E.g. once a "no-more-sending" promise was made, it 51 // cannot be taken back. That is, `hdr.flags` will be ORed between subsequent 52 // VSOCK_OP_SHUTDOWN packets. 53 // 54 // 3. Flow control 55 // Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver 56 // has enough free buffer space to store that data. If this condition is not respected, the 57 // receiving peer's behavior is undefined. In this implementation, we forcefully terminate 58 // the connection by sending back a VSOCK_OP_RST packet. 59 // Note: all buffer space information is computed and stored on a per-connection basis.
60 // Peers keep each other informed about the free buffer space they have by filling in two 61 // packet header members with each packet they send: 62 // - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and 63 // - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its 64 // buffer. 65 // One can figure out how much space its peer has available in its buffer by inspecting the 66 // difference between how much it has sent to the peer and how much the peer has flushed out 67 // (i.e. "forwarded", in the vsock spec terminology): 68 // `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`. 69 // Note: the above requires that peers constantly keep each other informed on their buffer 70 // space situation. However, since there are no receipt acknowledgement packets 71 // defined for the vsock protocol, packet flow can often be unidirectional (just one 72 // peer sending data to another), so the sender's information about the receiver's 73 // buffer space can quickly get outdated. The vsock protocol defines two solutions to 74 // this problem: 75 // 1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its 76 // peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a 77 // VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit 78 // information must be included in any packet); 79 // 2. The receiver can be proactive, and send a VSOCK_OP_CREDIT_UPDATE packet whenever 80 // it thinks its peer's information is out of date. 81 // Our implementation uses the proactive approach to keep the peer informed about our own buffer space, but it will also send a VSOCK_OP_CREDIT_REQUEST when the peer's known credit is exhausted.
82 // 83 use std::io::{ErrorKind, Read, Write}; 84 use std::num::Wrapping; 85 use std::os::unix::io::{AsRawFd, RawFd}; 86 use std::time::{Duration, Instant}; 87 88 use super::super::defs::uapi; 89 use super::super::packet::VsockPacket; 90 use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError}; 91 use super::defs; 92 use super::txbuf::TxBuf; 93 use super::{ConnState, Error, PendingRx, PendingRxSet, Result}; 94 95 /// A self-managing connection object, that handles communication between a guest-side AF_VSOCK 96 /// socket and a host-side `Read + Write + AsRawFd` stream. 97 /// 98 pub struct VsockConnection<S: Read + Write + AsRawFd> { 99 /// The current connection state. 100 state: ConnState, 101 /// The local CID. Most of the time this will be the constant `2` (the vsock host CID). 102 local_cid: u64, 103 /// The peer (guest) CID. 104 peer_cid: u64, 105 /// The local (host) port. 106 local_port: u32, 107 /// The peer (guest) port. 108 peer_port: u32, 109 /// The (connected) host-side stream. 110 stream: S, 111 /// The TX buffer for this connection. 112 tx_buf: TxBuf, 113 /// Total number of bytes that have been successfully written to `self.stream`, either 114 /// directly, or flushed from `self.tx_buf`. 115 fwd_cnt: Wrapping<u32>, 116 /// The amount of buffer space that the peer (guest) has allocated for this connection. 117 peer_buf_alloc: u32, 118 /// The total number of bytes that the peer has forwarded away. 119 peer_fwd_cnt: Wrapping<u32>, 120 /// The total number of bytes sent to the peer (guest vsock driver) 121 rx_cnt: Wrapping<u32>, 122 /// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit 123 /// updates, and let the peer know it's OK to send more data. 124 last_fwd_cnt_to_peer: Wrapping<u32>, 125 /// The set of pending RX packet indications that `recv_pkt()` will use to fill in a 126 /// packet for the peer (guest). 
127 pending_rx: PendingRxSet, 128 /// Instant when this connection should be scheduled for immediate termination, due to some 129 /// timeout condition having been fulfilled. 130 expiry: Option<Instant>, 131 } 132 133 impl<S> VsockChannel for VsockConnection<S> 134 where 135 S: Read + Write + AsRawFd, 136 { 137 /// Fill in a vsock packet, to be delivered to our peer (the guest driver). 138 /// 139 /// As per the `VsockChannel` trait, this should only be called when there is data to be 140 /// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error 141 /// out with `VsockError::NoData`. 142 /// Pending RX indications are set by other mutable actions performed on the channel. For 143 /// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN 144 /// packet, or `notify()` could set a Rw indication (a data packet can be fetched from the 145 /// channel), if data was ready to be read from the host stream. 146 /// 147 /// Returns: 148 /// - `Ok(())`: the packet has been successfully filled in and is ready for delivery; 149 /// - `Err(VsockError::NoData)`: there was no data available with which to fill in the 150 /// packet; 151 /// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but 152 /// it is missing the data buffer. 153 /// 154 fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> { 155 // Perform some generic initialization that is the same for any packet operation (e.g. 156 // source, destination, credit, etc). 157 self.init_pkt(pkt); 158 159 // If forceful termination is pending, there's no point in checking for anything else. 160 // It's dead, Jim. 161 if self.pending_rx.remove(PendingRx::Rst) { 162 pkt.set_op(uapi::VSOCK_OP_RST); 163 return Ok(()); 164 } 165 166 // Next up: if we're due a connection confirmation, that's all we need to know to fill 167 // in this packet. 
168 if self.pending_rx.remove(PendingRx::Response) { 169 self.state = ConnState::Established; 170 pkt.set_op(uapi::VSOCK_OP_RESPONSE); 171 return Ok(()); 172 } 173 174 // Same thing goes for locally-initiated connections that need to yield a connection 175 // request. 176 if self.pending_rx.remove(PendingRx::Request) { 177 self.expiry = 178 Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS)); 179 pkt.set_op(uapi::VSOCK_OP_REQUEST); 180 return Ok(()); 181 } 182 183 if self.pending_rx.remove(PendingRx::Rw) { 184 // We're due to produce a data packet, by reading the data from the host-side 185 // Unix socket. 186 187 match self.state { 188 // A data packet is only valid for established connections, and connections for 189 // which our peer has initiated a graceful shutdown, but can still receive data. 190 ConnState::Established | ConnState::PeerClosed(false, _) => (), 191 _ => { 192 // Any other connection state is invalid at this point, and we need to kill it 193 // with fire. 194 pkt.set_op(uapi::VSOCK_OP_RST); 195 return Ok(()); 196 } 197 } 198 199 // Oh wait, before we start bringing in the big data, can our peer handle receiving so 200 // much bytes goodness? 201 if self.need_credit_update_from_peer() { 202 self.last_fwd_cnt_to_peer = self.fwd_cnt; 203 pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST); 204 return Ok(()); 205 } 206 207 let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?; 208 209 // The maximum amount of data we can read in is limited by both the RX buffer size and 210 // the peer available buffer space. 211 let max_len = std::cmp::min(buf.len(), self.peer_avail_credit()); 212 213 // Read data from the stream straight to the RX buffer, for maximum throughput. 214 match self.stream.read(&mut buf[..max_len]) { 215 Ok(read_cnt) => { 216 if read_cnt == 0 { 217 // A 0-length read means the host stream was closed down. In that case, 218 // we'll ask our peer to shut down the connection. 
We can neither send nor 219 // receive any more data. 220 self.state = ConnState::LocalClosed; 221 self.expiry = Some( 222 Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS), 223 ); 224 pkt.set_op(uapi::VSOCK_OP_SHUTDOWN) 225 .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV) 226 .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND); 227 } else { 228 // On a successful data read, we fill in the packet with the RW op, and 229 // length of the read data. 230 pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32); 231 } 232 self.rx_cnt += Wrapping(pkt.len()); 233 self.last_fwd_cnt_to_peer = self.fwd_cnt; 234 return Ok(()); 235 } 236 Err(err) if err.kind() == ErrorKind::WouldBlock => { 237 // This shouldn't actually happen (receiving EWOULDBLOCK after EPOLLIN), but 238 // apparently it does, so we need to handle it gracefully. 239 warn!( 240 "vsock: unexpected EWOULDBLOCK while reading from backing stream: \ 241 lp={}, pp={}, err={:?}", 242 self.local_port, self.peer_port, err 243 ); 244 } 245 Err(err) => { 246 // We are not expecting any other errors when reading from the underlying 247 // stream. If any show up, we'll immediately kill this connection. 248 error!( 249 "vsock: error reading from backing stream: lp={}, pp={}, err={:?}", 250 self.local_port, self.peer_port, err 251 ); 252 pkt.set_op(uapi::VSOCK_OP_RST); 253 self.last_fwd_cnt_to_peer = self.fwd_cnt; 254 return Ok(()); 255 } 256 }; 257 } 258 259 // A credit update is basically a no-op, so we should only waste a perfectly fine RX 260 // buffer on it if we really have nothing else to say, hence we check for this RX 261 // indication last. 262 if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() { 263 pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE); 264 self.last_fwd_cnt_to_peer = self.fwd_cnt; 265 return Ok(()); 266 } 267 268 // We've already checked for all conditions that would have produced a packet, so 269 // if we got to here, we don't know how to yield one. 
270 Err(VsockError::NoData) 271 } 272 273 /// Deliver a guest-generated packet to this connection. 274 /// 275 /// This forwards the data in RW packets to the host stream, and absorbs control packets, 276 /// using them to manage the internal connection state. 277 /// 278 /// Returns: 279 /// always `Ok(())`: the packet has been consumed; 280 /// 281 fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> { 282 // Update the peer credit information. 283 self.peer_buf_alloc = pkt.buf_alloc(); 284 self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt()); 285 286 match self.state { 287 // Most frequent case: this is an established connection that needs to forward some 288 // data to the host stream. Also works for a connection that has begun shutting 289 // down, but the peer still has some data to send. 290 ConnState::Established | ConnState::PeerClosed(_, false) 291 if pkt.op() == uapi::VSOCK_OP_RW => 292 { 293 if pkt.buf().is_none() { 294 info!( 295 "vsock: dropping empty data packet from guest (lp={}, pp={}", 296 self.local_port, self.peer_port 297 ); 298 return Ok(()); 299 } 300 301 // Unwrapping here is safe, since we just checked `pkt.buf()` above. 302 let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)]; 303 if let Err(err) = self.send_bytes(buf_slice) { 304 // If we can't write to the host stream, that's an unrecoverable error, so 305 // we'll terminate this connection. 306 warn!( 307 "vsock: error writing to local stream (lp={}, pp={}): {:?}", 308 self.local_port, self.peer_port, err 309 ); 310 self.kill(); 311 return Ok(()); 312 } 313 314 // We might've just consumed some data. If that's the case, we might need to 315 // update the peer on our buffer space situation, so that it can keep sending 316 // data packets our way. 317 if self.peer_needs_credit_update() { 318 self.pending_rx.insert(PendingRx::CreditUpdate); 319 } 320 } 321 322 // Next up: receiving a response / confirmation for a host-initiated connection. 
323 // We'll move to an Established state, and pass on the good news through the host 324 // stream. 325 ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => { 326 self.expiry = None; 327 self.state = ConnState::Established; 328 } 329 330 // The peer wants to shut down an established connection. If they have nothing 331 // more to send nor receive, and we don't have to wait to drain our TX buffer, we 332 // can schedule an RST packet (to terminate the connection on the next recv call). 333 // Otherwise, we'll arm the kill timer. 334 ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => { 335 let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0; 336 let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0; 337 self.state = ConnState::PeerClosed(recv_off, send_off); 338 if recv_off && send_off { 339 if self.tx_buf.is_empty() { 340 self.pending_rx.insert(PendingRx::Rst); 341 } else { 342 self.expiry = Some( 343 Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS), 344 ); 345 } 346 } 347 } 348 349 // The peer wants to update a shutdown request, with more receive/send indications. 350 // The same logic as above applies. 351 ConnState::PeerClosed(ref mut recv_off, ref mut send_off) 352 if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => 353 { 354 *recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0); 355 *send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0); 356 if *recv_off && *send_off && self.tx_buf.is_empty() { 357 self.pending_rx.insert(PendingRx::Rst); 358 } 359 } 360 361 // A credit update from our peer is valid only in a state which allows data 362 // transfer towards the peer. 363 ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _) 364 if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE => 365 { 366 // Nothing to do here; we've already updated peer credit. 
367 } 368 369 // A credit request from our peer is valid only in a state which allows data 370 // transfer from the peer. We'll respond with a credit update packet. 371 ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false) 372 if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST => 373 { 374 self.pending_rx.insert(PendingRx::CreditUpdate); 375 } 376 377 _ => { 378 debug!( 379 "vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}", 380 self.state, 381 pkt.hdr() 382 ); 383 } 384 }; 385 386 Ok(()) 387 } 388 389 /// Check if the connection has any pending packet addressed to the peer. 390 /// 391 fn has_pending_rx(&self) -> bool { 392 !self.pending_rx.is_empty() 393 } 394 } 395 396 impl<S> VsockEpollListener for VsockConnection<S> 397 where 398 S: Read + Write + AsRawFd, 399 { 400 /// Get the file descriptor that this connection wants polled. 401 /// 402 /// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the 403 /// host stream. 404 /// 405 fn get_polled_fd(&self) -> RawFd { 406 self.stream.as_raw_fd() 407 } 408 409 /// Get the event set that this connection is interested in. 410 /// 411 /// A connection will want to be notified when: 412 /// - data is available to be read from the host stream, so that it can store an RW pending 413 /// RX indication; and 414 /// - data can be written to the host stream, and the TX buffer needs to be flushed. 415 /// 416 fn get_polled_evset(&self) -> epoll::Events { 417 let mut evset = epoll::Events::empty(); 418 if !self.tx_buf.is_empty() { 419 // There's data waiting in the TX buffer, so we are interested in being notified 420 // when writing to the host stream wouldn't block. 421 evset.insert(epoll::Events::EPOLLOUT); 422 } 423 // We're generally interested in being notified when data can be read from the host 424 // stream, unless we're in a state which doesn't allow moving data from host to guest. 
425 match self.state { 426 ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (), 427 _ if self.need_credit_update_from_peer() => (), 428 _ => evset.insert(epoll::Events::EPOLLIN), 429 } 430 evset 431 } 432 433 /// Notify the connection about an event (or set of events) that it was interested in. 434 /// 435 fn notify(&mut self, evset: epoll::Events) { 436 if evset.contains(epoll::Events::EPOLLIN) { 437 // Data can be read from the host stream. Setting a Rw pending indication, so that 438 // the muxer will know to call `recv_pkt()` later. 439 self.pending_rx.insert(PendingRx::Rw); 440 } 441 442 if evset.contains(epoll::Events::EPOLLOUT) { 443 // Data can be written to the host stream. Time to flush out the TX buffer. 444 // 445 if self.tx_buf.is_empty() { 446 info!("vsock: connection received unexpected EPOLLOUT event"); 447 return; 448 } 449 let flushed = self 450 .tx_buf 451 .flush_to(&mut self.stream) 452 .unwrap_or_else(|err| { 453 warn!( 454 "vsock: error flushing TX buf for (lp={}, pp={}): {:?}", 455 self.local_port, self.peer_port, err 456 ); 457 match err { 458 Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => { 459 // This should never happen (EWOULDBLOCK after EPOLLOUT), but 460 // it does, so let's absorb it. 461 } 462 _ => self.kill(), 463 }; 464 0 465 }); 466 self.fwd_cnt += Wrapping(flushed as u32); 467 468 // If this connection was shutting down, but is waiting to drain the TX buffer 469 // before forceful termination, the wait might be over. 470 if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() { 471 self.pending_rx.insert(PendingRx::Rst); 472 } else if self.peer_needs_credit_update() { 473 // If we've freed up some more buffer space, we may need to let the peer know it 474 // can safely send more data our way. 
475 self.pending_rx.insert(PendingRx::CreditUpdate); 476 } 477 } 478 } 479 } 480 481 impl<S> VsockConnection<S> 482 where 483 S: Read + Write + AsRawFd, 484 { 485 /// Create a new guest-initiated connection object. 486 /// 487 pub fn new_peer_init( 488 stream: S, 489 local_cid: u64, 490 peer_cid: u64, 491 local_port: u32, 492 peer_port: u32, 493 peer_buf_alloc: u32, 494 ) -> Self { 495 Self { 496 local_cid, 497 peer_cid, 498 local_port, 499 peer_port, 500 stream, 501 state: ConnState::PeerInit, 502 tx_buf: TxBuf::new(), 503 fwd_cnt: Wrapping(0), 504 peer_buf_alloc, 505 peer_fwd_cnt: Wrapping(0), 506 rx_cnt: Wrapping(0), 507 last_fwd_cnt_to_peer: Wrapping(0), 508 pending_rx: PendingRxSet::from(PendingRx::Response), 509 expiry: None, 510 } 511 } 512 513 /// Create a new host-initiated connection object. 514 /// 515 pub fn new_local_init( 516 stream: S, 517 local_cid: u64, 518 peer_cid: u64, 519 local_port: u32, 520 peer_port: u32, 521 ) -> Self { 522 Self { 523 local_cid, 524 peer_cid, 525 local_port, 526 peer_port, 527 stream, 528 state: ConnState::LocalInit, 529 tx_buf: TxBuf::new(), 530 fwd_cnt: Wrapping(0), 531 peer_buf_alloc: 0, 532 peer_fwd_cnt: Wrapping(0), 533 rx_cnt: Wrapping(0), 534 last_fwd_cnt_to_peer: Wrapping(0), 535 pending_rx: PendingRxSet::from(PendingRx::Request), 536 expiry: None, 537 } 538 } 539 540 /// Check if there is an expiry (kill) timer set for this connection, sometime in the 541 /// future. 542 /// 543 pub fn will_expire(&self) -> bool { 544 match self.expiry { 545 None => false, 546 Some(t) => t > Instant::now(), 547 } 548 } 549 550 /// Check if this connection needs to be scheduled for forceful termination, due to its 551 /// kill timer having expired. 552 /// 553 pub fn has_expired(&self) -> bool { 554 match self.expiry { 555 None => false, 556 Some(t) => t <= Instant::now(), 557 } 558 } 559 560 /// Get the kill timer value, if one is set. 
561 /// 562 pub fn expiry(&self) -> Option<Instant> { 563 self.expiry 564 } 565 566 /// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the 567 /// connection is asked to yield a packet, via `recv_pkt()`). 568 /// 569 pub fn kill(&mut self) { 570 self.state = ConnState::Killed; 571 self.pending_rx.insert(PendingRx::Rst); 572 } 573 574 /// Return the connections state. 575 /// 576 pub fn state(&self) -> ConnState { 577 self.state 578 } 579 580 /// Send some raw, untracked, data straight to the underlying connected stream. 581 /// Returns: number of bytes written, or the error describing the write failure. 582 /// 583 /// Warning: this will bypass the connection state machine and write directly to the 584 /// underlying stream. No account of this write is kept, which includes bypassing 585 /// vsock flow control. 586 /// 587 pub fn send_bytes_raw(&mut self, buf: &[u8]) -> Result<usize> { 588 self.stream.write(buf).map_err(Error::StreamWrite) 589 } 590 591 /// Send some raw data (a byte-slice) to the host stream. 592 /// 593 /// Raw data can either be sent straight to the host stream, or to our TX buffer, if the 594 /// former fails. 595 /// 596 fn send_bytes(&mut self, buf: &[u8]) -> Result<()> { 597 // If there is data in the TX buffer, that means we're already registered for EPOLLOUT 598 // events on the underlying stream. Therefore, there's no point in attempting a write 599 // at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will 600 // attempt to drain the TX buffer then. 601 if !self.tx_buf.is_empty() { 602 return self.tx_buf.push(buf); 603 } 604 605 // The TX buffer is empty, so we can try to write straight to the host stream. 606 let written = match self.stream.write(buf) { 607 Ok(cnt) => cnt, 608 Err(e) => { 609 // Absorb any would-block errors, since we can always try again later. 
610 if e.kind() == ErrorKind::WouldBlock { 611 0 612 } else { 613 // We don't know how to handle any other write error, so we'll send it up 614 // the call chain. 615 return Err(Error::StreamWrite(e)); 616 } 617 } 618 }; 619 // Move the "forwarded bytes" counter ahead by how much we were able to send out. 620 self.fwd_cnt += Wrapping(written as u32); 621 622 // If we couldn't write the whole slice, we'll need to push the remaining data to our 623 // buffer. 624 if written < buf.len() { 625 self.tx_buf.push(&buf[written..])?; 626 } 627 628 Ok(()) 629 } 630 631 /// Check if the credit information the peer has last received from us is outdated. 632 /// 633 fn peer_needs_credit_update(&self) -> bool { 634 let peer_seen_free_buf = 635 Wrapping(defs::CONN_TX_BUF_SIZE) - (self.fwd_cnt - self.last_fwd_cnt_to_peer); 636 peer_seen_free_buf < Wrapping(defs::CONN_CREDIT_UPDATE_THRESHOLD) 637 } 638 639 /// Check if we need to ask the peer for a credit update before sending any more data its 640 /// way. 641 /// 642 fn need_credit_update_from_peer(&self) -> bool { 643 self.peer_avail_credit() == 0 644 } 645 646 /// Get the maximum number of bytes that we can send to our peer, without overflowing its 647 /// buffer. 648 /// 649 fn peer_avail_credit(&self) -> usize { 650 (Wrapping(self.peer_buf_alloc as u32) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize 651 } 652 653 /// Prepare a packet header for transmission to our peer. 654 /// 655 fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket { 656 // Make sure the header is zeroed-out first. 657 // This looks sub-optimal, but it is actually optimized-out in the compiled code to be 658 // faster than a memset(). 
659 for b in pkt.hdr_mut() { 660 *b = 0; 661 } 662 663 pkt.set_src_cid(self.local_cid) 664 .set_dst_cid(self.peer_cid) 665 .set_src_port(self.local_port) 666 .set_dst_port(self.peer_port) 667 .set_type(uapi::VSOCK_TYPE_STREAM) 668 .set_buf_alloc(defs::CONN_TX_BUF_SIZE) 669 .set_fwd_cnt(self.fwd_cnt.0) 670 } 671 } 672 673 #[cfg(test)] 674 mod tests { 675 use libc::EFD_NONBLOCK; 676 677 use std::io::{Error as IoError, ErrorKind, Read, Result as IoResult, Write}; 678 use std::os::unix::io::RawFd; 679 use std::time::{Duration, Instant}; 680 use vmm_sys_util::eventfd::EventFd; 681 682 use super::super::super::defs::uapi; 683 use super::super::super::tests::TestContext; 684 use super::super::defs as csm_defs; 685 use super::*; 686 687 const LOCAL_CID: u64 = 2; 688 const PEER_CID: u64 = 3; 689 const LOCAL_PORT: u32 = 1002; 690 const PEER_PORT: u32 = 1003; 691 const PEER_BUF_ALLOC: u32 = 64 * 1024; 692 693 enum StreamState { 694 Closed, 695 Error(ErrorKind), 696 Ready, 697 WouldBlock, 698 } 699 700 struct TestStream { 701 fd: EventFd, 702 read_buf: Vec<u8>, 703 read_state: StreamState, 704 write_buf: Vec<u8>, 705 write_state: StreamState, 706 } 707 impl TestStream { 708 fn new() -> Self { 709 Self { 710 fd: EventFd::new(EFD_NONBLOCK).unwrap(), 711 read_state: StreamState::Ready, 712 write_state: StreamState::Ready, 713 read_buf: Vec::new(), 714 write_buf: Vec::new(), 715 } 716 } 717 fn new_with_read_buf(buf: &[u8]) -> Self { 718 let mut stream = Self::new(); 719 stream.read_buf = buf.to_vec(); 720 stream 721 } 722 } 723 724 impl AsRawFd for TestStream { 725 fn as_raw_fd(&self) -> RawFd { 726 self.fd.as_raw_fd() 727 } 728 } 729 730 impl Read for TestStream { 731 fn read(&mut self, data: &mut [u8]) -> IoResult<usize> { 732 match self.read_state { 733 StreamState::Closed => Ok(0), 734 StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")), 735 StreamState::Ready => { 736 if self.read_buf.is_empty() { 737 return Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")); 738 } 
739 let len = std::cmp::min(data.len(), self.read_buf.len()); 740 assert_ne!(len, 0); 741 data[..len].copy_from_slice(&self.read_buf[..len]); 742 self.read_buf = self.read_buf.split_off(len); 743 Ok(len) 744 } 745 StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")), 746 } 747 } 748 } 749 750 impl Write for TestStream { 751 fn write(&mut self, data: &[u8]) -> IoResult<usize> { 752 match self.write_state { 753 StreamState::Closed => Err(IoError::new(ErrorKind::BrokenPipe, "EPIPE")), 754 StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")), 755 StreamState::Ready => { 756 self.write_buf.extend_from_slice(data); 757 Ok(data.len()) 758 } 759 StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")), 760 } 761 } 762 fn flush(&mut self) -> IoResult<()> { 763 Ok(()) 764 } 765 } 766 767 impl<S> VsockConnection<S> 768 where 769 S: Read + Write + AsRawFd, 770 { 771 /// Get the fwd_cnt value from the connection. 772 pub(crate) fn fwd_cnt(&self) -> Wrapping<u32> { 773 self.fwd_cnt 774 } 775 776 /// Forcefully insert a credit update flag. 777 pub(crate) fn insert_credit_update(&mut self) { 778 self.pending_rx.insert(PendingRx::CreditUpdate); 779 } 780 } 781 782 fn init_pkt(pkt: &mut VsockPacket, op: u16, len: u32) -> &mut VsockPacket { 783 for b in pkt.hdr_mut() { 784 *b = 0; 785 } 786 pkt.set_src_cid(PEER_CID) 787 .set_dst_cid(LOCAL_CID) 788 .set_src_port(PEER_PORT) 789 .set_dst_port(LOCAL_PORT) 790 .set_type(uapi::VSOCK_TYPE_STREAM) 791 .set_buf_alloc(PEER_BUF_ALLOC) 792 .set_op(op) 793 .set_len(len) 794 } 795 796 // This is the connection state machine test context: a helper struct to provide CSM testing 797 // primitives. A single `VsockPacket` object will be enough for our testing needs. We'll be 798 // using it for simulating both packet sends and packet receives. We need to keep the vsock 799 // testing context alive, since `VsockPacket` is just a pointer-wrapper over some data that 800 // resides in guest memory. 
// The vsock test context owns the `GuestMemory` object, so we'll make it a member here, in
// order to make sure that guest memory outlives our testing packet. A single `VsockConnection`
// object will also suffice for our testing needs. We'll be using a specially crafted
// `Read + Write + AsRawFd` object as a backing stream, so that we can control the various
// error conditions that might arise.
struct CsmTestContext {
    // Kept only so the guest memory backing `pkt` stays alive for the context's lifetime.
    _vsock_test_ctx: TestContext,
    // The one packet reused for every simulated send (peer -> conn) and receive (conn -> peer).
    pkt: VsockPacket,
    // The connection under test, backed by a controllable `TestStream`.
    conn: VsockConnection<TestStream>,
}

impl CsmTestContext {
    /// Shorthand for `CsmTestContext::new(ConnState::Established)`.
    fn new_established() -> Self {
        Self::new(ConnState::Established)
    }

    /// Builds a test context whose connection is in `conn_state`.
    ///
    /// Only `PeerInit`, `LocalInit` and `Established` are supported; any other state panics.
    /// For `Established`, the connection is created as peer-initiated and its pending
    /// VSOCK_OP_RESPONSE is drained into `pkt`, which completes the handshake.
    fn new(conn_state: ConnState) -> Self {
        let vsock_test_ctx = TestContext::new();
        let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
        let stream = TestStream::new();
        // Borrow the head descriptor chain of `queues[0]` as our single test packet
        // (presumably the RX queue, given `from_rx_virtq_head` - TODO confirm).
        let mut pkt = VsockPacket::from_rx_virtq_head(
            &mut handler_ctx.handler.queues[0]
                .iter()
                .unwrap()
                .next()
                .unwrap(),
            None,
        )
        .unwrap();
        let conn = match conn_state {
            ConnState::PeerInit => VsockConnection::<TestStream>::new_peer_init(
                stream,
                LOCAL_CID,
                PEER_CID,
                LOCAL_PORT,
                PEER_PORT,
                PEER_BUF_ALLOC,
            ),
            ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
                stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
            ),
            ConnState::Established => {
                let mut conn = VsockConnection::<TestStream>::new_peer_init(
                    stream,
                    LOCAL_CID,
                    PEER_CID,
                    LOCAL_PORT,
                    PEER_PORT,
                    PEER_BUF_ALLOC,
                );
                // Yield the connection response to get from PeerInit to Established.
                assert!(conn.has_pending_rx());
                conn.recv_pkt(&mut pkt).unwrap();
                assert_eq!(pkt.op(), uapi::VSOCK_OP_RESPONSE);
                conn
            }
            other => panic!("invalid ctx state: {:?}", other),
        };
        assert_eq!(conn.state, conn_state);
        Self {
            _vsock_test_ctx: vsock_test_ctx,
            pkt,
            conn,
        }
    }

    /// Replaces the connection's backing stream (e.g. to inject read/write error states).
    fn set_stream(&mut self, stream: TestStream) {
        self.conn.stream = stream;
    }

    /// Forces the connection's view of the peer's free buffer space to exactly `credit` bytes.
    ///
    /// It resets `peer_fwd_cnt` to 0 and sets `rx_cnt` to `peer_buf_alloc - credit`, then
    /// checks the result via `peer_avail_credit()`. `credit` must be strictly less than
    /// `peer_buf_alloc`.
    fn set_peer_credit(&mut self, credit: u32) {
        assert!(credit < self.conn.peer_buf_alloc);
        self.conn.peer_fwd_cnt = Wrapping(0);
        self.conn.rx_cnt = Wrapping(self.conn.peer_buf_alloc - credit);
        assert_eq!(self.conn.peer_avail_credit(), credit as usize);
    }

    /// Delivers `pkt` to the connection via `send_pkt`; panics if that returns an error.
    fn send(&mut self) {
        self.conn.send_pkt(&self.pkt).unwrap();
    }

    /// Has the connection fill `pkt` via `recv_pkt`; panics if that returns an error.
    fn recv(&mut self) {
        self.conn.recv_pkt(&mut self.pkt).unwrap();
    }

    /// Notifies the connection of EPOLLIN and asserts it then has pending RX work.
    fn notify_epollin(&mut self) {
        self.conn.notify(epoll::Events::EPOLLIN);
        assert!(self.conn.has_pending_rx());
    }

    /// Notifies the connection of EPOLLOUT (no assertion on the outcome).
    fn notify_epollout(&mut self) {
        self.conn.notify(epoll::Events::EPOLLOUT);
    }

    /// Re-initializes `pkt` as a peer -> local packet with the given `op` and `len`.
    fn init_pkt(&mut self, op: u16, len: u32) -> &mut VsockPacket {
        init_pkt(&mut self.pkt, op, len)
    }

    /// Turns `pkt` into a VSOCK_OP_RW packet whose payload is `data`.
    ///
    /// Panics if `data` does not fit in the packet buffer.
    fn init_data_pkt(&mut self, data: &[u8]) -> &VsockPacket {
        assert!(data.len() <= self.pkt.buf().unwrap().len());
        self.init_pkt(uapi::VSOCK_OP_RW, data.len() as u32);
        self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
        &self.pkt
    }
}

// A peer-initiated connection must answer with a correctly addressed VSOCK_OP_RESPONSE and
// then become Established.
#[test]
fn test_peer_request() {
    let mut ctx = CsmTestContext::new(ConnState::PeerInit);
    assert!(ctx.conn.has_pending_rx());
    ctx.recv();
    // For peer-initiated requests, our connection should always yield a vsock response packet,
    // in order to establish the connection.
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
    assert_eq!(ctx.pkt.src_cid(), LOCAL_CID);
    assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
    assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
    assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
    assert_eq!(ctx.pkt.type_(), uapi::VSOCK_TYPE_STREAM);
    assert_eq!(ctx.pkt.len(), 0);
    // After yielding the response packet, the connection should have transitioned to the
    // established state.
    assert_eq!(ctx.conn.state, ConnState::Established);
}

// Host-initiated handshake: REQUEST out (arming the kill timer), RESPONSE in (disarming it).
#[test]
fn test_local_request() {
    let mut ctx = CsmTestContext::new(ConnState::LocalInit);
    // Host-initiated connections should first yield a connection request packet.
    assert!(ctx.conn.has_pending_rx());
    // Before yielding the connection request packet, the timeout kill timer shouldn't be
    // armed.
    assert!(!ctx.conn.will_expire());
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
    // Since the request might time-out, the kill timer should now be armed.
    assert!(ctx.conn.will_expire());
    assert!(!ctx.conn.has_expired());
    ctx.init_pkt(uapi::VSOCK_OP_RESPONSE, 0);
    ctx.send();
    // Upon receiving a connection response, the connection should have transitioned to the
    // established state, and the kill timer should've been disarmed.
    assert_eq!(ctx.conn.state, ConnState::Established);
    assert!(!ctx.conn.will_expire());
}

// An unanswered host-initiated request must expire after CONN_REQUEST_TIMEOUT_MS.
// Note: this test really sleeps for the whole request timeout.
#[test]
fn test_local_request_timeout() {
    let mut ctx = CsmTestContext::new(ConnState::LocalInit);
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
    assert!(ctx.conn.will_expire());
    assert!(!ctx.conn.has_expired());
    std::thread::sleep(std::time::Duration::from_millis(
        defs::CONN_REQUEST_TIMEOUT_MS,
    ));
    assert!(ctx.conn.has_expired());
}

// Host stream data must be forwarded into an RW packet. An empty stream must yield NoData,
// and an RX attempt in the LocalClosed state must yield an RST.
#[test]
fn test_rx_data() {
    let mut ctx = CsmTestContext::new_established();
    let data = &[1, 2, 3, 4];
    ctx.set_stream(TestStream::new_with_read_buf(data));
    assert_eq!(ctx.conn.get_polled_fd(), ctx.conn.stream.as_raw_fd());
    ctx.notify_epollin();
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
    assert_eq!(ctx.pkt.len() as usize, data.len());
    assert_eq!(ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], *data);

    // There's no more data in the stream, so `recv_pkt` should yield `VsockError::NoData`.
    match ctx.conn.recv_pkt(&mut ctx.pkt) {
        Err(VsockError::NoData) => (),
        other => panic!("{:?}", other),
    }

    // A recv attempt in an invalid state should yield an instant reset packet.
    ctx.conn.state = ConnState::LocalClosed;
    ctx.notify_epollin();
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
}

// A closed host-side stream must produce a full (send + recv) SHUTDOWN and arm the kill timer.
#[test]
fn test_local_close() {
    let mut ctx = CsmTestContext::new_established();
    let mut stream = TestStream::new();
    stream.read_state = StreamState::Closed;
    ctx.set_stream(stream);
    ctx.notify_epollin();
    ctx.recv();
    // When the host-side stream is closed, we can neither send nor receive any more data.
    // Therefore, the vsock shutdown packet that we'll deliver to the guest must contain both
    // the no-more-send and the no-more-recv indications.
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
    assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
    assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);

    // The kill timer should now be armed.
    assert!(ctx.conn.will_expire());
    assert!(
        ctx.conn.expiry().unwrap()
            < Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS)
    );
}

// Peer SHUTDOWN handling: flag accumulation, both half-closed data flows, and full close.
#[test]
fn test_peer_close() {
    // Test that send/recv shutdown indications are handled correctly.
    // I.e. once set, an indication cannot be reset.
    {
        let mut ctx = CsmTestContext::new_established();

        ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
            .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();
        assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, false));

        // A second SHUTDOWN that carries only the no-more-send flag must not clear the
        // no-more-recv indication set by the first one: the flags accumulate.
        ctx.pkt.set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
        ctx.send();
        assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, true));
    }

    // Test case:
    // - reading data from a no-more-send connection should work; and
    // - writing data should have no effect.
    {
        let data = &[1, 2, 3, 4];
        let mut ctx = CsmTestContext::new_established();
        ctx.set_stream(TestStream::new_with_read_buf(data));
        ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
            .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
        ctx.send();
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(&ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], data);

        // The data packet must be dropped: nothing reaches the stream or the TX buffer.
        ctx.init_data_pkt(data);
        ctx.send();
        assert_eq!(ctx.conn.stream.write_buf.len(), 0);
        assert!(ctx.conn.tx_buf.is_empty());
    }

    // Test case:
    // - writing data to a no-more-recv connection should work; and
    // - attempting to read data from it should yield an RST packet.
    {
        let mut ctx = CsmTestContext::new_established();
        ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
            .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();
        let data = &[1, 2, 3, 4];
        ctx.init_data_pkt(data);
        ctx.send();
        assert_eq!(ctx.conn.stream.write_buf, data.to_vec());

        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }

    // Test case: setting both no-more-send and no-more-recv indications should have the
    // connection confirm termination (i.e. yield an RST).
    {
        let mut ctx = CsmTestContext::new_established();
        ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
            .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV | uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
        ctx.send();
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }
}

// A read error on the host stream (here PermissionDenied) must result in an RST.
#[test]
fn test_local_read_error() {
    let mut ctx = CsmTestContext::new_established();
    let mut stream = TestStream::new();
    stream.read_state = StreamState::Error(ErrorKind::PermissionDenied);
    ctx.set_stream(stream);
    ctx.notify_epollin();
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
}

// With zero peer credit, an EPOLLIN notification must make the connection ask the peer for a
// credit update (VSOCK_OP_CREDIT_REQUEST).
#[test]
fn test_credit_request_to_peer() {
    let mut ctx = CsmTestContext::new_established();
    ctx.set_peer_credit(0);
    ctx.notify_epollin();
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_REQUEST);
}

// A peer CREDIT_REQUEST must be answered with a CREDIT_UPDATE that reports CONN_TX_BUF_SIZE
// as buf_alloc and the connection's current fwd_cnt.
#[test]
fn test_credit_request_from_peer() {
    let mut ctx = CsmTestContext::new_established();
    ctx.init_pkt(uapi::VSOCK_OP_CREDIT_REQUEST, 0);
    ctx.send();
    assert!(ctx.conn.has_pending_rx());
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
    assert_eq!(ctx.pkt.buf_alloc(), csm_defs::CONN_TX_BUF_SIZE);
    assert_eq!(ctx.pkt.fwd_cnt(), ctx.conn.fwd_cnt.0);
}

// Crossing the credit update threshold must queue an unsolicited CREDIT_UPDATE for the peer.
#[test]
fn test_credit_update_to_peer() {
    let mut ctx = CsmTestContext::new_established();

    // Force a stale state, where the peer hasn't been updated on our credit situation.
    ctx.conn.last_fwd_cnt_to_peer = Wrapping(0);

    // Since a credit update token is sent when the fwd_cnt value exceeds
    // CONN_TX_BUF_SIZE - CONN_CREDIT_UPDATE_THRESHOLD, we initialize
    // fwd_cnt at 6 bytes below the threshold. The first 4-byte packet below then stays
    // 2 bytes short of it, and the second one crosses it.
    let initial_fwd_cnt =
        csm_defs::CONN_TX_BUF_SIZE as u32 - csm_defs::CONN_CREDIT_UPDATE_THRESHOLD as u32 - 6;
    ctx.conn.fwd_cnt = Wrapping(initial_fwd_cnt);

    // Use a 4-byte packet for triggering the credit update threshold.
    let data = &[1, 2, 3, 4];

    // Check that there is no pending RX.
    ctx.init_data_pkt(data);
    ctx.send();
    assert!(!ctx.conn.has_pending_rx());

    // Send a packet again.
    ctx.init_data_pkt(data);
    ctx.send();

    // The CSM should now have a credit update available for the peer.
    assert!(ctx.conn.has_pending_rx());
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
    assert_eq!(ctx.pkt.fwd_cnt(), initial_fwd_cnt + data.len() as u32 * 2);
    assert_eq!(ctx.conn.fwd_cnt, ctx.conn.last_fwd_cnt_to_peer);
}

// TX data that can't be written right away must be buffered and flushed on EPOLLOUT.
#[test]
fn test_tx_buffering() {
    // Test case:
    // - when writing to the backing stream would block, TX data should end up in the TX buf
    // - when the CSM is notified that it can write to the backing stream, it should flush
    //   the TX buf.
    {
        let mut ctx = CsmTestContext::new_established();

        let mut stream = TestStream::new();
        stream.write_state = StreamState::WouldBlock;
        ctx.set_stream(stream);

        // Send some data through the connection. The backing stream is set to reject writes,
        // so the data should end up in the TX buffer.
        let data = &[1, 2, 3, 4];
        ctx.init_data_pkt(data);
        ctx.send();

        // When there's data in the TX buffer, the connection should ask to be notified when it
        // can write to its backing stream.
        assert!(ctx
            .conn
            .get_polled_evset()
            .contains(epoll::Events::EPOLLOUT));
        assert_eq!(ctx.conn.tx_buf.len(), data.len());

        // Unlock the write stream and notify the connection it can now write its buffered
        // data.
        ctx.set_stream(TestStream::new());
        ctx.conn.notify(epoll::Events::EPOLLOUT);
        assert!(ctx.conn.tx_buf.is_empty());
        assert_eq!(ctx.conn.stream.write_buf, data);
    }
}

// Write failures on the backing stream, whether direct or while flushing the TX buffer,
// must kill the connection.
#[test]
fn test_stream_write_error() {
    // Test case: sending a data packet to a broken / closed backing stream should kill it.
    {
        let mut ctx = CsmTestContext::new_established();
        let mut stream = TestStream::new();
        stream.write_state = StreamState::Closed;
        ctx.set_stream(stream);

        let data = &[1, 2, 3, 4];
        ctx.init_data_pkt(data);
        ctx.send();

        assert_eq!(ctx.conn.state, ConnState::Killed);
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }

    // Test case: notifying a connection that it can flush its TX buffer to a broken stream
    // should kill the connection.
    {
        let mut ctx = CsmTestContext::new_established();

        let mut stream = TestStream::new();
        stream.write_state = StreamState::WouldBlock;
        ctx.set_stream(stream);

        // Send some data through the connection. The backing stream is set to reject writes,
        // so the data should end up in the TX buffer.
        let data = &[1, 2, 3, 4];
        ctx.init_data_pkt(data);
        ctx.send();

        // Set the backing stream to error out on write.
        let mut stream = TestStream::new();
        stream.write_state = StreamState::Closed;
        ctx.set_stream(stream);

        assert!(ctx
            .conn
            .get_polled_evset()
            .contains(epoll::Events::EPOLLOUT));
        ctx.notify_epollout();
        assert_eq!(ctx.conn.state, ConnState::Killed);
    }
}

// A peer that sends more data than its credit allows (overflowing our TX buffer) must get the
// connection killed with an RST.
#[test]
fn test_peer_credit_misbehavior() {
    let mut ctx = CsmTestContext::new_established();

    // Block stream writes so that every data packet accumulates in the TX buffer.
    let mut stream = TestStream::new();
    stream.write_state = StreamState::WouldBlock;
    ctx.set_stream(stream);

    // Fill up the TX buffer with as many full-size packets as fit (integer division, so any
    // remainder left free is smaller than one packet).
    let data = vec![0u8; ctx.pkt.buf().unwrap().len()];
    ctx.init_data_pkt(data.as_slice());
    for _i in 0..(csm_defs::CONN_TX_BUF_SIZE / data.len() as u32) {
        ctx.send();
    }

    // Then try to send more data.
    ctx.send();

    // The connection should've committed suicide.
    assert_eq!(ctx.conn.state, ConnState::Killed);
    assert!(ctx.conn.has_pending_rx());
    ctx.recv();
    assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
}
}