#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
# Copyright (c) 2017 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import dataclasses
import fcntl
import functools
import libevdev
import os
import threading

try:
    import pyudev
except ImportError:
    raise ImportError("UHID is not supported due to missing pyudev dependency")

import logging

import hidtools.hid as hid
from hidtools.uhid import UHIDDevice
from hidtools.util import BusType

from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union

logger = logging.getLogger("hidtools.device.base_device")


class SysfsFile(object):
    """Thin accessor around a single sysfs attribute file.

    Every read and every write re-opens the file at ``path``; nothing is
    cached between accesses.
    """

    def __init__(self, path):
        self.path = path

    def __set_value(self, value):
        # The value is written followed by a newline; the return value is
        # whatever ``write()`` reports (number of characters written).
        with open(self.path, "w") as f:
            return f.write(f"{value}\n")

    def __get_value(self):
        # Surrounding whitespace (including the trailing newline) is dropped.
        with open(self.path) as f:
            return f.read().strip()

    @property
    def int_value(self) -> int:
        """Content of the file parsed with ``int()``."""
        return int(self.__get_value())

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """Content of the file as a stripped string."""
        return self.__get_value()

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)


class LED(object):
    """Wraps the ``brightness`` / ``max_brightness`` files of one sysfs LED node.

    ``max_brightness`` is read once at construction time, while
    ``brightness`` is read from / written to the file on every access.
    """

    def __init__(self, sys_path):
        self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value
        self.__brightness = SysfsFile(sys_path / "brightness")

    @property
    def brightness(self) -> int:
        """Current content of the ``brightness`` file, as an int."""
        return self.__brightness.int_value

    @brightness.setter
    def brightness(self, value: int) -> None:
        self.__brightness.int_value = value


class PowerSupply(object):
    """Represents Linux power_supply_class sysfs nodes."""

    def __init__(self, sys_path):
        self._capacity = SysfsFile(sys_path / "capacity")
        self._status = SysfsFile(sys_path / "status")
        self._type = SysfsFile(sys_path / "type")

    @property
    def capacity(self) -> int:
        """Content of the ``capacity`` file, as an int."""
        return self._capacity.int_value

    @property
    def status(self) -> str:
        """Content of the ``status`` file."""
        return self._status.str_value

    @property
    def type(self) -> str:
        """Content of the ``type`` file."""
        return self._type.str_value


@dataclasses.dataclass
class HidReadiness:
    """Readiness state tracked for one uhid device."""

    # True while the last seen udev action for the device was "bind"
    is_ready: bool = False
    # number of not-ready -> ready transitions observed so far
    count: int = 0


class HIDIsReady(object):
    """
    Companion class that binds to a kernel mechanism
    and that allows to know when a uhid device is ready or not.

    See :meth:`is_ready` for details.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> HidReadiness:
        """
        Overwrite in subclasses: should return a :class:`HidReadiness`
        describing whether the attached uhid device is ready or not.

        The base implementation always reports "not ready" with a count of 0.
        """
        return HidReadiness()


class UdevHIDIsReady(HIDIsReady):
    """:class:`HIDIsReady` implementation driven by udev events.

    A single udev monitor is shared by all instances (class-level state);
    the readiness of each device is stored in ``_uhid_devices``, keyed by
    the numeric id parsed from the udev event's sysfs path.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor on first use."""
        if cls._pyudev_context is None:
            cls._pyudev_context = pyudev.Context()
            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
            cls._pyudev_monitor.filter_by("hid")
            cls._pyudev_monitor.start()

            # let the UHID poll loop call us back when the monitor fd is readable
            UHIDDevice._append_fd_to_poll(
                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
            )

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """Drain the pending udev events and update ``_uhid_devices``."""
        if cls._pyudev_monitor is None:
            return
        event: pyudev.Device
        # poll() returns None once no event arrived within the timeout,
        # which terminates the iteration.
        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
            if event.action not in ["bind", "remove", "unbind"]:
                # Skip uninteresting actions but keep draining the queue:
                # returning here would leave the events behind this one
                # unprocessed until the next callback invocation.
                continue

            logger.debug(f"udev event: {event.action} -> {event}")

            # the text after the last '.' of the sysfs path is parsed as a
            # hexadecimal number; it is the key is_ready() looks up through
            # ``uhid.hid_id``
            hid_id = int(event.sys_path.strip().split(".")[-1], 16)

            readiness = cls._uhid_devices.setdefault(hid_id, HidReadiness())

            ready = event.action == "bind"
            if not readiness.is_ready and ready:
                readiness.count += 1

            readiness.is_ready = ready

    def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
        """Return the readiness recorded for ``self.uhid``.

        A fresh (not ready, count 0) :class:`HidReadiness` is returned when
        no event has been recorded for the device yet.
        """
        try:
            return self._uhid_devices[self.uhid.hid_id]
        except KeyError:
            return HidReadiness()


class EvdevMatch(object):
    """Set of rules used to decide whether an evdev node fits an application.

    All arguments are keyword-only lists:

    - ``requires``: items for which ``evdev.has()`` must be true
    - ``excludes``: items for which ``evdev.has()`` must be false
    - ``req_properties``: items for which ``evdev.has_property()`` must be true
    - ``excl_properties``: items for which ``evdev.has_property()`` must be false

    Omitted arguments default to an empty list.
    """

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        # None instead of a literal [] default: a default list object would
        # be shared by every instance created without the argument.
        self.requires = [] if requires is None else requires
        self.excludes = [] if excludes is None else excludes
        self.req_properties = [] if req_properties is None else req_properties
        self.excl_properties = [] if excl_properties is None else excl_properties

    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
        """Return True when ``evdev`` satisfies every rule of this match."""
        return (
            all(evdev.has(m) for m in self.requires)
            and not any(evdev.has(m) for m in self.excludes)
            and all(evdev.has_property(p) for p in self.req_properties)
            and not any(evdev.has_property(p) for p in self.excl_properties)
        )


