xref: /cloud-hypervisor/virtio-devices/src/net.rs (revision eea9bcea38e0c5649f444c829f3a4f9c22aa486c)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
5 // Use of this source code is governed by a BSD-style license that can be
6 // found in the THIRD-PARTY file.
7 
8 use super::Error as DeviceError;
9 use super::{
10     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
11     RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
12     EPOLL_HELPER_EVENT_LAST,
13 };
14 use crate::seccomp_filters::Thread;
15 use crate::thread_helper::spawn_virtio_thread;
16 use crate::GuestMemoryMmap;
17 use crate::VirtioInterrupt;
18 use anyhow::anyhow;
19 use net_util::CtrlQueue;
20 use net_util::{
21     build_net_config_space, build_net_config_space_with_mq, open_tap,
22     virtio_features_to_tap_offload, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio,
23     Tap, TapError, TxVirtio, VirtioNetConfig,
24 };
25 use seccompiler::SeccompAction;
26 use std::net::Ipv4Addr;
27 use std::num::Wrapping;
28 use std::ops::Deref;
29 use std::os::unix::io::{AsRawFd, RawFd};
30 use std::result;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::sync::{Arc, Barrier};
33 use std::thread;
34 use std::vec::Vec;
35 use std::{collections::HashMap, convert::TryInto};
36 use thiserror::Error;
37 use versionize::{VersionMap, Versionize, VersionizeResult};
38 use versionize_derive::Versionize;
39 use virtio_bindings::bindings::virtio_net::*;
40 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
41 use virtio_queue::{Queue, QueueT};
42 use vm_memory::{ByteValued, GuestAddressSpace, GuestMemoryAtomic};
43 use vm_migration::VersionMapped;
44 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
45 use vm_virtio::AccessPlatform;
46 use vmm_sys_util::eventfd::EventFd;
47 
// Control queue

/// Epoll event id reporting that the guest made a buffer available on the
/// control queue.
///
/// It has the same numeric value as `RX_QUEUE_EVENT`; the control queue
/// handler and the Rx/Tx handlers each create their own `EpollHelper`, so the
/// two ids are never registered on the same epoll instance in this file.
const CTRL_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
51 
/// Smallest MTU accepted for a virtio-net device.
///
/// Following the VIRTIO specification, the MTU should be at least 1280.
pub const MIN_MTU: u16 = 1280;
54 
/// State owned by the thread that services the virtio-net control queue.
pub struct NetCtrlEpollHandler {
    /// Guest memory handle; a view is taken with `memory()` for each event.
    pub mem: GuestMemoryAtomic<GuestMemoryMmap>,
    /// Handed to `EpollHelper::new` together with `pause_evt`.
    pub kill_evt: EventFd,
    /// Handed to `EpollHelper::new` together with `kill_evt`.
    pub pause_evt: EventFd,
    /// Processes the requests found on the control queue.
    pub ctrl_q: CtrlQueue,
    /// Eventfd registered under `CTRL_QUEUE_EVENT`; read on every event.
    pub queue_evt: EventFd,
    /// The control virtqueue itself.
    pub queue: Queue,
    /// Optional access platform forwarded to `CtrlQueue::process`.
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
    /// Callback used to raise the queue interrupt towards the guest.
    pub interrupt_cb: Arc<dyn VirtioInterrupt>,
    /// Index of the control queue, used when raising its interrupt.
    pub queue_index: u16,
}
66 
67 impl NetCtrlEpollHandler {
68     fn signal_used_queue(&self, queue_index: u16) -> result::Result<(), DeviceError> {
69         self.interrupt_cb
70             .trigger(VirtioInterruptType::Queue(queue_index))
71             .map_err(|e| {
72                 error!("Failed to signal used queue: {:?}", e);
73                 DeviceError::FailedSignalingUsedQueue(e)
74             })
75     }
76 
77     pub fn run_ctrl(
78         &mut self,
79         paused: Arc<AtomicBool>,
80         paused_sync: Arc<Barrier>,
81     ) -> std::result::Result<(), EpollHelperError> {
82         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
83         helper.add_event(self.queue_evt.as_raw_fd(), CTRL_QUEUE_EVENT)?;
84         helper.run(paused, paused_sync, self)?;
85 
86         Ok(())
87     }
88 }
89 
90 impl EpollHelperHandler for NetCtrlEpollHandler {
91     fn handle_event(
92         &mut self,
93         _helper: &mut EpollHelper,
94         event: &epoll::Event,
95     ) -> result::Result<(), EpollHelperError> {
96         let ev_type = event.data as u16;
97         match ev_type {
98             CTRL_QUEUE_EVENT => {
99                 let mem = self.mem.memory();
100                 self.queue_evt.read().map_err(|e| {
101                     EpollHelperError::HandleEvent(anyhow!(
102                         "Failed to get control queue event: {:?}",
103                         e
104                     ))
105                 })?;
106                 self.ctrl_q
107                     .process(mem.deref(), &mut self.queue, self.access_platform.as_ref())
108                     .map_err(|e| {
109                         EpollHelperError::HandleEvent(anyhow!(
110                             "Failed to process control queue: {:?}",
111                             e
112                         ))
113                     })?;
114                 match self.queue.needs_notification(mem.deref()) {
115                     Ok(true) => {
116                         self.signal_used_queue(self.queue_index).map_err(|e| {
117                             EpollHelperError::HandleEvent(anyhow!(
118                                 "Error signalling that control queue was used: {:?}",
119                                 e
120                             ))
121                         })?;
122                     }
123                     Ok(false) => {}
124                     Err(e) => {
125                         return Err(EpollHelperError::HandleEvent(anyhow!(
126                             "Error getting notification state of control queue: {}",
127                             e
128                         )));
129                     }
130                 };
131             }
132             _ => {
133                 return Err(EpollHelperError::HandleEvent(anyhow!(
134                     "Unknown event for virtio-net control queue"
135                 )));
136             }
137         }
138 
139         Ok(())
140     }
141 }
142 
// Rx/Tx queue pair

/// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
/// The transmit queue has a frame that is ready to send from the guest.
pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
/// A frame is available for reading from the tap device to receive in the guest.
pub const RX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
/// The TAP can be written to. Used after an EAGAIN error to retry TX.
pub const TX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4;
/// New 'wake up' event from the rx rate limiter.
pub const RX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 5;
/// New 'wake up' event from the tx rate limiter.
pub const TX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 6;
156 
/// Errors returned while constructing a [`Net`] device.
#[derive(Error, Debug)]
pub enum Error {
    /// `open_tap` failed while creating the TAP interfaces.
    #[error("Failed to open taps: {0}")]
    OpenTap(OpenTapError),
    /// A TAP operation failed: wrapping an existing fd, or reading or
    /// setting the MTU.
    #[error("Using existing tap: {0}")]
    TapError(TapError),
    /// `dup()` on a caller-provided TAP file descriptor failed.
    #[error("Error calling dup() on tap fd: {0}")]
    DuplicateTapFd(std::io::Error),
}
166 
/// Result type for the `Net` constructors, using this module's [`Error`].
pub type Result<T> = result::Result<T, Error>;
168 
/// State owned by the thread that services one Rx/Tx queue pair.
struct NetEpollHandler {
    /// TAP device, rate limiters and Rx/Tx bookkeeping for this queue pair.
    net: NetQueuePair,
    /// Guest memory handle; a view is taken with `memory()` when processing.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    /// Callback used to raise queue interrupts towards the guest.
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    /// Handed to `EpollHelper::new` together with `pause_evt`.
    kill_evt: EventFd,
    /// Handed to `EpollHelper::new` together with `kill_evt`.
    pause_evt: EventFd,
    /// Index of the RX queue of this pair; the TX queue is at `+ 1`.
    queue_index_base: u16,
    /// `[0]` is the RX queue, `[1]` is the TX queue.
    queue_pair: Vec<Queue>,
    /// Eventfds matching `queue_pair`: `[0]` for RX, `[1]` for TX.
    queue_evt_pair: Vec<EventFd>,
    // Always generate interrupts until the driver has signalled to the device.
    // This mitigates a problem with interrupts from tap events being "lost" upon
    // a restore as the vCPU thread isn't ready to handle the interrupt. This causes
    // issues when combined with VIRTIO_RING_F_EVENT_IDX interrupt suppression.
    driver_awake: bool,
}
184 
185 impl NetEpollHandler {
186     fn signal_used_queue(&self, queue_index: u16) -> result::Result<(), DeviceError> {
187         self.interrupt_cb
188             .trigger(VirtioInterruptType::Queue(queue_index))
189             .map_err(|e| {
190                 error!("Failed to signal used queue: {:?}", e);
191                 DeviceError::FailedSignalingUsedQueue(e)
192             })
193     }
194 
195     fn handle_rx_event(&mut self) -> result::Result<(), DeviceError> {
196         let queue_evt = &self.queue_evt_pair[0];
197         if let Err(e) = queue_evt.read() {
198             error!("Failed to get rx queue event: {:?}", e);
199         }
200 
201         self.net.rx_desc_avail = true;
202 
203         let rate_limit_reached = self
204             .net
205             .rx_rate_limiter
206             .as_ref()
207             .map_or(false, |r| r.is_blocked());
208 
209         // Start to listen on RX_TAP_EVENT only when the rate limit is not reached
210         if !self.net.rx_tap_listening && !rate_limit_reached {
211             net_util::register_listener(
212                 self.net.epoll_fd.unwrap(),
213                 self.net.tap.as_raw_fd(),
214                 epoll::Events::EPOLLIN,
215                 u64::from(self.net.tap_rx_event_id),
216             )
217             .map_err(DeviceError::IoError)?;
218             self.net.rx_tap_listening = true;
219         }
220 
221         Ok(())
222     }
223 
224     fn process_tx(&mut self) -> result::Result<(), DeviceError> {
225         if self
226             .net
227             .process_tx(&self.mem.memory(), &mut self.queue_pair[1])
228             .map_err(DeviceError::NetQueuePair)?
229             || !self.driver_awake
230         {
231             self.signal_used_queue(self.queue_index_base + 1)?;
232             debug!("Signalling TX queue");
233         } else {
234             debug!("Not signalling TX queue");
235         }
236         Ok(())
237     }
238 
239     fn handle_tx_event(&mut self) -> result::Result<(), DeviceError> {
240         let rate_limit_reached = self
241             .net
242             .tx_rate_limiter
243             .as_ref()
244             .map_or(false, |r| r.is_blocked());
245 
246         if !rate_limit_reached {
247             self.process_tx()?;
248         }
249 
250         Ok(())
251     }
252 
253     fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> {
254         if self
255             .net
256             .process_rx(&self.mem.memory(), &mut self.queue_pair[0])
257             .map_err(DeviceError::NetQueuePair)?
258             || !self.driver_awake
259         {
260             self.signal_used_queue(self.queue_index_base)?;
261             debug!("Signalling RX queue");
262         } else {
263             debug!("Not signalling RX queue");
264         }
265         Ok(())
266     }
267 
268     fn run(
269         &mut self,
270         paused: Arc<AtomicBool>,
271         paused_sync: Arc<Barrier>,
272     ) -> result::Result<(), EpollHelperError> {
273         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
274         helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?;
275         helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?;
276         if let Some(rate_limiter) = &self.net.rx_rate_limiter {
277             helper.add_event(rate_limiter.as_raw_fd(), RX_RATE_LIMITER_EVENT)?;
278         }
279         if let Some(rate_limiter) = &self.net.tx_rate_limiter {
280             helper.add_event(rate_limiter.as_raw_fd(), TX_RATE_LIMITER_EVENT)?;
281         }
282 
283         let mem = self.mem.memory();
284         // If there are some already available descriptors on the RX queue,
285         // then we can start the thread while listening onto the TAP.
286         if self.queue_pair[0]
287             .used_idx(mem.deref(), Ordering::Acquire)
288             .map_err(EpollHelperError::QueueRingIndex)?
289             < self.queue_pair[0]
290                 .avail_idx(mem.deref(), Ordering::Acquire)
291                 .map_err(EpollHelperError::QueueRingIndex)?
292         {
293             helper.add_event(self.net.tap.as_raw_fd(), RX_TAP_EVENT)?;
294             self.net.rx_tap_listening = true;
295             info!("Listener registered at start");
296         }
297 
298         // The NetQueuePair needs the epoll fd.
299         self.net.epoll_fd = Some(helper.as_raw_fd());
300 
301         helper.run(paused, paused_sync, self)?;
302 
303         Ok(())
304     }
305 }
306 
307 impl EpollHelperHandler for NetEpollHandler {
308     fn handle_event(
309         &mut self,
310         _helper: &mut EpollHelper,
311         event: &epoll::Event,
312     ) -> result::Result<(), EpollHelperError> {
313         let ev_type = event.data as u16;
314         match ev_type {
315             RX_QUEUE_EVENT => {
316                 self.driver_awake = true;
317                 self.handle_rx_event().map_err(|e| {
318                     EpollHelperError::HandleEvent(anyhow!("Error processing RX queue: {:?}", e))
319                 })?;
320             }
321             TX_QUEUE_EVENT => {
322                 let queue_evt = &self.queue_evt_pair[1];
323                 if let Err(e) = queue_evt.read() {
324                     error!("Failed to get tx queue event: {:?}", e);
325                 }
326                 self.driver_awake = true;
327                 self.handle_tx_event().map_err(|e| {
328                     EpollHelperError::HandleEvent(anyhow!("Error processing TX queue: {:?}", e))
329                 })?;
330             }
331             TX_TAP_EVENT => {
332                 self.handle_tx_event().map_err(|e| {
333                     EpollHelperError::HandleEvent(anyhow!(
334                         "Error processing TX queue (TAP event): {:?}",
335                         e
336                     ))
337                 })?;
338             }
339             RX_TAP_EVENT => {
340                 self.handle_rx_tap_event().map_err(|e| {
341                     EpollHelperError::HandleEvent(anyhow!("Error processing tap queue: {:?}", e))
342                 })?;
343             }
344             RX_RATE_LIMITER_EVENT => {
345                 if let Some(rate_limiter) = &mut self.net.rx_rate_limiter {
346                     // Upon rate limiter event, call the rate limiter handler and register the
347                     // TAP fd for further processing if some RX buffers are available
348                     rate_limiter.event_handler().map_err(|e| {
349                         EpollHelperError::HandleEvent(anyhow!(
350                             "Error from 'rate_limiter.event_handler()': {:?}",
351                             e
352                         ))
353                     })?;
354 
355                     if !self.net.rx_tap_listening && self.net.rx_desc_avail {
356                         net_util::register_listener(
357                             self.net.epoll_fd.unwrap(),
358                             self.net.tap.as_raw_fd(),
359                             epoll::Events::EPOLLIN,
360                             u64::from(self.net.tap_rx_event_id),
361                         )
362                         .map_err(|e| {
363                             EpollHelperError::HandleEvent(anyhow!(
364                                 "Error register_listener with `RX_RATE_LIMITER_EVENT`: {:?}",
365                                 e
366                             ))
367                         })?;
368 
369                         self.net.rx_tap_listening = true;
370                     }
371                 } else {
372                     return Err(EpollHelperError::HandleEvent(anyhow!(
373                         "Unexpected RX_RATE_LIMITER_EVENT"
374                     )));
375                 }
376             }
377             TX_RATE_LIMITER_EVENT => {
378                 if let Some(rate_limiter) = &mut self.net.tx_rate_limiter {
379                     // Upon rate limiter event, call the rate limiter handler
380                     // and restart processing the queue.
381                     rate_limiter.event_handler().map_err(|e| {
382                         EpollHelperError::HandleEvent(anyhow!(
383                             "Error from 'rate_limiter.event_handler()': {:?}",
384                             e
385                         ))
386                     })?;
387 
388                     self.driver_awake = true;
389                     self.process_tx().map_err(|e| {
390                         EpollHelperError::HandleEvent(anyhow!("Error processing TX queue: {:?}", e))
391                     })?;
392                 } else {
393                     return Err(EpollHelperError::HandleEvent(anyhow!(
394                         "Unexpected TX_RATE_LIMITER_EVENT"
395                     )));
396                 }
397             }
398             _ => {
399                 return Err(EpollHelperError::HandleEvent(anyhow!(
400                     "Unexpected event: {}",
401                     ev_type
402                 )));
403             }
404         }
405         Ok(())
406     }
407 }
408 
/// virtio-net device backed by one TAP handle per Rx/Tx queue pair.
pub struct Net {
    /// Shared virtio device state (features, queue sizes, threads, pause).
    common: VirtioCommon,
    /// Device identifier, used for thread names, events and snapshots.
    id: String,
    /// TAP handles; `activate` hands one to each Rx/Tx queue pair.
    taps: Vec<Tap>,
    /// Device configuration space exposed through `read_config`.
    config: VirtioNetConfig,
    /// Join handle of the control queue thread, when one was spawned.
    ctrl_queue_epoll_thread: Option<thread::JoinHandle<()>>,
    /// Rx/Tx byte and frame counters shared with the queue pair threads.
    counters: NetCounters,
    /// Seccomp action passed to every spawned virtio thread.
    seccomp_action: SeccompAction,
    /// Optional rate limiter configuration, applied to both RX and TX.
    rate_limiter_config: Option<RateLimiterConfig>,
    /// Eventfd passed to every spawned virtio thread.
    exit_evt: EventFd,
}
420 
/// Versioned snapshot of the parts of [`Net`] that are saved and restored.
#[derive(Versionize)]
pub struct NetState {
    /// Features offered by the device.
    pub avail_features: u64,
    /// Features acknowledged by the driver.
    pub acked_features: u64,
    /// Device configuration space.
    pub config: VirtioNetConfig,
    /// Size of each queue, copied from `VirtioCommon::queue_sizes`.
    pub queue_size: Vec<u16>,
}
428 
// Marker implementation: no trait items are overridden.
impl VersionMapped for NetState {}
430 
431 impl Net {
432     /// Create a new virtio network device with the given TAP interface.
433     #[allow(clippy::too_many_arguments)]
434     fn new_with_tap(
435         id: String,
436         taps: Vec<Tap>,
437         guest_mac: Option<MacAddr>,
438         iommu: bool,
439         num_queues: usize,
440         queue_size: u16,
441         seccomp_action: SeccompAction,
442         rate_limiter_config: Option<RateLimiterConfig>,
443         exit_evt: EventFd,
444     ) -> Result<Self> {
445         assert!(!taps.is_empty());
446 
447         let mtu = taps[0].mtu().map_err(Error::TapError)? as u16;
448 
449         let mut avail_features = 1 << VIRTIO_NET_F_CSUM
450             | 1 << VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
451             | 1 << VIRTIO_NET_F_GUEST_CSUM
452             | 1 << VIRTIO_NET_F_GUEST_ECN
453             | 1 << VIRTIO_NET_F_GUEST_TSO4
454             | 1 << VIRTIO_NET_F_GUEST_TSO6
455             | 1 << VIRTIO_NET_F_GUEST_UFO
456             | 1 << VIRTIO_NET_F_HOST_ECN
457             | 1 << VIRTIO_NET_F_HOST_TSO4
458             | 1 << VIRTIO_NET_F_HOST_TSO6
459             | 1 << VIRTIO_NET_F_HOST_UFO
460             | 1 << VIRTIO_NET_F_MTU
461             | 1 << VIRTIO_RING_F_EVENT_IDX
462             | 1 << VIRTIO_F_VERSION_1;
463 
464         if iommu {
465             avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
466         }
467 
468         avail_features |= 1 << VIRTIO_NET_F_CTRL_VQ;
469         let queue_num = num_queues + 1;
470 
471         let mut config = VirtioNetConfig::default();
472         if let Some(mac) = guest_mac {
473             build_net_config_space(&mut config, mac, num_queues, Some(mtu), &mut avail_features);
474         } else {
475             build_net_config_space_with_mq(&mut config, num_queues, Some(mtu), &mut avail_features);
476         }
477 
478         Ok(Net {
479             common: VirtioCommon {
480                 device_type: VirtioDeviceType::Net as u32,
481                 avail_features,
482                 queue_sizes: vec![queue_size; queue_num],
483                 paused_sync: Some(Arc::new(Barrier::new((num_queues / 2) + 1))),
484                 min_queues: 2,
485                 ..Default::default()
486             },
487             id,
488             taps,
489             config,
490             ctrl_queue_epoll_thread: None,
491             counters: NetCounters::default(),
492             seccomp_action,
493             rate_limiter_config,
494             exit_evt,
495         })
496     }
497 
498     /// Create a new virtio network device with the given IP address and
499     /// netmask.
500     #[allow(clippy::too_many_arguments)]
501     pub fn new(
502         id: String,
503         if_name: Option<&str>,
504         ip_addr: Option<Ipv4Addr>,
505         netmask: Option<Ipv4Addr>,
506         guest_mac: Option<MacAddr>,
507         host_mac: &mut Option<MacAddr>,
508         mtu: Option<u16>,
509         iommu: bool,
510         num_queues: usize,
511         queue_size: u16,
512         seccomp_action: SeccompAction,
513         rate_limiter_config: Option<RateLimiterConfig>,
514         exit_evt: EventFd,
515     ) -> Result<Self> {
516         let taps = open_tap(
517             if_name,
518             ip_addr,
519             netmask,
520             host_mac,
521             mtu,
522             num_queues / 2,
523             None,
524         )
525         .map_err(Error::OpenTap)?;
526 
527         Self::new_with_tap(
528             id,
529             taps,
530             guest_mac,
531             iommu,
532             num_queues,
533             queue_size,
534             seccomp_action,
535             rate_limiter_config,
536             exit_evt,
537         )
538     }
539 
540     #[allow(clippy::too_many_arguments)]
541     pub fn from_tap_fds(
542         id: String,
543         fds: &[RawFd],
544         guest_mac: Option<MacAddr>,
545         mtu: Option<u16>,
546         iommu: bool,
547         queue_size: u16,
548         seccomp_action: SeccompAction,
549         rate_limiter_config: Option<RateLimiterConfig>,
550         exit_evt: EventFd,
551     ) -> Result<Self> {
552         let mut taps: Vec<Tap> = Vec::new();
553         let num_queue_pairs = fds.len();
554 
555         for fd in fds.iter() {
556             // Duplicate so that it can survive reboots
557             // SAFETY: FFI call to dup. Trivially safe.
558             let fd = unsafe { libc::dup(*fd) };
559             if fd < 0 {
560                 return Err(Error::DuplicateTapFd(std::io::Error::last_os_error()));
561             }
562             let tap = Tap::from_tap_fd(fd, num_queue_pairs).map_err(Error::TapError)?;
563             taps.push(tap);
564         }
565 
566         assert!(!taps.is_empty());
567 
568         if let Some(mtu) = mtu {
569             taps[0].set_mtu(mtu as i32).map_err(Error::TapError)?;
570         }
571 
572         Self::new_with_tap(
573             id,
574             taps,
575             guest_mac,
576             iommu,
577             num_queue_pairs * 2,
578             queue_size,
579             seccomp_action,
580             rate_limiter_config,
581             exit_evt,
582         )
583     }
584 
585     fn state(&self) -> NetState {
586         NetState {
587             avail_features: self.common.avail_features,
588             acked_features: self.common.acked_features,
589             config: self.config,
590             queue_size: self.common.queue_sizes.clone(),
591         }
592     }
593 
594     fn set_state(&mut self, state: &NetState) {
595         self.common.avail_features = state.avail_features;
596         self.common.acked_features = state.acked_features;
597         self.config = state.config;
598         self.common.queue_sizes = state.queue_size.clone();
599     }
600 }
601 
602 impl Drop for Net {
603     fn drop(&mut self) {
604         if let Some(kill_evt) = self.common.kill_evt.take() {
605             // Ignore the result because there is nothing we can do about it.
606             let _ = kill_evt.write(1);
607         }
608     }
609 }
610 
611 impl VirtioDevice for Net {
612     fn device_type(&self) -> u32 {
613         self.common.device_type
614     }
615 
616     fn queue_max_sizes(&self) -> &[u16] {
617         &self.common.queue_sizes
618     }
619 
620     fn features(&self) -> u64 {
621         self.common.avail_features
622     }
623 
624     fn ack_features(&mut self, value: u64) {
625         self.common.ack_features(value)
626     }
627 
628     fn read_config(&self, offset: u64, data: &mut [u8]) {
629         self.read_config_from_slice(self.config.as_slice(), offset, data);
630     }
631 
632     fn activate(
633         &mut self,
634         mem: GuestMemoryAtomic<GuestMemoryMmap>,
635         interrupt_cb: Arc<dyn VirtioInterrupt>,
636         mut queues: Vec<(usize, Queue, EventFd)>,
637     ) -> ActivateResult {
638         self.common.activate(&queues, &interrupt_cb)?;
639 
640         let num_queues = queues.len();
641         let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());
642         if self.common.feature_acked(VIRTIO_NET_F_CTRL_VQ.into()) && num_queues % 2 != 0 {
643             let ctrl_queue_index = num_queues - 1;
644             let (_, mut ctrl_queue, ctrl_queue_evt) = queues.remove(ctrl_queue_index);
645 
646             ctrl_queue.set_event_idx(event_idx);
647 
648             let (kill_evt, pause_evt) = self.common.dup_eventfds();
649             let mut ctrl_handler = NetCtrlEpollHandler {
650                 mem: mem.clone(),
651                 kill_evt,
652                 pause_evt,
653                 ctrl_q: CtrlQueue::new(self.taps.clone()),
654                 queue: ctrl_queue,
655                 queue_evt: ctrl_queue_evt,
656                 access_platform: self.common.access_platform.clone(),
657                 queue_index: ctrl_queue_index as u16,
658                 interrupt_cb: interrupt_cb.clone(),
659             };
660 
661             let paused = self.common.paused.clone();
662             // Let's update the barrier as we need 1 for each RX/TX pair +
663             // 1 for the control queue + 1 for the main thread signalling
664             // the pause.
665             self.common.paused_sync = Some(Arc::new(Barrier::new(self.taps.len() + 2)));
666             let paused_sync = self.common.paused_sync.clone();
667 
668             let mut epoll_threads = Vec::new();
669             spawn_virtio_thread(
670                 &format!("{}_ctrl", &self.id),
671                 &self.seccomp_action,
672                 Thread::VirtioNetCtl,
673                 &mut epoll_threads,
674                 &self.exit_evt,
675                 move || ctrl_handler.run_ctrl(paused, paused_sync.unwrap()),
676             )?;
677             self.ctrl_queue_epoll_thread = Some(epoll_threads.remove(0));
678         }
679 
680         let mut epoll_threads = Vec::new();
681         let mut taps = self.taps.clone();
682         for i in 0..queues.len() / 2 {
683             let rx = RxVirtio::new();
684             let tx = TxVirtio::new();
685             let rx_tap_listening = false;
686 
687             let (_, queue_0, queue_evt_0) = queues.remove(0);
688             let (_, queue_1, queue_evt_1) = queues.remove(0);
689             let mut queue_pair = vec![queue_0, queue_1];
690             queue_pair[0].set_event_idx(event_idx);
691             queue_pair[1].set_event_idx(event_idx);
692 
693             let queue_evt_pair = vec![queue_evt_0, queue_evt_1];
694 
695             let (kill_evt, pause_evt) = self.common.dup_eventfds();
696 
697             let rx_rate_limiter: Option<rate_limiter::RateLimiter> = self
698                 .rate_limiter_config
699                 .map(RateLimiterConfig::try_into)
700                 .transpose()
701                 .map_err(ActivateError::CreateRateLimiter)?;
702 
703             let tx_rate_limiter: Option<rate_limiter::RateLimiter> = self
704                 .rate_limiter_config
705                 .map(RateLimiterConfig::try_into)
706                 .transpose()
707                 .map_err(ActivateError::CreateRateLimiter)?;
708 
709             let tap = taps.remove(0);
710             tap.set_offload(virtio_features_to_tap_offload(self.common.acked_features))
711                 .map_err(|e| {
712                     error!("Error programming tap offload: {:?}", e);
713                     ActivateError::BadActivate
714                 })?;
715 
716             let mut handler = NetEpollHandler {
717                 net: NetQueuePair {
718                     tap_for_write_epoll: tap.clone(),
719                     tap,
720                     rx,
721                     tx,
722                     epoll_fd: None,
723                     rx_tap_listening,
724                     tx_tap_listening: false,
725                     counters: self.counters.clone(),
726                     tap_rx_event_id: RX_TAP_EVENT,
727                     tap_tx_event_id: TX_TAP_EVENT,
728                     rx_desc_avail: false,
729                     rx_rate_limiter,
730                     tx_rate_limiter,
731                     access_platform: self.common.access_platform.clone(),
732                 },
733                 mem: mem.clone(),
734                 queue_index_base: (i * 2) as u16,
735                 queue_pair,
736                 queue_evt_pair,
737                 interrupt_cb: interrupt_cb.clone(),
738                 kill_evt,
739                 pause_evt,
740                 driver_awake: false,
741             };
742 
743             let paused = self.common.paused.clone();
744             let paused_sync = self.common.paused_sync.clone();
745 
746             spawn_virtio_thread(
747                 &format!("{}_qp{}", self.id.clone(), i),
748                 &self.seccomp_action,
749                 Thread::VirtioNet,
750                 &mut epoll_threads,
751                 &self.exit_evt,
752                 move || handler.run(paused, paused_sync.unwrap()),
753             )?;
754         }
755 
756         self.common.epoll_threads = Some(epoll_threads);
757 
758         event!("virtio-device", "activated", "id", &self.id);
759         Ok(())
760     }
761 
762     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
763         let result = self.common.reset();
764         event!("virtio-device", "reset", "id", &self.id);
765         result
766     }
767 
768     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
769         let mut counters = HashMap::new();
770 
771         counters.insert(
772             "rx_bytes",
773             Wrapping(self.counters.rx_bytes.load(Ordering::Acquire)),
774         );
775         counters.insert(
776             "rx_frames",
777             Wrapping(self.counters.rx_frames.load(Ordering::Acquire)),
778         );
779         counters.insert(
780             "tx_bytes",
781             Wrapping(self.counters.tx_bytes.load(Ordering::Acquire)),
782         );
783         counters.insert(
784             "tx_frames",
785             Wrapping(self.counters.tx_frames.load(Ordering::Acquire)),
786         );
787 
788         Some(counters)
789     }
790 
791     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
792         self.common.set_access_platform(access_platform)
793     }
794 }
795 
796 impl Pausable for Net {
797     fn pause(&mut self) -> result::Result<(), MigratableError> {
798         self.common.pause()
799     }
800 
801     fn resume(&mut self) -> result::Result<(), MigratableError> {
802         self.common.resume()?;
803 
804         if let Some(ctrl_queue_epoll_thread) = &self.ctrl_queue_epoll_thread {
805             ctrl_queue_epoll_thread.thread().unpark();
806         }
807         Ok(())
808     }
809 }
810 
811 impl Snapshottable for Net {
812     fn id(&self) -> String {
813         self.id.clone()
814     }
815 
816     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
817         Snapshot::new_from_versioned_state(&self.id, &self.state())
818     }
819 
820     fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
821         self.set_state(&snapshot.to_versioned_state(&self.id)?);
822         Ok(())
823     }
824 }
// Marker implementation: no trait items are overridden.
impl Transportable for Net {}
// Marker implementation: no trait items are overridden.
impl Migratable for Net {}
827