xref: /qemu/tests/migration-stress/guestperf/engine.py (revision 513823e7521a09ed7ad1e32e6454bac3b2cbf52d)
1#
2# Migration test main engine
3#
4# Copyright (c) 2016 Red Hat, Inc.
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
18#
19
20
21import os
22import re
23import sys
24import time
25
26from guestperf.progress import Progress, ProgressStats
27from guestperf.report import Report, ReportResult
28from guestperf.timings import TimingRecord, Timings
29
30sys.path.append(os.path.join(os.path.dirname(__file__),
31                             '..', '..', '..', 'python'))
32from qemu.machine import QEMUMachine
33
# Compression algorithms accepted for the multifd migration capability;
# checked against scenario._multifd_compression before it is applied.
MULTIFD_CMP_ALGS = ("zlib", "zstd", "qpl", "uadk")
36
37class Engine(object):
38
39    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
40                 sleep=15, verbose=False, debug=False):
41
42        self._binary = binary # Path to QEMU binary
43        self._dst_host = dst_host # Hostname of target host
44        self._kernel = kernel # Path to kernel image
45        self._initrd = initrd # Path to stress initrd
46        self._transport = transport # 'unix' or 'tcp' or 'rdma'
47        self._sleep = sleep
48        self._verbose = verbose
49        self._debug = debug
50
51        if debug:
52            self._verbose = debug
53
54    def _vcpu_timing(self, pid, tid_list):
55        records = []
56        now = time.time()
57
58        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
59        for tid in tid_list:
60            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
61            with open(statfile, "r") as fh:
62                stat = fh.readline()
63                fields = stat.split(" ")
64                stime = int(fields[13])
65                utime = int(fields[14])
66                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
67        return records
68
69    def _cpu_timing(self, pid):
70        now = time.time()
71
72        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
73        statfile = "/proc/%d/stat" % pid
74        with open(statfile, "r") as fh:
75            stat = fh.readline()
76            fields = stat.split(" ")
77            stime = int(fields[13])
78            utime = int(fields[14])
79            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
80
81    def _migrate_progress(self, vm):
82        info = vm.cmd("query-migrate")
83
84        if "ram" not in info:
85            info["ram"] = {}
86
87        return Progress(
88            info.get("status", "active"),
89            ProgressStats(
90                info["ram"].get("transferred", 0),
91                info["ram"].get("remaining", 0),
92                info["ram"].get("total", 0),
93                info["ram"].get("duplicate", 0),
94                info["ram"].get("skipped", 0),
95                info["ram"].get("normal", 0),
96                info["ram"].get("normal-bytes", 0),
97                info["ram"].get("dirty-pages-rate", 0),
98                info["ram"].get("mbps", 0),
99                info["ram"].get("dirty-sync-count", 0)
100            ),
101            time.time(),
102            info.get("total-time", 0),
103            info.get("downtime", 0),
104            info.get("expected-downtime", 0),
105            info.get("setup-time", 0),
106            info.get("cpu-throttle-percentage", 0),
107            info.get("dirty-limit-throttle-time-per-round", 0),
108            info.get("dirty-limit-ring-full-time", 0),
109        )
110
    def _migrate(self, hardware, scenario, src,
                 dst, connect_uri, defer_migrate):
        """Drive one migration from @src to @dst and collect statistics.

        Applies the capabilities and parameters requested by @scenario,
        starts the migration over @connect_uri, then polls progress
        until the migration completes, fails or is cancelled.

        Returns a 4-element list:
          [0] Progress history (one entry per RAM iteration increase,
              plus the terminal state)
          [1] TimingRecords sampled for the source QEMU process
          [2] TimingRecords sampled for each source vCPU thread
          [3] ReportResult (success only when the migration completed
              without the guest having been paused)
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        # Collect the host thread IDs of the source guest's vCPUs so
        # their CPU consumption can be sampled from /proc.
        vcpus = src.cmd("query-cpus-fast")
        src_threads = []
        for vcpu in vcpus:
            src_threads.append(vcpu["thread-id"])

        # XXX how to get dst timings on remote host ?

        # Let the guest workload run for a while first, sampling CPU
        # usage once per second to establish a pre-migration baseline.
        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")

        # Apply scenario-specific capabilities/parameters. Capabilities
        # must be set on both source and destination where the feature
        # affects both sides (postcopy, compress, xbzrle, multifd).
        if scenario._auto_converge:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "auto-converge",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])

        # Bandwidth cap is configured in MB/s; QMP expects bytes/sec.
        resp = src.cmd("migrate-set-parameters",
                       max_bandwidth=scenario._bandwidth * 1024 * 1024)

        resp = src.cmd("migrate-set-parameters",
                       downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           compress_threads=scenario._compression_mt_threads)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            # Cache size is a percentage of guest RAM (hardware._mem is
            # in GB, converted to bytes here).
            resp = src.cmd("migrate-set-parameters",
                           xbzrle_cache_size=(
                               hardware._mem *
                               1024 * 1024 * 1024 / 100 *
                               scenario._compression_xbzrle_cache))

        if scenario._multifd:
            # Validate the compression algorithm before enabling multifd.
            if (scenario._multifd_compression and
                (scenario._multifd_compression not in MULTIFD_CMP_ALGS)):
                    raise Exception("unsupported multifd compression "
                                    "algorithm: %s" %
                                    scenario._multifd_compression)

            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)

            if scenario._multifd_compression:
                resp = src.cmd("migrate-set-parameters",
                    multifd_compression=scenario._multifd_compression)
                resp = dst.cmd("migrate-set-parameters",
                    multifd_compression=scenario._multifd_compression)

        if scenario._dirty_limit:
            # dirty-limit throttling is implemented on top of the KVM
            # dirty ring, so the ring must have been enabled.
            if not hardware._dirty_ring_size:
                raise Exception("dirty ring size must be configured when "
                                "testing dirty limit migration")

            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "dirty-limit",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                x_vcpu_dirty_limit_period=scenario._x_vcpu_dirty_limit_period)
            resp = src.cmd("migrate-set-parameters",
                           vcpu_dirty_limit=scenario._vcpu_dirty_limit)

        # Kick off the migration. With '-incoming defer' the destination
        # must be told where to listen first.
        if defer_migrate:
            resp = dst.cmd("migrate-incoming", uri=connect_uri)
        resp = src.cmd("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        # Poll migration progress every 50ms; CPU usage is sampled and
        # progress printed only every 20th poll (i.e. about once/second).
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            # Record progress only once per RAM iteration (dirty sync
            # count), not on every poll.
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                # If we paused the guest earlier, resume it on the
                # destination now that migration finished.
                if progress._status == "completed" and paused:
                    dst.cmd("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                # Sample the post-migration workload for comparison
                # against the pre-migration baseline.
                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                # Only an unpaused, completed migration counts as success.
                result = ReportResult()
                if progress._status == "completed" and not paused:
                    result = ReportResult(True)

                return [progress_history, src_qemu_time, src_vcpu_time, result]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # Give up (cancel) when the scenario's iteration or wall-clock
            # budget is exceeded; the loop then ends on the "cancelled"
            # status reported by a later poll.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.cmd("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.cmd("migrate_cancel")
                continue

            # Switch to post-copy once the configured number of pre-copy
            # iterations has passed (one-shot).
            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                resp = src.cmd("migrate-start-postcopy")
                post_copy = True

            # Pause the source guest to force convergence (one-shot).
            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                resp = src.cmd("stop")
                paused = True
321
322    def _is_ppc64le(self):
323        _, _, _, _, machine = os.uname()
324        if machine == "ppc64le":
325            return True
326        return False
327
328    def _get_guest_console_args(self):
329        if self._is_ppc64le():
330            return "console=hvc0"
331        else:
332            return "console=ttyS0"
333
334    def _get_qemu_serial_args(self):
335        if self._is_ppc64le():
336            return ["-chardev", "stdio,id=cdev0",
337                    "-device", "spapr-vty,chardev=cdev0"]
338        else:
339            return ["-chardev", "stdio,id=cdev0",
340                    "-device", "isa-serial,chardev=cdev0"]
341
342    def _get_common_args(self, hardware, tunnelled=False):
343        args = [
344            "noapic",
345            "edd=off",
346            "printk.time=1",
347            "noreplace-smp",
348            "cgroup_disable=memory",
349            "pci=noearly",
350        ]
351
352        args.append(self._get_guest_console_args())
353
354        if self._debug:
355            args.append("debug")
356        else:
357            args.append("quiet")
358
359        args.append("ramsize=%s" % hardware._mem)
360
361        cmdline = " ".join(args)
362        if tunnelled:
363            cmdline = "'" + cmdline + "'"
364
365        argv = [
366            "-cpu", "host",
367            "-kernel", self._kernel,
368            "-initrd", self._initrd,
369            "-append", cmdline,
370            "-m", str((hardware._mem * 1024) + 512),
371            "-smp", str(hardware._cpus),
372        ]
373        if hardware._dirty_ring_size:
374            argv.extend(["-accel", "kvm,dirty-ring-size=%s" %
375                         hardware._dirty_ring_size])
376        else:
377            argv.extend(["-accel", "kvm"])
378
379        argv.extend(self._get_qemu_serial_args())
380
381        if self._debug:
382            argv.extend(["-machine", "graphics=off"])
383
384        if hardware._prealloc_pages:
385            argv_source += ["-mem-path", "/dev/shm",
386                            "-mem-prealloc"]
387        if hardware._locked_pages:
388            argv_source += ["-overcommit", "mem-lock=on"]
389        if hardware._huge_pages:
390            pass
391
392        return argv
393
394    def _get_src_args(self, hardware):
395        return self._get_common_args(hardware)
396
397    def _get_dst_args(self, hardware, uri, defer_migrate):
398        tunnelled = False
399        if self._dst_host != "localhost":
400            tunnelled = True
401        argv = self._get_common_args(hardware, tunnelled)
402
403        if defer_migrate:
404            return argv + ["-incoming", "defer"]
405        return argv + ["-incoming", uri]
406
407    @staticmethod
408    def _get_common_wrapper(cpu_bind, mem_bind):
409        wrapper = []
410        if len(cpu_bind) > 0 or len(mem_bind) > 0:
411            wrapper.append("numactl")
412            if cpu_bind:
413                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
414            if mem_bind:
415                wrapper.append("--membind=%s" % ",".join(mem_bind))
416
417        return wrapper
418
419    def _get_src_wrapper(self, hardware):
420        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
421
422    def _get_dst_wrapper(self, hardware):
423        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
424        if self._dst_host != "localhost":
425            return ["ssh",
426                    "-R", "9001:localhost:9001",
427                    self._dst_host] + wrapper
428        else:
429            return wrapper
430
431    def _get_timings(self, vm):
432        log = vm.get_log()
433        if not log:
434            return []
435        if self._debug:
436            print(log)
437
438        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
439        matcher = re.compile(regex)
440        records = []
441        for line in log.split("\n"):
442            match = matcher.match(line)
443            if match:
444                records.append(TimingRecord(int(match.group(1)),
445                                            int(match.group(2)) / 1000.0,
446                                            int(match.group(3))))
447        return records
448
449    def run(self, hardware, scenario, result_dir=os.getcwd()):
450        abs_result_dir = os.path.join(result_dir, scenario._name)
451        defer_migrate = False
452
453        if self._transport == "tcp":
454            uri = "tcp:%s:9000" % self._dst_host
455        elif self._transport == "rdma":
456            uri = "rdma:%s:9000" % self._dst_host
457        elif self._transport == "unix":
458            if self._dst_host != "localhost":
459                raise Exception("Running use unix migration transport for non-local host")
460            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
461            try:
462                os.remove(uri[5:])
463                os.remove(monaddr)
464            except:
465                pass
466
467        if scenario._multifd:
468            defer_migrate = True
469
470        if self._dst_host != "localhost":
471            dstmonaddr = ("localhost", 9001)
472        else:
473            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
474        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
475
476        src = QEMUMachine(self._binary,
477                          args=self._get_src_args(hardware),
478                          wrapper=self._get_src_wrapper(hardware),
479                          name="qemu-src-%d" % os.getpid(),
480                          monitor_address=srcmonaddr)
481
482        dst = QEMUMachine(self._binary,
483                          args=self._get_dst_args(hardware, uri, defer_migrate),
484                          wrapper=self._get_dst_wrapper(hardware),
485                          name="qemu-dst-%d" % os.getpid(),
486                          monitor_address=dstmonaddr)
487
488        try:
489            src.launch()
490            dst.launch()
491
492            ret = self._migrate(hardware, scenario, src,
493                                dst, uri, defer_migrate)
494            progress_history = ret[0]
495            qemu_timings = ret[1]
496            vcpu_timings = ret[2]
497            result = ret[3]
498            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
499                os.remove(uri[5:])
500
501            if os.path.exists(srcmonaddr):
502                os.remove(srcmonaddr)
503
504            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
505                os.remove(dstmonaddr)
506
507            if self._verbose:
508                print("Finished migration")
509
510            src.shutdown()
511            dst.shutdown()
512
513            return Report(hardware, scenario, progress_history,
514                          Timings(self._get_timings(src) + self._get_timings(dst)),
515                          Timings(qemu_timings),
516                          Timings(vcpu_timings),
517                          result,
518                          self._binary, self._dst_host, self._kernel,
519                          self._initrd, self._transport, self._sleep)
520        except Exception as e:
521            if self._debug:
522                print("Failed: %s" % str(e))
523            try:
524                src.shutdown()
525            except:
526                pass
527            try:
528                dst.shutdown()
529            except:
530                pass
531
532            if self._debug:
533                print(src.get_log())
534                print(dst.get_log())
535            raise
536
537