1162 lines
43 KiB
Python
1162 lines
43 KiB
Python
# Copyright (c) 2017-2024, Emmanuel Blot <emmanuel.blot@free.fr>
|
|
# All rights reserved.
|
|
#
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
"""I2C support for PyFdti"""
|
|
|
|
from binascii import hexlify
|
|
from collections import namedtuple
|
|
from logging import getLogger
|
|
from struct import calcsize as scalc, pack as spack, unpack as sunpack
|
|
from threading import Lock
|
|
from typing import Any, Iterable, Mapping, Optional, Tuple, Union
|
|
from usb.core import Device as UsbDevice
|
|
from .ftdi import Ftdi, FtdiFeatureError
|
|
from .misc import to_bool
|
|
|
|
|
|
class I2cIOError(IOError):
|
|
"""I2c I/O error"""
|
|
|
|
|
|
class I2cNackError(I2cIOError):
|
|
"""I2c NACK receive from slave"""
|
|
|
|
|
|
class I2cTimeoutError(TimeoutError):
|
|
"""I2c timeout on polling"""
|
|
|
|
|
|
class I2cPort:
|
|
"""I2C port.
|
|
|
|
An I2C port is never instanciated directly:
|
|
use :py:meth:`I2cController.get_port()` method to obtain an I2C port.
|
|
|
|
``relax`` parameter in I2cPort methods may be used to prevent the master
|
|
from releasing the I2C bus, if some further data should be exchanged
|
|
with the slave device. Note that in case of any error, the I2C bus is
|
|
released and the ``relax`` parameter is ignored in such an event.
|
|
|
|
Example:
|
|
|
|
>>> ctrl = I2cController()
|
|
>>> ctrl.configure('ftdi://ftdi:232h/1')
|
|
>>> i2c = ctrl.get_port(0x21)
|
|
>>> # send 2 bytes
|
|
>>> i2c.write([0x12, 0x34])
|
|
>>> # send 2 bytes, then receive 2 bytes
|
|
>>> out = i2c.exchange([0x12, 0x34], 2)
|
|
"""
|
|
FORMATS = {scalc(fmt): fmt for fmt in 'BHI'}
|
|
|
|
def __init__(self, controller: 'I2cController', address: int):
|
|
self._controller = controller
|
|
self._address = address
|
|
self._shift = 0
|
|
self._endian = '<'
|
|
self._format = 'B'
|
|
|
|
def configure_register(self,
|
|
bigendian: bool = False, width: int = 1) -> None:
|
|
"""Reconfigure the format of the slave address register (if any)
|
|
|
|
:param bigendian: True for a big endian encoding, False otherwise
|
|
:param width: width, in bytes, of the register
|
|
"""
|
|
try:
|
|
self._format = self.FORMATS[width]
|
|
except KeyError as exc:
|
|
raise I2cIOError('Unsupported integer width') from exc
|
|
self._endian = '>' if bigendian else '<'
|
|
|
|
def shift_address(self, offset: int):
|
|
"""Tweak the I2C slave address, as required with some devices
|
|
"""
|
|
I2cController.validate_address(self._address+offset)
|
|
self._shift = offset
|
|
|
|
def read(self, readlen: int = 0, relax: bool = True,
|
|
start: bool = True) -> bytes:
|
|
"""Read one or more bytes from a remote slave
|
|
|
|
:param readlen: count of bytes to read out.
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:return: byte sequence of read out bytes
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
"""
|
|
return self._controller.read(
|
|
self._address+self._shift if start else None,
|
|
readlen=readlen, relax=relax)
|
|
|
|
def write(self, out: Union[bytes, bytearray, Iterable[int]],
|
|
relax: bool = True, start: bool = True) -> None:
|
|
"""Write one or more bytes to a remote slave
|
|
|
|
:param out: the byte buffer to send
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
"""
|
|
return self._controller.write(
|
|
self._address+self._shift if start else None,
|
|
out, relax=relax)
|
|
|
|
def read_from(self, regaddr: int, readlen: int = 0,
|
|
relax: bool = True, start: bool = True) -> bytes:
|
|
"""Read one or more bytes from a given register at remote slave
|
|
|
|
:param regaddr: slave register address to read from
|
|
:param readlen: count of bytes to read out.
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:return: data read out from the slave
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
"""
|
|
return self._controller.exchange(
|
|
self._address+self._shift if start else None,
|
|
out=self._make_buffer(regaddr), readlen=readlen, relax=relax)
|
|
|
|
def write_to(self, regaddr: int,
|
|
out: Union[bytes, bytearray, Iterable[int]],
|
|
relax: bool = True, start: bool = True):
|
|
"""Write one or more bytes to a given register at a remote slave
|
|
|
|
:param regaddr: slave register address to write to
|
|
:param out: the byte buffer to send
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
"""
|
|
return self._controller.write(
|
|
self._address+self._shift if start else None,
|
|
out=self._make_buffer(regaddr, out), relax=relax)
|
|
|
|
def exchange(self, out: Union[bytes, bytearray, Iterable[int]] = b'',
|
|
readlen: int = 0,
|
|
relax: bool = True, start: bool = True) -> bytes:
|
|
"""Perform an exchange or a transaction with the I2c slave
|
|
|
|
:param out: an array of bytes to send to the I2c slave,
|
|
may be empty to only read out data from the slave
|
|
:param readlen: count of bytes to read out from the slave,
|
|
may be zero to only write to the slave
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:return: data read out from the slave
|
|
"""
|
|
return self._controller.exchange(
|
|
self._address+self._shift if start else None, out,
|
|
readlen, relax=relax)
|
|
|
|
def poll(self, write: bool = False,
|
|
relax: bool = True, start: bool = True) -> bool:
|
|
"""Poll a remote slave, expect ACK or NACK.
|
|
|
|
:param write: poll in write mode (vs. read)
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:return: True if the slave acknowledged, False otherwise
|
|
"""
|
|
return self._controller.poll(
|
|
self._address+self._shift if start else None, write,
|
|
relax=relax)
|
|
|
|
def poll_cond(self, width: int, mask: int, value: int, count: int,
|
|
relax: bool = True, start: bool = True) -> Optional[bytes]:
|
|
"""Poll a remove slave, watching for condition to satisfy.
|
|
On each poll cycle, a repeated start condition is emitted, without
|
|
releasing the I2C bus, and an ACK is returned to the slave.
|
|
|
|
If relax is set, this method releases the I2C bus however it leaves.
|
|
|
|
:param width: count of bytes to poll for the condition check,
|
|
that is the size of the condition register
|
|
:param mask: binary mask to apply on the condition register
|
|
before testing for the value
|
|
:param value: value to test the masked condition register
|
|
against. Condition is satisfied when register & mask == value
|
|
:param count: maximum poll count before raising a timeout
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:param start: whether to emit a start sequence (w/ address)
|
|
:return: the polled register value
|
|
:raise I2cTimeoutError: if poll condition is not satisified
|
|
"""
|
|
try:
|
|
fmt = ''.join((self._endian, self.FORMATS[width]))
|
|
except KeyError as exc:
|
|
raise I2cIOError('Unsupported integer width') from exc
|
|
return self._controller.poll_cond(
|
|
self._address+self._shift if start else None,
|
|
fmt, mask, value, count, relax=relax)
|
|
|
|
def flush(self) -> None:
|
|
"""Force the flush of the HW FIFOs.
|
|
"""
|
|
self._controller.flush()
|
|
|
|
@property
|
|
def frequency(self) -> float:
|
|
"""Provide the current I2c bus frequency.
|
|
"""
|
|
return self._controller.frequency
|
|
|
|
@property
|
|
def address(self) -> int:
|
|
"""Return the slave address."""
|
|
return self._address
|
|
|
|
def _make_buffer(self, regaddr: int,
|
|
out: Union[bytes, bytearray, Iterable[int],
|
|
None] = None) -> bytes:
|
|
data = bytearray()
|
|
data.extend(spack(f'{self._endian}{self._format}', regaddr))
|
|
if out:
|
|
data.extend(out)
|
|
return bytes(data)
|
|
|
|
|
|
class I2cGpioPort:
|
|
"""GPIO port
|
|
|
|
A I2cGpioPort instance enables to drive GPIOs wich are not reserved for
|
|
I2c feature as regular GPIOs.
|
|
|
|
GPIO are managed as a bitfield. The LSBs are reserved for the I2c
|
|
feature, which means that the lowest pin that can be used as a GPIO is
|
|
*b3*:
|
|
|
|
* *b0*: I2C SCL
|
|
* *b1*: I2C SDA_O
|
|
* *b2*: I2C SDA_I
|
|
* *b3*: first GPIO
|
|
* *b7*: reserved for I2C clock stretching, if this mode is enabled
|
|
|
|
There is no offset bias in GPIO bit position, *i.e.* the first available
|
|
GPIO can be reached from as ``0x08``.
|
|
|
|
Bitfield size depends on the FTDI device: 4432H series use 8-bit GPIO
|
|
ports, while 232H and 2232H series use wide 16-bit ports.
|
|
|
|
An I2cGpio port is never instanciated directly: use
|
|
:py:meth:`I2cController.get_gpio()` method to obtain the GPIO port.
|
|
"""
|
|
def __init__(self, controller: 'I2cController'):
|
|
self.log = getLogger('pyftdi.i2c.gpio')
|
|
self._controller = controller
|
|
|
|
@property
|
|
def pins(self) -> int:
|
|
"""Report the configured GPIOs as a bitfield.
|
|
|
|
A true bit represents a GPIO, a false bit a reserved or not
|
|
configured pin.
|
|
|
|
:return: the bitfield of configured GPIO pins.
|
|
"""
|
|
return self._controller.gpio_pins
|
|
|
|
@property
|
|
def all_pins(self) -> int:
|
|
"""Report the addressable GPIOs as a bitfield.
|
|
|
|
A true bit represents a pin which may be used as a GPIO, a false bit
|
|
a reserved pin (for I2C support)
|
|
|
|
:return: the bitfield of configurable GPIO pins.
|
|
"""
|
|
return self._controller.gpio_all_pins
|
|
|
|
@property
|
|
def width(self) -> int:
|
|
"""Report the FTDI count of addressable pins.
|
|
|
|
Note that all pins, including reserved I2C ones, are reported.
|
|
|
|
:return: the count of IO pins (including I2C ones).
|
|
"""
|
|
return self._controller.width
|
|
|
|
@property
|
|
def direction(self) -> int:
|
|
"""Provide the FTDI GPIO direction.self
|
|
|
|
A true bit represents an output GPIO, a false bit an input GPIO.
|
|
|
|
:return: the bitfield of direction.
|
|
"""
|
|
return self._controller.direction
|
|
|
|
def read(self, with_output: bool = False) -> int:
|
|
"""Read GPIO port.
|
|
|
|
:param with_output: set to unmask output pins
|
|
:return: the GPIO port pins as a bitfield
|
|
"""
|
|
return self._controller.read_gpio(with_output)
|
|
|
|
def write(self, value: int) -> None:
|
|
"""Write GPIO port.
|
|
|
|
:param value: the GPIO port pins as a bitfield
|
|
"""
|
|
return self._controller.write_gpio(value)
|
|
|
|
def set_direction(self, pins: int, direction: int) -> None:
|
|
"""Change the direction of the GPIO pins.
|
|
|
|
:param pins: which GPIO pins should be reconfigured
|
|
:param direction: direction bitfield (high level for output)
|
|
"""
|
|
self._controller.set_gpio_direction(pins, direction)
|
|
|
|
|
|
I2CTimings = namedtuple('I2CTimings', 't_hd_sta t_su_sta t_su_sto t_buf')
|
|
"""I2C standard timings.
|
|
"""
|
|
|
|
|
|
class I2cController:
|
|
"""I2c master.
|
|
|
|
An I2c master should be instanciated only once for each FTDI port that
|
|
supports MPSSE (one or two ports, depending on the FTDI device).
|
|
|
|
Once configured, :py:func:`get_port` should be invoked to obtain an I2c
|
|
port for each I2c slave to drive. I2c port should handle all I/O
|
|
requests for its associated HW slave.
|
|
|
|
It is not recommended to use I2cController :py:func:`read`,
|
|
:py:func:`write` or :py:func:`exchange` directly.
|
|
|
|
* ``SCK`` should be connected to ``A*BUS0``, and ``A*BUS7`` if clock
|
|
stretching mode is enabled
|
|
* ``SDA`` should be connected to ``A*BUS1`` **and** ``A*BUS2``
|
|
"""
|
|
|
|
LOW = 0x00
|
|
HIGH = 0xff
|
|
BIT0 = 0x01
|
|
IDLE = HIGH
|
|
SCL_BIT = 0x01 # AD0
|
|
SDA_O_BIT = 0x02 # AD1
|
|
SDA_I_BIT = 0x04 # AD2
|
|
SCL_FB_BIT = 0x80 # AD7
|
|
PAYLOAD_MAX_LENGTH = 0xFF00 # 16 bits max (- spare for control)
|
|
HIGHEST_I2C_ADDRESS = 0x7F
|
|
DEFAULT_BUS_FREQUENCY = 100000.0
|
|
HIGH_BUS_FREQUENCY = 400000.0
|
|
RETRY_COUNT = 3
|
|
|
|
I2C_MASK = SCL_BIT | SDA_O_BIT | SDA_I_BIT
|
|
I2C_MASK_CS = SCL_BIT | SDA_O_BIT | SDA_I_BIT | SCL_FB_BIT
|
|
I2C_DIR = SCL_BIT | SDA_O_BIT
|
|
|
|
I2C_100K = I2CTimings(4.0E-6, 4.7E-6, 4.0E-6, 4.7E-6)
|
|
I2C_400K = I2CTimings(0.6E-6, 0.6E-6, 0.6E-6, 1.3E-6)
|
|
I2C_1M = I2CTimings(0.26E-6, 0.26E-6, 0.26E-6, 0.5E-6)
|
|
|
|
def __init__(self):
|
|
self._ftdi = Ftdi()
|
|
self._lock = Lock()
|
|
self.log = getLogger('pyftdi.i2c')
|
|
self._gpio_port = None
|
|
self._gpio_dir = 0
|
|
self._gpio_low = 0
|
|
self._gpio_mask = 0
|
|
self._i2c_mask = 0
|
|
self._wide_port = False
|
|
self._slaves = {}
|
|
self._retry_count = self.RETRY_COUNT
|
|
self._frequency = 0.0
|
|
self._immediate = (Ftdi.SEND_IMMEDIATE,)
|
|
self._read_bit = (Ftdi.READ_BITS_PVE_MSB, 0)
|
|
self._read_byte = (Ftdi.READ_BYTES_PVE_MSB, 0, 0)
|
|
self._write_byte = (Ftdi.WRITE_BYTES_NVE_MSB, 0, 0)
|
|
self._nack = (Ftdi.WRITE_BITS_NVE_MSB, 0, self.HIGH)
|
|
self._ack = (Ftdi.WRITE_BITS_NVE_MSB, 0, self.LOW)
|
|
self._ck_delay = 1
|
|
self._fake_tristate = False
|
|
self._tx_size = 1
|
|
self._rx_size = 1
|
|
self._ck_hd_sta = 0
|
|
self._ck_su_sto = 0
|
|
self._ck_idle = 0
|
|
self._read_optim = True
|
|
self._disable_3phase_clock = False
|
|
|
|
def set_retry_count(self, count: int) -> None:
|
|
"""Change the default retry count when a communication error occurs,
|
|
before bailing out.
|
|
:param count: count of retries
|
|
"""
|
|
if not isinstance(count, int) or not 0 < count <= 16:
|
|
raise ValueError('Invalid retry count')
|
|
self._retry_count = count
|
|
|
|
def configure(self, url: Union[str, UsbDevice],
|
|
**kwargs: Mapping[str, Any]) -> None:
|
|
"""Configure the FTDI interface as a I2c master.
|
|
|
|
:param url: FTDI URL string, such as ``ftdi://ftdi:232h/1``
|
|
:param kwargs: options to configure the I2C bus
|
|
|
|
Accepted options:
|
|
|
|
* ``interface``: when URL is specifed as a USB device, the interface
|
|
named argument can be used to select a specific port of the FTDI
|
|
device, as an integer starting from 1.
|
|
* ``direction`` a bitfield specifying the FTDI GPIO direction,
|
|
where high level defines an output, and low level defines an
|
|
input. Only useful to setup default IOs at start up, use
|
|
:py:class:`I2cGpioPort` to drive GPIOs. Note that pins reserved
|
|
for I2C feature take precedence over any this setting.
|
|
* ``initial`` a bitfield specifying the initial output value. Only
|
|
useful to setup default IOs at start up, use
|
|
:py:class:`I2cGpioPort` to drive GPIOs.
|
|
* ``frequency`` float value the I2C bus frequency in Hz
|
|
* ``clockstretching`` boolean value to enable clockstreching.
|
|
xD7 (GPIO7) pin should be connected back to xD0 (SCK)
|
|
* ``debug`` to increase log verbosity, using MPSSE tracer
|
|
"""
|
|
if 'frequency' in kwargs:
|
|
frequency = kwargs['frequency']
|
|
del kwargs['frequency']
|
|
else:
|
|
frequency = self.DEFAULT_BUS_FREQUENCY
|
|
# Fix frequency for 3-phase clock
|
|
if frequency <= 100E3:
|
|
timings = self.I2C_100K
|
|
elif frequency <= 400E3:
|
|
timings = self.I2C_400K
|
|
else:
|
|
timings = self.I2C_1M
|
|
if 'clockstretching' in kwargs:
|
|
clkstrch = bool(kwargs['clockstretching'])
|
|
del kwargs['clockstretching']
|
|
else:
|
|
clkstrch = False
|
|
if 'direction' in kwargs:
|
|
io_dir = int(kwargs['direction'])
|
|
del kwargs['direction']
|
|
else:
|
|
io_dir = 0
|
|
if 'initial' in kwargs:
|
|
io_out = int(kwargs['initial'])
|
|
del kwargs['initial']
|
|
else:
|
|
io_out = 0
|
|
if 'interface' in kwargs:
|
|
if isinstance(url, str):
|
|
raise I2cIOError('url and interface are mutually exclusive')
|
|
interface = int(kwargs['interface'])
|
|
del kwargs['interface']
|
|
else:
|
|
interface = 1
|
|
if 'rdoptim' in kwargs:
|
|
self._read_optim = to_bool(kwargs['rdoptim'])
|
|
del kwargs['rdoptim']
|
|
with self._lock:
|
|
self._ck_hd_sta = self._compute_delay_cycles(timings.t_hd_sta)
|
|
self._ck_su_sto = self._compute_delay_cycles(timings.t_su_sto)
|
|
ck_su_sta = self._compute_delay_cycles(timings.t_su_sta)
|
|
ck_buf = self._compute_delay_cycles(timings.t_buf)
|
|
self._ck_idle = max(ck_su_sta, ck_buf)
|
|
self._ck_delay = ck_buf
|
|
if clkstrch:
|
|
self._i2c_mask = self.I2C_MASK_CS
|
|
else:
|
|
self._i2c_mask = self.I2C_MASK
|
|
# until the device is open, there is no way to tell if it has a
|
|
# wide (16) or narrow port (8). Lower API can deal with any, so
|
|
# delay any truncation till the device is actually open
|
|
self._set_gpio_direction(16, io_out, io_dir)
|
|
# as 3-phase clock frequency mode is required for I2C mode, the
|
|
# FTDI clock should be adapted to match the required frequency.
|
|
kwargs['direction'] = self.I2C_DIR | self._gpio_dir
|
|
kwargs['initial'] = self.IDLE | (io_out & self._gpio_mask)
|
|
kwargs['frequency'] = (3.0*frequency)/2.0
|
|
if not isinstance(url, str):
|
|
frequency = self._ftdi.open_mpsse_from_device(
|
|
url, interface=interface, **kwargs)
|
|
else:
|
|
frequency = self._ftdi.open_mpsse_from_url(url, **kwargs)
|
|
self._frequency = (2.0*frequency)/3.0
|
|
self._tx_size, self._rx_size = self._ftdi.fifo_sizes
|
|
self._ftdi.enable_adaptive_clock(clkstrch)
|
|
if not self._disable_3phase_clock:
|
|
self._ftdi.enable_3phase_clock(True)
|
|
try:
|
|
self._ftdi.enable_drivezero_mode(self.SCL_BIT |
|
|
self.SDA_O_BIT |
|
|
self.SDA_I_BIT)
|
|
except FtdiFeatureError:
|
|
# when open collector feature is not available (FT2232, FT4232)
|
|
# SDA line is temporary move to high-z to enable ACK/NACK
|
|
# read back from slave
|
|
self._fake_tristate = True
|
|
self._wide_port = self._ftdi.has_wide_port
|
|
if not self._wide_port:
|
|
self._set_gpio_direction(8, io_out & 0xFF, io_dir & 0xFF)
|
|
|
|
def force_clock_mode(self, enable: bool) -> None:
|
|
"""Force unsupported I2C clock signalling on devices that have no I2C
|
|
capabilities (i.e. FT2232D). I2cController cowardly refuses to use
|
|
unsupported devices. When this mode is enabled, I2cController can
|
|
drive such devices, but I2C signalling is not compliant with I2C
|
|
specifications and may not work with most I2C slaves.
|
|
|
|
:py:meth:`force_clock_mode` should always be called before
|
|
:py:meth:`configure` to be effective.
|
|
|
|
This is a fully unsupported feature (bug reports will be ignored).
|
|
|
|
:param enable: whether to drive non-I2C capable devices.
|
|
"""
|
|
if enable:
|
|
self.log.info('I2C signalling forced to non-I2C compliant mode.')
|
|
self._disable_3phase_clock = enable
|
|
|
|
def close(self, freeze: bool = False) -> None:
|
|
"""Close the FTDI interface.
|
|
|
|
:param freeze: if set, FTDI port is not reset to its default
|
|
state on close.
|
|
"""
|
|
with self._lock:
|
|
if self._ftdi.is_connected:
|
|
self._ftdi.close(freeze)
|
|
|
|
def terminate(self) -> None:
|
|
"""Close the FTDI interface.
|
|
|
|
:note: deprecated API, use close()
|
|
"""
|
|
self.close()
|
|
|
|
def get_port(self, address: int) -> I2cPort:
|
|
"""Obtain an I2cPort to drive an I2c slave.
|
|
|
|
:param address: the address on the I2C bus
|
|
:return: an I2cPort instance
|
|
"""
|
|
if not self._ftdi.is_connected:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
self.validate_address(address)
|
|
if address not in self._slaves:
|
|
self._slaves[address] = I2cPort(self, address)
|
|
return self._slaves[address]
|
|
|
|
def get_gpio(self) -> I2cGpioPort:
|
|
"""Retrieve the GPIO port.
|
|
|
|
:return: GPIO port
|
|
"""
|
|
with self._lock:
|
|
if not self._ftdi.is_connected:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
if not self._gpio_port:
|
|
self._gpio_port = I2cGpioPort(self)
|
|
return self._gpio_port
|
|
|
|
@property
|
|
def ftdi(self) -> Ftdi:
|
|
"""Return the Ftdi instance.
|
|
|
|
:return: the Ftdi instance
|
|
"""
|
|
return self._ftdi
|
|
|
|
@property
|
|
def configured(self) -> bool:
|
|
"""Test whether the device has been properly configured.
|
|
|
|
:return: True if configured
|
|
"""
|
|
return self._ftdi.is_connected and bool(self._start)
|
|
|
|
@classmethod
|
|
def validate_address(cls, address: Optional[int]) -> None:
|
|
"""Assert an I2C slave address is in the supported range.
|
|
None is a special bypass address.
|
|
|
|
:param address: the address on the I2C bus
|
|
:raise I2cIOError: if the I2C slave address is not supported
|
|
"""
|
|
if address is None:
|
|
return
|
|
if address > cls.HIGHEST_I2C_ADDRESS:
|
|
raise I2cIOError(f'No such I2c slave: 0x{address:02x}')
|
|
|
|
@property
|
|
def frequency_max(self) -> float:
|
|
"""Provides the maximum I2C clock frequency in Hz.
|
|
|
|
:return: I2C bus clock frequency
|
|
"""
|
|
return self._ftdi.frequency_max
|
|
|
|
@property
|
|
def frequency(self) -> float:
|
|
"""Provides the current I2C clock frequency in Hz.
|
|
|
|
:return: the I2C bus clock frequency
|
|
"""
|
|
return self._frequency
|
|
|
|
@property
|
|
def direction(self) -> int:
|
|
"""Provide the FTDI pin direction
|
|
|
|
A true bit represents an output pin, a false bit an input pin.
|
|
|
|
:return: the bitfield of direction.
|
|
"""
|
|
return self.I2C_DIR | self._gpio_dir
|
|
|
|
@property
|
|
def gpio_pins(self) -> int:
|
|
"""Report the configured GPIOs as a bitfield.
|
|
|
|
A true bit represents a GPIO, a false bit a reserved or not
|
|
configured pin.
|
|
|
|
:return: the bitfield of configured GPIO pins.
|
|
"""
|
|
with self._lock:
|
|
return self._gpio_mask
|
|
|
|
@property
|
|
def gpio_all_pins(self) -> int:
|
|
"""Report the addressable GPIOs as a bitfield.
|
|
|
|
A true bit represents a pin which may be used as a GPIO, a false bit
|
|
a reserved pin (for I2C support)
|
|
|
|
:return: the bitfield of configurable GPIO pins.
|
|
"""
|
|
mask = (1 << self.width) - 1
|
|
with self._lock:
|
|
return mask & ~self._i2c_mask
|
|
|
|
@property
|
|
def width(self) -> int:
|
|
"""Report the FTDI count of addressable pins.
|
|
|
|
:return: the count of IO pins (including I2C ones).
|
|
"""
|
|
return 16 if self._wide_port else 8
|
|
|
|
def read(self, address: Optional[int], readlen: int = 1,
|
|
relax: bool = True) -> bytes:
|
|
"""Read one or more bytes from a remote slave
|
|
|
|
:param address: the address on the I2C bus, or None to discard start
|
|
:param readlen: count of bytes to read out.
|
|
:param relax: not used
|
|
:return: read bytes
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
|
|
Address is a logical slave address (0x7f max)
|
|
|
|
Most I2C devices require a register address to read out
|
|
check out the exchange() method.
|
|
"""
|
|
if not self.configured:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
self.validate_address(address)
|
|
if address is None:
|
|
i2caddress = None
|
|
else:
|
|
i2caddress = (address << 1) & self.HIGH
|
|
i2caddress |= self.BIT0
|
|
retries = self._retry_count
|
|
do_epilog = True
|
|
with self._lock:
|
|
while True:
|
|
try:
|
|
self._do_prolog(i2caddress)
|
|
data = self._do_read(readlen)
|
|
do_epilog = relax
|
|
return data
|
|
except I2cNackError:
|
|
retries -= 1
|
|
if not retries:
|
|
raise
|
|
self.log.warning('Retry read')
|
|
finally:
|
|
if do_epilog:
|
|
self._do_epilog()
|
|
|
|
def write(self, address: Optional[int],
|
|
out: Union[bytes, bytearray, Iterable[int]],
|
|
relax: bool = True) -> None:
|
|
"""Write one or more bytes to a remote slave
|
|
|
|
:param address: the address on the I2C bus, or None to discard start
|
|
:param out: the byte buffer to send
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
|
|
Address is a logical slave address (0x7f max)
|
|
|
|
Most I2C devices require a register address to write into. It should
|
|
be added as the first (byte)s of the output buffer.
|
|
"""
|
|
if not self.configured:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
self.validate_address(address)
|
|
if address is None:
|
|
i2caddress = None
|
|
else:
|
|
i2caddress = (address << 1) & self.HIGH
|
|
retries = self._retry_count
|
|
do_epilog = True
|
|
with self._lock:
|
|
while True:
|
|
try:
|
|
self._do_prolog(i2caddress)
|
|
self._do_write(out)
|
|
do_epilog = relax
|
|
return
|
|
except I2cNackError:
|
|
retries -= 1
|
|
if not retries:
|
|
raise
|
|
self.log.warning('Retry write')
|
|
finally:
|
|
if do_epilog:
|
|
self._do_epilog()
|
|
|
|
def exchange(self, address: Optional[int],
|
|
out: Union[bytes, bytearray, Iterable[int]],
|
|
readlen: int = 0, relax: bool = True) -> bytearray:
|
|
"""Send a byte sequence to a remote slave followed with
|
|
a read request of one or more bytes.
|
|
|
|
This command is useful to tell the slave what data
|
|
should be read out.
|
|
|
|
:param address: the address on the I2C bus, or None to discard start
|
|
:param out: the byte buffer to send
|
|
:param readlen: count of bytes to read out.
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:return: read bytes
|
|
:raise I2cIOError: if device is not configured or input parameters
|
|
are invalid
|
|
|
|
Address is a logical slave address (0x7f max)
|
|
"""
|
|
if not self.configured:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
self.validate_address(address)
|
|
if readlen < 1:
|
|
raise I2cIOError('Nothing to read')
|
|
if readlen > (self.PAYLOAD_MAX_LENGTH/3-1):
|
|
raise I2cIOError("Input payload is too large")
|
|
if address is None:
|
|
i2caddress = None
|
|
else:
|
|
i2caddress = (address << 1) & self.HIGH
|
|
retries = self._retry_count
|
|
do_epilog = True
|
|
data = bytearray()
|
|
with self._lock:
|
|
while True:
|
|
try:
|
|
self._do_prolog(i2caddress)
|
|
self._do_write(out)
|
|
if i2caddress is not None:
|
|
self._do_prolog(i2caddress | self.BIT0)
|
|
if readlen:
|
|
data = self._do_read(readlen)
|
|
do_epilog = relax
|
|
return data
|
|
except I2cNackError:
|
|
retries -= 1
|
|
if not retries:
|
|
raise
|
|
self.log.warning('Retry exchange')
|
|
finally:
|
|
if do_epilog:
|
|
self._do_epilog()
|
|
|
|
def poll(self, address: int, write: bool = False,
|
|
relax: bool = True) -> bool:
|
|
"""Poll a remote slave, expect ACK or NACK.
|
|
|
|
:param address: the address on the I2C bus, or None to discard start
|
|
:param write: poll in write mode (vs. read)
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:return: True if the slave acknowledged, False otherwise
|
|
"""
|
|
if not self.configured:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
self.validate_address(address)
|
|
if address is None:
|
|
i2caddress = None
|
|
else:
|
|
i2caddress = (address << 1) & self.HIGH
|
|
if not write:
|
|
i2caddress |= self.BIT0
|
|
do_epilog = True
|
|
with self._lock:
|
|
try:
|
|
self._do_prolog(i2caddress)
|
|
do_epilog = relax
|
|
return True
|
|
except I2cNackError:
|
|
self.log.info('Not ready')
|
|
return False
|
|
finally:
|
|
if do_epilog:
|
|
self._do_epilog()
|
|
|
|
def poll_cond(self, address: int, fmt: str, mask: int, value: int,
|
|
count: int, relax: bool = True) -> Optional[bytes]:
|
|
"""Poll a remove slave, watching for condition to satisfy.
|
|
On each poll cycle, a repeated start condition is emitted, without
|
|
releasing the I2C bus, and an ACK is returned to the slave.
|
|
|
|
If relax is set, this method releases the I2C bus however it leaves.
|
|
|
|
:param address: the address on the I2C bus, or None to discard start
|
|
:param fmt: struct format for poll register
|
|
:param mask: binary mask to apply on the condition register
|
|
before testing for the value
|
|
:param value: value to test the masked condition register
|
|
against. Condition is satisfied when register & mask == value
|
|
:param count: maximum poll count before raising a timeout
|
|
:param relax: whether to relax the bus (emit STOP) or not
|
|
:return: the polled register value, or None if poll failed
|
|
"""
|
|
if not self.configured:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
self.validate_address(address)
|
|
if address is None:
|
|
i2caddress = None
|
|
else:
|
|
i2caddress = (address << 1) & self.HIGH
|
|
i2caddress |= self.BIT0
|
|
do_epilog = True
|
|
with self._lock:
|
|
try:
|
|
retry = 0
|
|
while retry < count:
|
|
retry += 1
|
|
size = scalc(fmt)
|
|
self._do_prolog(i2caddress)
|
|
data = self._do_read(size)
|
|
self.log.debug("Poll data: %s", hexlify(data).decode())
|
|
cond, = sunpack(fmt, data)
|
|
if (cond & mask) == value:
|
|
self.log.debug('Poll condition matched')
|
|
break
|
|
data = None
|
|
self.log.debug('Poll condition not fulfilled: %x/%x',
|
|
cond & mask, value)
|
|
do_epilog = relax
|
|
if not data:
|
|
self.log.warning('Poll condition failed')
|
|
return data
|
|
except I2cNackError:
|
|
self.log.info('Not ready')
|
|
return None
|
|
finally:
|
|
if do_epilog:
|
|
self._do_epilog()
|
|
|
|
def flush(self) -> None:
|
|
"""Flush the HW FIFOs.
|
|
"""
|
|
if not self.configured:
|
|
raise I2cIOError("FTDI controller not initialized")
|
|
with self._lock:
|
|
self._ftdi.write_data(self._immediate)
|
|
self._ftdi.purge_buffers()
|
|
|
|
def read_gpio(self, with_output: bool = False) -> int:
|
|
"""Read GPIO port.
|
|
|
|
:param with_output: set to unmask output pins
|
|
:return: the GPIO port pins as a bitfield
|
|
"""
|
|
with self._lock:
|
|
data = self._read_raw(self._wide_port)
|
|
value = data & self._gpio_mask
|
|
if not with_output:
|
|
value &= ~self._gpio_dir
|
|
return value
|
|
|
|
def write_gpio(self, value: int) -> None:
|
|
"""Write GPIO port.
|
|
|
|
:param value: the GPIO port pins as a bitfield
|
|
"""
|
|
with self._lock:
|
|
if (value & self._gpio_dir) != value:
|
|
raise I2cIOError(f'No such GPO pins: '
|
|
f'{self._gpio_dir:04x}/{value:04x}')
|
|
# perform read-modify-write
|
|
use_high = self._wide_port and (self.direction & 0xff00)
|
|
data = self._read_raw(use_high)
|
|
data &= ~self._gpio_mask
|
|
data |= value
|
|
self._write_raw(data, use_high)
|
|
self._gpio_low = data & 0xFF & ~self._i2c_mask
|
|
|
|
def set_gpio_direction(self, pins: int, direction: int) -> None:
|
|
"""Change the direction of the GPIO pins.
|
|
|
|
:param pins: which GPIO pins should be reconfigured
|
|
:param direction: direction bitfield (on for output)
|
|
"""
|
|
with self._lock:
|
|
self._set_gpio_direction(16 if self._wide_port else 8,
|
|
pins, direction)
|
|
|
|
def _set_gpio_direction(self, width: int, pins: int,
|
|
direction: int) -> None:
|
|
if pins & self._i2c_mask:
|
|
raise I2cIOError('Cannot access I2C pins as GPIO')
|
|
gpio_mask = (1 << width) - 1
|
|
gpio_mask &= ~self._i2c_mask
|
|
if (pins & gpio_mask) != pins:
|
|
raise I2cIOError('No such GPIO pin(s)')
|
|
self._gpio_dir &= ~pins
|
|
self._gpio_dir |= (pins & direction)
|
|
self._gpio_mask = gpio_mask & pins
|
|
|
|
@property
|
|
def _data_lo(self) -> Tuple[int]:
|
|
return (Ftdi.SET_BITS_LOW,
|
|
self.SCL_BIT | self._gpio_low,
|
|
self.I2C_DIR | (self._gpio_dir & 0xFF))
|
|
|
|
@property
|
|
def _clk_lo_data_hi(self) -> Tuple[int]:
|
|
return (Ftdi.SET_BITS_LOW,
|
|
self.SDA_O_BIT | self._gpio_low,
|
|
self.I2C_DIR | (self._gpio_dir & 0xFF))
|
|
|
|
@property
|
|
def _clk_lo_data_input(self) -> Tuple[int]:
|
|
return (Ftdi.SET_BITS_LOW,
|
|
self.LOW | self._gpio_low,
|
|
self.SCL_BIT | (self._gpio_dir & 0xFF))
|
|
|
|
@property
|
|
def _clk_lo_data_lo(self) -> Tuple[int]:
|
|
return (Ftdi.SET_BITS_LOW,
|
|
self._gpio_low,
|
|
self.I2C_DIR | (self._gpio_dir & 0xFF))
|
|
|
|
@property
|
|
def _clk_input_data_input(self) -> Tuple[int]:
|
|
return (Ftdi.SET_BITS_LOW,
|
|
self._gpio_low,
|
|
(self._gpio_dir & 0xFF))
|
|
|
|
@property
|
|
def _idle(self) -> Tuple[int]:
|
|
return (Ftdi.SET_BITS_LOW,
|
|
self.I2C_DIR | self._gpio_low,
|
|
self.I2C_DIR | (self._gpio_dir & 0xFF))
|
|
|
|
@property
|
|
def _start(self) -> Tuple[int]:
|
|
return self._data_lo * self._ck_hd_sta + \
|
|
self._clk_lo_data_lo * self._ck_hd_sta
|
|
|
|
@property
|
|
def _stop(self) -> Tuple[int]:
|
|
return self._clk_lo_data_hi * self._ck_hd_sta + \
|
|
self._clk_lo_data_lo * self._ck_hd_sta + \
|
|
self._data_lo * self._ck_su_sto + \
|
|
self._idle * self._ck_idle
|
|
|
|
def _compute_delay_cycles(self, value: Union[int, float]) -> int:
|
|
# approx ceiling without relying on math module
|
|
# the bit delay is far from being precisely known anyway
|
|
bit_delay = self._ftdi.mpsse_bit_delay
|
|
return max(1, int((value + bit_delay) / bit_delay))
|
|
|
|
def _read_raw(self, read_high: bool) -> int:
|
|
if read_high:
|
|
cmd = bytes([Ftdi.GET_BITS_LOW,
|
|
Ftdi.GET_BITS_HIGH,
|
|
Ftdi.SEND_IMMEDIATE])
|
|
fmt = '<H'
|
|
else:
|
|
cmd = bytes([Ftdi.GET_BITS_LOW,
|
|
Ftdi.SEND_IMMEDIATE])
|
|
fmt = 'B'
|
|
self._ftdi.write_data(cmd)
|
|
size = scalc(fmt)
|
|
data = self._ftdi.read_data_bytes(size, 4)
|
|
if len(data) != size:
|
|
raise I2cIOError('Cannot read GPIO')
|
|
value, = sunpack(fmt, data)
|
|
return value
|
|
|
|
def _write_raw(self, data: int, write_high: bool):
|
|
direction = self.direction
|
|
low_data = data & 0xFF
|
|
low_dir = direction & 0xFF
|
|
if write_high:
|
|
high_data = (data >> 8) & 0xFF
|
|
high_dir = (direction >> 8) & 0xFF
|
|
cmd = bytes([Ftdi.SET_BITS_LOW, low_data, low_dir,
|
|
Ftdi.SET_BITS_HIGH, high_data, high_dir])
|
|
else:
|
|
cmd = bytes([Ftdi.SET_BITS_LOW, low_data, low_dir])
|
|
self._ftdi.write_data(cmd)
|
|
|
|
def _do_prolog(self, i2caddress: Optional[int]) -> None:
|
|
if i2caddress is None:
|
|
return
|
|
self.log.debug(' prolog 0x%x', i2caddress >> 1)
|
|
cmd = bytearray(self._idle * self._ck_delay)
|
|
cmd.extend(self._start)
|
|
cmd.extend(self._write_byte)
|
|
cmd.append(i2caddress)
|
|
try:
|
|
self._send_check_ack(cmd)
|
|
except I2cNackError:
|
|
self.log.warning('NACK @ 0x%02x', (i2caddress >> 1))
|
|
raise
|
|
|
|
def _do_epilog(self) -> None:
|
|
self.log.debug(' epilog')
|
|
cmd = bytearray(self._stop)
|
|
if self._fake_tristate:
|
|
# SCL high-Z, SDA high-Z
|
|
cmd.extend(self._clk_input_data_input)
|
|
self._ftdi.write_data(cmd)
|
|
# be sure to purge the MPSSE reply
|
|
self._ftdi.read_data_bytes(1, 1)
|
|
|
|
def _send_check_ack(self, cmd: bytearray):
|
|
# note: cmd is modified
|
|
if self._fake_tristate:
|
|
# SCL low, SDA high-Z (input)
|
|
cmd.extend(self._clk_lo_data_input)
|
|
# read SDA (ack from slave)
|
|
cmd.extend(self._read_bit)
|
|
else:
|
|
# SCL low, SDA high-Z
|
|
cmd.extend(self._clk_lo_data_hi)
|
|
# read SDA (ack from slave)
|
|
cmd.extend(self._read_bit)
|
|
cmd.extend(self._immediate)
|
|
self._ftdi.write_data(cmd)
|
|
ack = self._ftdi.read_data_bytes(1, 4)
|
|
if not ack:
|
|
raise I2cIOError('No answer from FTDI')
|
|
if ack[0] & self.BIT0:
|
|
raise I2cNackError('NACK from slave')
|
|
|
|
def _do_read(self, readlen: int) -> bytearray:
|
|
self.log.debug('- read %d byte(s)', readlen)
|
|
if not readlen:
|
|
# force a real read request on device, but discard any result
|
|
cmd = bytearray()
|
|
cmd.extend(self._immediate)
|
|
self._ftdi.write_data(cmd)
|
|
self._ftdi.read_data_bytes(0, 4)
|
|
return bytearray()
|
|
if self._fake_tristate:
|
|
read_byte = (self._clk_lo_data_input +
|
|
self._read_byte +
|
|
self._clk_lo_data_hi)
|
|
read_not_last = (read_byte + self._ack +
|
|
self._clk_lo_data_lo * self._ck_delay)
|
|
read_last = (read_byte + self._nack +
|
|
self._clk_lo_data_hi * self._ck_delay)
|
|
else:
|
|
read_not_last = (self._read_byte + self._ack +
|
|
self._clk_lo_data_hi * self._ck_delay)
|
|
read_last = (self._read_byte + self._nack +
|
|
self._clk_lo_data_hi * self._ck_delay)
|
|
# maximum RX size to fit in FTDI FIFO, minus 2 status bytes
|
|
chunk_size = self._rx_size-2
|
|
cmd_size = len(read_last)
|
|
# limit RX chunk size to the count of I2C packable commands in the FTDI
|
|
# TX FIFO (minus one byte for the last 'send immediate' command)
|
|
tx_count = (self._tx_size-1) // cmd_size
|
|
chunk_size = min(tx_count, chunk_size)
|
|
chunks = []
|
|
cmd = None
|
|
rem = readlen
|
|
if self._read_optim and rem > chunk_size:
|
|
chunk_size //= 2
|
|
self.log.debug('Use optimized transfer, %d byte at a time',
|
|
chunk_size)
|
|
cmd_chunk = bytearray()
|
|
cmd_chunk.extend(read_not_last * chunk_size)
|
|
cmd_chunk.extend(self._immediate)
|
|
|
|
def write_command_gen(length: int) -> bytearray:
|
|
if length <= 0:
|
|
# no more data
|
|
return bytearray()
|
|
if length <= chunk_size:
|
|
cmd = bytearray()
|
|
cmd.extend(read_not_last * (length-1))
|
|
cmd.extend(read_last)
|
|
cmd.extend(self._immediate)
|
|
return cmd
|
|
return cmd_chunk
|
|
|
|
while rem:
|
|
buf = self._ftdi.read_data_bytes(rem, 4, write_command_gen)
|
|
self.log.debug('- read %d bytes, rem: %d', len(buf), rem)
|
|
chunks.append(buf)
|
|
rem -= len(buf)
|
|
else:
|
|
while rem:
|
|
size = rem
|
|
if rem > chunk_size:
|
|
if not cmd:
|
|
# build the command sequence only once, as it may be
|
|
# repeated till the end of the transfer
|
|
cmd = bytearray()
|
|
cmd.extend(read_not_last * chunk_size)
|
|
size = chunk_size
|
|
else:
|
|
cmd = bytearray()
|
|
cmd.extend(read_not_last * (rem-1))
|
|
cmd.extend(read_last)
|
|
cmd.extend(self._immediate)
|
|
self._ftdi.write_data(cmd)
|
|
buf = self._ftdi.read_data_bytes(size, 4)
|
|
self.log.debug('- read %d byte(s): %s',
|
|
len(buf), hexlify(buf).decode())
|
|
chunks.append(buf)
|
|
rem -= size
|
|
return bytearray(b''.join(chunks))
|
|
|
|
def _do_write(self, out: Union[bytes, bytearray, Iterable[int]]):
|
|
if not isinstance(out, bytearray):
|
|
out = bytearray(out)
|
|
if not out:
|
|
return
|
|
self.log.debug('- write %d byte(s): %s',
|
|
len(out), hexlify(out).decode())
|
|
for byte in out:
|
|
if self._fake_tristate:
|
|
# leave SCL low, restore SDA as output
|
|
cmd = bytearray(self._clk_lo_data_hi)
|
|
cmd.extend(self._write_byte)
|
|
else:
|
|
cmd = bytearray(self._write_byte)
|
|
cmd.append(byte)
|
|
self._send_check_ack(cmd)
|