xref: /cloud-hypervisor/virtio-devices/src/vsock/csm/connection.rs (revision eeae63b4595fbf0cc69f62b6e9d9a79c543c4ac7)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 //! The main job of `VsockConnection` is to forward data traffic, back and forth, between a
5 //! guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while
6 //! also managing its internal state.
7 //! To that end, `VsockConnection` implements:
8 //! - `VsockChannel` for:
9 //!   - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`; and
10 //!   - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and
11 //!   - updating its internal state, by absorbing control packets (anything other than
12 //!     VSOCK_OP_RW).
13 //! - `VsockEpollListener` for getting notified about the availability of data or free buffer
14 //!   space at the host stream.
15 //!
16 //! Note: there is a certain asymmetry to the RX and TX data flows:
17 //! - RX transfers do not need any data buffering, since data is read straight from the
18 //!   host stream and into the guest-provided RX buffer;
19 //! - TX transfers may require some data to be buffered by `VsockConnection`, if the host
20 //!   peer can't keep up with reading the data that we're writing. This is because, once
21 //!   the guest driver provides some data in a virtio TX buffer, the vsock device must
22 //!   consume it.  If that data can't be forwarded straight to the host stream, we'll
23 //!   have to store it in a buffer (and flush it at a later time). Vsock flow control
24 //!   ensures that our TX buffer doesn't overflow.
25 //
26 // The code in this file is best read with a fresh memory of the vsock protocol inner-workings.
27 // To help with that, here is a
28 //
29 // Short primer on the vsock protocol
30 // ----------------------------------
31 //
32 // 1. Establishing a connection
33 //    A vsock connection is considered established after a two-way handshake:
34 //    - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST);
35 //      then
36 //    - the listening peer sends back a connection response packet (`hdr.op` ==
37 //      VSOCK_OP_RESPONSE).
38 //
39 // 2. Terminating a connection
40 //    When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN
41 //    packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's
42 //    intention:
43 //    - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and
44 //    - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection.
45 //    After a shutdown packet, the receiving peer will have some protocol-undefined time to
46 //    flush its buffers, and then forcefully terminate the connection by sending back an RST
47 //    packet. If the shutdown-initiating peer doesn't receive this RST packet during a timeout
48 //    period, it will send one itself, thus terminating the connection.
49 //    Note: a peer can send more than one VSOCK_OP_SHUTDOWN packets. However, read/write
50 //          indications cannot be undone. E.g. once a "no-more-sending" promise was made, it
51 //          cannot be taken back.  That is, `hdr.flags` will be ORed between subsequent
52 //          VSOCK_OP_SHUTDOWN packets.
53 //
54 // 3. Flow control
55 //    Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver
56 //    has enough free buffer space to store that data. If this condition is not respected, the
57 //    receiving peer's behavior is undefined. In this implementation, we forcefully terminate
58 //    the connection by sending back a VSOCK_OP_RST packet.
59 //    Note: all buffer space information is computed and stored on a per-connection basis.
60 //    Peers keep each other informed about the free buffer space they have by filling in two
61 //    packet header members with each packet they send:
62 //    - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and
63 //    - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its
64 //       buffer.
65 //    One can figure out how much space its peer has available in its buffer by inspecting the
66 //    difference between how much it has sent to the peer and how much the peer has flushed out
67 //    (i.e.  "forwarded", in the vsock spec terminology):
68 //    `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`.
69 //    Note: the above requires that peers constantly keep each other informed on their buffer
70 //          space situation. However, since there are no receipt acknowledgement packets
71 //          defined for the vsock protocol, packet flow can often be unidirectional (just one
72 //          peer sending data to another), so the sender's information about the receiver's
73 //          buffer space can get quickly outdated. The vsock protocol defines two solutions to
74 //          this problem:
75 //          1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its
76 //             peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a
77 //             VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit
78 //             information must be included in any packet);
79 //          2. The receiver can be proactive, and send VSOCK_OP_CREDIT_UPDATE packet, whenever
80 //             it thinks its peer's information is out of date.
81 //          Our implementation uses the proactive approach.
82 //
83 use std::io::{ErrorKind, Read, Write};
84 use std::num::Wrapping;
85 use std::os::unix::io::{AsRawFd, RawFd};
86 use std::time::{Duration, Instant};
87 
88 use super::super::defs::uapi;
89 use super::super::packet::VsockPacket;
90 use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError};
91 use super::txbuf::TxBuf;
92 use super::{defs, ConnState, Error, PendingRx, PendingRxSet, Result};
93 
/// A self-managing connection object, that handles communication between a guest-side AF_VSOCK
/// socket and a host-side `Read + Write + AsRawFd` stream.
///
/// Besides the stream itself, the object keeps the connection state, the addressing of both
/// endpoints (CIDs and ports), the credit (flow control) counters for both directions, the set
/// of pending RX indications, and an optional kill timer.
///
pub struct VsockConnection<S: Read + Write + AsRawFd> {
    /// The current connection state.
    state: ConnState,
    /// The local CID. Most of the time this will be the constant `2` (the vsock host CID).
    local_cid: u64,
    /// The peer (guest) CID.
    peer_cid: u64,
    /// The local (host) port.
    local_port: u32,
    /// The peer (guest) port.
    peer_port: u32,
    /// The (connected) host-side stream.
    stream: S,
    /// The TX buffer for this connection. It holds guest data that could not be written to
    /// `self.stream` right away, until `notify()` flushes it on EPOLLOUT.
    tx_buf: TxBuf,
    /// Total number of bytes that have been successfully written to `self.stream`, either
    /// directly, or flushed from `self.tx_buf`. This is a wrapping counter, and it is
    /// advertised to the peer in the header of every packet we produce (see `init_pkt()`).
    fwd_cnt: Wrapping<u32>,
    /// The amount of buffer space that the peer (guest) has allocated for this connection.
    /// Refreshed from the header of every packet handed to `send_pkt()`.
    peer_buf_alloc: u32,
    /// The total number of bytes that the peer has forwarded away.
    /// Refreshed from the header of every packet handed to `send_pkt()`.
    peer_fwd_cnt: Wrapping<u32>,
    /// The total number of bytes sent to the peer (guest vsock driver). `recv_pkt()` advances
    /// it by the length of each packet it fills in from the host stream.
    rx_cnt: Wrapping<u32>,
    /// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit
    /// updates, and let the peer know it's OK to send more data.
    last_fwd_cnt_to_peer: Wrapping<u32>,
    /// The set of pending RX packet indications that `recv_pkt()` will use to fill in a
    /// packet for the peer (guest).
    pending_rx: PendingRxSet,
    /// Instant when this connection should be scheduled for immediate termination, due to some
    /// timeout condition having been fulfilled. `None` means that no kill timer is armed.
    expiry: Option<Instant>,
}
131 
132 impl<S> VsockChannel for VsockConnection<S>
133 where
134     S: Read + Write + AsRawFd,
135 {
    /// Fill in a vsock packet, to be delivered to our peer (the guest driver).
    ///
    /// As per the `VsockChannel` trait, this should only be called when there is data to be
    /// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error
    /// out with `VsockError::NoData`.
    /// Pending RX indications are set by other mutable actions performed on the channel. For
    /// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN
    /// packet, or `notify()` could set a Rw indication (a data packet can be fetched from the
    /// channel), if data was ready to be read from the host stream.
    ///
    /// Indications are checked in a fixed priority order: Rst, Response, Request, Rw and,
    /// last, CreditUpdate. At most one packet is produced per call.
    ///
    /// Returns:
    /// - `Ok(())`: the packet has been successfully filled in and is ready for delivery;
    /// - `Err(VsockError::NoData)`: there was no data available with which to fill in the
    ///    packet;
    /// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but
    ///    it is missing the data buffer.
    ///
    fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
        // Perform some generic initialization that is the same for any packet operation (e.g.
        // source, destination, credit, etc).
        self.init_pkt(pkt);

        // If forceful termination is pending, there's no point in checking for anything else.
        // It's dead, Jim.
        if self.pending_rx.remove(PendingRx::Rst) {
            pkt.set_op(uapi::VSOCK_OP_RST);
            return Ok(());
        }

        // Next up: if we're due a connection confirmation, that's all we need to know to fill
        // in this packet. Yielding the response is also what moves us to `Established`.
        if self.pending_rx.remove(PendingRx::Response) {
            self.state = ConnState::Established;
            pkt.set_op(uapi::VSOCK_OP_RESPONSE);
            return Ok(());
        }

        // Same thing goes for locally-initiated connections that need to yield a connection
        // request. The kill timer is armed here; `send_pkt()` disarms it when the peer's
        // VSOCK_OP_RESPONSE arrives.
        if self.pending_rx.remove(PendingRx::Request) {
            self.expiry =
                Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS));
            pkt.set_op(uapi::VSOCK_OP_REQUEST);
            return Ok(());
        }

        if self.pending_rx.remove(PendingRx::Rw) {
            // We're due to produce a data packet, by reading the data from the host-side
            // Unix socket.

            match self.state {
                // A data packet is only valid for established connections, and connections for
                // which our peer has initiated a graceful shutdown, but can still receive data.
                ConnState::Established | ConnState::PeerClosed(false, _) => (),
                _ => {
                    // Any other connection state is invalid at this point, and we need to kill it
                    // with fire.
                    pkt.set_op(uapi::VSOCK_OP_RST);
                    return Ok(());
                }
            }

            // Before reading any data in, check whether our peer has any buffer space left to
            // receive it. If it has none, ask for a credit update instead of sending data.
            if self.need_credit_update_from_peer() {
                self.last_fwd_cnt_to_peer = self.fwd_cnt;
                pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST);
                return Ok(());
            }

            let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?;

            // The maximum amount of data we can read in is limited by both the RX buffer size and
            // the peer available buffer space.
            let max_len = std::cmp::min(buf.len(), self.peer_avail_credit());

            // Read data from the stream straight to the RX buffer, for maximum throughput.
            match self.stream.read(&mut buf[..max_len]) {
                Ok(read_cnt) => {
                    if read_cnt == 0 {
                        // A 0-length read means the host stream was closed down. In that case,
                        // we'll ask our peer to shut down the connection. We can neither send nor
                        // receive any more data.
                        self.state = ConnState::LocalClosed;
                        self.expiry = Some(
                            Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
                        );
                        pkt.set_op(uapi::VSOCK_OP_SHUTDOWN)
                            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV)
                            .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
                    } else {
                        // On a successful data read, we fill in the packet with the RW op, and
                        // length of the read data.
                        pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32);
                    }
                    // Account for the bytes handed to the peer. On the shutdown path no length
                    // was set after `init_pkt()` zeroed the header.
                    self.rx_cnt += Wrapping(pkt.len());
                    self.last_fwd_cnt_to_peer = self.fwd_cnt;
                    return Ok(());
                }
                Err(err) if err.kind() == ErrorKind::WouldBlock => {
                    // This shouldn't actually happen (receiving EWOULDBLOCK after EPOLLIN), but
                    // apparently it does, so we need to handle it gracefully.
                    // No packet is produced on this path; execution falls through to the
                    // credit update check below.
                    warn!(
                        "vsock: unexpected EWOULDBLOCK while reading from backing stream: \
                         lp={}, pp={}, err={:?}",
                        self.local_port, self.peer_port, err
                    );
                }
                Err(err) => {
                    // We are not expecting any other errors when reading from the underlying
                    // stream. If any show up, we'll immediately kill this connection.
                    error!(
                        "vsock: error reading from backing stream: lp={}, pp={}, err={:?}",
                        self.local_port, self.peer_port, err
                    );
                    pkt.set_op(uapi::VSOCK_OP_RST);
                    self.last_fwd_cnt_to_peer = self.fwd_cnt;
                    return Ok(());
                }
            };
        }

        // A credit update is basically a no-op, so we should only waste a perfectly fine RX
        // buffer on it if we really have nothing else to say, hence we check for this RX
        // indication last.
        // Note: `remove()` is evaluated first, so the CreditUpdate indication is consumed even
        // if the second half of the condition turns out to be false.
        if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() {
            pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE);
            self.last_fwd_cnt_to_peer = self.fwd_cnt;
            return Ok(());
        }

        // We've already checked for all conditions that would have produced a packet, so
        // if we got to here, we don't know how to yield one.
        Err(VsockError::NoData)
    }
