xref: /cloud-hypervisor/virtio-devices/src/vsock/csm/connection.rs (revision 7d7bfb2034001d4cb15df2ddc56d2d350c8da30f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
//! The main job of `VsockConnection` is to forward data traffic, back and forth, between a
//! guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while
//! also managing its internal state.
//! To that end, `VsockConnection` implements:
//! - `VsockChannel` for:
//!   - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`;
//!   - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and
//!   - updating its internal state, by absorbing control packets (anything other than
//!     VSOCK_OP_RW).
//! - `VsockEpollListener` for getting notified about the availability of data or free buffer
//!   space at the host stream.
//!
//! Note: there is a certain asymmetry to the RX and TX data flows:
//!       - RX transfers do not need any data buffering, since data is read straight from the
//!         host stream and into the guest-provided RX buffer;
//!       - TX transfers may require some data to be buffered by `VsockConnection`, if the host
//!         peer can't keep up with reading the data that we're writing. This is because, once
//!         the guest driver provides some data in a virtio TX buffer, the vsock device must
//!         consume it. If that data can't be forwarded straight to the host stream, we'll
//!         have to store it in a buffer (and flush it at a later time). Vsock flow control
//!         ensures that our TX buffer doesn't overflow.
//!
26 // The code in this file is best read with a fresh memory of the vsock protocol inner-workings.
27 // To help with that, here is a
28 //
29 // Short primer on the vsock protocol
30 // ----------------------------------
31 //
32 // 1. Establishing a connection
33 //    A vsock connection is considered established after a two-way handshake:
34 //    - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST);
35 //      then
36 //    - the listening peer sends back a connection response packet (`hdr.op` ==
37 //      VSOCK_OP_RESPONSE).
38 //
39 // 2. Terminating a connection
40 //    When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN
41 //    packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's
42 //    intention:
43 //    - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and
44 //    - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection.
45 //    After a shutdown packet, the receiving peer will have some protocol-undefined time to
46 //    flush its buffers, and then forcefully terminate the connection by sending back an RST
47 //    packet. If the shutdown-initiating peer doesn't receive this RST packet during a timeout
48 //    period, it will send one itself, thus terminating the connection.
//    Note: a peer can send more than one VSOCK_OP_SHUTDOWN packet. However, read/write
//          indications cannot be undone. E.g. once a "no-more-sending" promise was made, it
//          cannot be taken back. That is, `hdr.flags` will be ORed between subsequent
//          VSOCK_OP_SHUTDOWN packets.
53 //
54 // 3. Flow control
55 //    Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver
56 //    has enough free buffer space to store that data. If this condition is not respected, the
57 //    receiving peer's behavior is undefined. In this implementation, we forcefully terminate
58 //    the connection by sending back a VSOCK_OP_RST packet.
59 //    Note: all buffer space information is computed and stored on a per-connection basis.
60 //    Peers keep each other informed about the free buffer space they have by filling in two
61 //    packet header members with each packet they send:
62 //    - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and
63 //    - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its
64 //       buffer.
65 //    One can figure out how much space its peer has available in its buffer by inspecting the
66 //    difference between how much it has sent to the peer and how much the peer has flushed out
//    (i.e. "forwarded", in the vsock spec terminology):
68 //    `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`.
69 //    Note: the above requires that peers constantly keep each other informed on their buffer
70 //          space situation. However, since there are no receipt acknowledgement packets
71 //          defined for the vsock protocol, packet flow can often be unidirectional (just one
72 //          peer sending data to another), so the sender's information about the receiver's
73 //          buffer space can get quickly outdated. The vsock protocol defines two solutions to
74 //          this problem:
75 //          1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its
76 //             peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a
77 //             VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit
78 //             information must be included in any packet);
79 //          2. The receiver can be proactive, and send VSOCK_OP_CREDIT_UPDATE packet, whenever
80 //             it thinks its peer's information is out of date.
81 //          Our implementation uses the proactive approach.
82 //
83 use std::io::{ErrorKind, Read, Write};
84 use std::num::Wrapping;
85 use std::os::unix::io::{AsRawFd, RawFd};
86 use std::time::{Duration, Instant};
87 
88 use super::super::defs::uapi;
89 use super::super::packet::VsockPacket;
90 use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError};
91 use super::defs;
92 use super::txbuf::TxBuf;
93 use super::{ConnState, Error, PendingRx, PendingRxSet, Result};
94 
95 /// A self-managing connection object, that handles communication between a guest-side AF_VSOCK
96 /// socket and a host-side `Read + Write + AsRawFd` stream.
97 ///
98 pub struct VsockConnection<S: Read + Write + AsRawFd> {
99     /// The current connection state.
100     state: ConnState,
101     /// The local CID. Most of the time this will be the constant `2` (the vsock host CID).
102     local_cid: u64,
103     /// The peer (guest) CID.
104     peer_cid: u64,
105     /// The local (host) port.
106     local_port: u32,
107     /// The peer (guest) port.
108     peer_port: u32,
109     /// The (connected) host-side stream.
110     stream: S,
111     /// The TX buffer for this connection.
112     tx_buf: TxBuf,
113     /// Total number of bytes that have been successfully written to `self.stream`, either
114     /// directly, or flushed from `self.tx_buf`.
115     fwd_cnt: Wrapping<u32>,
116     /// The amount of buffer space that the peer (guest) has allocated for this connection.
117     peer_buf_alloc: u32,
118     /// The total number of bytes that the peer has forwarded away.
119     peer_fwd_cnt: Wrapping<u32>,
120     /// The total number of bytes sent to the peer (guest vsock driver)
121     rx_cnt: Wrapping<u32>,
122     /// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit
123     /// updates, and let the peer know it's OK to send more data.
124     last_fwd_cnt_to_peer: Wrapping<u32>,
125     /// The set of pending RX packet indications that `recv_pkt()` will use to fill in a
126     /// packet for the peer (guest).
127     pending_rx: PendingRxSet,
128     /// Instant when this connection should be scheduled for immediate termination, due to some
129     /// timeout condition having been fulfilled.
130     expiry: Option<Instant>,
131 }
132 
133 impl<S> VsockChannel for VsockConnection<S>
134 where
135     S: Read + Write + AsRawFd,
136 {
137     /// Fill in a vsock packet, to be delivered to our peer (the guest driver).
138     ///
139     /// As per the `VsockChannel` trait, this should only be called when there is data to be
140     /// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error
141     /// out with `VsockError::NoData`.
142     /// Pending RX indications are set by other mutable actions performed on the channel. For
143     /// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN
144     /// packet, or `notify()` could set a Rw indication (a data packet can be fetched from the
145     /// channel), if data was ready to be read from the host stream.
146     ///
147     /// Returns:
148     /// - `Ok(())`: the packet has been successfully filled in and is ready for delivery;
149     /// - `Err(VsockError::NoData)`: there was no data available with which to fill in the
150     ///    packet;
151     /// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but
152     ///    it is missing the data buffer.
153     ///
154     fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
155         // Perform some generic initialization that is the same for any packet operation (e.g.
156         // source, destination, credit, etc).
157         self.init_pkt(pkt);
158 
159         // If forceful termination is pending, there's no point in checking for anything else.
160         // It's dead, Jim.
161         if self.pending_rx.remove(PendingRx::Rst) {
162             pkt.set_op(uapi::VSOCK_OP_RST);
163             return Ok(());
164         }
165 
166         // Next up: if we're due a connection confirmation, that's all we need to know to fill
167         // in this packet.
168         if self.pending_rx.remove(PendingRx::Response) {
169             self.state = ConnState::Established;
170             pkt.set_op(uapi::VSOCK_OP_RESPONSE);
171             return Ok(());
172         }
173 
174         // Same thing goes for locally-initiated connections that need to yield a connection
175         // request.
176         if self.pending_rx.remove(PendingRx::Request) {
177             self.expiry =
178                 Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS));
179             pkt.set_op(uapi::VSOCK_OP_REQUEST);
180             return Ok(());
181         }
182 
183         if self.pending_rx.remove(PendingRx::Rw) {
184             // We're due to produce a data packet, by reading the data from the host-side
185             // Unix socket.
186 
187             match self.state {
188                 // A data packet is only valid for established connections, and connections for
189                 // which our peer has initiated a graceful shutdown, but can still receive data.
190                 ConnState::Established | ConnState::PeerClosed(false, _) => (),
191                 _ => {
192                     // Any other connection state is invalid at this point, and we need to kill it
193                     // with fire.
194                     pkt.set_op(uapi::VSOCK_OP_RST);
195                     return Ok(());
196                 }
197             }
198 
199             // Oh wait, before we start bringing in the big data, can our peer handle receiving so
200             // much bytes goodness?
201             if self.need_credit_update_from_peer() {
202                 self.last_fwd_cnt_to_peer = self.fwd_cnt;
203                 pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST);
204                 return Ok(());
205             }
206 
207             let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?;
208 
209             // The maximum amount of data we can read in is limited by both the RX buffer size and
210             // the peer available buffer space.
211             let max_len = std::cmp::min(buf.len(), self.peer_avail_credit());
212 
213             // Read data from the stream straight to the RX buffer, for maximum throughput.
214             match self.stream.read(&mut buf[..max_len]) {
215                 Ok(read_cnt) => {
216                     if read_cnt == 0 {
217                         // A 0-length read means the host stream was closed down. In that case,
218                         // we'll ask our peer to shut down the connection. We can neither send nor
219                         // receive any more data.
220                         self.state = ConnState::LocalClosed;
221                         self.expiry = Some(
222                             Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
223                         );
224                         pkt.set_op(uapi::VSOCK_OP_SHUTDOWN)
225                             .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV)
226                             .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
227                     } else {
228                         // On a successful data read, we fill in the packet with the RW op, and
229                         // length of the read data.
230                         pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32);
231                     }
232                     self.rx_cnt += Wrapping(pkt.len());
233                     self.last_fwd_cnt_to_peer = self.fwd_cnt;
234                     return Ok(());
235                 }
236                 Err(err) if err.kind() == ErrorKind::WouldBlock => {
237                     // This shouldn't actually happen (receiving EWOULDBLOCK after EPOLLIN), but
238                     // apparently it does, so we need to handle it gracefully.
239                     warn!(
240                         "vsock: unexpected EWOULDBLOCK while reading from backing stream: \
241                          lp={}, pp={}, err={:?}",
242                         self.local_port, self.peer_port, err
243                     );
244                 }
245                 Err(err) => {
246                     // We are not expecting any other errors when reading from the underlying
247                     // stream. If any show up, we'll immediately kill this connection.
248                     error!(
249                         "vsock: error reading from backing stream: lp={}, pp={}, err={:?}",
250                         self.local_port, self.peer_port, err
251                     );
252                     pkt.set_op(uapi::VSOCK_OP_RST);
253                     self.last_fwd_cnt_to_peer = self.fwd_cnt;
254                     return Ok(());
255                 }
256             };
257         }
258 
259         // A credit update is basically a no-op, so we should only waste a perfectly fine RX
260         // buffer on it if we really have nothing else to say, hence we check for this RX
261         // indication last.
262         if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() {
263             pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE);
264             self.last_fwd_cnt_to_peer = self.fwd_cnt;
265             return Ok(());
266         }
267 
268         // We've already checked for all conditions that would have produced a packet, so
269         // if we got to here, we don't know how to yield one.
270         Err(VsockError::NoData)
271     }
272 
273     /// Deliver a guest-generated packet to this connection.
274     ///
275     /// This forwards the data in RW packets to the host stream, and absorbs control packets,
276     /// using them to manage the internal connection state.
277     ///
278     /// Returns:
279     /// always `Ok(())`: the packet has been consumed;
280     ///
281     fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
282         // Update the peer credit information.
283         self.peer_buf_alloc = pkt.buf_alloc();
284         self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
285 
286         match self.state {
287             // Most frequent case: this is an established connection that needs to forward some
288             // data to the host stream. Also works for a connection that has begun shutting
289             // down, but the peer still has some data to send.
290             ConnState::Established | ConnState::PeerClosed(_, false)
291                 if pkt.op() == uapi::VSOCK_OP_RW =>
292             {
293                 if pkt.buf().is_none() {
294                     info!(
295                         "vsock: dropping empty data packet from guest (lp={}, pp={}",
296                         self.local_port, self.peer_port
297                     );
298                     return Ok(());
299                 }
300 
301                 // Unwrapping here is safe, since we just checked `pkt.buf()` above.
302                 let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)];
303                 if let Err(err) = self.send_bytes(buf_slice) {
304                     // If we can't write to the host stream, that's an unrecoverable error, so
305                     // we'll terminate this connection.
306                     warn!(
307                         "vsock: error writing to local stream (lp={}, pp={}): {:?}",
308                         self.local_port, self.peer_port, err
309                     );
310                     self.kill();
311                     return Ok(());
312                 }
313 
314                 // We might've just consumed some data. If that's the case, we might need to
315                 // update the peer on our buffer space situation, so that it can keep sending
316                 // data packets our way.
317                 if self.peer_needs_credit_update() {
318                     self.pending_rx.insert(PendingRx::CreditUpdate);
319                 }
320             }
321 
322             // Next up: receiving a response / confirmation for a host-initiated connection.
323             // We'll move to an Established state, and pass on the good news through the host
324             // stream.
325             ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => {
326                 self.expiry = None;
327                 self.state = ConnState::Established;
328             }
329 
330             // The peer wants to shut down an established connection.  If they have nothing
331             // more to send nor receive, and we don't have to wait to drain our TX buffer, we
332             // can schedule an RST packet (to terminate the connection on the next recv call).
333             // Otherwise, we'll arm the kill timer.
334             ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => {
335                 let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0;
336                 let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0;
337                 self.state = ConnState::PeerClosed(recv_off, send_off);
338                 if recv_off && send_off {
339                     if self.tx_buf.is_empty() {
340                         self.pending_rx.insert(PendingRx::Rst);
341                     } else {
342                         self.expiry = Some(
343                             Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
344                         );
345                     }
346                 }
347             }
348 
349             // The peer wants to update a shutdown request, with more receive/send indications.
350             // The same logic as above applies.
351             ConnState::PeerClosed(ref mut recv_off, ref mut send_off)
352                 if pkt.op() == uapi::VSOCK_OP_SHUTDOWN =>
353             {
354                 *recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0);
355                 *send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0);
356                 if *recv_off && *send_off && self.tx_buf.is_empty() {
357                     self.pending_rx.insert(PendingRx::Rst);
358                 }
359             }
360 
361             // A credit update from our peer is valid only in a state which allows data
362             // transfer towards the peer.
363             ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _)
364                 if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE =>
365             {
366                 // Nothing to do here; we've already updated peer credit.
367             }
368 
369             // A credit request from our peer is valid only in a state which allows data
370             // transfer from the peer. We'll respond with a credit update packet.
371             ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false)
372                 if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST =>
373             {
374                 self.pending_rx.insert(PendingRx::CreditUpdate);
375             }
376 
377             _ => {
378                 debug!(
379                     "vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}",
380                     self.state,
381                     pkt.hdr()
382                 );
383             }
384         };
385 
386         Ok(())
387     }
388 
389     /// Check if the connection has any pending packet addressed to the peer.
390     ///
391     fn has_pending_rx(&self) -> bool {
392         !self.pending_rx.is_empty()
393     }
394 }
395 
396 impl<S> VsockEpollListener for VsockConnection<S>
397 where
398     S: Read + Write + AsRawFd,
399 {
400     /// Get the file descriptor that this connection wants polled.
401     ///
402     /// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the
403     /// host stream.
404     ///
405     fn get_polled_fd(&self) -> RawFd {
406         self.stream.as_raw_fd()
407     }
408 
409     /// Get the event set that this connection is interested in.
410     ///
411     /// A connection will want to be notified when:
412     /// - data is available to be read from the host stream, so that it can store an RW pending
413     ///   RX indication; and
414     /// - data can be written to the host stream, and the TX buffer needs to be flushed.
415     ///
416     fn get_polled_evset(&self) -> epoll::Events {
417         let mut evset = epoll::Events::empty();
418         if !self.tx_buf.is_empty() {
419             // There's data waiting in the TX buffer, so we are interested in being notified
420             // when writing to the host stream wouldn't block.
421             evset.insert(epoll::Events::EPOLLOUT);
422         }
423         // We're generally interested in being notified when data can be read from the host
424         // stream, unless we're in a state which doesn't allow moving data from host to guest.
425         match self.state {
426             ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (),
427             _ if self.need_credit_update_from_peer() => (),
428             _ => evset.insert(epoll::Events::EPOLLIN),
429         }
430         evset
431     }
432 
433     /// Notify the connection about an event (or set of events) that it was interested in.
434     ///
435     fn notify(&mut self, evset: epoll::Events) {
436         if evset.contains(epoll::Events::EPOLLIN) {
437             // Data can be read from the host stream. Setting a Rw pending indication, so that
438             // the muxer will know to call `recv_pkt()` later.
439             self.pending_rx.insert(PendingRx::Rw);
440         }
441 
442         if evset.contains(epoll::Events::EPOLLOUT) {
443             // Data can be written to the host stream. Time to flush out the TX buffer.
444             //
445             if self.tx_buf.is_empty() {
446                 info!("vsock: connection received unexpected EPOLLOUT event");
447                 return;
448             }
449             let flushed = self
450                 .tx_buf
451                 .flush_to(&mut self.stream)
452                 .unwrap_or_else(|err| {
453                     warn!(
454                         "vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
455                         self.local_port, self.peer_port, err
456                     );
457                     match err {
458                         Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
459                             // This should never happen (EWOULDBLOCK after EPOLLOUT), but
460                             // it does, so let's absorb it.
461                         }
462                         _ => self.kill(),
463                     };
464                     0
465                 });
466             self.fwd_cnt += Wrapping(flushed as u32);
467 
468             // If this connection was shutting down, but is waiting to drain the TX buffer
469             // before forceful termination, the wait might be over.
470             if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() {
471                 self.pending_rx.insert(PendingRx::Rst);
472             } else if self.peer_needs_credit_update() {
473                 // If we've freed up some more buffer space, we may need to let the peer know it
474                 // can safely send more data our way.
475                 self.pending_rx.insert(PendingRx::CreditUpdate);
476             }
477         }
478     }
479 }
480 
481 impl<S> VsockConnection<S>
482 where
483     S: Read + Write + AsRawFd,
484 {
485     /// Create a new guest-initiated connection object.
486     ///
487     pub fn new_peer_init(
488         stream: S,
489         local_cid: u64,
490         peer_cid: u64,
491         local_port: u32,
492         peer_port: u32,
493         peer_buf_alloc: u32,
494     ) -> Self {
495         Self {
496             local_cid,
497             peer_cid,
498             local_port,
499             peer_port,
500             stream,
501             state: ConnState::PeerInit,
502             tx_buf: TxBuf::new(),
503             fwd_cnt: Wrapping(0),
504             peer_buf_alloc,
505             peer_fwd_cnt: Wrapping(0),
506             rx_cnt: Wrapping(0),
507             last_fwd_cnt_to_peer: Wrapping(0),
508             pending_rx: PendingRxSet::from(PendingRx::Response),
509             expiry: None,
510         }
511     }
512 
513     /// Create a new host-initiated connection object.
514     ///
515     pub fn new_local_init(
516         stream: S,
517         local_cid: u64,
518         peer_cid: u64,
519         local_port: u32,
520         peer_port: u32,
521     ) -> Self {
522         Self {
523             local_cid,
524             peer_cid,
525             local_port,
526             peer_port,
527             stream,
528             state: ConnState::LocalInit,
529             tx_buf: TxBuf::new(),
530             fwd_cnt: Wrapping(0),
531             peer_buf_alloc: 0,
532             peer_fwd_cnt: Wrapping(0),
533             rx_cnt: Wrapping(0),
534             last_fwd_cnt_to_peer: Wrapping(0),
535             pending_rx: PendingRxSet::from(PendingRx::Request),
536             expiry: None,
537         }
538     }
539 
540     /// Check if there is an expiry (kill) timer set for this connection, sometime in the
541     /// future.
542     ///
543     pub fn will_expire(&self) -> bool {
544         match self.expiry {
545             None => false,
546             Some(t) => t > Instant::now(),
547         }
548     }
549 
550     /// Check if this connection needs to be scheduled for forceful termination, due to its
551     /// kill timer having expired.
552     ///
553     pub fn has_expired(&self) -> bool {
554         match self.expiry {
555             None => false,
556             Some(t) => t <= Instant::now(),
557         }
558     }
559 
560     /// Get the kill timer value, if one is set.
561     ///
562     pub fn expiry(&self) -> Option<Instant> {
563         self.expiry
564     }
565 
566     /// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the
567     /// connection is asked to yield a packet, via `recv_pkt()`).
568     ///
569     pub fn kill(&mut self) {
570         self.state = ConnState::Killed;
571         self.pending_rx.insert(PendingRx::Rst);
572     }
573 
574     /// Return the connections state.
575     ///
576     pub fn state(&self) -> ConnState {
577         self.state
578     }
579 
580     /// Send some raw, untracked, data straight to the underlying connected stream.
581     /// Returns: number of bytes written, or the error describing the write failure.
582     ///
583     /// Warning: this will bypass the connection state machine and write directly to the
584     /// underlying stream. No account of this write is kept, which includes bypassing
585     /// vsock flow control.
586     ///
587     pub fn send_bytes_raw(&mut self, buf: &[u8]) -> Result<usize> {
588         self.stream.write(buf).map_err(Error::StreamWrite)
589     }
590 
591     /// Send some raw data (a byte-slice) to the host stream.
592     ///
593     /// Raw data can either be sent straight to the host stream, or to our TX buffer, if the
594     /// former fails.
595     ///
596     fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
597         // If there is data in the TX buffer, that means we're already registered for EPOLLOUT
598         // events on the underlying stream. Therefore, there's no point in attempting a write
599         // at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will
600         // attempt to drain the TX buffer then.
601         if !self.tx_buf.is_empty() {
602             return self.tx_buf.push(buf);
603         }
604 
605         // The TX buffer is empty, so we can try to write straight to the host stream.
606         let written = match self.stream.write(buf) {
607             Ok(cnt) => cnt,
608             Err(e) => {
609                 // Absorb any would-block errors, since we can always try again later.
610                 if e.kind() == ErrorKind::WouldBlock {
611                     0
612                 } else {
613                     // We don't know how to handle any other write error, so we'll send it up
614                     // the call chain.
615                     return Err(Error::StreamWrite(e));
616                 }
617             }
618         };
619         // Move the "forwarded bytes" counter ahead by how much we were able to send out.
620         self.fwd_cnt += Wrapping(written as u32);
621 
622         // If we couldn't write the whole slice, we'll need to push the remaining data to our
623         // buffer.
624         if written < buf.len() {
625             self.tx_buf.push(&buf[written..])?;
626         }
627 
628         Ok(())
629     }
630 
631     /// Check if the credit information the peer has last received from us is outdated.
632     ///
633     fn peer_needs_credit_update(&self) -> bool {
634         let peer_seen_free_buf =
635             Wrapping(defs::CONN_TX_BUF_SIZE) - (self.fwd_cnt - self.last_fwd_cnt_to_peer);
636         peer_seen_free_buf < Wrapping(defs::CONN_CREDIT_UPDATE_THRESHOLD)
637     }
638 
639     /// Check if we need to ask the peer for a credit update before sending any more data its
640     /// way.
641     ///
642     fn need_credit_update_from_peer(&self) -> bool {
643         self.peer_avail_credit() == 0
644     }
645 
646     /// Get the maximum number of bytes that we can send to our peer, without overflowing its
647     /// buffer.
648     ///
649     fn peer_avail_credit(&self) -> usize {
650         (Wrapping(self.peer_buf_alloc as u32) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
651     }
652 
653     /// Prepare a packet header for transmission to our peer.
654     ///
655     fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket {
656         // Make sure the header is zeroed-out first.
657         // This looks sub-optimal, but it is actually optimized-out in the compiled code to be
658         // faster than a memset().
659         for b in pkt.hdr_mut() {
660             *b = 0;
661         }
662 
663         pkt.set_src_cid(self.local_cid)
664             .set_dst_cid(self.peer_cid)
665             .set_src_port(self.local_port)
666             .set_dst_port(self.peer_port)
667             .set_type(uapi::VSOCK_TYPE_STREAM)
668             .set_buf_alloc(defs::CONN_TX_BUF_SIZE)
669             .set_fwd_cnt(self.fwd_cnt.0)
670     }
671 }
672 
#[cfg(test)]
mod tests {
    use libc::EFD_NONBLOCK;

    use std::io::{Error as IoError, ErrorKind, Read, Result as IoResult, Write};
    use std::os::unix::io::RawFd;
    use std::time::{Duration, Instant};
    use vmm_sys_util::eventfd::EventFd;

    use super::super::super::defs::uapi;
    use super::super::super::tests::TestContext;
    use super::super::defs as csm_defs;
    use super::*;

    // Addressing and buffer parameters shared by every test connection. "Local" is the
    // host-side end of the connection (backed by a `TestStream`); "peer" is the other end, whose
    // packets the tests craft by hand.
    const LOCAL_CID: u64 = 2;
    const PEER_CID: u64 = 3;
    const LOCAL_PORT: u32 = 1002;
    const PEER_PORT: u32 = 1003;
    const PEER_BUF_ALLOC: u32 = 64 * 1024;

    /// Behavior that one side (read or write) of a `TestStream` exhibits.
    enum StreamState {
        /// Reads yield EOF (`Ok(0)`); writes fail with `BrokenPipe`.
        Closed,
        /// Every call fails with an I/O error of the given kind.
        Error(ErrorKind),
        /// Reads drain `read_buf` (yielding `WouldBlock` once it is empty); writes append to
        /// `write_buf`.
        Ready,
        /// Every call fails with `WouldBlock`.
        WouldBlock,
    }

