abschlussarbeit/_trash/lib/python3.11/site-packages/pyftdi/ftdi.py

2328 lines
94 KiB
Python

# Copyright (c) 2010-2024 Emmanuel Blot <emmanuel.blot@free.fr>
# Copyright (c) 2016 Emmanuel Bouaziz <ebouaziz@free.fr>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""FTDI core driver."""
from binascii import hexlify
from collections import OrderedDict
from enum import IntEnum, unique
from errno import ENODEV
from logging import getLogger, DEBUG
from struct import unpack as sunpack
from sys import platform
from typing import Callable, Optional, List, Sequence, TextIO, Tuple, Union
from usb.core import (Configuration as UsbConfiguration, Device as UsbDevice,
USBError)
from usb.util import (build_request_type, release_interface, CTRL_IN, CTRL_OUT,
CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE)
from .misc import to_bool
from .usbtools import UsbDeviceDescriptor, UsbTools
# pylint: disable=invalid-name
class FtdiError(IOError):
    """Root of the FTDI exception hierarchy.

    It derives from :py:class:`IOError`, so handlers written for generic
    I/O errors catch FTDI failures as well.
    """


class FtdiFeatureError(FtdiError):
    """The FTDI device does not provide the requested feature"""


class FtdiMpsseError(FtdiFeatureError):
    """The FTDI device does not support the MPSSE mode"""


class FtdiEepromError(FtdiError):
    """An access to the FTDI EEPROM failed"""
class Ftdi:
"""FTDI device driver"""
# Scheme string handed to UsbTools when FTDI URLs are parsed
SCHEME = 'ftdi'
"""URL scheme for :py:class:`UsbTools`."""
FTDI_VENDOR = 0x403
"""USB VID for FTDI chips."""
# Map of vendor name -> USB vendor id; extended by add_custom_vendor()
VENDOR_IDS = {'ftdi': FTDI_VENDOR}
"""Supported vendors, only FTDI.
To add third parties vendors see :py:meth:`add_custom_vendor`.
"""
# Map of vendor id -> ordered map of product name -> USB product id.
# Several names are aliases of the same PID; extended by
# add_custom_product()
PRODUCT_IDS = {
    FTDI_VENDOR: OrderedDict((
        # use an ordered dict so that the first occurrence of a PID takes
        # precedence when generating URLs - order does matter.
        ('232', 0x6001),
        ('232r', 0x6001),
        ('232h', 0x6014),
        ('2232', 0x6010),
        ('2232c', 0x6010),
        ('2232d', 0x6010),
        ('2232h', 0x6010),
        ('4232', 0x6011),
        ('4232h', 0x6011),
        ('ft-x', 0x6015),
        ('230x', 0x6015),
        ('231x', 0x6015),
        ('234x', 0x6015),
        ('4232ha', 0x6048),
        ('ft232', 0x6001),
        ('ft232r', 0x6001),
        ('ft232h', 0x6014),
        ('ft2232', 0x6010),
        ('ft2232c', 0x6010),
        ('ft2232d', 0x6010),
        ('ft2232h', 0x6010),
        ('ft4232', 0x6011),
        ('ft4232h', 0x6011),
        ('ft230x', 0x6015),
        ('ft231x', 0x6015),
        ('ft234x', 0x6015),
        ('ft4232ha', 0x6048)))
}
"""Supported products, only FTDI officials ones.
To add third parties and customized products, see
:py:meth:`add_custom_product`.
"""
# Vendor handed to UsbTools as the default one when URLs are parsed
DEFAULT_VENDOR = FTDI_VENDOR
"""Default vendor: FTDI."""
# Map of device version -> chip name. The keys are the values reported by
# the device_version property (USB bcdDevice); used by ic_name.
DEVICE_NAMES = {
    0x0200: 'ft232am',
    0x0400: 'ft232bm',
    0x0500: 'ft2232c',
    0x0600: 'ft232r',
    0x0700: 'ft2232h',
    0x0800: 'ft4232h',
    0x0900: 'ft232h',
    0x1000: 'ft-x',
    0x3600: 'ft4232ha'}
"""Common names of FTDI supported devices."""
# Note that the FTDI datasheets contradict themselves, so
# the following values may not be the right ones...
# Map of device version -> (TX, RX) FIFO sizes in bytes; used by fifo_sizes.
FIFO_SIZES = {
    0x0200: (128, 128),  # FT232AM: TX: 128, RX: 128
    0x0400: (128, 384),  # FT232BM: TX: 128, RX: 384
    0x0500: (128, 384),  # FT2232C: TX: 128, RX: 384
    0x0600: (256, 128),  # FT232R: TX: 256, RX: 128
    0x0700: (4096, 4096),  # FT2232H: TX: 4KiB, RX: 4KiB
    0x0800: (2048, 2048),  # FT4232H: TX: 2KiB, RX: 2KiB
    0x0900: (1024, 1024),  # FT232H: TX: 1KiB, RX: 1KiB
    0x1000: (512, 512),  # FT-X: TX: 512, RX: 512
    0x3600: (2048, 2048),  # FT4232HA: TX: 2KiB, RX: 2KiB
}
"""FTDI chip internal FIFO sizes
Note that 'TX' and 'RX' are inverted with the datasheet terminology:
Values here are seen from the host perspective, whereas datasheet
values are defined from the device perspective
"""
@unique
class BitMode(IntEnum):
    """Selector of the function (bit mode) of an FTDI port."""

    RESET = 0x00    # leave any alternative mode (defaults to UART)
    BITBANG = 0x01  # regular asynchronous bitbang
    MPSSE = 0x02    # MPSSE engine, available on 2232x chips
    SYNCBB = 0x04   # synchronous bitbang
    MCU = 0x08      # MCU Host Bus Emulation
    OPTO = 0x10     # Fast Opto-Isolated Serial Interface
    CBUS = 0x20     # bitbang on the CBUS pins of R-type chips
    SYNCFF = 0x40   # Single Channel Synchronous FIFO
# MPSSE Commands
# Opcodes of the MPSSE data shifting commands.
# NOTE(review): the naming presumably reads as PVE/NVE = data clocked on
# the positive/negative clock edge and MSB/LSB = bit order, RW = combined
# read and write - confirm against the FTDI MPSSE command reference.
WRITE_BYTES_PVE_MSB = 0x10
WRITE_BYTES_NVE_MSB = 0x11
WRITE_BITS_PVE_MSB = 0x12
WRITE_BITS_NVE_MSB = 0x13
WRITE_BYTES_PVE_LSB = 0x18
WRITE_BYTES_NVE_LSB = 0x19
WRITE_BITS_PVE_LSB = 0x1a
WRITE_BITS_NVE_LSB = 0x1b
READ_BYTES_PVE_MSB = 0x20
READ_BYTES_NVE_MSB = 0x24
READ_BITS_PVE_MSB = 0x22
READ_BITS_NVE_MSB = 0x26
READ_BYTES_PVE_LSB = 0x28
READ_BYTES_NVE_LSB = 0x2c
READ_BITS_PVE_LSB = 0x2a
READ_BITS_NVE_LSB = 0x2e
RW_BYTES_PVE_NVE_MSB = 0x31
RW_BYTES_NVE_PVE_MSB = 0x34
RW_BITS_PVE_NVE_MSB = 0x33
RW_BITS_NVE_PVE_MSB = 0x36
RW_BYTES_PVE_NVE_LSB = 0x39
RW_BYTES_NVE_PVE_LSB = 0x3c
RW_BITS_PVE_NVE_LSB = 0x3b
RW_BITS_NVE_PVE_LSB = 0x3e
# TMS variants (the name suggests the JTAG TMS line - TODO confirm)
WRITE_BITS_TMS_PVE = 0x4a
WRITE_BITS_TMS_NVE = 0x4b
RW_BITS_TMS_PVE_PVE = 0x6a
RW_BITS_TMS_PVE_NVE = 0x6b
RW_BITS_TMS_NVE_PVE = 0x6e
RW_BITS_TMS_NVE_NVE = 0x6f
# Flow and host bus commands
SEND_IMMEDIATE = 0x87
WAIT_ON_HIGH = 0x88
WAIT_ON_LOW = 0x89
READ_SHORT = 0x90
READ_EXTENDED = 0x91
WRITE_SHORT = 0x92
WRITE_EXTENDED = 0x93
# -H series only
DISABLE_CLK_DIV5 = 0x8a
ENABLE_CLK_DIV5 = 0x8b
# Modem status
# Bit masks over a 16-bit status word. Bit positions follow the order of
# the names listed in MODEM_STATUS: bits 4-7 match its first entry and
# bits 8-15 its second entry.
MODEM_CTS = 1 << 4  # Clear to send
MODEM_DSR = 1 << 5  # Data set ready
MODEM_RI = 1 << 6  # Ring indicator
MODEM_RLSD = 1 << 7  # Carrier detect
MODEM_DR = 1 << 8  # Data ready
MODEM_OE = 1 << 9  # Overrun error
MODEM_PE = 1 << 10  # Parity error
MODEM_FE = 1 << 11  # Framing error
MODEM_BI = 1 << 12  # Break interrupt
MODEM_THRE = 1 << 13  # Transmitter holding register
MODEM_TEMT = 1 << 14  # Transmitter empty
MODEM_RCVE = 1 << 15  # Error in RCVR FIFO
# FTDI MPSSE commands
# Opcodes for GPIO access, loopback and clock configuration.
# open_mpsse_from_device sends SET_BITS_LOW / SET_BITS_HIGH followed by a
# value byte and a direction byte.
SET_BITS_LOW = 0x80  # Change LSB GPIO output
SET_BITS_HIGH = 0x82  # Change MSB GPIO output
GET_BITS_LOW = 0x81  # Get LSB GPIO output
GET_BITS_HIGH = 0x83  # Get MSB GPIO output
LOOPBACK_START = 0x84  # Enable loopback
LOOPBACK_END = 0x85  # Disable loopback
SET_TCK_DIVISOR = 0x86  # Set clock
# -H series only
ENABLE_CLK_3PHASE = 0x8c  # Enable 3-phase data clocking (I2C)
DISABLE_CLK_3PHASE = 0x8d  # Disable 3-phase data clocking
CLK_BITS_NO_DATA = 0x8e  # Allows JTAG clock to be output w/o data
CLK_BYTES_NO_DATA = 0x8f  # Allows JTAG clock to be output w/o data
CLK_WAIT_ON_HIGH = 0x94  # Clock until GPIOL1 is high
CLK_WAIT_ON_LOW = 0x95  # Clock until GPIOL1 is low
ENABLE_CLK_ADAPTIVE = 0x96  # Enable JTAG adaptive clock for ARM
DISABLE_CLK_ADAPTIVE = 0x97  # Disable JTAG adaptive clock
CLK_COUNT_WAIT_ON_HIGH = 0x9c  # Clock byte cycles until GPIOL1 is high
CLK_COUNT_WAIT_ON_LOW = 0x9d  # Clock byte cycles until GPIOL1 is low
# FT232H only
DRIVE_ZERO = 0x9e  # Drive-zero mode
# USB control requests
# bmRequestType values of vendor requests addressed to the device, for
# the host-to-device (OUT) and device-to-host (IN) directions
REQ_OUT = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR,
                             CTRL_RECIPIENT_DEVICE)
REQ_IN = build_request_type(CTRL_IN, CTRL_TYPE_VENDOR,
                            CTRL_RECIPIENT_DEVICE)
# Requests
SIO_REQ_RESET = 0x0  # Reset the port
SIO_REQ_SET_MODEM_CTRL = 0x1  # Set the modem control register
SIO_REQ_SET_FLOW_CTRL = 0x2  # Set flow control register
SIO_REQ_SET_BAUDRATE = 0x3  # Set baud rate
SIO_REQ_SET_DATA = 0x4  # Set the data characteristics of the port
SIO_REQ_POLL_MODEM_STATUS = 0x5  # Get line status
SIO_REQ_SET_EVENT_CHAR = 0x6  # Change event character
SIO_REQ_SET_ERROR_CHAR = 0x7  # Change error character
SIO_REQ_SET_LATENCY_TIMER = 0x9  # Change latency timer
SIO_REQ_GET_LATENCY_TIMER = 0xa  # Get latency timer
SIO_REQ_SET_BITMODE = 0xb  # Change bit mode
SIO_REQ_READ_PINS = 0xc  # Read GPIO pin value (or "get bitmode")
# Eeprom requests
# the EEPROM requests are defined as offsets from SIO_REQ_EEPROM
SIO_REQ_EEPROM = 0x90
SIO_REQ_READ_EEPROM = SIO_REQ_EEPROM + 0  # Read EEPROM content
SIO_REQ_WRITE_EEPROM = SIO_REQ_EEPROM + 1  # Write EEPROM content
SIO_REQ_ERASE_EEPROM = SIO_REQ_EEPROM + 2  # Erase EEPROM content
# Reset arguments
SIO_RESET_SIO = 0  # Reset device
SIO_RESET_PURGE_RX = 1  # Drain USB RX buffer (host-to-ftdi)
SIO_RESET_PURGE_TX = 2  # Drain USB TX buffer (ftdi-to-host)
# Flow control arguments
# the handshake selectors live in the upper byte of the argument
SIO_DISABLE_FLOW_CTRL = 0x0
SIO_RTS_CTS_HS = 0x1 << 8
SIO_DTR_DSR_HS = 0x2 << 8
SIO_XON_XOFF_HS = 0x4 << 8
# DTR/RTS arguments: the upper byte carries the line mask, the lower byte
# carries the level to apply to that line
SIO_SET_DTR_MASK = 0x1
SIO_SET_DTR_HIGH = SIO_SET_DTR_MASK | (SIO_SET_DTR_MASK << 8)
SIO_SET_DTR_LOW = 0x0 | (SIO_SET_DTR_MASK << 8)
SIO_SET_RTS_MASK = 0x2
SIO_SET_RTS_HIGH = SIO_SET_RTS_MASK | (SIO_SET_RTS_MASK << 8)
SIO_SET_RTS_LOW = 0x0 | (SIO_SET_RTS_MASK << 8)
# Parity bits
# (consecutive integers 0..4)
PARITY_NONE, PARITY_ODD, PARITY_EVEN, PARITY_MARK, PARITY_SPACE = range(5)
# Number of stop bits
# (consecutive integers 0..2)
STOP_BIT_1, STOP_BIT_15, STOP_BIT_2 = range(3)
# Number of bits
# (the constants hold the actual bit count: 7 and 8)
BITS_7, BITS_8 = [7+i for i in range(2)]
# Break type
BREAK_OFF, BREAK_ON = range(2)
# cts: Clear to send
# dsr: Data set ready
# ri: Ring indicator
# dcd: Data carrier detect
# dr: Data ready
# oe: Overrun error
# pe: Parity error
# fe: Framing error
# bi: Break interrupt
# thre: Transmitter holding register empty
# temt: Transmitter empty
# err: Error in RCVR FIFO
# One tuple of names per status byte, indexed by bit position (bit 0
# first); an empty name marks a bit that has no identifier.
MODEM_STATUS = [('', '', '', '', 'cts', 'dsr', 'ri', 'dcd'),
                ('dr', 'overrun', 'parity', 'framing',
                 'break', 'thre', 'txe', 'rcve')]
# Per status byte mask of the bits decode_modem_status keeps when only
# error flags are requested
ERROR_BITS = (0x00, 0x8E)
# NOTE(review): 0x60 matches the 'thre' and 'txe' positions of the second
# status byte - usage is not visible here, confirm
TX_EMPTY_BITS = 0x60
# Clocks and baudrates
# frequency_max reports BUS_CLOCK_HIGH for H-series devices and
# BUS_CLOCK_BASE for any other device
BUS_CLOCK_BASE = 6.0E6  # 6 MHz
BUS_CLOCK_HIGH = 30.0E6  # 30 MHz
BAUDRATE_REF_BASE = int(3.0E6)  # 3 MHz
BAUDRATE_REF_HIGH = int(12.0E6)  # 12 MHz
BITBANG_BAUDRATE_RATIO_BASE = 16
BITBANG_BAUDRATE_RATIO_HIGH = 5
BAUDRATE_TOLERANCE = 3.0  # acceptable clock drift for UART, in %
FRAC_DIV_CODE = (0, 3, 2, 4, 1, 5, 6, 7)
# Latency
# LATENCY_MIN is applied when a port is opened and LATENCY_MAX when it is
# closed (see open_from_device and close)
LATENCY_MIN = 1
LATENCY_MAX = 255
LATENCY_EEPROM_FT232R = 77
# EEPROM Properties
EXT_EEPROM_SIZES = (128, 256)  # in bytes (93C66 seen as 93C56)
# Map of device version -> internal EEPROM size in bytes; a device listed
# here is reported as having an internal EEPROM (see is_eeprom_internal)
INT_EEPROMS = {
    0x0600: 0x80,  # FT232R: 128 bytes, 1024 bits
    0x1000: 0x400  # FT230*X: 1KiB
}
def __init__(self):
self.log = getLogger('pyftdi.ftdi')
self._debug_log = False
self._usb_dev = None
self._usb_read_timeout = 5000
self._usb_write_timeout = 5000
self._baudrate = -1
self._readbuffer = bytearray()
self._readoffset = 0
self._readbuffer_chunksize = 4 << 10 # 4KiB
self._writebuffer_chunksize = 4 << 10 # 4KiB
self._max_packet_size = 0
self._interface = None
self._index = None
self._in_ep = None
self._out_ep = None
self._bitmode = Ftdi.BitMode.RESET
self._latency = 0
self._latency_count = 0
self._latency_min = self.LATENCY_MIN
self._latency_max = self.LATENCY_MAX
self._latency_threshold = None # disable dynamic latency
self._lineprop = 0
self._cbus_pins = (0, 0)
self._cbus_out = 0
self._tracer = None
# --- Public API -------------------------------------------------------
@classmethod
def create_from_url(cls, url: str) -> 'Ftdi':
"""Create an Ftdi instance from an URL
URL scheme: ftdi://[vendor[:product[:index|:serial]]]/interface
:param url: FTDI device selector
:return: a fresh, open Ftdi instance
"""
device = Ftdi()
device.open_from_url(url)
return device
@classmethod
def list_devices(cls, url: Optional[str] = None) -> \
        List[Tuple[UsbDeviceDescriptor, int]]:
    """Enumerate the connected FTDI devices.

    :param url: a pattern URL to restrict the search; ``'ftdi:///?'`` is
                used when it is omitted or empty
    :return: list of (UsbDeviceDescriptor, interface)
    """
    pattern = url if url else 'ftdi:///?'
    return UsbTools.list_devices(pattern, cls.VENDOR_IDS, cls.PRODUCT_IDS,
                                 cls.DEFAULT_VENDOR)
@classmethod
def show_devices(cls, url: Optional[str] = None,
                 out: Optional[TextIO] = None) -> None:
    """Print the URLs and descriptors of connected FTDI devices.

    :param url: a pattern URL to restrict the search; ``'ftdi:///?'`` is
                used when it is omitted or empty
    :param out: output stream, handed over to
                :py:meth:`UsbTools.show_devices`
    """
    pattern = url or 'ftdi:///?'
    found = UsbTools.list_devices(pattern, cls.VENDOR_IDS, cls.PRODUCT_IDS,
                                  cls.DEFAULT_VENDOR)
    UsbTools.show_devices('ftdi', cls.VENDOR_IDS, cls.PRODUCT_IDS, found,
                          out)
@classmethod
def get_identifiers(cls, url: str) -> Tuple[UsbDeviceDescriptor, int]:
    """Parse an FTDI URL into the identifiers of the device it selects.

    The URL is parsed by :py:meth:`UsbTools.parse_url` with the scheme,
    vendors, products and default vendor of this class.

    :param url: input URL to parse
    :return: the (device descriptor, interface) pair found in the URL
    """
    identifiers = UsbTools.parse_url(url, cls.SCHEME, cls.VENDOR_IDS,
                                     cls.PRODUCT_IDS, cls.DEFAULT_VENDOR)
    return identifiers
@classmethod
def get_device(cls, url: str) -> UsbDevice:
    """Get a USB device from its URL, without opening an Ftdi instance.

    The interface part of the URL is parsed but not used.

    :param url: input URL to parse
    :return: the USB device that matches the specified URL
    """
    descriptor, _interface = cls.get_identifiers(url)
    usb_device = UsbTools.get_device(descriptor)
    return usb_device
@classmethod
def add_custom_vendor(cls, vid: int, vidname: str = '') -> None:
"""Add a custom USB vendor identifier.
It can be useful to use a pretty URL for opening FTDI device
:param vid: Vendor ID (USB 16-bit identifier)
:param vidname: Vendor name (arbitrary string)
:raise ValueError: if the vendor id is already referenced
"""
if vid in cls.VENDOR_IDS.values():
raise ValueError(f'Vendor ID 0x{vid:04x} already registered')
if not vidname:
vidname = f'0x{vid:04x}'
cls.VENDOR_IDS[vidname] = vid
@classmethod
def add_custom_product(cls, vid: int, pid: int, pidname: str = '') -> None:
"""Add a custom USB product identifier.
It is required for opening FTDI device with non-standard VID/PID
USB identifiers.
:param vid: Vendor ID (USB 16-bit identifier)
:param pid: Product ID (USB 16-bit identifier)
:param pidname: Product name (arbitrary string)
:raise ValueError: if the product id is already referenced
"""
if vid not in cls.PRODUCT_IDS:
cls.PRODUCT_IDS[vid] = OrderedDict()
elif pid in cls.PRODUCT_IDS[vid].values():
raise ValueError(f'Product ID 0x{vid:04x}:0x{pid:04x} already '
f'registered')
if not pidname:
pidname = f'0x{pid:04x}'
cls.PRODUCT_IDS[vid][pidname] = pid
@classmethod
def decode_modem_status(cls, value: bytes, error_only: bool = False) -> \
Tuple[str, ...]:
"""Decode the FTDI modem status bitfield into short strings.
:param value: 2-byte mode status
:param error_only: only decode error flags
:return: a tuple of status identifiers
"""
status = []
for pos, (byte_, ebits) in enumerate(zip(value, cls.ERROR_BITS)):
for bit, _ in enumerate(cls.MODEM_STATUS[pos]):
if error_only:
byte_ &= ebits
if byte_ & (1 << bit):
status.append(cls.MODEM_STATUS[pos][bit])
return tuple(status)
@staticmethod
def find_all(vps: Sequence[Tuple[int, int]], nocache: bool = False) -> \
        List[Tuple[UsbDeviceDescriptor, int]]:
    """Find all devices that match the vendor/product pairs of the vps
    list.

    This is a plain forward to :py:meth:`UsbTools.find_all`.

    :param vps: a sequence of 2-tuple (vid, pid) pairs
    :param nocache: bypass cache to re-enumerate USB devices on the host
    :return: the device list built by :py:meth:`UsbTools.find_all`,
             annotated as (device descriptor, int) pairs.
             NOTE(review): this list used to be documented as 5-tuples
             (vid, pid, sernum, iface, description); confirm the actual
             shape against :py:meth:`UsbTools.find_all`
    """
    matches = UsbTools.find_all(vps, nocache)
    return matches
@property
def is_connected(self) -> bool:
"""Tells whether this instance is connected to an actual USB slave.
:return: the slave connection status
"""
return bool(self._usb_dev)
def open_from_url(self, url: str) -> None:
    """Open a new interface to the FTDI device selected by an URL.

    :param str url: a FTDI URL selector
    """
    descriptor, port = self.get_identifiers(url)
    usb_device = UsbTools.get_device(descriptor)
    self.open_from_device(usb_device, port)
def open(self, vendor: int, product: int, bus: Optional[int] = None,
         address: Optional[int] = None, index: int = 0,
         serial: Optional[str] = None,
         interface: int = 1) -> None:
    """Open a new interface to the specified FTDI device.

    When several FTDI devices of the same kind (vid, pid) are connected
    to the host, use either the index or the serial argument to tell
    them apart.

    The index is not a reliable selector, as the host may enumerate the
    USB devices in random order; the serial number is more reliable and
    should always be preferred.

    Some FTDI devices support several interfaces/ports (such as FT2232H,
    FT4232H and FT4232HA). The interface argument selects the FTDI port
    to use, starting from 1 (not 0).

    :param int vendor: USB vendor id
    :param int product: USB product id
    :param int bus: optional selector, USB bus
    :param int address: optional selector, USB address on bus
    :param int index: optional selector, specifies the n-th matching
                      FTDI enumerated USB device on the host
    :param str serial: optional selector, specifies the FTDI device
                       by its serial number
    :param int interface: FTDI interface/port
    """
    descriptor = UsbDeviceDescriptor(vendor, product, bus, address,
                                     serial, index, None)
    usb_device = UsbTools.get_device(descriptor)
    self.open_from_device(usb_device, interface)
def open_from_device(self, device: UsbDevice,
                     interface: int = 1) -> None:
    """Open a new interface from an existing USB device.

    The device becomes the current USB device of this instance and the
    requested port is selected. The port is then brought to a known
    state: read buffer invalidated, buffers purged, device reset, bit
    mode reset and latency timer set to :py:attr:`LATENCY_MIN`.

    :param device: FTDI USB device (PyUSB instance)
    :param interface: FTDI interface to use (integer starting from 1)
    :raise FtdiError: if device is not a PyUSB device, or if interface
                      exceeds the interface count of the device
    """
    if not isinstance(device, UsbDevice):
        raise FtdiError(f"Device '{device}' is not a PyUSB device")
    self._usb_dev = device
    try:
        self._usb_dev.set_configuration()
    except USBError:
        # NOTE(review): a failure to set the configuration is ignored on
        # purpose; presumably the device may already be configured -
        # confirm
        pass
    # detect invalid interface as early as possible
    config = self._usb_dev.get_active_configuration()
    if interface > config.bNumInterfaces:
        raise FtdiError(f'No such FTDI port: {interface}')
    self._set_interface(config, interface)
    self._max_packet_size = self._get_max_packet_size()
    # Invalidate data in the readbuffer
    self._readoffset = 0
    self._readbuffer = bytearray()
    # Drain input buffer
    self.purge_buffers()
    # Shallow reset
    self._reset_device()
    # Reset feature mode
    self.set_bitmode(0, Ftdi.BitMode.RESET)
    # Init latency
    self._latency_threshold = None
    self.set_latency_timer(self.LATENCY_MIN)
    # remember whether the logger is set to the DEBUG level
    self._debug_log = self.log.getEffectiveLevel() == DEBUG
def close(self, freeze: bool = False) -> None:
    """Close the FTDI interface/port.

    Nothing is done when no USB device is attached to this instance.

    :param freeze: if set, FTDI port is not reset to its default
                   state on close. This means the port is left with
                   its current configuration and output signals.
                   This feature should not be used except for very
                   specific needs.
    """
    dev = self._usb_dev
    if not dev:
        return
    if self._is_pyusb_handle_active():
        # Skip the device requests below when the device has already
        # been closed: the ResourceManager may attempt to re-open the
        # closed device, and this may lead to a (native) crash in
        # libusb.
        try:
            if not freeze:
                self.set_bitmode(0, Ftdi.BitMode.RESET)
                self.set_latency_timer(self.LATENCY_MAX)
            release_interface(dev, self._index - 1)
        except FtdiError as exc:
            self.log.warning('FTDI device may be gone: %s', exc)
        try:
            self._usb_dev.attach_kernel_driver(self._index - 1)
        except (NotImplementedError, USBError):
            # best effort only: re-attaching the kernel driver may be
            # unsupported or may fail
            pass
    # forget the device before handing it back to UsbTools
    self._usb_dev = None
    UsbTools.release_device(dev)
def reset(self, usb_reset: bool = False) -> None:
    """Reset FTDI device.

    The FTDI port is always reset; a USB reset is only added on request.

    :param usb_reset: whether to perform a full USB reset of the device.
        Beware that selecting usb_reset performs a full USB device reset,
        which means all other interfaces of the same device are also
        affected.
    :raise FtdiError: if no FTDI port is open
    """
    if not self.is_connected:
        raise FtdiError('Not connected')
    self._reset_device()
    if not usb_reset:
        return
    self._reset_usb_device()
def open_mpsse_from_url(self, url: str, direction: int = 0x0,
                        initial: int = 0x0, frequency: float = 6.0E6,
                        latency: int = 16, debug: bool = False) -> float:
    """Open a new interface to the specified FTDI device in MPSSE mode.

    MPSSE enables I2C, SPI, JTAG or other synchronous serial interface
    modes (vs. UART mode).

    The URL is resolved into a USB device and a port, then the call is
    delegated to :py:meth:`open_mpsse_from_device`.

    :param url: a FTDI URL selector
    :param direction: a bitfield specifying the FTDI GPIO direction,
        where high level defines an output, and low level defines an
        input
    :param initial: a bitfield specifying the initial output value
    :param float frequency: serial interface clock in Hz
    :param latency: low-level latency in milliseconds. The shorter
        the delay, the higher the host CPU load. Do not use shorter
        values than the default, as it triggers data loss in FTDI.
    :param debug: forwarded as the ``debug`` argument of
        :py:meth:`open_mpsse_from_device`
    :return: actual bus frequency in Hz
    """
    descriptor, port = self.get_identifiers(url)
    usb_device = UsbTools.get_device(descriptor)
    options = {
        'direction': direction,
        'initial': initial,
        'frequency': frequency,
        'latency': latency,
        'debug': debug,
    }
    return self.open_mpsse_from_device(usb_device, port, **options)
def open_mpsse(self, vendor: int, product: int, bus: Optional[int] = None,
               address: Optional[int] = None, index: int = 0,
               serial: Optional[str] = None, interface: int = 1,
               direction: int = 0x0, initial: int = 0x0,
               frequency: float = 6.0E6, latency: int = 16,
               debug: bool = False) -> float:
    """Open a new interface to the specified FTDI device in MPSSE mode.

    MPSSE enables I2C, SPI, JTAG or other synchronous serial interface
    modes (vs. UART mode).

    When several FTDI devices of the same kind (vid, pid) are connected
    to the host, use either the index or the serial argument to tell
    them apart.

    The index is not a reliable selector, as the host may enumerate the
    USB devices in random order; the serial number is more reliable and
    should always be preferred.

    Some FTDI devices support several interfaces/ports (such as FT2232H,
    FT4232H and FT4232HA). The interface argument selects the FTDI port
    to use, starting from 1 (not 0). Note that not all FTDI ports are
    MPSSE capable.

    :param vendor: USB vendor id
    :param product: USB product id
    :param bus: optional selector, USB bus
    :param address: optional selector, USB address on bus
    :param index: optional selector, specifies the n-th matching
        FTDI enumerated USB device on the host
    :param serial: optional selector, specifies the FTDI device
        by its serial number
    :param interface: FTDI interface/port
    :param direction: a bitfield specifying the FTDI GPIO direction,
        where high level defines an output, and low level defines an
        input
    :param initial: a bitfield specifying the initial output value
    :param frequency: serial interface clock in Hz
    :param latency: low-level latency in milliseconds. The shorter
        the delay, the higher the host CPU load. Do not use shorter
        values than the default, as it triggers data loss in FTDI.
    :param bool debug: forwarded as the ``debug`` argument of
        :py:meth:`open_mpsse_from_device`
    :return: actual bus frequency in Hz
    """
    descriptor = UsbDeviceDescriptor(vendor, product, bus, address,
                                     serial, index, None)
    usb_device = UsbTools.get_device(descriptor)
    options = {
        'direction': direction,
        'initial': initial,
        'frequency': frequency,
        'latency': latency,
        'debug': debug,
    }
    return self.open_mpsse_from_device(usb_device, interface, **options)
def open_mpsse_from_device(self, device: UsbDevice,
                           interface: int = 1, direction: int = 0x0,
                           initial: int = 0x0, frequency: float = 6.0E6,
                           latency: int = 16, tracer: bool = False,
                           debug: bool = False) -> float:
    """Open a new interface to the specified FTDI device in MPSSE mode.

    MPSSE enables I2C, SPI, JTAG or other synchronous serial interface
    modes (vs. UART mode).

    Some FTDI devices support several interfaces/ports (such as FT2232H,
    FT4232H and FT4232HA). The interface argument selects the FTDI port
    to use, starting from 1 (not 0). Note that not all FTDI ports are
    MPSSE capable: the port is closed again and an error is raised when
    the selected one is not.

    :param device: FTDI USB device
    :param interface: FTDI interface/port
    :param direction: a bitfield specifying the FTDI GPIO direction,
        where high level defines an output, and low level defines an
        input
    :param initial: a bitfield specifying the initial output value
    :param frequency: serial interface clock in Hz
    :param latency: low-level latency in milliseconds. The shorter
        the delay, the higher the host CPU load. Do not use shorter
        values than the default, as it triggers data loss in FTDI.
    :param bool tracer: use a tracer to decode MPSSE protocol; string
        values are accepted and converted to a boolean
    :param bool debug: add more debug traces (not used by this method
        at present)
    :return: actual bus frequency in Hz
    :raise FtdiMpsseError: if the interface does not support MPSSE
    """
    # pylint: disable=unused-argument
    self.open_from_device(device, interface)
    if not self.is_mpsse_interface(interface):
        # do not leave a port open that cannot be used
        self.close()
        raise FtdiMpsseError('This interface does not support MPSSE')
    if to_bool(tracer):  # accept strings as boolean
        # pylint: disable=import-outside-toplevel
        from .tracer import FtdiMpsseTracer
        self._tracer = FtdiMpsseTracer(self.device_version)
        self.log.debug('Using MPSSE tracer')
    # Set latency timer
    self.set_latency_timer(latency)
    # Set chunk size
    self.write_data_set_chunksize()
    self.read_data_set_chunksize()
    # Reset feature mode
    self.set_bitmode(0, Ftdi.BitMode.RESET)
    # Drain buffers
    self.purge_buffers()
    # Disable event and error characters
    self.set_event_char(0, False)
    self.set_error_char(0, False)
    # Enable MPSSE mode
    self.set_bitmode(direction, Ftdi.BitMode.MPSSE)
    # Configure clock
    frequency = self._set_frequency(frequency)
    # Configure I/O
    # low byte: command, initial value, direction
    cmd = bytearray((Ftdi.SET_BITS_LOW, initial & 0xFF, direction & 0xFF))
    if self.has_wide_port:
        # wide ports also need the upper byte of value and direction
        initial >>= 8
        direction >>= 8
        cmd.extend((Ftdi.SET_BITS_HIGH, initial & 0xFF, direction & 0xFF))
    self.write_data(cmd)
    # Disable loopback
    self.write_data(bytearray((Ftdi.LOOPBACK_END,)))
    self.validate_mpsse()
    # Return the actual frequency
    return frequency
def open_bitbang_from_url(self, url: str, direction: int = 0x0,
                          latency: int = 16, baudrate: int = 1000000,
                          sync: bool = False) -> float:
    """Open a new interface to the specified FTDI device in bitbang mode.

    Bitbang enables direct read or write to FTDI GPIOs.

    The URL is resolved into a USB device and a port, then the call is
    delegated to :py:meth:`open_bitbang_from_device`.

    :param url: a FTDI URL selector
    :param direction: a bitfield specifying the FTDI GPIO direction,
        where high level defines an output, and low level defines an
        input
    :param latency: low-level latency to select the USB FTDI poll
        delay. The shorter the delay, the higher the host CPU load.
    :param baudrate: pace to sequence GPIO exchanges
    :param sync: whether to use synchronous or asynchronous bitbang
    :return: actual bitbang baudrate in bps
    """
    descriptor, port = self.get_identifiers(url)
    usb_device = UsbTools.get_device(descriptor)
    options = {
        'direction': direction,
        'latency': latency,
        'baudrate': baudrate,
        'sync': sync,
    }
    return self.open_bitbang_from_device(usb_device, port, **options)
def open_bitbang(self, vendor: int, product: int,
                 bus: Optional[int] = None, address: Optional[int] = None,
                 index: int = 0, serial: Optional[str] = None,
                 interface: int = 1, direction: int = 0x0,
                 latency: int = 16, baudrate: int = 1000000,
                 sync: bool = False) -> float:
    """Open a new interface to the specified FTDI device in bitbang mode.

    Bitbang enables direct read or write to FTDI GPIOs.

    The selectors are turned into a USB device, then the call is
    delegated to :py:meth:`open_bitbang_from_device`.

    :param vendor: USB vendor id
    :param product: USB product id
    :param bus: optional selector, USB bus
    :param address: optional selector, USB address on bus
    :param index: optional selector, specifies the n-th matching
        FTDI enumerated USB device on the host
    :param serial: optional selector, specifies the FTDI device
        by its serial number
    :param interface: FTDI interface/port
    :param direction: a bitfield specifying the FTDI GPIO direction,
        where high level defines an output, and low level defines an
        input
    :param latency: low-level latency to select the USB FTDI poll
        delay. The shorter the delay, the higher the host CPU load.
    :param baudrate: pace to sequence GPIO exchanges
    :param sync: whether to use synchronous or asynchronous bitbang
    :return: actual bitbang baudrate in bps
    """
    descriptor = UsbDeviceDescriptor(vendor, product, bus, address,
                                     serial, index, None)
    usb_device = UsbTools.get_device(descriptor)
    options = {
        'direction': direction,
        'latency': latency,
        'baudrate': baudrate,
        'sync': sync,
    }
    return self.open_bitbang_from_device(usb_device, interface, **options)
