xref: /cloud-hypervisor/virtio-devices/src/net.rs (revision 686e6d50824fcc7403a51b91545899a6301d6216)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
5 // Use of this source code is governed by a BSD-style license that can be
6 // found in the THIRD-PARTY file.
7 
8 use super::Error as DeviceError;
9 use super::{
10     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
11     RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
12     EPOLL_HELPER_EVENT_LAST,
13 };
14 use crate::seccomp_filters::Thread;
15 use crate::thread_helper::spawn_virtio_thread;
16 use crate::GuestMemoryMmap;
17 use crate::VirtioInterrupt;
18 use net_util::CtrlQueue;
19 use net_util::{
20     build_net_config_space, build_net_config_space_with_mq, open_tap,
21     virtio_features_to_tap_offload, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio,
22     Tap, TapError, TxVirtio, VirtioNetConfig,
23 };
24 use seccompiler::SeccompAction;
25 use std::net::Ipv4Addr;
26 use std::num::Wrapping;
27 use std::os::unix::io::{AsRawFd, RawFd};
28 use std::result;
29 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::sync::{Arc, Barrier};
31 use std::thread;
32 use std::vec::Vec;
33 use std::{collections::HashMap, convert::TryInto};
34 use versionize::{VersionMap, Versionize, VersionizeResult};
35 use versionize_derive::Versionize;
36 use virtio_bindings::bindings::virtio_net::*;
37 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
38 use virtio_queue::Queue;
39 use vm_memory::{ByteValued, GuestMemoryAtomic};
40 use vm_migration::VersionMapped;
41 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
42 use vm_virtio::AccessPlatform;
43 use vmm_sys_util::eventfd::EventFd;
44 
/// Control queue worker: an event is available on the control queue.
///
/// This is the only device-specific event registered by
/// `NetCtrlEpollHandler::run_ctrl()`. It has the same numeric value as
/// `RX_QUEUE_EVENT`; the two are registered on different `EpollHelper`
/// instances (`run_ctrl()` vs `NetEpollHandler::run()`).
const CTRL_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
48 
/// State owned by the virtio-net control queue worker.
///
/// `Net::activate()` builds one instance when `VIRTIO_NET_F_CTRL_VQ` has been
/// acked and an odd number of queues was handed over, then runs it on a
/// dedicated thread through `run_ctrl()`.
pub struct NetCtrlEpollHandler {
    /// Kill event passed to `EpollHelper::new()` in `run_ctrl()`.
    pub kill_evt: EventFd,
    /// Pause event passed to `EpollHelper::new()` in `run_ctrl()`.
    pub pause_evt: EventFd,
    /// Control queue processor, created from the device's TAPs.
    pub ctrl_q: CtrlQueue,
    /// Eventfd registered as `CTRL_QUEUE_EVENT`; read on each such event.
    pub queue_evt: EventFd,
    /// The control virtqueue handed to `CtrlQueue::process()`.
    pub queue: Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
    /// Optional access platform forwarded to `CtrlQueue::process()`.
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
    /// Callback used to raise the control queue interrupt.
    pub interrupt_cb: Arc<dyn VirtioInterrupt>,
    /// Index of the control queue, used when raising its interrupt.
    pub queue_index: u16,
}
59 
60 impl NetCtrlEpollHandler {
61     fn signal_used_queue(&self, queue_index: u16) -> result::Result<(), DeviceError> {
62         self.interrupt_cb
63             .trigger(VirtioInterruptType::Queue(queue_index))
64             .map_err(|e| {
65                 error!("Failed to signal used queue: {:?}", e);
66                 DeviceError::FailedSignalingUsedQueue(e)
67             })
68     }
69 
70     pub fn run_ctrl(
71         &mut self,
72         paused: Arc<AtomicBool>,
73         paused_sync: Arc<Barrier>,
74     ) -> std::result::Result<(), EpollHelperError> {
75         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
76         helper.add_event(self.queue_evt.as_raw_fd(), CTRL_QUEUE_EVENT)?;
77         helper.run(paused, paused_sync, self)?;
78 
79         Ok(())
80     }
81 }
82 
83 impl EpollHelperHandler for NetCtrlEpollHandler {
84     fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool {
85         let ev_type = event.data as u16;
86         match ev_type {
87             CTRL_QUEUE_EVENT => {
88                 if let Err(e) = self.queue_evt.read() {
89                     error!("Failed to get control queue event: {:?}", e);
90                     return true;
91                 }
92                 if let Err(e) = self
93                     .ctrl_q
94                     .process(&mut self.queue, self.access_platform.as_ref())
95                 {
96                     error!("Failed to process control queue: {:?}", e);
97                     return true;
98                 } else {
99                     match self.queue.needs_notification() {
100                         Ok(true) => {
101                             if let Err(e) = self.signal_used_queue(self.queue_index) {
102                                 error!("Error signalling that control queue was used: {:?}", e);
103                                 return true;
104                             }
105                         }
106                         Ok(false) => {}
107                         Err(e) => {
108                             error!("Error getting notification state of control queue: {}", e);
109                             return true;
110                         }
111                     }
112                 }
113             }
114             _ => {
115                 error!("Unknown event for virtio-net control queue");
116                 return true;
117             }
118         }
119 
120         false
121     }
122 }
123 
// Epoll events used by the Rx/Tx queue pair workers. They are numbered right
// after `EPOLL_HELPER_EVENT_LAST`.

/// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
/// The transmit queue has a frame that is ready to send from the guest.
pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
/// A frame is available for reading from the tap device to receive in the guest.
pub const RX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
/// The TAP can be written to. Used after an EAGAIN error to retry TX.
pub const TX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4;
/// New 'wake up' event from the rx rate limiter.
pub const RX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 5;
/// New 'wake up' event from the tx rate limiter.
pub const TX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 6;
137 
/// Errors returned by the `Net` constructors.
#[derive(Debug)]
pub enum Error {
    /// Failed to open taps.
    OpenTap(OpenTapError),

    /// Failed to create a `Tap` from an existing tap file descriptor.
    TapError(TapError),

    /// Error calling dup() on tap fd.
    DuplicateTapFd(std::io::Error),
}
149 
/// Result alias for the `Net` constructors, using this module's `Error`.
pub type Result<T> = result::Result<T, Error>;
151 
/// State owned by the worker thread serving one RX/TX queue pair.
struct NetEpollHandler {
    /// TAP device, rate limiters and RX/TX processing state for this pair.
    net: NetQueuePair,
    /// Callback used to raise queue interrupts.
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    /// Kill event passed to `EpollHelper::new()` in `run()`.
    kill_evt: EventFd,
    /// Pause event passed to `EpollHelper::new()` in `run()`.
    pause_evt: EventFd,
    /// Device-wide index of the RX queue of this pair; the TX queue is
    /// `queue_index_base + 1`.
    queue_index_base: u16,
    /// The two virtqueues of the pair: `[0]` is RX, `[1]` is TX.
    queue_pair: Vec<Queue<GuestMemoryAtomic<GuestMemoryMmap>>>,
    /// Notification eventfds matching `queue_pair`: `[0]` is RX, `[1]` is TX.
    queue_evt_pair: Vec<EventFd>,
    // Always generate interrupts until the driver has signalled to the device.
    // This mitigates a problem with interrupts from tap events being "lost" upon
    // a restore as the vCPU thread isn't ready to handle the interrupt. This causes
    // issues when combined with VIRTIO_RING_F_EVENT_IDX interrupt suppression.
    driver_awake: bool,
}
166 
167 impl NetEpollHandler {
168     fn signal_used_queue(&self, queue_index: u16) -> result::Result<(), DeviceError> {
169         self.interrupt_cb
170             .trigger(VirtioInterruptType::Queue(queue_index))
171             .map_err(|e| {
172                 error!("Failed to signal used queue: {:?}", e);
173                 DeviceError::FailedSignalingUsedQueue(e)
174             })
175     }
176 
177     fn handle_rx_event(&mut self) -> result::Result<(), DeviceError> {
178         let queue_evt = &self.queue_evt_pair[0];
179         if let Err(e) = queue_evt.read() {
180             error!("Failed to get rx queue event: {:?}", e);
181         }
182 
183         self.net.rx_desc_avail = true;
184 
185         let rate_limit_reached = self
186             .net
187             .rx_rate_limiter
188             .as_ref()
189             .map_or(false, |r| r.is_blocked());
190 
191         // Start to listen on RX_TAP_EVENT only when the rate limit is not reached
192         if !self.net.rx_tap_listening && !rate_limit_reached {
193             net_util::register_listener(
194                 self.net.epoll_fd.unwrap(),
195                 self.net.tap.as_raw_fd(),
196                 epoll::Events::EPOLLIN,
197                 u64::from(self.net.tap_rx_event_id),
198             )
199             .map_err(DeviceError::IoError)?;
200             self.net.rx_tap_listening = true;
201         }
202 
203         Ok(())
204     }
205 
206     fn process_tx(&mut self) -> result::Result<(), DeviceError> {
207         if self
208             .net
209             .process_tx(&mut self.queue_pair[1])
210             .map_err(DeviceError::NetQueuePair)?
211             || !self.driver_awake
212         {
213             self.signal_used_queue(self.queue_index_base + 1)?;
214             debug!("Signalling TX queue");
215         } else {
216             debug!("Not signalling TX queue");
217         }
218         Ok(())
219     }
220 
221     fn handle_tx_event(&mut self) -> result::Result<(), DeviceError> {
222         let rate_limit_reached = self
223             .net
224             .tx_rate_limiter
225             .as_ref()
226             .map_or(false, |r| r.is_blocked());
227 
228         if !rate_limit_reached {
229             self.process_tx()?;
230         }
231 
232         Ok(())
233     }
234 
235     fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> {
236         if self
237             .net
238             .process_rx(&mut self.queue_pair[0])
239             .map_err(DeviceError::NetQueuePair)?
240             || !self.driver_awake
241         {
242             self.signal_used_queue(self.queue_index_base)?;
243             debug!("Signalling RX queue");
244         } else {
245             debug!("Not signalling RX queue");
246         }
247         Ok(())
248     }
249 
250     fn run(
251         &mut self,
252         paused: Arc<AtomicBool>,
253         paused_sync: Arc<Barrier>,
254     ) -> result::Result<(), EpollHelperError> {
255         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
256         helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?;
257         helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?;
258         if let Some(rate_limiter) = &self.net.rx_rate_limiter {
259             helper.add_event(rate_limiter.as_raw_fd(), RX_RATE_LIMITER_EVENT)?;
260         }
261         if let Some(rate_limiter) = &self.net.tx_rate_limiter {
262             helper.add_event(rate_limiter.as_raw_fd(), TX_RATE_LIMITER_EVENT)?;
263         }
264 
265         // If there are some already available descriptors on the RX queue,
266         // then we can start the thread while listening onto the TAP.
267         if self.queue_pair[0]
268             .used_idx(Ordering::Acquire)
269             .map_err(EpollHelperError::QueueRingIndex)?
270             < self.queue_pair[0]
271                 .avail_idx(Ordering::Acquire)
272                 .map_err(EpollHelperError::QueueRingIndex)?
273         {
274             helper.add_event(self.net.tap.as_raw_fd(), RX_TAP_EVENT)?;
275             self.net.rx_tap_listening = true;
276             info!("Listener registered at start");
277         }
278 
279         // The NetQueuePair needs the epoll fd.
280         self.net.epoll_fd = Some(helper.as_raw_fd());
281 
282         helper.run(paused, paused_sync, self)?;
283 
284         Ok(())
285     }
286 }
287 
288 impl EpollHelperHandler for NetEpollHandler {
289     fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool {
290         let ev_type = event.data as u16;
291         match ev_type {
292             RX_QUEUE_EVENT => {
293                 self.driver_awake = true;
294                 if let Err(e) = self.handle_rx_event() {
295                     error!("Error processing RX queue: {:?}", e);
296                     return true;
297                 }
298             }
299             TX_QUEUE_EVENT => {
300                 let queue_evt = &self.queue_evt_pair[1];
301                 if let Err(e) = queue_evt.read() {
302                     error!("Failed to get tx queue event: {:?}", e);
303                 }
304                 self.driver_awake = true;
305                 if let Err(e) = self.handle_tx_event() {
306                     error!("Error processing TX queue: {:?}", e);
307                     return true;
308                 }
309             }
310             TX_TAP_EVENT => {
311                 if let Err(e) = self.handle_tx_event() {
312                     error!("Error processing TX queue (TAP event): {:?}", e);
313                     return true;
314                 }
315             }
316             RX_TAP_EVENT => {
317                 if let Err(e) = self.handle_rx_tap_event() {
318                     error!("Error processing tap queue: {:?}", e);
319                     return true;
320                 }
321             }
322             RX_RATE_LIMITER_EVENT => {
323                 if let Some(rate_limiter) = &mut self.net.rx_rate_limiter {
324                     // Upon rate limiter event, call the rate limiter handler and register the
325                     // TAP fd for further processing if some RX buffers are available
326                     match rate_limiter.event_handler() {
327                         Ok(_) => {
328                             if !self.net.rx_tap_listening && self.net.rx_desc_avail {
329                                 if let Err(e) = net_util::register_listener(
330                                     self.net.epoll_fd.unwrap(),
331                                     self.net.tap.as_raw_fd(),
332                                     epoll::Events::EPOLLIN,
333                                     u64::from(self.net.tap_rx_event_id),
334                                 ) {
335                                     error!("Error register_listener with `RX_RATE_LIMITER_EVENT`: {:?}", e);
336                                     return true;
337                                 }
338                                 self.net.rx_tap_listening = true;
339                             }
340                         }
341                         Err(e) => {
342                             error!("Error from 'rate_limiter.event_handler()': {:?}", e);
343                             return true;
344                         }
345                     }
346                 } else {
347                     error!("Unexpected RX_RATE_LIMITER_EVENT");
348                     return true;
349                 }
350             }
351             TX_RATE_LIMITER_EVENT => {
352                 if let Some(rate_limiter) = &mut self.net.tx_rate_limiter {
353                     // Upon rate limiter event, call the rate limiter handler
354                     // and restart processing the queue.
355                     match rate_limiter.event_handler() {
356                         Ok(_) => {
357                             self.driver_awake = true;
358                             if let Err(e) = self.process_tx() {
359                                 error!("Error processing TX queue: {:?}", e);
360                                 return true;
361                             }
362                         }
363                         Err(e) => {
364                             error!("Error from 'rate_limiter.event_handler()': {:?}", e);
365                             return true;
366                         }
367                     }
368                 } else {
369                     error!("Unexpected TX_RATE_LIMITER_EVENT");
370                     return true;
371                 }
372             }
373             _ => {
374                 error!("Unknown event: {}", ev_type);
375                 return true;
376             }
377         }
378         false
379     }
380 }
381 
/// Virtio network device backed by one TAP device per RX/TX queue pair.
pub struct Net {
    /// State shared by virtio devices (features, queue sizes, pause state,
    /// worker threads, ...).
    common: VirtioCommon,
    /// Device identifier, used for thread names, events and snapshots.
    id: String,
    /// TAP devices; `activate()` hands one to each RX/TX queue pair.
    taps: Vec<Tap>,
    /// Device configuration space returned by `read_config()`.
    config: VirtioNetConfig,
    /// Handle of the control queue worker thread, set by `activate()` when a
    /// control queue is in use and unparked by `resume()`.
    ctrl_queue_epoll_thread: Option<thread::JoinHandle<()>>,
    /// Traffic counters, cloned into every queue pair and reported by
    /// `counters()`.
    counters: NetCounters,
    /// Seccomp action passed to `spawn_virtio_thread()` for worker threads.
    seccomp_action: SeccompAction,
    /// Optional configuration from which `activate()` builds one RX and one
    /// TX rate limiter per queue pair.
    rate_limiter_config: Option<RateLimiterConfig>,
    /// Exit event passed to `spawn_virtio_thread()` for worker threads.
    exit_evt: EventFd,
}
393 
/// Versioned snapshot state of a `Net` device, produced by `Net::state()` and
/// applied by `Net::set_state()`.
#[derive(Versionize)]
pub struct NetState {
    /// Features offered by the device.
    pub avail_features: u64,
    /// Features acknowledged by the driver.
    pub acked_features: u64,
    /// Device configuration space.
    pub config: VirtioNetConfig,
    /// Per-queue sizes, copied from the common `queue_sizes`.
    pub queue_size: Vec<u16>,
}

