2328 lines
94 KiB
Python
2328 lines
94 KiB
Python
# Copyright (c) 2010-2024 Emmanuel Blot <emmanuel.blot@free.fr>
|
|
# Copyright (c) 2016 Emmanuel Bouaziz <ebouaziz@free.fr>
|
|
# All rights reserved.
|
|
#
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
"""FTDI core driver."""
|
|
|
|
from binascii import hexlify
|
|
from collections import OrderedDict
|
|
from enum import IntEnum, unique
|
|
from errno import ENODEV
|
|
from logging import getLogger, DEBUG
|
|
from struct import unpack as sunpack
|
|
from sys import platform
|
|
from typing import Callable, Optional, List, Sequence, TextIO, Tuple, Union
|
|
from usb.core import (Configuration as UsbConfiguration, Device as UsbDevice,
|
|
USBError)
|
|
from usb.util import (build_request_type, release_interface, CTRL_IN, CTRL_OUT,
|
|
CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE)
|
|
from .misc import to_bool
|
|
from .usbtools import UsbDeviceDescriptor, UsbTools
|
|
|
|
# pylint: disable=invalid-name
|
|
|
|
|
|
class FtdiError(IOError):
|
|
"""Base class error for all FTDI device"""
|
|
|
|
|
|
class FtdiFeatureError(FtdiError):
|
|
"""Requested feature is not available on FTDI device"""
|
|
|
|
|
|
class FtdiMpsseError(FtdiFeatureError):
|
|
"""MPSSE mode not supported on FTDI device"""
|
|
|
|
|
|
class FtdiEepromError(FtdiError):
|
|
"""FTDI EEPROM access errors"""
|
|
|
|
|
|
class Ftdi:
|
|
"""FTDI device driver"""
|
|
|
|
SCHEME = 'ftdi'
|
|
"""URL scheme for :py:class:`UsbTools`."""
|
|
|
|
FTDI_VENDOR = 0x403
|
|
"""USB VID for FTDI chips."""
|
|
|
|
VENDOR_IDS = {'ftdi': FTDI_VENDOR}
|
|
"""Supported vendors, only FTDI.
|
|
To add third parties vendors see :py:meth:`add_custom_vendor`.
|
|
"""
|
|
|
|
PRODUCT_IDS = {
|
|
FTDI_VENDOR: OrderedDict((
|
|
# use an ordered dict so that the first occurence of a PID takes
|
|
# precedence when generating URLs - order does matter.
|
|
('232', 0x6001),
|
|
('232r', 0x6001),
|
|
('232h', 0x6014),
|
|
('2232', 0x6010),
|
|
('2232c', 0x6010),
|
|
('2232d', 0x6010),
|
|
('2232h', 0x6010),
|
|
('4232', 0x6011),
|
|
('4232h', 0x6011),
|
|
('ft-x', 0x6015),
|
|
('230x', 0x6015),
|
|
('231x', 0x6015),
|
|
('234x', 0x6015),
|
|
('4232ha', 0x6048),
|
|
('ft232', 0x6001),
|
|
('ft232r', 0x6001),
|
|
('ft232h', 0x6014),
|
|
('ft2232', 0x6010),
|
|
('ft2232c', 0x6010),
|
|
('ft2232d', 0x6010),
|
|
('ft2232h', 0x6010),
|
|
('ft4232', 0x6011),
|
|
('ft4232h', 0x6011),
|
|
('ft230x', 0x6015),
|
|
('ft231x', 0x6015),
|
|
('ft234x', 0x6015),
|
|
('ft4232ha', 0x6048)))
|
|
}
|
|
"""Supported products, only FTDI officials ones.
|
|
To add third parties and customized products, see
|
|
:py:meth:`add_custom_product`.
|
|
"""
|
|
|
|
DEFAULT_VENDOR = FTDI_VENDOR
|
|
"""Default vendor: FTDI."""
|
|
|
|
DEVICE_NAMES = {
|
|
0x0200: 'ft232am',
|
|
0x0400: 'ft232bm',
|
|
0x0500: 'ft2232c',
|
|
0x0600: 'ft232r',
|
|
0x0700: 'ft2232h',
|
|
0x0800: 'ft4232h',
|
|
0x0900: 'ft232h',
|
|
0x1000: 'ft-x',
|
|
0x3600: 'ft4232ha'}
|
|
"""Common names of FTDI supported devices."""
|
|
|
|
# Note that the FTDI datasheets contradict themselves, so
|
|
# the following values may not be the right ones...
|
|
FIFO_SIZES = {
|
|
0x0200: (128, 128), # FT232AM: TX: 128, RX: 128
|
|
0x0400: (128, 384), # FT232BM: TX: 128, RX: 384
|
|
0x0500: (128, 384), # FT2232C: TX: 128, RX: 384
|
|
0x0600: (256, 128), # FT232R: TX: 256, RX: 128
|
|
0x0700: (4096, 4096), # FT2232H: TX: 4KiB, RX: 4KiB
|
|
0x0800: (2048, 2048), # FT4232H: TX: 2KiB, RX: 2KiB
|
|
0x0900: (1024, 1024), # FT232H: TX: 1KiB, RX: 1KiB
|
|
0x1000: (512, 512), # FT-X: TX: 512, RX: 512
|
|
0x3600: (2048, 2048), # FT4232HA: TX: 2KiB, RX: 2KiB
|
|
}
|
|
"""FTDI chip internal FIFO sizes
|
|
|
|
Note that 'TX' and 'RX' are inverted with the datasheet terminology:
|
|
Values here are seen from the host perspective, whereas datasheet
|
|
values are defined from the device perspective
|
|
"""
|
|
|
|
@unique
|
|
class BitMode(IntEnum):
|
|
"""Function selection."""
|
|
|
|
RESET = 0x00 # switch off altnerative mode (default to UART)
|
|
BITBANG = 0x01 # classical asynchronous bitbang mode
|
|
MPSSE = 0x02 # MPSSE mode, available on 2232x chips
|
|
SYNCBB = 0x04 # synchronous bitbang mode
|
|
MCU = 0x08 # MCU Host Bus Emulation mode,
|
|
OPTO = 0x10 # Fast Opto-Isolated Serial Interface Mode
|
|
CBUS = 0x20 # Bitbang on CBUS pins of R-type chips
|
|
SYNCFF = 0x40 # Single Channel Synchronous FIFO mode
|
|
|
|
# MPSSE Commands
|
|
WRITE_BYTES_PVE_MSB = 0x10
|
|
WRITE_BYTES_NVE_MSB = 0x11
|
|
WRITE_BITS_PVE_MSB = 0x12
|
|
WRITE_BITS_NVE_MSB = 0x13
|
|
WRITE_BYTES_PVE_LSB = 0x18
|
|
WRITE_BYTES_NVE_LSB = 0x19
|
|
WRITE_BITS_PVE_LSB = 0x1a
|
|
WRITE_BITS_NVE_LSB = 0x1b
|
|
READ_BYTES_PVE_MSB = 0x20
|
|
READ_BYTES_NVE_MSB = 0x24
|
|
READ_BITS_PVE_MSB = 0x22
|
|
READ_BITS_NVE_MSB = 0x26
|
|
READ_BYTES_PVE_LSB = 0x28
|
|
READ_BYTES_NVE_LSB = 0x2c
|
|
READ_BITS_PVE_LSB = 0x2a
|
|
READ_BITS_NVE_LSB = 0x2e
|
|
RW_BYTES_PVE_NVE_MSB = 0x31
|
|
RW_BYTES_NVE_PVE_MSB = 0x34
|
|
RW_BITS_PVE_NVE_MSB = 0x33
|
|
RW_BITS_NVE_PVE_MSB = 0x36
|
|
RW_BYTES_PVE_NVE_LSB = 0x39
|
|
RW_BYTES_NVE_PVE_LSB = 0x3c
|
|
RW_BITS_PVE_NVE_LSB = 0x3b
|
|
RW_BITS_NVE_PVE_LSB = 0x3e
|
|
WRITE_BITS_TMS_PVE = 0x4a
|
|
WRITE_BITS_TMS_NVE = 0x4b
|
|
RW_BITS_TMS_PVE_PVE = 0x6a
|
|
RW_BITS_TMS_PVE_NVE = 0x6b
|
|
RW_BITS_TMS_NVE_PVE = 0x6e
|
|
RW_BITS_TMS_NVE_NVE = 0x6f
|
|
SEND_IMMEDIATE = 0x87
|
|
WAIT_ON_HIGH = 0x88
|
|
WAIT_ON_LOW = 0x89
|
|
READ_SHORT = 0x90
|
|
READ_EXTENDED = 0x91
|
|
WRITE_SHORT = 0x92
|
|
WRITE_EXTENDED = 0x93
|
|
# -H series only
|
|
DISABLE_CLK_DIV5 = 0x8a
|
|
ENABLE_CLK_DIV5 = 0x8b
|
|
|
|
# Modem status
|
|
MODEM_CTS = 1 << 4 # Clear to send
|
|
MODEM_DSR = 1 << 5 # Data set ready
|
|
MODEM_RI = 1 << 6 # Ring indicator
|
|
MODEM_RLSD = 1 << 7 # Carrier detect
|
|
MODEM_DR = 1 << 8 # Data ready
|
|
MODEM_OE = 1 << 9 # Overrun error
|
|
MODEM_PE = 1 << 10 # Parity error
|
|
MODEM_FE = 1 << 11 # Framing error
|
|
MODEM_BI = 1 << 12 # Break interrupt
|
|
MODEM_THRE = 1 << 13 # Transmitter holding register
|
|
MODEM_TEMT = 1 << 14 # Transmitter empty
|
|
MODEM_RCVE = 1 << 15 # Error in RCVR FIFO
|
|
|
|
# FTDI MPSSE commands
|
|
SET_BITS_LOW = 0x80 # Change LSB GPIO output
|
|
SET_BITS_HIGH = 0x82 # Change MSB GPIO output
|
|
GET_BITS_LOW = 0x81 # Get LSB GPIO output
|
|
GET_BITS_HIGH = 0x83 # Get MSB GPIO output
|
|
LOOPBACK_START = 0x84 # Enable loopback
|
|
LOOPBACK_END = 0x85 # Disable loopback
|
|
SET_TCK_DIVISOR = 0x86 # Set clock
|
|
# -H series only
|
|
ENABLE_CLK_3PHASE = 0x8c # Enable 3-phase data clocking (I2C)
|
|
DISABLE_CLK_3PHASE = 0x8d # Disable 3-phase data clocking
|
|
CLK_BITS_NO_DATA = 0x8e # Allows JTAG clock to be output w/o data
|
|
CLK_BYTES_NO_DATA = 0x8f # Allows JTAG clock to be output w/o data
|
|
CLK_WAIT_ON_HIGH = 0x94 # Clock until GPIOL1 is high
|
|
CLK_WAIT_ON_LOW = 0x95 # Clock until GPIOL1 is low
|
|
ENABLE_CLK_ADAPTIVE = 0x96 # Enable JTAG adaptive clock for ARM
|
|
DISABLE_CLK_ADAPTIVE = 0x97 # Disable JTAG adaptive clock
|
|
CLK_COUNT_WAIT_ON_HIGH = 0x9c # Clock byte cycles until GPIOL1 is high
|
|
CLK_COUNT_WAIT_ON_LOW = 0x9d # Clock byte cycles until GPIOL1 is low
|
|
# FT232H only
|
|
DRIVE_ZERO = 0x9e # Drive-zero mode
|
|
|
|
# USB control requests
|
|
REQ_OUT = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR,
|
|
CTRL_RECIPIENT_DEVICE)
|
|
REQ_IN = build_request_type(CTRL_IN, CTRL_TYPE_VENDOR,
|
|
CTRL_RECIPIENT_DEVICE)
|
|
|
|
# Requests
|
|
SIO_REQ_RESET = 0x0 # Reset the port
|
|
SIO_REQ_SET_MODEM_CTRL = 0x1 # Set the modem control register
|
|
SIO_REQ_SET_FLOW_CTRL = 0x2 # Set flow control register
|
|
SIO_REQ_SET_BAUDRATE = 0x3 # Set baud rate
|
|
SIO_REQ_SET_DATA = 0x4 # Set the data characteristics of the port
|
|
SIO_REQ_POLL_MODEM_STATUS = 0x5 # Get line status
|
|
SIO_REQ_SET_EVENT_CHAR = 0x6 # Change event character
|
|
SIO_REQ_SET_ERROR_CHAR = 0x7 # Change error character
|
|
SIO_REQ_SET_LATENCY_TIMER = 0x9 # Change latency timer
|
|
SIO_REQ_GET_LATENCY_TIMER = 0xa # Get latency timer
|
|
SIO_REQ_SET_BITMODE = 0xb # Change bit mode
|
|
SIO_REQ_READ_PINS = 0xc # Read GPIO pin value (or "get bitmode")
|
|
|
|
# Eeprom requests
|
|
SIO_REQ_EEPROM = 0x90
|
|
SIO_REQ_READ_EEPROM = SIO_REQ_EEPROM + 0 # Read EEPROM content
|
|
SIO_REQ_WRITE_EEPROM = SIO_REQ_EEPROM + 1 # Write EEPROM content
|
|
SIO_REQ_ERASE_EEPROM = SIO_REQ_EEPROM + 2 # Erase EEPROM content
|
|
|
|
# Reset arguments
|
|
SIO_RESET_SIO = 0 # Reset device
|
|
SIO_RESET_PURGE_RX = 1 # Drain USB RX buffer (host-to-ftdi)
|
|
SIO_RESET_PURGE_TX = 2 # Drain USB TX buffer (ftdi-to-host)
|
|
|
|
# Flow control arguments
|
|
SIO_DISABLE_FLOW_CTRL = 0x0
|
|
SIO_RTS_CTS_HS = 0x1 << 8
|
|
SIO_DTR_DSR_HS = 0x2 << 8
|
|
SIO_XON_XOFF_HS = 0x4 << 8
|
|
SIO_SET_DTR_MASK = 0x1
|
|
SIO_SET_DTR_HIGH = SIO_SET_DTR_MASK | (SIO_SET_DTR_MASK << 8)
|
|
SIO_SET_DTR_LOW = 0x0 | (SIO_SET_DTR_MASK << 8)
|
|
SIO_SET_RTS_MASK = 0x2
|
|
SIO_SET_RTS_HIGH = SIO_SET_RTS_MASK | (SIO_SET_RTS_MASK << 8)
|
|
SIO_SET_RTS_LOW = 0x0 | (SIO_SET_RTS_MASK << 8)
|
|
|
|
# Parity bits
|
|
PARITY_NONE, PARITY_ODD, PARITY_EVEN, PARITY_MARK, PARITY_SPACE = range(5)
|
|
# Number of stop bits
|
|
STOP_BIT_1, STOP_BIT_15, STOP_BIT_2 = range(3)
|
|
# Number of bits
|
|
BITS_7, BITS_8 = [7+i for i in range(2)]
|
|
# Break type
|
|
BREAK_OFF, BREAK_ON = range(2)
|
|
|
|
# cts: Clear to send
|
|
# dsr: Data set ready
|
|
# ri: Ring indicator
|
|
# dcd: Data carrier detect
|
|
# dr: Data ready
|
|
# oe: Overrun error
|
|
# pe: Parity error
|
|
# fe: Framing error
|
|
# bi: Break interrupt
|
|
# thre: Transmitter holding register empty
|
|
# temt: Transmitter empty
|
|
# err: Error in RCVR FIFO
|
|
MODEM_STATUS = [('', '', '', '', 'cts', 'dsr', 'ri', 'dcd'),
|
|
('dr', 'overrun', 'parity', 'framing',
|
|
'break', 'thre', 'txe', 'rcve')]
|
|
|
|
ERROR_BITS = (0x00, 0x8E)
|
|
TX_EMPTY_BITS = 0x60
|
|
|
|
# Clocks and baudrates
|
|
BUS_CLOCK_BASE = 6.0E6 # 6 MHz
|
|
BUS_CLOCK_HIGH = 30.0E6 # 30 MHz
|
|
BAUDRATE_REF_BASE = int(3.0E6) # 3 MHz
|
|
BAUDRATE_REF_HIGH = int(12.0E6) # 12 MHz
|
|
BITBANG_BAUDRATE_RATIO_BASE = 16
|
|
BITBANG_BAUDRATE_RATIO_HIGH = 5
|
|
BAUDRATE_TOLERANCE = 3.0 # acceptable clock drift for UART, in %
|
|
|
|
FRAC_DIV_CODE = (0, 3, 2, 4, 1, 5, 6, 7)
|
|
|
|
# Latency
|
|
LATENCY_MIN = 1
|
|
LATENCY_MAX = 255
|
|
LATENCY_EEPROM_FT232R = 77
|
|
|
|
# EEPROM Properties
|
|
EXT_EEPROM_SIZES = (128, 256) # in bytes (93C66 seen as 93C56)
|
|
|
|
INT_EEPROMS = {
|
|
0x0600: 0x80, # FT232R: 128 bytes, 1024 bits
|
|
0x1000: 0x400 # FT230*X: 1KiB
|
|
}
|
|
|
|
def __init__(self):
|
|
self.log = getLogger('pyftdi.ftdi')
|
|
self._debug_log = False
|
|
self._usb_dev = None
|
|
self._usb_read_timeout = 5000
|
|
self._usb_write_timeout = 5000
|
|
self._baudrate = -1
|
|
self._readbuffer = bytearray()
|
|
self._readoffset = 0
|
|
self._readbuffer_chunksize = 4 << 10 # 4KiB
|
|
self._writebuffer_chunksize = 4 << 10 # 4KiB
|
|
self._max_packet_size = 0
|
|
self._interface = None
|
|
self._index = None
|
|
self._in_ep = None
|
|
self._out_ep = None
|
|
self._bitmode = Ftdi.BitMode.RESET
|
|
self._latency = 0
|
|
self._latency_count = 0
|
|
self._latency_min = self.LATENCY_MIN
|
|
self._latency_max = self.LATENCY_MAX
|
|
self._latency_threshold = None # disable dynamic latency
|
|
self._lineprop = 0
|
|
self._cbus_pins = (0, 0)
|
|
self._cbus_out = 0
|
|
self._tracer = None
|
|
|
|
# --- Public API -------------------------------------------------------
|
|
|
|
@classmethod
|
|
def create_from_url(cls, url: str) -> 'Ftdi':
|
|
"""Create an Ftdi instance from an URL
|
|
|
|
URL scheme: ftdi://[vendor[:product[:index|:serial]]]/interface
|
|
|
|
:param url: FTDI device selector
|
|
:return: a fresh, open Ftdi instance
|
|
"""
|
|
device = Ftdi()
|
|
device.open_from_url(url)
|
|
return device
|
|
|
|
@classmethod
|
|
def list_devices(cls, url: Optional[str] = None) -> \
|
|
List[Tuple[UsbDeviceDescriptor, int]]:
|
|
"""List of URLs of connected FTDI devices.
|
|
|
|
:param url: a pattern URL to restrict the search
|
|
:return: list of (UsbDeviceDescriptor, interface)
|
|
"""
|
|
return UsbTools.list_devices(url or 'ftdi:///?',
|
|
cls.VENDOR_IDS, cls.PRODUCT_IDS,
|
|
cls.DEFAULT_VENDOR)
|
|
|
|
@classmethod
|
|
def show_devices(cls, url: Optional[str] = None,
|
|
out: Optional[TextIO] = None) -> None:
|
|
"""Print the URLs and descriptors of connected FTDI devices.
|
|
|
|
:param url: a pattern URL to restrict the search
|
|
:param out: output stream, default to stdout
|
|
"""
|
|
devdescs = UsbTools.list_devices(url or 'ftdi:///?',
|
|
cls.VENDOR_IDS, cls.PRODUCT_IDS,
|
|
cls.DEFAULT_VENDOR)
|
|
UsbTools.show_devices('ftdi', cls.VENDOR_IDS, cls.PRODUCT_IDS,
|
|
devdescs, out)
|
|
|
|
@classmethod
|
|
def get_identifiers(cls, url: str) -> Tuple[UsbDeviceDescriptor, int]:
|
|
"""Extract the identifiers of an FTDI device from URL, if any
|
|
|
|
:param url: input URL to parse
|
|
"""
|
|
return UsbTools.parse_url(url,
|
|
cls.SCHEME, cls.VENDOR_IDS, cls.PRODUCT_IDS,
|
|
cls.DEFAULT_VENDOR)
|
|
|
|
@classmethod
|
|
def get_device(cls, url: str) -> UsbDevice:
|
|
"""Get a USB device from its URL, without opening an instance.
|
|
|
|
:param url: input URL to parse
|
|
:return: the USB device that match the specified URL
|
|
"""
|
|
devdesc, _ = cls.get_identifiers(url)
|
|
return UsbTools.get_device(devdesc)
|
|
|
|
@classmethod
|
|
def add_custom_vendor(cls, vid: int, vidname: str = '') -> None:
|
|
"""Add a custom USB vendor identifier.
|
|
|
|
It can be useful to use a pretty URL for opening FTDI device
|
|
|
|
:param vid: Vendor ID (USB 16-bit identifier)
|
|
:param vidname: Vendor name (arbitrary string)
|
|
:raise ValueError: if the vendor id is already referenced
|
|
"""
|
|
if vid in cls.VENDOR_IDS.values():
|
|
raise ValueError(f'Vendor ID 0x{vid:04x} already registered')
|
|
if not vidname:
|
|
vidname = f'0x{vid:04x}'
|
|
cls.VENDOR_IDS[vidname] = vid
|
|
|
|
@classmethod
|
|
def add_custom_product(cls, vid: int, pid: int, pidname: str = '') -> None:
|
|
"""Add a custom USB product identifier.
|
|
|
|
It is required for opening FTDI device with non-standard VID/PID
|
|
USB identifiers.
|
|
|
|
:param vid: Vendor ID (USB 16-bit identifier)
|
|
:param pid: Product ID (USB 16-bit identifier)
|
|
:param pidname: Product name (arbitrary string)
|
|
:raise ValueError: if the product id is already referenced
|
|
"""
|
|
if vid not in cls.PRODUCT_IDS:
|
|
cls.PRODUCT_IDS[vid] = OrderedDict()
|
|
elif pid in cls.PRODUCT_IDS[vid].values():
|
|
raise ValueError(f'Product ID 0x{vid:04x}:0x{pid:04x} already '
|
|
f'registered')
|
|
if not pidname:
|
|
pidname = f'0x{pid:04x}'
|
|
cls.PRODUCT_IDS[vid][pidname] = pid
|
|
|
|
@classmethod
|
|
def decode_modem_status(cls, value: bytes, error_only: bool = False) -> \
|
|
Tuple[str, ...]:
|
|
"""Decode the FTDI modem status bitfield into short strings.
|
|
|
|
:param value: 2-byte mode status
|
|
:param error_only: only decode error flags
|
|
:return: a tuple of status identifiers
|
|
"""
|
|
status = []
|
|
for pos, (byte_, ebits) in enumerate(zip(value, cls.ERROR_BITS)):
|
|
for bit, _ in enumerate(cls.MODEM_STATUS[pos]):
|
|
if error_only:
|
|
byte_ &= ebits
|
|
if byte_ & (1 << bit):
|
|
status.append(cls.MODEM_STATUS[pos][bit])
|
|
return tuple(status)
|
|
|
|
@staticmethod
|
|
def find_all(vps: Sequence[Tuple[int, int]], nocache: bool = False) -> \
|
|
List[Tuple[UsbDeviceDescriptor, int]]:
|
|
"""Find all devices that match the vendor/product pairs of the vps
|
|
list.
|
|
|
|
:param vps: a sequence of 2-tuple (vid, pid) pairs
|
|
:type vps: tuple(int, int)
|
|
:param bool nocache: bypass cache to re-enumerate USB devices on
|
|
the host
|
|
:return: a list of 5-tuple (vid, pid, sernum, iface, description)
|
|
device descriptors
|
|
:rtype: list(tuple(int,int,str,int,str))
|
|
"""
|
|
return UsbTools.find_all(vps, nocache)
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
"""Tells whether this instance is connected to an actual USB slave.
|
|
|
|
:return: the slave connection status
|
|
"""
|
|
return bool(self._usb_dev)
|
|
|
|
def open_from_url(self, url: str) -> None:
|
|
"""Open a new interface to the specified FTDI device.
|
|
|
|
:param str url: a FTDI URL selector
|
|
"""
|
|
devdesc, interface = self.get_identifiers(url)
|
|
device = UsbTools.get_device(devdesc)
|
|
self.open_from_device(device, interface)
|
|
|
|
def open(self, vendor: int, product: int, bus: Optional[int] = None,
|
|
address: Optional[int] = None, index: int = 0,
|
|
serial: Optional[str] = None,
|
|
interface: int = 1) -> None:
|
|
"""Open a new interface to the specified FTDI device.
|
|
|
|
If several FTDI devices of the same kind (vid, pid) are connected
|
|
to the host, either index or serial argument should be used to
|
|
discriminate the FTDI device.
|
|
|
|
index argument is not a reliable solution as the host may enumerate
|
|
the USB device in random order. serial argument is more reliable
|
|
selector and should always be prefered.
|
|
|
|
Some FTDI devices support several interfaces/ports (such as FT2232H,
|
|
FT4232H and FT4232HA). The interface argument selects the FTDI port
|
|
to use, starting from 1 (not 0).
|
|
|
|
:param int vendor: USB vendor id
|
|
:param int product: USB product id
|
|
:param int bus: optional selector, USB bus
|
|
:param int address: optional selector, USB address on bus
|
|
:param int index: optional selector, specified the n-th matching
|
|
FTDI enumerated USB device on the host
|
|
:param str serial: optional selector, specified the FTDI device
|
|
by its serial number
|
|
:param str interface: FTDI interface/port
|
|
"""
|
|
devdesc = UsbDeviceDescriptor(vendor, product, bus, address, serial,
|
|
index, None)
|
|
device = UsbTools.get_device(devdesc)
|
|
self.open_from_device(device, interface)
|
|
|
|
def open_from_device(self, device: UsbDevice,
|
|
interface: int = 1) -> None:
|
|
"""Open a new interface from an existing USB device.
|
|
|
|
:param device: FTDI USB device (PyUSB instance)
|
|
:param interface: FTDI interface to use (integer starting from 1)
|
|
"""
|
|
if not isinstance(device, UsbDevice):
|
|
raise FtdiError(f"Device '{device}' is not a PyUSB device")
|
|
self._usb_dev = device
|
|
try:
|
|
self._usb_dev.set_configuration()
|
|
except USBError:
|
|
pass
|
|
# detect invalid interface as early as possible
|
|
config = self._usb_dev.get_active_configuration()
|
|
if interface > config.bNumInterfaces:
|
|
raise FtdiError(f'No such FTDI port: {interface}')
|
|
self._set_interface(config, interface)
|
|
self._max_packet_size = self._get_max_packet_size()
|
|
# Invalidate data in the readbuffer
|
|
self._readoffset = 0
|
|
self._readbuffer = bytearray()
|
|
# Drain input buffer
|
|
self.purge_buffers()
|
|
# Shallow reset
|
|
self._reset_device()
|
|
# Reset feature mode
|
|
self.set_bitmode(0, Ftdi.BitMode.RESET)
|
|
# Init latency
|
|
self._latency_threshold = None
|
|
self.set_latency_timer(self.LATENCY_MIN)
|
|
self._debug_log = self.log.getEffectiveLevel() == DEBUG
|
|
|
|
def close(self, freeze: bool = False) -> None:
|
|
"""Close the FTDI interface/port.
|
|
|
|
:param freeze: if set, FTDI port is not reset to its default
|
|
state on close. This means the port is left with
|
|
its current configuration and output signals.
|
|
This feature should not be used except for very
|
|
specific needs.
|
|
"""
|
|
if self._usb_dev:
|
|
dev = self._usb_dev
|
|
if self._is_pyusb_handle_active():
|
|
# Do not attempt to execute the following calls if the
|
|
# device has been closed: the ResourceManager may attempt
|
|
# to re-open the device that has been already closed, and
|
|
# this may lead to a (native) crash in libusb.
|
|
try:
|
|
if not freeze:
|
|
self.set_bitmode(0, Ftdi.BitMode.RESET)
|
|
self.set_latency_timer(self.LATENCY_MAX)
|
|
release_interface(dev, self._index - 1)
|
|
except FtdiError as exc:
|
|
self.log.warning('FTDI device may be gone: %s', exc)
|
|
try:
|
|
self._usb_dev.attach_kernel_driver(self._index - 1)
|
|
except (NotImplementedError, USBError):
|
|
pass
|
|
self._usb_dev = None
|
|
UsbTools.release_device(dev)
|
|
|
|
def reset(self, usb_reset: bool = False) -> None:
|
|
"""Reset FTDI device.
|
|
|
|
:param usb_reset: wether to perform a full USB reset of the device.
|
|
|
|
Beware that selecting usb_reset performs a full USB device reset,
|
|
which means all other interfaces of the same device are also
|
|
affected.
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Not connected')
|
|
self._reset_device()
|
|
if usb_reset:
|
|
self._reset_usb_device()
|
|
|
|
def open_mpsse_from_url(self, url: str, direction: int = 0x0,
|
|
initial: int = 0x0, frequency: float = 6.0E6,
|
|
latency: int = 16, debug: bool = False) -> float:
|
|
"""Open a new interface to the specified FTDI device in MPSSE mode.
|
|
|
|
MPSSE enables I2C, SPI, JTAG or other synchronous serial interface
|
|
modes (vs. UART mode).
|
|
|
|
:param url: a FTDI URL selector
|
|
:param direction: a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input
|
|
:param initial: a bitfield specifying the initial output value
|
|
:param float frequency: serial interface clock in Hz
|
|
:param latency: low-level latency in milliseconds. The shorter
|
|
the delay, the higher the host CPU load. Do not use shorter
|
|
values than the default, as it triggers data loss in FTDI.
|
|
:param debug: use a tracer to decode MPSSE protocol
|
|
:return: actual bus frequency in Hz
|
|
"""
|
|
devdesc, interface = self.get_identifiers(url)
|
|
device = UsbTools.get_device(devdesc)
|
|
return self.open_mpsse_from_device(device, interface,
|
|
direction=direction,
|
|
initial=initial,
|
|
frequency=frequency,
|
|
latency=latency,
|
|
debug=debug)
|
|
|
|
def open_mpsse(self, vendor: int, product: int, bus: Optional[int] = None,
|
|
address: Optional[int] = None, index: int = 0,
|
|
serial: Optional[str] = None, interface: int = 1,
|
|
direction: int = 0x0, initial: int = 0x0,
|
|
frequency: float = 6.0E6, latency: int = 16,
|
|
debug: bool = False) -> float:
|
|
"""Open a new interface to the specified FTDI device in MPSSE mode.
|
|
|
|
MPSSE enables I2C, SPI, JTAG or other synchronous serial interface
|
|
modes (vs. UART mode).
|
|
|
|
If several FTDI devices of the same kind (vid, pid) are connected
|
|
to the host, either index or serial argument should be used to
|
|
discriminate the FTDI device.
|
|
|
|
index argument is not a reliable solution as the host may enumerate
|
|
the USB device in random order. serial argument is more reliable
|
|
selector and should always be prefered.
|
|
|
|
Some FTDI devices support several interfaces/ports (such as FT2232H,
|
|
FT4232H and FT4232HA). The interface argument selects the FTDI port
|
|
to use, starting from 1 (not 0). Note that not all FTDI ports are
|
|
MPSSE capable.
|
|
|
|
:param vendor: USB vendor id
|
|
:param product: USB product id
|
|
:param bus: optional selector, USB bus
|
|
:param address: optional selector, USB address on bus
|
|
:param index: optional selector, specified the n-th matching
|
|
FTDI enumerated USB device on the host
|
|
:param serial: optional selector, specified the FTDI device
|
|
by its serial number
|
|
:param interface: FTDI interface/port
|
|
:param direction: a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input
|
|
:param initial: a bitfield specifying the initial output value
|
|
:param frequency: serial interface clock in Hz
|
|
:param latency: low-level latency in milliseconds. The shorter
|
|
the delay, the higher the host CPU load. Do not use shorter
|
|
values than the default, as it triggers data loss in FTDI.
|
|
:param bool debug: use a tracer to decode MPSSE protocol
|
|
:return: actual bus frequency in Hz
|
|
"""
|
|
devdesc = UsbDeviceDescriptor(vendor, product, bus, address, serial,
|
|
index, None)
|
|
device = UsbTools.get_device(devdesc)
|
|
return self.open_mpsse_from_device(device, interface,
|
|
direction=direction,
|
|
initial=initial,
|
|
frequency=frequency,
|
|
latency=latency,
|
|
debug=debug)
|
|
|
|
def open_mpsse_from_device(self, device: UsbDevice,
|
|
interface: int = 1, direction: int = 0x0,
|
|
initial: int = 0x0, frequency: float = 6.0E6,
|
|
latency: int = 16, tracer: bool = False,
|
|
debug: bool = False) -> float:
|
|
"""Open a new interface to the specified FTDI device in MPSSE mode.
|
|
|
|
MPSSE enables I2C, SPI, JTAG or other synchronous serial interface
|
|
modes (vs. UART mode).
|
|
|
|
If several FTDI devices of the same kind (vid, pid) are connected
|
|
to the host, either index or serial argument should be used to
|
|
discriminate the FTDI device.
|
|
|
|
index argument is not a reliable solution as the host may enumerate
|
|
the USB device in random order. serial argument is more reliable
|
|
selector and should always be prefered.
|
|
|
|
Some FTDI devices support several interfaces/ports (such as FT2232H,
|
|
FT4232H and FT4232HA). The interface argument selects the FTDI port
|
|
to use, starting from 1 (not 0). Note that not all FTDI ports are
|
|
MPSSE capable.
|
|
|
|
:param device: FTDI USB device
|
|
:param interface: FTDI interface/port
|
|
:param direction: a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input
|
|
:param initial: a bitfield specifying the initial output value
|
|
:param frequency: serial interface clock in Hz
|
|
:param latency: low-level latency in milliseconds. The shorter
|
|
the delay, the higher the host CPU load. Do not use shorter
|
|
values than the default, as it triggers data loss in FTDI.
|
|
:param bool tracer: use a tracer to decode MPSSE protocol
|
|
:param bool debug: add more debug traces
|
|
:return: actual bus frequency in Hz
|
|
"""
|
|
# pylint: disable=unused-argument
|
|
self.open_from_device(device, interface)
|
|
if not self.is_mpsse_interface(interface):
|
|
self.close()
|
|
raise FtdiMpsseError('This interface does not support MPSSE')
|
|
if to_bool(tracer): # accept strings as boolean
|
|
# pylint: disable=import-outside-toplevel
|
|
from .tracer import FtdiMpsseTracer
|
|
self._tracer = FtdiMpsseTracer(self.device_version)
|
|
self.log.debug('Using MPSSE tracer')
|
|
# Set latency timer
|
|
self.set_latency_timer(latency)
|
|
# Set chunk size
|
|
self.write_data_set_chunksize()
|
|
self.read_data_set_chunksize()
|
|
# Reset feature mode
|
|
self.set_bitmode(0, Ftdi.BitMode.RESET)
|
|
# Drain buffers
|
|
self.purge_buffers()
|
|
# Disable event and error characters
|
|
self.set_event_char(0, False)
|
|
self.set_error_char(0, False)
|
|
# Enable MPSSE mode
|
|
self.set_bitmode(direction, Ftdi.BitMode.MPSSE)
|
|
# Configure clock
|
|
frequency = self._set_frequency(frequency)
|
|
# Configure I/O
|
|
cmd = bytearray((Ftdi.SET_BITS_LOW, initial & 0xFF, direction & 0xFF))
|
|
if self.has_wide_port:
|
|
initial >>= 8
|
|
direction >>= 8
|
|
cmd.extend((Ftdi.SET_BITS_HIGH, initial & 0xFF, direction & 0xFF))
|
|
self.write_data(cmd)
|
|
# Disable loopback
|
|
self.write_data(bytearray((Ftdi.LOOPBACK_END,)))
|
|
self.validate_mpsse()
|
|
# Return the actual frequency
|
|
return frequency
|
|
|
|
def open_bitbang_from_url(self, url: str, direction: int = 0x0,
|
|
latency: int = 16, baudrate: int = 1000000,
|
|
sync: bool = False) -> float:
|
|
"""Open a new interface to the specified FTDI device in bitbang mode.
|
|
|
|
Bitbang enables direct read or write to FTDI GPIOs.
|
|
|
|
:param url: a FTDI URL selector
|
|
:param direction: a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input
|
|
:param latency: low-level latency to select the USB FTDI poll
|
|
delay. The shorter the delay, the higher the host CPU load.
|
|
:param baudrate: pace to sequence GPIO exchanges
|
|
:param sync: whether to use synchronous or asynchronous bitbang
|
|
:return: actual bitbang baudrate in bps
|
|
"""
|
|
devdesc, interface = self.get_identifiers(url)
|
|
device = UsbTools.get_device(devdesc)
|
|
return self.open_bitbang_from_device(device, interface,
|
|
direction=direction,
|
|
latency=latency,
|
|
baudrate=baudrate,
|
|
sync=sync)
|
|
|
|
def open_bitbang(self, vendor: int, product: int,
|
|
bus: Optional[int] = None, address: Optional[int] = None,
|
|
index: int = 0, serial: Optional[str] = None,
|
|
interface: int = 1, direction: int = 0x0,
|
|
latency: int = 16, baudrate: int = 1000000,
|
|
sync: bool = False) -> float:
|
|
"""Open a new interface to the specified FTDI device in bitbang mode.
|
|
|
|
Bitbang enables direct read or write to FTDI GPIOs.
|
|
|
|
:param vendor: USB vendor id
|
|
:param product: USB product id
|
|
:param index: optional selector, specified the n-th matching
|
|
FTDI enumerated USB device on the host
|
|
:param serial: optional selector, specified the FTDI device
|
|
by its serial number
|
|
:param interface: FTDI interface/port
|
|
:param direction: a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input
|
|
:param latency: low-level latency to select the USB FTDI poll
|
|
delay. The shorter the delay, the higher the host CPU load.
|
|
:param baudrate: pace to sequence GPIO exchanges
|
|
:param sync: whether to use synchronous or asynchronous bitbang
|
|
:return: actual bitbang baudrate in bps
|
|
"""
|
|
devdesc = UsbDeviceDescriptor(vendor, product, bus, address, serial,
|
|
index, None)
|
|
device = UsbTools.get_device(devdesc)
|
|
return self.open_bitbang_from_device(device, interface,
|
|
direction=direction,
|
|
latency=latency,
|
|
baudrate=baudrate,
|
|
sync=sync)
|
|
|
|
def open_bitbang_from_device(self, device: UsbDevice,
|
|
interface: int = 1, direction: int = 0x0,
|
|
latency: int = 16, baudrate: int = 1000000,
|
|
sync: bool = False) -> int:
|
|
"""Open a new interface to the specified FTDI device in bitbang mode.
|
|
|
|
Bitbang enables direct read or write to FTDI GPIOs.
|
|
|
|
:param device: FTDI USB device
|
|
:param interface: FTDI interface/port
|
|
:param direction: a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input
|
|
:param latency: low-level latency to select the USB FTDI poll
|
|
delay. The shorter the delay, the higher the host CPU load.
|
|
:param baudrate: pace to sequence GPIO exchanges
|
|
:param sync: whether to use synchronous or asynchronous bitbang
|
|
:return: actual bitbang baudrate in bps
|
|
"""
|
|
self.open_from_device(device, interface)
|
|
# Set latency timer
|
|
self.set_latency_timer(latency)
|
|
# Set chunk size
|
|
# Beware that RX buffer, over 512 bytes, contains 2-byte modem marker
|
|
# on every 512 byte chunk, so data and out-of-band marker get
|
|
# interleaved. This is not yet supported with read_data_bytes for now
|
|
self.write_data_set_chunksize()
|
|
self.read_data_set_chunksize()
|
|
# disable flow control
|
|
self.set_flowctrl('')
|
|
# Enable BITBANG mode
|
|
self.set_bitmode(direction, Ftdi.BitMode.BITBANG if not sync else
|
|
Ftdi.BitMode.SYNCBB)
|
|
# Configure clock
|
|
if baudrate:
|
|
self._baudrate = self._set_baudrate(baudrate, False)
|
|
# Drain input buffer
|
|
self.purge_buffers()
|
|
return self._baudrate
|
|
|
|
@property
|
|
def usb_path(self) -> Tuple[int, int, int]:
|
|
"""Provide the physical location on the USB topology.
|
|
|
|
:return: a tuple of bus, address, interface; if connected
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Not connected')
|
|
return (self._usb_dev.bus, self._usb_dev.address,
|
|
self._interface.bInterfaceNumber)
|
|
|
|
@property
|
|
def device_version(self) -> int:
|
|
"""Report the device version, i.e. the kind of device.
|
|
|
|
:see: :py:meth:`ic_name` for a product version of this information.
|
|
|
|
:return: the device version (16-bit integer)
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self._usb_dev.bcdDevice
|
|
|
|
@property
|
|
def ic_name(self) -> str:
|
|
"""Return the current type of the FTDI device as a string
|
|
|
|
see also http://www.ftdichip.com/Support/
|
|
Documents/TechnicalNotes/TN_100_USB_VID-PID_Guidelines.pdf
|
|
|
|
:return: the identified FTDI device as a string
|
|
"""
|
|
if not self.is_connected:
|
|
return 'unknown'
|
|
return self.DEVICE_NAMES.get(self.device_version, 'undefined')
|
|
|
|
@property
|
|
def device_port_count(self) -> int:
|
|
"""Report the count of port/interface of the Ftdi device.
|
|
|
|
:return: the count of ports
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self._usb_dev.get_active_configuration().bNumInterfaces
|
|
|
|
@property
|
|
def port_index(self) -> int:
|
|
"""Report the port/interface index, starting from 1
|
|
|
|
:return: the port position/index
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self._index
|
|
|
|
@property
|
|
def port_width(self) -> int:
|
|
"""Report the width of a single port / interface
|
|
|
|
:return: the width of the port, in bits
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
if self.device_version in (0x0700, 0x0900):
|
|
return 16
|
|
if self.device_version in (0x0500, ):
|
|
return 12
|
|
return 8
|
|
|
|
@property
|
|
def has_mpsse(self) -> bool:
|
|
"""Tell whether the device supports MPSSE (I2C, SPI, JTAG, ...)
|
|
|
|
:return: True if the FTDI device supports MPSSE
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self.device_version in (0x0500, 0x0700, 0x0800, 0x0900, 0x3600)
|
|
|
|
@property
|
|
def has_wide_port(self) -> bool:
|
|
"""Tell whether the device supports 16-bit GPIO ports (vs. 8 bits)
|
|
|
|
:return: True if the FTDI device supports wide GPIO port
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
return self.port_width > 8
|
|
|
|
@property
|
|
def has_cbus(self) -> bool:
|
|
"""Tell whether the device supports CBUS bitbang.
|
|
|
|
CBUS bitbanging feature requires a special configuration in EEPROM.
|
|
This function only reports if the current device supports this mode,
|
|
not if this mode has been enabled in EEPROM.
|
|
|
|
EEPROM configuration must be queried to check which CBUS pins have
|
|
been configured for GPIO/bitbang mode.
|
|
|
|
:return: True if the FTDI device supports CBUS bitbang
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self.device_version in (0x0600, 0x0900, 0x1000)
|
|
|
|
@property
|
|
def has_drivezero(self) -> bool:
|
|
"""Tell whether the device supports drive-zero mode, i.e. if the
|
|
device supports the open-collector drive mode, useful for I2C
|
|
communication for example.
|
|
|
|
:return: True if the FTDI device features drive-zero mode
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self.device_version in (0x0900, )
|
|
|
|
@property
|
|
def is_legacy(self) -> bool:
|
|
"""Tell whether the device is a low-end FTDI
|
|
|
|
:return: True if the FTDI device can only be used as a slow USB-UART
|
|
bridge
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self.device_version <= 0x0200
|
|
|
|
@property
|
|
def is_H_series(self) -> bool:
|
|
"""Tell whether the device is a high-end FTDI
|
|
|
|
:return: True if the FTDI device is a high-end USB-UART bridge
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.is_connected:
|
|
raise FtdiError('Device characteristics not yet known')
|
|
return self.device_version in (0x0700, 0x0800, 0x0900, 0x3600)
|
|
|
|
@property
|
|
def is_mpsse(self) -> bool:
|
|
"""Tell whether the device is configured in MPSSE mode
|
|
|
|
:return: True if the FTDI interface is configured in MPSSE mode
|
|
"""
|
|
return self._bitmode == Ftdi.BitMode.MPSSE
|
|
|
|
def is_mpsse_interface(self, interface: int) -> bool:
|
|
"""Tell whether the interface supports MPSSE (I2C, SPI, JTAG, ...)
|
|
|
|
:return: True if the FTDI interface supports MPSSE
|
|
:raise FtdiError: if no FTDI port is open
|
|
"""
|
|
if not self.has_mpsse:
|
|
return False
|
|
if self.device_version == 0x0800 and interface > 2:
|
|
return False
|
|
if self.device_version == 0x3600 and interface > 2:
|
|
return False
|
|
return True
|
|
|
|
@property
|
|
def is_bitbang_enabled(self) -> bool:
|
|
"""Tell whether some bitbang mode is activated
|
|
|
|
:return: True if the FTDI interface is configured to support
|
|
bitbanging
|
|
"""
|
|
return self._bitmode not in (
|
|
Ftdi.BitMode.RESET,
|
|
Ftdi.BitMode.MPSSE,
|
|
Ftdi.BitMode.CBUS # CBUS mode does not change base frequency
|
|
)
|
|
|
|
# legacy API
|
|
bitbang_enabled = is_bitbang_enabled
|
|
|
|
@property
|
|
def is_eeprom_internal(self) -> bool:
|
|
"""Tell whether the device has an internal EEPROM.
|
|
|
|
:return: True if the device has an internal EEPROM.
|
|
"""
|
|
return self.device_version in self.INT_EEPROMS
|
|
|
|
@property
|
|
def max_eeprom_size(self) -> int:
|
|
"""Report the maximum size of the EEPROM.
|
|
The actual size may be lower, of even 0 if no EEPROM is connected
|
|
or supported.
|
|
|
|
:return: the maximum size in bytes.
|
|
"""
|
|
if self.device_version in self.INT_EEPROMS:
|
|
return self.INT_EEPROMS[self.device_version]
|
|
if self.device_version == 0x0600:
|
|
return 0x80
|
|
return 0x100
|
|
|
|
@property
|
|
def frequency_max(self) -> float:
|
|
"""Tells the maximum frequency for MPSSE clock.
|
|
|
|
:return: the maximum supported frequency in Hz
|
|
"""
|
|
return Ftdi.BUS_CLOCK_HIGH if self.is_H_series else Ftdi.BUS_CLOCK_BASE
|
|
|
|
@property
|
|
def fifo_sizes(self) -> Tuple[int, int]:
|
|
"""Return the (TX, RX) tupple of hardware FIFO sizes
|
|
|
|
:return: 2-tuple of TX, RX FIFO size in bytes
|
|
"""
|
|
try:
|
|
return Ftdi.FIFO_SIZES[self.device_version]
|
|
except KeyError as exc:
|
|
raise FtdiFeatureError(f'Unsupported device: '
|
|
f'0x{self.device_version:04x}') from exc
|
|
|
|
@property
|
|
def mpsse_bit_delay(self) -> float:
|
|
"""Delay between execution of two MPSSE SET_BITS commands.
|
|
|
|
:return: minimum delay (actual value might be larger) in seconds
|
|
"""
|
|
# measured on FTDI2232H, not documented in datasheet, hence may vary
|
|
# from on FTDI model to another...
|
|
# left as a variable so it could be tweaked base on the FTDI bcd type,
|
|
# the frequency, or ... whatever else
|
|
return 0.5E-6 # seems to vary between 5 and 6.5 us
|
|
|
|
@property
|
|
def baudrate(self) -> int:
|
|
"""Return current baudrate.
|
|
"""
|
|
return self._baudrate
|
|
|
|
@property
|
|
def usb_dev(self) -> UsbDevice:
|
|
"""Return the underlying USB Device.
|
|
"""
|
|
return self._usb_dev
|
|
|
|
def set_baudrate(self, baudrate: int, constrain: bool = True) -> int:
|
|
"""Change the current UART or BitBang baudrate.
|
|
|
|
The FTDI device is not able to use an arbitrary baudrate. Its
|
|
internal dividors are only able to achieve some baudrates.
|
|
|
|
PyFtdi attemps to find the closest configurable baudrate and if
|
|
the deviation from the requested baudrate is too high, it rejects
|
|
the configuration if constrain is set.
|
|
|
|
:py:attr:`baudrate` attribute can be used to retrieve the exact
|
|
selected baudrate.
|
|
|
|
:py:const:`BAUDRATE_TOLERANCE` defines the maximum deviation between
|
|
the requested baudrate and the closest FTDI achieveable baudrate,
|
|
which matches standard UART clock drift (3%). If the achievable
|
|
baudrate is not within limits, baudrate setting is rejected.
|
|
|
|
:param baudrate: the new baudrate for the UART.
|
|
:param constrain: whether to validate baudrate is in RS232 tolerance
|
|
limits or allow larger drift
|
|
:raise ValueError: if deviation from selected baudrate is too large
|
|
:raise FtdiError: on IO Error
|
|
:return: the effective baudrate
|
|
"""
|
|
self._baudrate = self._set_baudrate(baudrate, constrain)
|
|
return self._baudrate
|
|
|
|
def set_frequency(self, frequency: float) -> float:
|
|
"""Change the current MPSSE bus frequency
|
|
|
|
The FTDI device is not able to use an arbitrary frequency. Its
|
|
internal dividors are only able to achieve some frequencies.
|
|
|
|
PyFtdi finds and selects the closest configurable frequency.
|
|
|
|
:param frequency: the new frequency for the serial interface,
|
|
in Hz.
|
|
:return: the selected frequency, which may differ from the requested
|
|
one, in Hz
|
|
"""
|
|
return self._set_frequency(frequency)
|
|
|
|
def purge_rx_buffer(self) -> None:
|
|
"""Clear the USB receive buffer on the chip (host-to-ftdi) and the
|
|
internal read buffer."""
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_RESET,
|
|
Ftdi.SIO_RESET_PURGE_RX):
|
|
raise FtdiError('Unable to flush RX buffer')
|
|
# Invalidate data in the readbuffer
|
|
self._readoffset = 0
|
|
self._readbuffer = bytearray()
|
|
self.log.debug('rx buf purged')
|
|
|
|
def purge_tx_buffer(self) -> None:
|
|
"""Clear the USB transmit buffer on the chip (ftdi-to-host)."""
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_RESET,
|
|
Ftdi.SIO_RESET_PURGE_TX):
|
|
raise FtdiError('Unable to flush TX buffer')
|
|
|
|
def purge_buffers(self) -> None:
|
|
"""Clear the buffers on the chip and the internal read buffer."""
|
|
self.purge_rx_buffer()
|
|
self.purge_tx_buffer()
|
|
|
|
def write_data_set_chunksize(self, chunksize: int = 0) -> None:
|
|
"""Configure write buffer chunk size.
|
|
|
|
This is a low-level configuration option, which is not intended to
|
|
be use for a regular usage.
|
|
|
|
:param chunksize: the optional size of the write buffer in bytes,
|
|
it is recommended to use 0 to force automatic
|
|
evaluation of the best value.
|
|
"""
|
|
if chunksize == 0:
|
|
chunksize = self.fifo_sizes[0]
|
|
self._writebuffer_chunksize = chunksize
|
|
self.log.debug('TX chunksize: %d', self._writebuffer_chunksize)
|
|
|
|
def write_data_get_chunksize(self) -> int:
|
|
"""Get write buffer chunk size.
|
|
|
|
:return: the size of the write buffer in bytes
|
|
"""
|
|
return self._writebuffer_chunksize
|
|
|
|
def read_data_set_chunksize(self, chunksize: int = 0) -> None:
|
|
"""Configure read buffer chunk size.
|
|
|
|
This is a low-level configuration option, which is not intended to
|
|
be use for a regular usage.
|
|
|
|
:param chunksize: the optional size of the read buffer in bytes,
|
|
it is recommended to use 0 to force automatic
|
|
evaluation of the best value.
|
|
"""
|
|
# Invalidate all remaining data
|
|
self._readoffset = 0
|
|
self._readbuffer = bytearray()
|
|
if chunksize == 0:
|
|
# status byte prolog is emitted every maxpacketsize, but for "some"
|
|
# reasons, FT232R emits it every RX FIFO size bytes... Other
|
|
# devices use a maxpacketsize which is smaller or equal to their
|
|
# FIFO size, so this weird behavior is for now only experienced
|
|
# with FT232R. Any, the following compution should address all
|
|
# devices.
|
|
chunksize = min(self.fifo_sizes[0], self.fifo_sizes[1],
|
|
self._max_packet_size)
|
|
if platform == 'linux':
|
|
chunksize = min(chunksize, 16384)
|
|
self._readbuffer_chunksize = chunksize
|
|
self.log.debug('RX chunksize: %d', self._readbuffer_chunksize)
|
|
|
|
def read_data_get_chunksize(self) -> int:
|
|
"""Get read buffer chunk size.
|
|
|
|
:return: the size of the write buffer in bytes
|
|
"""
|
|
return self._readbuffer_chunksize
|
|
|
|
def set_bitmode(self, bitmask: int, mode: 'Ftdi.BitMode') -> None:
|
|
"""Enable/disable bitbang modes.
|
|
|
|
Switch the FTDI interface to bitbang mode.
|
|
"""
|
|
self.log.debug('bitmode: %s', mode.name)
|
|
mask = sum(Ftdi.BitMode)
|
|
value = (bitmask & 0xff) | ((mode.value & mask) << 8)
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_BITMODE, value):
|
|
raise FtdiError('Unable to set bitmode')
|
|
self._bitmode = mode
|
|
|
|
def read_pins(self) -> int:
|
|
"""Directly read pin state, circumventing the read buffer.
|
|
Useful for bitbang mode.
|
|
|
|
:return: bitfield of FTDI interface input GPIO
|
|
"""
|
|
pins = self._ctrl_transfer_in(Ftdi.SIO_REQ_READ_PINS, 1)
|
|
if not pins:
|
|
raise FtdiError('Unable to read pins')
|
|
return pins[0]
|
|
|
|
def set_cbus_direction(self, mask: int, direction: int) -> None:
|
|
"""Configure the CBUS pins used as GPIOs
|
|
|
|
:param mask: which pins to configure as GPIOs
|
|
:param direction: which pins are output (vs. input)
|
|
"""
|
|
# sanity check: there cannot be more than 4 CBUS pins in bitbang mode
|
|
if not 0 <= mask <= 0x0F:
|
|
raise ValueError(f'Invalid CBUS gpio mask: 0x{mask:02x}')
|
|
if not 0 <= direction <= 0x0F:
|
|
raise ValueError(f'Invalid CBUS gpio direction: 0x{direction:02x}')
|
|
self._cbus_pins = (mask, direction)
|
|
|
|
def get_cbus_gpio(self) -> int:
|
|
"""Get the CBUS pins configured as GPIO inputs
|
|
|
|
:return: bitfield of CBUS read pins
|
|
"""
|
|
if self._bitmode not in (Ftdi.BitMode.RESET, Ftdi.BitMode.CBUS):
|
|
raise FtdiError('CBUS gpio not available from current mode')
|
|
if not self._cbus_pins[0] & ~self._cbus_pins[1]:
|
|
raise FtdiError('No CBUS IO configured as input')
|
|
outv = (self._cbus_pins[1] << 4) | self._cbus_out
|
|
oldmode = self._bitmode
|
|
try:
|
|
self.set_bitmode(outv, Ftdi.BitMode.CBUS)
|
|
inv = self.read_pins()
|
|
finally:
|
|
if oldmode != self._bitmode:
|
|
self.set_bitmode(0, oldmode)
|
|
return inv & ~self._cbus_pins[1] & self._cbus_pins[0]
|
|
|
|
def set_cbus_gpio(self, pins: int) -> None:
|
|
"""Set the CBUS pins configured as GPIO outputs
|
|
|
|
:param pins: bitfield to apply to CBUS output pins
|
|
"""
|
|
if self._bitmode not in (Ftdi.BitMode.RESET, Ftdi.BitMode.CBUS):
|
|
raise FtdiError('CBUS gpio not available from current mode')
|
|
# sanity check: there cannot be more than 4 CBUS pins in bitbang mode
|
|
if not 0 <= pins <= 0x0F:
|
|
raise ValueError(f'Invalid CBUS gpio pins: 0x{pins:02x}')
|
|
if not self._cbus_pins[0] & self._cbus_pins[1]:
|
|
raise FtdiError('No CBUS IO configured as output')
|
|
pins &= self._cbus_pins[0] & self._cbus_pins[1]
|
|
value = (self._cbus_pins[1] << 4) | pins
|
|
oldmode = self._bitmode
|
|
try:
|
|
self.set_bitmode(value, Ftdi.BitMode.CBUS)
|
|
self._cbus_out = pins
|
|
finally:
|
|
if oldmode != self._bitmode:
|
|
self.set_bitmode(0, oldmode)
|
|
|
|
def set_latency_timer(self, latency: int):
|
|
"""Set latency timer.
|
|
|
|
The FTDI chip keeps data in the internal buffer for a specific
|
|
amount of time if the buffer is not full yet to decrease
|
|
load on the usb bus.
|
|
|
|
The shorted the latency, the shorted the delay to obtain data and
|
|
the higher the host CPU load. Be careful with this option.
|
|
|
|
:param latency: latency (unspecified unit)
|
|
"""
|
|
if not Ftdi.LATENCY_MIN <= latency <= Ftdi.LATENCY_MAX:
|
|
raise ValueError("Latency out of range")
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_LATENCY_TIMER, latency):
|
|
raise FtdiError('Unable to latency timer')
|
|
|
|
def get_latency_timer(self) -> int:
|
|
"""Get latency timer.
|
|
|
|
:return: the current latency (unspecified unit)
|
|
"""
|
|
latency = self._ctrl_transfer_in(Ftdi.SIO_REQ_GET_LATENCY_TIMER, 1)
|
|
if not latency:
|
|
raise FtdiError('Unable to get latency')
|
|
return latency[0]
|
|
|
|
def poll_modem_status(self) -> int:
|
|
"""Poll modem status information.
|
|
|
|
This function allows the retrieve the two status bytes of the
|
|
device, useful in UART mode.
|
|
|
|
FTDI device does not have a so-called USB "interrupt" end-point,
|
|
event polling on the UART interface is done through the regular
|
|
control endpoint.
|
|
|
|
see :py:func:`modem_status` to obtain decoded status strings
|
|
|
|
:return: modem status, as a proprietary bitfield
|
|
"""
|
|
value = self._ctrl_transfer_in(Ftdi.SIO_REQ_POLL_MODEM_STATUS, 2)
|
|
if not value or len(value) != 2:
|
|
raise FtdiError('Unable to get modem status')
|
|
status, = sunpack('<H', value)
|
|
return status
|
|
|
|
def modem_status(self) -> Tuple[str, ...]:
|
|
"""Provide the current modem status as a tuple of set signals
|
|
|
|
:return: decodede modem status as short strings
|
|
"""
|
|
value = self._ctrl_transfer_in(Ftdi.SIO_REQ_POLL_MODEM_STATUS, 2)
|
|
if not value or len(value) != 2:
|
|
raise FtdiError('Unable to get modem status')
|
|
return self.decode_modem_status(value)
|
|
|
|
def set_flowctrl(self, flowctrl: str) -> None:
|
|
"""Select flowcontrol in UART mode.
|
|
|
|
Either hardware flow control through RTS/CTS UART lines,
|
|
software or no flow control.
|
|
|
|
:param str flowctrl: either 'hw' for HW flow control or '' (empty
|
|
string) for no flow control.
|
|
:raise ValueError: if the flow control argument is invalid
|
|
|
|
.. note:: How does RTS/CTS flow control work (from FTDI FAQ):
|
|
|
|
FTxxx RTS# pin is an output. It should be connected to the CTS#
|
|
input pin of the device at the other end of the UART link.
|
|
|
|
* If RTS# is logic 0 it is indicating the FTxxx device can
|
|
accept more data on the RXD pin.
|
|
* If RTS# is logic 1 it is indicating the FTxxx device
|
|
cannot accept more data.
|
|
|
|
RTS# changes state when the chip buffer reaches its last 32
|
|
bytes of space to allow time for the external device to stop
|
|
sending data to the FTxxx device.
|
|
|
|
FTxxx CTS# pin is an input. It should be connected to the RTS#
|
|
output pin of the device at the other end of the UART link.
|
|
|
|
* If CTS# is logic 0 it is indicating the external device can
|
|
accept more data, and the FTxxx will transmit on the TXD
|
|
pin.
|
|
* If CTS# is logic 1 it is indicating the external device
|
|
cannot accept more data. the FTxxx will stop transmitting
|
|
within 0~3 characters, depending on what is in the buffer.
|
|
|
|
**This potential 3 character overrun does occasionally
|
|
present problems.** Customers shoud be made aware the FTxxx
|
|
is a USB device and not a "normal" RS232 device as seen on
|
|
a PC. As such the device operates on a packet basis as
|
|
opposed to a byte basis.
|
|
|
|
Word to the wise. Not only do RS232 level shifting devices
|
|
level shift, but they also invert the signal.
|
|
"""
|
|
ctrl = {'hw': Ftdi.SIO_RTS_CTS_HS,
|
|
'': Ftdi.SIO_DISABLE_FLOW_CTRL}
|
|
try:
|
|
value = ctrl[flowctrl] | self._index
|
|
except KeyError as exc:
|
|
raise ValueError(f'Unknown flow control: {flowctrl}') from exc
|
|
try:
|
|
if self._usb_dev.ctrl_transfer(
|
|
Ftdi.REQ_OUT, Ftdi.SIO_REQ_SET_FLOW_CTRL, 0, value,
|
|
bytearray(), self._usb_write_timeout):
|
|
raise FtdiError('Unable to set flow control')
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from exc
|
|
|
|
def set_dtr(self, state: bool) -> None:
|
|
"""Set dtr line
|
|
|
|
:param state: new DTR logical level
|
|
"""
|
|
value = Ftdi.SIO_SET_DTR_HIGH if state else Ftdi.SIO_SET_DTR_LOW
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_MODEM_CTRL, value):
|
|
raise FtdiError('Unable to set DTR line')
|
|
|
|
def set_rts(self, state: bool) -> None:
|
|
"""Set rts line
|
|
|
|
:param state: new RTS logical level
|
|
"""
|
|
value = Ftdi.SIO_SET_RTS_HIGH if state else Ftdi.SIO_SET_RTS_LOW
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_MODEM_CTRL, value):
|
|
raise FtdiError('Unable to set RTS line')
|
|
|
|
def set_dtr_rts(self, dtr: bool, rts: bool) -> None:
|
|
"""Set dtr and rts lines at once
|
|
|
|
:param dtr: new DTR logical level
|
|
:param rts: new RTS logical level
|
|
"""
|
|
value = 0
|
|
value |= Ftdi.SIO_SET_DTR_HIGH if dtr else Ftdi.SIO_SET_DTR_LOW
|
|
value |= Ftdi.SIO_SET_RTS_HIGH if rts else Ftdi.SIO_SET_RTS_LOW
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_MODEM_CTRL, value):
|
|
raise FtdiError('Unable to set DTR/RTS lines')
|
|
|
|
def set_break(self, break_: bool) -> None:
|
|
"""Start or stop a break exception event on the serial line
|
|
|
|
:param break_: either start or stop break event
|
|
"""
|
|
if break_:
|
|
value = self._lineprop | (0x01 << 14)
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_DATA, value):
|
|
raise FtdiError('Unable to start break sequence')
|
|
else:
|
|
value = self._lineprop & ~(0x01 << 14)
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_DATA, value):
|
|
raise FtdiError('Unable to stop break sequence')
|
|
self._lineprop = value
|
|
|
|
def set_event_char(self, eventch: int, enable: bool) -> None:
|
|
"""Set the special event character"""
|
|
value = eventch
|
|
if enable:
|
|
value |= 1 << 8
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_EVENT_CHAR, value):
|
|
raise FtdiError('Unable to set event char')
|
|
|
|
def set_error_char(self, errorch: int, enable: bool) -> None:
|
|
"""Set error character"""
|
|
value = errorch
|
|
if enable:
|
|
value |= 1 << 8
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_ERROR_CHAR, value):
|
|
raise FtdiError('Unable to set error char')
|
|
|
|
def set_line_property(self, bits: int, stopbit: Union[int, float],
|
|
parity: str, break_: bool = False) -> None:
|
|
"""Configure the (RS232) UART characteristics.
|
|
|
|
Arguments match the valid subset for FTDI HW of pyserial
|
|
definitions.
|
|
|
|
Bits accepts one of the following values:
|
|
|
|
* ``7`` for 7-bit characters
|
|
* ``8`` for 8-bit characters
|
|
|
|
Stopbit accepts one of the following values:
|
|
|
|
* ``1`` for a single bit
|
|
* ``1.5`` for a bit and a half
|
|
* ``2`` for two bits
|
|
|
|
Parity accepts one of the following strings:
|
|
|
|
* ``N`` for no parity bit
|
|
* ``O`` for odd parity bit
|
|
* ``E`` for even parity bit
|
|
* ``M`` for parity bit always set
|
|
* ``S`` for parity bit always reset
|
|
|
|
:param bits: data bit count
|
|
:param stopbit: stop bit count
|
|
:param parity: parity mode as a single uppercase character
|
|
:param break_: force break event
|
|
"""
|
|
bytelength = {7: Ftdi.BITS_7,
|
|
8: Ftdi.BITS_8}
|
|
parities = {'N': Ftdi.PARITY_NONE,
|
|
'O': Ftdi.PARITY_ODD,
|
|
'E': Ftdi.PARITY_EVEN,
|
|
'M': Ftdi.PARITY_MARK,
|
|
'S': Ftdi.PARITY_SPACE}
|
|
stopbits = {1: Ftdi.STOP_BIT_1,
|
|
1.5: Ftdi.STOP_BIT_15,
|
|
2: Ftdi.STOP_BIT_2}
|
|
if parity not in parities:
|
|
raise FtdiFeatureError("Unsupported parity")
|
|
if bits not in bytelength:
|
|
raise FtdiFeatureError("Unsupported byte length")
|
|
if stopbit not in stopbits:
|
|
raise FtdiFeatureError("Unsupported stop bits")
|
|
value = bits & 0x0F
|
|
try:
|
|
value |= {Ftdi.PARITY_NONE: 0x00 << 8,
|
|
Ftdi.PARITY_ODD: 0x01 << 8,
|
|
Ftdi.PARITY_EVEN: 0x02 << 8,
|
|
Ftdi.PARITY_MARK: 0x03 << 8,
|
|
Ftdi.PARITY_SPACE: 0x04 << 8}[parities[parity]]
|
|
value |= {Ftdi.STOP_BIT_1: 0x00 << 11,
|
|
Ftdi.STOP_BIT_15: 0x01 << 11,
|
|
Ftdi.STOP_BIT_2: 0x02 << 11}[stopbits[stopbit]]
|
|
if break_ == Ftdi.BREAK_ON:
|
|
value |= 0x01 << 14
|
|
except KeyError as exc:
|
|
raise ValueError('Invalid line property') from exc
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_SET_DATA, value):
|
|
raise FtdiError('Unable to set line property')
|
|
self._lineprop = value
|
|
|
|
def enable_adaptive_clock(self, enable: bool = True) -> None:
|
|
"""Enable adaptative clock mode, useful in MPSEE mode.
|
|
|
|
Adaptive clock is a unique feature designed for a feedback clock
|
|
for JTAG with ARM core.
|
|
|
|
:param enable: whether to enable or disable this mode.
|
|
:raise FtdiMpsseError: if MPSSE mode is not enabled
|
|
"""
|
|
if not self.is_mpsse:
|
|
raise FtdiMpsseError('Setting adaptive clock mode is only '
|
|
'available from MPSSE mode')
|
|
self.write_data(bytearray([enable and Ftdi.ENABLE_CLK_ADAPTIVE or
|
|
Ftdi.DISABLE_CLK_ADAPTIVE]))
|
|
|
|
def enable_3phase_clock(self, enable: bool = True) -> None:
|
|
"""Enable 3-phase clocking mode, useful in MPSSE mode.
|
|
|
|
3-phase clock is mostly useful with I2C mode. It is also be used
|
|
as a workaround to support SPI mode 3.
|
|
|
|
:param enable: whether to enable or disable this mode.
|
|
:raise FtdiMpsseError: if MPSSE mode is not enabled or device is
|
|
not capable of 3-phase clocking
|
|
"""
|
|
if not self.is_mpsse:
|
|
raise FtdiMpsseError('Setting 3-phase clock mode is only '
|
|
'available from MPSSE mode')
|
|
if not self.is_H_series:
|
|
raise FtdiFeatureError('This device does not support 3-phase '
|
|
'clock')
|
|
self.write_data(bytearray([enable and Ftdi.ENABLE_CLK_3PHASE or
|
|
Ftdi.DISABLE_CLK_3PHASE]))
|
|
|
|
def enable_drivezero_mode(self, lines: int) -> None:
|
|
"""Enable drive-zero mode, useful in MPSSE mode.
|
|
|
|
drive-zero mode is mostly useful with I2C mode, to support the open
|
|
collector driving mode.
|
|
|
|
:param lines: bitfield of GPIO to drive in collector driven mode
|
|
:raise FtdiMpsseError: if MPSSE mode is not enabled or device is
|
|
not capable of drive-zero mode
|
|
"""
|
|
if not self.is_mpsse:
|
|
raise FtdiMpsseError('Setting drive-zero mode is only '
|
|
'available from MPSSE mode')
|
|
if not self.has_drivezero:
|
|
raise FtdiFeatureError('This device does not support drive-zero '
|
|
'mode')
|
|
self.write_data(bytearray([Ftdi.DRIVE_ZERO, lines & 0xff,
|
|
(lines >> 8) & 0xff]))
|
|
|
|
def enable_loopback_mode(self, loopback: bool = False) -> None:
|
|
"""Enable loopback, i.e. connect DO to DI in FTDI MPSSE port for test
|
|
purposes only. It does not support UART (TX to RX) mode.
|
|
|
|
:param loopback: whether to enable or disable this mode
|
|
"""
|
|
self.write_data(bytearray((Ftdi.LOOPBACK_START if loopback else
|
|
Ftdi.LOOPBACK_END,)))
|
|
|
|
def calc_eeprom_checksum(self, data: Union[bytes, bytearray]) -> int:
|
|
"""Calculate EEPROM checksum over the data
|
|
|
|
:param data: data to compute checksum over. Must be an even number
|
|
of bytes
|
|
:return: checksum
|
|
"""
|
|
length = len(data)
|
|
if not length:
|
|
raise ValueError('No data to checksum')
|
|
if length & 0x1:
|
|
raise ValueError('Length not even')
|
|
# NOTE: checksum is computed using 16-bit values in little endian
|
|
# ordering
|
|
checksum = 0XAAAA
|
|
mtp = self.device_version == 0x1000 # FT230X
|
|
for idx in range(0, length, 2):
|
|
if mtp and 0x24 <= idx < 0x80:
|
|
# special MTP user section which is not considered for the CRC
|
|
continue
|
|
val = ((data[idx+1] << 8) + data[idx]) & 0xffff
|
|
checksum = val ^ checksum
|
|
checksum = ((checksum << 1) & 0xffff) | ((checksum >> 15) & 0xffff)
|
|
return checksum
|
|
|
|
def read_eeprom(self, addr: int = 0, length: Optional[int] = None,
|
|
eeprom_size: Optional[int] = None) -> bytes:
|
|
"""Read the EEPROM starting at byte address, addr, and returning
|
|
length bytes. Here, addr and length are in bytes but we
|
|
access a 16-bit word at a time, so automatically update
|
|
addr and length to work with word accesses.
|
|
|
|
:param addr: byte address that desire to read.
|
|
:param length: byte length to read or None
|
|
:param eeprom_size: total size in bytes of the eeprom or None
|
|
:return: eeprom bytes, as an array of bytes
|
|
"""
|
|
eeprom_size = self._check_eeprom_size(eeprom_size)
|
|
if length is None:
|
|
length = eeprom_size
|
|
if addr < 0 or (addr+length) > eeprom_size:
|
|
raise ValueError('Invalid address/length')
|
|
word_addr = addr >> 1
|
|
word_count = length >> 1
|
|
if (addr & 0x1) | (length & 0x1):
|
|
word_count += 1
|
|
try:
|
|
data = bytearray()
|
|
while word_count:
|
|
buf = self._usb_dev.ctrl_transfer(
|
|
Ftdi.REQ_IN, Ftdi.SIO_REQ_READ_EEPROM, 0,
|
|
word_addr, 2, self._usb_read_timeout)
|
|
if not buf:
|
|
err_addr = word_addr << 1
|
|
raise FtdiEepromError(f'EEPROM read error @ {err_addr}')
|
|
data.extend(buf)
|
|
word_count -= 1
|
|
word_addr += 1
|
|
start = addr & 0x1
|
|
return bytes(data[start:start+length])
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from exc
|
|
|
|
def write_eeprom(self, addr: int, data: Union[bytes, bytearray],
|
|
eeprom_size: Optional[int] = None,
|
|
dry_run: bool = True) -> None:
|
|
"""Write multiple bytes to the EEPROM starting at byte address,
|
|
addr. This function also updates the checksum
|
|
automatically.
|
|
|
|
.. warning:: You can brick your device with invalid size or content.
|
|
Use this function at your own risk, and RTFM.
|
|
|
|
:param addr: starting byte address to start writing
|
|
:param data: data to be written
|
|
:param eeprom_size: total size in bytes of the eeprom or None
|
|
:param dry_run: log what should be written, do not actually
|
|
change the EEPROM content
|
|
"""
|
|
eeprom_size = self._check_eeprom_size(eeprom_size)
|
|
if not data:
|
|
return
|
|
length = len(data)
|
|
if addr < 0 or (addr+length) > eeprom_size:
|
|
# accept up to eeprom_size, even if the last two bytes are
|
|
# overwritten with a locally computed checksum
|
|
raise ValueError('Invalid address/length')
|
|
# First, read out the entire EEPROM, based on eeprom_size.
|
|
eeprom = bytearray(self.read_eeprom(0, eeprom_size))
|
|
# patch in the new data
|
|
eeprom[addr:addr+len(data)] = data
|
|
# compute new checksum
|
|
chksum = self.calc_eeprom_checksum(eeprom[:-2])
|
|
self.log.info('New EEPROM checksum: 0x%04x', chksum)
|
|
# insert updated checksum - it is last 16-bits in EEPROM
|
|
if self.device_version == 0x1000:
|
|
# FT230x EEPROM structure is different
|
|
eeprom[0x7e] = chksum & 0x0ff
|
|
eeprom[0x7f] = chksum >> 8
|
|
else:
|
|
eeprom[-2] = chksum & 0x0ff
|
|
eeprom[-1] = chksum >> 8
|
|
# Write back the new data and checksum back to
|
|
# EEPROM. Only write data that is changing instead of writing
|
|
# everything in EEPROM, even if the data does not change.
|
|
#
|
|
# Compute start and end sections of eeprom baring in mind that
|
|
# they must be even since it is a 16-bit EEPROM.
|
|
# If start addr is odd, back it up one.
|
|
start = addr
|
|
size = length
|
|
if start & 0x1:
|
|
start -= 1
|
|
size += 1
|
|
if size & 0x1:
|
|
size += 1
|
|
size = min(size, eeprom_size - 2)
|
|
# finally, write new section of data and ...
|
|
self._write_eeprom_raw(start, eeprom[start:start+size],
|
|
dry_run=dry_run)
|
|
# ... updated checksum
|
|
self._write_eeprom_raw((eeprom_size-2), eeprom[-2:], dry_run=dry_run)
|
|
|
|
def overwrite_eeprom(self, data: Union[bytes, bytearray],
|
|
dry_run: bool = True) -> None:
|
|
"""Write the whole EEPROM content, from first to last byte.
|
|
|
|
.. warning:: You can brick your device with invalid size or content.
|
|
Use this function at your own risk, and RTFM.
|
|
|
|
:param data: data to be written (should include the checksum)
|
|
:param dry_run: log what should be written, do not actually
|
|
change the EEPROM content
|
|
"""
|
|
if self.is_eeprom_internal:
|
|
eeprom_size = self.INT_EEPROMS[self.device_version]
|
|
if len(data) != eeprom_size:
|
|
raise ValueError('Invalid EEPROM size')
|
|
elif len(data) not in self.EXT_EEPROM_SIZES:
|
|
raise ValueError('Invalid EEPROM size')
|
|
self._write_eeprom_raw(0, data, dry_run=dry_run)
|
|
|
|
def write_data(self, data: Union[bytes, bytearray]) -> int:
|
|
"""Write data to the FTDI port.
|
|
|
|
In UART mode, data contains the serial stream to write to the UART
|
|
interface.
|
|
|
|
In MPSSE mode, data contains the sequence of MPSSE commands and
|
|
data.
|
|
|
|
Data buffer is split into chunk-sized blocks before being sent over
|
|
the USB bus.
|
|
|
|
:param data: the byte stream to send to the FTDI interface
|
|
:return: count of written bytes
|
|
"""
|
|
offset = 0
|
|
size = len(data)
|
|
try:
|
|
while offset < size:
|
|
write_size = self._writebuffer_chunksize
|
|
if offset + write_size > size:
|
|
write_size = size - offset
|
|
length = self._write(data[offset:offset+write_size])
|
|
if length <= 0:
|
|
raise FtdiError("Usb bulk write error")
|
|
offset += length
|
|
return offset
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from exc
|
|
|
|
def read_data_bytes(self, size: int, attempt: int = 1,
|
|
request_gen: Optional[Callable[[int],
|
|
Union[bytes, bytearray]]] = None) \
|
|
-> bytearray:
|
|
"""Read data from the FTDI interface
|
|
|
|
In UART mode, data contains the serial stream read from the UART
|
|
interface.
|
|
|
|
In MPSSE mode, data contains the sequence of data received and
|
|
processed with the MPSEE engine.
|
|
|
|
Data buffer is rebuilt from chunk-sized blocks received over the USB
|
|
bus.
|
|
|
|
FTDI device always sends internal status bytes, which are stripped
|
|
out as not part of the data payload.
|
|
|
|
Because of the multiple buses, buffers, FIFOs, and MPSSE command
|
|
processing, data might not be immediately available on the host
|
|
side. The attempt argument can be used to increase the attempt count
|
|
to retrieve the expected amount of data, before giving up and
|
|
returning all the received data, which may be shorted than the
|
|
requested amount.
|
|
|
|
:param size: the number of bytes to received from the device
|
|
:param attempt: attempt cycle count
|
|
:param request_gen: a callable that takes the number of bytes read
|
|
and expect a bytes byffer to send back to the
|
|
remote device. This is only useful to perform
|
|
optimized/continuous transfer from a slave
|
|
device.
|
|
:return: payload bytes, as bytes
|
|
"""
|
|
# Packet size sanity check
|
|
if not self._max_packet_size:
|
|
raise FtdiError("max_packet_size is bogus")
|
|
packet_size = self._max_packet_size
|
|
length = 1 # initial condition to enter the usb_read loop
|
|
data = bytearray()
|
|
# everything we want is still in the cache?
|
|
if size <= len(self._readbuffer)-self._readoffset:
|
|
data = self._readbuffer[self._readoffset:self._readoffset+size]
|
|
self._readoffset += size
|
|
return data
|
|
# something still in the cache, but not enough to satisfy 'size'?
|
|
if len(self._readbuffer)-self._readoffset != 0:
|
|
data = self._readbuffer[self._readoffset:]
|
|
# end of readbuffer reached
|
|
self._readoffset = len(self._readbuffer)
|
|
# read from USB, filling in the local cache as it is empty
|
|
retry = attempt
|
|
req_size = size
|
|
try:
|
|
while (len(data) < size) and (length > 0):
|
|
while True:
|
|
tempbuf = self._read()
|
|
retry -= 1
|
|
length = len(tempbuf)
|
|
# the received buffer contains at least one useful databyte
|
|
# (first 2 bytes in each packet represent the current modem
|
|
# status)
|
|
if length >= 2:
|
|
if tempbuf[1] & self.TX_EMPTY_BITS:
|
|
if request_gen:
|
|
req_size -= length-2
|
|
if req_size > 0:
|
|
cmd = request_gen(req_size)
|
|
if cmd:
|
|
self.write_data(cmd)
|
|
if length > 2:
|
|
retry = attempt
|
|
if self._latency_threshold:
|
|
self._adapt_latency(True)
|
|
# skip the status bytes
|
|
chunks = (length+packet_size-1) // packet_size
|
|
count = packet_size - 2
|
|
# if you want to show status, use the following code:
|
|
status = tempbuf[:2]
|
|
if status[1] & self.ERROR_BITS[1]:
|
|
self.log.error(
|
|
'FTDI error: %02x:%02x %s',
|
|
status[0], status[1], (' '.join(
|
|
self.decode_modem_status(status,
|
|
True)).title()))
|
|
self._readbuffer = bytearray()
|
|
self._readoffset = 0
|
|
srcoff = 2
|
|
for _ in range(chunks):
|
|
self._readbuffer += tempbuf[srcoff:srcoff+count]
|
|
srcoff += packet_size
|
|
length = len(self._readbuffer)
|
|
break
|
|
# received buffer only contains the modem status bytes
|
|
# no data received, may be late, try again
|
|
if retry > 0:
|
|
continue
|
|
# no actual data
|
|
self._readbuffer = bytearray()
|
|
self._readoffset = 0
|
|
if self._latency_threshold:
|
|
self._adapt_latency(False)
|
|
# no more data to read?
|
|
return data
|
|
if length > 0:
|
|
# data still fits in buf?
|
|
if (len(data) + length) <= size:
|
|
data += self._readbuffer[self._readoffset:
|
|
self._readoffset+length]
|
|
self._readoffset += length
|
|
# did we read exactly the right amount of bytes?
|
|
if len(data) == size:
|
|
return data
|
|
else:
|
|
# partial copy, not enough bytes in the local cache to
|
|
# fulfill the request
|
|
part_size = min(size-len(data),
|
|
len(self._readbuffer)-self._readoffset)
|
|
if part_size < 0:
|
|
raise FtdiError("Internal Error")
|
|
data += self._readbuffer[self._readoffset:
|
|
self._readoffset+part_size]
|
|
self._readoffset += part_size
|
|
return data
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from exc
|
|
# never reached
|
|
raise FtdiError("Internal error")
|
|
|
|
def read_data(self, size: int) -> bytes:
|
|
"""Shortcut to received a bytes buffer instead of the array of bytes.
|
|
|
|
Note that output byte buffer may be shorted than the requested
|
|
size.
|
|
|
|
:param size: the number of bytes to received from the device
|
|
:return: payload bytes
|
|
"""
|
|
return bytes(self.read_data_bytes(size))
|
|
|
|
def get_cts(self) -> bool:
|
|
"""Read terminal status line: Clear To Send
|
|
|
|
:return: CTS line logical level
|
|
"""
|
|
status = self.poll_modem_status()
|
|
return bool(status & self.MODEM_CTS)
|
|
|
|
def get_dsr(self) -> bool:
|
|
"""Read terminal status line: Data Set Ready
|
|
|
|
:return: DSR line logical level
|
|
"""
|
|
status = self.poll_modem_status()
|
|
return bool(status & self.MODEM_DSR)
|
|
|
|
def get_ri(self) -> bool:
|
|
"""Read terminal status line: Ring Indicator
|
|
|
|
:return: RI line logical level
|
|
"""
|
|
status = self.poll_modem_status()
|
|
return bool(status & self.MODEM_RI)
|
|
|
|
def get_cd(self) -> bool:
|
|
"""Read terminal status line: Carrier Detect
|
|
|
|
:return: CD line logical level
|
|
"""
|
|
status = self.poll_modem_status()
|
|
return bool(status & self.MODEM_RLSD)
|
|
|
|
def set_dynamic_latency(self, lmin: int, lmax: int,
|
|
threshold: int) -> None:
|
|
"""Set up or disable latency values.
|
|
|
|
Dynamic latency management is a load balancer to adapt the
|
|
responsiveness of FTDI read request vs. the host CPU load.
|
|
|
|
It is mostly useful in UART mode, so that read bandwidth can be
|
|
increased to the maximum achievable throughput, while maintaining
|
|
very low host CPU load when no data is received from the UART.
|
|
|
|
There should be no need to tweak the default values. Use with care.
|
|
|
|
Minimum latency is limited to 12 or above, at FTDI device starts
|
|
losing bytes when latency is too short...
|
|
|
|
Maximum latency value is 255 ms.
|
|
|
|
Polling latency is reset to `lmin` each time at least one payload
|
|
byte is received from the FTDI device.
|
|
|
|
It doubles, up to `lmax`, every `threshold` times no payload has
|
|
been received from the FTDI device.
|
|
|
|
:param lmin: minimum latency level (ms)
|
|
:param lmax: maximum latenty level (ms)
|
|
:param threshold: count to reset latency to maximum level
|
|
"""
|
|
if not threshold:
|
|
self._latency_count = 0
|
|
self._latency_threshold = None
|
|
else:
|
|
for lat in (lmin, lmax):
|
|
if not self.LATENCY_MIN <= lat <= self.LATENCY_MAX:
|
|
raise ValueError(f'Latency out of range: {lat}')
|
|
self._latency_min = lmin
|
|
self._latency_max = lmax
|
|
self._latency_threshold = threshold
|
|
self._latency = lmin
|
|
self.set_latency_timer(self._latency)
|
|
|
|
def validate_mpsse(self) -> None:
|
|
"""Check that the previous MPSSE request has been accepted by the FTDI
|
|
device.
|
|
|
|
:raise FtdiError: if the FTDI device rejected the command.
|
|
"""
|
|
# only useful in MPSSE mode
|
|
bytes_ = self.read_data(2)
|
|
if (len(bytes_) >= 2) and (bytes_[0] == '\xfa'):
|
|
raise FtdiError(f'Invalid command @ {bytes_[1]}')
|
|
|
|
@classmethod
|
|
def get_error_string(cls) -> str:
|
|
"""Wrapper for legacy compatibility.
|
|
|
|
:return: a constant, meaningless string
|
|
"""
|
|
return "Unknown error"
|
|
|
|
# --- Private implementation -------------------------------------------
|
|
|
|
def _set_interface(self, config: UsbConfiguration, ifnum: int):
|
|
"""Select the interface to use on the FTDI device"""
|
|
if ifnum == 0:
|
|
ifnum = 1
|
|
if ifnum-1 not in range(config.bNumInterfaces):
|
|
raise ValueError("No such interface for this device")
|
|
self._interface = config[(ifnum-1, 0)]
|
|
self._index = self._interface.bInterfaceNumber+1
|
|
endpoints = sorted([ep.bEndpointAddress for ep in self._interface])
|
|
self._in_ep, self._out_ep = endpoints[:2]
|
|
|
|
# detach kernel driver from the interface
|
|
try:
|
|
if self._usb_dev.is_kernel_driver_active(self._index - 1):
|
|
self._usb_dev.detach_kernel_driver(self._index - 1)
|
|
except (NotImplementedError, USBError):
|
|
pass
|
|
|
|
# pylint: disable=protected-access
|
|
# need to access private member _ctx of PyUSB device (resource manager)
|
|
# until PyUSB #302 is addressed
|
|
|
|
def _reset_usb_device(self) -> None:
|
|
"""Reset USB device (USB command, not FTDI specific)."""
|
|
self._usb_dev._ctx.backend.reset_device(self._usb_dev._ctx.handle)
|
|
|
|
def _is_pyusb_handle_active(self) -> bool:
|
|
# Unfortunately, we need to access pyusb ResourceManager
|
|
# and there is no public API for this.
|
|
return bool(self._usb_dev._ctx.handle)
|
|
|
|
# pylint: enable-msg=protected-access
|
|
|
|
def _reset_device(self):
|
|
"""Reset the FTDI device (FTDI vendor command)"""
|
|
if self._ctrl_transfer_out(Ftdi.SIO_REQ_RESET,
|
|
Ftdi.SIO_RESET_SIO):
|
|
raise FtdiError('Unable to reset FTDI device')
|
|
|
|
def _ctrl_transfer_out(self, reqtype: int, value: int, data: bytes = b''):
|
|
"""Send a control message to the device"""
|
|
try:
|
|
return self._usb_dev.ctrl_transfer(
|
|
Ftdi.REQ_OUT, reqtype, value, self._index,
|
|
bytearray(data), self._usb_write_timeout)
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from None
|
|
|
|
def _ctrl_transfer_in(self, reqtype: int, length: int):
|
|
"""Request for a control message from the device"""
|
|
try:
|
|
return self._usb_dev.ctrl_transfer(
|
|
Ftdi.REQ_IN, reqtype, 0, self._index, length,
|
|
self._usb_read_timeout)
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from None
|
|
|
|
def _write(self, data: Union[bytes, bytearray]) -> int:
|
|
if self._debug_log:
|
|
try:
|
|
self.log.debug('> %s', hexlify(data).decode())
|
|
except TypeError as exc:
|
|
self.log.warning('> (invalid output byte sequence: %s)', exc)
|
|
if self._tracer:
|
|
self._tracer.send(self._index, data)
|
|
try:
|
|
return self._usb_dev.write(self._in_ep, data,
|
|
self._usb_write_timeout)
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from None
|
|
|
|
def _read(self) -> bytes:
|
|
try:
|
|
data = self._usb_dev.read(self._out_ep, self._readbuffer_chunksize,
|
|
self._usb_read_timeout)
|
|
except USBError as exc:
|
|
raise FtdiError(f'UsbError: {exc}') from None
|
|
if data:
|
|
if self._debug_log:
|
|
self.log.debug('< %s', hexlify(data).decode())
|
|
if self._tracer and len(data) > 2:
|
|
self._tracer.receive(self._index, data[2:])
|
|
return data
|
|
|
|
def _adapt_latency(self, payload_detected: bool) -> None:
|
|
"""Dynamic latency adaptation depending on the presence of a
|
|
payload in a RX buffer.
|
|
|
|
:param payload_detected: whether a payload has been received
|
|
within last RX buffer
|
|
"""
|
|
if payload_detected:
|
|
self._latency_count = 0
|
|
if self._latency != self._latency_min:
|
|
self.set_latency_timer(self._latency_min)
|
|
self._latency = self._latency_min
|
|
return
|
|
# no payload received
|
|
self._latency_count += 1
|
|
if self._latency != self._latency_max:
|
|
if self._latency_count > \
|
|
self._latency_threshold:
|
|
self._latency *= 2
|
|
if self._latency > self._latency_max:
|
|
self._latency = self._latency_max
|
|
else:
|
|
self._latency_count = 0
|
|
self.set_latency_timer(self._latency)
|
|
|
|
def _check_eeprom_size(self, eeprom_size: Optional[int]) -> int:
|
|
if self.device_version in self.INT_EEPROMS:
|
|
if (eeprom_size and
|
|
eeprom_size != self.INT_EEPROMS[self.device_version]):
|
|
raise ValueError(f'Invalid EEPROM size: {eeprom_size}')
|
|
eeprom_size = self.INT_EEPROMS[self.device_version]
|
|
else:
|
|
if eeprom_size is None:
|
|
eeprom_size = self.max_eeprom_size
|
|
if eeprom_size not in self.EXT_EEPROM_SIZES:
|
|
raise ValueError(f'Invalid EEPROM size: {eeprom_size}')
|
|
return eeprom_size
|
|
|
|
def _write_eeprom_raw(self, addr: int, data: Union[bytes, bytearray],
|
|
dry_run: bool = True) -> None:
|
|
"""Write multiple bytes to the EEPROM starting at byte address,
|
|
addr. Length of data must be a multiple of 2 since the
|
|
EEPROM is 16-bits. So automatically extend data by 1 byte
|
|
if this is not the case.
|
|
|
|
:param int addr: starting byte address to start writing
|
|
:param bytes data: data to be written
|
|
:param dry_run: log what should be written, do not actually
|
|
change the EEPROM content
|
|
"""
|
|
if self.device_version == 0x0600:
|
|
# FT232R internal EEPROM is unstable and latency timer seems
|
|
# to have a direct impact on EEPROM programming...
|
|
latency = self.get_latency_timer()
|
|
else:
|
|
latency = 0
|
|
try:
|
|
if latency:
|
|
self.set_latency_timer(self.LATENCY_EEPROM_FT232R)
|
|
length = len(data)
|
|
if addr & 0x1 or length & 0x1:
|
|
raise ValueError('Address/length not even')
|
|
for word in sunpack(f'<{length//2}H', data):
|
|
if not dry_run:
|
|
out = self._usb_dev.ctrl_transfer(
|
|
Ftdi.REQ_OUT, Ftdi.SIO_REQ_WRITE_EEPROM,
|
|
word, addr >> 1, b'', self._usb_write_timeout)
|
|
if out:
|
|
raise FtdiEepromError(f'EEPROM Write Error @ {addr}')
|
|
self.log.debug('Write EEPROM [0x%02x]: 0x%04x', addr, word)
|
|
else:
|
|
self.log.info('Fake write EEPROM [0x%02x]: 0x%04x',
|
|
addr, word)
|
|
addr += 2
|
|
finally:
|
|
if latency:
|
|
self.set_latency_timer(latency)
|
|
|
|
def _get_max_packet_size(self) -> int:
|
|
"""Retrieve the maximum length of a data packet"""
|
|
if not self.is_connected:
|
|
raise IOError("Device is not yet known", ENODEV)
|
|
if not self._interface:
|
|
raise IOError("Interface is not yet known", ENODEV)
|
|
endpoint = self._interface[0]
|
|
packet_size = endpoint.wMaxPacketSize
|
|
return packet_size
|
|
|
|
def _convert_baudrate_legacy(self, baudrate: int) -> Tuple[int, int, int]:
|
|
if baudrate > self.BAUDRATE_REF_BASE:
|
|
raise ValueError('Invalid baudrate (too high)')
|
|
div8 = int(round((8 * self.BAUDRATE_REF_BASE) / baudrate))
|
|
if (div8 & 0x7) == 7:
|
|
div8 += 1
|
|
div = div8 >> 3
|
|
div8 &= 0x7
|
|
if div8 == 1:
|
|
div |= 0xc000
|
|
elif div8 >= 4:
|
|
div |= 0x4000
|
|
elif div8 != 0:
|
|
div |= 0x8000
|
|
elif div == 1:
|
|
div = 0
|
|
value = div & 0xFFFF
|
|
index = (div >> 16) & 0xFFFF
|
|
estimate = int(((8 * self.BAUDRATE_REF_BASE) + (div8//2))//div8)
|
|
return estimate, value, index
|
|
|
|
def _convert_baudrate(self, baudrate: int) -> Tuple[int, int, int]:
|
|
"""Convert a requested baudrate into the closest possible baudrate
|
|
that can be assigned to the FTDI device
|
|
|
|
:param baudrate: the baudrate in bps
|
|
:return: a 3-uple of the apprimated baudrate, the value and index
|
|
to use as the USB configuration parameter
|
|
"""
|
|
if self.device_version == 0x200:
|
|
return self._convert_baudrate_legacy(baudrate)
|
|
if self.is_H_series and baudrate >= 1200:
|
|
hispeed = True
|
|
clock = self.BAUDRATE_REF_HIGH
|
|
bb_ratio = self.BITBANG_BAUDRATE_RATIO_HIGH
|
|
else:
|
|
hispeed = False
|
|
clock = self.BAUDRATE_REF_BASE
|
|
bb_ratio = self.BITBANG_BAUDRATE_RATIO_BASE
|
|
if baudrate > clock:
|
|
raise ValueError('Invalid baudrate (too high)')
|
|
if baudrate < ((clock >> 14) + 1):
|
|
raise ValueError('Invalid baudrate (too low)')
|
|
if self.is_bitbang_enabled:
|
|
baudrate //= bb_ratio
|
|
div8 = int(round((8 * clock) / baudrate))
|
|
div = div8 >> 3
|
|
div |= self.FRAC_DIV_CODE[div8 & 0x7] << 14
|
|
if div == 1:
|
|
div = 0
|
|
elif div == 0x4001:
|
|
div = 1
|
|
if hispeed:
|
|
div |= 0x00020000
|
|
value = div & 0xFFFF
|
|
index = (div >> 16) & 0xFFFF
|
|
if self.device_version >= 0x0700 or self.device_version == 0x0500:
|
|
index <<= 8
|
|
index |= self._index
|
|
estimate = int(((8 * clock) + (div8//2))//div8)
|
|
if self.is_bitbang_enabled:
|
|
estimate *= bb_ratio
|
|
return estimate, value, index
|
|
|
|
def _set_baudrate(self, baudrate: int, constrain: bool) -> int:
|
|
if self.is_mpsse:
|
|
raise FtdiFeatureError('Cannot change frequency w/ current mode')
|
|
actual, value, index = self._convert_baudrate(baudrate)
|
|
delta = 100*abs(float(actual-baudrate))/baudrate
|
|
self.log.debug('Actual baudrate: %d %.1f%% div [%04x:%04x]',
|
|
actual, delta, index, value)
|
|
# return actual
|
|
if constrain and delta > Ftdi.BAUDRATE_TOLERANCE:
|
|
raise ValueError(f'Baudrate tolerance exceeded: {delta:.02f}% '
|
|
f'(wanted {baudrate}, achievable {actual})')
|
|
try:
|
|
if self._usb_dev.ctrl_transfer(
|
|
Ftdi.REQ_OUT, Ftdi.SIO_REQ_SET_BAUDRATE, value, index,
|
|
bytearray(), self._usb_write_timeout):
|
|
raise FtdiError('Unable to set baudrate')
|
|
return actual
|
|
except USBError as exc:
|
|
raise FtdiError('UsbError: {exc}') from exc
|
|
|
|
def _set_frequency(self, frequency: float) -> float:
|
|
"""Convert a frequency value into a TCK divisor setting"""
|
|
if not self.is_mpsse:
|
|
raise FtdiFeatureError('Cannot change frequency w/ current mode')
|
|
if frequency > self.frequency_max:
|
|
raise FtdiFeatureError(f'Unsupported frequency: {frequency:.0f}')
|
|
# Calculate base speed clock divider
|
|
divcode = Ftdi.ENABLE_CLK_DIV5
|
|
divisor = int((Ftdi.BUS_CLOCK_BASE+frequency/2)/frequency)-1
|
|
divisor = max(0, min(0xFFFF, divisor))
|
|
actual_freq = Ftdi.BUS_CLOCK_BASE/(divisor+1)
|
|
error = (actual_freq/frequency)-1
|
|
# Should we use high speed clock available in H series?
|
|
if self.is_H_series:
|
|
# Calculate high speed clock divider
|
|
divisor_hs = int((Ftdi.BUS_CLOCK_HIGH+frequency/2)/frequency)-1
|
|
divisor_hs = max(0, min(0xFFFF, divisor_hs))
|
|
actual_freq_hs = Ftdi.BUS_CLOCK_HIGH/(divisor_hs+1)
|
|
error_hs = (actual_freq_hs/frequency)-1
|
|
# Enable if closer to desired frequency (percentually)
|
|
if abs(error_hs) < abs(error):
|
|
divcode = Ftdi.DISABLE_CLK_DIV5
|
|
divisor = divisor_hs
|
|
actual_freq = actual_freq_hs
|
|
error = error_hs
|
|
# FTDI expects little endian
|
|
if self.is_H_series:
|
|
cmd = bytearray((divcode,))
|
|
else:
|
|
cmd = bytearray()
|
|
cmd.extend((Ftdi.SET_TCK_DIVISOR, divisor & 0xff,
|
|
(divisor >> 8) & 0xff))
|
|
self.write_data(cmd)
|
|
self.validate_mpsse()
|
|
# Drain input buffer
|
|
self.purge_rx_buffer()
|
|
# Note that bus frequency may differ from clock frequency, when
|
|
# 3-phase clock is enable, in which case bus frequency = 2/3 clock
|
|
# frequency
|
|
if actual_freq > 1E6:
|
|
self.log.debug('Clock frequency: %.6f MHz (error: %+.1f %%)',
|
|
(actual_freq/1E6), error*100)
|
|
else:
|
|
self.log.debug('Clock frequency: %.3f KHz (error: %+.1f %%)',
|
|
(actual_freq/1E3), error*100)
|
|
return actual_freq
|
|
|
|
def __get_timeouts(self) -> Tuple[int, int]:
|
|
return self._usb_read_timeout, self._usb_write_timeout
|
|
|
|
def __set_timeouts(self, timeouts: Tuple[int, int]):
|
|
(read_timeout, write_timeout) = timeouts
|
|
self._usb_read_timeout = read_timeout
|
|
self._usb_write_timeout = write_timeout
|
|
|
|
timeouts = property(__get_timeouts, __set_timeouts)
|