#
# Migration test main engine
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
#


import os
import re
import sys
import time

from guestperf.progress import Progress, ProgressStats
from guestperf.report import Report, ReportResult
from guestperf.timings import TimingRecord, Timings

# Make the in-tree qemu python package importable when running from
# the source tree.
sys.path.append(os.path.join(os.path.dirname(__file__),
                             '..', '..', '..', 'python'))
from qemu.machine import QEMUMachine

# multifd supported compression algorithms
MULTIFD_CMP_ALGS = ("zlib", "zstd", "qpl", "uadk")

class Engine(object):
    """Main engine driving a single guest migration test.

    Boots a source and a destination QEMU running a stress workload
    kernel/initrd, performs a live migration configured by a scenario
    object, and collects timing/progress statistics into a Report.
    """

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Create an engine.

        binary:    path to the QEMU binary
        dst_host:  hostname of the target host ("localhost" for local)
        kernel:    path to the kernel image
        initrd:    path to the stress initrd
        transport: 'unix', 'tcp' or 'rdma'
        sleep:     seconds of guest workload warm-up/cool-down
        verbose:   print progress information
        debug:     extra debug output (implies verbose)
        """

        self._binary = binary         # Path to QEMU binary
        self._dst_host = dst_host     # Hostname of target host
        self._kernel = kernel         # Path to kernel image
        self._initrd = initrd         # Path to stress initrd
        self._transport = transport   # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        if debug:
            self._verbose = debug

    def _vcpu_timing(self, pid, tid_list):
        """Sample per-vCPU-thread CPU usage from /proc.

        Returns a list of TimingRecord(tid, wallclock, cpu-time-ms),
        one per thread id in tid_list.
        """
        records = []
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        for tid in tid_list:
            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
            with open(statfile, "r") as fh:
                stat = fh.readline()
                fields = stat.split(" ")
                # Fields 13/14 are utime/stime in jiffies; only their
                # sum matters here so the naming order is irrelevant.
                stime = int(fields[13])
                utime = int(fields[14])
                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
        return records

    def _cpu_timing(self, pid):
        """Sample whole-process CPU usage from /proc/<pid>/stat.

        Returns a TimingRecord(pid, wallclock, cpu-time-ms).
        """
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        statfile = "/proc/%d/stat" % pid
        with open(statfile, "r") as fh:
            stat = fh.readline()
            fields = stat.split(" ")
            # Fields 13/14 are utime/stime in jiffies; only their sum
            # matters here.
            stime = int(fields[13])
            utime = int(fields[14])
            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)

    def _migrate_progress(self, vm):
        """Query the VM's migration status and wrap it in a Progress.

        Missing fields default to 0 so partially-populated QMP replies
        (e.g. before the first RAM pass) are handled gracefully.
        """
        info = vm.cmd("query-migrate")

        if "ram" not in info:
            info["ram"] = {}

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                info["ram"].get("transferred", 0),
                info["ram"].get("remaining", 0),
                info["ram"].get("total", 0),
                info["ram"].get("duplicate", 0),
                info["ram"].get("skipped", 0),
                info["ram"].get("normal", 0),
                info["ram"].get("normal-bytes", 0),
                info["ram"].get("dirty-pages-rate", 0),
                info["ram"].get("mbps", 0),
                info["ram"].get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("cpu-throttle-percentage", 0),
            info.get("dirty-limit-throttle-time-per-round", 0),
            info.get("dirty-limit-ring-full-time", 0),
        )

    def _migrate(self, hardware, scenario, src,
                 dst, connect_uri, defer_migrate):
        """Run the migration described by scenario from src to dst.

        Configures migration capabilities/parameters per the scenario,
        starts the migration and polls progress, optionally switching
        to post-copy or pausing the guest after the configured number
        of RAM iterations, and cancelling if the scenario's iteration
        or time limits are exceeded.

        Returns [progress_history, src_qemu_time, src_vcpu_time, result].
        Raises Exception for unsupported/inconsistent scenario options.
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        vcpus = src.cmd("query-cpus-fast")
        src_threads = []
        for vcpu in vcpus:
            src_threads.append(vcpu["thread-id"])

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        if scenario._auto_converge:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "auto-converge",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            # postcopy-ram must be enabled on both sides
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])

        resp = src.cmd("migrate-set-parameters",
                       max_bandwidth=scenario._bandwidth * 1024 * 1024)

        resp = src.cmd("migrate-set-parameters",
                       downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           compress_threads=scenario._compression_mt_threads)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            # Cache size is a percentage of guest RAM.  Use floor
            # division: QMP expects an integer byte count and Python 3
            # '/' would yield a float here (this was integer division
            # when the code was written for Python 2).
            resp = src.cmd("migrate-set-parameters",
                           xbzrle_cache_size=(
                               hardware._mem *
                               1024 * 1024 * 1024 // 100 *
                               scenario._compression_xbzrle_cache))

        if scenario._multifd:
            if (scenario._multifd_compression and
                (scenario._multifd_compression not in MULTIFD_CMP_ALGS)):
                raise Exception("unsupported multifd compression "
                                "algorithm: %s" %
                                scenario._multifd_compression)

            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)

            if scenario._multifd_compression:
                resp = src.cmd("migrate-set-parameters",
                               multifd_compression=scenario._multifd_compression)
                resp = dst.cmd("migrate-set-parameters",
                               multifd_compression=scenario._multifd_compression)

        if scenario._dirty_limit:
            if not hardware._dirty_ring_size:
                raise Exception("dirty ring size must be configured when "
                                "testing dirty limit migration")

            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "dirty-limit",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           x_vcpu_dirty_limit_period=scenario._x_vcpu_dirty_limit_period)
            resp = src.cmd("migrate-set-parameters",
                           vcpu_dirty_limit=scenario._vcpu_dirty_limit)

        if defer_migrate:
            resp = dst.cmd("migrate-incoming", uri=connect_uri)
        resp = src.cmd("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Sample CPU usage once per second (20 * 0.05s)
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            # Record at most one progress snapshot per RAM iteration
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    dst.cmd("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                result = ReportResult()
                if progress._status == "completed" and not paused:
                    result = ReportResult(True)

                return [progress_history, src_qemu_time, src_vcpu_time, result]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.cmd("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.cmd("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                resp = src.cmd("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                resp = src.cmd("stop")
                paused = True

    def _is_ppc64le(self):
        """Return True when the local machine is ppc64le."""
        _, _, _, _, machine = os.uname()
        if machine == "ppc64le":
            return True
        return False

    def _get_guest_console_args(self):
        """Kernel command-line console argument for this host arch."""
        if self._is_ppc64le():
            return "console=hvc0"
        else:
            return "console=ttyS0"

    def _get_qemu_serial_args(self):
        """QEMU argv fragment wiring the guest console to stdio."""
        if self._is_ppc64le():
            return ["-chardev", "stdio,id=cdev0",
                    "-device", "spapr-vty,chardev=cdev0"]
        else:
            return ["-chardev", "stdio,id=cdev0",
                    "-device", "isa-serial,chardev=cdev0"]

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU argv shared by source and destination VMs.

        tunnelled: quote the kernel command line for passing through a
        remote shell (ssh) invocation.
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
        ]

        args.append(self._get_guest_console_args())

        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]
        if hardware._dirty_ring_size:
            argv.extend(["-accel", "kvm,dirty-ring-size=%s" %
                         hardware._dirty_ring_size])
        else:
            argv.extend(["-accel", "kvm"])

        argv.extend(self._get_qemu_serial_args())

        if self._debug:
            argv.extend(["-machine", "graphics=off"])

        # Fixed: these previously appended to an undefined name
        # 'argv_source', raising UnboundLocalError whenever
        # prealloc/locked pages were requested.
        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            # XXX huge page backing not implemented yet
            pass

        return argv

    def _get_src_args(self, hardware):
        """QEMU argv for the source VM."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri, defer_migrate):
        """QEMU argv for the destination VM, including -incoming."""
        tunnelled = False
        if self._dst_host != "localhost":
            tunnelled = True
        argv = self._get_common_args(hardware, tunnelled)

        if defer_migrate:
            return argv + ["-incoming", "defer"]
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Optional numactl prefix binding QEMU to CPUs/NUMA nodes."""
        wrapper = []
        if len(cpu_bind) > 0 or len(mem_bind) > 0:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Command wrapper for launching the source VM."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Command wrapper for launching the destination VM.

        For a remote destination, runs QEMU over ssh with a reverse
        tunnel for the monitor port (9001).
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        else:
            return wrapper

    def _get_timings(self, vm):
        """Parse guest stress-workload timing records from the VM log.

        Matches lines of the form
        '<name> (<tid>): INFO: <ms>ms copied <n> GB in <ms>ms'.
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    def run(self, hardware, scenario, result_dir=None):
        """Run one migration scenario and return a Report.

        result_dir defaults to the current working directory at call
        time (previously os.getcwd() was evaluated once at import
        time, a classic default-argument pitfall).
        """
        if result_dir is None:
            result_dir = os.getcwd()
        # NOTE(review): abs_result_dir appears unused here — presumably
        # a leftover; kept for compatibility with any external use.
        abs_result_dir = os.path.join(result_dir, scenario._name)
        defer_migrate = False

        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Best-effort removal of a stale socket from a previous run.
            # (An earlier version also called os.remove(monaddr) with
            # 'monaddr' undefined — a NameError hidden by a bare
            # except; dropped.)
            try:
                os.remove(uri[5:])
            except OSError:
                pass

        if scenario._multifd:
            defer_migrate = True

        if self._dst_host != "localhost":
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri, defer_migrate),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src,
                                dst, uri, defer_migrate)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            result = ret[3]
            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
                os.remove(uri[5:])

            if os.path.exists(srcmonaddr):
                os.remove(srcmonaddr)

            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
                os.remove(dstmonaddr)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          result,
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Best-effort shutdown of both VMs before re-raising
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise