abschlussarbeit/_trash/lib/python3.11/site-packages/pyftdi/spi.py

949 lines
37 KiB
Python

# Copyright (c) 2010-2024, Emmanuel Blot <emmanuel.blot@free.fr>
# Copyright (c) 2016, Emmanuel Bouaziz <ebouaziz@free.fr>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""SPI support for PyFdti"""
# pylint: disable=invalid-name
from logging import getLogger
from struct import calcsize as scalc, pack as spack, unpack as sunpack
from threading import Lock
from typing import Any, Iterable, Mapping, Optional, Set, Union
from usb.core import Device as UsbDevice
from .ftdi import Ftdi, FtdiError
class SpiIOError(FtdiError):
"""SPI I/O error"""
class SpiPort:
"""SPI port
An SPI port is never instanciated directly: use
:py:meth:`SpiController.get_port()` method to obtain an SPI port.
Example:
>>> ctrl = SpiController()
>>> ctrl.configure('ftdi://ftdi:232h/1')
>>> spi = ctrl.get_port(1)
>>> spi.set_frequency(1000000)
>>> # send 2 bytes
>>> spi.exchange([0x12, 0x34])
>>> # send 2 bytes, then receive 2 bytes
>>> out = spi.exchange([0x12, 0x34], 2)
>>> # send 2 bytes, then receive 4 bytes, manage the transaction
>>> out = spi.exchange([0x12, 0x34], 2, True, False)
>>> out.extend(spi.exchange([], 2, False, True))
"""
def __init__(self, controller: 'SpiController', cs: int, cs_hold: int = 3,
spi_mode: int = 0):
self.log = getLogger('pyftdi.spi.port')
self._controller = controller
self._frequency = self._controller.frequency
self._cs = cs
self._cs_hold = cs_hold
self.set_mode(spi_mode)
def exchange(self, out: Union[bytes, bytearray, Iterable[int]] = b'',
readlen: int = 0, start: bool = True, stop: bool = True,
duplex: bool = False, droptail: int = 0) -> bytes:
"""Perform an exchange or a transaction with the SPI slave
:param out: data to send to the SPI slave, may be empty to read out
data from the slave with no write.
:param readlen: count of bytes to read out from the slave,
may be zero to only write to the slave
:param start: whether to start an SPI transaction, i.e.
activate the /CS line for the slave. Use False to
resume a previously started transaction
:param stop: whether to desactivete the /CS line for the slave.
Use False if the transaction should complete with a
further call to exchange()
:param duplex: perform a full-duplex exchange (vs. half-duplex),
i.e. bits are clocked in and out at once.
:param droptail: ignore up to 7 last bits (for non-byte sized SPI
accesses)
:return: an array of bytes containing the data read out from the
slave
"""
return self._controller.exchange(self._frequency, out, readlen,
start and self._cs_prolog,
stop and self._cs_epilog,
self._cpol, self._cpha,
duplex, droptail)
def read(self, readlen: int = 0, start: bool = True, stop: bool = True,
droptail: int = 0) -> bytes:
"""Read out bytes from the slave
:param readlen: count of bytes to read out from the slave,
may be zero to only write to the slave
:param start: whether to start an SPI transaction, i.e.
activate the /CS line for the slave. Use False to
resume a previously started transaction
:param stop: whether to desactivete the /CS line for the slave.
Use False if the transaction should complete with a
further call to exchange()
:param droptail: ignore up to 7 last bits (for non-byte sized SPI
accesses)
:return: an array of bytes containing the data read out from the
slave
"""
return self._controller.exchange(self._frequency, [], readlen,
start and self._cs_prolog,
stop and self._cs_epilog,
self._cpol, self._cpha, False,
droptail)
def write(self, out: Union[bytes, bytearray, Iterable[int]],
start: bool = True, stop: bool = True, droptail: int = 0) \
-> None:
"""Write bytes to the slave
:param out: data to send to the SPI slave, may be empty to read out
data from the slave with no write.
:param start: whether to start an SPI transaction, i.e.
activate the /CS line for the slave. Use False to
resume a previously started transaction
:param stop: whether to desactivete the /CS line for the slave.
Use False if the transaction should complete with a
further call to exchange()
:param droptail: ignore up to 7 last bits (for non-byte sized SPI
accesses)
"""
return self._controller.exchange(self._frequency, out, 0,
start and self._cs_prolog,
stop and self._cs_epilog,
self._cpol, self._cpha, False,
droptail)
def flush(self) -> None:
"""Force the flush of the HW FIFOs"""
self._controller.flush()
def set_frequency(self, frequency: float):
"""Change SPI bus frequency
:param float frequency: the new frequency in Hz
"""
self._frequency = min(frequency, self._controller.frequency_max)
def set_mode(self, mode: int, cs_hold: Optional[int] = None) -> None:
"""Set or change the SPI mode to communicate with the SPI slave.
:param mode: new SPI mode
:param cs_hold: change the /CS hold duration (or keep using previous
value)
"""
if not 0 <= mode <= 3:
raise SpiIOError(f'Invalid SPI mode: {mode}')
if (mode & 0x2) and not self._controller.is_inverted_cpha_supported:
raise SpiIOError('SPI with CPHA high is not supported by '
'this FTDI device')
if cs_hold is None:
cs_hold = self._cs_hold
else:
self._cs_hold = cs_hold
self._cpol = bool(mode & 0x2)
self._cpha = bool(mode & 0x1)
cs_clock = 0xFF & ~((int(not self._cpol) and SpiController.SCK_BIT) |
SpiController.DO_BIT)
cs_select = 0xFF & ~((SpiController.CS_BIT << self._cs) |
(int(not self._cpol) and SpiController.SCK_BIT) |
SpiController.DO_BIT)
self._cs_prolog = bytes([cs_clock, cs_select])
self._cs_epilog = bytes([cs_select] + [cs_clock] * int(cs_hold))
def force_select(self, level: Optional[bool] = None,
cs_hold: float = 0) -> None:
"""Force-drive /CS signal.
This API is not designed for a regular usage, but is reserved to
very specific slave devices that require non-standard SPI
signalling. There are very few use cases where this API is required.
:param level: level to force on /CS output. This is a tri-state
value. A boolean value forces the selected signal
level; note that SpiPort no longer enforces that
following API calls generates valid SPI signalling:
use with extreme care. `None` triggers a pulse on /CS
output, i.e. /CS is not asserted once the method
returns, whatever the actual /CS level when this API
is called.
:param cs_hold: /CS hold duration, as a unitless value. It is not
possible to control the exact duration of the pulse,
as it depends on the USB bus and the FTDI frequency.
"""
clk, sel = self._cs_prolog
if cs_hold:
hold = max(1, cs_hold)
if hold > SpiController.PAYLOAD_MAX_LENGTH:
raise ValueError('cs_hold is too long')
else:
hold = self._cs_hold
if level is None:
seq = bytearray([clk])
seq.extend([sel]*(1+hold))
seq.extend([clk]*self._cs_hold)
elif level:
seq = bytearray([clk] * hold)
else:
seq = bytearray([clk] * hold)
seq.extend([sel]*(1+hold))
self._controller.force_control(self._frequency, bytes(seq))
@property
def frequency(self) -> float:
"""Return the current SPI bus block"""
return self._frequency
@property
def cs(self) -> int:
"""Return the /CS index.
:return: the /CS index (starting from 0)
"""
return self._cs
@property
def mode(self) -> int:
"""Return the current SPI mode.
:return: the SPI mode
"""
return (int(self._cpol) << 2) | int(self._cpha)
class SpiGpioPort:
"""GPIO port
A SpiGpioPort instance enables to drive GPIOs wich are not reserved for
SPI feature as regular GPIOs.
GPIO are managed as a bitfield. The LSBs are reserved for the SPI
feature, which means that the lowest pin that can be used as a GPIO is
*b4*:
* *b0*: SPI SCLK
* *b1*: SPI MOSI
* *b2*: SPI MISO
* *b3*: SPI CS0
* *b4*: SPI CS1 or first GPIO
If more than one SPI device is used, less GPIO pins are available, see
the cs_count argument of the SpiController constructor.
There is no offset bias in GPIO bit position, *i.e.* the first available
GPIO can be reached from as ``0x10``.
Bitfield size depends on the FTDI device: 4432H series use 8-bit GPIO
ports, while 232H and 2232H series use wide 16-bit ports.
An SpiGpio port is never instanciated directly: use
:py:meth:`SpiController.get_gpio()` method to obtain the GPIO port.
"""
def __init__(self, controller: 'SpiController'):
self.log = getLogger('pyftdi.spi.gpio')
self._controller = controller
@property
def pins(self) -> int:
"""Report the configured GPIOs as a bitfield.
A true bit represents a GPIO, a false bit a reserved or not
configured pin.
:return: the bitfield of configured GPIO pins.
"""
return self._controller.gpio_pins
@property
def all_pins(self) -> int:
"""Report the addressable GPIOs as a bitfield.
A true bit represents a pin which may be used as a GPIO, a false bit
a reserved pin (for SPI support)
:return: the bitfield of configurable GPIO pins.
"""
return self._controller.gpio_all_pins
@property
def width(self) -> int:
"""Report the FTDI count of addressable pins.
Note that all pins, including reserved SPI ones, are reported.
:return: the count of IO pins (including SPI ones).
"""
return self._controller.width
@property
def direction(self) -> int:
"""Provide the FTDI GPIO direction.self
A true bit represents an output GPIO, a false bit an input GPIO.
:return: the bitfield of direction.
"""
return self._controller.direction
def read(self, with_output: bool = False) -> int:
"""Read GPIO port.
:param with_output: set to unmask output pins
:return: the GPIO port pins as a bitfield
"""
return self._controller.read_gpio(with_output)
def write(self, value: int) -> None:
"""Write GPIO port.
:param value: the GPIO port pins as a bitfield
"""
return self._controller.write_gpio(value)
def set_direction(self, pins: int, direction: int) -> None:
"""Change the direction of the GPIO pins.
:param pins: which GPIO pins should be reconfigured
:param direction: direction bitfield (high level for output)
"""
self._controller.set_gpio_direction(pins, direction)
class SpiController:
"""SPI master.
:param int cs_count: is the number of /CS lines (one per device to
drive on the SPI bus)
:param turbo: increase throughput over USB bus, but may not be
supported with some specific slaves
"""
SCK_BIT = 0x01
DO_BIT = 0x02
DI_BIT = 0x04
CS_BIT = 0x08
SPI_BITS = DI_BIT | DO_BIT | SCK_BIT
PAYLOAD_MAX_LENGTH = 0xFF00 # 16 bits max (- spare for control)
def __init__(self, cs_count: int = 1, turbo: bool = True):
self.log = getLogger('pyftdi.spi.ctrl')
self._ftdi = Ftdi()
self._lock = Lock()
self._gpio_port = None
self._gpio_dir = 0
self._gpio_mask = 0
self._gpio_low = 0
self._wide_port = False
self._cs_count = cs_count
self._turbo = turbo
self._immediate = bytes((Ftdi.SEND_IMMEDIATE,))
self._frequency = 0.0
self._clock_phase = False
self._cs_bits = 0
self._spi_ports = []
self._spi_dir = 0
self._spi_mask = self.SPI_BITS
def configure(self, url: Union[str, UsbDevice],
**kwargs: Mapping[str, Any]) -> None:
"""Configure the FTDI interface as a SPI master
:param url: FTDI URL string, such as ``ftdi://ftdi:232h/1``
:param kwargs: options to configure the SPI bus
Accepted options:
* ``interface``: when URL is specifed as a USB device, the interface
named argument can be used to select a specific port of the FTDI
device, as an integer starting from 1.
* ``direction`` a bitfield specifying the FTDI GPIO direction,
where high level defines an output, and low level defines an
input. Only useful to setup default IOs at start up, use
:py:class:`SpiGpioPort` to drive GPIOs. Note that pins reserved
for SPI feature take precedence over any this setting.
* ``initial`` a bitfield specifying the initial output value. Only
useful to setup default IOs at start up, use
:py:class:`SpiGpioPort` to drive GPIOs.
* ``frequency`` the SPI bus frequency in Hz. Note that each slave
may reconfigure the SPI bus with a specialized
frequency.
* ``cs_count`` count of chip select signals dedicated to select
SPI slave devices, starting from A*BUS3 pin
* ``turbo`` whether to enable or disable turbo mode
* ``debug`` to increase log verbosity, using MPSSE tracer
"""
# it is better to specify CS and turbo in configure, but the older
# API where these parameters are specified at instanciation has been
# preserved
if 'cs_count' in kwargs:
self._cs_count = int(kwargs['cs_count'])
del kwargs['cs_count']
if not 1 <= self._cs_count <= 5:
raise ValueError(f'Unsupported CS line count: {self._cs_count}')
if 'turbo' in kwargs:
self._turbo = bool(kwargs['turbo'])
del kwargs['turbo']
if 'direction' in kwargs:
io_dir = int(kwargs['direction'])
del kwargs['direction']
else:
io_dir = 0
if 'initial' in kwargs:
io_out = int(kwargs['initial'])
del kwargs['initial']
else:
io_out = 0
if 'interface' in kwargs:
if isinstance(url, str):
raise SpiIOError('url and interface are mutually exclusive')
interface = int(kwargs['interface'])
del kwargs['interface']
else:
interface = 1
with self._lock:
if self._frequency > 0.0:
raise SpiIOError('Already configured')
self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) &
~(SpiController.CS_BIT - 1))
self._spi_ports = [None] * self._cs_count
self._spi_dir = (self._cs_bits |
SpiController.DO_BIT |
SpiController.SCK_BIT)
self._spi_mask = self._cs_bits | self.SPI_BITS
# until the device is open, there is no way to tell if it has a
# wide (16) or narrow port (8). Lower API can deal with any, so
# delay any truncation till the device is actually open
self._set_gpio_direction(16, (~self._spi_mask) & 0xFFFF, io_dir)
kwargs['direction'] = self._spi_dir | self._gpio_dir
kwargs['initial'] = self._cs_bits | (io_out & self._gpio_mask)
if not isinstance(url, str):
self._frequency = self._ftdi.open_mpsse_from_device(
url, interface=interface, **kwargs)
else:
self._frequency = self._ftdi.open_mpsse_from_url(url, **kwargs)
self._ftdi.enable_adaptive_clock(False)
self._wide_port = self._ftdi.has_wide_port
if not self._wide_port:
self._set_gpio_direction(8, io_out & 0xFF, io_dir & 0xFF)
def close(self, freeze: bool = False) -> None:
"""Close the FTDI interface.
:param freeze: if set, FTDI port is not reset to its default
state on close.
"""
with self._lock:
if self._ftdi.is_connected:
self._ftdi.close(freeze)
self._frequency = 0.0
def terminate(self) -> None:
"""Close the FTDI interface.
:note: deprecated API, use close()
"""
self.close()
def get_port(self, cs: int, freq: Optional[float] = None,
mode: int = 0) -> SpiPort:
"""Obtain a SPI port to drive a SPI device selected by Chip Select.
:note: SPI mode 1 and 3 are not officially supported.
:param cs: chip select slot, starting from 0
:param freq: SPI bus frequency for this slave in Hz
:param mode: SPI mode [0, 1, 2, 3]
"""
with self._lock:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if cs >= len(self._spi_ports):
if cs < 5:
# increase cs_count (up to 4) to reserve more /CS channels
raise SpiIOError('/CS pin {cs} not reserved for SPI')
raise SpiIOError(f'No such SPI port: {cs}')
if not self._spi_ports[cs]:
freq = min(freq or self._frequency, self.frequency_max)
hold = freq and (1+int(1E6/freq))
self._spi_ports[cs] = SpiPort(self, cs, cs_hold=hold,
spi_mode=mode)
self._spi_ports[cs].set_frequency(freq)
self._flush()
return self._spi_ports[cs]
def get_gpio(self) -> SpiGpioPort:
"""Retrieve the GPIO port.
:return: GPIO port
"""
with self._lock:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if not self._gpio_port:
self._gpio_port = SpiGpioPort(self)
return self._gpio_port
@property
def ftdi(self) -> Ftdi:
"""Return the Ftdi instance.
:return: the Ftdi instance
"""
return self._ftdi
@property
def configured(self) -> bool:
"""Test whether the device has been properly configured.
:return: True if configured
"""
return self._ftdi.is_connected
@property
def frequency_max(self) -> float:
"""Provides the maximum SPI clock frequency in Hz.
:return: SPI bus clock frequency
"""
return self._ftdi.frequency_max
@property
def frequency(self) -> float:
"""Provides the current SPI clock frequency in Hz.
:return: the SPI bus clock frequency
"""
return self._frequency
@property
def direction(self):
"""Provide the FTDI pin direction
A true bit represents an output pin, a false bit an input pin.
:return: the bitfield of direction.
"""
return self._spi_dir | self._gpio_dir
@property
def channels(self) -> int:
"""Provide the maximum count of slaves.
:return: the count of pins reserved to drive the /CS signal
"""
return self._cs_count
@property
def active_channels(self) -> Set[int]:
"""Provide the set of configured slaves /CS.
:return: Set of /CS, one for each configured slaves
"""
return {port[0] for port in enumerate(self._spi_ports) if port[1]}
@property
def gpio_pins(self):
"""Report the configured GPIOs as a bitfield.
A true bit represents a GPIO, a false bit a reserved or not
configured pin.
:return: the bitfield of configured GPIO pins.
"""
with self._lock:
return self._gpio_mask
@property
def gpio_all_pins(self):
"""Report the addressable GPIOs as a bitfield.
A true bit represents a pin which may be used as a GPIO, a false bit
a reserved pin (for SPI support)
:return: the bitfield of configurable GPIO pins.
"""
mask = (1 << self.width) - 1
with self._lock:
return mask & ~self._spi_mask
@property
def width(self):
"""Report the FTDI count of addressable pins.
:return: the count of IO pins (including SPI ones).
"""
return 16 if self._wide_port else 8
@property
def is_inverted_cpha_supported(self) -> bool:
"""Report whether it is possible to supported CPHA=1.
:return: inverted CPHA supported (with a kludge)
"""
return self._ftdi.is_H_series
def exchange(self, frequency: float,
out: Union[bytes, bytearray, Iterable[int]], readlen: int,
cs_prolog: Optional[bytes] = None,
cs_epilog: Optional[bytes] = None,
cpol: bool = False, cpha: bool = False,
duplex: bool = False, droptail: int = 0) -> bytes:
"""Perform an exchange or a transaction with the SPI slave
:param out: data to send to the SPI slave, may be empty to read out
data from the slave with no write.
:param readlen: count of bytes to read out from the slave,
may be zero to only write to the slave,
:param cs_prolog: the prolog MPSSE command sequence to execute
before the actual exchange.
:param cs_epilog: the epilog MPSSE command sequence to execute
after the actual exchange.
:param cpol: SPI clock polarity, derived from the SPI mode
:param cpol: SPI clock phase, derived from the SPI mode
:param duplex: perform a full-duplex exchange (vs. half-duplex),
i.e. bits are clocked in and out at once or
in a write-then-read manner.
:param droptail: ignore up to 7 last bits (for non-byte sized SPI
accesses)
:return: bytes containing the data read out from the slave, if any
"""
if not 0 <= droptail <= 7:
raise ValueError('Invalid skip bit count')
if duplex:
if readlen > len(out):
tmp = bytearray(out)
tmp.extend([0] * (readlen - len(out)))
out = tmp
elif not readlen:
readlen = len(out)
with self._lock:
if duplex:
data = self._exchange_full_duplex(frequency, out,
cs_prolog, cs_epilog,
cpol, cpha, droptail)
return data[:readlen]
return self._exchange_half_duplex(frequency, out, readlen,
cs_prolog, cs_epilog,
cpol, cpha, droptail)
def force_control(self, frequency: float, sequence: bytes) -> None:
"""Execution an arbitrary SPI control bit sequence.
Use with extreme care, as it may lead to unexpected results. Regular
usage of SPI does not require to invoke this API.
:param sequence: the bit sequence to execute.
"""
with self._lock:
self._force(frequency, sequence)
def flush(self) -> None:
"""Flush the HW FIFOs.
"""
with self._lock:
self._flush()
def read_gpio(self, with_output: bool = False) -> int:
"""Read GPIO port
:param with_output: set to unmask output pins
:return: the GPIO port pins as a bitfield
"""
with self._lock:
data = self._read_raw(self._wide_port)
value = data & self._gpio_mask
if not with_output:
value &= ~self._gpio_dir
return value
def write_gpio(self, value: int) -> None:
"""Write GPIO port
:param value: the GPIO port pins as a bitfield
"""
with self._lock:
if (value & self._gpio_dir) != value:
raise SpiIOError(f'No such GPO pins: '
f'{self._gpio_dir:04x}/{value:04x}')
# perform read-modify-write
use_high = self._wide_port and (self.direction & 0xff00)
data = self._read_raw(use_high)
data &= ~self._gpio_mask
data |= value
self._write_raw(data, use_high)
self._gpio_low = data & 0xFF & ~self._spi_mask
def set_gpio_direction(self, pins: int, direction: int) -> None:
"""Change the direction of the GPIO pins
:param pins: which GPIO pins should be reconfigured
:param direction: direction bitfield (on for output)
"""
with self._lock:
self._set_gpio_direction(16 if self._wide_port else 8,
pins, direction)
def _set_gpio_direction(self, width: int, pins: int,
direction: int) -> None:
if pins & self._spi_mask:
raise SpiIOError('Cannot access SPI pins as GPIO')
gpio_mask = (1 << width) - 1
gpio_mask &= ~self._spi_mask
if (pins & gpio_mask) != pins:
raise SpiIOError('No such GPIO pin(s)')
self._gpio_dir &= ~pins
self._gpio_dir |= (pins & direction)
self._gpio_mask = gpio_mask & pins
def _read_raw(self, read_high: bool) -> int:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if read_high:
cmd = bytes([Ftdi.GET_BITS_LOW,
Ftdi.GET_BITS_HIGH,
Ftdi.SEND_IMMEDIATE])
fmt = '<H'
else:
cmd = bytes([Ftdi.GET_BITS_LOW,
Ftdi.SEND_IMMEDIATE])
fmt = 'B'
self._ftdi.write_data(cmd)
size = scalc(fmt)
data = self._ftdi.read_data_bytes(size, 4)
if len(data) != size:
raise SpiIOError('Cannot read GPIO')
value, = sunpack(fmt, data)
return value
def _write_raw(self, data: int, write_high: bool) -> None:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
direction = self.direction
low_data = data & 0xFF
low_dir = direction & 0xFF
if write_high:
high_data = (data >> 8) & 0xFF
high_dir = (direction >> 8) & 0xFF
cmd = bytes([Ftdi.SET_BITS_LOW, low_data, low_dir,
Ftdi.SET_BITS_HIGH, high_data, high_dir])
else:
cmd = bytes([Ftdi.SET_BITS_LOW, low_data, low_dir])
self._ftdi.write_data(cmd)
def _force(self, frequency: float, sequence: bytes):
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if len(sequence) > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Output payload is too large")
if self._frequency != frequency:
self._ftdi.set_frequency(frequency)
# store the requested value, not the actual one (best effort),
# to avoid setting unavailable values on each call.
self._frequency = frequency
cmd = bytearray()
direction = self.direction & 0xFF
for ctrl in sequence:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
cmd.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
self._ftdi.write_data(cmd)
def _exchange_half_duplex(self, frequency: float,
out: Union[bytes, bytearray, Iterable[int]],
readlen: int, cs_prolog: bytes, cs_epilog: bytes,
cpol: bool, cpha: bool,
droptail: int) -> bytes:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if len(out) > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Output payload is too large")
if readlen > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Input payload is too large")
if cpha:
# to enable CPHA, we need to use a workaround with FTDI device,
# that is enable 3-phase clocking (which is usually dedicated to
# I2C support). This mode use use 3 clock period instead of 2,
# which implies the FTDI frequency should be fixed to match the
# requested one.
frequency = (3*frequency)//2
if self._frequency != frequency:
self._ftdi.set_frequency(frequency)
# store the requested value, not the actual one (best effort),
# to avoid setting unavailable values on each call.
self._frequency = frequency
direction = self.direction & 0xFF # low bits only
cmd = bytearray()
for ctrl in cs_prolog or []:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
cmd.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
epilog = bytearray()
if cs_epilog:
for ctrl in cs_epilog:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
# Restore idle state
cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low,
direction]
if not self._turbo:
cs_high.append(Ftdi.SEND_IMMEDIATE)
epilog.extend(cs_high)
writelen = len(out)
if self._clock_phase != cpha:
self._ftdi.enable_3phase_clock(cpha)
self._clock_phase = cpha
if writelen:
if not droptail:
wcmd = (Ftdi.WRITE_BYTES_NVE_MSB if not cpol else
Ftdi.WRITE_BYTES_PVE_MSB)
write_cmd = spack('<BH', wcmd, writelen-1)
cmd.extend(write_cmd)
cmd.extend(out)
else:
bytelen = writelen-1
if bytelen:
wcmd = (Ftdi.WRITE_BYTES_NVE_MSB if not cpol else
Ftdi.WRITE_BYTES_PVE_MSB)
write_cmd = spack('<BH', wcmd, bytelen-1)
cmd.extend(write_cmd)
cmd.extend(out[:-1])
wcmd = (Ftdi.WRITE_BITS_NVE_MSB if not cpol else
Ftdi.WRITE_BITS_PVE_MSB)
write_cmd = spack('<BBB', wcmd, 7-droptail, out[-1])
cmd.extend(write_cmd)
if readlen:
if not droptail:
rcmd = (Ftdi.READ_BYTES_NVE_MSB if not cpol else
Ftdi.READ_BYTES_PVE_MSB)
read_cmd = spack('<BH', rcmd, readlen-1)
cmd.extend(read_cmd)
else:
bytelen = readlen-1
if bytelen:
rcmd = (Ftdi.READ_BYTES_NVE_MSB if not cpol else
Ftdi.READ_BYTES_PVE_MSB)
read_cmd = spack('<BH', rcmd, bytelen-1)
cmd.extend(read_cmd)
rcmd = (Ftdi.READ_BITS_NVE_MSB if not cpol else
Ftdi.READ_BITS_PVE_MSB)
read_cmd = spack('<BB', rcmd, 7-droptail)
cmd.extend(read_cmd)
cmd.extend(self._immediate)
if self._turbo:
if epilog:
cmd.extend(epilog)
self._ftdi.write_data(cmd)
else:
self._ftdi.write_data(cmd)
if epilog:
self._ftdi.write_data(epilog)
# USB read cycle may occur before the FTDI device has actually
# sent the data, so try to read more than once if no data is
# actually received
data = self._ftdi.read_data_bytes(readlen, 4)
if droptail:
data[-1] = 0xff & (data[-1] << droptail)
else:
if writelen:
if self._turbo:
if epilog:
cmd.extend(epilog)
self._ftdi.write_data(cmd)
else:
self._ftdi.write_data(cmd)
if epilog:
self._ftdi.write_data(epilog)
data = bytearray()
return data
def _exchange_full_duplex(self, frequency: float,
out: Union[bytes, bytearray, Iterable[int]],
cs_prolog: bytes, cs_epilog: bytes,
cpol: bool, cpha: bool,
droptail: int) -> bytes:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if len(out) > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Output payload is too large")
if cpha:
# to enable CPHA, we need to use a workaround with FTDI device,
# that is enable 3-phase clocking (which is usually dedicated to
# I2C support). This mode use use 3 clock period instead of 2,
# which implies the FTDI frequency should be fixed to match the
# requested one.
frequency = (3*frequency)//2
if self._frequency != frequency:
self._ftdi.set_frequency(frequency)
# store the requested value, not the actual one (best effort),
# to avoid setting unavailable values on each call.
self._frequency = frequency
direction = self.direction & 0xFF # low bits only
cmd = bytearray()
for ctrl in cs_prolog or []:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
cmd.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
epilog = bytearray()
if cs_epilog:
for ctrl in cs_epilog:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
# Restore idle state
cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low,
direction]
if not self._turbo:
cs_high.append(Ftdi.SEND_IMMEDIATE)
epilog.extend(cs_high)
exlen = len(out)
if self._clock_phase != cpha:
self._ftdi.enable_3phase_clock(cpha)
self._clock_phase = cpha
if not droptail:
wcmd = (Ftdi.RW_BYTES_PVE_NVE_MSB if not cpol else
Ftdi.RW_BYTES_NVE_PVE_MSB)
write_cmd = spack('<BH', wcmd, exlen-1)
cmd.extend(write_cmd)
cmd.extend(out)
else:
bytelen = exlen-1
if bytelen:
wcmd = (Ftdi.RW_BYTES_PVE_NVE_MSB if not cpol else
Ftdi.RW_BYTES_NVE_PVE_MSB)
write_cmd = spack('<BH', wcmd, bytelen-1)
cmd.extend(write_cmd)
cmd.extend(out[:-1])
wcmd = (Ftdi.RW_BITS_PVE_NVE_MSB if not cpol else
Ftdi.RW_BITS_NVE_PVE_MSB)
write_cmd = spack('<BBB', wcmd, 7-droptail, out[-1])
cmd.extend(write_cmd)
cmd.extend(self._immediate)
if self._turbo:
if epilog:
cmd.extend(epilog)
self._ftdi.write_data(cmd)
else:
self._ftdi.write_data(cmd)
if epilog:
self._ftdi.write_data(epilog)
# USB read cycle may occur before the FTDI device has actually
# sent the data, so try to read more than once if no data is
# actually received
data = self._ftdi.read_data_bytes(exlen, 4)
if droptail:
data[-1] = 0xff & (data[-1] << droptail)
return data
def _flush(self) -> None:
self._ftdi.write_data(self._immediate)
self._ftdi.purge_buffers()