xref: /qemu/tests/migration-stress/guestperf/engine.py (revision 32a1bb21c6f4d569427099e4e495f1d07d017fdb)
1#
2# Migration test main engine
3#
4# Copyright (c) 2016 Red Hat, Inc.
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
18#
19
20
21import os
22import re
23import sys
24import time
25
26from guestperf.progress import Progress, ProgressStats
27from guestperf.report import Report
28from guestperf.timings import TimingRecord, Timings
29
30sys.path.append(os.path.join(os.path.dirname(__file__),
31                             '..', '..', '..', 'python'))
32from qemu.machine import QEMUMachine
33
34
35class Engine(object):
36
37    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
38                 sleep=15, verbose=False, debug=False):
39
40        self._binary = binary # Path to QEMU binary
41        self._dst_host = dst_host # Hostname of target host
42        self._kernel = kernel # Path to kernel image
43        self._initrd = initrd # Path to stress initrd
44        self._transport = transport # 'unix' or 'tcp' or 'rdma'
45        self._sleep = sleep
46        self._verbose = verbose
47        self._debug = debug
48
49        if debug:
50            self._verbose = debug
51
52    def _vcpu_timing(self, pid, tid_list):
53        records = []
54        now = time.time()
55
56        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
57        for tid in tid_list:
58            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
59            with open(statfile, "r") as fh:
60                stat = fh.readline()
61                fields = stat.split(" ")
62                stime = int(fields[13])
63                utime = int(fields[14])
64                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
65        return records
66
67    def _cpu_timing(self, pid):
68        now = time.time()
69
70        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
71        statfile = "/proc/%d/stat" % pid
72        with open(statfile, "r") as fh:
73            stat = fh.readline()
74            fields = stat.split(" ")
75            stime = int(fields[13])
76            utime = int(fields[14])
77            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
78
79    def _migrate_progress(self, vm):
80        info = vm.cmd("query-migrate")
81
82        if "ram" not in info:
83            info["ram"] = {}
84
85        return Progress(
86            info.get("status", "active"),
87            ProgressStats(
88                info["ram"].get("transferred", 0),
89                info["ram"].get("remaining", 0),
90                info["ram"].get("total", 0),
91                info["ram"].get("duplicate", 0),
92                info["ram"].get("skipped", 0),
93                info["ram"].get("normal", 0),
94                info["ram"].get("normal-bytes", 0),
95                info["ram"].get("dirty-pages-rate", 0),
96                info["ram"].get("mbps", 0),
97                info["ram"].get("dirty-sync-count", 0)
98            ),
99            time.time(),
100            info.get("total-time", 0),
101            info.get("downtime", 0),
102            info.get("expected-downtime", 0),
103            info.get("setup-time", 0),
104            info.get("cpu-throttle-percentage", 0),
105            info.get("dirty-limit-throttle-time-per-round", 0),
106            info.get("dirty-limit-ring-full-time", 0),
107        )
108
    def _migrate(self, hardware, scenario, src,
                 dst, connect_uri, defer_migrate):
        """Drive a migration from *src* to *dst* per *scenario*.

        Lets the guest workload warm up, applies the scenario's
        migration capabilities/parameters via QMP, starts the migration
        and polls it to completion, sampling host CPU usage throughout.

        :param hardware: Hardware profile (memory size, dirty ring size)
        :param scenario: Scenario with migration knobs to apply
        :param src: source VM (QEMUMachine-like, must expose cmd()/get_pid())
        :param dst: destination VM
        :param connect_uri: migration URI the source connects to
        :param defer_migrate: when True, issue "migrate-incoming" on the
            destination before starting (used for multifd)
        :returns: [progress_history, src_qemu_time, src_vcpu_time]
        :raises Exception: if dirty-limit is requested without a dirty
            ring size configured
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        # Collect the host thread IDs of the source guest's vCPUs so we
        # can sample their CPU usage from /proc
        vcpus = src.cmd("query-cpus-fast")
        src_threads = []
        for vcpu in vcpus:
            src_threads.append(vcpu["thread-id"])

        # XXX how to get dst timings on remote host ?

        # Let the guest stress workload reach steady state before the
        # migration starts, sampling CPU usage once per second
        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        # Apply the scenario's migration capabilities and parameters.
        # Capabilities must be enabled on both sides where the protocol
        # is affected (postcopy, compress, xbzrle, multifd).
        if scenario._auto_converge:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "auto-converge",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])

        # Bandwidth cap is configured in MB/s, QMP expects bytes/sec
        resp = src.cmd("migrate-set-parameters",
                       max_bandwidth=scenario._bandwidth * 1024 * 1024)

        resp = src.cmd("migrate-set-parameters",
                       downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           compress_threads=scenario._compression_mt_threads)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            # Cache size is a percentage of guest RAM (hardware._mem is
            # in GB). NOTE(review): true division yields a float here;
            # presumably QMP coerces it — confirm against the QMP schema.
            resp = src.cmd("migrate-set-parameters",
                           xbzrle_cache_size=(
                               hardware._mem *
                               1024 * 1024 * 1024 / 100 *
                               scenario._compression_xbzrle_cache))

        if scenario._multifd:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)

        if scenario._dirty_limit:
            # dirty-limit throttling relies on the KVM dirty ring
            if not hardware._dirty_ring_size:
                raise Exception("dirty ring size must be configured when "
                                "testing dirty limit migration")

            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "dirty-limit",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                x_vcpu_dirty_limit_period=scenario._x_vcpu_dirty_limit_period)
            resp = src.cmd("migrate-set-parameters",
                           vcpu_dirty_limit=scenario._vcpu_dirty_limit)

        # With deferred incoming (multifd), the destination only starts
        # listening once told to, after all parameters are negotiated
        if defer_migrate:
            resp = dst.cmd("migrate-incoming", uri=connect_uri)
        resp = src.cmd("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        # Poll the migration every 50ms until it reaches a terminal
        # state; sample host CPU usage roughly once per second
        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            # Record one progress snapshot per RAM iteration
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                # Resume the destination if we paused the guest earlier
                if progress._status == "completed" and paused:
                    dst.cmd("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                # Keep sampling while the workload runs on after a
                # successful migration, for the post-migration report
                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            # NOTE(review): the "total" label prints transferred bytes,
            # and "remain X of Y" prints remaining/total — labels look
            # shuffled; verify intended output format
            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # Give up once the scenario's iteration budget is exhausted;
            # the cancel is picked up as "cancelled" on a later poll
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.cmd("migrate_cancel")
                continue

            # Likewise for the wallclock time budget
            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.cmd("migrate_cancel")
                continue

            # Flip to post-copy once enough pre-copy iterations have run
            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                resp = src.cmd("migrate-start-postcopy")
                post_copy = True

            # Optionally pause the guest to force convergence
            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                resp = src.cmd("stop")
                paused = True
303
304    def _is_ppc64le(self):
305        _, _, _, _, machine = os.uname()
306        if machine == "ppc64le":
307            return True
308        return False
309
310    def _get_guest_console_args(self):
311        if self._is_ppc64le():
312            return "console=hvc0"
313        else:
314            return "console=ttyS0"
315
316    def _get_qemu_serial_args(self):
317        if self._is_ppc64le():
318            return ["-chardev", "stdio,id=cdev0",
319                    "-device", "spapr-vty,chardev=cdev0"]
320        else:
321            return ["-chardev", "stdio,id=cdev0",
322                    "-device", "isa-serial,chardev=cdev0"]
323
324    def _get_common_args(self, hardware, tunnelled=False):
325        args = [
326            "noapic",
327            "edd=off",
328            "printk.time=1",
329            "noreplace-smp",
330            "cgroup_disable=memory",
331            "pci=noearly",
332        ]
333
334        args.append(self._get_guest_console_args())
335
336        if self._debug:
337            args.append("debug")
338        else:
339            args.append("quiet")
340
341        args.append("ramsize=%s" % hardware._mem)
342
343        cmdline = " ".join(args)
344        if tunnelled:
345            cmdline = "'" + cmdline + "'"
346
347        argv = [
348            "-cpu", "host",
349            "-kernel", self._kernel,
350            "-initrd", self._initrd,
351            "-append", cmdline,
352            "-m", str((hardware._mem * 1024) + 512),
353            "-smp", str(hardware._cpus),
354        ]
355        if hardware._dirty_ring_size:
356            argv.extend(["-accel", "kvm,dirty-ring-size=%s" %
357                         hardware._dirty_ring_size])
358        else:
359            argv.extend(["-accel", "kvm"])
360
361        argv.extend(self._get_qemu_serial_args())
362
363        if self._debug:
364            argv.extend(["-machine", "graphics=off"])
365
366        if hardware._prealloc_pages:
367            argv_source += ["-mem-path", "/dev/shm",
368                            "-mem-prealloc"]
369        if hardware._locked_pages:
370            argv_source += ["-overcommit", "mem-lock=on"]
371        if hardware._huge_pages:
372            pass
373
374        return argv
375
    def _get_src_args(self, hardware):
        """Return the QEMU argv for the source (outgoing) VM."""
        return self._get_common_args(hardware)
378
379    def _get_dst_args(self, hardware, uri, defer_migrate):
380        tunnelled = False
381        if self._dst_host != "localhost":
382            tunnelled = True
383        argv = self._get_common_args(hardware, tunnelled)
384
385        if defer_migrate:
386            return argv + ["-incoming", "defer"]
387        return argv + ["-incoming", uri]
388
389    @staticmethod
390    def _get_common_wrapper(cpu_bind, mem_bind):
391        wrapper = []
392        if len(cpu_bind) > 0 or len(mem_bind) > 0:
393            wrapper.append("numactl")
394            if cpu_bind:
395                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
396            if mem_bind:
397                wrapper.append("--membind=%s" % ",".join(mem_bind))
398
399        return wrapper
400
    def _get_src_wrapper(self, hardware):
        """Wrapper command (numactl binding, if any) for the source VM."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