def open_bitbang_from_device(self, device: UsbDevice,
                             interface: int = 1, direction: int = 0x0,
                             latency: int = 16, baudrate: int = 1000000,
                             sync: bool = False) -> int:
    """Open a new interface to the specified FTDI device in bitbang mode.

    Bitbang enables direct read or write to FTDI GPIOs.

    :param device: FTDI USB device
    :param interface: FTDI interface/port
    :param direction: a bitfield specifying the FTDI GPIO direction,
        where high level defines an output, and low level defines an
        input
    :param latency: low-level latency to select the USB FTDI poll
        delay. The shorter the delay, the higher the host CPU load.
    :param baudrate: pace to sequence GPIO exchanges; when it evaluates
        to false (e.g. 0), the baudrate is left unchanged
    :param sync: whether to use synchronous or asynchronous bitbang
    :return: actual bitbang baudrate in bps, i.e. the baudrate currently
        stored in the instance
    """
    self.open_from_device(device, interface)
    # Set latency timer
    self.set_latency_timer(latency)
    # Set chunk size
    # Beware that RX buffer, over 512 bytes, contains 2-byte modem marker
    # on every 512 byte chunk, so data and out-of-band marker get
    # interleaved. This is not yet supported with read_data_bytes for now
    self.write_data_set_chunksize()
    self.read_data_set_chunksize()
    # disable flow control
    self.set_flowctrl('')
    # Enable BITBANG mode
    self.set_bitmode(direction, Ftdi.BitMode.BITBANG if not sync else
                     Ftdi.BitMode.SYNCBB)
    # Configure clock
    if baudrate:
        self._baudrate = self._set_baudrate(baudrate, False)
    # Drain input buffer
    self.purge_buffers()
    return self._baudrate
@property
def usb_path(self) -> Tuple[int, int, int]:
"""Provide the physical location on the USB topology.
:return: a tuple of bus, address, interface; if connected
"""
if not self.is_connected:
raise FtdiError('Not connected')
return (self._usb_dev.bus, self._usb_dev.address,
self._interface.bInterfaceNumber)
@property
def device_version(self) -> int:
"""Report the device version, i.e. the kind of device.
:see: :py:meth:`ic_name` for a product version of this information.
:return: the device version (16-bit integer)
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self._usb_dev.bcdDevice
@property
def ic_name(self) -> str:
"""Return the current type of the FTDI device as a string
see also http://www.ftdichip.com/Support/
Documents/TechnicalNotes/TN_100_USB_VID-PID_Guidelines.pdf
:return: the identified FTDI device as a string
"""
if not self.is_connected:
return 'unknown'
return self.DEVICE_NAMES.get(self.device_version, 'undefined')
@property
def device_port_count(self) -> int:
"""Report the count of port/interface of the Ftdi device.
:return: the count of ports
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self._usb_dev.get_active_configuration().bNumInterfaces
@property
def port_index(self) -> int:
"""Report the port/interface index, starting from 1
:return: the port position/index
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self._index
@property
def port_width(self) -> int:
"""Report the width of a single port / interface
:return: the width of the port, in bits
:raise FtdiError: if no FTDI port is open
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
if self.device_version in (0x0700, 0x0900):
return 16
if self.device_version in (0x0500, ):
return 12
return 8
@property
def has_mpsse(self) -> bool:
"""Tell whether the device supports MPSSE (I2C, SPI, JTAG, ...)
:return: True if the FTDI device supports MPSSE
:raise FtdiError: if no FTDI port is open
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self.device_version in (0x0500, 0x0700, 0x0800, 0x0900, 0x3600)
@property
def has_wide_port(self) -> bool:
"""Tell whether the device supports 16-bit GPIO ports (vs. 8 bits)
:return: True if the FTDI device supports wide GPIO port
:raise FtdiError: if no FTDI port is open
"""
return self.port_width > 8
@property
def has_cbus(self) -> bool:
"""Tell whether the device supports CBUS bitbang.
CBUS bitbanging feature requires a special configuration in EEPROM.
This function only reports if the current device supports this mode,
not if this mode has been enabled in EEPROM.
EEPROM configuration must be queried to check which CBUS pins have
been configured for GPIO/bitbang mode.
:return: True if the FTDI device supports CBUS bitbang
:raise FtdiError: if no FTDI port is open
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self.device_version in (0x0600, 0x0900, 0x1000)
@property
def has_drivezero(self) -> bool:
"""Tell whether the device supports drive-zero mode, i.e. if the
device supports the open-collector drive mode, useful for I2C
communication for example.
:return: True if the FTDI device features drive-zero mode
:raise FtdiError: if no FTDI port is open
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self.device_version in (0x0900, )
@property
def is_legacy(self) -> bool:
"""Tell whether the device is a low-end FTDI
:return: True if the FTDI device can only be used as a slow USB-UART
bridge
:raise FtdiError: if no FTDI port is open
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self.device_version <= 0x0200
@property
def is_H_series(self) -> bool:
"""Tell whether the device is a high-end FTDI
:return: True if the FTDI device is a high-end USB-UART bridge
:raise FtdiError: if no FTDI port is open
"""
if not self.is_connected:
raise FtdiError('Device characteristics not yet known')
return self.device_version in (0x0700, 0x0800, 0x0900, 0x3600)
@property
def is_mpsse(self) -> bool:
    """Report whether the interface is currently configured in MPSSE mode.

    :return: True if the last recorded bit mode is MPSSE
    """
    current_mode = self._bitmode
    return current_mode == Ftdi.BitMode.MPSSE
def is_mpsse_interface(self, interface: int) -> bool:
    """Tell whether the interface supports MPSSE (I2C, SPI, JTAG, ...)

    :param interface: interface number to check
    :return: True if the FTDI interface supports MPSSE
    :raise FtdiError: not raised here directly; presumably propagated
                      by ``has_mpsse`` when no FTDI port is open
    """
    if not self.has_mpsse:
        return False
    # on these two device versions, only interfaces up to 2 are reported
    # as MPSSE capable
    restricted = self.device_version in (0x0800, 0x3600)
    return not (restricted and interface > 2)
@property
def is_bitbang_enabled(self) -> bool:
    """Report whether some bitbang mode is activated.

    :return: True if the FTDI interface is configured to support
             bitbanging
    """
    # CBUS mode does not change base frequency, so it is listed along
    # with the other modes that do not count as bitbang
    excluded_modes = (Ftdi.BitMode.RESET,
                      Ftdi.BitMode.MPSSE,
                      Ftdi.BitMode.CBUS)
    return self._bitmode not in excluded_modes

# legacy API: former name of the property above
bitbang_enabled = is_bitbang_enabled
@property
def is_eeprom_internal(self) -> bool:
    """Report whether the device has an internal EEPROM.

    :return: True if the device version is listed in ``INT_EEPROMS``
    """
    version = self.device_version
    return version in self.INT_EEPROMS
@property
def max_eeprom_size(self) -> int:
    """Report the maximum size of the EEPROM.

    The actual size may be lower, or even 0 if no EEPROM is connected
    or supported.

    :return: the maximum size in bytes.
    """
    version = self.device_version
    internal_sizes = self.INT_EEPROMS
    if version in internal_sizes:
        # internal EEPROM: the size is fixed by the device version
        return internal_sizes[version]
    return 0x80 if version == 0x0600 else 0x100
@property
def frequency_max(self) -> float:
    """Report the maximum frequency for the MPSSE clock.

    :return: the maximum supported frequency in Hz
    """
    if self.is_H_series:
        return Ftdi.BUS_CLOCK_HIGH
    return Ftdi.BUS_CLOCK_BASE
@property
def fifo_sizes(self) -> Tuple[int, int]:
    """Return the (TX, RX) tuple of hardware FIFO sizes.

    :return: 2-tuple of TX, RX FIFO size in bytes
    :raise FtdiFeatureError: if the device version has no known FIFO
                             sizes
    """
    version = self.device_version
    try:
        sizes = Ftdi.FIFO_SIZES[version]
    except KeyError as exc:
        message = f'Unsupported device: 0x{version:04x}'
        raise FtdiFeatureError(message) from exc
    return sizes
@property
def mpsse_bit_delay(self) -> float:
    """Delay between execution of two MPSSE SET_BITS commands.

    :return: minimum delay (actual value might be larger) in seconds
    """
    # Measured on FTDI2232H, not documented in the datasheet, hence it may
    # vary from one FTDI model to another.
    # Kept as a property so that it could be tweaked based on the FTDI bcd
    # type, the frequency, or whatever else.
    # NOTE(review): the value below is 0.5 us while the original remark
    # says it "seems to vary between 5 and 6.5 us" - confirm which one is
    # intended.
    delay = 0.5E-6
    return delay
@property
def baudrate(self) -> int:
    """Return the current baudrate, as last stored by
    :py:meth:`set_baudrate`.

    :return: the stored baudrate value
    """
    current = self._baudrate
    return current
@property
def usb_dev(self) -> UsbDevice:
    """Give access to the underlying USB device.

    :return: the USB device object held by this instance
    """
    device = self._usb_dev
    return device
def set_baudrate(self, baudrate: int, constrain: bool = True) -> int:
    """Change the current UART or BitBang baudrate.

    The FTDI device is not able to use an arbitrary baudrate. Its
    internal dividers are only able to achieve some baudrates.

    PyFtdi attempts to find the closest configurable baudrate and if
    the deviation from the requested baudrate is too high, it rejects
    the configuration if constrain is set.

    :py:attr:`baudrate` attribute can be used to retrieve the exact
    selected baudrate.

    :py:const:`BAUDRATE_TOLERANCE` defines the maximum deviation between
    the requested baudrate and the closest FTDI achievable baudrate,
    which matches standard UART clock drift (3%). If the achievable
    baudrate is not within limits, baudrate setting is rejected.

    :param baudrate: the new baudrate for the UART.
    :param constrain: whether to validate baudrate is in RS232 tolerance
                      limits or allow larger drift
    :raise ValueError: if deviation from selected baudrate is too large
    :raise FtdiError: on IO Error
    :return: the effective baudrate
    """
    # the actual work is delegated; only the outcome is recorded here
    effective = self._set_baudrate(baudrate, constrain)
    self._baudrate = effective
    return effective
def set_frequency(self, frequency: float) -> float:
    """Change the current MPSSE bus frequency.

    The FTDI device is not able to use an arbitrary frequency. Its
    internal dividers are only able to achieve some frequencies.

    PyFtdi finds and selects the closest configurable frequency.

    :param frequency: the new frequency for the serial interface,
                      in Hz.
    :return: the selected frequency, which may differ from the requested
             one, in Hz
    """
    selected = self._set_frequency(frequency)
    return selected
def purge_rx_buffer(self) -> None:
    """Clear the USB receive buffer on the chip (host-to-ftdi) and the
    internal read buffer.

    :raise FtdiError: if the purge request fails
    """
    failure = self._ctrl_transfer_out(Ftdi.SIO_REQ_RESET,
                                      Ftdi.SIO_RESET_PURGE_RX)
    if failure:
        raise FtdiError('Unable to flush RX buffer')
    # drop whatever is still cached on the host side
    self._readbuffer = bytearray()
    self._readoffset = 0
    self.log.debug('rx buf purged')
def purge_tx_buffer(self) -> None:
    """Clear the USB transmit buffer on the chip (ftdi-to-host).

    :raise FtdiError: if the purge request fails
    """
    failure = self._ctrl_transfer_out(Ftdi.SIO_REQ_RESET,
                                      Ftdi.SIO_RESET_PURGE_TX)
    if failure:
        raise FtdiError('Unable to flush TX buffer')
def purge_buffers(self) -> None:
    """Clear the buffers on the chip and the internal read buffer.

    The RX side is purged first, then the TX side.
    """
    for purge in (self.purge_rx_buffer, self.purge_tx_buffer):
        purge()
def write_data_set_chunksize(self, chunksize: int = 0) -> None:
    """Configure write buffer chunk size.

    This is a low-level configuration option, which is not intended to
    be used for a regular usage.

    :param chunksize: the optional size of the write buffer in bytes,
                      it is recommended to use 0 to force automatic
                      evaluation of the best value.
    """
    # 0 selects the first entry of the FIFO size tuple
    selected = chunksize if chunksize != 0 else self.fifo_sizes[0]
    self._writebuffer_chunksize = selected
    self.log.debug('TX chunksize: %d', self._writebuffer_chunksize)
def write_data_get_chunksize(self) -> int:
    """Get write buffer chunk size.

    :return: the size of the write buffer in bytes
    """
    size = self._writebuffer_chunksize
    return size
def read_data_set_chunksize(self, chunksize: int = 0) -> None:
    """Configure read buffer chunk size.

    This is a low-level configuration option, which is not intended to
    be used for a regular usage. Any data left in the internal read
    buffer is discarded.

    :param chunksize: the optional size of the read buffer in bytes,
                      it is recommended to use 0 to force automatic
                      evaluation of the best value.
    """
    # Invalidate all remaining data
    self._readbuffer = bytearray()
    self._readoffset = 0
    if chunksize == 0:
        # The status byte prolog is emitted every maxpacketsize, but for
        # "some" reasons, FT232R emits it every RX FIFO size bytes. Other
        # devices use a maxpacketsize which is smaller or equal to their
        # FIFO size, so this weird behavior is for now only experienced
        # with FT232R. Anyway, the following computation should address
        # all devices.
        fifos = self.fifo_sizes
        chunksize = min(fifos[0], fifos[1], self._max_packet_size)
    if platform == 'linux' and chunksize > 16384:
        # on Linux the chunk size is capped
        chunksize = 16384
    self._readbuffer_chunksize = chunksize
    self.log.debug('RX chunksize: %d', self._readbuffer_chunksize)
def read_data_get_chunksize(self) -> int:
    """Get read buffer chunk size.

    :return: the size of the read buffer in bytes
    """
    size = self._readbuffer_chunksize
    return size
def set_bitmode(self, bitmask: int, mode: 'Ftdi.BitMode') -> None:
    """Enable/disable bitbang modes.

    Switch the FTDI interface to the requested bit mode and record it
    once the device has accepted the request.

    :param bitmask: value placed in the low byte of the request; only
                    its 8 lowest bits are used
    :param mode: the bit mode to select
    :raise FtdiError: if the request fails
    """
    self.log.debug('bitmode: %s', mode.name)
    # the combination of all known mode values is used as a filter
    mode_mask = sum(Ftdi.BitMode)
    low_byte = bitmask & 0xff
    high_byte = (mode.value & mode_mask) << 8
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_BITMODE,
                               low_byte | high_byte):
        raise FtdiError('Unable to set bitmode')
    self._bitmode = mode
def read_pins(self) -> int:
    """Directly read pin state, circumventing the read buffer.

    Useful for bitbang mode.

    :return: bitfield of FTDI interface input GPIO
    :raise FtdiError: if the device returns no data
    """
    reply = self._ctrl_transfer_in(Ftdi.SIO_REQ_READ_PINS, 1)
    if reply:
        return reply[0]
    raise FtdiError('Unable to read pins')
def set_cbus_direction(self, mask: int, direction: int) -> None:
    """Configure the CBUS pins used as GPIOs.

    The configuration is only recorded; no request is sent to the device
    by this method.

    :param mask: which pins to configure as GPIOs
    :param direction: which pins are output (vs. input)
    :raise ValueError: if either bitfield does not fit in 4 bits
    """
    # sanity check: there cannot be more than 4 CBUS pins in bitbang mode
    for label, bits in (('mask', mask), ('direction', direction)):
        if not 0 <= bits <= 0x0F:
            raise ValueError(f'Invalid CBUS gpio {label}: 0x{bits:02x}')
    self._cbus_pins = (mask, direction)
def get_cbus_gpio(self) -> int:
    """Get the CBUS pins configured as GPIO inputs.

    The interface is switched to CBUS mode for the duration of the read,
    and switched back to the previous mode if it differs.

    :return: bitfield of CBUS read pins
    :raise FtdiError: if the current mode is neither RESET nor CBUS, or
                      if no CBUS pin is configured as input
    """
    allowed_modes = (Ftdi.BitMode.RESET, Ftdi.BitMode.CBUS)
    if self._bitmode not in allowed_modes:
        raise FtdiError('CBUS gpio not available from current mode')
    # inputs are the GPIO-enabled pins that are not flagged as outputs
    input_pins = self._cbus_pins[0] & ~self._cbus_pins[1]
    if not input_pins:
        raise FtdiError('No CBUS IO configured as input')
    # high nibble: direction, low nibble: last output values
    outv = (self._cbus_pins[1] << 4) | self._cbus_out
    previous_mode = self._bitmode
    try:
        self.set_bitmode(outv, Ftdi.BitMode.CBUS)
        inv = self.read_pins()
    finally:
        if previous_mode != self._bitmode:
            self.set_bitmode(0, previous_mode)
    return inv & input_pins
def set_cbus_gpio(self, pins: int) -> None:
    """Set the CBUS pins configured as GPIO outputs.

    The interface is switched to CBUS mode to apply the value, and
    switched back to the previous mode if it differs.

    :param pins: bitfield to apply to CBUS output pins
    :raise FtdiError: if the current mode is neither RESET nor CBUS, or
                      if no CBUS pin is configured as output
    :raise ValueError: if pins does not fit in 4 bits
    """
    allowed_modes = (Ftdi.BitMode.RESET, Ftdi.BitMode.CBUS)
    if self._bitmode not in allowed_modes:
        raise FtdiError('CBUS gpio not available from current mode')
    # sanity check: there cannot be more than 4 CBUS pins in bitbang mode
    if not 0 <= pins <= 0x0F:
        raise ValueError(f'Invalid CBUS gpio pins: 0x{pins:02x}')
    # outputs are the GPIO-enabled pins that are flagged as outputs
    output_pins = self._cbus_pins[0] & self._cbus_pins[1]
    if not output_pins:
        raise FtdiError('No CBUS IO configured as output')
    pins &= output_pins
    # high nibble: direction, low nibble: output values
    request = (self._cbus_pins[1] << 4) | pins
    previous_mode = self._bitmode
    try:
        self.set_bitmode(request, Ftdi.BitMode.CBUS)
        self._cbus_out = pins
    finally:
        if previous_mode != self._bitmode:
            self.set_bitmode(0, previous_mode)
def set_latency_timer(self, latency: int) -> None:
    """Set latency timer.

    The FTDI chip keeps data in the internal buffer for a specific
    amount of time if the buffer is not full yet to decrease
    load on the usb bus.

    The shorter the latency, the shorter the delay to obtain data and
    the higher the host CPU load. Be careful with this option.

    :param latency: latency (unspecified unit)
    :raise ValueError: if latency is outside the
                       ``LATENCY_MIN``..``LATENCY_MAX`` range
    :raise FtdiError: if the device rejects the request
    """
    if not Ftdi.LATENCY_MIN <= latency <= Ftdi.LATENCY_MAX:
        raise ValueError("Latency out of range")
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_LATENCY_TIMER, latency):
        # the original message lacked its verb
        raise FtdiError('Unable to set latency timer')
def get_latency_timer(self) -> int:
    """Get latency timer.

    :return: the current latency (unspecified unit)
    :raise FtdiError: if the device returns no data
    """
    reply = self._ctrl_transfer_in(Ftdi.SIO_REQ_GET_LATENCY_TIMER, 1)
    if reply:
        return reply[0]
    raise FtdiError('Unable to get latency')
def poll_modem_status(self) -> int:
    """Poll modem status information.

    This function allows to retrieve the two status bytes of the
    device, useful in UART mode.

    FTDI device does not have a so-called USB "interrupt" end-point,
    event polling on the UART interface is done through the regular
    control endpoint.

    see :py:func:`modem_status` to obtain decoded status strings

    :return: modem status, as a proprietary bitfield
    :raise FtdiError: if the device does not return exactly two bytes
    """
    raw = self._ctrl_transfer_in(Ftdi.SIO_REQ_POLL_MODEM_STATUS, 2)
    if raw and len(raw) == 2:
        # both bytes are combined as one little-endian 16-bit value
        return sunpack('<H', raw)[0]
    raise FtdiError('Unable to get modem status')
def modem_status(self) -> Tuple[str, ...]:
    """Provide the current modem status as a tuple of set signals.

    :return: decoded modem status as short strings
    :raise FtdiError: if the device does not return exactly two bytes
    """
    raw = self._ctrl_transfer_in(Ftdi.SIO_REQ_POLL_MODEM_STATUS, 2)
    if raw and len(raw) == 2:
        return self.decode_modem_status(raw)
    raise FtdiError('Unable to get modem status')
def set_flowctrl(self, flowctrl: str) -> None:
    """Select flowcontrol in UART mode.

    Either hardware flow control through RTS/CTS UART lines, or no flow
    control. These are the only two selectors accepted by this method.

    :param str flowctrl: either 'hw' for HW flow control or '' (empty
                         string) for no flow control.
    :raise ValueError: if the flow control argument is invalid
    :raise FtdiError: if the request fails

    .. note:: How does RTS/CTS flow control work (from FTDI FAQ):

        FTxxx RTS# pin is an output. It should be connected to the CTS#
        input pin of the device at the other end of the UART link.

            * If RTS# is logic 0 it is indicating the FTxxx device can
              accept more data on the RXD pin.
            * If RTS# is logic 1 it is indicating the FTxxx device
              cannot accept more data.

        RTS# changes state when the chip buffer reaches its last 32
        bytes of space to allow time for the external device to stop
        sending data to the FTxxx device.

        FTxxx CTS# pin is an input. It should be connected to the RTS#
        output pin of the device at the other end of the UART link.

            * If CTS# is logic 0 it is indicating the external device can
              accept more data, and the FTxxx will transmit on the TXD
              pin.
            * If CTS# is logic 1 it is indicating the external device
              cannot accept more data. the FTxxx will stop transmitting
              within 0~3 characters, depending on what is in the buffer.

              **This potential 3 character overrun does occasionally
              present problems.** Customers should be made aware the
              FTxxx is a USB device and not a "normal" RS232 device as
              seen on a PC. As such the device operates on a packet basis
              as opposed to a byte basis.

        Word to the wise. Not only do RS232 level shifting devices
        level shift, but they also invert the signal.
    """
    selectors = {'hw': Ftdi.SIO_RTS_CTS_HS,
                 '': Ftdi.SIO_DISABLE_FLOW_CTRL}
    try:
        selector = selectors[flowctrl]
    except KeyError as exc:
        raise ValueError(f'Unknown flow control: {flowctrl}') from exc
    # the selector is combined with the interface index and sent in the
    # index field of the request; the value field is left at zero
    index = selector | self._index
    try:
        status = self._usb_dev.ctrl_transfer(
            Ftdi.REQ_OUT, Ftdi.SIO_REQ_SET_FLOW_CTRL, 0, index,
            bytearray(), self._usb_write_timeout)
        if status:
            raise FtdiError('Unable to set flow control')
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from exc
def set_dtr(self, state: bool) -> None:
    """Set dtr line

    :param state: new DTR logical level
    :raise FtdiError: if the request fails
    """
    if state:
        level = Ftdi.SIO_SET_DTR_HIGH
    else:
        level = Ftdi.SIO_SET_DTR_LOW
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_MODEM_CTRL, level):
        raise FtdiError('Unable to set DTR line')
def set_rts(self, state: bool) -> None:
    """Set rts line

    :param state: new RTS logical level
    :raise FtdiError: if the request fails
    """
    if state:
        level = Ftdi.SIO_SET_RTS_HIGH
    else:
        level = Ftdi.SIO_SET_RTS_LOW
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_MODEM_CTRL, level):
        raise FtdiError('Unable to set RTS line')
def set_dtr_rts(self, dtr: bool, rts: bool) -> None:
    """Set dtr and rts lines at once

    :param dtr: new DTR logical level
    :param rts: new RTS logical level
    :raise FtdiError: if the request fails
    """
    dtr_bits = Ftdi.SIO_SET_DTR_HIGH if dtr else Ftdi.SIO_SET_DTR_LOW
    rts_bits = Ftdi.SIO_SET_RTS_HIGH if rts else Ftdi.SIO_SET_RTS_LOW
    # both selections travel in a single request
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_MODEM_CTRL,
                               dtr_bits | rts_bits):
        raise FtdiError('Unable to set DTR/RTS lines')
def set_break(self, break_: bool) -> None:
    """Start or stop a break exception event on the serial line

    The break flag is bit 14 of the recorded line property value, which
    is updated once the device has accepted the request.

    :param break_: either start or stop break event
    :raise FtdiError: if the request fails
    """
    break_flag = 0x01 << 14
    if break_:
        action = 'start'
        value = self._lineprop | break_flag
    else:
        action = 'stop'
        value = self._lineprop & ~break_flag
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_DATA, value):
        raise FtdiError(f'Unable to {action} break sequence')
    self._lineprop = value
def set_event_char(self, eventch: int, enable: bool) -> None:
    """Set the special event character

    :param eventch: the event character code
    :param enable: whether to set the enable flag (bit 8) in the request
    :raise FtdiError: if the request fails
    """
    value = (eventch | (1 << 8)) if enable else eventch
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_EVENT_CHAR, value):
        raise FtdiError('Unable to set event char')
def set_error_char(self, errorch: int, enable: bool) -> None:
    """Set error character

    :param errorch: the error character code
    :param enable: whether to set the enable flag (bit 8) in the request
    :raise FtdiError: if the request fails
    """
    value = (errorch | (1 << 8)) if enable else errorch
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_ERROR_CHAR, value):
        raise FtdiError('Unable to set error char')
