# Copyright (c) 2014-2024, Emmanuel Blot # Copyright (c) 2016, Emmanuel Bouaziz # All rights reserved. # # SPDX-License-Identifier: BSD-3-Clause """USB Helpers""" import sys from importlib import import_module from string import printable as printablechars from threading import RLock from typing import (Any, Dict, List, NamedTuple, Optional, Sequence, Set, TextIO, Type, Tuple, Union) from urllib.parse import SplitResult, urlsplit, urlunsplit from usb.backend import IBackend from usb.core import Device as UsbDevice, USBError from usb.util import dispose_resources, get_string as usb_get_string from .misc import to_int # pylint: disable=broad-except UsbDeviceDescriptor = NamedTuple('UsbDeviceDescriptor', (('vid', int), ('pid', int), ('bus', Optional[int]), ('address', Optional[int]), ('sn', Optional[str]), ('index', Optional[int]), ('description', Optional[str]))) """USB Device descriptor are used to report known information about a FTDI compatible device, and as a device selection filter * vid: vendor identifier, 16-bit integer * pid: product identifier, 16-bit integer * bus: USB bus identifier, host dependent integer * address: USB address identifier on a USB bus, host dependent integer * sn: serial number, string * index: integer, can be used to descriminate similar devices * description: device description, as a string To select a device, use None for unknown fields .. note:: * Always prefer serial number to other identification methods if available * Prefer bus/address selector over index """ UsbDeviceKey = Union[Tuple[int, int, int, int], Tuple[int, int]] """USB device indentifier on the system. This is used as USB device identifiers on the host. On proper hosts, this is a (bus, address, vid, pid) 4-uple. On stupid hosts (such as M$Win), it may be degraded to (vid, pid) 2-uple. """ class UsbToolsError(Exception): """UsbTools error.""" class UsbTools: """Helpers to obtain information about connected USB devices.""" # Supported back ends, in preference order BACKENDS = ('usb.backend.libusb1', 'usb.backend.libusb0') # Need to maintain a list of reference USB devices, to circumvent a # limitation in pyusb that prevents from opening several times the same # USB device. The following dictionary used bus/address/vendor/product keys # to track (device, refcount) pairs Lock = RLock() Devices = {} # (bus, address, vid, pid): (usb.core.Device, refcount) UsbDevices = {} # (vid, pid): {usb.core.Device} UsbApi = None @classmethod def find_all(cls, vps: Sequence[Tuple[int, int]], nocache: bool = False) -> \ List[Tuple[UsbDeviceDescriptor, int]]: """Find all devices that match the specified vendor/product pairs. :param vps: a sequence of 2-tuple (vid, pid) pairs :param bool nocache: bypass cache to re-enumerate USB devices on the host :return: a list of 2-tuple (UsbDeviceDescriptor, interface count) """ with cls.Lock: devs = set() for vid, pid in vps: # TODO optimize useless loops devs.update(UsbTools._find_devices(vid, pid, nocache)) devices = set() for dev in devs: ifcount = max(cfg.bNumInterfaces for cfg in dev) # TODO: handle / is serial number strings sernum = UsbTools.get_string(dev, dev.iSerialNumber) description = UsbTools.get_string(dev, dev.iProduct) descriptor = UsbDeviceDescriptor(dev.idVendor, dev.idProduct, dev.bus, dev.address, sernum, None, description) devices.add((descriptor, ifcount)) return list(devices) @classmethod def flush_cache(cls, ): """Flush the FTDI device cache. It is highly recommanded to call this method a FTDI device is unplugged/plugged back since the last enumeration, as the device may appear on a different USB location each time it is plugged in. Failing to clear out the cache may lead to USB Error 19: ``Device may have been disconnected``. """ with cls.Lock: cls.UsbDevices.clear() @classmethod def get_device(cls, devdesc: UsbDeviceDescriptor) -> UsbDevice: """Find a previously open device with the same vendor/product or initialize a new one, and return it. If several FTDI devices of the same kind (vid, pid) are connected to the host, either index or serial argument should be used to discriminate the FTDI device. index argument is not a reliable solution as the host may enumerate the USB device in random order. serial argument is more reliable selector and should always be prefered. Some FTDI devices support several interfaces/ports (such as FT2232H, FT4232H and FT4232HA). The interface argument selects the FTDI port to use, starting from 1 (not 0). :param devdesc: Device descriptor that identifies the device by constraints. :return: PyUSB device instance """ with cls.Lock: if devdesc.index or devdesc.sn or devdesc.description: dev = None if not devdesc.vid: raise ValueError('Vendor identifier is required') devs = cls._find_devices(devdesc.vid, devdesc.pid) if devdesc.description: devs = [dev for dev in devs if UsbTools.get_string(dev, dev.iProduct) == devdesc.description] if devdesc.sn: devs = [dev for dev in devs if UsbTools.get_string(dev, dev.iSerialNumber) == devdesc.sn] if devdesc.bus is not None and devdesc.address is not None: devs = [dev for dev in devs if (devdesc.bus == dev.bus and devdesc.address == dev.address)] if isinstance(devs, set): # there is no guarantee the same index with lead to the # same device. Indexing should be reworked devs = list(devs) try: dev = devs[devdesc.index or 0] except IndexError as exc: raise IOError("No such device") from exc else: devs = cls._find_devices(devdesc.vid, devdesc.pid) dev = list(devs)[0] if devs else None if not dev: raise IOError('Device not found') try: devkey = (dev.bus, dev.address, devdesc.vid, devdesc.pid) if None in devkey[0:2]: raise AttributeError('USB backend does not support bus ' 'enumeration') except AttributeError: devkey = (devdesc.vid, devdesc.pid) if devkey not in cls.Devices: # only change the active configuration if the active one is # not the first. This allows other libusb sessions running # with the same device to run seamlessly. try: config = dev.get_active_configuration() setconf = config.bConfigurationValue != 1 except USBError: setconf = True if setconf: try: dev.set_configuration() except USBError: pass cls.Devices[devkey] = [dev, 1] else: cls.Devices[devkey][1] += 1 return cls.Devices[devkey][0] @classmethod def release_device(cls, usb_dev: UsbDevice): """Release a previously open device, if it not used anymore. :param usb_dev: a previously instanciated USB device instance """ # Lookup for ourselves in the class dictionary with cls.Lock: # pylint: disable=unnecessary-dict-index-lookup for devkey, (dev, refcount) in cls.Devices.items(): if dev == usb_dev: # found if refcount > 1: # another interface is open, decrement cls.Devices[devkey][1] -= 1 else: # last interface in use, release dispose_resources(cls.Devices[devkey][0]) del cls.Devices[devkey] break @classmethod def release_all_devices(cls, devclass: Optional[Type] = None) -> int: """Release all open devices. :param devclass: optional class to only release devices of one type :return: the count of device that have been released. """ with cls.Lock: remove_devs = set() # pylint: disable=consider-using-dict-items for devkey in cls.Devices: if devclass: dev = cls._get_backend_device(cls.Devices[devkey][0]) if dev is None or not isinstance(dev, devclass): continue dispose_resources(cls.Devices[devkey][0]) remove_devs.add(devkey) for devkey in remove_devs: del cls.Devices[devkey] return len(remove_devs) @classmethod def list_devices(cls, urlstr: str, vdict: Dict[str, int], pdict: Dict[int, Dict[str, int]], default_vendor: int) -> \ List[Tuple[UsbDeviceDescriptor, int]]: """List candidates that match the device URL pattern. :see: :py:meth:`show_devices` to generate the URLs from the candidates list :param url: the URL to parse :param vdict: vendor name map of USB vendor ids :param pdict: vendor id map of product name map of product ids :param default_vendor: default vendor id :return: list of (UsbDeviceDescriptor, interface) """ urlparts = urlsplit(urlstr) if not urlparts.path: raise UsbToolsError('URL string is missing device port') candidates, _ = cls.enumerate_candidates(urlparts, vdict, pdict, default_vendor) return candidates @classmethod def parse_url(cls, urlstr: str, scheme: str, vdict: Dict[str, int], pdict: Dict[int, Dict[str, int]], default_vendor: int) -> Tuple[UsbDeviceDescriptor, int]: """Parse a device specifier URL. :param url: the URL to parse :param scheme: scheme to match in the URL string (scheme://...) :param vdict: vendor name map of USB vendor ids :param pdict: vendor id map of product name map of product ids :param default_vendor: default vendor id :return: UsbDeviceDescriptor, interface ..note: URL syntax: protocol://vendor:product[:serial|:index|:bus:addr]/interface """ urlparts = urlsplit(urlstr) if scheme != urlparts.scheme: raise UsbToolsError(f'Invalid URL: {urlstr}') try: if not urlparts.path: raise UsbToolsError('URL string is missing device port') path = urlparts.path.strip('/') if path == '?' or (not path and urlstr.endswith('?')): report_devices = True interface = -1 else: interface = to_int(path) report_devices = False except (IndexError, ValueError) as exc: raise UsbToolsError(f'Invalid device URL: {urlstr}') from exc candidates, idx = cls.enumerate_candidates(urlparts, vdict, pdict, default_vendor) if report_devices: UsbTools.show_devices(scheme, vdict, pdict, candidates) raise SystemExit(candidates and 'Please specify the USB device' or 'No USB-Serial device has been detected') if idx is None: if len(candidates) > 1: raise UsbToolsError(f"{len(candidates)} USB devices match URL " f"'{urlstr}'") idx = 0 try: desc, _ = candidates[idx] vendor, product = desc[:2] except IndexError: raise UsbToolsError(f'No USB device matches URL {urlstr}') \ from None if not vendor: cvendors = {candidate[0] for candidate in candidates} if len(cvendors) == 1: vendor = cvendors.pop() if vendor not in pdict: vstr = '0x{vendor:04x}' if vendor is not None else '?' raise UsbToolsError(f'Vendor ID {vstr} not supported') if not product: cproducts = {candidate[1] for candidate in candidates if candidate[0] == vendor} if len(cproducts) == 1: product = cproducts.pop() if product not in pdict[vendor].values(): pstr = '0x{vendor:04x}' if product is not None else '?' raise UsbToolsError(f'Product ID {pstr} not supported') devdesc = UsbDeviceDescriptor(vendor, product, desc.bus, desc.address, desc.sn, idx, desc.description) return devdesc, interface @classmethod def enumerate_candidates(cls, urlparts: SplitResult, vdict: Dict[str, int], pdict: Dict[int, Dict[str, int]], default_vendor: int) -> \ Tuple[List[Tuple[UsbDeviceDescriptor, int]], Optional[int]]: """Enumerate USB device URLs that match partial URL and VID/PID criteria. :param urlpart: splitted device specifier URL :param vdict: vendor name map of USB vendor ids :param pdict: vendor id map of product name map of product ids :param default_vendor: default vendor id :return: list of (usbdev, iface), parsed index if any """ specifiers = urlparts.netloc.split(':') plcomps = specifiers + [''] * 2 try: plcomps[0] = vdict.get(plcomps[0], plcomps[0]) if plcomps[0]: vendor = to_int(plcomps[0]) else: vendor = None product_ids = pdict.get(vendor, None) if not product_ids: product_ids = pdict[default_vendor] plcomps[1] = product_ids.get(plcomps[1], plcomps[1]) if plcomps[1]: try: product = to_int(plcomps[1]) except ValueError as exc: raise UsbToolsError(f'Product {plcomps[1]} is not ' f'referenced') from exc else: product = None except (IndexError, ValueError) as exc: raise UsbToolsError(f'Invalid device URL: ' f'{urlunsplit(urlparts)}') from exc sernum = None idx = None bus = None address = None locators = specifiers[2:] if len(locators) > 1: try: bus = int(locators[0], 16) address = int(locators[1], 16) except ValueError as exc: raise UsbToolsError(f'Invalid bus/address: ' f'{":".join(locators)}') from exc else: if locators and locators[0]: try: devidx = to_int(locators[0]) if devidx > 255: raise ValueError() idx = devidx if idx: idx = devidx-1 except ValueError: sernum = locators[0] candidates = [] vendors = [vendor] if vendor else set(vdict.values()) vps = set() for vid in vendors: products = pdict.get(vid, []) for pid in products: vps.add((vid, products[pid])) devices = cls.find_all(vps) if sernum: if sernum not in [dev.sn for dev, _ in devices]: raise UsbToolsError(f'No USB device with S/N {sernum}') for desc, ifcount in devices: if vendor and vendor != desc.vid: continue if product and product != desc.pid: continue if sernum and sernum != desc.sn: continue if bus is not None: if bus != desc.bus or address != desc.address: continue candidates.append((desc, ifcount)) return candidates, idx @classmethod def show_devices(cls, scheme: str, vdict: Dict[str, int], pdict: Dict[int, Dict[str, int]], devdescs: Sequence[Tuple[UsbDeviceDescriptor, int]], out: Optional[TextIO] = None): """Show supported devices. When the joker url ``scheme://*/?`` is specified as an URL, it generates a list of connected USB devices that match the supported USB devices. It can be used to provide the end-user with a list of valid URL schemes. :param scheme: scheme to match in the URL string (scheme://...) :param vdict: vendor name map of USB vendor ids :param pdict: vendor id map of product name map of product ids :param devdescs: candidate devices :param out: output stream, none for stdout """ if not devdescs: return if not out: out = sys.stdout devstrs = cls.build_dev_strings(scheme, vdict, pdict, devdescs) max_url_len = max(len(url) for url, _ in devstrs) print('Available interfaces:', file=out) for url, desc in devstrs: print(f' {url:{max_url_len}s} {desc}', file=out) print('', file=out) @classmethod def build_dev_strings(cls, scheme: str, vdict: Dict[str, int], pdict: Dict[int, Dict[str, int]], devdescs: Sequence[Tuple[UsbDeviceDescriptor, int]]) -> \ List[Tuple[str, str]]: """Build URL and device descriptors from UsbDeviceDescriptors. :param scheme: protocol part of the URLs to generate :param vdict: vendor name map of USB vendor ids :param pdict: vendor id map of product name map of product ids :param devdescs: USB devices and interfaces :return: list of (url, descriptors) """ indices = {} # Dict[Tuple[int, int], int] descs = [] for desc, ifcount in sorted(devdescs): ikey = (desc.vid, desc.pid) indices[ikey] = indices.get(ikey, 0) + 1 # try to find a matching string for the current vendor vendors = [] # fallback if no matching string for the current vendor is found vendor = f'{desc.vid:04x}' for vidc in vdict: if vdict[vidc] == desc.vid: vendors.append(vidc) if vendors: vendors.sort(key=len) vendor = vendors[0] # try to find a matching string for the current vendor # fallback if no matching string for the current product is found product = f'{desc.pid:04x}' try: products = [] productids = pdict[desc.vid] for prdc in productids: if productids[prdc] == desc.pid: products.append(prdc) if products: product = products[0] except KeyError: pass for port in range(1, ifcount+1): fmt = '%s://%s/%d' parts = [vendor, product] sernum = desc.sn if not sernum: sernum = '' if [c for c in sernum if c not in printablechars or c == '?']: serial = f'{indices[ikey]}' else: serial = sernum if serial: parts.append(serial) elif desc.bus is not None and desc.address is not None: parts.append(f'{desc.bus:x}') parts.append(f'{desc.address:x}') # the description may contain characters that cannot be # emitted in the output stream encoding format try: url = fmt % (scheme, ':'.join(parts), port) except Exception: url = fmt % (scheme, ':'.join([vendor, product, '???']), port) try: if desc.description: description = f'({desc.description})' else: description = '' except Exception: description = '' descs.append((url, description)) return descs @classmethod def get_string(cls, device: UsbDevice, stridx: int) -> str: """Retrieve a string from the USB device, dealing with PyUSB API breaks :param device: USB device instance :param stridx: the string identifier :return: the string read from the USB device """ if cls.UsbApi is None: # pylint: disable=import-outside-toplevel import inspect args, _, _, _ = \ inspect.signature(UsbDevice.read).parameters if (len(args) >= 3) and args[1] == 'length': cls.UsbApi = 1 else: cls.UsbApi = 2 try: if cls.UsbApi == 2: return usb_get_string(device, stridx) return usb_get_string(device, 64, stridx) except UnicodeDecodeError: # do not abort if EEPROM data is somewhat incoherent return '' @classmethod def find_backend(cls) -> IBackend: """Try to find and load an PyUSB backend. ..note:: There is no need to call this method for regular usage. :return: PyUSB backend """ with cls.Lock: return cls._load_backend() @classmethod def _find_devices(cls, vendor: int, product: int, nocache: bool = False) -> Set[UsbDevice]: """Find a USB device and return it. This code re-implements the usb.core.find() method using a local cache to avoid calling several times the underlying LibUSB and the system USB calls to enumerate the available USB devices. As these calls are time-hungry (about 1 second/call), the enumerated devices are cached. It consumes a bit more memory but dramatically improves start-up time. Hopefully, this kludge is temporary and replaced with a better implementation from PyUSB at some point. :param vendor: USB vendor id :param product: USB product id :param bool nocache: bypass cache to re-enumerate USB devices on the host :return: a set of USB device matching the vendor/product identifier pair """ backend = cls._load_backend() vidpid = (vendor, product) if nocache or (vidpid not in cls.UsbDevices): # not freed until Python runtime completion # enumerate_devices returns a generator, so back up the # generated device into a list. To save memory, we only # back up the supported devices devs = set() vpdict = {} # Dict[int, List[int]] vpdict.setdefault(vendor, []) vpdict[vendor].append(product) for dev in backend.enumerate_devices(): # pylint: disable=no-member device = UsbDevice(dev, backend) if device.idVendor in vpdict: products = vpdict[device.idVendor] if products and (device.idProduct not in products): continue devs.add(device) if sys.platform == 'win32': # ugly kludge for a boring OS: # on Windows, the USB stack may enumerate the very same # devices several times: a real device with N interface # appears also as N device with as single interface. # We only keep the "device" that declares the most # interface count and discard the "virtual" ones. filtered_devs = {} for dev in devs: vid = dev.idVendor pid = dev.idProduct ifc = max(cfg.bNumInterfaces for cfg in dev) k = (vid, pid, dev.bus, dev.address) if k not in filtered_devs: filtered_devs[k] = dev else: fdev = filtered_devs[k] fifc = max(cfg.bNumInterfaces for cfg in fdev) if fifc < ifc: filtered_devs[k] = dev devs = set(filtered_devs.values()) cls.UsbDevices[vidpid] = devs return cls.UsbDevices[vidpid] @classmethod def _get_backend_device(cls, device: UsbDevice) -> Any: """Return the backend implementation of a device. :param device: the UsbDevice (usb.core.Device) :return: the implementation of any """ try: # pylint: disable=protected-access # need to access private member _ctx of PyUSB device # (resource manager) until PyUSB #302 is addressed return device._ctx.dev # pylint: disable=protected-access except AttributeError: return None @classmethod def _load_backend(cls) -> IBackend: backend = None # Optional[IBackend] for candidate in cls.BACKENDS: mod = import_module(candidate) backend = mod.get_backend() if backend is not None: return backend raise ValueError('No backend available')