#!/usr/bin/env python3
#
# Functional test that checks the overcommit memlock options
#
# Copyright (c) Yandex Technologies LLC, 2025
#
# Author:
#  Alexandr Moshkov <dtalexundeer@yandex-team.ru>
#
# SPDX-License-Identifier: GPL-2.0-or-later

import re

from typing import Dict

from qemu_test import QemuSystemTest
from qemu_test import skipLockedMemoryTest


# Matches "<Name>:   <number> kB" entries of /proc/<pid>/status.
STATUS_VALUE_PATTERN = re.compile(r'^(\w+):\s+(\d+) kB', re.MULTILINE)


@skipLockedMemoryTest(2_097_152)  # 2GB
class MemlockTest(QemuSystemTest):
    """
    Runs a guest with each of the memlock options.
    Then verifies that the option is working correctly
    by checking the status file of the QEMU process.
    """

    def common_vm_setup_with_memlock(self, memlock):
        """Launch the VM with ``-overcommit mem-lock=<memlock>``."""
        self.vm.add_args('-overcommit', f'mem-lock={memlock}')
        self.vm.launch()

    def test_memlock_off(self):
        """With mem-lock=off the process must report no locked memory."""
        self.common_vm_setup_with_memlock('off')

        status = self.get_process_status_values(self.vm.get_pid())

        # Dedicated assert methods report the offending values on failure.
        self.assertEqual(status['VmLck'], 0)

    def test_memlock_on(self):
        """With mem-lock=on memory is locked and mostly resident."""
        self.common_vm_setup_with_memlock('on')

        status = self.get_process_status_values(self.vm.get_pid())

        # VmLck > 0 kB and almost all memory is resident
        self.assertGreater(status['VmLck'], 0)
        self.assertGreaterEqual(status['VmRSS'], status['VmSize'] * 0.70)

    def test_memlock_onfault(self):
        """With mem-lock=on-fault memory is locked but barely resident."""
        self.common_vm_setup_with_memlock('on-fault')

        status = self.get_process_status_values(self.vm.get_pid())

        # VmLck > 0 kB and only a small part of memory is resident
        self.assertGreater(status['VmLck'], 0)
        self.assertLessEqual(status['VmRSS'], status['VmSize'] * 0.30)

    def get_process_status_values(self, pid: int) -> Dict[str, int]:
        """
        Return the "kB" entries of /proc/<pid>/status as a dict.

        Keys are the field names (text before the colon), values are the
        integer amounts. Lines that do not match STATUS_VALUE_PATTERN
        are ignored.
        """
        raw_status = self._get_raw_process_status(pid)

        # Match line by line so that "\s+" can never span a newline.
        return {
            m.group(1): int(m.group(2))
            for line in raw_status.split('\n')
            if (m := STATUS_VALUE_PATTERN.match(line))
        }

    def _get_raw_process_status(self, pid: int) -> str:
        """
        Return the raw text of /proc/<pid>/status.

        The test is skipped when the status file does not exist.
        """
        try:
            with open(f'/proc/{pid}/status', 'r') as f:
                return f.read()
        except FileNotFoundError:
            self.skipTest("Can't open status file of the process")


if __name__ == '__main__':
    MemlockTest.main()