def set_line_property(self, bits: int, stopbit: Union[int, float],
                      parity: str, break_: bool = False) -> None:
    """Configure the (RS232) UART characteristics.

    Arguments match the valid subset for FTDI HW of pyserial
    definitions.

    Bits accepts one of the following values:

    * ``7`` for 7-bit characters
    * ``8`` for 8-bit characters

    Stopbit accepts one of the following values:

    * ``1`` for a single bit
    * ``1.5`` for a bit and a half
    * ``2`` for two bits

    Parity accepts one of the following strings:

    * ``N`` for no parity bit
    * ``O`` for odd parity bit
    * ``E`` for even parity bit
    * ``M`` for parity bit always set
    * ``S`` for parity bit always reset

    :param bits: data bit count
    :param stopbit: stop bit count
    :param parity: parity mode as a single uppercase character
    :param break_: force break event
    :raise FtdiFeatureError: if one of the arguments is not supported
    :raise ValueError: if a selection cannot be encoded
    :raise FtdiError: if the request fails
    """
    word_lengths = {7: Ftdi.BITS_7,
                    8: Ftdi.BITS_8}
    parity_modes = {'N': Ftdi.PARITY_NONE,
                    'O': Ftdi.PARITY_ODD,
                    'E': Ftdi.PARITY_EVEN,
                    'M': Ftdi.PARITY_MARK,
                    'S': Ftdi.PARITY_SPACE}
    stop_modes = {1: Ftdi.STOP_BIT_1,
                  1.5: Ftdi.STOP_BIT_15,
                  2: Ftdi.STOP_BIT_2}
    # argument validation, in the order: parity, bits, stop bits
    for selection, supported, reason in (
            (parity, parity_modes, "Unsupported parity"),
            (bits, word_lengths, "Unsupported byte length"),
            (stopbit, stop_modes, "Unsupported stop bits")):
        if selection not in supported:
            raise FtdiFeatureError(reason)
    # field codes, placed at bit 8 (parity) and bit 11 (stop bits)
    parity_codes = {Ftdi.PARITY_NONE: 0x00,
                    Ftdi.PARITY_ODD: 0x01,
                    Ftdi.PARITY_EVEN: 0x02,
                    Ftdi.PARITY_MARK: 0x03,
                    Ftdi.PARITY_SPACE: 0x04}
    stop_codes = {Ftdi.STOP_BIT_1: 0x00,
                  Ftdi.STOP_BIT_15: 0x01,
                  Ftdi.STOP_BIT_2: 0x02}
    value = bits & 0x0F
    try:
        value |= parity_codes[parity_modes[parity]] << 8
        value |= stop_codes[stop_modes[stopbit]] << 11
    except KeyError as exc:
        raise ValueError('Invalid line property') from exc
    if break_ == Ftdi.BREAK_ON:
        # break flag is bit 14
        value |= 0x01 << 14
    if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_DATA, value):
        raise FtdiError('Unable to set line property')
    # remember the accepted setting
    self._lineprop = value
def enable_adaptive_clock(self, enable: bool = True) -> None:
    """Enable or disable adaptive clock mode, useful in MPSSE mode.

    Adaptive clock is a unique feature designed for a feedback clock
    for JTAG with ARM core.

    :param enable: whether to enable or disable this mode.
    :raise FtdiMpsseError: if MPSSE mode is not enabled
    """
    if not self.is_mpsse:
        raise FtdiMpsseError('Setting adaptive clock mode is only '
                             'available from MPSSE mode')
    # a conditional expression, unlike the "enable and X or Y" idiom,
    # does not depend on the truthiness of the enable opcode
    command = (Ftdi.ENABLE_CLK_ADAPTIVE if enable
               else Ftdi.DISABLE_CLK_ADAPTIVE)
    self.write_data(bytearray([command]))
def enable_3phase_clock(self, enable: bool = True) -> None:
    """Enable or disable 3-phase clocking mode, useful in MPSSE mode.

    3-phase clock is mostly useful with I2C mode. It is also used as a
    workaround to support SPI mode 3.

    :param enable: whether to enable or disable this mode.
    :raise FtdiMpsseError: if MPSSE mode is not enabled
    :raise FtdiFeatureError: if the device is not capable of 3-phase
                             clocking
    """
    if not self.is_mpsse:
        raise FtdiMpsseError('Setting 3-phase clock mode is only '
                             'available from MPSSE mode')
    if not self.is_H_series:
        raise FtdiFeatureError('This device does not support 3-phase '
                               'clock')
    # a conditional expression, unlike the "enable and X or Y" idiom,
    # does not depend on the truthiness of the enable opcode
    command = (Ftdi.ENABLE_CLK_3PHASE if enable
               else Ftdi.DISABLE_CLK_3PHASE)
    self.write_data(bytearray([command]))
def enable_drivezero_mode(self, lines: int) -> None:
    """Enable drive-zero mode, useful in MPSSE mode.

    drive-zero mode is mostly useful with I2C mode, to support the open
    collector driving mode.

    :param lines: bitfield of GPIO to drive in collector driven mode;
                  its 16 lowest bits are sent, low byte first
    :raise FtdiMpsseError: if MPSSE mode is not enabled
    :raise FtdiFeatureError: if the device is not capable of drive-zero
                             mode
    """
    if not self.is_mpsse:
        raise FtdiMpsseError('Setting drive-zero mode is only '
                             'available from MPSSE mode')
    if not self.has_drivezero:
        raise FtdiFeatureError('This device does not support drive-zero '
                               'mode')
    low_byte = lines & 0xff
    high_byte = (lines >> 8) & 0xff
    command = bytearray([Ftdi.DRIVE_ZERO, low_byte, high_byte])
    self.write_data(command)
def enable_loopback_mode(self, loopback: bool = False) -> None:
    """Enable loopback, i.e. connect DO to DI in FTDI MPSSE port for test
    purposes only. It does not support UART (TX to RX) mode.

    Note that the default argument value disables the loopback.

    :param loopback: whether to enable or disable this mode
    """
    if loopback:
        command = Ftdi.LOOPBACK_START
    else:
        command = Ftdi.LOOPBACK_END
    self.write_data(bytearray((command,)))
def calc_eeprom_checksum(self, data: Union[bytes, bytearray]) -> int:
    """Calculate EEPROM checksum over the data

    Data is consumed as little-endian 16-bit words. Each word is XORed
    into the running value, which is then rotated left by one bit.

    :param data: data to compute checksum over. Must be an even number
                 of bytes
    :return: checksum
    :raise ValueError: if data is empty or has an odd length
    """
    size = len(data)
    if size == 0:
        raise ValueError('No data to checksum')
    if size % 2:
        raise ValueError('Length not even')
    is_mtp = self.device_version == 0x1000  # FT230X
    crc = 0XAAAA
    for pos in range(0, size, 2):
        if is_mtp and 0x24 <= pos < 0x80:
            # special MTP user section which is not considered for the CRC
            continue
        word = ((data[pos+1] << 8) + data[pos]) & 0xffff
        crc ^= word
        # 16-bit rotate left by one
        crc = ((crc << 1) | (crc >> 15)) & 0xffff
    return crc
def read_eeprom(self, addr: int = 0, length: Optional[int] = None,
                eeprom_size: Optional[int] = None) -> bytes:
    """Read the EEPROM starting at byte address, addr, and returning
    length bytes. Here, addr and length are in bytes but we
    access a 16-bit word at a time, so automatically update
    addr and length to work with word accesses.

    :param addr: byte address that desire to read.
    :param length: byte length to read or None to use the EEPROM size
    :param eeprom_size: total size in bytes of the eeprom or None
    :return: eeprom bytes, as an array of bytes
    :raise ValueError: if addr or length is negative, or if the
                       requested range exceeds the EEPROM size
    :raise FtdiEepromError: if a word cannot be read
    :raise FtdiError: on USB error
    """
    eeprom_size = self._check_eeprom_size(eeprom_size)
    if length is None:
        length = eeprom_size
    # a negative length would otherwise yield a negative word count and a
    # read loop that never terminates
    if addr < 0 or length < 0 or (addr+length) > eeprom_size:
        raise ValueError('Invalid address/length')
    word_addr = addr >> 1
    word_count = length >> 1
    if (addr & 0x1) | (length & 0x1):
        # an odd start or an odd length requires one more word
        word_count += 1
    try:
        data = bytearray()
        for _ in range(word_count):
            buf = self._usb_dev.ctrl_transfer(
                Ftdi.REQ_IN, Ftdi.SIO_REQ_READ_EEPROM, 0,
                word_addr, 2, self._usb_read_timeout)
            if not buf:
                err_addr = word_addr << 1
                raise FtdiEepromError(f'EEPROM read error @ {err_addr}')
            data.extend(buf)
            word_addr += 1
        # skip the leading byte when the start address is odd
        start = addr & 0x1
        return bytes(data[start:start+length])
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from exc
def write_eeprom(self, addr: int, data: Union[bytes, bytearray],
                 eeprom_size: Optional[int] = None,
                 dry_run: bool = True) -> None:
    """Write multiple bytes to the EEPROM starting at byte address,
    addr. This function also updates the checksum
    automatically.

    The whole EEPROM is first read back, the new data is patched into
    that image, the checksum is recomputed, then only the modified
    section and the checksum word are handed to the raw writer.

    .. warning:: You can brick your device with invalid size or content.
                 Use this function at your own risk, and RTFM.

    :param addr: starting byte address to start writing
    :param data: data to be written; nothing is done if it is empty
    :param eeprom_size: total size in bytes of the eeprom or None
    :param dry_run: log what should be written, do not actually
                    change the EEPROM content
    :raise ValueError: if addr is negative or if the data does not fit
                       in the EEPROM
    """
    eeprom_size = self._check_eeprom_size(eeprom_size)
    if not data:
        return
    length = len(data)
    if addr < 0 or (addr+length) > eeprom_size:
        # accept up to eeprom_size, even if the last two bytes are
        # overwritten with a locally computed checksum
        raise ValueError('Invalid address/length')
    # First, read out the entire EEPROM, based on eeprom_size.
    eeprom = bytearray(self.read_eeprom(0, eeprom_size))
    # patch in the new data
    eeprom[addr:addr+len(data)] = data
    # compute new checksum over everything but the last two bytes
    chksum = self.calc_eeprom_checksum(eeprom[:-2])
    self.log.info('New EEPROM checksum: 0x%04x', chksum)
    # insert updated checksum - it is last 16-bits in EEPROM
    # (stored low byte first)
    if self.device_version == 0x1000:
        # FT230x EEPROM structure is different
        eeprom[0x7e] = chksum & 0x0ff
        eeprom[0x7f] = chksum >> 8
    else:
        eeprom[-2] = chksum & 0x0ff
        eeprom[-1] = chksum >> 8
    # Write back the new data and checksum back to
    # EEPROM. Only write data that is changing instead of writing
    # everything in EEPROM, even if the data does not change.
    #
    # Compute start and end sections of eeprom bearing in mind that
    # they must be even since it is a 16-bit EEPROM.
    # If start addr is odd, back it up one.
    start = addr
    size = length
    if start & 0x1:
        start -= 1
        size += 1
    # likewise, round an odd size up to the next word
    if size & 0x1:
        size += 1
    # never let the data section reach the trailing checksum word
    size = min(size, eeprom_size - 2)
    # finally, write new section of data and ...
    self._write_eeprom_raw(start, eeprom[start:start+size],
                           dry_run=dry_run)
    # ... updated checksum
    # NOTE(review): for device_version 0x1000 the checksum is stored at
    # 0x7e/0x7f above, while this call always writes the last two bytes of
    # the image - confirm this is intended for that device.
    self._write_eeprom_raw((eeprom_size-2), eeprom[-2:], dry_run=dry_run)
def overwrite_eeprom(self, data: Union[bytes, bytearray],
                     dry_run: bool = True) -> None:
    """Write the whole EEPROM content, from first to last byte.

    .. warning:: You can brick your device with invalid size or content.
                 Use this function at your own risk, and RTFM.

    :param data: data to be written (should include the checksum)
    :param dry_run: log what should be written, do not actually
                    change the EEPROM content
    :raise ValueError: if the data size does not match the internal
                       EEPROM size, or is not a known external EEPROM
                       size
    """
    size = len(data)
    if self.is_eeprom_internal:
        # internal EEPROM: the image must match its exact size
        size_ok = size == self.INT_EEPROMS[self.device_version]
    else:
        size_ok = size in self.EXT_EEPROM_SIZES
    if not size_ok:
        raise ValueError('Invalid EEPROM size')
    self._write_eeprom_raw(0, data, dry_run=dry_run)