class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.
    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional[libevdev.Device] = None

        self.uevents = {}
        # all of the interesting properties are stored in the input uevent, so in the parent
        # so convert the uevent file of the parent input node into a dict
        with open(sysfs.parent / "uevent") as f:
            for line in f:
                # split on the first '=' only, so values that themselves
                # contain '=' are kept intact; lines without any '='
                # (e.g. blank lines) are ignored
                key, sep, value = line.strip().partition("=")
                if not sep:
                    continue
                self.uevents[key] = value.strip('"')

        # we open all evdev nodes in order to not miss any event
        self.open()

    @property
    def name(self: "EvdevDevice") -> str:
        """The ``NAME`` entry of the parent input node's uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the device node: ``/dev/input/<name of the sysfs entry>``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
    ) -> bool:
        """Tell whether this node fits the rules registered for ``application``.

        Returns False when the node is not opened.  Raises ``AssertionError``
        (after logging an error) when ``application`` has no entry in
        ``matches``.
        """
        if self.libevdev is None:
            return False

        if application not in matches:
            message = (
                f"application '{application}' is unknown, please update/fix hid-tools"
            )
            logger.error(message)
            # hid-tools likely needs an update.  Raised explicitly rather
            # than through ``assert False`` so it still fires under ``-O``.
            raise AssertionError(message)

        return matches[application].is_a_match(self.libevdev)

    def open(self: "EvdevDevice") -> libevdev.Device:
        """Open the device node, wrap it in libevdev and make it non-blocking."""
        self.event_node = open(self.evdev, "rb")
        self.libevdev = libevdev.Device(self.event_node)

        assert self.libevdev.fd is not None

        fd = self.libevdev.fd.fileno()
        # O_NONBLOCK is a file *status* flag: the current value has to be
        # read with F_GETFL (not F_GETFD, which returns the descriptor
        # flags) before being written back with F_SETFL.
        flag = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the libevdev fd and the underlying file, if opened."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None


class BaseDevice(UHIDDevice):
    """Base class for emulated uhid devices.

    Tracks the opened/started state, the kernel readiness (through
    :class:`UdevHIDIsReady`) and the evdev nodes created for the device.
    """

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        """
        :param name: device name; a default based on the class name is used
            when None
        :param application: default application used by :meth:`get_evdev`
        :param rdesc_str: human readable report descriptor, only used when
            ``rdesc`` is None
        :param rdesc: report descriptor, takes precedence over ``rdesc_str``
        :param input_info: defaults to ``(BusType.USB, 1, 2)`` when None
        :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given
        """
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        self._input_nodes: Optional[List[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """First power_supply node found in sysfs for this device, or None."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if ps is None or len(ps) < 1:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One :class:`LED` per ``max_brightness`` file found in sysfs."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the device is reported ready and has been started."""
        return self._kernel_is_ready.is_ready().is_ready and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """Number of not-ready -> ready transitions recorded for the device."""
        return self._kernel_is_ready.is_ready().count

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """The evdev nodes of the device, opened on first access.

        Returns an empty list (without caching it) while the device is not
        ready or not started.  Once built, the list is cached and returned
        as is on subsequent accesses.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        # Starting with kernel v6.16, an event is emitted when
        # userspace opens a kernel device, and for some devices
        # this translates into a SET_REPORT.
        # Because EvdevDevice(path) opens every single evdev node
        # we need to have a separate thread to process the incoming
        # SET_REPORT or we end up having to wait for the kernel
        # timeout of 5 seconds.
        done = threading.Event()

        def dispatch():
            while not done.is_set():
                self.dispatch(1)

        t = threading.Thread(target=dispatch)
        t.start()

        try:
            self._input_nodes = [
                EvdevDevice(path)
                for path in self.walk_sysfs("input", "input/input*/event*")
            ]
        finally:
            # always stop and reap the helper thread, even when opening a
            # node failed, so it cannot outlive this call
            done.set()
            t.join()
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        returning True will append the corresponding report to
        `self.input_nodes[type]`
        returning False will ignore this report / type combination
        for the device.
        """
        return True

    def open(self):
        """Mark the device as opened."""
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every evdev node opened through :attr:`input_nodes`."""
        # getattr(): this also runs from __del__, possibly on an instance
        # whose __init__ raised before _input_nodes was assigned
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for node in nodes:
                node.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        """Mark the device as closed."""
        self.opened = False

    def start(self, flags):
        """Mark the device as started (``flags`` is not used here)."""
        self.started = True

    def stop(self):
        """Mark the device as stopped and close its evdev nodes."""
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """Return the list of events pending on the matching evdev node.

        An empty list is returned when no evdev node matches.
        """
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        """Mapping of application name to the :class:`EvdevMatch` rules."""
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """Return the libevdev device matching ``application``, or None.

        ``application`` defaults to ``self.application``.  With a single
        input node only :meth:`match_evdev_rule` is consulted; with several
        nodes, the first one accepted by both
        :meth:`EvdevDevice.matches_application` and :meth:`match_evdev_rule`
        is returned.
        """
        if application is None:
            application = self.application

        if len(self.input_nodes) == 0:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.uhdev.leds_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready