xref: /cloud-hypervisor/virtio-devices/src/vsock/csm/connection.rs (revision 3ce0fef7fd546467398c914dbc74d8542e45cf6f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 //! The main job of `VsockConnection` is to forward data traffic, back and forth, between a
5 //! guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while
6 //! also managing its internal state.
7 //! To that end, `VsockConnection` implements:
8 //! - `VsockChannel` for:
9 //!   - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`; and
10 //!   - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and
11 //!   - updating its internal state, by absorbing control packets (anything other than
12 //!     VSOCK_OP_RW).
13 //! - `VsockEpollListener` for getting notified about the availability of data or free buffer
14 //!   space at the host stream.
15 //!
16 //! Note: there is a certain asymmetry to the RX and TX data flows:
17 //! - RX transfers do not need any data buffering, since data is read straight from the
18 //!   host stream and into the guest-provided RX buffer;
19 //! - TX transfers may require some data to be buffered by `VsockConnection`, if the host
20 //!   peer can't keep up with reading the data that we're writing. This is because, once
21 //!   the guest driver provides some data in a virtio TX buffer, the vsock device must
22 //!   consume it.  If that data can't be forwarded straight to the host stream, we'll
23 //!   have to store it in a buffer (and flush it at a later time). Vsock flow control
24 //!   ensures that our TX buffer doesn't overflow.
25 //
26 // The code in this file is best read with a fresh memory of the vsock protocol inner-workings.
27 // To help with that, here is a
28 //
29 // Short primer on the vsock protocol
30 // ----------------------------------
31 //
32 // 1. Establishing a connection
33 //    A vsock connection is considered established after a two-way handshake:
34 //    - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST);
35 //      then
36 //    - the listening peer sends back a connection response packet (`hdr.op` ==
37 //      VSOCK_OP_RESPONSE).
38 //
39 // 2. Terminating a connection
40 //    When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN
41 //    packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's
42 //    intention:
43 //    - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and
44 //    - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection.
45 //    After a shutdown packet, the receiving peer will have some protocol-undefined time to
46 //    flush its buffers, and then forcefully terminate the connection by sending back an RST
47 //    packet. If the shutdown-initiating peer doesn't receive this RST packet during a timeout
48 //    period, it will send one itself, thus terminating the connection.
//    Note: a peer can send more than one VSOCK_OP_SHUTDOWN packet. However, read/write
50 //          indications cannot be undone. E.g. once a "no-more-sending" promise was made, it
51 //          cannot be taken back.  That is, `hdr.flags` will be ORed between subsequent
52 //          VSOCK_OP_SHUTDOWN packets.
53 //
54 // 3. Flow control
55 //    Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver
56 //    has enough free buffer space to store that data. If this condition is not respected, the
57 //    receiving peer's behavior is undefined. In this implementation, we forcefully terminate
58 //    the connection by sending back a VSOCK_OP_RST packet.
59 //    Note: all buffer space information is computed and stored on a per-connection basis.
60 //    Peers keep each other informed about the free buffer space they have by filling in two
61 //    packet header members with each packet they send:
62 //    - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and
63 //    - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its
64 //       buffer.
65 //    One can figure out how much space its peer has available in its buffer by inspecting the
66 //    difference between how much it has sent to the peer and how much the peer has flushed out
67 //    (i.e.  "forwarded", in the vsock spec terminology):
68 //    `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`.
69 //    Note: the above requires that peers constantly keep each other informed on their buffer
70 //          space situation. However, since there are no receipt acknowledgement packets
71 //          defined for the vsock protocol, packet flow can often be unidirectional (just one
72 //          peer sending data to another), so the sender's information about the receiver's
73 //          buffer space can get quickly outdated. The vsock protocol defines two solutions to
74 //          this problem:
75 //          1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its
76 //             peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a
77 //             VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit
78 //             information must be included in any packet);
79 //          2. The receiver can be proactive, and send VSOCK_OP_CREDIT_UPDATE packet, whenever
80 //             it thinks its peer's information is out of date.
81 //          Our implementation uses the proactive approach.
82 //
83 use std::io::{ErrorKind, Read, Write};
84 use std::num::Wrapping;
85 use std::os::unix::io::{AsRawFd, RawFd};
86 use std::time::{Duration, Instant};
87 
88 use super::super::defs::uapi;
89 use super::super::packet::VsockPacket;
90 use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError};
91 use super::defs;
92 use super::txbuf::TxBuf;
93 use super::{ConnState, Error, PendingRx, PendingRxSet, Result};
94 
/// A self-managing connection object, that handles communication between a guest-side AF_VSOCK
/// socket and a host-side `Read + Write + AsRawFd` stream.
///
/// All byte counters kept here are `Wrapping<u32>`, so the arithmetic performed on them wraps
/// around instead of overflowing.
///
pub struct VsockConnection<S: Read + Write + AsRawFd> {
    /// The current connection state.
    state: ConnState,
    /// The local CID. Most of the time this will be the constant `2` (the vsock host CID).
    /// Used as the source CID of every packet built for the peer.
    local_cid: u64,
    /// The peer (guest) CID. Used as the destination CID of every packet built for the peer.
    peer_cid: u64,
    /// The local (host) port.
    local_port: u32,
    /// The peer (guest) port.
    peer_port: u32,
    /// The (connected) host-side stream. Data for the guest is read from it, and data coming
    /// from the guest is written to it.
    stream: S,
    /// The TX buffer for this connection. It holds guest data that could not be written to
    /// `self.stream` right away, until it can be flushed.
    tx_buf: TxBuf,
    /// Total number of bytes that have been successfully written to `self.stream`, either
    /// directly, or flushed from `self.tx_buf`.
    fwd_cnt: Wrapping<u32>,
    /// The amount of buffer space that the peer (guest) has allocated for this connection.
    /// Refreshed from the header of every packet received from the peer.
    peer_buf_alloc: u32,
    /// The total number of bytes that the peer has forwarded away.
    /// Refreshed from the header of every packet received from the peer.
    peer_fwd_cnt: Wrapping<u32>,
    /// The total number of bytes sent to the peer (guest vsock driver)
    rx_cnt: Wrapping<u32>,
    /// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit
    /// updates, and let the peer know it's OK to send more data.
    last_fwd_cnt_to_peer: Wrapping<u32>,
    /// The set of pending RX packet indications that `recv_pkt()` will use to fill in a
    /// packet for the peer (guest).
    pending_rx: PendingRxSet,
    /// Instant when this connection should be scheduled for immediate termination, due to some
    /// timeout condition having been fulfilled. `None` means that no kill timer is armed.
    expiry: Option<Instant>,
}
132 
133 impl<S> VsockChannel for VsockConnection<S>
134 where
135     S: Read + Write + AsRawFd,
136 {
137     /// Fill in a vsock packet, to be delivered to our peer (the guest driver).
138     ///
139     /// As per the `VsockChannel` trait, this should only be called when there is data to be
140     /// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error
141     /// out with `VsockError::NoData`.
142     /// Pending RX indications are set by other mutable actions performed on the channel. For
143     /// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN
144     /// packet, or `notify()` could set a Rw indication (a data packet can be fetched from the
145     /// channel), if data was ready to be read from the host stream.
146     ///
147     /// Returns:
148     /// - `Ok(())`: the packet has been successfully filled in and is ready for delivery;
149     /// - `Err(VsockError::NoData)`: there was no data available with which to fill in the
150     ///    packet;
151     /// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but
152     ///    it is missing the data buffer.
153     ///
154     fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
155         // Perform some generic initialization that is the same for any packet operation (e.g.
156         // source, destination, credit, etc).
157         self.init_pkt(pkt);
158 
159         // If forceful termination is pending, there's no point in checking for anything else.
160         // It's dead, Jim.
161         if self.pending_rx.remove(PendingRx::Rst) {
162             pkt.set_op(uapi::VSOCK_OP_RST);
163             return Ok(());
164         }
165 
166         // Next up: if we're due a connection confirmation, that's all we need to know to fill
167         // in this packet.
168         if self.pending_rx.remove(PendingRx::Response) {
169             self.state = ConnState::Established;
170             pkt.set_op(uapi::VSOCK_OP_RESPONSE);
171             return Ok(());
172         }
173 
174         // Same thing goes for locally-initiated connections that need to yield a connection
175         // request.
176         if self.pending_rx.remove(PendingRx::Request) {
177             self.expiry =
178                 Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS));
179             pkt.set_op(uapi::VSOCK_OP_REQUEST);
180             return Ok(());
181         }
182 
183         if self.pending_rx.remove(PendingRx::Rw) {
184             // We're due to produce a data packet, by reading the data from the host-side
185             // Unix socket.
186 
187             match self.state {
188                 // A data packet is only valid for established connections, and connections for
189                 // which our peer has initiated a graceful shutdown, but can still receive data.
190                 ConnState::Established | ConnState::PeerClosed(false, _) => (),
191                 _ => {
192                     // Any other connection state is invalid at this point, and we need to kill it
193                     // with fire.
194                     pkt.set_op(uapi::VSOCK_OP_RST);
195                     return Ok(());
196                 }
197             }
198 
199             // Oh wait, before we start bringing in the big data, can our peer handle receiving so
200             // much bytes goodness?
201             if self.need_credit_update_from_peer() {
202                 self.last_fwd_cnt_to_peer = self.fwd_cnt;
203                 pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST);
204                 return Ok(());
205             }
206 
207             let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?;
208 
209             // The maximum amount of data we can read in is limited by both the RX buffer size and
210             // the peer available buffer space.
211             let max_len = std::cmp::min(buf.len(), self.peer_avail_credit());
212 
213             // Read data from the stream straight to the RX buffer, for maximum throughput.
214             match self.stream.read(&mut buf[..max_len]) {
215                 Ok(read_cnt) => {
216                     if read_cnt == 0 {
217                         // A 0-length read means the host stream was closed down. In that case,
218                         // we'll ask our peer to shut down the connection. We can neither send nor
219                         // receive any more data.
220                         self.state = ConnState::LocalClosed;
221                         self.expiry = Some(
222                             Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
223                         );
224                         pkt.set_op(uapi::VSOCK_OP_SHUTDOWN)
225                             .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV)
226                             .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
227                     } else {
228                         // On a successful data read, we fill in the packet with the RW op, and
229                         // length of the read data.
230                         pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32);
231                     }
232                     self.rx_cnt += Wrapping(pkt.len());
233                     self.last_fwd_cnt_to_peer = self.fwd_cnt;
234                     return Ok(());
235                 }
236                 Err(err) if err.kind() == ErrorKind::WouldBlock => {
237                     // This shouldn't actually happen (receiving EWOULDBLOCK after EPOLLIN), but
238                     // apparently it does, so we need to handle it gracefully.
239                     warn!(
240                         "vsock: unexpected EWOULDBLOCK while reading from backing stream: \
241                          lp={}, pp={}, err={:?}",
242                         self.local_port, self.peer_port, err
243                     );
244                 }
245                 Err(err) => {
246                     // We are not expecting any other errors when reading from the underlying
247                     // stream. If any show up, we'll immediately kill this connection.
248                     error!(
249                         "vsock: error reading from backing stream: lp={}, pp={}, err={:?}",
250                         self.local_port, self.peer_port, err
251                     );
252                     pkt.set_op(uapi::VSOCK_OP_RST);
253                     self.last_fwd_cnt_to_peer = self.fwd_cnt;
254                     return Ok(());
255                 }
256             };
257         }
258 
259         // A credit update is basically a no-op, so we should only waste a perfectly fine RX
260         // buffer on it if we really have nothing else to say, hence we check for this RX
261         // indication last.
262         if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() {
263             pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE);
264             self.last_fwd_cnt_to_peer = self.fwd_cnt;
265             return Ok(());
266         }
267 
268         // We've already checked for all conditions that would have produced a packet, so
269         // if we got to here, we don't know how to yield one.
270         Err(VsockError::NoData)
271     }
272 
273     /// Deliver a guest-generated packet to this connection.
274     ///
275     /// This forwards the data in RW packets to the host stream, and absorbs control packets,
276     /// using them to manage the internal connection state.
277     ///
278     /// Returns:
279     /// always `Ok(())`: the packet has been consumed;
280     ///
281     fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
282         // Update the peer credit information.
283         self.peer_buf_alloc = pkt.buf_alloc();
284         self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
285 
286         match self.state {
287             // Most frequent case: this is an established connection that needs to forward some
288             // data to the host stream. Also works for a connection that has begun shutting
289             // down, but the peer still has some data to send.
290             ConnState::Established | ConnState::PeerClosed(_, false)
291                 if pkt.op() == uapi::VSOCK_OP_RW =>
292             {
293                 if pkt.buf().is_none() {
294                     info!(
295                         "vsock: dropping empty data packet from guest (lp={}, pp={}",
296                         self.local_port, self.peer_port
297                     );
298                     return Ok(());
299                 }
300 
301                 // Unwrapping here is safe, since we just checked `pkt.buf()` above.
302                 let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)];
303                 if let Err(err) = self.send_bytes(buf_slice) {
304                     // If we can't write to the host stream, that's an unrecoverable error, so
305                     // we'll terminate this connection.
306                     warn!(
307                         "vsock: error writing to local stream (lp={}, pp={}): {:?}",
308                         self.local_port, self.peer_port, err
309                     );
310                     self.kill();
311                     return Ok(());
312                 }
313 
314                 // We might've just consumed some data. If that's the case, we might need to
315                 // update the peer on our buffer space situation, so that it can keep sending
316                 // data packets our way.
317                 if self.peer_needs_credit_update() {
318                     self.pending_rx.insert(PendingRx::CreditUpdate);
319                 }
320             }
321 
322             // Next up: receiving a response / confirmation for a host-initiated connection.
323             // We'll move to an Established state, and pass on the good news through the host
324             // stream.
325             ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => {
326                 self.expiry = None;
327                 self.state = ConnState::Established;
328             }
329 
330             // The peer wants to shut down an established connection.  If they have nothing
331             // more to send nor receive, and we don't have to wait to drain our TX buffer, we
332             // can schedule an RST packet (to terminate the connection on the next recv call).
333             // Otherwise, we'll arm the kill timer.
334             ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => {
335                 let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0;
336                 let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0;
337                 self.state = ConnState::PeerClosed(recv_off, send_off);
338                 if recv_off && send_off {
339                     if self.tx_buf.is_empty() {
340                         self.pending_rx.insert(PendingRx::Rst);
341                     } else {
342                         self.expiry = Some(
343                             Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
344                         );
345                     }
346                 }
347             }
348 
349             // The peer wants to update a shutdown request, with more receive/send indications.
350             // The same logic as above applies.
351             ConnState::PeerClosed(ref mut recv_off, ref mut send_off)
352                 if pkt.op() == uapi::VSOCK_OP_SHUTDOWN =>
353             {
354                 *recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0);
355                 *send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0);
356                 if *recv_off && *send_off && self.tx_buf.is_empty() {
357                     self.pending_rx.insert(PendingRx::Rst);
358                 }
359             }
360 
361             // A credit update from our peer is valid only in a state which allows data
362             // transfer towards the peer.
363             ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _)
364                 if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE =>
365             {
366                 // Nothing to do here; we've already updated peer credit.
367             }
368 
369             // A credit request from our peer is valid only in a state which allows data
370             // transfer from the peer. We'll respond with a credit update packet.
371             ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false)
372                 if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST =>
373             {
374                 self.pending_rx.insert(PendingRx::CreditUpdate);
375             }
376 
377             _ => {
378                 debug!(
379                     "vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}",
380                     self.state,
381                     pkt.hdr()
382                 );
383             }
384         };
385 
386         Ok(())
387     }
388 
389     /// Check if the connection has any pending packet addressed to the peer.
390     ///
391     fn has_pending_rx(&self) -> bool {
392         !self.pending_rx.is_empty()
393     }
394 }
395 
396 impl<S> VsockEpollListener for VsockConnection<S>
397 where
398     S: Read + Write + AsRawFd,
399 {
400     /// Get the file descriptor that this connection wants polled.
401     ///
402     /// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the
403     /// host stream.
404     ///
405     fn get_polled_fd(&self) -> RawFd {
406         self.stream.as_raw_fd()
407     }
408 
409     /// Get the event set that this connection is interested in.
410     ///
411     /// A connection will want to be notified when:
412     /// - data is available to be read from the host stream, so that it can store an RW pending
413     ///   RX indication; and
414     /// - data can be written to the host stream, and the TX buffer needs to be flushed.
415     ///
416     fn get_polled_evset(&self) -> epoll::Events {
417         let mut evset = epoll::Events::empty();
418         if !self.tx_buf.is_empty() {
419             // There's data waiting in the TX buffer, so we are interested in being notified
420             // when writing to the host stream wouldn't block.
421             evset.insert(epoll::Events::EPOLLOUT);
422         }
423         // We're generally interested in being notified when data can be read from the host
424         // stream, unless we're in a state which doesn't allow moving data from host to guest.
425         match self.state {
426             ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (),
427             _ if self.need_credit_update_from_peer() => (),
428             _ => evset.insert(epoll::Events::EPOLLIN),
429         }
430         evset
431     }
432 
433     /// Notify the connection about an event (or set of events) that it was interested in.
434     ///
435     fn notify(&mut self, evset: epoll::Events) {
436         if evset.contains(epoll::Events::EPOLLIN) {
437             // Data can be read from the host stream. Setting a Rw pending indication, so that
438             // the muxer will know to call `recv_pkt()` later.
439             self.pending_rx.insert(PendingRx::Rw);
440         }
441 
442         if evset.contains(epoll::Events::EPOLLOUT) {
443             // Data can be written to the host stream. Time to flush out the TX buffer.
444             //
445             if self.tx_buf.is_empty() {
446                 info!("vsock: connection received unexpected EPOLLOUT event");
447                 return;
448             }
449             let flushed = self
450                 .tx_buf
451                 .flush_to(&mut self.stream)
452                 .unwrap_or_else(|err| {
453                     warn!(
454                         "vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
455                         self.local_port, self.peer_port, err
456                     );
457                     match err {
458                         Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
459                             // This should never happen (EWOULDBLOCK after EPOLLOUT), but
460                             // it does, so let's absorb it.
461                         }
462                         _ => self.kill(),
463                     };
464                     0
465                 });
466             self.fwd_cnt += Wrapping(flushed as u32);
467 
468             // If this connection was shutting down, but is waiting to drain the TX buffer
469             // before forceful termination, the wait might be over.
470             if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() {
471                 self.pending_rx.insert(PendingRx::Rst);
472             } else if self.peer_needs_credit_update() {
473                 // If we've freed up some more buffer space, we may need to let the peer know it
474                 // can safely send more data our way.
475                 self.pending_rx.insert(PendingRx::CreditUpdate);
476             }
477         }
478     }
479 }
480 
481 impl<S> VsockConnection<S>
482 where
483     S: Read + Write + AsRawFd,
484 {
485     /// Create a new guest-initiated connection object.
486     ///
487     pub fn new_peer_init(
488         stream: S,
489         local_cid: u64,
490         peer_cid: u64,
491         local_port: u32,
492         peer_port: u32,
493         peer_buf_alloc: u32,
494     ) -> Self {
495         Self {
496             local_cid,
497             peer_cid,
498             local_port,
499             peer_port,
500             stream,
501             state: ConnState::PeerInit,
502             tx_buf: TxBuf::new(),
503             fwd_cnt: Wrapping(0),
504             peer_buf_alloc,
505             peer_fwd_cnt: Wrapping(0),
506             rx_cnt: Wrapping(0),
507             last_fwd_cnt_to_peer: Wrapping(0),
508             pending_rx: PendingRxSet::from(PendingRx::Response),
509             expiry: None,
510         }
511     }
512 
513     /// Create a new host-initiated connection object.
514     ///
515     pub fn new_local_init(
516         stream: S,
517         local_cid: u64,
518         peer_cid: u64,
519         local_port: u32,
520         peer_port: u32,
521     ) -> Self {
522         Self {
523             local_cid,
524             peer_cid,
525             local_port,
526             peer_port,
527             stream,
528             state: ConnState::LocalInit,
529             tx_buf: TxBuf::new(),
530             fwd_cnt: Wrapping(0),
531             peer_buf_alloc: 0,
532             peer_fwd_cnt: Wrapping(0),
533             rx_cnt: Wrapping(0),
534             last_fwd_cnt_to_peer: Wrapping(0),
535             pending_rx: PendingRxSet::from(PendingRx::Request),
536             expiry: None,
537         }
538     }
539 
540     /// Check if there is an expiry (kill) timer set for this connection, sometime in the
541     /// future.
542     ///
543     pub fn will_expire(&self) -> bool {
544         match self.expiry {
545             None => false,
546             Some(t) => t > Instant::now(),
547         }
548     }
549 
550     /// Check if this connection needs to be scheduled for forceful termination, due to its
551     /// kill timer having expired.
552     ///
553     pub fn has_expired(&self) -> bool {
554         match self.expiry {
555             None => false,
556             Some(t) => t <= Instant::now(),
557         }
558     }
559 
560     /// Get the kill timer value, if one is set.
561     ///
562     pub fn expiry(&self) -> Option<Instant> {
563         self.expiry
564     }
565 
566     /// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the
567     /// connection is asked to yield a packet, via `recv_pkt()`).
568     ///
569     pub fn kill(&mut self) {
570         self.state = ConnState::Killed;
571         self.pending_rx.insert(PendingRx::Rst);
572     }
573 
574     /// Return the connections state.
575     ///
576     pub fn state(&self) -> ConnState {
577         self.state
578     }
579 
580     /// Send some raw, untracked, data straight to the underlying connected stream.
581     /// Returns: number of bytes written, or the error describing the write failure.
582     ///
583     /// Warning: this will bypass the connection state machine and write directly to the
584     /// underlying stream. No account of this write is kept, which includes bypassing
585     /// vsock flow control.
586     ///
587     pub fn send_bytes_raw(&mut self, buf: &[u8]) -> Result<usize> {
588         self.stream.write(buf).map_err(Error::StreamWrite)
589     }
590 
591     /// Send some raw data (a byte-slice) to the host stream.
592     ///
593     /// Raw data can either be sent straight to the host stream, or to our TX buffer, if the
594     /// former fails.
595     ///
596     fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
597         // If there is data in the TX buffer, that means we're already registered for EPOLLOUT
598         // events on the underlying stream. Therefore, there's no point in attempting a write
599         // at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will
600         // attempt to drain the TX buffer then.
601         if !self.tx_buf.is_empty() {
602             return self.tx_buf.push(buf);
603         }
604 
605         // The TX buffer is empty, so we can try to write straight to the host stream.
606         let written = match self.stream.write(buf) {
607             Ok(cnt) => cnt,
608             Err(e) => {
609                 // Absorb any would-block errors, since we can always try again later.
610                 if e.kind() == ErrorKind::WouldBlock {
611                     0
612                 } else {
613                     // We don't know how to handle any other write error, so we'll send it up
614                     // the call chain.
615                     return Err(Error::StreamWrite(e));
616                 }
617             }
618         };
619         // Move the "forwarded bytes" counter ahead by how much we were able to send out.
620         self.fwd_cnt += Wrapping(written as u32);
621 
622         // If we couldn't write the whole slice, we'll need to push the remaining data to our
623         // buffer.
624         if written < buf.len() {
625             self.tx_buf.push(&buf[written..])?;
626         }
627 
628         Ok(())
629     }
630 
631     /// Check if the credit information the peer has last received from us is outdated.
632     ///
633     fn peer_needs_credit_update(&self) -> bool {
634         let peer_seen_free_buf =
635             Wrapping(defs::CONN_TX_BUF_SIZE) - (self.fwd_cnt - self.last_fwd_cnt_to_peer);
636         peer_seen_free_buf < Wrapping(defs::CONN_CREDIT_UPDATE_THRESHOLD)
637     }
638 
639     /// Check if we need to ask the peer for a credit update before sending any more data its
640     /// way.
641     ///
642     fn need_credit_update_from_peer(&self) -> bool {
643         self.peer_avail_credit() == 0
644     }
645 
646     /// Get the maximum number of bytes that we can send to our peer, without overflowing its
647     /// buffer.
648     ///
649     fn peer_avail_credit(&self) -> usize {
650         (Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
651     }
652 
653     /// Prepare a packet header for transmission to our peer.
654     ///
655     fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket {
656         // Make sure the header is zeroed-out first.
657         // This looks sub-optimal, but it is actually optimized-out in the compiled code to be
658         // faster than a memset().
659         for b in pkt.hdr_mut() {
660             *b = 0;
661         }
662 
663         pkt.set_src_cid(self.local_cid)
664             .set_dst_cid(self.peer_cid)
665             .set_src_port(self.local_port)
666             .set_dst_port(self.peer_port)
667             .set_type(uapi::VSOCK_TYPE_STREAM)
668             .set_buf_alloc(defs::CONN_TX_BUF_SIZE)
669             .set_fwd_cnt(self.fwd_cnt.0)
670     }
671 }
672 
673 #[cfg(test)]
674 mod tests {
675     use libc::EFD_NONBLOCK;
676     use virtio_queue::QueueOwnedT;
677 
678     use std::io::{Error as IoError, ErrorKind, Read, Result as IoResult, Write};
679     use std::os::unix::io::RawFd;
680     use std::time::{Duration, Instant};
681     use vmm_sys_util::eventfd::EventFd;
682 
683     use super::super::super::defs::uapi;
684     use super::super::super::tests::TestContext;
685     use super::super::defs as csm_defs;
686     use super::*;
687 
688     const LOCAL_CID: u64 = 2;
689     const PEER_CID: u64 = 3;
690     const LOCAL_PORT: u32 = 1002;
691     const PEER_PORT: u32 = 1003;
692     const PEER_BUF_ALLOC: u32 = 64 * 1024;
693 
694     enum StreamState {
695         Closed,
696         Error(ErrorKind),
697         Ready,
698         WouldBlock,
699     }
700 
701     struct TestStream {
702         fd: EventFd,
703         read_buf: Vec<u8>,
704         read_state: StreamState,
705         write_buf: Vec<u8>,
706         write_state: StreamState,
707     }
708     impl TestStream {
709         fn new() -> Self {
710             Self {
711                 fd: EventFd::new(EFD_NONBLOCK).unwrap(),
712                 read_state: StreamState::Ready,
713                 write_state: StreamState::Ready,
714                 read_buf: Vec::new(),
715                 write_buf: Vec::new(),
716             }
717         }
718         fn new_with_read_buf(buf: &[u8]) -> Self {
719             let mut stream = Self::new();
720             stream.read_buf = buf.to_vec();
721             stream
722         }
723     }
724 
725     impl AsRawFd for TestStream {
726         fn as_raw_fd(&self) -> RawFd {
727             self.fd.as_raw_fd()
728         }
729     }
730 
731     impl Read for TestStream {
732         fn read(&mut self, data: &mut [u8]) -> IoResult<usize> {
733             match self.read_state {
734                 StreamState::Closed => Ok(0),
735                 StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
736                 StreamState::Ready => {
737                     if self.read_buf.is_empty() {
738                         return Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN"));
739                     }
740                     let len = std::cmp::min(data.len(), self.read_buf.len());
741                     assert_ne!(len, 0);
742                     data[..len].copy_from_slice(&self.read_buf[..len]);
743                     self.read_buf = self.read_buf.split_off(len);
744                     Ok(len)
745                 }
746                 StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
747             }
748         }
749     }
750 
751     impl Write for TestStream {
752         fn write(&mut self, data: &[u8]) -> IoResult<usize> {
753             match self.write_state {
754                 StreamState::Closed => Err(IoError::new(ErrorKind::BrokenPipe, "EPIPE")),
755                 StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
756                 StreamState::Ready => {
757                     self.write_buf.extend_from_slice(data);
758                     Ok(data.len())
759                 }
760                 StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
761             }
762         }
763         fn flush(&mut self) -> IoResult<()> {
764             Ok(())
765         }
766     }
767 
768     impl<S> VsockConnection<S>
769     where
770         S: Read + Write + AsRawFd,
771     {
772         /// Get the fwd_cnt value from the connection.
773         pub(crate) fn fwd_cnt(&self) -> Wrapping<u32> {
774             self.fwd_cnt
775         }
776 
777         /// Forcefully insert a credit update flag.
778         pub(crate) fn insert_credit_update(&mut self) {
779             self.pending_rx.insert(PendingRx::CreditUpdate);
780         }
781     }
782 
783     fn init_pkt(pkt: &mut VsockPacket, op: u16, len: u32) -> &mut VsockPacket {
784         for b in pkt.hdr_mut() {
785             *b = 0;
786         }
787         pkt.set_src_cid(PEER_CID)
788             .set_dst_cid(LOCAL_CID)
789             .set_src_port(PEER_PORT)
790             .set_dst_port(LOCAL_PORT)
791             .set_type(uapi::VSOCK_TYPE_STREAM)
792             .set_buf_alloc(PEER_BUF_ALLOC)
793             .set_op(op)
794             .set_len(len)
795     }
796 
797     // This is the connection state machine test context: a helper struct to provide CSM testing
798     // primitives. A single `VsockPacket` object will be enough for our testing needs. We'll be
799     // using it for simulating both packet sends and packet receives. We need to keep the vsock
800     // testing context alive, since `VsockPacket` is just a pointer-wrapper over some data that
801     // resides in guest memory. The vsock test context owns the `GuestMemory` object, so we'll make
802     // it a member here, in order to make sure that guest memory outlives our testing packet.  A
803     // single `VsockConnection` object will also suffice for our testing needs. We'll be using a
804     // specially crafted `Read + Write + AsRawFd` object as a backing stream, so that we can
805     // control the various error conditions that might arise.
806     struct CsmTestContext {
807         _vsock_test_ctx: TestContext,
808         pkt: VsockPacket,
809         conn: VsockConnection<TestStream>,
810     }
811 
812     impl CsmTestContext {
813         fn new_established() -> Self {
814             Self::new(ConnState::Established)
815         }
816 
817         fn new(conn_state: ConnState) -> Self {
818             let vsock_test_ctx = TestContext::new();
819             let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
820             let stream = TestStream::new();
821             let mut pkt = VsockPacket::from_rx_virtq_head(
822                 &mut handler_ctx.handler.queues[0]
823                     .iter(&vsock_test_ctx.mem)
824                     .unwrap()
825                     .next()
826                     .unwrap(),
827                 None,
828             )
829             .unwrap();
830             let conn = match conn_state {
831                 ConnState::PeerInit => VsockConnection::<TestStream>::new_peer_init(
832                     stream,
833                     LOCAL_CID,
834                     PEER_CID,
835                     LOCAL_PORT,
836                     PEER_PORT,
837                     PEER_BUF_ALLOC,
838                 ),
839                 ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
840                     stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
841                 ),
842                 ConnState::Established => {
843                     let mut conn = VsockConnection::<TestStream>::new_peer_init(
844                         stream,
845                         LOCAL_CID,
846                         PEER_CID,
847                         LOCAL_PORT,
848                         PEER_PORT,
849                         PEER_BUF_ALLOC,
850                     );
851                     assert!(conn.has_pending_rx());
852                     conn.recv_pkt(&mut pkt).unwrap();
853                     assert_eq!(pkt.op(), uapi::VSOCK_OP_RESPONSE);
854                     conn
855                 }
856                 other => panic!("invalid ctx state: {other:?}"),
857             };
858             assert_eq!(conn.state, conn_state);
859             Self {
860                 _vsock_test_ctx: vsock_test_ctx,
861                 pkt,
862                 conn,
863             }
864         }
865 
866         fn set_stream(&mut self, stream: TestStream) {
867             self.conn.stream = stream;
868         }
869 
870         fn set_peer_credit(&mut self, credit: u32) {
871             assert!(credit < self.conn.peer_buf_alloc);
872             self.conn.peer_fwd_cnt = Wrapping(0);
873             self.conn.rx_cnt = Wrapping(self.conn.peer_buf_alloc - credit);
874             assert_eq!(self.conn.peer_avail_credit(), credit as usize);
875         }
876 
877         fn send(&mut self) {
878             self.conn.send_pkt(&self.pkt).unwrap();
879         }
880 
881         fn recv(&mut self) {
882             self.conn.recv_pkt(&mut self.pkt).unwrap();
883         }
884 
885         fn notify_epollin(&mut self) {
886             self.conn.notify(epoll::Events::EPOLLIN);
887             assert!(self.conn.has_pending_rx());
888         }
889 
890         fn notify_epollout(&mut self) {
891             self.conn.notify(epoll::Events::EPOLLOUT);
892         }
893 
894         fn init_pkt(&mut self, op: u16, len: u32) -> &mut VsockPacket {
895             init_pkt(&mut self.pkt, op, len)
896         }
897 
898         fn init_data_pkt(&mut self, data: &[u8]) -> &VsockPacket {
899             assert!(data.len() <= self.pkt.buf().unwrap().len());
900             self.init_pkt(uapi::VSOCK_OP_RW, data.len() as u32);
901             self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
902             &self.pkt
903         }
904     }
905 
906     #[test]
907     fn test_peer_request() {
908         let mut ctx = CsmTestContext::new(ConnState::PeerInit);
909         assert!(ctx.conn.has_pending_rx());
910         ctx.recv();
911         // For peer-initiated requests, our connection should always yield a vsock response packet,
912         // in order to establish the connection.
913         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
914         assert_eq!(ctx.pkt.src_cid(), LOCAL_CID);
915         assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
916         assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
917         assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
918         assert_eq!(ctx.pkt.type_(), uapi::VSOCK_TYPE_STREAM);
919         assert_eq!(ctx.pkt.len(), 0);
920         // After yielding the response packet, the connection should have transitioned to the
921         // established state.
922         assert_eq!(ctx.conn.state, ConnState::Established);
923     }
924 
925     #[test]
926     fn test_local_request() {
927         let mut ctx = CsmTestContext::new(ConnState::LocalInit);
928         // Host-initiated connections should first yield a connection request packet.
929         assert!(ctx.conn.has_pending_rx());
930         // Before yielding the connection request packet, the timeout kill timer shouldn't be
931         // armed.
932         assert!(!ctx.conn.will_expire());
933         ctx.recv();
934         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
935         // Since the request might time-out, the kill timer should now be armed.
936         assert!(ctx.conn.will_expire());
937         assert!(!ctx.conn.has_expired());
938         ctx.init_pkt(uapi::VSOCK_OP_RESPONSE, 0);
939         ctx.send();
940         // Upon receiving a connection response, the connection should have transitioned to the
941         // established state, and the kill timer should've been disarmed.
942         assert_eq!(ctx.conn.state, ConnState::Established);
943         assert!(!ctx.conn.will_expire());
944     }
945 
946     #[test]
947     fn test_local_request_timeout() {
948         let mut ctx = CsmTestContext::new(ConnState::LocalInit);
949         ctx.recv();
950         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
951         assert!(ctx.conn.will_expire());
952         assert!(!ctx.conn.has_expired());
953         std::thread::sleep(std::time::Duration::from_millis(
954             defs::CONN_REQUEST_TIMEOUT_MS,
955         ));
956         assert!(ctx.conn.has_expired());
957     }
958 
959     #[test]
960     fn test_rx_data() {
961         let mut ctx = CsmTestContext::new_established();
962         let data = &[1, 2, 3, 4];
963         ctx.set_stream(TestStream::new_with_read_buf(data));
964         assert_eq!(ctx.conn.get_polled_fd(), ctx.conn.stream.as_raw_fd());
965         ctx.notify_epollin();
966         ctx.recv();
967         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
968         assert_eq!(ctx.pkt.len() as usize, data.len());
969         assert_eq!(ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], *data);
970 
971         // There's no more data in the stream, so `recv_pkt` should yield `VsockError::NoData`.
972         match ctx.conn.recv_pkt(&mut ctx.pkt) {
973             Err(VsockError::NoData) => (),
974             other => panic!("{other:?}"),
975         }
976 
977         // A recv attempt in an invalid state should yield an instant reset packet.
978         ctx.conn.state = ConnState::LocalClosed;
979         ctx.notify_epollin();
980         ctx.recv();
981         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
982     }
983 
984     #[test]
985     fn test_local_close() {
986         let mut ctx = CsmTestContext::new_established();
987         let mut stream = TestStream::new();
988         stream.read_state = StreamState::Closed;
989         ctx.set_stream(stream);
990         ctx.notify_epollin();
991         ctx.recv();
992         // When the host-side stream is closed, we can neither send not receive any more data.
993         // Therefore, the vsock shutdown packet that we'll deliver to the guest must contain both
994         // the no-more-send and the no-more-recv indications.
995         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
996         assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
997         assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
998 
999         // The kill timer should now be armed.
1000         assert!(ctx.conn.will_expire());
1001         assert!(
1002             ctx.conn.expiry().unwrap()
1003                 < Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS)
1004         );
1005     }
1006 
1007     #[test]
1008     fn test_peer_close() {
1009         // Test that send/recv shutdown indications are handled correctly.
1010         // I.e. once set, an indication cannot be reset.
1011         {
1012             let mut ctx = CsmTestContext::new_established();
1013 
1014             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1015                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
1016             ctx.send();
1017             assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, false));
1018 
1019             // Attempting to reset the no-more-recv indication should not work
1020             // (we are only setting the no-more-send indication here).
1021             ctx.pkt.set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
1022             ctx.send();
1023             assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, true));
1024         }
1025 
1026         // Test case:
1027         // - reading data from a no-more-send connection should work; and
1028         // - writing data should have no effect.
1029         {
1030             let data = &[1, 2, 3, 4];
1031             let mut ctx = CsmTestContext::new_established();
1032             ctx.set_stream(TestStream::new_with_read_buf(data));
1033             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1034                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
1035             ctx.send();
1036             ctx.notify_epollin();
1037             ctx.recv();
1038             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
1039             assert_eq!(&ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], data);
1040 
1041             ctx.init_data_pkt(data);
1042             ctx.send();
1043             assert_eq!(ctx.conn.stream.write_buf.len(), 0);
1044             assert!(ctx.conn.tx_buf.is_empty());
1045         }
1046 
1047         // Test case:
1048         // - writing data to a no-more-recv connection should work; and
1049         // - attempting to read data from it should yield an RST packet.
1050         {
1051             let mut ctx = CsmTestContext::new_established();
1052             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1053                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
1054             ctx.send();
1055             let data = &[1, 2, 3, 4];
1056             ctx.init_data_pkt(data);
1057             ctx.send();
1058             assert_eq!(ctx.conn.stream.write_buf, data.to_vec());
1059 
1060             ctx.notify_epollin();
1061             ctx.recv();
1062             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1063         }
1064 
1065         // Test case: setting both no-more-send and no-more-recv indications should have the
1066         // connection confirm termination (i.e. yield an RST).
1067         {
1068             let mut ctx = CsmTestContext::new_established();
1069             ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
1070                 .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV | uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
1071             ctx.send();
1072             assert!(ctx.conn.has_pending_rx());
1073             ctx.recv();
1074             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1075         }
1076     }
1077 
1078     #[test]
1079     fn test_local_read_error() {
1080         let mut ctx = CsmTestContext::new_established();
1081         let mut stream = TestStream::new();
1082         stream.read_state = StreamState::Error(ErrorKind::PermissionDenied);
1083         ctx.set_stream(stream);
1084         ctx.notify_epollin();
1085         ctx.recv();
1086         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1087     }
1088 
1089     #[test]
1090     fn test_credit_request_to_peer() {
1091         let mut ctx = CsmTestContext::new_established();
1092         ctx.set_peer_credit(0);
1093         ctx.notify_epollin();
1094         ctx.recv();
1095         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_REQUEST);
1096     }
1097 
1098     #[test]
1099     fn test_credit_request_from_peer() {
1100         let mut ctx = CsmTestContext::new_established();
1101         ctx.init_pkt(uapi::VSOCK_OP_CREDIT_REQUEST, 0);
1102         ctx.send();
1103         assert!(ctx.conn.has_pending_rx());
1104         ctx.recv();
1105         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
1106         assert_eq!(ctx.pkt.buf_alloc(), csm_defs::CONN_TX_BUF_SIZE);
1107         assert_eq!(ctx.pkt.fwd_cnt(), ctx.conn.fwd_cnt.0);
1108     }
1109 
1110     #[test]
1111     fn test_credit_update_to_peer() {
1112         let mut ctx = CsmTestContext::new_established();
1113 
1114         // Force a stale state, where the peer hasn't been updated on our credit situation.
1115         ctx.conn.last_fwd_cnt_to_peer = Wrapping(0);
1116 
1117         // Since a credit update token is sent when the fwd_cnt value exceeds
1118         // CONN_TX_BUF_SIZE - CONN_CREDIT_UPDATE_THRESHOLD, we initialize
1119         // fwd_cnt at 6 bytes below the threshold.
1120         let initial_fwd_cnt =
1121             csm_defs::CONN_TX_BUF_SIZE - csm_defs::CONN_CREDIT_UPDATE_THRESHOLD - 6;
1122         ctx.conn.fwd_cnt = Wrapping(initial_fwd_cnt);
1123 
1124         // Use a 4-byte packet for triggering the credit update threshold.
1125         let data = &[1, 2, 3, 4];
1126 
1127         // Check that there is no pending RX.
1128         ctx.init_data_pkt(data);
1129         ctx.send();
1130         assert!(!ctx.conn.has_pending_rx());
1131 
1132         // Send a packet again.
1133         ctx.init_data_pkt(data);
1134         ctx.send();
1135 
1136         // The CSM should now have a credit update available for the peer.
1137         assert!(ctx.conn.has_pending_rx());
1138         ctx.recv();
1139         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
1140         assert_eq!(ctx.pkt.fwd_cnt(), initial_fwd_cnt + data.len() as u32 * 2);
1141         assert_eq!(ctx.conn.fwd_cnt, ctx.conn.last_fwd_cnt_to_peer);
1142     }
1143 
1144     #[test]
1145     fn test_tx_buffering() {
1146         // Test case:
1147         // - when writing to the backing stream would block, TX data should end up in the TX buf
1148         // - when the CSM is notified that it can write to the backing stream, it should flush
1149         //   the TX buf.
1150         {
1151             let mut ctx = CsmTestContext::new_established();
1152 
1153             let mut stream = TestStream::new();
1154             stream.write_state = StreamState::WouldBlock;
1155             ctx.set_stream(stream);
1156 
1157             // Send some data through the connection. The backing stream is set to reject writes,
1158             // so the data should end up in the TX buffer.
1159             let data = &[1, 2, 3, 4];
1160             ctx.init_data_pkt(data);
1161             ctx.send();
1162 
1163             // When there's data in the TX buffer, the connection should ask to be notified when it
1164             // can write to its backing stream.
1165             assert!(ctx
1166                 .conn
1167                 .get_polled_evset()
1168                 .contains(epoll::Events::EPOLLOUT));
1169             assert_eq!(ctx.conn.tx_buf.len(), data.len());
1170 
1171             // Unlock the write stream and notify the connection it can now write its buffered
1172             // data.
1173             ctx.set_stream(TestStream::new());
1174             ctx.conn.notify(epoll::Events::EPOLLOUT);
1175             assert!(ctx.conn.tx_buf.is_empty());
1176             assert_eq!(ctx.conn.stream.write_buf, data);
1177         }
1178     }
1179 
1180     #[test]
1181     fn test_stream_write_error() {
1182         // Test case: sending a data packet to a broken / closed backing stream should kill it.
1183         {
1184             let mut ctx = CsmTestContext::new_established();
1185             let mut stream = TestStream::new();
1186             stream.write_state = StreamState::Closed;
1187             ctx.set_stream(stream);
1188 
1189             let data = &[1, 2, 3, 4];
1190             ctx.init_data_pkt(data);
1191             ctx.send();
1192 
1193             assert_eq!(ctx.conn.state, ConnState::Killed);
1194             assert!(ctx.conn.has_pending_rx());
1195             ctx.recv();
1196             assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1197         }
1198 
1199         // Test case: notifying a connection that it can flush its TX buffer to a broken stream
1200         // should kill the connection.
1201         {
1202             let mut ctx = CsmTestContext::new_established();
1203 
1204             let mut stream = TestStream::new();
1205             stream.write_state = StreamState::WouldBlock;
1206             ctx.set_stream(stream);
1207 
1208             // Send some data through the connection. The backing stream is set to reject writes,
1209             // so the data should end up in the TX buffer.
1210             let data = &[1, 2, 3, 4];
1211             ctx.init_data_pkt(data);
1212             ctx.send();
1213 
1214             // Set the backing stream to error out on write.
1215             let mut stream = TestStream::new();
1216             stream.write_state = StreamState::Closed;
1217             ctx.set_stream(stream);
1218 
1219             assert!(ctx
1220                 .conn
1221                 .get_polled_evset()
1222                 .contains(epoll::Events::EPOLLOUT));
1223             ctx.notify_epollout();
1224             assert_eq!(ctx.conn.state, ConnState::Killed);
1225         }
1226     }
1227 
1228     #[test]
1229     fn test_peer_credit_misbehavior() {
1230         let mut ctx = CsmTestContext::new_established();
1231 
1232         let mut stream = TestStream::new();
1233         stream.write_state = StreamState::WouldBlock;
1234         ctx.set_stream(stream);
1235 
1236         // Fill up the TX buffer.
1237         let data = vec![0u8; ctx.pkt.buf().unwrap().len()];
1238         ctx.init_data_pkt(data.as_slice());
1239         for _i in 0..(csm_defs::CONN_TX_BUF_SIZE / data.len() as u32) {
1240             ctx.send();
1241         }
1242 
1243         // Then try to send more data.
1244         ctx.send();
1245 
1246         // The connection should've committed suicide.
1247         assert_eq!(ctx.conn.state, ConnState::Killed);
1248         assert!(ctx.conn.has_pending_rx());
1249         ctx.recv();
1250         assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
1251     }
1252 }
1253