# Copyright (c) 2017-2024, Emmanuel Blot # All rights reserved. # # SPDX-License-Identifier: BSD-3-Clause """MPSSE command debug tracer.""" # pylint: disable=missing-docstring from binascii import hexlify from collections import deque from importlib import import_module from inspect import currentframe from logging import getLogger from string import ascii_uppercase from struct import unpack as sunpack from sys import modules from typing import Union class FtdiMpsseTracer: """FTDI MPSSE protocol decoder.""" MPSSE_ENGINES = { 0x0200: 0, 0x0400: 0, 0x0500: 0, 0x0600: 0, 0x0700: 2, 0x0800: 2, 0x0900: 1, 0x1000: 0, 0x3600: 2} """Count of MPSSE engines.""" def __init__(self, version): count = self.MPSSE_ENGINES[version] self._engines = [None] * count def send(self, iface: int, buf: Union[bytes, bytearray]) -> None: self._get_engine(iface).send(buf) def receive(self, iface: int, buf: Union[bytes, bytearray]) -> None: self._get_engine(iface).receive(buf) def _get_engine(self, iface: int): iface -= 1 try: self._engines[iface] except IndexError as exc: raise ValueError(f'No MPSSE engine available on interface ' f'{iface}') from exc if not self._engines[iface]: self._engines[iface] = FtdiMpsseEngine(iface) return self._engines[iface] class FtdiMpsseEngine: """FTDI MPSSE virtual engine Far from being complete for now """ COMMAND_PREFIX = \ 'GET SET READ WRITE RW ENABLE DISABLE CLK LOOPBACK SEND DRIVE' ST_IDLE = range(1) def __init__(self, iface: int): self.log = getLogger('pyftdi.mpsse.tracer') self._if = iface self._trace_tx = bytearray() self._trace_rx = bytearray() self._state = self.ST_IDLE self._clkdiv5 = False self._cmd_decoded = True self._resp_decoded = True self._last_codes = deque() self._expect_resp = deque() # positive: byte, negative: bit count self._commands = self._build_commands() def send(self, buf: Union[bytes, bytearray]) -> None: self._trace_tx.extend(buf) while self._trace_tx: try: code = self._trace_tx[0] cmd = self._commands[code] if self._cmd_decoded: self.log.debug('[%d]:[Command: %02X: %s]', self._if, code, cmd) cmd_decoder = getattr(self, f'_cmd_{cmd.lower()}') rdepth = len(self._expect_resp) try: self._cmd_decoded = cmd_decoder() except AttributeError as exc: raise ValueError(str(exc)) from exc if len(self._expect_resp) > rdepth: self._last_codes.append(code) if self._cmd_decoded: continue # not enough data in buffer to decode a whole command return except IndexError: self.log.warning('[%d]:Empty buffer on %02X: %s', self._if, code, cmd) except KeyError: self.log.warning('[%d]:Unknown command code: %02X', self._if, code) except AttributeError: self.log.warning('[%d]:Decoder for command %s [%02X] is not ' 'implemented', self._if, cmd, code) except ValueError as exc: self.log.warning('[%d]:Decoder for command %s [%02X] failed: ' '%s', self._if, cmd, code, exc) # on error, flush all buffers self.log.warning('Flush TX/RX buffers') self._trace_tx = bytearray() self._trace_rx = bytearray() self._last_codes.clear() def receive(self, buf: Union[bytes, bytearray]) -> None: self.log.info(' .. %s', hexlify(buf).decode()) self._trace_rx.extend(buf) while self._trace_rx: code = None try: code = self._last_codes.popleft() cmd = self._commands[code] resp_decoder = getattr(self, f'_resp_{cmd.lower()}') self._resp_decoded = resp_decoder() if self._resp_decoded: continue # not enough data in buffer to decode a whole response return except IndexError: self.log.warning('[%d]:Empty buffer', self._if) except KeyError: self.log.warning('[%d]:Unknown command code: %02X', self._if, code) except AttributeError: self.log.warning('[%d]:Decoder for response %s [%02X] is not ' 'implemented', self._if, cmd, code) # on error, flush RX buffer self.log.warning('[%d]:Flush RX buffer', self._if) self._trace_rx = bytearray() self._last_codes.clear() @classmethod def _build_commands(cls): # pylint: disable=no-self-argument commands = {} fdti_mod_name = 'pyftdi.ftdi' ftdi_mod = modules.get(fdti_mod_name) if not ftdi_mod: ftdi_mod = import_module(fdti_mod_name) ftdi_type = getattr(ftdi_mod, 'Ftdi') for cmd in dir(ftdi_type): if cmd[0] not in ascii_uppercase: continue value = getattr(ftdi_type, cmd) if not isinstance(value, int): continue family = cmd.split('_')[0] # pylint: disable=no-member if family not in cls.COMMAND_PREFIX.split(): continue commands[value] = cmd return commands def _cmd_enable_clk_div5(self): self.log.info(' [%d]:Enable clock divisor /5', self._if) self._clkdiv5 = True self._trace_tx[:] = self._trace_tx[1:] return True def _cmd_disable_clk_div5(self): self.log.info(' [%d]:Disable clock divisor /5', self._if) self._clkdiv5 = False self._trace_tx[:] = self._trace_tx[1:] return True def _cmd_set_tck_divisor(self): if len(self._trace_tx) < 3: return False value, = sunpack(' (%d) %s', self._if, funcname, length, hexlify(payload).decode('utf8')) self._trace_tx[:] = self._trace_tx[3+length:] return True def _decode_output_mpsse_bits(self, caller, expect_rx=False): if len(self._trace_tx) < 3: return False bitlen = self._trace_tx[1] + 1 if expect_rx: self._expect_resp.append(-bitlen) payload = self._trace_tx[2] funcname = caller[5:].title().replace('_', '') msb = caller[5:][-3].lower() == 'm' self.log.info(' %s> (%d) %s', funcname, bitlen, self.bit2str(payload, bitlen, msb)) self._trace_tx[:] = self._trace_tx[3:] return True def _decode_input_mpsse_byte_request(self): if len(self._trace_tx) < 3: return False length = sunpack(' 0: self.log.warning('[%d]:Handling bit request w/ byte length', self._if) bitlen = -self._expect_resp.popleft() payload = self._trace_rx[0] self._trace_rx[:] = self._trace_rx[1:] funcname = caller[5:].title().replace('_', '') msb = caller[5:][-3].lower() == 'm' self.log.info(' %s< (%d) %s', funcname, bitlen, self.bit2str(payload, bitlen, msb)) return True @classmethod def bit2str(cls, value: int, count: int, msb: bool, hiz: str = '_') -> str: mask = (1 << count) - 1 if msb: mask <<= 8 - count return cls.bm2str(value, mask, hiz) @classmethod def bm2str(cls, value: int, mask: int, hiz: str = '_') -> str: vstr = cls.bitfmt(value, 8) mstr = cls.bitfmt(mask, 8) return ''.join([m == '1' and v or hiz for v, m in zip(vstr, mstr)]) @classmethod def bitfmt(cls, value, width): return format(value, f'0{width}b') # rw_bytes_pve_pve_lsb # rw_bytes_pve_nve_lsb # rw_bytes_nve_pve_lsb # rw_bytes_nve_nve_lsb # rw_bits_pve_pve_lsb # rw_bits_pve_nve_lsb # rw_bits_nve_pve_lsb # rw_bits_nve_nve_lsb # write_bits_tms_pve # write_bits_tms_nve # rw_bits_tms_pve_pve # rw_bits_tms_nve_pve # rw_bits_tms_pve_nve # rw_bits_tms_nve_nve