    /// A scriptable `Read + Write + AsRawFd` object used as the connection's backing stream, so
    /// that tests can control the data it yields and the error conditions it raises.
    struct TestStream {
        // Only used to hand out a real file descriptor via `AsRawFd`; data goes through the
        // buffers below.
        fd: EventFd,
        // Data returned by `read()` while `read_state` is `Ready`.
        read_buf: Vec<u8>,
        read_state: StreamState,
        // Data accepted by `write()` while `write_state` is `Ready`.
        write_buf: Vec<u8>,
        write_state: StreamState,
    }
    impl TestStream {
        /// Create a stream with both sides `Ready` and empty buffers.
        fn new() -> Self {
            Self {
                fd: EventFd::new(EFD_NONBLOCK).unwrap(),
                read_state: StreamState::Ready,
                write_state: StreamState::Ready,
                read_buf: Vec::new(),
                write_buf: Vec::new(),
            }
        }
        /// Create a `Ready` stream whose read side will yield a copy of `buf`.
        fn new_with_read_buf(buf: &[u8]) -> Self {
            let mut stream = Self::new();
            stream.read_buf = buf.to_vec();
            stream
        }
    }

    impl AsRawFd for TestStream {
        fn as_raw_fd(&self) -> RawFd {
            self.fd.as_raw_fd()
        }
    }