403
404    def _get_dst_wrapper(self, hardware):
405        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
406        if self._dst_host != "localhost":
407            return ["ssh",
408                    "-R", "9001:localhost:9001",
409                    self._dst_host] + wrapper
410        else:
411            return wrapper
412
413    def _get_timings(self, vm):
414        log = vm.get_log()
415        if not log:
416            return []
417        if self._debug:
418            print(log)
419
420        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
421        matcher = re.compile(regex)
422        records = []
423        for line in log.split("\n"):
424            match = matcher.match(line)
425            if match:
426                records.append(TimingRecord(int(match.group(1)),
427                                            int(match.group(2)) / 1000.0,
428                                            int(match.group(3))))
429        return records
430
431    def run(self, hardware, scenario, result_dir=os.getcwd()):
432        abs_result_dir = os.path.join(result_dir, scenario._name)
433        defer_migrate = False
434
435        if self._transport == "tcp":
436            uri = "tcp:%s:9000" % self._dst_host
437        elif self._transport == "rdma":
438            uri = "rdma:%s:9000" % self._dst_host
439        elif self._transport == "unix":
440            if self._dst_host != "localhost":
441                raise Exception("Running use unix migration transport for non-local host")
442            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
443            try:
444                os.remove(uri[5:])
445                os.remove(monaddr)
446            except:
447                pass
448
449        if scenario._multifd:
450            defer_migrate = True
451
452        if self._dst_host != "localhost":
453            dstmonaddr = ("localhost", 9001)
454        else:
455            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
456        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
457
458        src = QEMUMachine(self._binary,
459                          args=self._get_src_args(hardware),
460                          wrapper=self._get_src_wrapper(hardware),
461                          name="qemu-src-%d" % os.getpid(),
462                          monitor_address=srcmonaddr)
463
464        dst = QEMUMachine(self._binary,
465                          args=self._get_dst_args(hardware, uri, defer_migrate),
466                          wrapper=self._get_dst_wrapper(hardware),
467                          name="qemu-dst-%d" % os.getpid(),
468                          monitor_address=dstmonaddr)
469
470        try:
471            src.launch()
472            dst.launch()
473
474            ret = self._migrate(hardware, scenario, src,
475                                dst, uri, defer_migrate)
476            progress_history = ret[0]
477            qemu_timings = ret[1]
478            vcpu_timings = ret[2]
479            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
480                os.remove(uri[5:])
481
482            if os.path.exists(srcmonaddr):
483                os.remove(srcmonaddr)
484
485            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
486                os.remove(dstmonaddr)
487
488            if self._verbose:
489                print("Finished migration")
490
491            src.shutdown()
492            dst.shutdown()
493
494            return Report(hardware, scenario, progress_history,
495                          Timings(self._get_timings(src) + self._get_timings(dst)),
496                          Timings(qemu_timings),
497                          Timings(vcpu_timings),
498                          self._binary, self._dst_host, self._kernel,
499                          self._initrd, self._transport, self._sleep)
500        except Exception as e:
501            if self._debug:
502                print("Failed: %s" % str(e))
503            try:
504                src.shutdown()
505            except:
506                pass
507            try:
508                dst.shutdown()
509            except:
510                pass
511
512            if self._debug:
513                print(src.get_log())
514                print(dst.get_log())
515            raise
516
517