271 
272     /// Deliver a guest-generated packet to this connection.
273     ///
274     /// This forwards the data in RW packets to the host stream, and absorbs control packets,
275     /// using them to manage the internal connection state.
276     ///
277     /// Returns:
278     /// always `Ok(())`: the packet has been consumed;
279     ///
280     fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
281         // Update the peer credit information.
282         self.peer_buf_alloc = pkt.buf_alloc();
283         self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
284 
285         match self.state {
286             // Most frequent case: this is an established connection that needs to forward some
287             // data to the host stream. Also works for a connection that has begun shutting
288             // down, but the peer still has some data to send.
289             ConnState::Established | ConnState::PeerClosed(_, false)
290                 if pkt.op() == uapi::VSOCK_OP_RW =>
291             {
292                 if pkt.buf().is_none() {
293                     info!(
294                         "vsock: dropping empty data packet from guest (lp={}, pp={}",
295                         self.local_port, self.peer_port
296                     );
297                     return Ok(());
298                 }
299 
300                 // Unwrapping here is safe, since we just checked `pkt.buf()` above.
301                 let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)];
302                 if let Err(err) = self.send_bytes(buf_slice) {
303                     // If we can't write to the host stream, that's an unrecoverable error, so
304                     // we'll terminate this connection.
305                     warn!(
306                         "vsock: error writing to local stream (lp={}, pp={}): {:?}",
307                         self.local_port, self.peer_port, err
308                     );
309                     self.kill();
310                     return Ok(());
311                 }
312 
313                 // We might've just consumed some data. If that's the case, we might need to
314                 // update the peer on our buffer space situation, so that it can keep sending
315                 // data packets our way.
316                 if self.peer_needs_credit_update() {
317                     self.pending_rx.insert(PendingRx::CreditUpdate);
318                 }
319             }
320 
321             // Next up: receiving a response / confirmation for a host-initiated connection.
322             // We'll move to an Established state, and pass on the good news through the host
323             // stream.
324             ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => {
325                 self.expiry = None;
326                 self.state = ConnState::Established;
327             }
328 
329             // The peer wants to shut down an established connection.  If they have nothing
330             // more to send nor receive, and we don't have to wait to drain our TX buffer, we
331             // can schedule an RST packet (to terminate the connection on the next recv call).
332             // Otherwise, we'll arm the kill timer.
333             ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => {
334                 let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0;
335                 let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0;
336                 self.state = ConnState::PeerClosed(recv_off, send_off);
337                 if recv_off && send_off {
338                     if self.tx_buf.is_empty() {
339                         self.pending_rx.insert(PendingRx::Rst);
340                     } else {
341                         self.expiry = Some(
342                             Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
343                         );
344                     }
345                 }
346             }
347 
348             // The peer wants to update a shutdown request, with more receive/send indications.
349             // The same logic as above applies.
350             ConnState::PeerClosed(ref mut recv_off, ref mut send_off)
351                 if pkt.op() == uapi::VSOCK_OP_SHUTDOWN =>
352             {
353                 *recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0);
354                 *send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0);
355                 if *recv_off && *send_off && self.tx_buf.is_empty() {
356                     self.pending_rx.insert(PendingRx::Rst);
357                 }
358             }
359 
360             // A credit update from our peer is valid only in a state which allows data
361             // transfer towards the peer.
362             ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _)
363                 if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE =>
364             {
365                 // Nothing to do here; we've already updated peer credit.
366             }
367 
368             // A credit request from our peer is valid only in a state which allows data
369             // transfer from the peer. We'll respond with a credit update packet.
370             ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false)
371                 if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST =>
372             {
373                 self.pending_rx.insert(PendingRx::CreditUpdate);
374             }
375 
376             _ => {
377                 debug!(
378                     "vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}",
379                     self.state,
380                     pkt.hdr()
381                 );
382             }
383         };
384 
385         Ok(())
386     }
387 
388     /// Check if the connection has any pending packet addressed to the peer.
389     ///
390     fn has_pending_rx(&self) -> bool {
391         !self.pending_rx.is_empty()
392     }
393 }
394 
395 impl<S> VsockEpollListener for VsockConnection<S>
396 where
397     S: Read + Write + AsRawFd,
398 {
399     /// Get the file descriptor that this connection wants polled.
400     ///
401     /// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the
402     /// host stream.
403     ///
404     fn get_polled_fd(&self) -> RawFd {
405         self.stream.as_raw_fd()
406     }
407 
408     /// Get the event set that this connection is interested in.
409     ///
410     /// A connection will want to be notified when:
411     /// - data is available to be read from the host stream, so that it can store an RW pending
412     ///   RX indication; and
413     /// - data can be written to the host stream, and the TX buffer needs to be flushed.
414     ///
415     fn get_polled_evset(&self) -> epoll::Events {
416         let mut evset = epoll::Events::empty();
417         if !self.tx_buf.is_empty() {
418             // There's data waiting in the TX buffer, so we are interested in being notified
419             // when writing to the host stream wouldn't block.
420             evset.insert(epoll::Events::EPOLLOUT);
421         }
422         // We're generally interested in being notified when data can be read from the host
423         // stream, unless we're in a state which doesn't allow moving data from host to guest.
424         match self.state {
425             ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (),
426             _ if self.need_credit_update_from_peer() => (),
427             _ => evset.insert(epoll::Events::EPOLLIN),
428         }
429         evset
430     }
431 
432     /// Notify the connection about an event (or set of events) that it was interested in.
433     ///
434     fn notify(&mut self, evset: epoll::Events) {
435         if evset.contains(epoll::Events::EPOLLIN) {
436             // Data can be read from the host stream. Setting a Rw pending indication, so that
437             // the muxer will know to call `recv_pkt()` later.
438             self.pending_rx.insert(PendingRx::Rw);
439         }
440 
441         if evset.contains(epoll::Events::EPOLLOUT) {
442             // Data can be written to the host stream. Time to flush out the TX buffer.
443             //
444             if self.tx_buf.is_empty() {
445                 info!("vsock: connection received unexpected EPOLLOUT event");
446                 return;
447             }
448             let flushed = self
449                 .tx_buf
450                 .flush_to(&mut self.stream)
451                 .unwrap_or_else(|err| {
452                     warn!(
453                         "vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
454                         self.local_port, self.peer_port, err
455                     );
456                     match err {
457                         Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
458                             // This should never happen (EWOULDBLOCK after EPOLLOUT), but
459                             // it does, so let's absorb it.
460                         }
461                         _ => self.kill(),
462                     };
463                     0
464                 });
465             self.fwd_cnt += Wrapping(flushed as u32);
466 
467             // If this connection was shutting down, but is waiting to drain the TX buffer
468             // before forceful termination, the wait might be over.
469             if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() {
470                 self.pending_rx.insert(PendingRx::Rst);
471             } else if self.peer_needs_credit_update() {
472                 // If we've freed up some more buffer space, we may need to let the peer know it
473                 // can safely send more data our way.
474                 self.pending_rx.insert(PendingRx::CreditUpdate);
475             }
476         }
477     }
478 }
479 
480 impl<S> VsockConnection<S>
481 where
482     S: Read + Write + AsRawFd,
483 {
484     /// Create a new guest-initiated connection object.
485     ///
486     pub fn new_peer_init(
487         stream: S,
488         local_cid: u64,
489         peer_cid: u64,
490         local_port: u32,
491         peer_port: u32,
492         peer_buf_alloc: u32,
493     ) -> Self {
494         Self {
495             local_cid,
496             peer_cid,
497             local_port,
498             peer_port,
499             stream,
500             state: ConnState::PeerInit,
501             tx_buf: TxBuf::new(),
502             fwd_cnt: Wrapping(0),
503             peer_buf_alloc,
504             peer_fwd_cnt: Wrapping(0),
505             rx_cnt: Wrapping(0),
506             last_fwd_cnt_to_peer: Wrapping(0),
507             pending_rx: PendingRxSet::from(PendingRx::Response),
508             expiry: None,
509         }
510     }
511 
512     /// Create a new host-initiated connection object.
513     ///
514     pub fn new_local_init(
515         stream: S,
516         local_cid: u64,
517         peer_cid: u64,
518         local_port: u32,
519         peer_port: u32,
520     ) -> Self {
521         Self {
522             local_cid,
523             peer_cid,
524             local_port,
525             peer_port,
526             stream,
527             state: ConnState::LocalInit,
528             tx_buf: TxBuf::new(),
529             fwd_cnt: Wrapping(0),
530             peer_buf_alloc: 0,
531             peer_fwd_cnt: Wrapping(0),
532             rx_cnt: Wrapping(0),
533             last_fwd_cnt_to_peer: Wrapping(0),
534             pending_rx: PendingRxSet::from(PendingRx::Request),
535             expiry: None,
536         }
537     }
538 
539     /// Check if there is an expiry (kill) timer set for this connection, sometime in the
540     /// future.
541     ///
542     pub fn will_expire(&self) -> bool {
543         match self.expiry {
544             None => false,
545             Some(t) => t > Instant::now(),
546         }
547     }
548 
549     /// Check if this connection needs to be scheduled for forceful termination, due to its
550     /// kill timer having expired.
551     ///
552     pub fn has_expired(&self) -> bool {
553         match self.expiry {
554             None => false,
555             Some(t) => t <= Instant::now(),
556         }
557     }
558 
559     /// Get the kill timer value, if one is set.
560     ///
561     pub fn expiry(&self) -> Option<Instant> {
562         self.expiry
563     }
564 
565     /// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the
566     /// connection is asked to yield a packet, via `recv_pkt()`).
567     ///
568     pub fn kill(&mut self) {
569         self.state = ConnState::Killed;
570         self.pending_rx.insert(PendingRx::Rst);
571     }
572 
573     /// Return the connections state.
574     ///
575     pub fn state(&self) -> ConnState {
576         self.state
577     }
578 
579     /// Send some raw, untracked, data straight to the underlying connected stream.
580     /// Returns: number of bytes written, or the error describing the write failure.
581     ///
582     /// Warning: this will bypass the connection state machine and write directly to the
583     /// underlying stream. No account of this write is kept, which includes bypassing
584     /// vsock flow control.
585     ///
586     pub fn send_bytes_raw(&mut self, buf: &[u8]) -> Result<usize> {
587         self.stream.write(buf).map_err(Error::StreamWrite)
588     }
589 
590     /// Send some raw data (a byte-slice) to the host stream.
591     ///
592     /// Raw data can either be sent straight to the host stream, or to our TX buffer, if the
593     /// former fails.
594     ///
595     fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
596         // If there is data in the TX buffer, that means we're already registered for EPOLLOUT
597         // events on the underlying stream. Therefore, there's no point in attempting a write
598         // at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will
599         // attempt to drain the TX buffer then.
600         if !self.tx_buf.is_empty() {
601             return self.tx_buf.push(buf);
602         }
603 
604         // The TX buffer is empty, so we can try to write straight to the host stream.
605         let written = match self.stream.write(buf) {
606             Ok(cnt) => cnt,
607             Err(e) => {
608                 // Absorb any would-block errors, since we can always try again later.
609                 if e.kind() == ErrorKind::WouldBlock {
610                     0
611                 } else {
612                     // We don't know how to handle any other write error, so we'll send it up
613                     // the call chain.
614                     return Err(Error::StreamWrite(e));
615                 }
616             }
617         };
618         // Move the "forwarded bytes" counter ahead by how much we were able to send out.
619         self.fwd_cnt += Wrapping(written as u32);
620 
621         // If we couldn't write the whole slice, we'll need to push the remaining data to our
622         // buffer.
623         if written < buf.len() {
624             self.tx_buf.push(&buf[written..])?;
625         }
626 
627         Ok(())
628     }
629 
630     /// Check if the credit information the peer has last received from us is outdated.
631     ///
632     fn peer_needs_credit_update(&self) -> bool {
633         let peer_seen_free_buf =
634             Wrapping(defs::CONN_TX_BUF_SIZE) - (self.fwd_cnt - self.last_fwd_cnt_to_peer);
635         peer_seen_free_buf < Wrapping(defs::CONN_CREDIT_UPDATE_THRESHOLD)
636     }
637 
638     /// Check if we need to ask the peer for a credit update before sending any more data its
639     /// way.
640     ///
641     fn need_credit_update_from_peer(&self) -> bool {
642         self.peer_avail_credit() == 0
643     }
644 
645     /// Get the maximum number of bytes that we can send to our peer, without overflowing its
646     /// buffer.
647     ///
648     fn peer_avail_credit(&self) -> usize {
649         (Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
650     }
651 
652     /// Prepare a packet header for transmission to our peer.
653     ///
654     fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket {
655         // Make sure the header is zeroed-out first.
656         // This looks sub-optimal, but it is actually optimized-out in the compiled code to be
657         // faster than a memset().
658         for b in pkt.hdr_mut() {
659             *b = 0;
660         }
661 
662         pkt.set_src_cid(self.local_cid)
663             .set_dst_cid(self.peer_cid)
664             .set_src_port(self.local_port)
665             .set_dst_port(self.peer_port)
666             .set_type(uapi::VSOCK_TYPE_STREAM)
667             .set_buf_alloc(defs::CONN_TX_BUF_SIZE)
668             .set_fwd_cnt(self.fwd_cnt.0)
669     }
670 }
671 
672 #[cfg(test)]
673 mod tests {
674     use std::io::{Error as IoError, Result as IoResult};
675 
676     use libc::EFD_NONBLOCK;
677     use virtio_queue::QueueOwnedT;
678     use vmm_sys_util::eventfd::EventFd;
679 
680     use super::super::super::tests::TestContext;
681     use super::super::defs as csm_defs;
682     use super::*;
683 
    // Addressing of the two ends of the test connection. Packets yielded by the connection are
    // expected to travel from (LOCAL_CID, LOCAL_PORT) to (PEER_CID, PEER_PORT); packets fed to
    // the connection by the tests are built the other way around.
    const LOCAL_CID: u64 = 2;
    const PEER_CID: u64 = 3;
    const LOCAL_PORT: u32 = 1002;
    const PEER_PORT: u32 = 1003;
    // Buffer allocation advertised on behalf of the peer.
    const PEER_BUF_ALLOC: u32 = 64 * 1024;
689 
    /// Scripted behavior for one direction (read or write) of a `TestStream`.
    enum StreamState {
        /// Reads return `Ok(0)`; writes fail with `ErrorKind::BrokenPipe`.
        Closed,
        /// Reads and writes fail with an error of the given kind.
        Error(ErrorKind),
        /// Reads are served from `read_buf` (failing with `ErrorKind::WouldBlock` when it is
        /// empty); writes are appended to `write_buf`.
        Ready,
        /// Reads and writes fail with `ErrorKind::WouldBlock`.
        WouldBlock,
    }
696 
697     struct TestStream {
698         fd: EventFd,
699         read_buf: Vec<u8>,
700         read_state: StreamState,
701         write_buf: Vec<u8>,
702         write_state: StreamState,
703     }
704     impl TestStream {
705         fn new() -> Self {
706             Self {
707                 fd: EventFd::new(EFD_NONBLOCK).unwrap(),
708                 read_state: StreamState::Ready,
709                 write_state: StreamState::Ready,
710                 read_buf: Vec::new(),
711                 write_buf: Vec::new(),
712             }
713         }
714         fn new_with_read_buf(buf: &[u8]) -> Self {
715             let mut stream = Self::new();
716             stream.read_buf = buf.to_vec();
717             stream
718         }
719     }
720 
721     impl AsRawFd for TestStream {
722         fn as_raw_fd(&self) -> RawFd {
723             self.fd.as_raw_fd()
724         }
725     }
726 
727     impl Read for TestStream {
728         fn read(&mut self, data: &mut [u8]) -> IoResult<usize> {
729             match self.read_state {
730                 StreamState::Closed => Ok(0),
731                 StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
732                 StreamState::Ready => {
733                     if self.read_buf.is_empty() {
734                         return Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN"));
735                     }
736                     let len = std::cmp::min(data.len(), self.read_buf.len());
737                     assert_ne!(len, 0);
738                     data[..len].copy_from_slice(&self.read_buf[..len]);
739                     self.read_buf = self.read_buf.split_off(len);
740                     Ok(len)
741                 }
742                 StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
743             }
744         }
745     }
746 
747     impl Write for TestStream {
748         fn write(&mut self, data: &[u8]) -> IoResult<usize> {
749             match self.write_state {
750                 StreamState::Closed => Err(IoError::new(ErrorKind::BrokenPipe, "EPIPE")),
751                 StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
752                 StreamState::Ready => {
753                     self.write_buf.extend_from_slice(data);
754                     Ok(data.len())
755                 }
756                 StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
757             }
758         }
759         fn flush(&mut self) -> IoResult<()> {
760             Ok(())
761         }
762     }
763 
764     impl<S> VsockConnection<S>
765     where
766         S: Read + Write + AsRawFd,
767     {
768         /// Get the fwd_cnt value from the connection.
769         pub(crate) fn fwd_cnt(&self) -> Wrapping<u32> {
770             self.fwd_cnt
771         }
772 
773         /// Forcefully insert a credit update flag.
774         pub(crate) fn insert_credit_update(&mut self) {
775             self.pending_rx.insert(PendingRx::CreditUpdate);
776         }
777     }
778 
779     fn init_pkt(pkt: &mut VsockPacket, op: u16, len: u32) -> &mut VsockPacket {
780         for b in pkt.hdr_mut() {
781             *b = 0;
782         }
783         pkt.set_src_cid(PEER_CID)
784             .set_dst_cid(LOCAL_CID)
785             .set_src_port(PEER_PORT)
786             .set_dst_port(LOCAL_PORT)
787             .set_type(uapi::VSOCK_TYPE_STREAM)
788             .set_buf_alloc(PEER_BUF_ALLOC)
789             .set_op(op)
790             .set_len(len)
791     }
792 
793     // This is the connection state machine test context: a helper struct to provide CSM testing
794     // primitives. A single `VsockPacket` object will be enough for our testing needs. We'll be
795     // using it for simulating both packet sends and packet receives. We need to keep the vsock
796     // testing context alive, since `VsockPacket` is just a pointer-wrapper over some data that
797     // resides in guest memory. The vsock test context owns the `GuestMemory` object, so we'll make
798     // it a member here, in order to make sure that guest memory outlives our testing packet.  A
799     // single `VsockConnection` object will also suffice for our testing needs. We'll be using a
800     // specially crafted `Read + Write + AsRawFd` object as a backing stream, so that we can
801     // control the various error conditions that might arise.
802     struct CsmTestContext {
803         _vsock_test_ctx: TestContext,
804         pkt: VsockPacket,
805         conn: VsockConnection<TestStream>,
806     }
807 
808     impl CsmTestContext {
809         fn new_established() -> Self {
810             Self::new(ConnState::Established)
811         }
812 
813         fn new(conn_state: ConnState) -> Self {
814             let vsock_test_ctx = TestContext::new();
815             let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
816             let stream = TestStream::new();
817             let mut pkt = VsockPacket::from_rx_virtq_head(
818                 &mut handler_ctx.handler.queues[0]
819                     .iter(&vsock_test_ctx.mem)
820                     .unwrap()
821                     .next()
822                     .unwrap(),
823                 None,
824             )
825             .unwrap();
826             let conn = match conn_state {
827                 ConnState::PeerInit => VsockConnection::<TestStream>::new_peer_init(
828                     stream,
829                     LOCAL_CID,
830                     PEER_CID,
831                     LOCAL_PORT,
832                     PEER_PORT,
833                     PEER_BUF_ALLOC,
834                 ),
835                 ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
836                     stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
837                 ),
838                 ConnState::Established => {
839                     let mut conn = VsockConnection::<TestStream>::new_peer_init(
840                         stream,
841                         LOCAL_CID,
842                         PEER_CID,
843                         LOCAL_PORT,
844                         PEER_PORT,
845                         PEER_BUF_ALLOC,
846                     );
847                     assert!(conn.has_pending_rx());
848                     conn.recv_pkt(&mut pkt).unwrap();
849                     assert_eq!(pkt.op(), uapi::VSOCK_OP_RESPONSE);
850                     conn
851                 }
852                 other => panic!("invalid ctx state: {other:?}"),
853             };
854             assert_eq!(conn.state, conn_state);
855             Self {
856                 _vsock_test_ctx: vsock_test_ctx,
857                 pkt,
858                 conn,
859             }
860         }
861 
862         fn set_stream(&mut self, stream: TestStream) {
863             self.conn.stream = stream;
864         }
865 
866         fn set_peer_credit(&mut self, credit: u32) {
867             assert!(credit < self.conn.peer_buf_alloc);
868             self.conn.peer_fwd_cnt = Wrapping(0);
869             self.conn.rx_cnt = Wrapping(self.conn.peer_buf_alloc - credit);
870             assert_eq!(self.conn.peer_avail_credit(), credit as usize);
871         }
872 
873         fn send(&mut self) {
874             self.conn.send_pkt(&self.pkt).unwrap();
875         }
876 
877         fn recv(&mut self) {
878             self.conn.recv_pkt(&mut self.pkt).unwrap();
879         }
880 
881         fn notify_epollin(&mut self) {
882             self.conn.notify(epoll::Events::EPOLLIN);
883             assert!(self.conn.has_pending_rx());
884         }
885 
886         fn notify_epollout(&mut self) {
887             self.conn.notify(epoll::Events::EPOLLOUT);
888         }
889 
890         fn init_pkt(&mut self, op: u16, len: u32) -> &mut VsockPacket {
891             init_pkt(&mut self.pkt, op, len)
892         }
893 
894         fn init_data_pkt(&mut self, data: &[u8]) -> &VsockPacket {
895             assert!(data.len() <= self.pkt.buf().unwrap().len());
896             self.init_pkt(uapi::VSOCK_OP_RW, data.len() as u32);
897             self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
898             &self.pkt
899         }
900     }
901 
902     #[test]
903     fn test_peer_request() {
904         let mut ctx = CsmTestContext::new(ConnState::PeerInit);
905         assert!(ctx.conn.has_pending_rx());
906         ctx.recv();
907         // For peer-initiated requests, our connection should always yield a vsock response packet,
908         // in order to establish the connection.
909         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
910         assert_eq!(ctx.pkt.src_cid(), LOCAL_CID);
911         assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
912         assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
913         assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
914         assert_eq!(ctx.pkt.type_(), uapi::VSOCK_TYPE_STREAM);
915         assert_eq!(ctx.pkt.len(), 0);
916         // After yielding the response packet, the connection should have transitioned to the
917         // established state.
918         assert_eq!(ctx.conn.state, ConnState::Established);
919     }
920 
921     #[test]
922     fn test_local_request() {
923         let mut ctx = CsmTestContext::new(ConnState::LocalInit);
924         // Host-initiated connections should first yield a connection request packet.
925         assert!(ctx.conn.has_pending_rx());
926         // Before yielding the connection request packet, the timeout kill timer shouldn't be
927         // armed.
928         assert!(!ctx.conn.will_expire());
929         ctx.recv();
930         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
931         // Since the request might time-out, the kill timer should now be armed.
932         assert!(ctx.conn.will_expire());
933         assert!(!ctx.conn.has_expired());
934         ctx.init_pkt(uapi::VSOCK_OP_RESPONSE, 0);
935         ctx.send();
936         // Upon receiving a connection response, the connection should have transitioned to the
937         // established state, and the kill timer should've been disarmed.
938         assert_eq!(ctx.conn.state, ConnState::Established);
939         assert!(!ctx.conn.will_expire());
940     }
941 
942     #[test]
943     fn test_local_request_timeout() {
944         let mut ctx = CsmTestContext::new(ConnState::LocalInit);
945         ctx.recv();
946         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
947         assert!(ctx.conn.will_expire());
948         assert!(!ctx.conn.has_expired());
949         std::thread::sleep(std::time::Duration::from_millis(
950             defs::CONN_REQUEST_TIMEOUT_MS,
951         ));
952         assert!(ctx.conn.has_expired());
953     }
954 
955     #[test]
956     fn test_rx_data() {
957         let mut ctx = CsmTestContext::new_established();
958         let data = &[1, 2, 3, 4];
959         ctx.set_stream(TestStream::new_with_read_buf(data));
960         assert_eq!(ctx.conn.get_polled_fd(), ctx.conn.stream.as_raw_fd());
961         ctx.notify_epollin();
962         ctx.recv();
963         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
964         assert_eq!(ctx.pkt.len() as usize, data.len());
965         assert_eq!(ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], *data);
966 
967         // There's no more data in the stream, so `recv_pkt` should yield `VsockError::NoData`.
968         match ctx.conn.recv_pkt(&mut ctx.pkt) {
969             Err(VsockError::NoData) => (),
970             other => panic!("{other:?}"),
971         }
972 
973         // A recv attempt in an invalid state should yield an instant reset packet.
974         ctx.conn.state = ConnState::LocalClosed;
975         ctx.notify_epollin();
976         ctx.recv();
977         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
978     }
979 
980     #[test]
981     fn test_local_close() {
982         let mut ctx = CsmTestContext::new_established();
983         let mut stream = TestStream::new();
984         stream.read_state = StreamState::Closed;
985         ctx.set_stream(stream);
986         ctx.notify_epollin();
987         ctx.recv();
988         // When the host-side stream is closed, we can neither send not receive any more data.
989         // Therefore, the vsock shutdown packet that we'll deliver to the guest must contain both
990         // the no-more-send and the no-more-recv indications.
991         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
992         assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
993         assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
994 
995         // The kill timer should now be armed.
996         assert!(ctx.conn.will_expire());
997         assert!(
998             ctx.conn.expiry().unwrap()
999                 < Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS)
1000         );
1001     }
1002 
1003     #[test]
1004     fn test_peer_close() {
1005         // Test that send/recv shutdown indications are handled correctly.
1006         // I.e. once set, an indication cannot be reset.
1007         {
1008             let mut ctx = CsmTestContext::new_established();
1009 
1010             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1011                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
1012             ctx.send();
1013             assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, false));
1014 
1015             // Attempting to reset the no-more-recv indication should not work
1016             // (we are only setting the no-more-send indication here).
1017             ctx.pkt.set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
1018             ctx.send();
1019             assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, true));
1020         }
1021 
1022         // Test case:
1023         // - reading data from a no-more-send connection should work; and
1024         // - writing data should have no effect.
1025         {
1026             let data = &[1, 2, 3, 4];
1027             let mut ctx = CsmTestContext::new_established();
1028             ctx.set_stream(TestStream::new_with_read_buf(data));
1029             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1030                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
1031             ctx.send();
1032             ctx.notify_epollin();
1033             ctx.recv();
1034             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
1035             assert_eq!(&ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], data);
1036 
1037             ctx.init_data_pkt(data);
1038             ctx.send();
1039             assert_eq!(ctx.conn.stream.write_buf.len(), 0);
1040             assert!(ctx.conn.tx_buf.is_empty());
1041         }
1042 
1043         // Test case:
1044         // - writing data to a no-more-recv connection should work; and
1045         // - attempting to read data from it should yield an RST packet.
1046         {
1047             let mut ctx = CsmTestContext::new_established();
1048             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1049                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
1050             ctx.send();
1051             let data = &[1, 2, 3, 4];
1052             ctx.init_data_pkt(data);
1053             ctx.send();
1054             assert_eq!(ctx.conn.stream.write_buf, data.to_vec());
1055 
1056             ctx.notify_epollin();
1057             ctx.recv();
1058             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1059         }
1060 
1061         // Test case: setting both no-more-send and no-more-recv indications should have the
1062         // connection confirm termination (i.e. yield an RST).
1063         {
1064             let mut ctx = CsmTestContext::new_established();
1065             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1066                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV | uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
1067             ctx.send();
1068             assert!(ctx.conn.has_pending_rx());
1069             ctx.recv();
1070             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1071         }
1072     }
1073 
1074     #[test]
1075     fn test_local_read_error() {
1076         let mut ctx = CsmTestContext::new_established();
1077         let mut stream = TestStream::new();
1078         stream.read_state = StreamState::Error(ErrorKind::PermissionDenied);
1079         ctx.set_stream(stream);
1080         ctx.notify_epollin();
1081         ctx.recv();
1082         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1083     }
1084 
1085     #[test]
1086     fn test_credit_request_to_peer() {
1087         let mut ctx = CsmTestContext::new_established();
1088         ctx.set_peer_credit(0);
1089         ctx.notify_epollin();
1090         ctx.recv();
1091         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_REQUEST);
1092     }
1093 
1094     #[test]
1095     fn test_credit_request_from_peer() {
1096         let mut ctx = CsmTestContext::new_established();
1097         ctx.init_pkt(uapi::VSOCK_OP_CREDIT_REQUEST, 0);
1098         ctx.send();
1099         assert!(ctx.conn.has_pending_rx());
1100         ctx.recv();
1101         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
1102         assert_eq!(ctx.pkt.buf_alloc(), csm_defs::CONN_TX_BUF_SIZE);
1103         assert_eq!(ctx.pkt.fwd_cnt(), ctx.conn.fwd_cnt.0);
1104     }
1105 
1106     #[test]
1107     fn test_credit_update_to_peer() {
1108         let mut ctx = CsmTestContext::new_established();
1109 
1110         // Force a stale state, where the peer hasn't been updated on our credit situation.
1111         ctx.conn.last_fwd_cnt_to_peer = Wrapping(0);
1112 
1113         // Since a credit update token is sent when the fwd_cnt value exceeds
1114         // CONN_TX_BUF_SIZE - CONN_CREDIT_UPDATE_THRESHOLD, we initialize
1115         // fwd_cnt at 6 bytes below the threshold.
1116         let initial_fwd_cnt =
1117             csm_defs::CONN_TX_BUF_SIZE - csm_defs::CONN_CREDIT_UPDATE_THRESHOLD - 6;
1118         ctx.conn.fwd_cnt = Wrapping(initial_fwd_cnt);
1119 
1120         // Use a 4-byte packet for triggering the credit update threshold.
1121         let data = &[1, 2, 3, 4];
1122 
1123         // Check that there is no pending RX.
1124         ctx.init_data_pkt(data);
1125         ctx.send();
1126         assert!(!ctx.conn.has_pending_rx());
1127 
1128         // Send a packet again.
1129         ctx.init_data_pkt(data);
1130         ctx.send();
1131 
1132         // The CSM should now have a credit update available for the peer.
1133         assert!(ctx.conn.has_pending_rx());
1134         ctx.recv();
1135         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
1136         assert_eq!(ctx.pkt.fwd_cnt(), initial_fwd_cnt + data.len() as u32 * 2);
1137         assert_eq!(ctx.conn.fwd_cnt, ctx.conn.last_fwd_cnt_to_peer);
1138     }
1139 
1140     #[test]
1141     fn test_tx_buffering() {
1142         // Test case:
1143         // - when writing to the backing stream would block, TX data should end up in the TX buf
1144         // - when the CSM is notified that it can write to the backing stream, it should flush
1145         //   the TX buf.
1146         {
1147             let mut ctx = CsmTestContext::new_established();
1148 
1149             let mut stream = TestStream::new();
1150             stream.write_state = StreamState::WouldBlock;
1151             ctx.set_stream(stream);
1152 
1153             // Send some data through the connection. The backing stream is set to reject writes,
1154             // so the data should end up in the TX buffer.
1155             let data = &[1, 2, 3, 4];
1156             ctx.init_data_pkt(data);
1157             ctx.send();
1158 
1159             // When there's data in the TX buffer, the connection should ask to be notified when it
1160             // can write to its backing stream.
1161             assert!(ctx
1162                 .conn
1163                 .get_polled_evset()
1164                 .contains(epoll::Events::EPOLLOUT));
1165             assert_eq!(ctx.conn.tx_buf.len(), data.len());
1166 
1167             // Unlock the write stream and notify the connection it can now write its buffered
1168             // data.
1169             ctx.set_stream(TestStream::new());
1170             ctx.conn.notify(epoll::Events::EPOLLOUT);
1171             assert!(ctx.conn.tx_buf.is_empty());
1172             assert_eq!(ctx.conn.stream.write_buf, data);
1173         }
1174     }
1175 
1176     #[test]
1177     fn test_stream_write_error() {
1178         // Test case: sending a data packet to a broken / closed backing stream should kill it.
1179         {
1180             let mut ctx = CsmTestContext::new_established();
1181             let mut stream = TestStream::new();
1182             stream.write_state = StreamState::Closed;
1183             ctx.set_stream(stream);
1184 
1185             let data = &[1, 2, 3, 4];
1186             ctx.init_data_pkt(data);
1187             ctx.send();
1188 
1189             assert_eq!(ctx.conn.state, ConnState::Killed);
1190             assert!(ctx.conn.has_pending_rx());
1191             ctx.recv();
1192             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1193         }
1194 
1195         // Test case: notifying a connection that it can flush its TX buffer to a broken stream
1196         // should kill the connection.
1197         {
1198             let mut ctx = CsmTestContext::new_established();
1199 
1200             let mut stream = TestStream::new();
1201             stream.write_state = StreamState::WouldBlock;
1202             ctx.set_stream(stream);
1203 
1204             // Send some data through the connection. The backing stream is set to reject writes,
1205             // so the data should end up in the TX buffer.
1206             let data = &[1, 2, 3, 4];
1207             ctx.init_data_pkt(data);
1208             ctx.send();
1209 
1210             // Set the backing stream to error out on write.
1211             let mut stream = TestStream::new();
1212             stream.write_state = StreamState::Closed;
1213             ctx.set_stream(stream);
1214 
1215             assert!(ctx
1216                 .conn
1217                 .get_polled_evset()
1218                 .contains(epoll::Events::EPOLLOUT));
1219             ctx.notify_epollout();
1220             assert_eq!(ctx.conn.state, ConnState::Killed);
1221         }
1222     }
1223 
1224     #[test]
1225     fn test_peer_credit_misbehavior() {
1226         let mut ctx = CsmTestContext::new_established();
1227 
1228         let mut stream = TestStream::new();
1229         stream.write_state = StreamState::WouldBlock;
1230         ctx.set_stream(stream);
1231 
1232         // Fill up the TX buffer.
1233         let data = vec![0u8; ctx.pkt.buf().unwrap().len()];
1234         ctx.init_data_pkt(data.as_slice());
1235         for _i in 0..(csm_defs::CONN_TX_BUF_SIZE / data.len() as u32) {
1236             ctx.send();
1237         }
1238 
1239         // Then try to send more data.
1240         ctx.send();
1241 
1242         // The connection should've committed suicide.
1243         assert_eq!(ctx.conn.state, ConnState::Killed);
1244         assert!(ctx.conn.has_pending_rx());
1245         ctx.recv();
1246         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1247     }
1248 }
1249