    impl Read for TestStream {
        fn read(&mut self, data: &mut [u8]) -> IoResult<usize> {
            match self.read_state {
                StreamState::Closed => Ok(0),
                StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
                StreamState::Ready => {
                    // An empty buffer behaves like a non-blocking stream with no data pending.
                    if self.read_buf.is_empty() {
                        return Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN"));
                    }
                    let len = std::cmp::min(data.len(), self.read_buf.len());
                    assert_ne!(len, 0);
                    data[..len].copy_from_slice(&self.read_buf[..len]);
                    // Keep only the bytes that have not been handed out yet.
                    self.read_buf = self.read_buf.split_off(len);
                    Ok(len)
                }
                StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
            }
        }
    }

    impl Write for TestStream {
        fn write(&mut self, data: &[u8]) -> IoResult<usize> {
            match self.write_state {
                StreamState::Closed => Err(IoError::new(ErrorKind::BrokenPipe, "EPIPE")),
                StreamState::Error(kind) => Err(IoError::new(kind, "whatevs")),
                StreamState::Ready => {
                    self.write_buf.extend_from_slice(data);
                    Ok(data.len())
                }
                StreamState::WouldBlock => Err(IoError::new(ErrorKind::WouldBlock, "EAGAIN")),
            }
        }
        fn flush(&mut self) -> IoResult<()> {
            Ok(())
        }
    }