def write_data(self, data: Union[bytes, bytearray]) -> int:
    """Write data to the FTDI port.

    In UART mode, data contains the serial stream to write to the UART
    interface.

    In MPSSE mode, data contains the sequence of MPSSE commands and
    data.

    Data buffer is split into chunk-sized blocks before being sent over
    the USB bus.

    :param data: the byte stream to send to the FTDI interface
    :return: count of written bytes
    :raise FtdiError: if a block cannot be written, or on USB error
    """
    total = len(data)
    sent = 0
    try:
        while sent < total:
            # never request more than what is left to send
            block = min(self._writebuffer_chunksize, total - sent)
            written = self._write(data[sent:sent+block])
            if written <= 0:
                raise FtdiError("Usb bulk write error")
            # the next block starts right after what was accepted
            sent += written
        return sent
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from exc
def read_data_bytes(self, size: int, attempt: int = 1,
                    request_gen: Optional[Callable[[int],
                                          Union[bytes, bytearray]]] = None) \
        -> bytearray:
    """Read data from the FTDI interface

    In UART mode, data contains the serial stream read from the UART
    interface.

    In MPSSE mode, data contains the sequence of data received and
    processed with the MPSSE engine.

    Data buffer is rebuilt from chunk-sized blocks received over the USB
    bus.

    FTDI device always sends internal status bytes, which are stripped
    out as not part of the data payload.

    Because of the multiple buses, buffers, FIFOs, and MPSSE command
    processing, data might not be immediately available on the host
    side. The attempt argument can be used to increase the attempt count
    to retrieve the expected amount of data, before giving up and
    returning all the received data, which may be shorter than the
    requested amount.

    :param size: the number of bytes to receive from the device
    :param attempt: attempt cycle count
    :param request_gen: a callable that takes the number of bytes still
                        to be read and returns a bytes buffer to send
                        back to the remote device. This is only useful to
                        perform optimized/continuous transfer from a
                        slave device.
    :return: payload bytes, as a bytearray
    :raise FtdiError: if the max packet size is not set, or on USB error
    """
    # Packet size sanity check
    if not self._max_packet_size:
        raise FtdiError("max_packet_size is bogus")
    packet_size = self._max_packet_size
    length = 1  # initial condition to enter the usb_read loop
    data = bytearray()
    # everything we want is still in the cache?
    if size <= len(self._readbuffer)-self._readoffset:
        data = self._readbuffer[self._readoffset:self._readoffset+size]
        self._readoffset += size
        return data
    # something still in the cache, but not enough to satisfy 'size'?
    if len(self._readbuffer)-self._readoffset != 0:
        data = self._readbuffer[self._readoffset:]
        # end of readbuffer reached
        self._readoffset = len(self._readbuffer)
    # read from USB, filling in the local cache as it is empty
    retry = attempt
    req_size = size
    try:
        # outer loop: one iteration per refill of the local cache
        while (len(data) < size) and (length > 0):
            # inner loop: poll the device until payload shows up or the
            # attempt budget is exhausted
            while True:
                tempbuf = self._read()
                retry -= 1
                length = len(tempbuf)
                # the received buffer contains at least one useful databyte
                # (first 2 bytes in each packet represent the current modem
                # status)
                if length >= 2:
                    if tempbuf[1] & self.TX_EMPTY_BITS:
                        if request_gen:
                            # payload received so far lowers the amount
                            # that still has to be requested
                            req_size -= length-2
                            if req_size > 0:
                                cmd = request_gen(req_size)
                                if cmd:
                                    self.write_data(cmd)
                if length > 2:
                    # payload received: restart the attempt budget
                    retry = attempt
                    if self._latency_threshold:
                        self._adapt_latency(True)
                    # skip the status bytes: one 2-byte header is expected
                    # at the start of every packet_size slice
                    chunks = (length+packet_size-1) // packet_size
                    count = packet_size - 2
                    # status bytes of the first packet, logged when an
                    # error bit is set
                    status = tempbuf[:2]
                    if status[1] & self.ERROR_BITS[1]:
                        self.log.error(
                            'FTDI error: %02x:%02x %s',
                            status[0], status[1], (' '.join(
                                self.decode_modem_status(status,
                                                         True)).title()))
                    # rebuild the local cache with payload bytes only
                    self._readbuffer = bytearray()
                    self._readoffset = 0
                    srcoff = 2
                    for _ in range(chunks):
                        self._readbuffer += tempbuf[srcoff:srcoff+count]
                        srcoff += packet_size
                    length = len(self._readbuffer)
                    break
                # received buffer only contains the modem status bytes
                # no data received, may be late, try again
                if retry > 0:
                    continue
                # no actual data
                self._readbuffer = bytearray()
                self._readoffset = 0
                if self._latency_threshold:
                    self._adapt_latency(False)
                # no more data to read?
                return data
            if length > 0:
                # data still fits in buf?
                if (len(data) + length) <= size:
                    data += self._readbuffer[self._readoffset:
                                             self._readoffset+length]
                    self._readoffset += length
                    # did we read exactly the right amount of bytes?
                    if len(data) == size:
                        return data
                else:
                    # partial copy: only the missing bytes are taken from
                    # the local cache, the remainder stays cached for the
                    # next call
                    part_size = min(size-len(data),
                                    len(self._readbuffer)-self._readoffset)
                    if part_size < 0:
                        raise FtdiError("Internal Error")
                    data += self._readbuffer[self._readoffset:
                                             self._readoffset+part_size]
                    self._readoffset += part_size
                    return data
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from exc
    # never reached
    raise FtdiError("Internal error")
def read_data(self, size: int) -> bytes:
    """Shortcut to receive a bytes buffer instead of the array of bytes.

    Note that output byte buffer may be shorter than the requested
    size.

    :param size: the number of bytes to receive from the device
    :return: payload bytes
    """
    payload = self.read_data_bytes(size)
    return bytes(payload)
def get_cts(self) -> bool:
    """Read terminal status line: Clear To Send

    :return: CTS line logical level
    """
    return bool(self.poll_modem_status() & self.MODEM_CTS)
def get_dsr(self) -> bool:
    """Read terminal status line: Data Set Ready

    :return: DSR line logical level
    """
    return bool(self.poll_modem_status() & self.MODEM_DSR)
def get_ri(self) -> bool:
    """Read terminal status line: Ring Indicator

    :return: RI line logical level
    """
    return bool(self.poll_modem_status() & self.MODEM_RI)
def get_cd(self) -> bool:
    """Read terminal status line: Carrier Detect

    :return: CD line logical level
    """
    return bool(self.poll_modem_status() & self.MODEM_RLSD)
def set_dynamic_latency(self, lmin: int, lmax: int,
                        threshold: int) -> None:
    """Set up or disable latency values.

    Dynamic latency management is a load balancer to adapt the
    responsiveness of FTDI read request vs. the host CPU load.

    It is mostly useful in UART mode, so that read bandwidth can be
    increased to the maximum achievable throughput, while maintaining
    very low host CPU load when no data is received from the UART.

    There should be no need to tweak the default values. Use with care.

    Both bounds are validated against ``LATENCY_MIN`` and
    ``LATENCY_MAX``. According to the original notes, the minimum is
    limited to 12 or above, as the FTDI device starts losing bytes when
    latency is too short, and the maximum latency value is 255 ms.

    Polling latency is reset to `lmin` each time at least one payload
    byte is received from the FTDI device.

    It doubles, up to `lmax`, every `threshold` times no payload has
    been received from the FTDI device.

    :param lmin: minimum latency level (ms)
    :param lmax: maximum latency level (ms)
    :param threshold: count to reset latency to maximum level; a false
                      value disables dynamic latency management
    :raise ValueError: if lmin or lmax is out of range
    """
    if not threshold:
        # feature disabled: the latency bounds are left untouched
        self._latency_count = 0
        self._latency_threshold = None
        return
    lowest, highest = self.LATENCY_MIN, self.LATENCY_MAX
    for lat in (lmin, lmax):
        if not lowest <= lat <= highest:
            raise ValueError(f'Latency out of range: {lat}')
    self._latency_min = lmin
    self._latency_max = lmax
    self._latency_threshold = threshold
    # start from the most responsive setting
    self._latency = lmin
    self.set_latency_timer(self._latency)
def validate_mpsse(self) -> None:
    """Check that the previous MPSSE request has been accepted by the FTDI
    device.

    Two bytes are read from the device; a first byte of 0xFA is treated
    as a rejection, the second byte being reported in the error message.

    :raise FtdiError: if the FTDI device rejected the command.
    """
    # only useful in MPSSE mode
    reply = self.read_data(2)
    # indexing a bytes object yields an int, so the marker has to be
    # compared as an integer: comparing with the '\xfa' string never
    # matches
    if len(reply) >= 2 and reply[0] == 0xFA:
        raise FtdiError(f'Invalid command @ {reply[1]}')
@classmethod
def get_error_string(cls) -> str:
    """Wrapper for legacy compatibility.

    :return: a constant, meaningless string
    """
    message = "Unknown error"
    return message
# --- Private implementation -------------------------------------------
def _set_interface(self, config: UsbConfiguration, ifnum: int):
    """Select the interface to use on the FTDI device

    :param config: the USB configuration that holds the interfaces
    :param ifnum: 1-based interface number; 0 selects the first one
    :raise ValueError: if the configuration has no such interface
    """
    if ifnum == 0:
        ifnum = 1
    position = ifnum-1
    if position not in range(config.bNumInterfaces):
        raise ValueError("No such interface for this device")
    interface = config[(position, 0)]
    self._interface = interface
    self._index = interface.bInterfaceNumber+1
    # the two lowest endpoint addresses are used, in ascending order
    addresses = sorted(ep.bEndpointAddress for ep in interface)
    self._in_ep, self._out_ep = addresses[:2]
    # detach kernel driver from the interface; this is a best-effort
    # operation, failures and unsupported backends are ignored
    usb_interface = self._index - 1
    try:
        if self._usb_dev.is_kernel_driver_active(usb_interface):
            self._usb_dev.detach_kernel_driver(usb_interface)
    except (NotImplementedError, USBError):
        pass
# pylint: disable=protected-access
# need to access private member _ctx of PyUSB device (resource manager)
# until PyUSB #302 is addressed
def _reset_usb_device(self) -> None:
    """Reset USB device (USB command, not FTDI specific).

       The reset is issued through the backend of the PyUSB resource
       manager, using the handle that the manager currently holds.
    """
    resource_manager = self._usb_dev._ctx
    backend = resource_manager.backend
    backend.reset_device(resource_manager.handle)
def _is_pyusb_handle_active(self) -> bool:
    """Tell whether the PyUSB resource manager holds a device handle.

       :return: True if the handle is truthy, False otherwise
    """
    # PyUSB offers no public API for this, hence the access to the
    # private resource manager of the device.
    handle = self._usb_dev._ctx.handle
    return bool(handle)
# pylint: enable=protected-access
def _reset_device(self):
    """Reset the FTDI device (FTDI vendor command).

       :raise FtdiError: if the control request returns a non-zero value
    """
    status = self._ctrl_transfer_out(Ftdi.SIO_REQ_RESET,
                                     Ftdi.SIO_RESET_SIO)
    if status:
        raise FtdiError('Unable to reset FTDI device')
def _ctrl_transfer_out(self, reqtype: int, value: int, data: bytes = b''):
    """Send a control message to the device.

       :param reqtype: request code of the control message
       :param value: value field of the control message
       :param data: optional payload, sent as a bytearray copy
       :return: whatever the underlying USB control transfer returns
       :raise FtdiError: if the USB layer reports an error
    """
    payload = bytearray(data)
    try:
        outcome = self._usb_dev.ctrl_transfer(
            Ftdi.REQ_OUT, reqtype, value, self._index,
            payload, self._usb_write_timeout)
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from None
    return outcome
def _ctrl_transfer_in(self, reqtype: int, length: int):
    """Request for a control message from the device.

       :param reqtype: request code of the control message
       :param length: count of bytes to request
       :return: whatever the underlying USB control transfer returns
       :raise FtdiError: if the USB layer reports an error
    """
    try:
        answer = self._usb_dev.ctrl_transfer(
            Ftdi.REQ_IN, reqtype, 0, self._index, length,
            self._usb_read_timeout)
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from None
    return answer
def _write(self, data: Union[bytes, bytearray]) -> int:
    """Send a buffer to the device through the write endpoint.

       The buffer is first dumped to the debug log and forwarded to the
       tracer, when these are enabled.

       :param data: the bytes to send
       :return: the value returned by the USB write request
       :raise FtdiError: if the USB layer reports an error
    """
    if self._debug_log:
        try:
            dump = hexlify(data).decode()
            self.log.debug('> %s', dump)
        except TypeError as exc:
            self.log.warning('> (invalid output byte sequence: %s)', exc)
    if self._tracer:
        self._tracer.send(self._index, data)
    try:
        written = self._usb_dev.write(self._in_ep, data,
                                      self._usb_write_timeout)
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from None
    return written
def _read(self) -> bytes:
    """Read one chunk from the device through the read endpoint.

       A non-empty chunk is dumped to the debug log when enabled. The
       tracer, if any, receives the chunk without its first two bytes
       (presumably the FTDI status bytes - to be confirmed), and only when
       something is left after them.

       :return: the raw chunk, exactly as returned by the USB layer
       :raise FtdiError: if the USB layer reports an error
    """
    try:
        chunk = self._usb_dev.read(self._out_ep, self._readbuffer_chunksize,
                                   self._usb_read_timeout)
    except USBError as exc:
        raise FtdiError(f'UsbError: {exc}') from None
    if not chunk:
        return chunk
    if self._debug_log:
        self.log.debug('< %s', hexlify(chunk).decode())
    if self._tracer and len(chunk) > 2:
        self._tracer.receive(self._index, chunk[2:])
    return chunk