// Empty impl: only the trait's provided behavior is used.
impl VersionMapped for NetState {}
403 
404 impl Net {
405     /// Create a new virtio network device with the given TAP interface.
406     #[allow(clippy::too_many_arguments)]
407     pub fn new_with_tap(
408         id: String,
409         taps: Vec<Tap>,
410         guest_mac: Option<MacAddr>,
411         iommu: bool,
412         num_queues: usize,
413         queue_size: u16,
414         seccomp_action: SeccompAction,
415         rate_limiter_config: Option<RateLimiterConfig>,
416         exit_evt: EventFd,
417     ) -> Result<Self> {
418         let mut avail_features = 1 << VIRTIO_NET_F_CSUM
419             | 1 << VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
420             | 1 << VIRTIO_NET_F_GUEST_CSUM
421             | 1 << VIRTIO_NET_F_GUEST_ECN
422             | 1 << VIRTIO_NET_F_GUEST_TSO4
423             | 1 << VIRTIO_NET_F_GUEST_TSO6
424             | 1 << VIRTIO_NET_F_GUEST_UFO
425             | 1 << VIRTIO_NET_F_HOST_ECN
426             | 1 << VIRTIO_NET_F_HOST_TSO4
427             | 1 << VIRTIO_NET_F_HOST_TSO6
428             | 1 << VIRTIO_NET_F_HOST_UFO
429             | 1 << VIRTIO_RING_F_EVENT_IDX
430             | 1 << VIRTIO_F_VERSION_1;
431 
432         if iommu {
433             avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
434         }
435 
436         avail_features |= 1 << VIRTIO_NET_F_CTRL_VQ;
437         let queue_num = num_queues + 1;
438 
439         let mut config = VirtioNetConfig::default();
440         if let Some(mac) = guest_mac {
441             build_net_config_space(&mut config, mac, num_queues, &mut avail_features);
442         } else {
443             build_net_config_space_with_mq(&mut config, num_queues, &mut avail_features);
444         }
445 
446         Ok(Net {
447             common: VirtioCommon {
448                 device_type: VirtioDeviceType::Net as u32,
449                 avail_features,
450                 queue_sizes: vec![queue_size; queue_num],
451                 paused_sync: Some(Arc::new(Barrier::new((num_queues / 2) + 1))),
452                 min_queues: 2,
453                 ..Default::default()
454             },
455             id,
456             taps,
457             config,
458             ctrl_queue_epoll_thread: None,
459             counters: NetCounters::default(),
460             seccomp_action,
461             rate_limiter_config,
462             exit_evt,
463         })
464     }
465 
466     /// Create a new virtio network device with the given IP address and
467     /// netmask.
468     #[allow(clippy::too_many_arguments)]
469     pub fn new(
470         id: String,
471         if_name: Option<&str>,
472         ip_addr: Option<Ipv4Addr>,
473         netmask: Option<Ipv4Addr>,
474         guest_mac: Option<MacAddr>,
475         host_mac: &mut Option<MacAddr>,
476         iommu: bool,
477         num_queues: usize,
478         queue_size: u16,
479         seccomp_action: SeccompAction,
480         rate_limiter_config: Option<RateLimiterConfig>,
481         exit_evt: EventFd,
482     ) -> Result<Self> {
483         let taps = open_tap(if_name, ip_addr, netmask, host_mac, num_queues / 2, None)
484             .map_err(Error::OpenTap)?;
485 
486         Self::new_with_tap(
487             id,
488             taps,
489             guest_mac,
490             iommu,
491             num_queues,
492             queue_size,
493             seccomp_action,
494             rate_limiter_config,
495             exit_evt,
496         )
497     }
498 
499     #[allow(clippy::too_many_arguments)]
500     pub fn from_tap_fds(
501         id: String,
502         fds: &[RawFd],
503         guest_mac: Option<MacAddr>,
504         iommu: bool,
505         queue_size: u16,
506         seccomp_action: SeccompAction,
507         rate_limiter_config: Option<RateLimiterConfig>,
508         exit_evt: EventFd,
509     ) -> Result<Self> {
510         let mut taps: Vec<Tap> = Vec::new();
511         let num_queue_pairs = fds.len();
512 
513         for fd in fds.iter() {
514             // Duplicate so that it can survive reboots
515             // SAFETY: FFI call to dup. Trivially safe.
516             let fd = unsafe { libc::dup(*fd) };
517             if fd < 0 {
518                 return Err(Error::DuplicateTapFd(std::io::Error::last_os_error()));
519             }
520             let tap = Tap::from_tap_fd(fd, num_queue_pairs).map_err(Error::TapError)?;
521             taps.push(tap);
522         }
523 
524         Self::new_with_tap(
525             id,
526             taps,
527             guest_mac,
528             iommu,
529             num_queue_pairs * 2,
530             queue_size,
531             seccomp_action,
532             rate_limiter_config,
533             exit_evt,
534         )
535     }
536 
537     fn state(&self) -> NetState {
538         NetState {
539             avail_features: self.common.avail_features,
540             acked_features: self.common.acked_features,
541             config: self.config,
542             queue_size: self.common.queue_sizes.clone(),
543         }
544     }
545 
546     fn set_state(&mut self, state: &NetState) {
547         self.common.avail_features = state.avail_features;
548         self.common.acked_features = state.acked_features;
549         self.config = state.config;
550         self.common.queue_sizes = state.queue_size.clone();
551     }
552 }
553 
554 impl Drop for Net {
555     fn drop(&mut self) {
556         if let Some(kill_evt) = self.common.kill_evt.take() {
557             // Ignore the result because there is nothing we can do about it.
558             let _ = kill_evt.write(1);
559         }
560     }
561 }
562 
563 impl VirtioDevice for Net {
564     fn device_type(&self) -> u32 {
565         self.common.device_type
566     }
567 
568     fn queue_max_sizes(&self) -> &[u16] {
569         &self.common.queue_sizes
570     }
571 
572     fn features(&self) -> u64 {
573         self.common.avail_features
574     }
575 
576     fn ack_features(&mut self, value: u64) {
577         self.common.ack_features(value)
578     }
579 
580     fn read_config(&self, offset: u64, data: &mut [u8]) {
581         self.read_config_from_slice(self.config.as_slice(), offset, data);
582     }
583 
584     fn activate(
585         &mut self,
586         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
587         interrupt_cb: Arc<dyn VirtioInterrupt>,
588         mut queues: Vec<(usize, Queue<GuestMemoryAtomic<GuestMemoryMmap>>, EventFd)>,
589     ) -> ActivateResult {
590         self.common.activate(&queues, &interrupt_cb)?;
591 
592         let num_queues = queues.len();
593         let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());
594         if self.common.feature_acked(VIRTIO_NET_F_CTRL_VQ.into()) && num_queues % 2 != 0 {
595             let ctrl_queue_index = num_queues - 1;
596             let (_, mut ctrl_queue, ctrl_queue_evt) = queues.remove(ctrl_queue_index);
597 
598             ctrl_queue.set_event_idx(event_idx);
599 
600             let (kill_evt, pause_evt) = self.common.dup_eventfds();
601             let mut ctrl_handler = NetCtrlEpollHandler {
602                 kill_evt,
603                 pause_evt,
604                 ctrl_q: CtrlQueue::new(self.taps.clone()),
605                 queue: ctrl_queue,
606                 queue_evt: ctrl_queue_evt,
607                 access_platform: self.common.access_platform.clone(),
608                 queue_index: ctrl_queue_index as u16,
609                 interrupt_cb: interrupt_cb.clone(),
610             };
611 
612             let paused = self.common.paused.clone();
613             // Let's update the barrier as we need 1 for each RX/TX pair +
614             // 1 for the control queue + 1 for the main thread signalling
615             // the pause.
616             self.common.paused_sync = Some(Arc::new(Barrier::new(self.taps.len() + 2)));
617             let paused_sync = self.common.paused_sync.clone();
618 
619             let mut epoll_threads = Vec::new();
620             spawn_virtio_thread(
621                 &format!("{}_ctrl", &self.id),
622                 &self.seccomp_action,
623                 Thread::VirtioNetCtl,
624                 &mut epoll_threads,
625                 &self.exit_evt,
626                 move || {
627                     if let Err(e) = ctrl_handler.run_ctrl(paused, paused_sync.unwrap()) {
628                         error!("Error running worker: {:?}", e);
629                     }
630                 },
631             )?;
632             self.ctrl_queue_epoll_thread = Some(epoll_threads.remove(0));
633         }
634 
635         let mut epoll_threads = Vec::new();
636         let mut taps = self.taps.clone();
637         for i in 0..queues.len() / 2 {
638             let rx = RxVirtio::new();
639             let tx = TxVirtio::new();
640             let rx_tap_listening = false;
641 
642             let (_, queue_0, queue_evt_0) = queues.remove(0);
643             let (_, queue_1, queue_evt_1) = queues.remove(0);
644             let mut queue_pair = vec![queue_0, queue_1];
645             queue_pair[0].set_event_idx(event_idx);
646             queue_pair[1].set_event_idx(event_idx);
647 
648             let queue_evt_pair = vec![queue_evt_0, queue_evt_1];
649 
650             let (kill_evt, pause_evt) = self.common.dup_eventfds();
651 
652             let rx_rate_limiter: Option<rate_limiter::RateLimiter> = self
653                 .rate_limiter_config
654                 .map(RateLimiterConfig::try_into)
655                 .transpose()
656                 .map_err(ActivateError::CreateRateLimiter)?;
657 
658             let tx_rate_limiter: Option<rate_limiter::RateLimiter> = self
659                 .rate_limiter_config
660                 .map(RateLimiterConfig::try_into)
661                 .transpose()
662                 .map_err(ActivateError::CreateRateLimiter)?;
663 
664             let tap = taps.remove(0);
665             tap.set_offload(virtio_features_to_tap_offload(self.common.acked_features))
666                 .map_err(|e| {
667                     error!("Error programming tap offload: {:?}", e);
668                     ActivateError::BadActivate
669                 })?;
670 
671             let mut handler = NetEpollHandler {
672                 net: NetQueuePair {
673                     tap_for_write_epoll: tap.clone(),
674                     tap,
675                     rx,
676                     tx,
677                     epoll_fd: None,
678                     rx_tap_listening,
679                     tx_tap_listening: false,
680                     counters: self.counters.clone(),
681                     tap_rx_event_id: RX_TAP_EVENT,
682                     tap_tx_event_id: TX_TAP_EVENT,
683                     rx_desc_avail: false,
684                     rx_rate_limiter,
685                     tx_rate_limiter,
686                     access_platform: self.common.access_platform.clone(),
687                 },
688                 queue_index_base: (i * 2) as u16,
689                 queue_pair,
690                 queue_evt_pair,
691                 interrupt_cb: interrupt_cb.clone(),
692                 kill_evt,
693                 pause_evt,
694                 driver_awake: false,
695             };
696 
697             let paused = self.common.paused.clone();
698             let paused_sync = self.common.paused_sync.clone();
699 
700             spawn_virtio_thread(
701                 &format!("{}_qp{}", self.id.clone(), i),
702                 &self.seccomp_action,
703                 Thread::VirtioNet,
704                 &mut epoll_threads,
705                 &self.exit_evt,
706                 move || {
707                     if let Err(e) = handler.run(paused, paused_sync.unwrap()) {
708                         error!("Error running worker: {:?}", e);
709                     }
710                 },
711             )?;
712         }
713 
714         self.common.epoll_threads = Some(epoll_threads);
715 
716         event!("virtio-device", "activated", "id", &self.id);
717         Ok(())
718     }
719 
720     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
721         let result = self.common.reset();
722         event!("virtio-device", "reset", "id", &self.id);
723         result
724     }
725 
726     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
727         let mut counters = HashMap::new();
728 
729         counters.insert(
730             "rx_bytes",
731             Wrapping(self.counters.rx_bytes.load(Ordering::Acquire)),
732         );
733         counters.insert(
734             "rx_frames",
735             Wrapping(self.counters.rx_frames.load(Ordering::Acquire)),
736         );
737         counters.insert(
738             "tx_bytes",
739             Wrapping(self.counters.tx_bytes.load(Ordering::Acquire)),
740         );
741         counters.insert(
742             "tx_frames",
743             Wrapping(self.counters.tx_frames.load(Ordering::Acquire)),
744         );
745 
746         Some(counters)
747     }
748 
749     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
750         self.common.set_access_platform(access_platform)
751     }
752 }
753 
754 impl Pausable for Net {
755     fn pause(&mut self) -> result::Result<(), MigratableError> {
756         self.common.pause()
757     }
758 
759     fn resume(&mut self) -> result::Result<(), MigratableError> {
760         self.common.resume()?;
761 
762         if let Some(ctrl_queue_epoll_thread) = &self.ctrl_queue_epoll_thread {
763             ctrl_queue_epoll_thread.thread().unpark();
764         }
765         Ok(())
766     }
767 }
768 
769 impl Snapshottable for Net {
770     fn id(&self) -> String {
771         self.id.clone()
772     }
773 
774     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
775         Snapshot::new_from_versioned_state(&self.id, &self.state())
776     }
777 
778     fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
779         self.set_state(&snapshot.to_versioned_state(&self.id)?);
780         Ok(())
781     }
782 }
// Empty impls: `Net` only relies on the behavior these traits provide.
impl Transportable for Net {}
impl Migratable for Net {}
785