xref: /linux/tools/testing/selftests/hid/tests/base_device.py (revision b80a75cf6999fb79971b41eaec7af2bb4b514714)
1#!/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3# -*- coding: utf-8 -*-
4#
5# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
6# Copyright (c) 2017 Red Hat, Inc.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21import dataclasses
22import fcntl
23import functools
24import libevdev
25import os
26import threading
27
28try:
29    import pyudev
30except ImportError:
31    raise ImportError("UHID is not supported due to missing pyudev dependency")
32
33import logging
34
35import hidtools.hid as hid
36from hidtools.uhid import UHIDDevice
37from hidtools.util import BusType
38
39from pathlib import Path
40from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
41
# Module-level logger shared by every class in this file. The name is a
# fixed string rather than __name__.
logger = logging.getLogger("hidtools.device.base_device")
43
44
class SysfsFile(object):
    """Accessor for a single attribute file, addressed by its path.

    The file is reopened on every access: reads return the stripped
    content, writes store the value followed by a newline.
    """

    def __init__(self, path):
        self.path = path

    def __set_value(self, value):
        # returns the number of characters handed to write()
        with open(self.path, "w") as attr_file:
            written = attr_file.write(f"{value}\n")
        return written

    def __get_value(self):
        with open(self.path) as attr_file:
            content = attr_file.read()
        return content.strip()

    @property
    def int_value(self) -> int:
        """Content of the file converted with ``int()``."""
        raw = self.__get_value()
        return int(raw)

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """Content of the file with surrounding whitespace removed."""
        raw = self.__get_value()
        return raw

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)
72
73
class LED(object):
    """LED node described by a sysfs directory.

    ``max_brightness`` is read once when the object is created, while
    ``brightness`` goes through the sysfs file on every access.
    """

    def __init__(self, sys_path):
        max_brightness_file = SysfsFile(sys_path / "max_brightness")
        self.max_brightness = max_brightness_file.int_value
        self.__brightness = SysfsFile(sys_path / "brightness")

    @property
    def brightness(self) -> int:
        """Current value of the ``brightness`` file, as an integer."""
        current = self.__brightness.int_value
        return current

    @brightness.setter
    def brightness(self, value: int) -> None:
        brightness_file = self.__brightness
        brightness_file.int_value = value
86
87
class PowerSupply(object):
    """Represents Linux power_supply_class sysfs nodes."""

    def __init__(self, sys_path):
        # one accessor per attribute file; the files are only read when
        # the matching property is accessed
        self._capacity, self._status, self._type = (
            SysfsFile(sys_path / attribute)
            for attribute in ("capacity", "status", "type")
        )

    @property
    def capacity(self) -> int:
        """Content of the ``capacity`` file, as an integer."""
        value = self._capacity.int_value
        return value

    @property
    def status(self) -> str:
        """Content of the ``status`` file."""
        value = self._status.str_value
        return value

    @property
    def type(self) -> str:
        """Content of the ``type`` file."""
        value = self._type.str_value
        return value
107
108
@dataclasses.dataclass
class HidReadiness:
    """Readiness state of a HID device, as reported by ``is_ready()``."""

    # whether the device is currently considered ready
    is_ready: bool = dataclasses.field(default=False)
    # counter kept next to the state; starts at 0
    count: int = dataclasses.field(default=0)
113
114
class HIDIsReady(object):
    """
    Companion object of a uhid device, used to find out whether that
    device is ready or not.

    Subclasses are expected to bind to a kernel mechanism and report
    the state through :meth:`is_ready`.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        # the uhid device this object reports on
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> HidReadiness:
        """
        Overwrite in subclasses: should return a :class:`HidReadiness`
        telling whether the attached uhid device is ready or not.

        This base implementation always returns a default-constructed
        :class:`HidReadiness`.
        """
        default_state = HidReadiness()
        return default_state
132
133
class UdevHIDIsReady(HIDIsReady):
    """
    Readiness tracker based on udev events of the ``hid`` subsystem.

    The pyudev context, the monitor and the readiness table are class
    attributes, so they are shared by every instance: the monitor is
    created once, on the first instantiation, and its file descriptor is
    handed to ``UHIDDevice._append_fd_to_poll()`` together with
    :meth:`_cls_udev_event_callback`.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    # readiness of each device, keyed by the numeric id parsed from the
    # sysfs path of the udev event
    _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor (first call only)."""
        if cls._pyudev_context is None:
            cls._pyudev_context = pyudev.Context()
            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
            cls._pyudev_monitor.filter_by("hid")
            cls._pyudev_monitor.start()

            UHIDDevice._append_fd_to_poll(
                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
            )

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """
        Drain the pending udev events and update the readiness table.

        A ``bind`` event marks the device as ready (and increments its
        counter when it was not ready before); ``unbind`` and ``remove``
        mark it as not ready. Any other action is ignored.
        """
        if cls._pyudev_monitor is None:
            return
        event: pyudev.Device
        # poll() returns None once no event arrived within the timeout,
        # which is the sentinel ending the iteration
        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
            if event.action not in ("bind", "remove", "unbind"):
                # unrelated action: skip it, but keep draining the queue so
                # that events queued behind it are not left pending
                continue

            logger.debug(f"udev event: {event.action} -> {event}")

            # the id is the last dot-separated component of the sysfs
            # path, written in hexadecimal
            hid_id = int(event.sys_path.strip().split(".")[-1], 16)

            readiness = cls._uhid_devices.setdefault(hid_id, HidReadiness())

            ready = event.action == "bind"
            if ready and not readiness.is_ready:
                # only count the transitions from not ready to ready
                readiness.count += 1

            readiness.is_ready = ready

    def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
        """
        Return the readiness recorded for the attached uhid device, or a
        default (not ready) :class:`HidReadiness` when no event has been
        recorded for it.
        """
        try:
            return self._uhid_devices[self.uhid.hid_id]
        except KeyError:
            return HidReadiness()
181
182
class EvdevMatch(object):
    """
    Set of conditions an evdev node has to fulfil to be a match.

    :param requires: items for which ``evdev.has()`` must be true
    :param excludes: items for which ``evdev.has()`` must be false
    :param req_properties: items for which ``evdev.has_property()``
        must be true
    :param excl_properties: items for which ``evdev.has_property()``
        must be false

    Every omitted argument gets its own new empty list, so changing the
    lists of one instance never affects another instance.
    """

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        # None is the "not provided" marker: a literal [] default would be
        # a single list object shared by all the instances
        self.requires = [] if requires is None else requires
        self.excludes = [] if excludes is None else excludes
        self.req_properties = [] if req_properties is None else req_properties
        self.excl_properties = [] if excl_properties is None else excl_properties

    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
        """
        Return True when `evdev` has everything that is required and
        nothing that is excluded. With no condition at all, any device
        is a match.
        """
        return (
            all(evdev.has(m) for m in self.requires)
            and not any(evdev.has(m) for m in self.excludes)
            and all(evdev.has_property(p) for p in self.req_properties)
            and not any(evdev.has_property(p) for p in self.excl_properties)
        )
211
212
class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.
    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        """
        Parse the uevent file of the parent input node and open the
        evdev node.

        :param sysfs: sysfs path of the event node; its last component
            is also the name of the node below ``/dev/input``
        """
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional[libevdev.Device] = None

        self.uevents = {}
        # all of the interesting properties are stored in the input uevent, so in the parent
        # so convert the uevent file of the parent input node into a dict
        with open(sysfs.parent / "uevent") as f:
            for line in f:
                # only split on the first '=': the value itself may
                # contain that character
                key, separator, value = line.strip().partition("=")
                if not separator:
                    # blank line or line without a value: nothing to store
                    continue
                self.uevents[key] = value.strip('"')

        # we open all evdev nodes in order to not miss any event
        self.open()

    @property
    def name(self: "EvdevDevice") -> str:
        """Value of the ``NAME`` entry of the uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the node below ``/dev/input``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
    ) -> bool:
        """
        Tell whether this node fulfils the rule registered for
        `application` in `matches`.

        Returns False when the node is not opened.

        :raises AssertionError: when `application` has no entry in
            `matches`
        """
        if self.libevdev is None:
            return False

        if application in matches:
            return matches[application].is_a_match(self.libevdev)

        logger.error(
            f"application '{application}' is unknown, please update/fix hid-tools"
        )
        # hid-tools likely needs an update. Raised explicitly so that the
        # failure is also reported when assertions are disabled.
        raise AssertionError(f"application '{application}' is unknown")

    def open(self: "EvdevDevice") -> libevdev.Device:
        """
        Open the evdev node, wrap it in a libevdev device and switch
        its file descriptor to non-blocking mode.
        """
        self.event_node = open(self.evdev, "rb")
        self.libevdev = libevdev.Device(self.event_node)

        assert self.libevdev.fd is not None

        fd = self.libevdev.fd.fileno()
        # O_NONBLOCK is a file status flag: it has to be combined with the
        # flags returned by F_GETFL (F_GETFD returns the descriptor flags)
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the node and forget the libevdev device; safe to call twice."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None
280
281
class BaseDevice(UHIDDevice):
    """
    Base class of the uhid test devices.

    On top of UHIDDevice, it keeps track of the opened/started state,
    of the readiness reported by :class:`UdevHIDIsReady`, and gives
    access to the evdev nodes, LEDs and power supply found through
    ``walk_sysfs()``.
    """

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        """
        :param name: name of the device; a default one based on the
            class name is used when None
        :param application: application used by default in
            :meth:`get_evdev`
        :param rdesc_str: human readable report descriptor, only used
            when `rdesc` is None
        :param rdesc: report descriptor; takes precedence over
            `rdesc_str`
        :param input_info: defaults to ``(BusType.USB, 1, 2)`` when None
        :raises Exception: when both `rdesc_str` and `rdesc` are None
        """
        # the udev listener is set up before the UHIDDevice itself
        # (presumably so that no event of the new device is missed)
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        # None means "not looked up yet", see input_nodes
        self._input_nodes: Optional[list[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """First power supply found in sysfs, or None when there is none."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if not ps:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One :class:`LED` per ``max_brightness`` file found in sysfs."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the device is reported ready and has been started."""
        return self._kernel_is_ready.is_ready().is_ready and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """Counter of the readiness object (see :class:`HidReadiness`)."""
        return self._kernel_is_ready.is_ready().count

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """
        Evdev nodes of the device, opened on first access and cached.

        An empty list is returned, without being cached, as long as the
        device is not ready or not started.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        # Starting with kernel v6.16, an event is emitted when
        # userspace opens a kernel device, and for some devices
        # this translates into a SET_REPORT.
        # Because EvdevDevice(path) opens every single evdev node
        # we need to have a separate thread to process the incoming
        # SET_REPORT or we end up having to wait for the kernel
        # timeout of 5 seconds.
        stop_dispatching = threading.Event()

        def dispatch():
            while not stop_dispatching.is_set():
                self.dispatch(1)

        t = threading.Thread(target=dispatch)
        t.start()

        try:
            self._input_nodes = [
                EvdevDevice(path)
                for path in self.walk_sysfs("input", "input/input*/event*")
            ]
        finally:
            # always stop the helper thread, even when a node failed to
            # open: it would otherwise keep dispatching forever
            stop_dispatching.set()
            t.join()
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        returning True will append the corresponding report to
        `self.input_nodes[type]`
        returning False  will ignore this report / type combination
        for the device.
        """
        return True

    def open(self):
        """Record that the device is opened."""
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every evdev node opened through :attr:`input_nodes`."""
        # this is also reached from __del__, possibly on an instance whose
        # __init__ failed before _input_nodes was assigned
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for e in nodes:
                e.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        """Record that the device is no longer opened."""
        self.opened = False

    def start(self, flags):
        """Record that the device is started; `flags` is not used here."""
        self.started = True

    def stop(self):
        """Record that the device is stopped and close its evdev nodes."""
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """
        Return the list of events currently provided by the evdev node
        matching `application`, or an empty list when there is no such
        node.
        """
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        """Rules used by :meth:`get_evdev`, keyed by application."""
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """
        Return the libevdev device matching `application` (by default
        the application given at construction), or None when there is
        no such device.

        When the device has a single evdev node, only
        :meth:`match_evdev_rule` is checked; otherwise the node also has
        to fulfil the rule found in :attr:`application_matches`.
        """
        if application is None:
            application = self.application

        if not self.input_nodes:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.uhdev.led_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready
449