#
# Migration test main engine
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
#


import os
import re
import sys
import time

from guestperf.progress import Progress, ProgressStats
from guestperf.report import Report
from guestperf.timings import TimingRecord, Timings

sys.path.append(os.path.join(os.path.dirname(__file__),
                             '..', '..', '..', 'python'))
from qemu.machine import QEMUMachine


class Engine(object):

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):

        self._binary = binary        # Path to QEMU binary
        self._dst_host = dst_host    # Hostname of target host
        self._kernel = kernel        # Path to kernel image
        self._initrd = initrd        # Path to stress initrd
        self._transport = transport  # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        if debug:
            self._verbose = debug

    def _vcpu_timing(self, pid, tid_list):
        records = []
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        for tid in tid_list:
            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
            with open(statfile, "r") as fh:
                stat = fh.readline()
                fields = stat.split(" ")
                # Fields 13/14 of /proc/<pid>/task/<tid>/stat are
                # utime/stime in clock ticks
                utime = int(fields[13])
                stime = int(fields[14])
                records.append(TimingRecord(tid, now, 1000 * (utime + stime) / jiffies_per_sec))
        return records

    def _cpu_timing(self, pid):
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        statfile = "/proc/%d/stat" % pid
        with open(statfile, "r") as fh:
            stat = fh.readline()
            fields = stat.split(" ")
            # Fields 13/14 of /proc/<pid>/stat are utime/stime in clock ticks
            utime = int(fields[13])
            stime = int(fields[14])
            return TimingRecord(pid, now, 1000 * (utime + stime) / jiffies_per_sec)

    def _migrate_progress(self, vm):
        info = vm.cmd("query-migrate")

        if "ram" not in info:
            info["ram"] = {}

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                info["ram"].get("transferred", 0),
                info["ram"].get("remaining", 0),
                info["ram"].get("total", 0),
                info["ram"].get("duplicate", 0),
                info["ram"].get("skipped", 0),
                info["ram"].get("normal", 0),
                info["ram"].get("normal-bytes", 0),
                info["ram"].get("dirty-pages-rate", 0),
                info["ram"].get("mbps", 0),
                info["ram"].get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("cpu-throttle-percentage", 0),
            info.get("dirty-limit-throttle-time-per-round", 0),
            info.get("dirty-limit-ring-full-time", 0),
        )

    def _migrate(self, hardware, scenario, src,
                 dst, connect_uri, defer_migrate):
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        vcpus = src.cmd("query-cpus-fast")
        src_threads = []
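        # Record each vCPU's thread ID so its per-thread CPU usage can be
        # sampled from /proc while the migration is running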
        for vcpu in vcpus:
            src_threads.append(vcpu["thread-id"])

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        if scenario._auto_converge:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "auto-converge",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "postcopy-ram",
                                 "state": True }
                           ])

        resp = src.cmd("migrate-set-parameters",
                       max_bandwidth=scenario._bandwidth * 1024 * 1024)

        resp = src.cmd("migrate-set-parameters",
                       downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           compress_threads=scenario._compression_mt_threads)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "compress",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "xbzrle",
                                 "state": True }
                           ])
            # Integer division: the QMP 'size' parameter does not accept floats
            resp = src.cmd("migrate-set-parameters",
                           xbzrle_cache_size=(
                               hardware._mem *
                               1024 * 1024 * 1024 // 100 *
                               scenario._compression_xbzrle_cache))

        if scenario._multifd:
            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)
            resp = dst.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "multifd",
                                 "state": True }
                           ])
            resp = dst.cmd("migrate-set-parameters",
                           multifd_channels=scenario._multifd_channels)

        if scenario._dirty_limit:
            if not hardware._dirty_ring_size:
                raise Exception("dirty ring size must be configured when "
                                "testing dirty limit migration")

            resp = src.cmd("migrate-set-capabilities",
                           capabilities = [
                               { "capability": "dirty-limit",
                                 "state": True }
                           ])
            resp = src.cmd("migrate-set-parameters",
                           x_vcpu_dirty_limit_period=scenario._x_vcpu_dirty_limit_period)
            resp = src.cmd("migrate-set-parameters",
                           vcpu_dirty_limit=scenario._vcpu_dirty_limit)

        if defer_migrate:
            resp = dst.cmd("migrate-incoming", uri=connect_uri)
        resp = src.cmd("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
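        # Main monitoring loop: poll migration progress every 50ms, sample
        # host CPU usage roughly once per second, and apply the scenario's
        # convergence controls (cancel, post-copy switchover, pause)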
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    dst.cmd("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (transferred %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.cmd("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.cmd("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                resp = src.cmd("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                resp = src.cmd("stop")
                paused = True

    def _is_ppc64le(self):
        _, _, _, _, machine = os.uname()
        if machine == "ppc64le":
            return True
        return False

    def _get_guest_console_args(self):
        if self._is_ppc64le():
            return "console=hvc0"
        else:
            return "console=ttyS0"

    def _get_qemu_serial_args(self):
        if self._is_ppc64le():
            return ["-chardev", "stdio,id=cdev0",
                    "-device", "spapr-vty,chardev=cdev0"]
        else:
            return ["-chardev", "stdio,id=cdev0",
                    "-device", "isa-serial,chardev=cdev0"]

    def _get_common_args(self, hardware, tunnelled=False):
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
        ]

        args.append(self._get_guest_console_args())

        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]
        if hardware._dirty_ring_size:
            argv.extend(["-accel", "kvm,dirty-ring-size=%s" %
                         hardware._dirty_ring_size])
        else:
            argv.extend(["-accel", "kvm"])

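        # Guest console output goes over the serial device to stdio, so the
        # stress program's progress lines end up in the QEMU log that
        # _get_timings() parses later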
        argv.extend(self._get_qemu_serial_args())

        if self._debug:
            argv.extend(["-machine", "graphics=off"])

        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            # Huge page backing is not implemented yet
            pass

        return argv

    def _get_src_args(self, hardware):
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri, defer_migrate):
        tunnelled = False
        if self._dst_host != "localhost":
            tunnelled = True
        argv = self._get_common_args(hardware, tunnelled)

        if defer_migrate:
            return argv + ["-incoming", "defer"]
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        wrapper = []
        if len(cpu_bind) > 0 or len(mem_bind) > 0:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        else:
            return wrapper

    def _get_timings(self, vm):
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    def run(self, hardware, scenario, result_dir=os.getcwd()):
        abs_result_dir = os.path.join(result_dir, scenario._name)
        defer_migrate = False

        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Cannot use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Clean up a stale migration socket left over from a previous run
            try:
                os.remove(uri[5:])
            except:
                pass

        if scenario._multifd:
            defer_migrate = True

        if self._dst_host != "localhost":
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri, defer_migrate),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src,
                                dst, uri, defer_migrate)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
                os.remove(uri[5:])

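            # Remove the UNIX monitor socket paths from the filesystem; the
            # established monitor connections are not affected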
            if os.path.exists(srcmonaddr):
                os.remove(srcmonaddr)

            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
                os.remove(dstmonaddr)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            try:
                src.shutdown()
            except:
                pass
            try:
                dst.shutdown()
            except:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise