# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments

    Attributes:
      src_path: Path of the test source file, as passed by the caller
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: value returned by ksft_setup() for the process environment
           overlaid with the entries of the optional "net.config" file
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

    def _load_env_file(self):
        """
        Build the environment used by the test.

        Starts from a copy of os.environ and overlays the KEY=VALUE pairs
        found in "net.config" in the directory of the test source, if that
        file exists. Everything after a '#' is dropped as a comment and
        lines which are empty after stripping are skipped.

        Returns the result of ksft_setup() on the combined environment.
        Raises Exception if a non-empty line does not contain '='.
        """
        env = os.environ.copy()

        cfg_path = Path(self.src_path).parent.resolve() / "net.config"
        if not cfg_path.exists():
            return ksft_setup(env)

        with open(cfg_path.as_posix(), 'r') as fp:
            for raw_line in fp:
                # Strip comments
                line = raw_line.split("#", maxsplit=1)[0].strip()
                if not line:
                    continue
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    # Report the untouched line so the user can find it
                    raise Exception("Can't parse configuration line:", raw_line)
                env[pair[0]] = pair[1]
        return ksft_setup(env)


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Pick the device under test.

        If NETIF is present in the environment that interface is used,
        otherwise a NetdevSimDev is created (kwargs are passed to it).

        nsim_test: True  - raise KsftXfailEx when NETIF is set,
                   False - raise KsftXfailEx when NETIF is not set,
                   None  - accept either.
        """
        # Cleanup state must exist before anything can raise, __del__ runs
        # even when __init__ did not complete.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block,
        it brings the device under test up.
        """
        ip(f"link set dev {self.dev['ifname']} up")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        """
        Remove the netdevsim device if this env created one.
        Safe to call more than once.
        """
        if self._ns:
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Set up the local device and the remote endpoint.

        If NETIF is present in the environment the addresses and the remote
        description are taken from LOCAL_V4/6, REMOTE_V4/6, REMOTE_TYPE and
        REMOTE_ARGS. Otherwise a local netdevsim pair is created with
        create_local() and a "netns" remote is used.

        nsim_test: True  - raise KsftXfailEx when NETIF is set,
                   False - raise KsftXfailEx when NETIF is not set,
                   None  - accept either.
        """
        # Things we try to destroy, they must exist before anything can
        # raise because __del__ runs even when __init__ did not complete.
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 when it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Cache of command availability: { comm: { "local"/"remote": bool } }
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a new NetNS, one NetdevSimDev in the
        current namespace and one inside the new namespace.

        The first port of each device is passed to the netdevsim
        'link_device' control file (presumably connecting the two ports -
        see NetdevSimDev), then both ports get their nsim_v4_pfx /
        nsim_v6_pfx addresses (.1 / ::1 local, .2 / ::2 peer) and are
        brought up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # Namespaces are identified by open file descriptors, keep the
        # files open for the duration of the write.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f"   link set dev {self._ns.nsims[0].ifname} up")

        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the environment for running against a real device.

        At least one variable of each group in vars_needed must be set,
        and an address family configured for the local side must also be
        configured for the remote side (and vice versa).

        Raises Exception listing the missing groups.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            if not any(entry in self.env for entry in choice):
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which carries the remote
        address(es), by running "ip addr show to <addr>" on the remote.

        Returns the interface name.
        Raises Exception if the v4 and v6 addresses are on different
        interfaces, if more than one interface matches, or if no
        interface matches at all.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        if not v4 and not v6:
            # Nothing to index into, fail with a readable message
            raise Exception("Can't resolve remote interface name, no interface matches")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block.
        """
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        """
        Tear down whatever this env created: both netdevsim devices,
        the network namespace and the remote. Safe to call more than once.
        """
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            # Drop our reference first, then leave a None behind so that
            # later calls see the attribute.
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Raise KsftSkipEx unless both the local and the remote address
        for IP version ipver ("4" or "6") are configured.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm exists by running "command -v" on host
        (locally when host is None). The result is cached per command
        under key, so each (comm, key) pair is probed only once.

        Returns True if the command was found.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command comm is not available locally
        (when local is set) or on the remote endpoint (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # The probe has to run on the remote, not on the local system
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.

        The wait time is computed on the first call and reused afterwards:
        25 msec plus the 'stats-block-usecs' value reported by "ethtool -c"
        (0 if it is not reported or the operation is not supported).
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # usecs -> seconds for time.sleep()
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)