abschlussarbeit/_trash/lib/python3.11/site-packages/pyftdi/tracer.py

489 lines
17 KiB
Python

# Copyright (c) 2017-2024, Emmanuel Blot <emmanuel.blot@free.fr>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""MPSSE command debug tracer."""
# pylint: disable=missing-docstring
from binascii import hexlify
from collections import deque
from importlib import import_module
from inspect import currentframe
from logging import getLogger
from string import ascii_uppercase
from struct import unpack as sunpack
from sys import modules
from typing import Union
class FtdiMpsseTracer:
"""FTDI MPSSE protocol decoder."""
MPSSE_ENGINES = {
0x0200: 0,
0x0400: 0,
0x0500: 0,
0x0600: 0,
0x0700: 2,
0x0800: 2,
0x0900: 1,
0x1000: 0,
0x3600: 2}
"""Count of MPSSE engines."""
def __init__(self, version):
count = self.MPSSE_ENGINES[version]
self._engines = [None] * count
def send(self, iface: int, buf: Union[bytes, bytearray]) -> None:
self._get_engine(iface).send(buf)
def receive(self, iface: int, buf: Union[bytes, bytearray]) -> None:
self._get_engine(iface).receive(buf)
def _get_engine(self, iface: int):
iface -= 1
try:
self._engines[iface]
except IndexError as exc:
raise ValueError(f'No MPSSE engine available on interface '
f'{iface}') from exc
if not self._engines[iface]:
self._engines[iface] = FtdiMpsseEngine(iface)
return self._engines[iface]
class FtdiMpsseEngine:
"""FTDI MPSSE virtual engine
Far from being complete for now
"""
COMMAND_PREFIX = \
'GET SET READ WRITE RW ENABLE DISABLE CLK LOOPBACK SEND DRIVE'
ST_IDLE = range(1)
def __init__(self, iface: int):
self.log = getLogger('pyftdi.mpsse.tracer')
self._if = iface
self._trace_tx = bytearray()
self._trace_rx = bytearray()
self._state = self.ST_IDLE
self._clkdiv5 = False
self._cmd_decoded = True
self._resp_decoded = True
self._last_codes = deque()
self._expect_resp = deque() # positive: byte, negative: bit count
self._commands = self._build_commands()
def send(self, buf: Union[bytes, bytearray]) -> None:
self._trace_tx.extend(buf)
while self._trace_tx:
try:
code = self._trace_tx[0]
cmd = self._commands[code]
if self._cmd_decoded:
self.log.debug('[%d]:[Command: %02X: %s]',
self._if, code, cmd)
cmd_decoder = getattr(self, f'_cmd_{cmd.lower()}')
rdepth = len(self._expect_resp)
try:
self._cmd_decoded = cmd_decoder()
except AttributeError as exc:
raise ValueError(str(exc)) from exc
if len(self._expect_resp) > rdepth:
self._last_codes.append(code)
if self._cmd_decoded:
continue
# not enough data in buffer to decode a whole command
return
except IndexError:
self.log.warning('[%d]:Empty buffer on %02X: %s',
self._if, code, cmd)
except KeyError:
self.log.warning('[%d]:Unknown command code: %02X',
self._if, code)
except AttributeError:
self.log.warning('[%d]:Decoder for command %s [%02X] is not '
'implemented', self._if, cmd, code)
except ValueError as exc:
self.log.warning('[%d]:Decoder for command %s [%02X] failed: '
'%s', self._if, cmd, code, exc)
# on error, flush all buffers
self.log.warning('Flush TX/RX buffers')
self._trace_tx = bytearray()
self._trace_rx = bytearray()
self._last_codes.clear()
def receive(self, buf: Union[bytes, bytearray]) -> None:
self.log.info(' .. %s', hexlify(buf).decode())
self._trace_rx.extend(buf)
while self._trace_rx:
code = None
try:
code = self._last_codes.popleft()
cmd = self._commands[code]
resp_decoder = getattr(self, f'_resp_{cmd.lower()}')
self._resp_decoded = resp_decoder()
if self._resp_decoded:
continue
# not enough data in buffer to decode a whole response
return
except IndexError:
self.log.warning('[%d]:Empty buffer', self._if)
except KeyError:
self.log.warning('[%d]:Unknown command code: %02X',
self._if, code)
except AttributeError:
self.log.warning('[%d]:Decoder for response %s [%02X] is not '
'implemented', self._if, cmd, code)
# on error, flush RX buffer
self.log.warning('[%d]:Flush RX buffer', self._if)
self._trace_rx = bytearray()
self._last_codes.clear()
@classmethod
def _build_commands(cls):
# pylint: disable=no-self-argument
commands = {}
fdti_mod_name = 'pyftdi.ftdi'
ftdi_mod = modules.get(fdti_mod_name)
if not ftdi_mod:
ftdi_mod = import_module(fdti_mod_name)
ftdi_type = getattr(ftdi_mod, 'Ftdi')
for cmd in dir(ftdi_type):
if cmd[0] not in ascii_uppercase:
continue
value = getattr(ftdi_type, cmd)
if not isinstance(value, int):
continue
family = cmd.split('_')[0]
# pylint: disable=no-member
if family not in cls.COMMAND_PREFIX.split():
continue
commands[value] = cmd
return commands
def _cmd_enable_clk_div5(self):
self.log.info(' [%d]:Enable clock divisor /5', self._if)
self._clkdiv5 = True
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_disable_clk_div5(self):
self.log.info(' [%d]:Disable clock divisor /5', self._if)
self._clkdiv5 = False
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_set_tck_divisor(self):
if len(self._trace_tx) < 3:
return False
value, = sunpack('<H', self._trace_tx[1:3])
base = 12E6 if self._clkdiv5 else 60E6
freq = base / ((1 + value) * 2)
self.log.info(' [%d]:Set frequency %.3fMHZ', self._if, freq/1E6)
self._trace_tx[:] = self._trace_tx[3:]
return True
def _cmd_loopback_end(self):
self.log.info(' [%d]:Disable loopback', self._if)
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_enable_clk_adaptive(self):
self.log.info(' [%d]:Enable adaptive clock', self._if)
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_disable_clk_adaptive(self):
self.log.info(' [%d]:Disable adaptive clock', self._if)
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_enable_clk_3phase(self):
self.log.info(' [%d]:Enable 3-phase clock', self._if)
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_disable_clk_3phase(self):
self.log.info(' [%d]:Disable 3-phase clock', self._if)
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_drive_zero(self):
if len(self._trace_tx) < 3:
return False
value, = sunpack('H', self._trace_tx[1:3])
self.log.info(' [%d]:Open collector [15:0] %04x %s',
self._if, value, self.bitfmt(value, 16))
self._trace_tx[:] = self._trace_tx[3:]
return True
def _cmd_send_immediate(self):
self.log.debug(' [%d]:Send immediate', self._if)
self._trace_tx[:] = self._trace_tx[1:]
return True
def _cmd_get_bits_low(self):
self._trace_tx[:] = self._trace_tx[1:]
self._expect_resp.append(1)
return True
def _cmd_get_bits_high(self):
self._trace_tx[:] = self._trace_tx[1:]
self._expect_resp.append(1)
return True
def _cmd_set_bits_low(self):
if len(self._trace_tx) < 3:
return False
value, direction = sunpack('BB', self._trace_tx[1:3])
self.log.info(' [%d]:Set gpio[7:0] %02x %s',
self._if, value, self.bm2str(value, direction))
self._trace_tx[:] = self._trace_tx[3:]
return True
def _cmd_set_bits_high(self):
if len(self._trace_tx) < 3:
return False
value, direction = sunpack('BB', self._trace_tx[1:3])
self.log.info(' [%d]:Set gpio[15:8] %02x %s',
self._if, value, self.bm2str(value, direction))
self._trace_tx[:] = self._trace_tx[3:]
return True
def _cmd_write_bytes_pve_msb(self):
return self._decode_output_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_write_bytes_nve_msb(self):
return self._decode_output_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_write_bytes_pve_lsb(self):
return self._decode_output_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_write_bytes_nve_lsb(self):
return self._decode_output_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_read_bytes_pve_msb(self):
return self._decode_input_mpsse_byte_request()
def _resp_read_bytes_pve_msb(self):
return self._decode_input_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_read_bytes_nve_msb(self):
return self._decode_input_mpsse_byte_request()
def _resp_read_bytes_nve_msb(self):
return self._decode_input_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_read_bytes_pve_lsb(self):
return self._decode_input_mpsse_byte_request()
def _resp_read_bytes_pve_lsb(self):
return self._decode_input_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_read_bytes_nve_lsb(self):
return self._decode_input_mpsse_byte_request()
def _resp_read_bytes_nve_lsb(self):
return self._decode_input_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_rw_bytes_nve_pve_msb(self):
return self._decode_output_mpsse_bytes(currentframe().f_code.co_name,
True)
def _resp_rw_bytes_nve_pve_msb(self):
return self._decode_input_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_rw_bytes_pve_nve_msb(self):
return self._decode_output_mpsse_bytes(currentframe().f_code.co_name,
True)
def _resp_rw_bytes_pve_nve_msb(self):
return self._decode_input_mpsse_bytes(currentframe().f_code.co_name)
def _cmd_write_bits_pve_msb(self):
return self._decode_output_mpsse_bits(currentframe().f_code.co_name)
def _cmd_write_bits_nve_msb(self):
return self._decode_output_mpsse_bits(currentframe().f_code.co_name)
def _cmd_write_bits_pve_lsb(self):
return self._decode_output_mpsse_bits(currentframe().f_code.co_name)
def _cmd_write_bits_nve_lsb(self):
return self._decode_output_mpsse_bits(currentframe().f_code.co_name)
def _cmd_read_bits_pve_msb(self):
return self._decode_input_mpsse_bit_request()
def _resp_read_bits_pve_msb(self):
return self._decode_input_mpsse_bits(currentframe().f_code.co_name)
def _cmd_read_bits_nve_msb(self):
return self._decode_input_mpsse_bit_request()
def _resp_read_bits_nve_msb(self):
return self._decode_input_mpsse_bits(currentframe().f_code.co_name)
def _cmd_read_bits_pve_lsb(self):
return self._decode_input_mpsse_bit_request()
def _resp_read_bits_pve_lsb(self):
return self._decode_input_mpsse_bits(currentframe().f_code.co_name)
def _cmd_read_bits_nve_lsb(self):
return self._decode_input_mpsse_bit_request()
def _resp_read_bits_nve_lsb(self):
return self._decode_input_mpsse_bits(currentframe().f_code.co_name)
def _cmd_rw_bits_nve_pve_msb(self):
return self._decode_output_mpsse_bits(currentframe().f_code.co_name,
True)
def _resp_rw_bits_nve_pve_msb(self):
return self._decode_input_mpsse_bits(currentframe().f_code.co_name)
def _cmd_rw_bits_pve_nve_msb(self):
return self._decode_output_mpsse_bits(currentframe().f_code.co_name,
True)
def _resp_rw_bits_pve_nve_msb(self):
return self._decode_input_mpsse_bits(currentframe().f_code.co_name)
def _resp_get_bits_low(self):
if self._trace_rx:
return False
value = self._trace_rx[0]
self.log.info(' [%d]:Get gpio[7:0] %02x %s',
self._if, value, self.bm2str(value, 0xFF))
self._trace_rx[:] = self._trace_rx[1:]
return True
def _resp_get_bits_high(self):
if self._trace_rx:
return False
value = self._trace_rx[0]
self.log.info(' [%d]:Get gpio[15:8] %02x %s',
self._if, value, self.bm2str(value, 0xFF))
self._trace_rx[:] = self._trace_rx[1:]
return True
def _decode_output_mpsse_bytes(self, caller, expect_rx=False):
if len(self._trace_tx) < 4:
return False
length = sunpack('<H', self._trace_tx[1:3])[0] + 1
if len(self._trace_tx) < 4 + length:
return False
if expect_rx:
self._expect_resp.append(length)
payload = self._trace_tx[3:3+length]
funcname = caller[5:].title().replace('_', '')
self.log.info(' [%d]:%s> (%d) %s',
self._if, funcname, length,
hexlify(payload).decode('utf8'))
self._trace_tx[:] = self._trace_tx[3+length:]
return True
def _decode_output_mpsse_bits(self, caller, expect_rx=False):
if len(self._trace_tx) < 3:
return False
bitlen = self._trace_tx[1] + 1
if expect_rx:
self._expect_resp.append(-bitlen)
payload = self._trace_tx[2]
funcname = caller[5:].title().replace('_', '')
msb = caller[5:][-3].lower() == 'm'
self.log.info(' %s> (%d) %s',
funcname, bitlen, self.bit2str(payload, bitlen, msb))
self._trace_tx[:] = self._trace_tx[3:]
return True
def _decode_input_mpsse_byte_request(self):
if len(self._trace_tx) < 3:
return False
length = sunpack('<H', self._trace_tx[1:3])[0] + 1
self._expect_resp.append(length)
self._trace_tx[:] = self._trace_tx[3:]
return True
def _decode_input_mpsse_bit_request(self):
if len(self._trace_tx) < 2:
return False
bitlen = self._trace_tx[1] + 1
self._expect_resp.append(-bitlen)
self._trace_tx[:] = self._trace_tx[2:]
return True
def _decode_input_mpsse_bytes(self, caller):
if not self._expect_resp:
self.log.warning('[%d]:Response w/o request?', self._if)
return False
if self._expect_resp[0] < 0:
self.log.warning('[%d]:Handling byte request w/ bit length',
self._if)
return False
if len(self._trace_rx) < self._expect_resp[0]: # peek
return False
length = self._expect_resp.popleft()
payload = self._trace_rx[:length]
self._trace_rx[:] = self._trace_rx[length:]
funcname = caller[5:].title().replace('_', '')
self.log.info(' %s< (%d) %s',
funcname, length, hexlify(payload).decode('utf8'))
return True
def _decode_input_mpsse_bits(self, caller):
if not self._expect_resp:
self.log.warning('[%d]:Response w/o request?', self._if)
return False
if not self._trace_rx: # peek
return False
if self._expect_resp[0] > 0:
self.log.warning('[%d]:Handling bit request w/ byte length',
self._if)
bitlen = -self._expect_resp.popleft()
payload = self._trace_rx[0]
self._trace_rx[:] = self._trace_rx[1:]
funcname = caller[5:].title().replace('_', '')
msb = caller[5:][-3].lower() == 'm'
self.log.info(' %s< (%d) %s',
funcname, bitlen, self.bit2str(payload, bitlen, msb))
return True
@classmethod
def bit2str(cls, value: int, count: int, msb: bool, hiz: str = '_') -> str:
mask = (1 << count) - 1
if msb:
mask <<= 8 - count
return cls.bm2str(value, mask, hiz)
@classmethod
def bm2str(cls, value: int, mask: int, hiz: str = '_') -> str:
vstr = cls.bitfmt(value, 8)
mstr = cls.bitfmt(mask, 8)
return ''.join([m == '1' and v or hiz for v, m in zip(vstr, mstr)])
@classmethod
def bitfmt(cls, value, width):
return format(value, f'0{width}b')
# rw_bytes_pve_pve_lsb
# rw_bytes_pve_nve_lsb
# rw_bytes_nve_pve_lsb
# rw_bytes_nve_nve_lsb
# rw_bits_pve_pve_lsb
# rw_bits_pve_nve_lsb
# rw_bits_nve_pve_lsb
# rw_bits_nve_nve_lsb
# write_bits_tms_pve
# write_bits_tms_nve
# rw_bits_tms_pve_pve
# rw_bits_tms_nve_pve
# rw_bits_tms_pve_nve
# rw_bits_tms_nve_nve