def _adapt_latency(self, payload_detected: bool) -> None:
    """Adapt the polling latency to the content of the last RX buffer.

       With a payload, the idle counter is cleared and the latency goes
       back to its minimum. Without one, the idle counter is incremented;
       once it exceeds the threshold and the latency has not reached its
       maximum, the latency is doubled and capped to the maximum.

       :param payload_detected: whether a payload has been received
                                within last RX buffer
    """
    if not payload_detected:
        self._latency_count += 1
        if self._latency == self._latency_max:
            # already polling at the slowest pace
            return
        if not self._latency_count > self._latency_threshold:
            return
        self._latency *= 2
        if self._latency > self._latency_max:
            self._latency = self._latency_max
        else:
            # the idle counter restarts only if the cap has not been hit
            self._latency_count = 0
        self.set_latency_timer(self._latency)
        return
    # payload received: back to the fastest polling pace
    self._latency_count = 0
    fastest = self._latency_min
    if self._latency != fastest:
        self.set_latency_timer(fastest)
        self._latency = fastest
def _check_eeprom_size(self, eeprom_size: Optional[int]) -> int:
    """Validate an EEPROM size against the current device.

       Devices listed in ``INT_EEPROMS`` only accept the size registered
       for them (or no size at all). Other devices fall back to
       ``max_eeprom_size`` when no size is given, and the size should be
       one of ``EXT_EEPROM_SIZES``.

       :param eeprom_size: the requested size, if any
       :return: the size to use
       :raise ValueError: if the size is not valid for the device
    """
    version = self.device_version
    if version not in self.INT_EEPROMS:
        size = self.max_eeprom_size if eeprom_size is None else eeprom_size
        if size not in self.EXT_EEPROM_SIZES:
            raise ValueError(f'Invalid EEPROM size: {size}')
        return size
    internal_size = self.INT_EEPROMS[version]
    if eeprom_size and eeprom_size != internal_size:
        raise ValueError(f'Invalid EEPROM size: {eeprom_size}')
    return internal_size
def _write_eeprom_raw(self, addr: int, data: Union[bytes, bytearray],
                      dry_run: bool = True) -> None:
    """Write multiple bytes to the EEPROM starting at byte address,
       addr. The EEPROM is written as little-endian 16-bit words, so both
       the start address and the length of data must be even: the data
       are never padded, an odd address or length is rejected.

       :param int addr: starting byte address to start writing
       :param bytes data: data to be written
       :param dry_run: log what should be written, do not actually
                       change the EEPROM content
       :raise ValueError: if the address or the data length is odd
       :raise FtdiEepromError: if a write request returns a non-zero value
    """
    length = len(data)
    # reject invalid requests before any exchange with the device, so that
    # the latency timer is not needlessly read, changed and restored
    if addr & 0x1 or length & 0x1:
        raise ValueError('Address/length not even')
    if self.device_version == 0x0600:
        # FT232R internal EEPROM is unstable and latency timer seems
        # to have a direct impact on EEPROM programming...
        latency = self.get_latency_timer()
    else:
        latency = 0
    try:
        if latency:
            self.set_latency_timer(self.LATENCY_EEPROM_FT232R)
        for word in sunpack(f'<{length//2}H', data):
            if not dry_run:
                # the EEPROM is addressed by word, not by byte
                out = self._usb_dev.ctrl_transfer(
                    Ftdi.REQ_OUT, Ftdi.SIO_REQ_WRITE_EEPROM,
                    word, addr >> 1, b'', self._usb_write_timeout)
                if out:
                    raise FtdiEepromError(f'EEPROM Write Error @ {addr}')
                self.log.debug('Write EEPROM [0x%02x]: 0x%04x', addr, word)
            else:
                self.log.info('Fake write EEPROM [0x%02x]: 0x%04x',
                              addr, word)
            addr += 2
    finally:
        # always restore the latency that was active on entry
        if latency:
            self.set_latency_timer(latency)
def _get_max_packet_size(self) -> int:
    """Retrieve the maximum length of a data packet.

       :return: the ``wMaxPacketSize`` of the first endpoint of the
                selected interface
       :raise IOError: with errno ``ENODEV`` if the device is not
                       connected or if no interface has been selected
    """
    # OSError/IOError expects (errno, strerror) in that order, otherwise
    # the message ends up in `errno` and the code in `strerror`
    if not self.is_connected:
        raise IOError(ENODEV, "Device is not yet known")
    if not self._interface:
        raise IOError(ENODEV, "Interface is not yet known")
    endpoint = self._interface[0]
    return endpoint.wMaxPacketSize
def _convert_baudrate_legacy(self, baudrate: int) -> Tuple[int, int, int]:
    """Convert a requested baudrate into the divisor encoding used with
       legacy devices.

       The divisor is computed in eighths of the reference clock. Its
       3-bit fractional part selects the code stored in the two upper bits
       of the 16-bit value, its integer part is stored in the lower bits.

       :param baudrate: the baudrate in bps
       :return: a 3-uple of the estimated baudrate, the value and index
                to use as the USB configuration parameter
       :raise ValueError: if the baudrate exceeds the reference clock
    """
    if baudrate > self.BAUDRATE_REF_BASE:
        raise ValueError('Invalid baudrate (too high)')
    div8 = int(round((8 * self.BAUDRATE_REF_BASE) / baudrate))
    if (div8 & 0x7) == 7:
        # 7/8 has no encoding: round up to the next integer divisor
        div8 += 1
    div = div8 >> 3
    # keep the whole divisor in div8: it is needed to estimate the
    # baudrate, and would be zero for any integer divisor once masked
    frac = div8 & 0x7
    if frac == 1:
        div |= 0xc000
    elif frac >= 4:
        div |= 0x4000
    elif frac != 0:
        div |= 0x8000
    elif div == 1:
        div = 0
    value = div & 0xFFFF
    index = (div >> 16) & 0xFFFF
    # same rounded integer division as in _convert_baudrate
    estimate = int(((8 * self.BAUDRATE_REF_BASE) + (div8//2))//div8)
    return estimate, value, index
def _convert_baudrate(self, baudrate: int) -> Tuple[int, int, int]:
    """Convert a requested baudrate into the closest possible baudrate
       that can be assigned to the FTDI device.

       Devices whose version is 0x200 are handled by the legacy
       conversion. H series devices use the high reference clock for any
       baudrate of 1200 bps or more, all other cases use the base one.

       :param baudrate: the baudrate in bps
       :return: a 3-uple of the approximated baudrate, the value and index
                to use as the USB configuration parameter
       :raise ValueError: if the baudrate is out of the supported range
    """
    version = self.device_version
    if version == 0x200:
        return self._convert_baudrate_legacy(baudrate)
    hispeed = bool(self.is_H_series and baudrate >= 1200)
    if hispeed:
        clock = self.BAUDRATE_REF_HIGH
        bb_ratio = self.BITBANG_BAUDRATE_RATIO_HIGH
    else:
        clock = self.BAUDRATE_REF_BASE
        bb_ratio = self.BITBANG_BAUDRATE_RATIO_BASE
    if baudrate > clock:
        raise ValueError('Invalid baudrate (too high)')
    lowest = (clock >> 14) + 1
    if baudrate < lowest:
        raise ValueError('Invalid baudrate (too low)')
    bitbang = self.is_bitbang_enabled
    if bitbang:
        baudrate //= bb_ratio
    # divisor expressed in eighths of the reference clock
    div8 = int(round((8 * clock) / baudrate))
    whole, eighths = divmod(div8, 8)
    div = whole | (self.FRAC_DIV_CODE[eighths] << 14)
    # special encodings
    div = {1: 0, 0x4001: 1}.get(div, div)
    if hispeed:
        div |= 0x00020000
    value = div & 0xFFFF
    index = (div >> 16) & 0xFFFF
    if version >= 0x0700 or version == 0x0500:
        # the interface index is stored in the lower byte
        index = (index << 8) | self._index
    estimate = int(((8 * clock) + (div8//2))//div8)
    if bitbang:
        estimate *= bb_ratio
    return estimate, value, index
def _set_baudrate(self, baudrate: int, constrain: bool) -> int:
    """Program the baudrate that is the closest to the requested one.

       :param baudrate: the requested baudrate in bps
       :param constrain: whether to reject the request when the deviation
                         of the achievable baudrate exceeds
                         ``Ftdi.BAUDRATE_TOLERANCE`` (a percentage)
       :return: the baudrate that has been programmed
       :raise FtdiFeatureError: if the device is in MPSSE mode
       :raise ValueError: if the tolerance is exceeded while `constrain`
                          is set, or if the baudrate cannot be converted
       :raise FtdiError: if the device rejects the request or if the USB
                         layer reports an error
    """
    if self.is_mpsse:
        raise FtdiFeatureError('Cannot change frequency w/ current mode')
    actual, value, index = self._convert_baudrate(baudrate)
    # deviation from the requested baudrate, as a percentage
    delta = 100*abs(float(actual-baudrate))/baudrate
    self.log.debug('Actual baudrate: %d %.1f%% div [%04x:%04x]',
                   actual, delta, index, value)
    if constrain and delta > Ftdi.BAUDRATE_TOLERANCE:
        raise ValueError(f'Baudrate tolerance exceeded: {delta:.02f}% '
                         f'(wanted {baudrate}, achievable {actual})')
    try:
        if self._usb_dev.ctrl_transfer(
                Ftdi.REQ_OUT, Ftdi.SIO_REQ_SET_BAUDRATE, value, index,
                bytearray(), self._usb_write_timeout):
            raise FtdiError('Unable to set baudrate')
        return actual
    except USBError as exc:
        # f-string, so that the message contains the actual USB error
        raise FtdiError(f'UsbError: {exc}') from exc
def _set_frequency(self, frequency: float) -> float:
    """Convert a frequency value into a TCK divisor setting, and send it
       to the device.

       The divisor is first computed from the base bus clock. With H
       series devices, it is also computed from the high speed bus clock,
       and the latter is selected if it gets closer to the request.

       :param frequency: the requested frequency
       :return: the frequency that results from the selected divisor
       :raise FtdiFeatureError: if the device is not in MPSSE mode or if
                                the frequency exceeds the supported one
    """
    if not self.is_mpsse:
        raise FtdiFeatureError('Cannot change frequency w/ current mode')
    if frequency > self.frequency_max:
        raise FtdiFeatureError(f'Unsupported frequency: {frequency:.0f}')

    def closest(bus_clock):
        """Compute the divisor, the frequency it yields and the relative
           error for a bus clock."""
        candidate = int((bus_clock+frequency/2)/frequency)-1
        candidate = max(0, min(0xFFFF, candidate))
        reached = bus_clock/(candidate+1)
        return candidate, reached, (reached/frequency)-1

    # Base speed clock divider
    divcode = Ftdi.ENABLE_CLK_DIV5
    divisor, actual_freq, error = closest(Ftdi.BUS_CLOCK_BASE)
    h_series = self.is_H_series
    if h_series:
        # High speed clock divider, only available with H series
        divisor_hs, actual_freq_hs, error_hs = closest(Ftdi.BUS_CLOCK_HIGH)
        # Selected if closer to desired frequency (percentually)
        if abs(error_hs) < abs(error):
            divcode = Ftdi.DISABLE_CLK_DIV5
            divisor, actual_freq, error = (divisor_hs, actual_freq_hs,
                                           error_hs)
    # the clock divider selection is only sent to H series devices
    cmd = bytearray((divcode,)) if h_series else bytearray()
    # FTDI expects little endian
    low_byte = divisor & 0xff
    high_byte = (divisor >> 8) & 0xff
    cmd.extend((Ftdi.SET_TCK_DIVISOR, low_byte, high_byte))
    self.write_data(cmd)
    self.validate_mpsse()
    # Drain input buffer
    self.purge_rx_buffer()
    # Note that bus frequency may differ from clock frequency, when
    # 3-phase clock is enable, in which case bus frequency = 2/3 clock
    # frequency
    percent = error*100
    if actual_freq > 1E6:
        self.log.debug('Clock frequency: %.6f MHz (error: %+.1f %%)',
                       (actual_freq/1E6), percent)
    else:
        self.log.debug('Clock frequency: %.3f KHz (error: %+.1f %%)',
                       (actual_freq/1E3), percent)
    return actual_freq
def __get_timeouts(self) -> Tuple[int, int]:
    """USB timeouts, as a (read, write) pair."""
    read_timeout = self._usb_read_timeout
    write_timeout = self._usb_write_timeout
    return read_timeout, write_timeout


def __set_timeouts(self, timeouts: Tuple[int, int]):
    """Define both USB timeouts at once, from a (read, write) pair."""
    # the pair is fully unpacked before any timeout is updated
    self._usb_read_timeout, self._usb_write_timeout = timeouts


timeouts = property(__get_timeouts, __set_timeouts)