    // Test-only accessors that reach into the connection's internal state.
    impl<S> VsockConnection<S>
    where
        S: Read + Write + AsRawFd,
    {
        /// Get the fwd_cnt value from the connection.
        pub(crate) fn fwd_cnt(&self) -> Wrapping<u32> {
            self.fwd_cnt
        }

        /// Forcefully insert a credit update flag.
        pub(crate) fn insert_credit_update(&mut self) {
            self.pending_rx.insert(PendingRx::CreditUpdate);
        }
    }

    /// Zero `pkt`'s header and fill it in as a stream packet sent by the peer to the local end
    /// of the test connection, with the given `op` and `len`. The `fwd_cnt` and `flags` fields
    /// are left at zero.
    fn init_pkt(pkt: &mut VsockPacket, op: u16, len: u32) -> &mut VsockPacket {
        for b in pkt.hdr_mut() {
            *b = 0;
        }
        pkt.set_src_cid(PEER_CID)
            .set_dst_cid(LOCAL_CID)
            .set_src_port(PEER_PORT)
            .set_dst_port(LOCAL_PORT)
            .set_type(uapi::VSOCK_TYPE_STREAM)
            .set_buf_alloc(PEER_BUF_ALLOC)
            .set_op(op)
            .set_len(len)
    }

