xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision f67b3f79ea19c9a66e04074cbbf5d292f6529e43)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
    RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use block_util::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::{RateLimiter, TokenType};
use seccompiler::SeccompAction;
use std::io;
use std::num::Wrapping;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::{collections::HashMap, convert::TryInto};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use virtio_bindings::bindings::virtio_blk::*;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter.
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

#[derive(Debug)]
pub enum Error {
    /// Failed to parse the request.
    RequestParsing(block_util::Error),
    /// Failed to execute the request.
    RequestExecuting(block_util::ExecuteError),
    /// Missing the expected entry in the list of requests.
    MissingEntryRequestList,
    /// The asynchronous request returned with failure.
    AsyncRequestFailure,
    /// Failed to synchronize the file.
    Fsync(AsyncIoError),
}

pub type Result<T> = result::Result<T, Error>;

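// I/O statistics shared between the device and its per-queue handlers.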
#[derive(Default, Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
}

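// Per-queue worker context: owns the virtqueue, the asynchronous disk I/O
// handle, and the eventfds used to drive and tear down its epoll thread.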
struct BlockEpollHandler {
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    disk_image_id: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    request_list: HashMap<u16, Request>,
    rate_limiter: Option<RateLimiter>,
}

impl BlockEpollHandler {
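    // Pop the available descriptors, apply rate limiting, and hand the parsed
    // requests to the asynchronous I/O engine. Requests that need no
    // asynchronous work are completed on the spot; returns true if any
    // descriptor was returned to the used ring and the guest needs to be
    // notified.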
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;
        let mem = self.mem.memory();

        let mut used_desc_heads = Vec::new();
        let mut used_count = 0;

        for avail_desc in queue.iter(&mem) {
            let mut request = Request::parse(&avail_desc, &mem).map_err(Error::RequestParsing)?;

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                }
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    &mem,
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.disk_image_id,
                    avail_desc.index as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.request_list.insert(avail_desc.index, request);
            } else {
                // We use unwrap because the request parsing process already
                // checked that the status_addr was valid.
                mem.write_obj(VIRTIO_BLK_S_OK, request.status_addr).unwrap();

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                used_desc_heads.push((avail_desc.index, 0));
                used_count += 1;
            }
        }

        for &(desc_index, len) in used_desc_heads.iter() {
            queue.add_used(&mem, desc_index, len);
        }

        Ok(used_count > 0)
    }

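    // Drain the completion ring from the async I/O engine: write each
    // request's status back to the guest, return the descriptors to the
    // used ring, and accumulate the I/O statistics. Returns true if the
    // guest needs to be notified.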
    fn process_queue_complete(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_desc_heads = Vec::new();
        let mut used_count = 0;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        let completion_list = self.disk_image.complete();
        for (user_data, result) in completion_list {
            let desc_index = user_data as u16;
            let request = self
                .request_list
                .remove(&desc_index)
                .ok_or(Error::MissingEntryRequestList)?;

            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                    }
                    _ => {}
                }

                (VIRTIO_BLK_S_OK, result as u32)
            } else {
                error!(
                    "Request failed: {:?}",
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            // We use unwrap because the request parsing process already
            // checked that the status_addr was valid.
            mem.write_obj(status, request.status_addr).unwrap();

            used_desc_heads.push((desc_index, len));
            used_count += 1;
        }

        for &(desc_index, len) in used_desc_heads.iter() {
            queue.add_used(&mem, desc_index, len);
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_count > 0)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(&VirtioInterruptType::Queue, Some(&self.queue))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

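    // Worker thread main loop: register the queue, completion and optional
    // rate limiter eventfds with the epoll helper, then dispatch events
    // until the kill or pause eventfd fires.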
    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

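// Event dispatch for the worker thread. Returning true from handle_event()
// aborts the epoll loop.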
impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                if let Err(e) = self.queue_evt.read() {
                    error!("Failed to get queue event: {:?}", e);
                    return true;
                }

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached.
                if !rate_limit_reached {
                    match self.process_queue_submit() {
                        Ok(needs_notification) => {
                            if needs_notification {
                                if let Err(e) = self.signal_used_queue() {
                                    error!("Failed to signal used queue: {:?}", e);
                                    return true;
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to process queue (submit): {:?}", e);
                            return true;
                        }
                    }
                }
            }
            COMPLETION_EVENT => {
                if let Err(e) = self.disk_image.notifier().read() {
                    error!("Failed to get completion event: {:?}", e);
                    return true;
                }

                match self.process_queue_complete() {
                    Ok(needs_notification) => {
                        if needs_notification {
                            if let Err(e) = self.signal_used_queue() {
                                error!("Failed to signal used queue: {:?}", e);
                                return true;
                            }
                        }
                    }
                    Err(e) => {
                        error!("Failed to process queue (complete): {:?}", e);
                        return true;
                    }
                }
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    if rate_limiter.event_handler().is_ok() {
                        match self.process_queue_submit() {
                            Ok(needs_notification) => {
                                if needs_notification {
                                    if let Err(e) = self.signal_used_queue() {
                                        error!("Failed to signal used queue: {:?}", e);
                                        return true;
                                    }
                                }
                            }
                            Err(e) => {
                                error!("Failed to process queue (submit): {:?}", e);
                                return true;
                            }
                        }
                    }
                } else {
                    error!("Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled.");
                    return true;
                }
            }
            _ => {
                error!("Unexpected event: {}", ev_type);
                return true;
            }
        }
        false
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter_config: Option<RateLimiterConfig>,
    exit_evt: EventFd,
}

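// Serializable device state captured for snapshot/restore.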
#[derive(Versionize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl VersionMapped for BlockState {}

impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        is_disk_read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        seccomp_action: SeccompAction,
        rate_limiter_config: Option<RateLimiterConfig>,
        exit_evt: EventFd,
    ) -> io::Result<Self> {
        let disk_size = disk_image.size().map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("Failed getting disk size: {}", e),
            )
        })?;
        if disk_size % SECTOR_SIZE != 0 {
            warn!(
                "Disk size {} is not a multiple of sector size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, SECTOR_SIZE
            );
        }

        let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
            | (1u64 << VIRTIO_BLK_F_FLUSH)
            | (1u64 << VIRTIO_BLK_F_CONFIG_WCE);

        if iommu {
            avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
        }

        if is_disk_read_only {
            avail_features |= 1u64 << VIRTIO_BLK_F_RO;
        }

        let disk_nsectors = disk_size / SECTOR_SIZE;
        let mut config = VirtioBlockConfig {
            capacity: disk_nsectors,
            writeback: 1,
            ..Default::default()
        };

        if num_queues > 1 {
            avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
            config.num_queues = num_queues as u16;
        }

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter_config,
            exit_evt,
        })
    }

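    // Snapshot/restore helpers: capture and reapply the migratable state.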
    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn set_state(&mut self, state: &BlockState) {
        self.disk_path = state.disk_path.clone().into();
        self.disk_nsectors = state.disk_nsectors;
        self.common.avail_features = state.avail_features;
        self.common.acked_features = state.acked_features;
        self.config = state.config;
    }

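    // Recompute the effective cache mode from the negotiated features and
    // the guest-written config, and publish it to the queue handlers.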
    fn update_writeback(&mut self) {
        // If VIRTIO_BLK_F_CONFIG_WCE was negotiated, honor the writeback
        // value the guest wrote into the config space.
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise, enable writeback caching only if VIRTIO_BLK_F_FLUSH
            // was negotiated.
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field.
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

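    // Device activation: spawn one epoll worker thread per virtqueue, each
    // with its own asynchronous I/O context and optional rate limiter.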
    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<Queue>,
        mut queue_evts: Vec<EventFd>,
    ) -> ActivateResult {
        self.common.activate(&queues, &queue_evts, &interrupt_cb)?;

        let disk_image_id = build_disk_image_id(&self.disk_path);
        self.update_writeback();

        let mut epoll_threads = Vec::new();
        for i in 0..queues.len() {
            let queue_evt = queue_evts.remove(0);
            let queue = queues.remove(0);
            let queue_size = queue.size;
            let (kill_evt, pause_evt) = self.common.dup_eventfds();

            let rate_limiter: Option<RateLimiter> = self
                .rate_limiter_config
                .map(RateLimiterConfig::try_into)
                .transpose()
                .map_err(ActivateError::CreateRateLimiter)?;

            let mut handler = BlockEpollHandler {
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("Failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                disk_image_id: disk_image_id.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                request_list: HashMap::with_capacity(queue_size.into()),
                rate_limiter,
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id, i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || {
                    if let Err(e) = handler.run(paused, paused_sync.unwrap()) {
                        error!("Error running worker: {:?}", e);
                    }
                },
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );

        Some(counters)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_versioned_state(&self.id(), &self.state())
    }

    fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
        self.set_state(&snapshot.to_versioned_state(&self.id)?);
        Ok(())
    }
}

impl Transportable for Block {}
impl Migratable for Block {}