// Source: /cloud-hypervisor/virtio-devices/src/vsock/device.rs (revision fa7a000dbe9637eb256af18ae8c3c4a8d5bf9c8f)
// Copyright 2019 Intel Corporation. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the THIRD-PARTY file.

//! This is the `VirtioDevice` implementation for our vsock device. It handles the virtio-level
//! device logic: feature negotiation, device configuration, and device activation.
//! The run-time device logic (i.e. event-driven data handling) is implemented by
//! `VsockEpollHandler`, defined below in this module.
//!
//! We aim to conform to the VirtIO v1.1 spec:
//! https://docs.oasis-open.org/virtio/virtio/v1.1/virtio-v1.1.html
//!
//! The vsock device takes two input parameters: a CID to identify the device, and a
//! `VsockBackend` to use for offloading vsock traffic.
//!
//! Upon activation, the vsock device creates its `VsockEpollHandler` and passes it the file
//! descriptors it needs to monitor; the handler registers those descriptors with an
//! `EpollHelper`. From then on, the `VsockEpollHandler` is notified whenever an event occurs
//! on the registered FDs:
//! - an RX queue FD;
//! - a TX queue FD;
//! - an event queue FD; and
//! - a backend FD.
//!
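//! # Example
//!
//! A minimal construction sketch, not taken from this crate's tests: the backend
//! constructor shown here is illustrative only, and any `VsockBackend`
//! implementation will do.
//!
//! ```ignore
//! use seccompiler::SeccompAction;
//! use vmm_sys_util::eventfd::EventFd;
//!
//! // Hypothetical backend constructor; substitute your own `VsockBackend` impl.
//! let backend = VsockUnixBackend::new(3 /* guest CID */, "/tmp/ch.vsock".into())?;
//!
//! let vsock = Vsock::new(
//!     "_vsock0".to_string(),             // device id
//!     3,                                 // guest CID
//!     "/tmp/ch.vsock".into(),            // host-side Unix socket path
//!     backend,
//!     false,                             // no virtio-iommu in front of the device
//!     SeccompAction::Trap,
//!     EventFd::new(libc::EFD_NONBLOCK)?,
//!     None,                              // no snapshot state to restore
//! )?;
//! ```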
use super::{VsockBackend, VsockPacket};
use crate::seccomp_filters::Thread;
use crate::Error as DeviceError;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use crate::{
    thread_helper::spawn_virtio_thread, ActivateResult, EpollHelper, EpollHelperError,
    EpollHelperHandler, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST, VIRTIO_F_IN_ORDER, VIRTIO_F_IOMMU_PLATFORM, VIRTIO_F_VERSION_1,
};
use anyhow::anyhow;
use byteorder::{ByteOrder, LittleEndian};
use seccompiler::SeccompAction;
use std::io;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Barrier, RwLock};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use virtio_queue::Queue;
use virtio_queue::QueueOwnedT;
use virtio_queue::QueueT;
use vm_memory::GuestAddressSpace;
use vm_memory::GuestMemoryAtomic;
use vm_migration::{
    Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable, VersionMapped,
};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

const QUEUE_SIZE: u16 = 256;
const NUM_QUEUES: usize = 3;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES];
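
// Virtqueue layout, per the virtio-vsock spec: queue 0 is RX (device to driver),
// queue 1 is TX (driver to device), and queue 2 is the event queue.
// `process_rx()` and `process_tx()` below rely on this ordering.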

// New descriptors are pending on the rx queue.
pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New descriptors are pending on the tx queue.
pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New descriptors are pending on the event queue.
pub const EVT_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
// Notification coming from the backend.
pub const BACKEND_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4;
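
// These token values are stored in the `data` field of each epoll event registered
// in `VsockEpollHandler::run()`; `handle_event()` matches on them to dispatch the
// notification.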

/// The `VsockEpollHandler` implements the runtime logic of our vsock device:
/// 1. Respond to TX queue events by wrapping virtio buffers into `VsockPacket`s, then sending those
///    packets to the `VsockBackend`;
/// 2. Forward backend FD event notifications to the `VsockBackend`;
/// 3. Fetch incoming packets from the `VsockBackend` and place them into the virtio RX queue;
/// 4. Whenever we have processed some virtio buffers (either TX or RX), let the driver know by
///    raising our assigned IRQ.
///
/// In a nutshell, the `VsockEpollHandler` logic looks like this:
/// - on TX queue event:
///   - fetch all packets from the TX queue and send them to the backend; then
///   - if the backend has queued up any incoming packets, fetch them into any available RX buffers.
/// - on RX queue event:
///   - fetch any incoming packets, queued up by the backend, into newly available RX buffers.
/// - on backend event:
///   - forward the event to the backend; then
///   - again, attempt to fetch any incoming packets queued by the backend into virtio RX buffers.
///
pub struct VsockEpollHandler<B: VsockBackend> {
    pub mem: GuestMemoryAtomic<GuestMemoryMmap>,
    pub queues: Vec<Queue>,
    pub queue_evts: Vec<EventFd>,
    pub kill_evt: EventFd,
    pub pause_evt: EventFd,
    pub interrupt_cb: Arc<dyn VirtioInterrupt>,
    pub backend: Arc<RwLock<B>>,
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}

impl<B> VsockEpollHandler<B>
where
    B: VsockBackend,
{
    /// Signal the guest driver that we've used some virtio buffers that it had previously made
    /// available.
    ///
    fn signal_used_queue(&self, queue_index: u16) -> result::Result<(), DeviceError> {
        debug!("vsock: raising IRQ");

        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    /// Walk the driver-provided RX queue buffers and attempt to fill them up with any data that we
    /// have pending.
    ///
    fn process_rx(&mut self) -> result::Result<(), DeviceError> {
        debug!("vsock: epoll_handler::process_rx()");

        let mut used_descs = false;

        while let Some(mut desc_chain) = self.queues[0].pop_descriptor_chain(self.mem.memory()) {
            let used_len = match VsockPacket::from_rx_virtq_head(
                &mut desc_chain,
                self.access_platform.as_ref(),
            ) {
                Ok(mut pkt) => {
                    if self.backend.write().unwrap().recv_pkt(&mut pkt).is_ok() {
                        // Report the full length written to the buffer: packet header plus
                        // payload.
                        pkt.hdr().len() as u32 + pkt.len()
                    } else {
                        // We are using a consuming iterator over the virtio buffers, so if we
                        // can't fill this buffer, we need to undo the last iterator step.
                        self.queues[0].go_to_previous_position();
                        break;
                    }
                }
                Err(e) => {
                    warn!("vsock: RX queue error: {:?}", e);
                    0
                }
            };

            self.queues[0]
                .add_used(desc_chain.memory(), desc_chain.head_index(), used_len)
                .map_err(DeviceError::QueueAddUsed)?;
            used_descs = true;
        }

        if used_descs {
            self.signal_used_queue(0)
        } else {
            Ok(())
        }
    }

    /// Walk the driver-provided TX queue buffers, package them up as vsock packets, and send them
    /// to the backend for processing.
    ///
    fn process_tx(&mut self) -> result::Result<(), DeviceError> {
        debug!("vsock: epoll_handler::process_tx()");

        let mut used_descs = false;

        while let Some(mut desc_chain) = self.queues[1].pop_descriptor_chain(self.mem.memory()) {
            let pkt = match VsockPacket::from_tx_virtq_head(
                &mut desc_chain,
                self.access_platform.as_ref(),
            ) {
                Ok(pkt) => pkt,
                Err(e) => {
                    error!("vsock: error reading TX packet: {:?}", e);
                    // Return the malformed chain to the driver (zero bytes written) and move on.
                    self.queues[1]
                        .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                        .map_err(DeviceError::QueueAddUsed)?;
                    used_descs = true;
                    continue;
                }
            };

            if self.backend.write().unwrap().send_pkt(&pkt).is_err() {
                // The backend can't accept this packet right now; rewind the queue and retry
                // when the backend signals it has freed up resources (see BACKEND_EVENT).
                self.queues[1].go_to_previous_position();
                break;
            }

            self.queues[1]
                .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                .map_err(DeviceError::QueueAddUsed)?;
            used_descs = true;
        }

        if used_descs {
            self.signal_used_queue(1)
        } else {
            Ok(())
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evts[0].as_raw_fd(), RX_QUEUE_EVENT)?;
        helper.add_event(self.queue_evts[1].as_raw_fd(), TX_QUEUE_EVENT)?;
        helper.add_event(self.queue_evts[2].as_raw_fd(), EVT_QUEUE_EVENT)?;
        helper.add_event(self.backend.read().unwrap().get_polled_fd(), BACKEND_EVENT)?;
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl<B> EpollHelperHandler for VsockEpollHandler<B>
where
    B: VsockBackend,
{
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let evset = match epoll::Events::from_bits(event.events) {
            Some(evset) => evset,
            None => {
                let evbits = event.events;
                warn!("epoll: ignoring unknown event set: 0x{:x}", evbits);
                return Ok(());
            }
        };

        let ev_type = event.data as u16;
        match ev_type {
            RX_QUEUE_EVENT => {
                debug!("vsock: RX queue event");
                self.queue_evts[0].read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get RX queue event: {:?}", e))
                })?;
                if self.backend.read().unwrap().has_pending_rx() {
                    self.process_rx().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process RX queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            TX_QUEUE_EVENT => {
                debug!("vsock: TX queue event");
                self.queue_evts[1].read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get TX queue event: {:?}", e))
                })?;

                self.process_tx().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to process TX queue: {:?}", e))
                })?;

                // The backend may have queued up responses to the packets we sent during TX queue
                // processing. If that happened, we need to fetch those responses and place them
                // into RX buffers.
                if self.backend.read().unwrap().has_pending_rx() {
                    self.process_rx().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process RX queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            EVT_QUEUE_EVENT => {
                debug!("vsock: EVT queue event");
                self.queue_evts[2].read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get EVT queue event: {:?}", e))
                })?;
            }
            BACKEND_EVENT => {
                debug!("vsock: backend event");
                self.backend.write().unwrap().notify(evset);
                // After the backend has been kicked, it might've freed up some resources, so we
                // can attempt to send it more data to process.
                // In particular, if `self.backend.send_pkt()` halted the TX queue processing (by
                // returning an error) at some point in the past, now is the time to try walking
                // the TX queue again.
                self.process_tx().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to process TX queue: {:?}", e))
                })?;
                if self.backend.read().unwrap().has_pending_rx() {
                    self.process_rx().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process RX queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unknown event for virtio-vsock"
                )));
            }
        }

        Ok(())
    }
}

/// Virtio device exposing a virtual socket to the guest.
pub struct Vsock<B: VsockBackend> {
    common: VirtioCommon,
    id: String,
    cid: u64,
    backend: Arc<RwLock<B>>,
    path: PathBuf,
    seccomp_action: SeccompAction,
    exit_evt: EventFd,
}

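// Only the feature negotiation state is captured for snapshot/restore; in-flight
// vsock connections are not part of the snapshot.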
#[derive(Versionize)]
pub struct VsockState {
    pub avail_features: u64,
    pub acked_features: u64,
}

impl VersionMapped for VsockState {}

impl<B> Vsock<B>
where
    B: VsockBackend + Sync,
{
    /// Create a new virtio-vsock device with the given VM CID and vsock
    /// backend.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        cid: u32,
        path: PathBuf,
        backend: B,
        iommu: bool,
        seccomp_action: SeccompAction,
        exit_evt: EventFd,
        state: Option<VsockState>,
    ) -> io::Result<Vsock<B>> {
        let (avail_features, acked_features, paused) = if let Some(state) = state {
            info!("Restoring virtio-vsock {}", id);
            (state.avail_features, state.acked_features, true)
        } else {
            let mut avail_features = 1u64 << VIRTIO_F_VERSION_1 | 1u64 << VIRTIO_F_IN_ORDER;

            if iommu {
                avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
            }
            (avail_features, 0, false)
        };

        Ok(Vsock {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Vsock as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(2))),
                queue_sizes: QUEUE_SIZES.to_vec(),
                min_queues: NUM_QUEUES as u16,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            cid: cid.into(),
            backend: Arc::new(RwLock::new(backend)),
            path,
            seccomp_action,
            exit_evt,
        })
    }

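    // Captured by `snapshot()` (in the `Snapshottable` impl below) and fed back
    // through the `state` parameter of `Vsock::new()` on restore.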
    fn state(&self) -> VsockState {
        VsockState {
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
        }
    }
}

impl<B> Drop for Vsock<B>
where
    B: VsockBackend,
{
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl<B> VirtioDevice for Vsock<B>
where
    B: VsockBackend + Sync + 'static,
{
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

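    // The virtio-vsock config space holds a single little-endian 64-bit `guest_cid`
    // field at offset 0. The driver may read it as one 64-bit access, or as two
    // 32-bit accesses (low word at offset 0, high word at offset 4); the match
    // below handles all three shapes.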
    fn read_config(&self, offset: u64, data: &mut [u8]) {
        match offset {
            0 if data.len() == 8 => LittleEndian::write_u64(data, self.cid),
            0 if data.len() == 4 => LittleEndian::write_u32(data, (self.cid & 0xffff_ffff) as u32),
            4 if data.len() == 4 => {
                LittleEndian::write_u32(data, ((self.cid >> 32) & 0xffff_ffff) as u32)
            }
            _ => warn!(
                "vsock: virtio-vsock received invalid read request of {} bytes at offset {}",
                data.len(),
                offset
            ),
        }
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;
        let (kill_evt, pause_evt) = self.common.dup_eventfds();

        let mut virtqueues = Vec::new();
        let mut queue_evts = Vec::new();
        for (_, queue, queue_evt) in queues {
            virtqueues.push(queue);
            queue_evts.push(queue_evt);
        }

        let mut handler = VsockEpollHandler {
            mem,
            queues: virtqueues,
            queue_evts,
            kill_evt,
            pause_evt,
            interrupt_cb,
            backend: self.backend.clone(),
            access_platform: self.common.access_platform.clone(),
        };

        let paused = self.common.paused.clone();
        let paused_sync = self.common.paused_sync.clone();
        let mut epoll_threads = Vec::new();

        spawn_virtio_thread(
            &self.id,
            &self.seccomp_action,
            Thread::VirtioVsock,
            &mut epoll_threads,
            &self.exit_evt,
            move || handler.run(paused, paused_sync.unwrap()),
        )?;

        self.common.epoll_threads = Some(epoll_threads);

        event!("virtio-device", "activated", "id", &self.id);
        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn shutdown(&mut self) {
        // Clean up the host-side Unix socket backing this device.
        std::fs::remove_file(&self.path).ok();
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl<B> Pausable for Vsock<B>
where
    B: VsockBackend + Sync + 'static,
{
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl<B> Snapshottable for Vsock<B>
where
    B: VsockBackend + Sync + 'static,
{
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_versioned_state(&self.state())
    }
}

impl<B> Transportable for Vsock<B> where B: VsockBackend + Sync + 'static {}
impl<B> Migratable for Vsock<B> where B: VsockBackend + Sync + 'static {}

#[cfg(test)]
mod tests {
    use super::super::tests::{NoopVirtioInterrupt, TestContext};
    use super::super::*;
    use super::*;
    use crate::ActivateError;
    use libc::EFD_NONBLOCK;

    #[test]
    fn test_virtio_device() {
        let mut ctx = TestContext::new();
        let avail_features = 1u64 << VIRTIO_F_VERSION_1 | 1u64 << VIRTIO_F_IN_ORDER;
        let device_features = avail_features;
        let driver_features: u64 = avail_features | 1 | (1 << 32);
        let device_pages = [
            (device_features & 0xffff_ffff) as u32,
            (device_features >> 32) as u32,
        ];
        let driver_pages = [
            (driver_features & 0xffff_ffff) as u32,
            (driver_features >> 32) as u32,
        ];
        assert_eq!(ctx.device.device_type(), VirtioDeviceType::Vsock as u32);
        assert_eq!(ctx.device.queue_max_sizes(), QUEUE_SIZES);
        assert_eq!(ctx.device.features() as u32, device_pages[0]);
        assert_eq!((ctx.device.features() >> 32) as u32, device_pages[1]);

        // Ack device features, page 0.
        ctx.device.ack_features(u64::from(driver_pages[0]));
        // Ack device features, page 1.
        ctx.device.ack_features(u64::from(driver_pages[1]) << 32);
        // Check that no side effects are present, and that the acked features are exactly the
        // same as the device features.
        assert_eq!(
            ctx.device.common.acked_features,
            device_features & driver_features
        );

        // Test reading 32-bit chunks.
        let mut data = [0u8; 8];
        ctx.device.read_config(0, &mut data[..4]);
        assert_eq!(
            u64::from(LittleEndian::read_u32(&data)),
            ctx.cid & 0xffff_ffff
        );
        ctx.device.read_config(4, &mut data[4..]);
        assert_eq!(
            u64::from(LittleEndian::read_u32(&data[4..])),
            (ctx.cid >> 32) & 0xffff_ffff
        );

        // Test reading 64-bit.
        let mut data = [0u8; 8];
        ctx.device.read_config(0, &mut data);
        assert_eq!(LittleEndian::read_u64(&data), ctx.cid);

        // Check that an out-of-bounds read doesn't mutate the destination buffer.
        let mut data = [0u8, 1, 2, 3, 4, 5, 6, 7];
        ctx.device.read_config(2, &mut data);
        assert_eq!(data, [0u8, 1, 2, 3, 4, 5, 6, 7]);

        // Just covering lines here, since the vsock device has no writable config.
        // A warning is logged, however, if the guest driver attempts to write any config data.
        ctx.device.write_config(0, &data[..4]);

        let memory = GuestMemoryAtomic::new(ctx.mem.clone());

        // Test a bad activation.
        let bad_activate =
            ctx.device
                .activate(memory.clone(), Arc::new(NoopVirtioInterrupt {}), Vec::new());
        match bad_activate {
            Err(ActivateError::BadActivate) => (),
            other => panic!("{other:?}"),
        }

        // Test a correct activation.
        ctx.device
            .activate(
                memory,
                Arc::new(NoopVirtioInterrupt {}),
                vec![
                    (
                        0,
                        Queue::new(256).unwrap(),
                        EventFd::new(EFD_NONBLOCK).unwrap(),
                    ),
                    (
                        1,
                        Queue::new(256).unwrap(),
                        EventFd::new(EFD_NONBLOCK).unwrap(),
                    ),
                    (
                        2,
                        Queue::new(256).unwrap(),
                        EventFd::new(EFD_NONBLOCK).unwrap(),
                    ),
                ],
            )
            .unwrap();
    }

    #[test]
    fn test_irq() {
        // Test case: successful IRQ signaling.
        {
            let test_ctx = TestContext::new();
            let ctx = test_ctx.create_epoll_handler_context();

            let _queue: Queue = Queue::new(256).unwrap();
            assert!(ctx.handler.signal_used_queue(0).is_ok());
        }
    }

    #[test]
    fn test_txq_event() {
        // Test case:
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend has no pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(false);
            ctx.signal_txq_event();

            // The available TX descriptor should have been used.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            // The available RX descriptor should be untouched.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Test case:
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend also has some pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(true);
            ctx.signal_txq_event();

            // Both available RX and TX descriptors should have been used.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Test case:
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend errors out and cannot process the TX queue.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(false);
            ctx.handler
                .backend
                .write()
                .unwrap()
                .set_tx_err(Some(VsockError::NoData));
            ctx.signal_txq_event();

            // Both RX and TX queues should be untouched.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 0);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Test case:
        // - the driver supplied a malformed TX buffer.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            // Invalidate the packet header descriptor by setting its length to 0.
            ctx.guest_txvq.dtable[0].len.set(0);
            ctx.signal_txq_event();

            // The available descriptor should have been consumed, but no packet should have
            // reached the backend.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.handler.backend.read().unwrap().tx_ok_cnt, 0);
        }

        // Test case: spurious TXQ_EVENT.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            let events = epoll::Events::EPOLLIN;
            let event = epoll::Event::new(events, TX_QUEUE_EVENT as u64);
            let mut epoll_helper =
                EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap();

            assert!(
                ctx.handler.handle_event(&mut epoll_helper, &event).is_err(),
                "handle_event() should have failed"
            );
        }
    }

    #[test]
    fn test_rxq_event() {
        // Test case:
        // - there is pending RX data in the backend; and
        // - the driver makes RX buffers available; and
        // - the backend errors out when attempting to receive data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(true);
            ctx.handler
                .backend
                .write()
                .unwrap()
                .set_rx_err(Some(VsockError::NoData));
            ctx.signal_rxq_event();

            // The available RX buffer should've been left untouched.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Test case:
        // - there is pending RX data in the backend; and
        // - the driver makes RX buffers available; and
        // - the backend successfully places its RX data into the queue.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(true);
            ctx.signal_rxq_event();

            // The available RX buffer should have been used.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Test case: the driver provided a malformed RX descriptor chain.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            // Invalidate the packet header descriptor by setting its length to 0.
            ctx.guest_rxvq.dtable[0].len.set(0);

            // The chain should've been processed without involving the backend.
            assert!(ctx.handler.process_rx().is_ok());
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
            assert_eq!(ctx.handler.backend.read().unwrap().rx_ok_cnt, 0);
        }

        // Test case: spurious RXQ_EVENT.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();
            ctx.handler.backend.write().unwrap().set_pending_rx(false);

            let events = epoll::Events::EPOLLIN;
            let event = epoll::Event::new(events, RX_QUEUE_EVENT as u64);
            let mut epoll_helper =
                EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap();

            assert!(
                ctx.handler.handle_event(&mut epoll_helper, &event).is_err(),
                "handle_event() should have failed"
            );
        }
    }

    #[test]
    fn test_evq_event() {
        // Test case: spurious EVQ_EVENT.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();
            ctx.handler.backend.write().unwrap().set_pending_rx(false);

            let events = epoll::Events::EPOLLIN;
            let event = epoll::Event::new(events, EVT_QUEUE_EVENT as u64);
            let mut epoll_helper =
                EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap();

            assert!(
                ctx.handler.handle_event(&mut epoll_helper, &event).is_err(),
                "handle_event() should have failed"
            );
        }
    }

    #[test]
    fn test_backend_event() {
        // Test case:
        // - a backend event is received; and
        // - the backend has pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(true);

            let events = epoll::Events::EPOLLIN;
            let event = epoll::Event::new(events, BACKEND_EVENT as u64);
            let mut epoll_helper =
                EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap();
            assert!(ctx.handler.handle_event(&mut epoll_helper, &event).is_ok());

            // The backend should've received this event.
            assert_eq!(
                ctx.handler.backend.read().unwrap().evset,
                Some(epoll::Events::EPOLLIN)
            );
            // TX queue processing should've been triggered.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            // RX queue processing should've been triggered.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Test case:
        // - a backend event is received; and
        // - the backend doesn't have any pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_epoll_handler_context();

            ctx.handler.backend.write().unwrap().set_pending_rx(false);

            let events = epoll::Events::EPOLLIN;
            let event = epoll::Event::new(events, BACKEND_EVENT as u64);
            let mut epoll_helper =
                EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap();
            assert!(ctx.handler.handle_event(&mut epoll_helper, &event).is_ok());

            // The backend should've received this event.
            assert_eq!(
                ctx.handler.backend.read().unwrap().evset,
                Some(epoll::Events::EPOLLIN)
            );
            // TX queue processing should've been triggered.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            // The RX queue should've been left untouched.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }
    }

    #[test]
    fn test_unknown_event() {
        let test_ctx = TestContext::new();
        let mut ctx = test_ctx.create_epoll_handler_context();

        let events = epoll::Events::EPOLLIN;
        let event = epoll::Event::new(events, 0xff);
        let mut epoll_helper =
            EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap();

        assert!(
            ctx.handler.handle_event(&mut epoll_helper, &event).is_err(),
            "handle_event() should have failed"
        );
    }
}