    // This is the connection state machine test context: a helper struct to provide CSM testing
    // primitives. A single `VsockPacket` object will be enough for our testing needs. We'll be
    // using it for simulating both packet sends and packet receives. We need to keep the vsock
    // testing context alive, since `VsockPacket` is just a pointer-wrapper over some data that
    // resides in guest memory. The vsock test context owns the `GuestMemory` object, so we'll make
    // it a member here, in order to make sure that guest memory outlives our testing packet.  A
    // single `VsockConnection` object will also suffice for our testing needs. We'll be using a
    // specially crafted `Read + Write + AsRawFd` object as a backing stream, so that we can
    // control the various error conditions that might arise.
    struct CsmTestContext {
        _vsock_test_ctx: TestContext,
        pkt: VsockPacket,
        conn: VsockConnection<TestStream>,
    }

    impl CsmTestContext {
        /// Create a context whose connection has already completed the handshake.
        fn new_established() -> Self {
            Self::new(ConnState::Established)
        }

        /// Create a context whose connection starts in `conn_state`.
        ///
        /// Only `PeerInit`, `LocalInit` and `Established` are supported; any other state panics.
        fn new(conn_state: ConnState) -> Self {
            let vsock_test_ctx = TestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_epoll_handler_context();
            let stream = TestStream::new();
            // Build the shared test packet on top of the first descriptor chain of queue 0.
            let mut pkt = VsockPacket::from_rx_virtq_head(
                &mut handler_ctx.handler.queues[0]
                    .iter()
                    .unwrap()
                    .next()
                    .unwrap(),
                None,
            )
            .unwrap();
            let conn = match conn_state {
                ConnState::PeerInit => VsockConnection::<TestStream>::new_peer_init(
                    stream,
                    LOCAL_CID,
                    PEER_CID,
                    LOCAL_PORT,
                    PEER_PORT,
                    PEER_BUF_ALLOC,
                ),
                ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
                    stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
                ),
                ConnState::Established => {
                    // Reach the established state by creating a peer-initiated connection and
                    // consuming the response packet it yields.
                    let mut conn = VsockConnection::<TestStream>::new_peer_init(
                        stream,
                        LOCAL_CID,
                        PEER_CID,
                        LOCAL_PORT,
                        PEER_PORT,
                        PEER_BUF_ALLOC,
                    );
                    assert!(conn.has_pending_rx());
                    conn.recv_pkt(&mut pkt).unwrap();
                    assert_eq!(pkt.op(), uapi::VSOCK_OP_RESPONSE);
                    conn
                }
                other => panic!("invalid ctx state: {:?}", other),
            };
            assert_eq!(conn.state, conn_state);
            Self {
                _vsock_test_ctx: vsock_test_ctx,
                pkt,
                conn,
            }
        }

        /// Replace the connection's backing stream.
        fn set_stream(&mut self, stream: TestStream) {
            self.conn.stream = stream;
        }

        /// Adjust the connection's counters so that exactly `credit` bytes of peer credit
        /// remain. `credit` must be strictly below the peer's buffer allocation.
        fn set_peer_credit(&mut self, credit: u32) {
            assert!(credit < self.conn.peer_buf_alloc);
            self.conn.peer_fwd_cnt = Wrapping(0);
            self.conn.rx_cnt = Wrapping(self.conn.peer_buf_alloc - credit);
            assert_eq!(self.conn.peer_avail_credit(), credit as usize);
        }

        /// Hand the shared packet to the connection as if it came from the peer.
        fn send(&mut self) {
            self.conn.send_pkt(&self.pkt).unwrap();
        }

        /// Have the connection fill the shared packet with its next packet for the peer.
        fn recv(&mut self) {
            self.conn.recv_pkt(&mut self.pkt).unwrap();
        }

        /// Signal EPOLLIN on the backing stream and check that the connection now has something
        /// to deliver to the peer.
        fn notify_epollin(&mut self) {
            self.conn.notify(epoll::Events::EPOLLIN);
            assert!(self.conn.has_pending_rx());
        }

        /// Signal EPOLLOUT on the backing stream.
        fn notify_epollout(&mut self) {
            self.conn.notify(epoll::Events::EPOLLOUT);
        }

        /// Reset the shared packet's header as a peer-to-local packet with the given `op` and
        /// `len` (delegates to the module-level `init_pkt`).
        fn init_pkt(&mut self, op: u16, len: u32) -> &mut VsockPacket {
            init_pkt(&mut self.pkt, op, len)
        }

        /// Turn the shared packet into a `VSOCK_OP_RW` packet carrying a copy of `data`.
        /// `data` must fit in the packet's buffer.
        fn init_data_pkt(&mut self, data: &[u8]) -> &VsockPacket {
            assert!(data.len() <= self.pkt.buf().unwrap().len());
            self.init_pkt(uapi::VSOCK_OP_RW, data.len() as u32);
            self.pkt.buf_mut().unwrap()[..data.len()].copy_from_slice(data);
            &self.pkt
        }
    }

    #[test]
    fn test_peer_request() {
        let mut ctx = CsmTestContext::new(ConnState::PeerInit);
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        // For peer-initiated requests, our connection should always yield a vsock response packet,
        // in order to establish the connection.
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.pkt.src_cid(), LOCAL_CID);
        assert_eq!(ctx.pkt.dst_cid(), PEER_CID);
        assert_eq!(ctx.pkt.src_port(), LOCAL_PORT);
        assert_eq!(ctx.pkt.dst_port(), PEER_PORT);
        assert_eq!(ctx.pkt.type_(), uapi::VSOCK_TYPE_STREAM);
        assert_eq!(ctx.pkt.len(), 0);
        // After yielding the response packet, the connection should have transitioned to the
        // established state.
        assert_eq!(ctx.conn.state, ConnState::Established);
    }

    #[test]
    fn test_local_request() {
        let mut ctx = CsmTestContext::new(ConnState::LocalInit);
        // Host-initiated connections should first yield a connection request packet.
        assert!(ctx.conn.has_pending_rx());
        // Before yielding the connection request packet, the timeout kill timer shouldn't be
        // armed.
        assert!(!ctx.conn.will_expire());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
        // Since the request might time-out, the kill timer should now be armed.
        assert!(ctx.conn.will_expire());
        assert!(!ctx.conn.has_expired());
        ctx.init_pkt(uapi::VSOCK_OP_RESPONSE, 0);
        ctx.send();
        // Upon receiving a connection response, the connection should have transitioned to the
        // established state, and the kill timer should've been disarmed.
        assert_eq!(ctx.conn.state, ConnState::Established);
        assert!(!ctx.conn.will_expire());
    }

    // A host-initiated connection request that the peer never answers should expire once
    // CONN_REQUEST_TIMEOUT_MS has elapsed.
    #[test]
    fn test_local_request_timeout() {
        let mut ctx = CsmTestContext::new(ConnState::LocalInit);
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_REQUEST);
        assert!(ctx.conn.will_expire());
        assert!(!ctx.conn.has_expired());
        std::thread::sleep(std::time::Duration::from_millis(
            defs::CONN_REQUEST_TIMEOUT_MS,
        ));
        assert!(ctx.conn.has_expired());
    }

    // Data read from the backing stream should be delivered to the peer in a VSOCK_OP_RW packet.
    #[test]
    fn test_rx_data() {
        let mut ctx = CsmTestContext::new_established();
        let data = &[1, 2, 3, 4];
        ctx.set_stream(TestStream::new_with_read_buf(data));
        assert_eq!(ctx.conn.get_polled_fd(), ctx.conn.stream.as_raw_fd());
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.pkt.len() as usize, data.len());
        assert_eq!(ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], *data);

        // There's no more data in the stream, so `recv_pkt` should yield `VsockError::NoData`.
        match ctx.conn.recv_pkt(&mut ctx.pkt) {
            Err(VsockError::NoData) => (),
            other => panic!("{:?}", other),
        }

        // A recv attempt in an invalid state should yield an instant reset packet.
        ctx.conn.state = ConnState::LocalClosed;
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }

    #[test]
    fn test_local_close() {
        let mut ctx = CsmTestContext::new_established();
        let mut stream = TestStream::new();
        stream.read_state = StreamState::Closed;
        ctx.set_stream(stream);
        ctx.notify_epollin();
        ctx.recv();
        // When the host-side stream is closed, we can neither send nor receive any more data.
        // Therefore, the vsock shutdown packet that we'll deliver to the guest must contain both
        // the no-more-send and the no-more-recv indications.
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);

        // The kill timer should now be armed.
        assert!(ctx.conn.will_expire());
        assert!(
            ctx.conn.expiry().unwrap()
                < Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS)
        );
    }

    #[test]
    fn test_peer_close() {
        // Test that send/recv shutdown indications are handled correctly.
        // I.e. once set, an indication cannot be reset.
        {
            let mut ctx = CsmTestContext::new_established();

            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
            ctx.send();
            assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, false));

            // Attempting to reset the no-more-recv indication should not work
            // (we are only setting the no-more-send indication here).
            ctx.pkt.set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
            ctx.send();
            assert_eq!(ctx.conn.state, ConnState::PeerClosed(true, true));
        }

        // Test case:
        // - reading data from a no-more-send connection should work; and
        // - writing data should have no effect.
        {
            let data = &[1, 2, 3, 4];
            let mut ctx = CsmTestContext::new_established();
            ctx.set_stream(TestStream::new_with_read_buf(data));
            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
            ctx.send();
            ctx.notify_epollin();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RW);
            assert_eq!(&ctx.pkt.buf().unwrap()[..ctx.pkt.len() as usize], data);

            ctx.init_data_pkt(data);
            ctx.send();
            assert_eq!(ctx.conn.stream.write_buf.len(), 0);
            assert!(ctx.conn.tx_buf.is_empty());
        }

        // Test case:
        // - writing data to a no-more-recv connection should work; and
        // - attempting to read data from it should yield an RST packet.
        {
            let mut ctx = CsmTestContext::new_established();
            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
            ctx.send();
            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();
            assert_eq!(ctx.conn.stream.write_buf, data.to_vec());

            ctx.notify_epollin();
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        }

        // Test case: setting both no-more-send and no-more-recv indications should have the
        // connection confirm termination (i.e. yield an RST).
        {
            let mut ctx = CsmTestContext::new_established();
            ctx.init_pkt(uapi::VSOCK_OP_SHUTDOWN, 0)
                .set_flags(uapi::VSOCK_FLAGS_SHUTDOWN_RCV | uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
            ctx.send();
            assert!(ctx.conn.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        }
    }

    // A hard read error on the backing stream should make the connection yield an RST.
    #[test]
    fn test_local_read_error() {
        let mut ctx = CsmTestContext::new_established();
        let mut stream = TestStream::new();
        stream.read_state = StreamState::Error(ErrorKind::PermissionDenied);
        ctx.set_stream(stream);
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }

    // With no peer credit left, stream activity should make the connection ask the peer for a
    // credit update instead of sending data.
    #[test]
    fn test_credit_request_to_peer() {
        let mut ctx = CsmTestContext::new_established();
        ctx.set_peer_credit(0);
        ctx.notify_epollin();
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_REQUEST);
    }

    // A credit request from the peer should be answered with a credit update that carries our
    // TX buffer size and current fwd_cnt.
    #[test]
    fn test_credit_request_from_peer() {
        let mut ctx = CsmTestContext::new_established();
        ctx.init_pkt(uapi::VSOCK_OP_CREDIT_REQUEST, 0);
        ctx.send();
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
        assert_eq!(ctx.pkt.buf_alloc(), csm_defs::CONN_TX_BUF_SIZE);
        assert_eq!(ctx.pkt.fwd_cnt(), ctx.conn.fwd_cnt.0);
    }

    #[test]
    fn test_credit_update_to_peer() {
        let mut ctx = CsmTestContext::new_established();

        // Force a stale state, where the peer hasn't been updated on our credit situation.
        ctx.conn.last_fwd_cnt_to_peer = Wrapping(0);

        // Since a credit update token is sent when the fwd_cnt value exceeds
        // CONN_TX_BUF_SIZE - CONN_CREDIT_UPDATE_THRESHOLD, we initialize
        // fwd_cnt at 6 bytes below the threshold.
        // (With last_fwd_cnt_to_peer forced to 0 above, the first 4-byte packet leaves fwd_cnt
        // 2 bytes short of the threshold and the second one pushes it 2 bytes past it.)
        let initial_fwd_cnt =
            csm_defs::CONN_TX_BUF_SIZE as u32 - csm_defs::CONN_CREDIT_UPDATE_THRESHOLD as u32 - 6;
        ctx.conn.fwd_cnt = Wrapping(initial_fwd_cnt);

        // Use a 4-byte packet for triggering the credit update threshold.
        let data = &[1, 2, 3, 4];

        // Check that there is no pending RX.
        ctx.init_data_pkt(data);
        ctx.send();
        assert!(!ctx.conn.has_pending_rx());

        // Send a packet again.
        ctx.init_data_pkt(data);
        ctx.send();

        // The CSM should now have a credit update available for the peer.
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_CREDIT_UPDATE);
        assert_eq!(ctx.pkt.fwd_cnt(), initial_fwd_cnt + data.len() as u32 * 2);
        assert_eq!(ctx.conn.fwd_cnt, ctx.conn.last_fwd_cnt_to_peer);
    }

    #[test]
    fn test_tx_buffering() {
        // Test case:
        // - when writing to the backing stream would block, TX data should end up in the TX buf
        // - when the CSM is notified that it can write to the backing stream, it should flush
        //   the TX buf.
        {
            let mut ctx = CsmTestContext::new_established();

            let mut stream = TestStream::new();
            stream.write_state = StreamState::WouldBlock;
            ctx.set_stream(stream);

            // Send some data through the connection. The backing stream is set to reject writes,
            // so the data should end up in the TX buffer.
            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();

            // When there's data in the TX buffer, the connection should ask to be notified when it
            // can write to its backing stream.
            assert!(ctx
                .conn
                .get_polled_evset()
                .contains(epoll::Events::EPOLLOUT));
            assert_eq!(ctx.conn.tx_buf.len(), data.len());

            // Unlock the write stream and notify the connection it can now write its buffered
            // data.
            ctx.set_stream(TestStream::new());
            ctx.conn.notify(epoll::Events::EPOLLOUT);
            assert!(ctx.conn.tx_buf.is_empty());
            assert_eq!(ctx.conn.stream.write_buf, data);
        }
    }

    #[test]
    fn test_stream_write_error() {
        // Test case: sending a data packet to a broken / closed backing stream should kill it.
        {
            let mut ctx = CsmTestContext::new_established();
            let mut stream = TestStream::new();
            stream.write_state = StreamState::Closed;
            ctx.set_stream(stream);

            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();

            assert_eq!(ctx.conn.state, ConnState::Killed);
            assert!(ctx.conn.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
        }

        // Test case: notifying a connection that it can flush its TX buffer to a broken stream
        // should kill the connection.
        {
            let mut ctx = CsmTestContext::new_established();

            let mut stream = TestStream::new();
            stream.write_state = StreamState::WouldBlock;
            ctx.set_stream(stream);

            // Send some data through the connection. The backing stream is set to reject writes,
            // so the data should end up in the TX buffer.
            let data = &[1, 2, 3, 4];
            ctx.init_data_pkt(data);
            ctx.send();

            // Set the backing stream to error out on write.
            let mut stream = TestStream::new();
            stream.write_state = StreamState::Closed;
            ctx.set_stream(stream);

            assert!(ctx
                .conn
                .get_polled_evset()
                .contains(epoll::Events::EPOLLOUT));
            ctx.notify_epollout();
            assert_eq!(ctx.conn.state, ConnState::Killed);
        }
    }

    // A peer that keeps sending data after exhausting the credit we advertised (overflowing our
    // TX buffer while the backing stream is blocked) should get its connection killed and reset.
    #[test]
    fn test_peer_credit_misbehavior() {
        let mut ctx = CsmTestContext::new_established();

        let mut stream = TestStream::new();
        stream.write_state = StreamState::WouldBlock;
        ctx.set_stream(stream);

        // Fill up the TX buffer.
        let data = vec![0u8; ctx.pkt.buf().unwrap().len()];
        ctx.init_data_pkt(data.as_slice());
        for _i in 0..(csm_defs::CONN_TX_BUF_SIZE / data.len() as u32) {
            ctx.send();
        }

        // Then try to send more data.
        ctx.send();

        // The connection should've committed suicide.
        assert_eq!(ctx.conn.state, ConnState::Killed);
        assert!(ctx.conn.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.pkt.op(), uapi::VSOCK_OP_RST);
    }
}
1252