xref: /qemu/tests/functional/test_migration.py (revision 55f5bf716a65f67663d0769bcb8c017764b3e53a)
1#!/usr/bin/env python3
2#
3# Migration test
4#
5# Copyright (c) 2019 Red Hat, Inc.
6#
7# Authors:
8#  Cleber Rosa <crosa@redhat.com>
9#  Caio Carrara <ccarrara@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import tempfile
15import time
16
17from qemu_test import QemuSystemTest, skipIfMissingCommands
18from qemu_test.ports import Ports
19
20
class MigrationTest(QemuSystemTest):
    """Migrate between two local QEMU processes over several transports.

    Every test launches a destination VM with ``-incoming <uri>`` and a
    source VM, issues the QMP ``migrate`` command on the source and then
    checks the final migration and run state of both VMs.
    """

    # Seconds to wait for *each* VM to reach a terminal migration status.
    timeout = 10

    @staticmethod
    def migration_finished(vm):
        """Return True if ``vm`` reports a terminal migration status.

        The status is read with the QMP ``query-migrate`` command;
        'completed' and 'failed' are the two values treated as terminal.
        """
        return vm.cmd('query-migrate')['status'] in ('completed', 'failed')

    def _wait_for_migration(self, vm):
        """Poll ``vm`` until its migration finishes or the timeout expires.

        Returns silently in both cases; the caller is expected to assert
        on the resulting status.
        """
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline and not self.migration_finished(vm):
            time.sleep(0.1)

    def assert_migration(self, src_vm, dst_vm):
        """Wait for the migration to end and assert that it succeeded.

        Both VMs must report a 'completed' migration; afterwards the
        destination must be 'running' and the source 'postmigrate'.
        """
        # Each side gets its own full timeout window, source first.
        self._wait_for_migration(src_vm)
        self._wait_for_migration(dst_vm)

        self.assertEqual(src_vm.cmd('query-migrate')['status'], 'completed')
        self.assertEqual(dst_vm.cmd('query-migrate')['status'], 'completed')
        self.assertEqual(dst_vm.cmd('query-status')['status'], 'running')
        self.assertEqual(src_vm.cmd('query-status')['status'], 'postmigrate')

    def select_machine(self):
        """Select the machine type used for the current target architecture.

        Skips the test when ``self.arch`` has no entry in the table below,
        instead of failing with a bare KeyError.
        """
        target_machine = {
            'aarch64': 'quanta-gsj',
            'alpha': 'clipper',
            'arm': 'npcm750-evb',
            'i386': 'isapc',
            'ppc': 'sam460ex',
            'ppc64': 'mac99',
            'riscv32': 'spike',
            'riscv64': 'virt',
            'sparc': 'SS-4',
            'sparc64': 'sun4u',
            'x86_64': 'microvm',
        }
        machine = target_machine.get(self.arch)
        if machine is None:
            self.skipTest('No migration test machine for arch %s' % self.arch)
        self.set_machine(machine)

    def do_migrate(self, dest_uri, src_uri=None):
        """Run one migration from a fresh source VM to a fresh destination.

        dest_uri: migration URI the destination VM listens on (passed to
                  ``-incoming``).
        src_uri:  migration URI handed to the source's ``migrate`` command;
                  defaults to ``dest_uri`` when not given.
        """
        self.select_machine()
        # The destination has to be up and listening before the source
        # is told to migrate.
        dest_vm = self.get_vm('-incoming', dest_uri, name="dest-qemu")
        dest_vm.add_args('-nodefaults')
        dest_vm.launch()
        if src_uri is None:
            src_uri = dest_uri
        source_vm = self.get_vm(name="source-qemu")
        source_vm.add_args('-nodefaults')
        source_vm.launch()
        # cmd() raises if QEMU answers with an error, whereas qmp() hands
        # back the reply unchecked; a rejected 'migrate' should fail here
        # rather than surface later as a confusing status lookup error.
        source_vm.cmd('migrate', uri=src_uri)
        self.assert_migration(source_vm, dest_vm)

    def _get_free_port(self, ports):
        """Return a free port from ``ports``, skipping the test if none."""
        port = ports.find_free_port()
        if port is None:
            self.skipTest('Failed to find a free port')
        return port

    def test_migration_with_tcp_localhost(self):
        """Migrate over a TCP socket on localhost."""
        with Ports() as ports:
            dest_uri = 'tcp:localhost:%u' % self._get_free_port(ports)
            self.do_migrate(dest_uri)

    def test_migration_with_unix(self):
        """Migrate over a UNIX socket placed in a temporary directory."""
        with tempfile.TemporaryDirectory(prefix='socket_') as socket_path:
            dest_uri = 'unix:%s/qemu-test.sock' % socket_path
            self.do_migrate(dest_uri)

    @skipIfMissingCommands('ncat')
    def test_migration_with_exec(self):
        """Migrate through ``exec:`` pipes connected by a pair of ncat
        processes (listener on the destination, client on the source)."""
        with Ports() as ports:
            free_port = self._get_free_port(ports)
            dest_uri = 'exec:ncat -l localhost %u' % free_port
            src_uri = 'exec:ncat localhost %u' % free_port
            self.do_migrate(dest_uri, src_uri)
96
# Allow running this test file directly as a script.
if __name__ == "__main__":
    QemuSystemTest.main()
99