xref: /qemu/migration/ram.c (revision dfeb8679db358e1f8e0ee4dd84f903d71f000378)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 #include <stdint.h>
29 #include <zlib.h>
30 #include "qemu/bitops.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/timer.h"
33 #include "qemu/main-loop.h"
34 #include "migration/migration.h"
35 #include "exec/address-spaces.h"
36 #include "migration/page_cache.h"
37 #include "qemu/error-report.h"
38 #include "trace.h"
39 #include "exec/ram_addr.h"
40 #include "qemu/rcu_queue.h"
41 
42 #ifdef DEBUG_MIGRATION_RAM
43 #define DPRINTF(fmt, ...) \
44     do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
45 #else
46 #define DPRINTF(fmt, ...) \
47     do { } while (0)
48 #endif
49 
50 static int dirty_rate_high_cnt;
51 
52 static uint64_t bitmap_sync_count;
53 
54 /***********************************************************/
55 /* ram save/restore */
56 
57 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
58 #define RAM_SAVE_FLAG_COMPRESS 0x02
59 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
60 #define RAM_SAVE_FLAG_PAGE     0x08
61 #define RAM_SAVE_FLAG_EOS      0x10
62 #define RAM_SAVE_FLAG_CONTINUE 0x20
63 #define RAM_SAVE_FLAG_XBZRLE   0x40
64 /* 0x80 is reserved in migration.h start with 0x100 next */
65 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
66 
67 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
68 
69 static inline bool is_zero_range(uint8_t *p, uint64_t size)
70 {
71     return buffer_find_nonzero_offset(p, size) == size;
72 }
73 
74 /* struct contains XBZRLE cache and a static page
75    used for compression */
76 static struct {
77     /* buffer used for XBZRLE encoding */
78     uint8_t *encoded_buf;
79     /* buffer for storing page content */
80     uint8_t *current_buf;
81     /* Cache for XBZRLE, Protected by lock. */
82     PageCache *cache;
83     QemuMutex lock;
84 } XBZRLE;
85 
86 /* buffer used for XBZRLE decoding */
87 static uint8_t *xbzrle_decoded_buf;
88 
89 static void XBZRLE_cache_lock(void)
90 {
91     if (migrate_use_xbzrle())
92         qemu_mutex_lock(&XBZRLE.lock);
93 }
94 
95 static void XBZRLE_cache_unlock(void)
96 {
97     if (migrate_use_xbzrle())
98         qemu_mutex_unlock(&XBZRLE.lock);
99 }
100 
101 /*
102  * Called from qmp_migrate_set_cache_size() in the main thread, possibly
103  * while a migration is in progress.
104  * A running migration may be using the cache and might finish during this
105  * call, hence changes to the cache are protected by XBZRLE.lock.
106  */
107 int64_t xbzrle_cache_resize(int64_t new_size)
108 {
109     PageCache *new_cache;
110     int64_t ret;
111 
112     if (new_size < TARGET_PAGE_SIZE) {
113         return -1;
114     }
115 
116     XBZRLE_cache_lock();
117 
118     if (XBZRLE.cache != NULL) {
119         if (pow2floor(new_size) == migrate_xbzrle_cache_size()) {
120             goto out_new_size;
121         }
122         new_cache = cache_init(new_size / TARGET_PAGE_SIZE,
123                                         TARGET_PAGE_SIZE);
124         if (!new_cache) {
125             error_report("Error creating cache");
126             ret = -1;
127             goto out;
128         }
129 
130         cache_fini(XBZRLE.cache);
131         XBZRLE.cache = new_cache;
132     }
133 
134 out_new_size:
135     ret = pow2floor(new_size);
136 out:
137     XBZRLE_cache_unlock();
138     return ret;
139 }
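
/*
 * Example (illustrative sketch): the granted size is rounded down to a
 * power of two, so a hypothetical caller asking for 5 MiB of XBZRLE cache
 * ends up with 4 MiB:
 *
 *     int64_t granted = xbzrle_cache_resize(5 * 1024 * 1024);
 *     assert(granted == 4 * 1024 * 1024 || granted == -1);
 *
 * A return of -1 means the new size was below TARGET_PAGE_SIZE or the
 * replacement cache could not be allocated; otherwise the return value is
 * pow2floor(new_size).
 */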
140 
141 /* accounting for migration statistics */
142 typedef struct AccountingInfo {
143     uint64_t dup_pages;
144     uint64_t skipped_pages;
145     uint64_t norm_pages;
146     uint64_t iterations;
147     uint64_t xbzrle_bytes;
148     uint64_t xbzrle_pages;
149     uint64_t xbzrle_cache_miss;
150     double xbzrle_cache_miss_rate;
151     uint64_t xbzrle_overflows;
152 } AccountingInfo;
153 
154 static AccountingInfo acct_info;
155 
156 static void acct_clear(void)
157 {
158     memset(&acct_info, 0, sizeof(acct_info));
159 }
160 
161 uint64_t dup_mig_bytes_transferred(void)
162 {
163     return acct_info.dup_pages * TARGET_PAGE_SIZE;
164 }
165 
166 uint64_t dup_mig_pages_transferred(void)
167 {
168     return acct_info.dup_pages;
169 }
170 
171 uint64_t skipped_mig_bytes_transferred(void)
172 {
173     return acct_info.skipped_pages * TARGET_PAGE_SIZE;
174 }
175 
176 uint64_t skipped_mig_pages_transferred(void)
177 {
178     return acct_info.skipped_pages;
179 }
180 
181 uint64_t norm_mig_bytes_transferred(void)
182 {
183     return acct_info.norm_pages * TARGET_PAGE_SIZE;
184 }
185 
186 uint64_t norm_mig_pages_transferred(void)
187 {
188     return acct_info.norm_pages;
189 }
190 
191 uint64_t xbzrle_mig_bytes_transferred(void)
192 {
193     return acct_info.xbzrle_bytes;
194 }
195 
196 uint64_t xbzrle_mig_pages_transferred(void)
197 {
198     return acct_info.xbzrle_pages;
199 }
200 
201 uint64_t xbzrle_mig_pages_cache_miss(void)
202 {
203     return acct_info.xbzrle_cache_miss;
204 }
205 
206 double xbzrle_mig_cache_miss_rate(void)
207 {
208     return acct_info.xbzrle_cache_miss_rate;
209 }
210 
211 uint64_t xbzrle_mig_pages_overflow(void)
212 {
213     return acct_info.xbzrle_overflows;
214 }
215 
216 /* This is the last block that we have visited searching for dirty pages
217  */
218 static RAMBlock *last_seen_block;
219 /* This is the last block from where we have sent data */
220 static RAMBlock *last_sent_block;
221 static ram_addr_t last_offset;
222 static unsigned long *migration_bitmap;
223 static QemuMutex migration_bitmap_mutex;
224 static uint64_t migration_dirty_pages;
225 static uint32_t last_version;
226 static bool ram_bulk_stage;
227 
228 /* used by the search for pages to send */
229 struct PageSearchStatus {
230     /* Current block being searched */
231     RAMBlock    *block;
232     /* Current offset to search from */
233     ram_addr_t   offset;
234     /* Set once we wrap around */
235     bool         complete_round;
236 };
237 typedef struct PageSearchStatus PageSearchStatus;
238 
239 struct CompressParam {
240     bool start;
241     bool done;
242     QEMUFile *file;
243     QemuMutex mutex;
244     QemuCond cond;
245     RAMBlock *block;
246     ram_addr_t offset;
247 };
248 typedef struct CompressParam CompressParam;
249 
250 struct DecompressParam {
251     bool start;
252     QemuMutex mutex;
253     QemuCond cond;
254     void *des;
255     uint8_t *compbuf;
256     int len;
257 };
258 typedef struct DecompressParam DecompressParam;
259 
260 static CompressParam *comp_param;
261 static QemuThread *compress_threads;
262 /* comp_done_cond is used to wake up the migration thread when
263  * one of the compression threads has finished the compression.
264  * comp_done_lock is used to co-work with comp_done_cond.
265  */
266 static QemuMutex *comp_done_lock;
267 static QemuCond *comp_done_cond;
268 /* The empty QEMUFileOps will be used by file in CompressParam */
269 static const QEMUFileOps empty_ops = { };
270 
271 static bool compression_switch;
272 static bool quit_comp_thread;
273 static bool quit_decomp_thread;
274 static DecompressParam *decomp_param;
275 static QemuThread *decompress_threads;
276 static uint8_t *compressed_data_buf;
277 
278 static int do_compress_ram_page(CompressParam *param);
279 
280 static void *do_data_compress(void *opaque)
281 {
282     CompressParam *param = opaque;
283 
284     while (!quit_comp_thread) {
285         qemu_mutex_lock(&param->mutex);
286         /* Re-check quit_comp_thread in case
287          * terminate_compression_threads() was called just before
288          * qemu_mutex_lock(&param->mutex) and after
289          * while (!quit_comp_thread); re-checking it here makes
290          * sure the compression thread terminates as expected.
291          */
292         while (!param->start && !quit_comp_thread) {
293             qemu_cond_wait(&param->cond, &param->mutex);
294         }
295         if (!quit_comp_thread) {
296             do_compress_ram_page(param);
297         }
298         param->start = false;
299         qemu_mutex_unlock(&param->mutex);
300 
301         qemu_mutex_lock(comp_done_lock);
302         param->done = true;
303         qemu_cond_signal(comp_done_cond);
304         qemu_mutex_unlock(comp_done_lock);
305     }
306 
307     return NULL;
308 }
309 
310 static inline void terminate_compression_threads(void)
311 {
312     int idx, thread_count;
313 
314     thread_count = migrate_compress_threads();
315     quit_comp_thread = true;
316     for (idx = 0; idx < thread_count; idx++) {
317         qemu_mutex_lock(&comp_param[idx].mutex);
318         qemu_cond_signal(&comp_param[idx].cond);
319         qemu_mutex_unlock(&comp_param[idx].mutex);
320     }
321 }
322 
323 void migrate_compress_threads_join(void)
324 {
325     int i, thread_count;
326 
327     if (!migrate_use_compression()) {
328         return;
329     }
330     terminate_compression_threads();
331     thread_count = migrate_compress_threads();
332     for (i = 0; i < thread_count; i++) {
333         qemu_thread_join(compress_threads + i);
334         qemu_fclose(comp_param[i].file);
335         qemu_mutex_destroy(&comp_param[i].mutex);
336         qemu_cond_destroy(&comp_param[i].cond);
337     }
338     qemu_mutex_destroy(comp_done_lock);
339     qemu_cond_destroy(comp_done_cond);
340     g_free(compress_threads);
341     g_free(comp_param);
342     g_free(comp_done_cond);
343     g_free(comp_done_lock);
344     compress_threads = NULL;
345     comp_param = NULL;
346     comp_done_cond = NULL;
347     comp_done_lock = NULL;
348 }
349 
350 void migrate_compress_threads_create(void)
351 {
352     int i, thread_count;
353 
354     if (!migrate_use_compression()) {
355         return;
356     }
357     quit_comp_thread = false;
358     compression_switch = true;
359     thread_count = migrate_compress_threads();
360     compress_threads = g_new0(QemuThread, thread_count);
361     comp_param = g_new0(CompressParam, thread_count);
362     comp_done_cond = g_new0(QemuCond, 1);
363     comp_done_lock = g_new0(QemuMutex, 1);
364     qemu_cond_init(comp_done_cond);
365     qemu_mutex_init(comp_done_lock);
366     for (i = 0; i < thread_count; i++) {
367         /* comp_param[i].file is just used as a dummy buffer to save data;
368          * set its ops to empty.
369          */
370         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
371         comp_param[i].done = true;
372         qemu_mutex_init(&comp_param[i].mutex);
373         qemu_cond_init(&comp_param[i].cond);
374         qemu_thread_create(compress_threads + i, "compress",
375                            do_data_compress, comp_param + i,
376                            QEMU_THREAD_JOINABLE);
377     }
378 }
379 
380 /**
381  * save_page_header: Write page header to wire
382  *
383  * If this is the 1st block, it also writes the block identification
384  *
385  * Returns: Number of bytes written
386  *
387  * @f: QEMUFile where to send the data
388  * @block: block that contains the page we want to send
389  * @offset: offset inside the block for the page
390  *          in the lower bits, it contains flags
391  */
392 static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
393 {
394     size_t size, len;
395 
396     qemu_put_be64(f, offset);
397     size = 8;
398 
399     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
400         len = strlen(block->idstr);
401         qemu_put_byte(f, len);
402         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
403         size += 1 + len;
404     }
405     return size;
406 }
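
/*
 * Minimal sender-side sketch of how a normal (uncompressed) page goes on
 * the wire, mirroring ram_save_page() below; f, block, offset and
 * bytes_transferred are assumed to be set up as in that function:
 *
 *     *bytes_transferred += save_page_header(f, block,
 *                                            offset | RAM_SAVE_FLAG_PAGE);
 *     qemu_put_buffer(f, block->host + offset, TARGET_PAGE_SIZE);
 *     *bytes_transferred += TARGET_PAGE_SIZE;
 *
 * The header is the 8-byte offset/flags word plus, the first time a page
 * from this block is named on the wire, the idstr length and the idstr.
 */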
407 
408 /* Reduce amount of guest cpu execution to hopefully slow down memory writes.
409  * If guest dirty memory rate is reduced below the rate at which we can
410  * transfer pages to the destination then we should be able to complete
411  * migration. Some workloads dirty memory way too fast and will not effectively
412  * converge, even with auto-converge.
413  */
414 static void mig_throttle_guest_down(void)
415 {
416     MigrationState *s = migrate_get_current();
417     uint64_t pct_initial =
418             s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INITIAL];
419     uint64_t pct_increment =
420             s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INCREMENT];
421 
422     /* We have not started throttling yet. Let's start it. */
423     if (!cpu_throttle_active()) {
424         cpu_throttle_set(pct_initial);
425     } else {
426         /* Throttling already on, just increase the rate */
427         cpu_throttle_set(cpu_throttle_get_percentage() + pct_increment);
428     }
429 }
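
/*
 * Worked example, assuming a hypothetical initial throttle of 20% and an
 * increment of 10%: successive calls while migration fails to converge
 * drive the guest CPU throttle through
 *
 *     cpu_throttle_set(20);            first call
 *     cpu_throttle_set(20 + 10);       second call
 *     cpu_throttle_set(30 + 10);       third call, and so on
 *
 * until migration converges (or the throttle reaches its maximum).
 */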
430 
431 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
432  * The important thing is that a stale (not-yet-0'd) page be replaced
433  * by the new data.
434  * As a bonus, if the page wasn't in the cache it gets added so that
435  * when a small write is later made into the 0'd page it can be sent with XBZRLE
436  */
437 static void xbzrle_cache_zero_page(ram_addr_t current_addr)
438 {
439     if (ram_bulk_stage || !migrate_use_xbzrle()) {
440         return;
441     }
442 
443     /* We don't care if this fails to allocate a new cache page
444      * as long as it updates an old one */
445     cache_insert(XBZRLE.cache, current_addr, ZERO_TARGET_PAGE,
446                  bitmap_sync_count);
447 }
448 
449 #define ENCODING_FLAG_XBZRLE 0x1
450 
451 /**
452  * save_xbzrle_page: compress and send current page
453  *
454  * Returns: 1 means that we wrote the page
455  *          0 means that the page is identical to the one already sent
456  *          -1 means that xbzrle would be longer than normal
457  *
458  * @f: QEMUFile where to send the data
459  * @current_data: pointer to the page data; may be updated to point into the cache
460  * @current_addr: address of the page (block->offset + offset)
461  * @block: block that contains the page we want to send
462  * @offset: offset inside the block for the page
463  * @last_stage: if we are at the completion stage
464  * @bytes_transferred: increase it with the number of transferred bytes
465  */
466 static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
467                             ram_addr_t current_addr, RAMBlock *block,
468                             ram_addr_t offset, bool last_stage,
469                             uint64_t *bytes_transferred)
470 {
471     int encoded_len = 0, bytes_xbzrle;
472     uint8_t *prev_cached_page;
473 
474     if (!cache_is_cached(XBZRLE.cache, current_addr, bitmap_sync_count)) {
475         acct_info.xbzrle_cache_miss++;
476         if (!last_stage) {
477             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
478                              bitmap_sync_count) == -1) {
479                 return -1;
480             } else {
481                 /* update *current_data when the page has been
482                    inserted into cache */
483                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
484             }
485         }
486         return -1;
487     }
488 
489     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
490 
491     /* save current buffer into memory */
492     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
493 
494     /* XBZRLE encoding (if there is no overflow) */
495     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
496                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
497                                        TARGET_PAGE_SIZE);
498     if (encoded_len == 0) {
499         DPRINTF("Skipping unmodified page\n");
500         return 0;
501     } else if (encoded_len == -1) {
502         DPRINTF("Overflow\n");
503         acct_info.xbzrle_overflows++;
504         /* update data in the cache */
505         if (!last_stage) {
506             memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
507             *current_data = prev_cached_page;
508         }
509         return -1;
510     }
511 
512     /* Update the data in the cache so it matches the page contents we just sent */
513     if (!last_stage) {
514         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
515     }
516 
517     /* Send XBZRLE based compressed page */
518     bytes_xbzrle = save_page_header(f, block, offset | RAM_SAVE_FLAG_XBZRLE);
519     qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
520     qemu_put_be16(f, encoded_len);
521     qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
522     bytes_xbzrle += encoded_len + 1 + 2;
523     acct_info.xbzrle_pages++;
524     acct_info.xbzrle_bytes += bytes_xbzrle;
525     *bytes_transferred += bytes_xbzrle;
526 
527     return 1;
528 }
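
/*
 * Wire layout of an XBZRLE page record as produced above (this is what
 * load_xbzrle() on the destination expects to read back):
 *
 *     be64   offset | RAM_SAVE_FLAG_XBZRLE   (block idstr follows the
 *                                             first time a block is named)
 *     u8     ENCODING_FLAG_XBZRLE
 *     be16   encoded_len
 *     u8[]   encoded_len bytes of encoded data
 *
 * hence the accounting of bytes_xbzrle as header + 1 + 2 + encoded_len.
 */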
529 
530 /* Called with rcu_read_lock() to protect migration_bitmap */
531 static inline
532 ram_addr_t migration_bitmap_find_and_reset_dirty(RAMBlock *rb,
533                                                  ram_addr_t start)
534 {
535     unsigned long base = rb->offset >> TARGET_PAGE_BITS;
536     unsigned long nr = base + (start >> TARGET_PAGE_BITS);
537     uint64_t rb_size = rb->used_length;
538     unsigned long size = base + (rb_size >> TARGET_PAGE_BITS);
539     unsigned long *bitmap;
540 
541     unsigned long next;
542 
543     bitmap = atomic_rcu_read(&migration_bitmap);
544     if (ram_bulk_stage && nr > base) {
545         next = nr + 1;
546     } else {
547         next = find_next_bit(bitmap, size, nr);
548     }
549 
550     if (next < size) {
551         clear_bit(next, bitmap);
552         migration_dirty_pages--;
553     }
554     return (next - base) << TARGET_PAGE_BITS;
555 }
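
/*
 * Worked example, assuming 4 KiB target pages purely for the arithmetic:
 * for a hypothetical block at ram offset 0x40000000, base is 0x40000.  If
 * the first dirty bit at or after 'start' is bit 0x40003, that bit is
 * cleared and the function returns (0x40003 - 0x40000) << TARGET_PAGE_BITS
 * == 0x3000, the offset of the dirty page within the block.  A return
 * value >= rb->used_length tells the caller that nothing dirty was found
 * in this block.
 */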
556 
557 /* Called with rcu_read_lock() to protect migration_bitmap */
558 static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
559 {
560     unsigned long *bitmap;
561     bitmap = atomic_rcu_read(&migration_bitmap);
562     migration_dirty_pages +=
563         cpu_physical_memory_sync_dirty_bitmap(bitmap, start, length);
564 }
565 
566 /* FIXME: there are too many global variables used in the migration process. */
567 static int64_t start_time;
568 static int64_t bytes_xfer_prev;
569 static int64_t num_dirty_pages_period;
570 static uint64_t xbzrle_cache_miss_prev;
571 static uint64_t iterations_prev;
572 
573 static void migration_bitmap_sync_init(void)
574 {
575     start_time = 0;
576     bytes_xfer_prev = 0;
577     num_dirty_pages_period = 0;
578     xbzrle_cache_miss_prev = 0;
579     iterations_prev = 0;
580 }
581 
582 /* Called with iothread lock held, to protect ram_list.dirty_memory[] */
583 static void migration_bitmap_sync(void)
584 {
585     RAMBlock *block;
586     uint64_t num_dirty_pages_init = migration_dirty_pages;
587     MigrationState *s = migrate_get_current();
588     int64_t end_time;
589     int64_t bytes_xfer_now;
590 
591     bitmap_sync_count++;
592 
593     if (!bytes_xfer_prev) {
594         bytes_xfer_prev = ram_bytes_transferred();
595     }
596 
597     if (!start_time) {
598         start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
599     }
600 
601     trace_migration_bitmap_sync_start();
602     address_space_sync_dirty_bitmap(&address_space_memory);
603 
604     qemu_mutex_lock(&migration_bitmap_mutex);
605     rcu_read_lock();
606     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
607         migration_bitmap_sync_range(block->offset, block->used_length);
608     }
609     rcu_read_unlock();
610     qemu_mutex_unlock(&migration_bitmap_mutex);
611 
612     trace_migration_bitmap_sync_end(migration_dirty_pages
613                                     - num_dirty_pages_init);
614     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
615     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
616 
617     /* more than 1 second = 1000 milliseconds */
618     if (end_time > start_time + 1000) {
619         if (migrate_auto_converge()) {
620             /* The following detection logic can be refined later. For now:
621                check whether the bytes dirtied in this period exceed 50% of
622                the bytes that were transferred since the last time we were
623                in this routine. If that keeps happening, start or increase
624                throttling. */
625             bytes_xfer_now = ram_bytes_transferred();
626 
627             if (s->dirty_pages_rate &&
628                (num_dirty_pages_period * TARGET_PAGE_SIZE >
629                    (bytes_xfer_now - bytes_xfer_prev)/2) &&
630                (dirty_rate_high_cnt++ >= 2)) {
631                     trace_migration_throttle();
632                     dirty_rate_high_cnt = 0;
633                     mig_throttle_guest_down();
634              }
635              bytes_xfer_prev = bytes_xfer_now;
636         }
637 
638         if (migrate_use_xbzrle()) {
639             if (iterations_prev != acct_info.iterations) {
640                 acct_info.xbzrle_cache_miss_rate =
641                    (double)(acct_info.xbzrle_cache_miss -
642                             xbzrle_cache_miss_prev) /
643                    (acct_info.iterations - iterations_prev);
644             }
645             iterations_prev = acct_info.iterations;
646             xbzrle_cache_miss_prev = acct_info.xbzrle_cache_miss;
647         }
648         s->dirty_pages_rate = num_dirty_pages_period * 1000
649             / (end_time - start_time);
650         s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
651         start_time = end_time;
652         num_dirty_pages_period = 0;
653     }
654     s->dirty_sync_count = bitmap_sync_count;
655 }
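
/*
 * Worked example of the rate computation above, assuming 4 KiB target
 * pages: if 50000 pages were dirtied during a 1250 ms period, then
 *
 *     dirty_pages_rate = 50000 * 1000 / 1250 = 40000 pages/s
 *     dirty_bytes_rate = 40000 * 4096 bytes ~= 164 MB/s
 *
 * Auto-converge compares the bytes dirtied in the period against half of
 * the bytes transferred in the same period; if dirtying keeps outpacing
 * transfer, mig_throttle_guest_down() is called.
 */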
656 
657 /**
658  * save_zero_page: Send the zero page to the stream
659  *
660  * Returns: Number of pages written.
661  *
662  * @f: QEMUFile where to send the data
663  * @block: block that contains the page we want to send
664  * @offset: offset inside the block for the page
665  * @p: pointer to the page
666  * @bytes_transferred: increase it with the number of transferred bytes
667  */
668 static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
669                           uint8_t *p, uint64_t *bytes_transferred)
670 {
671     int pages = -1;
672 
673     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
674         acct_info.dup_pages++;
675         *bytes_transferred += save_page_header(f, block,
676                                                offset | RAM_SAVE_FLAG_COMPRESS);
677         qemu_put_byte(f, 0);
678         *bytes_transferred += 1;
679         pages = 1;
680     }
681 
682     return pages;
683 }
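
/*
 * Wire layout of a zero page record as produced above:
 *
 *     be64   offset | RAM_SAVE_FLAG_COMPRESS   (block idstr follows the
 *                                               first time a block is named)
 *     u8     0                                  fill byte
 *
 * On the destination this is handled by the RAM_SAVE_FLAG_COMPRESS case in
 * ram_load(), which passes the fill byte to ram_handle_compressed().
 */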
684 
685 /**
686  * ram_save_page: Send the given page to the stream
687  *
688  * Returns: Number of pages written.
689  *
690  * @f: QEMUFile where to send the data
691  * @block: block that contains the page we want to send
692  * @offset: offset inside the block for the page
693  * @last_stage: if we are at the completion stage
694  * @bytes_transferred: increase it with the number of transferred bytes
695  */
696 static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
697                          bool last_stage, uint64_t *bytes_transferred)
698 {
699     int pages = -1;
700     uint64_t bytes_xmit;
701     ram_addr_t current_addr;
702     uint8_t *p;
703     int ret;
704     bool send_async = true;
705 
706     p = block->host + offset;
707 
708     /* When in doubt, send the page as a normal page */
709     bytes_xmit = 0;
710     ret = ram_control_save_page(f, block->offset,
711                            offset, TARGET_PAGE_SIZE, &bytes_xmit);
712     if (bytes_xmit) {
713         *bytes_transferred += bytes_xmit;
714         pages = 1;
715     }
716 
717     XBZRLE_cache_lock();
718 
719     current_addr = block->offset + offset;
720 
721     if (block == last_sent_block) {
722         offset |= RAM_SAVE_FLAG_CONTINUE;
723     }
724     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
725         if (ret != RAM_SAVE_CONTROL_DELAYED) {
726             if (bytes_xmit > 0) {
727                 acct_info.norm_pages++;
728             } else if (bytes_xmit == 0) {
729                 acct_info.dup_pages++;
730             }
731         }
732     } else {
733         pages = save_zero_page(f, block, offset, p, bytes_transferred);
734         if (pages > 0) {
735             /* Must let xbzrle know, otherwise a previous (now 0'd) cached
736              * page would be stale
737              */
738             xbzrle_cache_zero_page(current_addr);
739         } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
740             pages = save_xbzrle_page(f, &p, current_addr, block,
741                                      offset, last_stage, bytes_transferred);
742             if (!last_stage) {
743                 /* Can't send this cached data async, since the cache page
744                  * might get updated before it gets to the wire
745                  */
746                 send_async = false;
747             }
748         }
749     }
750 
751     /* XBZRLE overflow or normal page */
752     if (pages == -1) {
753         *bytes_transferred += save_page_header(f, block,
754                                                offset | RAM_SAVE_FLAG_PAGE);
755         if (send_async) {
756             qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
757         } else {
758             qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
759         }
760         *bytes_transferred += TARGET_PAGE_SIZE;
761         pages = 1;
762         acct_info.norm_pages++;
763     }
764 
765     XBZRLE_cache_unlock();
766 
767     return pages;
768 }
769 
770 static int do_compress_ram_page(CompressParam *param)
771 {
772     int bytes_sent, blen;
773     uint8_t *p;
774     RAMBlock *block = param->block;
775     ram_addr_t offset = param->offset;
776 
777     p = block->host + (offset & TARGET_PAGE_MASK);
778 
779     bytes_sent = save_page_header(param->file, block, offset |
780                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
781     blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
782                                      migrate_compress_level());
783     bytes_sent += blen;
784 
785     return bytes_sent;
786 }
787 
788 static inline void start_compression(CompressParam *param)
789 {
790     param->done = false;
791     qemu_mutex_lock(&param->mutex);
792     param->start = true;
793     qemu_cond_signal(&param->cond);
794     qemu_mutex_unlock(&param->mutex);
795 }
796 
797 static inline void start_decompression(DecompressParam *param)
798 {
799     qemu_mutex_lock(&param->mutex);
800     param->start = true;
801     qemu_cond_signal(&param->cond);
802     qemu_mutex_unlock(&param->mutex);
803 }
804 
805 static uint64_t bytes_transferred;
806 
807 static void flush_compressed_data(QEMUFile *f)
808 {
809     int idx, len, thread_count;
810 
811     if (!migrate_use_compression()) {
812         return;
813     }
814     thread_count = migrate_compress_threads();
815     for (idx = 0; idx < thread_count; idx++) {
816         if (!comp_param[idx].done) {
817             qemu_mutex_lock(comp_done_lock);
818             while (!comp_param[idx].done && !quit_comp_thread) {
819                 qemu_cond_wait(comp_done_cond, comp_done_lock);
820             }
821             qemu_mutex_unlock(comp_done_lock);
822         }
823         if (!quit_comp_thread) {
824             len = qemu_put_qemu_file(f, comp_param[idx].file);
825             bytes_transferred += len;
826         }
827     }
828 }
829 
830 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
831                                        ram_addr_t offset)
832 {
833     param->block = block;
834     param->offset = offset;
835 }
836 
837 static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
838                                            ram_addr_t offset,
839                                            uint64_t *bytes_transferred)
840 {
841     int idx, thread_count, bytes_xmit = -1, pages = -1;
842 
843     thread_count = migrate_compress_threads();
844     qemu_mutex_lock(comp_done_lock);
845     while (true) {
846         for (idx = 0; idx < thread_count; idx++) {
847             if (comp_param[idx].done) {
848                 bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
849                 set_compress_params(&comp_param[idx], block, offset);
850                 start_compression(&comp_param[idx]);
851                 pages = 1;
852                 acct_info.norm_pages++;
853                 *bytes_transferred += bytes_xmit;
854                 break;
855             }
856         }
857         if (pages > 0) {
858             break;
859         } else {
860             qemu_cond_wait(comp_done_cond, comp_done_lock);
861         }
862     }
863     qemu_mutex_unlock(comp_done_lock);
864 
865     return pages;
866 }
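
/*
 * Sketch of the hand-off between the migration thread (above) and a
 * compression worker (do_data_compress()):
 *
 *     migration thread                      compression thread idx
 *     ----------------                      ----------------------
 *     wait for comp_param[idx].done         wait on param->cond until
 *     flush worker's QEMUFile into f          param->start is set
 *     set_compress_params(&comp_param[idx], compress page into param->file
 *       block, offset)                      set done = true and signal
 *     start_compression(): done = false,      comp_done_cond
 *       start = true, signal param->cond
 */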
867 
868 /**
869  * ram_save_compressed_page: compress the given page and send it to the stream
870  *
871  * Returns: Number of pages written.
872  *
873  * @f: QEMUFile where to send the data
874  * @block: block that contains the page we want to send
875  * @offset: offset inside the block for the page
876  * @last_stage: if we are at the completion stage
877  * @bytes_transferred: increase it with the number of transferred bytes
878  */
879 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
880                                     ram_addr_t offset, bool last_stage,
881                                     uint64_t *bytes_transferred)
882 {
883     int pages = -1;
884     uint64_t bytes_xmit;
885     uint8_t *p;
886     int ret;
887 
888     p = block->host + offset;
889 
890     bytes_xmit = 0;
891     ret = ram_control_save_page(f, block->offset,
892                                 offset, TARGET_PAGE_SIZE, &bytes_xmit);
893     if (bytes_xmit) {
894         *bytes_transferred += bytes_xmit;
895         pages = 1;
896     }
897     if (block == last_sent_block) {
898         offset |= RAM_SAVE_FLAG_CONTINUE;
899     }
900     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
901         if (ret != RAM_SAVE_CONTROL_DELAYED) {
902             if (bytes_xmit > 0) {
903                 acct_info.norm_pages++;
904             } else if (bytes_xmit == 0) {
905                 acct_info.dup_pages++;
906             }
907         }
908     } else {
909         /* When starting a new block, the first page of the block should
910          * be sent out before the other pages in the same block, and all
911          * the pages of the previous block should already have been sent.
912          * Keeping this order is important, because the 'cont' flag
913          * is used to avoid resending the block name.
914          */
915         if (block != last_sent_block) {
916             flush_compressed_data(f);
917             pages = save_zero_page(f, block, offset, p, bytes_transferred);
918             if (pages == -1) {
919                 set_compress_params(&comp_param[0], block, offset);
920                 /* Use the qemu thread to compress the data to make sure the
921                  * first page is sent out before other pages
922                  */
923                 bytes_xmit = do_compress_ram_page(&comp_param[0]);
924                 acct_info.norm_pages++;
925                 qemu_put_qemu_file(f, comp_param[0].file);
926                 *bytes_transferred += bytes_xmit;
927                 pages = 1;
928             }
929         } else {
930             pages = save_zero_page(f, block, offset, p, bytes_transferred);
931             if (pages == -1) {
932                 pages = compress_page_with_multi_thread(f, block, offset,
933                                                         bytes_transferred);
934             }
935         }
936     }
937 
938     return pages;
939 }
940 
941 /*
942  * Find the next dirty page and update any state associated with
943  * the search process.
944  *
945  * Returns: True if a page is found
946  *
947  * @f: Current migration stream.
948  * @pss: Data about the state of the current dirty page scan.
949  * @*again: Set to false if the search has scanned the whole of RAM
950  */
951 static bool find_dirty_block(QEMUFile *f, PageSearchStatus *pss,
952                              bool *again)
953 {
954     pss->offset = migration_bitmap_find_and_reset_dirty(pss->block,
955                                                        pss->offset);
956     if (pss->complete_round && pss->block == last_seen_block &&
957         pss->offset >= last_offset) {
958         /*
959          * We've been once around the RAM and haven't found anything.
960          * Give up.
961          */
962         *again = false;
963         return false;
964     }
965     if (pss->offset >= pss->block->used_length) {
966         /* Didn't find anything in this RAM Block */
967         pss->offset = 0;
968         pss->block = QLIST_NEXT_RCU(pss->block, next);
969         if (!pss->block) {
970             /* Hit the end of the list */
971             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
972             /* Flag that we've looped */
973             pss->complete_round = true;
974             ram_bulk_stage = false;
975             if (migrate_use_xbzrle()) {
976                 /* If xbzrle is on, stop using the data compression at this
977                  * point. In theory, xbzrle can do better than compression.
978                  */
979                 flush_compressed_data(f);
980                 compression_switch = false;
981             }
982         }
983         /* Didn't find anything this time, but try again on the new block */
984         *again = true;
985         return false;
986     } else {
987         /* Can go around again, but... */
988         *again = true;
989         /* We've found something so probably don't need to */
990         return true;
991     }
992 }
993 
994 /**
995  * ram_find_and_save_block: Finds a dirty page and sends it to f
996  *
997  * Called within an RCU critical section.
998  *
999  * Returns:  The number of pages written
1000  *           0 means no dirty pages
1001  *
1002  * @f: QEMUFile where to send the data
1003  * @last_stage: if we are at the completion stage
1004  * @bytes_transferred: increase it with the number of transferred bytes
1005  */
1006 
1007 static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
1008                                    uint64_t *bytes_transferred)
1009 {
1010     PageSearchStatus pss;
1011     int pages = 0;
1012     bool again, found;
1013 
1014     pss.block = last_seen_block;
1015     pss.offset = last_offset;
1016     pss.complete_round = false;
1017 
1018     if (!pss.block) {
1019         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
1020     }
1021 
1022     do {
1023         found = find_dirty_block(f, &pss, &again);
1024 
1025         if (found) {
1026             if (compression_switch && migrate_use_compression()) {
1027                 pages = ram_save_compressed_page(f, pss.block, pss.offset,
1028                                                  last_stage,
1029                                                  bytes_transferred);
1030             } else {
1031                 pages = ram_save_page(f, pss.block, pss.offset, last_stage,
1032                                       bytes_transferred);
1033             }
1034 
1035             /* if page is unmodified, continue to the next */
1036             if (pages > 0) {
1037                 last_sent_block = pss.block;
1038             }
1039         }
1040     } while (!pages && again);
1041 
1042     last_seen_block = pss.block;
1043     last_offset = pss.offset;
1044 
1045     return pages;
1046 }
1047 
1048 void acct_update_position(QEMUFile *f, size_t size, bool zero)
1049 {
1050     uint64_t pages = size / TARGET_PAGE_SIZE;
1051     if (zero) {
1052         acct_info.dup_pages += pages;
1053     } else {
1054         acct_info.norm_pages += pages;
1055         bytes_transferred += size;
1056         qemu_update_position(f, size);
1057     }
1058 }
1059 
1060 static ram_addr_t ram_save_remaining(void)
1061 {
1062     return migration_dirty_pages;
1063 }
1064 
1065 uint64_t ram_bytes_remaining(void)
1066 {
1067     return ram_save_remaining() * TARGET_PAGE_SIZE;
1068 }
1069 
1070 uint64_t ram_bytes_transferred(void)
1071 {
1072     return bytes_transferred;
1073 }
1074 
1075 uint64_t ram_bytes_total(void)
1076 {
1077     RAMBlock *block;
1078     uint64_t total = 0;
1079 
1080     rcu_read_lock();
1081     QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
1082         total += block->used_length;
1083     rcu_read_unlock();
1084     return total;
1085 }
1086 
1087 void free_xbzrle_decoded_buf(void)
1088 {
1089     g_free(xbzrle_decoded_buf);
1090     xbzrle_decoded_buf = NULL;
1091 }
1092 
1093 static void migration_end(void)
1094 {
1095     /* The caller must hold the iothread lock or be in a bottom half, so
1096      * there is no write race against this migration_bitmap.
1097      */
1098     unsigned long *bitmap = migration_bitmap;
1099     atomic_rcu_set(&migration_bitmap, NULL);
1100     if (bitmap) {
1101         memory_global_dirty_log_stop();
1102         synchronize_rcu();
1103         g_free(bitmap);
1104     }
1105 
1106     XBZRLE_cache_lock();
1107     if (XBZRLE.cache) {
1108         cache_fini(XBZRLE.cache);
1109         g_free(XBZRLE.encoded_buf);
1110         g_free(XBZRLE.current_buf);
1111         XBZRLE.cache = NULL;
1112         XBZRLE.encoded_buf = NULL;
1113         XBZRLE.current_buf = NULL;
1114     }
1115     XBZRLE_cache_unlock();
1116 }
1117 
1118 static void ram_migration_cancel(void *opaque)
1119 {
1120     migration_end();
1121 }
1122 
1123 static void reset_ram_globals(void)
1124 {
1125     last_seen_block = NULL;
1126     last_sent_block = NULL;
1127     last_offset = 0;
1128     last_version = ram_list.version;
1129     ram_bulk_stage = true;
1130 }
1131 
1132 #define MAX_WAIT 50 /* ms, half buffered_file limit */
1133 
1134 void migration_bitmap_extend(ram_addr_t old, ram_addr_t new)
1135 {
1136     /* Called in the QEMU main thread, so there is
1137      * no write race against this migration_bitmap.
1138      */
1139     if (migration_bitmap) {
1140         unsigned long *old_bitmap = migration_bitmap, *bitmap;
1141         bitmap = bitmap_new(new);
1142 
1143         /* Prevent bits in migration_bitmap from being set concurrently
1144          * by migration_bitmap_sync_range().
1145          * It is safe for migration if a bit of migration_bitmap is
1146          * cleared at the same time.
1147          */
1148         qemu_mutex_lock(&migration_bitmap_mutex);
1149         bitmap_copy(bitmap, old_bitmap, old);
1150         bitmap_set(bitmap, old, new - old);
1151         atomic_rcu_set(&migration_bitmap, bitmap);
1152         qemu_mutex_unlock(&migration_bitmap_mutex);
1153         migration_dirty_pages += new - old;
1154         synchronize_rcu();
1155         g_free(old_bitmap);
1156     }
1157 }
1158 
1159 /* Each of ram_save_setup, ram_save_iterate and ram_save_complete has a
1160  * long-running RCU critical section.  When RCU reclaims in the code
1161  * start to become numerous, it will be necessary to reduce the
1162  * granularity of these critical sections.
1163  */
1164 
1165 static int ram_save_setup(QEMUFile *f, void *opaque)
1166 {
1167     RAMBlock *block;
1168     int64_t ram_bitmap_pages; /* Size of bitmap in pages, including gaps */
1169 
1170     dirty_rate_high_cnt = 0;
1171     bitmap_sync_count = 0;
1172     migration_bitmap_sync_init();
1173     qemu_mutex_init(&migration_bitmap_mutex);
1174 
1175     if (migrate_use_xbzrle()) {
1176         XBZRLE_cache_lock();
1177         XBZRLE.cache = cache_init(migrate_xbzrle_cache_size() /
1178                                   TARGET_PAGE_SIZE,
1179                                   TARGET_PAGE_SIZE);
1180         if (!XBZRLE.cache) {
1181             XBZRLE_cache_unlock();
1182             error_report("Error creating cache");
1183             return -1;
1184         }
1185         XBZRLE_cache_unlock();
1186 
1187         /* We prefer not to abort if there is no memory */
1188         XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
1189         if (!XBZRLE.encoded_buf) {
1190             error_report("Error allocating encoded_buf");
1191             return -1;
1192         }
1193 
1194         XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
1195         if (!XBZRLE.current_buf) {
1196             error_report("Error allocating current_buf");
1197             g_free(XBZRLE.encoded_buf);
1198             XBZRLE.encoded_buf = NULL;
1199             return -1;
1200         }
1201 
1202         acct_clear();
1203     }
1204 
1205     /* iothread lock needed for ram_list.dirty_memory[] */
1206     qemu_mutex_lock_iothread();
1207     qemu_mutex_lock_ramlist();
1208     rcu_read_lock();
1209     bytes_transferred = 0;
1210     reset_ram_globals();
1211 
1212     ram_bitmap_pages = last_ram_offset() >> TARGET_PAGE_BITS;
1213     migration_bitmap = bitmap_new(ram_bitmap_pages);
1214     bitmap_set(migration_bitmap, 0, ram_bitmap_pages);
1215 
1216     /*
1217      * Count the total number of pages used by ram blocks not including any
1218      * gaps due to alignment or unplugs.
1219      */
1220     migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
1221 
1222     memory_global_dirty_log_start();
1223     migration_bitmap_sync();
1224     qemu_mutex_unlock_ramlist();
1225     qemu_mutex_unlock_iothread();
1226 
1227     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
1228 
1229     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1230         qemu_put_byte(f, strlen(block->idstr));
1231         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
1232         qemu_put_be64(f, block->used_length);
1233     }
1234 
1235     rcu_read_unlock();
1236 
1237     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
1238     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
1239 
1240     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1241 
1242     return 0;
1243 }
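
/*
 * Stream layout of the setup section emitted above (parsed by the
 * RAM_SAVE_FLAG_MEM_SIZE case in ram_load()):
 *
 *     be64   total ram bytes | RAM_SAVE_FLAG_MEM_SIZE
 *     for each RAMBlock:
 *         u8     strlen(idstr)
 *         u8[]   idstr (not NUL-terminated)
 *         be64   used_length
 *     be64   RAM_SAVE_FLAG_EOS
 */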
1244 
1245 static int ram_save_iterate(QEMUFile *f, void *opaque)
1246 {
1247     int ret;
1248     int i;
1249     int64_t t0;
1250     int pages_sent = 0;
1251 
1252     rcu_read_lock();
1253     if (ram_list.version != last_version) {
1254         reset_ram_globals();
1255     }
1256 
1257     /* Read version before ram_list.blocks */
1258     smp_rmb();
1259 
1260     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
1261 
1262     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1263     i = 0;
1264     while ((ret = qemu_file_rate_limit(f)) == 0) {
1265         int pages;
1266 
1267         pages = ram_find_and_save_block(f, false, &bytes_transferred);
1268         /* no more pages to send */
1269         if (pages == 0) {
1270             break;
1271         }
1272         pages_sent += pages;
1273         acct_info.iterations++;
1274 
1275         /* We want to check in the 1st loop, just in case it was the 1st
1276            time and we had to sync the dirty bitmap.
1277            qemu_clock_get_ns() is a bit expensive, so we only check once
1278            every few iterations.
1279         */
1280         if ((i & 63) == 0) {
1281             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
1282             if (t1 > MAX_WAIT) {
1283                 DPRINTF("big wait: %" PRIu64 " milliseconds, %d iterations\n",
1284                         t1, i);
1285                 break;
1286             }
1287         }
1288         i++;
1289     }
1290     flush_compressed_data(f);
1291     rcu_read_unlock();
1292 
1293     /*
1294      * Must occur before EOS (or any QEMUFile operation)
1295      * because of RDMA protocol.
1296      */
1297     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
1298 
1299     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1300     bytes_transferred += 8;
1301 
1302     ret = qemu_file_get_error(f);
1303     if (ret < 0) {
1304         return ret;
1305     }
1306 
1307     return pages_sent;
1308 }
1309 
1310 /* Called with iothread lock */
1311 static int ram_save_complete(QEMUFile *f, void *opaque)
1312 {
1313     rcu_read_lock();
1314 
1315     migration_bitmap_sync();
1316 
1317     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
1318 
1319     /* try transferring iterative blocks of memory */
1320 
1321     /* flush all remaining blocks regardless of rate limiting */
1322     while (true) {
1323         int pages;
1324 
1325         pages = ram_find_and_save_block(f, true, &bytes_transferred);
1326         /* no more blocks to send */
1327         if (pages == 0) {
1328             break;
1329         }
1330     }
1331 
1332     flush_compressed_data(f);
1333     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
1334 
1335     rcu_read_unlock();
1336 
1337     migration_end();
1338     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1339 
1340     return 0;
1341 }
1342 
1343 static uint64_t ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
1344 {
1345     uint64_t remaining_size;
1346 
1347     remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1348 
1349     if (remaining_size < max_size) {
1350         qemu_mutex_lock_iothread();
1351         rcu_read_lock();
1352         migration_bitmap_sync();
1353         rcu_read_unlock();
1354         qemu_mutex_unlock_iothread();
1355         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1356     }
1357     return remaining_size;
1358 }
1359 
1360 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
1361 {
1362     unsigned int xh_len;
1363     int xh_flags;
1364 
1365     if (!xbzrle_decoded_buf) {
1366         xbzrle_decoded_buf = g_malloc(TARGET_PAGE_SIZE);
1367     }
1368 
1369     /* extract RLE header */
1370     xh_flags = qemu_get_byte(f);
1371     xh_len = qemu_get_be16(f);
1372 
1373     if (xh_flags != ENCODING_FLAG_XBZRLE) {
1374         error_report("Failed to load XBZRLE page - wrong compression!");
1375         return -1;
1376     }
1377 
1378     if (xh_len > TARGET_PAGE_SIZE) {
1379         error_report("Failed to load XBZRLE page - len overflow!");
1380         return -1;
1381     }
1382     /* load data and decode */
1383     qemu_get_buffer(f, xbzrle_decoded_buf, xh_len);
1384 
1385     /* decode RLE */
1386     if (xbzrle_decode_buffer(xbzrle_decoded_buf, xh_len, host,
1387                              TARGET_PAGE_SIZE) == -1) {
1388         error_report("Failed to load XBZRLE page - decode error!");
1389         return -1;
1390     }
1391 
1392     return 0;
1393 }
1394 
1395 /* Must be called from within a rcu critical section.
1396  * Returns a pointer from within the RCU-protected ram_list.
1397  */
1398 static inline void *host_from_stream_offset(QEMUFile *f,
1399                                             ram_addr_t offset,
1400                                             int flags)
1401 {
1402     static RAMBlock *block = NULL;
1403     char id[256];
1404     uint8_t len;
1405 
1406     if (flags & RAM_SAVE_FLAG_CONTINUE) {
1407         if (!block || block->max_length <= offset) {
1408             error_report("Ack, bad migration stream!");
1409             return NULL;
1410         }
1411 
1412         return block->host + offset;
1413     }
1414 
1415     len = qemu_get_byte(f);
1416     qemu_get_buffer(f, (uint8_t *)id, len);
1417     id[len] = 0;
1418 
1419     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1420         if (!strncmp(id, block->idstr, sizeof(id)) &&
1421             block->max_length > offset) {
1422             return block->host + offset;
1423         }
1424     }
1425 
1426     error_report("Can't find block %s!", id);
1427     return NULL;
1428 }
1429 
1430 /*
1431  * If a page (or a whole RDMA chunk) has been
1432  * determined to be zero, then zap it.
1433  */
1434 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
1435 {
1436     if (ch != 0 || !is_zero_range(host, size)) {
1437         memset(host, ch, size);
1438     }
1439 }
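
/*
 * Receiver-side sketch of how a fill/zero page record is applied,
 * mirroring the RAM_SAVE_FLAG_COMPRESS case in ram_load() below:
 *
 *     ch = qemu_get_byte(f);
 *     ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
 *
 * Skipping the memset when ch == 0 and the page is already zero avoids
 * writing to pages the destination never needs to touch.
 */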
1440 
1441 static void *do_data_decompress(void *opaque)
1442 {
1443     DecompressParam *param = opaque;
1444     unsigned long pagesize;
1445 
1446     while (!quit_decomp_thread) {
1447         qemu_mutex_lock(&param->mutex);
1448         while (!param->start && !quit_decomp_thread) {
1449             qemu_cond_wait(&param->cond, &param->mutex);
1450             pagesize = TARGET_PAGE_SIZE;
1451             if (!quit_decomp_thread) {
1452                 /* uncompress() may fail in some cases, especially when the
1453                  * page was dirtied while it was being compressed; that is
1454                  * not a problem because the dirty page will be retransferred
1455                  * and uncompress() won't break the data in other pages.
1456                  */
1457                 uncompress((Bytef *)param->des, &pagesize,
1458                            (const Bytef *)param->compbuf, param->len);
1459             }
1460             param->start = false;
1461         }
1462         qemu_mutex_unlock(&param->mutex);
1463     }
1464 
1465     return NULL;
1466 }
1467 
1468 void migrate_decompress_threads_create(void)
1469 {
1470     int i, thread_count;
1471 
1472     thread_count = migrate_decompress_threads();
1473     decompress_threads = g_new0(QemuThread, thread_count);
1474     decomp_param = g_new0(DecompressParam, thread_count);
1475     compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1476     quit_decomp_thread = false;
1477     for (i = 0; i < thread_count; i++) {
1478         qemu_mutex_init(&decomp_param[i].mutex);
1479         qemu_cond_init(&decomp_param[i].cond);
1480         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1481         qemu_thread_create(decompress_threads + i, "decompress",
1482                            do_data_decompress, decomp_param + i,
1483                            QEMU_THREAD_JOINABLE);
1484     }
1485 }
1486 
1487 void migrate_decompress_threads_join(void)
1488 {
1489     int i, thread_count;
1490 
1491     quit_decomp_thread = true;
1492     thread_count = migrate_decompress_threads();
1493     for (i = 0; i < thread_count; i++) {
1494         qemu_mutex_lock(&decomp_param[i].mutex);
1495         qemu_cond_signal(&decomp_param[i].cond);
1496         qemu_mutex_unlock(&decomp_param[i].mutex);
1497     }
1498     for (i = 0; i < thread_count; i++) {
1499         qemu_thread_join(decompress_threads + i);
1500         qemu_mutex_destroy(&decomp_param[i].mutex);
1501         qemu_cond_destroy(&decomp_param[i].cond);
1502         g_free(decomp_param[i].compbuf);
1503     }
1504     g_free(decompress_threads);
1505     g_free(decomp_param);
1506     g_free(compressed_data_buf);
1507     decompress_threads = NULL;
1508     decomp_param = NULL;
1509     compressed_data_buf = NULL;
1510 }
1511 
1512 static void decompress_data_with_multi_threads(uint8_t *compbuf,
1513                                                void *host, int len)
1514 {
1515     int idx, thread_count;
1516 
1517     thread_count = migrate_decompress_threads();
1518     while (true) {
1519         for (idx = 0; idx < thread_count; idx++) {
1520             if (!decomp_param[idx].start) {
1521                 memcpy(decomp_param[idx].compbuf, compbuf, len);
1522                 decomp_param[idx].des = host;
1523                 decomp_param[idx].len = len;
1524                 start_decompression(&decomp_param[idx]);
1525                 break;
1526             }
1527         }
1528         if (idx < thread_count) {
1529             break;
1530         }
1531     }
1532 }
1533 
1534 static int ram_load(QEMUFile *f, void *opaque, int version_id)
1535 {
1536     int flags = 0, ret = 0;
1537     static uint64_t seq_iter;
1538     int len = 0;
1539 
1540     seq_iter++;
1541 
1542     if (version_id != 4) {
1543         ret = -EINVAL;
1544     }
1545 
1546     /* This RCU critical section can be very long running.
1547      * When RCU reclaims in the code start to become numerous,
1548      * it will be necessary to reduce the granularity of this
1549      * critical section.
1550      */
1551     rcu_read_lock();
1552     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
1553         ram_addr_t addr, total_ram_bytes;
1554         void *host;
1555         uint8_t ch;
1556 
1557         addr = qemu_get_be64(f);
1558         flags = addr & ~TARGET_PAGE_MASK;
1559         addr &= TARGET_PAGE_MASK;
1560 
1561         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
1562         case RAM_SAVE_FLAG_MEM_SIZE:
1563             /* Synchronize RAM block list */
1564             total_ram_bytes = addr;
1565             while (!ret && total_ram_bytes) {
1566                 RAMBlock *block;
1567                 char id[256];
1568                 ram_addr_t length;
1569 
1570                 len = qemu_get_byte(f);
1571                 qemu_get_buffer(f, (uint8_t *)id, len);
1572                 id[len] = 0;
1573                 length = qemu_get_be64(f);
1574 
1575                 QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1576                     if (!strncmp(id, block->idstr, sizeof(id))) {
1577                         if (length != block->used_length) {
1578                             Error *local_err = NULL;
1579 
1580                             ret = qemu_ram_resize(block->offset, length, &local_err);
1581                             if (local_err) {
1582                                 error_report_err(local_err);
1583                             }
1584                         }
1585                         ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
1586                                               block->idstr);
1587                         break;
1588                     }
1589                 }
1590 
1591                 if (!block) {
1592                     error_report("Unknown ramblock \"%s\", cannot "
1593                                  "accept migration", id);
1594                     ret = -EINVAL;
1595                 }
1596 
1597                 total_ram_bytes -= length;
1598             }
1599             break;
1600         case RAM_SAVE_FLAG_COMPRESS:
1601             host = host_from_stream_offset(f, addr, flags);
1602             if (!host) {
1603                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1604                 ret = -EINVAL;
1605                 break;
1606             }
1607             ch = qemu_get_byte(f);
1608             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
1609             break;
1610         case RAM_SAVE_FLAG_PAGE:
1611             host = host_from_stream_offset(f, addr, flags);
1612             if (!host) {
1613                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1614                 ret = -EINVAL;
1615                 break;
1616             }
1617             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
1618             break;
1619         case RAM_SAVE_FLAG_COMPRESS_PAGE:
1620             host = host_from_stream_offset(f, addr, flags);
1621             if (!host) {
1622                 error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
1623                 ret = -EINVAL;
1624                 break;
1625             }
1626 
1627             len = qemu_get_be32(f);
1628             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
1629                 error_report("Invalid compressed data length: %d", len);
1630                 ret = -EINVAL;
1631                 break;
1632             }
1633             qemu_get_buffer(f, compressed_data_buf, len);
1634             decompress_data_with_multi_threads(compressed_data_buf, host, len);
1635             break;
1636         case RAM_SAVE_FLAG_XBZRLE:
1637             host = host_from_stream_offset(f, addr, flags);
1638             if (!host) {
1639                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1640                 ret = -EINVAL;
1641                 break;
1642             }
1643             if (load_xbzrle(f, addr, host) < 0) {
1644                 error_report("Failed to decompress XBZRLE page at "
1645                              RAM_ADDR_FMT, addr);
1646                 ret = -EINVAL;
1647                 break;
1648             }
1649             break;
1650         case RAM_SAVE_FLAG_EOS:
1651             /* normal exit */
1652             break;
1653         default:
1654             if (flags & RAM_SAVE_FLAG_HOOK) {
1655                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
1656             } else {
1657                 error_report("Unknown combination of migration flags: %#x",
1658                              flags);
1659                 ret = -EINVAL;
1660             }
1661         }
1662         if (!ret) {
1663             ret = qemu_file_get_error(f);
1664         }
1665     }
1666 
1667     rcu_read_unlock();
1668     DPRINTF("Completed load of VM with exit code %d seq iteration "
1669             "%" PRIu64 "\n", ret, seq_iter);
1670     return ret;
1671 }
1672 
1673 static SaveVMHandlers savevm_ram_handlers = {
1674     .save_live_setup = ram_save_setup,
1675     .save_live_iterate = ram_save_iterate,
1676     .save_live_complete = ram_save_complete,
1677     .save_live_pending = ram_save_pending,
1678     .load_state = ram_load,
1679     .cancel = ram_migration_cancel,
1680 };
1681 
1682 void ram_mig_init(void)
1683 {
1684     qemu_mutex_init(&XBZRLE.lock);
1685     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
1686 }
1687