abschlussarbeit/_trash/lib/python3.11/site-packages/pyftdi/gpio.py

532 lines
21 KiB
Python

# Copyright (c) 2014-2024, Emmanuel Blot <emmanuel.blot@free.fr>
# Copyright (c) 2016, Emmanuel Bouaziz <ebouaziz@free.fr>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""GPIO/BitBang support for PyFtdi"""
from struct import calcsize as scalc, unpack as sunpack
from typing import Iterable, Optional, Tuple, Union
from .ftdi import Ftdi, FtdiError
from .misc import is_iterable
class GpioException(FtdiError):
    """Root of the GPIO error hierarchy.

    It derives from :py:class:`FtdiError`, so handlers that already catch
    FTDI errors also catch GPIO ones.
    """
class GpioPort:
    """Common duck-typing base for all GPIO controllers.

    The class defines no attribute and no method of its own; it only
    provides a shared type for the GPIO controllers.
    """
class GpioBaseController(GpioPort):
    """GPIO controller for an FTDI port, in bit-bang legacy mode.

    GPIO bit-bang mode is limited to the 8 lower pins of each GPIO port.

    This class holds the state shared by the concrete controllers (FTDI
    handle, direction, width, pin mask and frequency). Subclasses must
    implement :py:meth:`set_frequency`, :py:meth:`_configure` and
    :py:meth:`_update_direction`.
    """

    def __init__(self):
        self._ftdi = Ftdi()
        # all of the following are placeholders until configure() is called
        self._direction = 0
        self._width = 0
        self._mask = 0
        self._frequency = 0

    @property
    def ftdi(self) -> Ftdi:
        """Return the Ftdi instance.

        :return: the Ftdi instance
        """
        return self._ftdi

    @property
    def is_connected(self) -> bool:
        """Reports whether a connection exists with the FTDI interface.

        :return: the FTDI slave connection status
        """
        return self._ftdi.is_connected

    def configure(self, url: str, direction: int = 0,
                  **kwargs) -> float:
        """Open a new interface to the specified FTDI device.

        :param str url: a FTDI URL selector
        :param int direction: a bitfield specifying the FTDI GPIO direction,
            where high level defines an output, and low level defines an
            input
        :param frequency: optional pace in GPIO sample per second; the
            legacy ``baudrate`` keyword is used when ``frequency`` is
            missing or None
        :param kwargs: any other option is forwarded untouched to the
            controller-specific :py:meth:`_configure` implementation
        :return: actual pace in sample per second, as reported by
            :py:meth:`_configure`
        :raise FtdiError: if the controller is already connected
        """
        if self.is_connected:
            raise FtdiError('Already connected')
        # work on a copy so the caller's mapping is never altered
        kwargs = dict(kwargs)
        frequency = kwargs.get('frequency')
        if frequency is None:
            frequency = kwargs.get('baudrate')
        # these options are handled here or hard-coded by the subclasses,
        # they should not reach the FTDI open call a second time
        for key in ('direction', 'sync', 'frequency', 'baudrate'):
            kwargs.pop(key, None)
        self._frequency = self._configure(url, direction, frequency, **kwargs)
        # the pace used to be computed but never handed back to the caller
        return self._frequency

    def close(self, freeze: bool = False) -> None:
        """Close the GPIO port.

        Nothing happens if the port is not connected.

        :param freeze: if set, FTDI port is not reset to its default
                       state on close. This means the port is left with
                       its current configuration and output signals.
                       This feature should not be used except for very
                       specific needs.
        """
        if self._ftdi.is_connected:
            self._ftdi.close(freeze)

    def get_gpio(self) -> GpioPort:
        """Retrieve the GPIO port.

        This method is mostly useless, it is a wrapper to duck type other
        GPIO APIs (I2C, SPI, ...)

        :return: GPIO port
        """
        return self

    @property
    def direction(self) -> int:
        """Reports the GPIO direction.

        :return: a bitfield specifying the FTDI GPIO direction, where high
            level reports an output pin, and low level reports an input pin
        """
        return self._direction

    @property
    def pins(self) -> int:
        """Report the configured GPIOs as a bitfield.

        A true bit represents a GPIO, a false bit a reserved or not
        configured pin.

        :return: the pin mask, i.e. one bit set for each of the
            :py:attr:`width` pins of the port (0 before configuration).
        """
        return self._mask

    @property
    def all_pins(self) -> int:
        """Report the addressable GPIOs as a bitfield.

        A true bit represents a pin which may be used as a GPIO, a false bit
        a reserved pin

        :return: the pin mask, identical to :py:attr:`pins` for this
            controller family.
        """
        return self._mask

    @property
    def width(self) -> int:
        """Report the FTDI count of addressable pins.

        :return: the width of the GPIO port.
        """
        return self._width

    @property
    def frequency(self) -> float:
        """Return the pace at which sequence of GPIO samples are read
        and written.
        """
        return self._frequency

    def set_frequency(self, frequency: Union[int, float]) -> None:
        """Set the frequency at which sequence of GPIO samples are read
        and written.

        :param frequency: the new frequency, in GPIO samples per second
        :raise NotImplementedError: always, subclasses override this method
        """
        raise NotImplementedError('GpioBaseController cannot be instanciated')

    def set_direction(self, pins: int, direction: int) -> None:
        """Update the GPIO pin direction.

        Only the pins selected with ``pins`` are modified, the direction of
        the other pins is preserved.

        :param pins: which GPIO pins should be reconfigured
        :param direction: a bitfield of GPIO pins. Each bit represent a
            GPIO pin, where a high level sets the pin as output and a low
            level sets the pin as input/high-Z.
        :raise GpioException: if direction does not fit in the pin mask
        """
        if direction > self._mask:
            raise GpioException("Invalid direction mask")
        # clear the selected pins first, then apply their new direction
        self._direction &= ~pins
        self._direction |= (pins & direction)
        self._update_direction()

    def _configure(self, url: str, direction: int,
                   frequency: Union[int, float, None] = None,
                   **kwargs) -> float:
        """Open the FTDI port; to be implemented by each controller.

        :return: the actual pace in sample per second
        """
        raise NotImplementedError('GpioBaseController cannot be instanciated')

    def _update_direction(self) -> None:
        """Apply :py:attr:`direction` to the FTDI port; to be implemented
        by each controller.
        """
        raise NotImplementedError('Missing implementation')
class GpioAsyncController(GpioBaseController):
    """GPIO controller for an FTDI port, in bit-bang asynchronous mode.

    GPIO accessible pins are limited to the 8 lower pins of each GPIO port.

    Asynchronous bitbang output are updated on write request using the
    :py:meth:`write` method, clocked at the selected frequency.

    Asynchronous bitbang input are sampled at the same rate, as soon as the
    controller is initialized. The GPIO input samples fill in the FTDI HW
    buffer until it is filled up, in which case sampling stops until the
    GPIO samples are read out with the :py:meth:`read` method. It may be
    therefore hard to use, except if peek mode is selected,
    see :py:meth:`read` for details.

    Note that FTDI internal clock divider cannot generate any arbitrary
    frequency, so the closest frequency to the request one that can be
    generated is selected. The actual :py:attr:`frequency` may be tested to
    check if it matches the board requirements.
    """

    def read(self, readlen: int = 1, peek: Optional[bool] = None,
             noflush: bool = False) -> Union[int, bytes]:
        """Read the GPIO input pin electrical level.

        :param readlen: how many GPIO samples to retrieve. Each sample is
                        8-bit wide.
        :param peek: whether to peek/sample the instantaneous GPIO pin
                     values from port, or to use the HW FIFO. The HW FIFO is
                     continuously filled up with GPIO sample at the current
                     frequency, until it is full - samples are no longer
                     collected until the FIFO is read. This means that
                     non-peek mode read "old" values, with no way to know at
                     which time they have been sampled. PyFtdi ensures that
                     old sampled values before the completion of a previous
                     GPIO write are discarded. When peek mode is selected,
                     readlen should be 1. When left to None, peek mode is
                     selected if readlen is 1.
        :param noflush: whether to disable the RX buffer flush before
                        reading out data
        :return: a 8-bit wide integer if peek mode is used, or
                 a bytes buffer otherwise.
        :raise GpioException: if the controller is not connected
        :raise ValueError: if peek mode is used with readlen other than 1
        :raise FtdiError: if the FTDI TX buffer never drains
        """
        if not self.is_connected:
            raise GpioException('Not connected')
        if peek is None and readlen == 1:
            # compatibility with legacy API
            peek = True
        if peek:
            if readlen != 1:
                raise ValueError('Invalid read length with peek mode')
            return self._ftdi.read_pins()
        # in asynchronous bitbang mode, the FTDI-to-host FIFO is filled in
        # continuously once this mode is activated. This means there is no
        # way to trigger the exact moment where the buffer is filled in, nor
        # to define the write pointer in the buffer. Reading out this buffer
        # at any time is likely to contain a mix of old and new values.
        # Anyway, flushing the FTDI-to-host buffer seems to be a proper
        # way to get in sync with the buffer.
        if noflush:
            return self._ftdi.read_data(readlen)
        # bounded polling: a sanity limit avoids an endless loop on errors
        for _ in range(10000):
            # do not attempt to do anything till the FTDI HW buffer has been
            # emptied, i.e. previous write calls have been handled.
            status = self._ftdi.poll_modem_status()
            if status & Ftdi.MODEM_TEMT:
                # TX buffer is now empty, any "write" GPIO request has
                # completed so start reading GPIO samples from this very
                # moment.
                break
        else:
            raise FtdiError('FTDI TX buffer error')
        # now flush the FTDI-to-host buffer as it keeps being filled with data
        self._ftdi.purge_tx_buffer()
        # finally perform the actual read out
        return self._ftdi.read_data(readlen)

    def write(self,
              out: Union[bytes, bytearray, Iterable[int], int]) -> None:
        """Set the GPIO output pin electrical level, or output a sequence of
        bytes @ constant frequency to GPIO output pins.

        :param out: a bitfield of GPIO pins, or a sequence of them
        :raise GpioException: if the controller is not connected
        :raise TypeError: if out is neither an integer nor an iterable
        :raise ValueError: if a value does not fit in the pin mask
        """
        if not self.is_connected:
            raise GpioException('Not connected')
        if not isinstance(out, (bytes, bytearray)):
            if isinstance(out, int):
                out = bytes([out])
            elif is_iterable(out):
                # materialize once: a one-shot iterator would otherwise be
                # exhausted by the validation loop and nothing would be sent
                out = list(out)
            else:
                raise TypeError('Invalid output value')
            for val in out:
                if val > self._mask:
                    raise ValueError('Invalid output value')
            out = bytes(out)
        self._ftdi.write_data(out)

    def set_frequency(self, frequency: Union[int, float]) -> None:
        """Set the frequency at which sequence of GPIO samples are read
        and written.

        note: FTDI may update its clock register before it has emptied its
        internal buffer. If the current frequency is "low", some
        yet-to-output bytes may end up being clocked at the new frequency.

        Unfortunately, it seems there is no way to wait for the internal
        buffer to be emptied out. They can be flushed (i.e. discarded), but
        not synchronized :-(

        PyFtdi client should add "some" short delay to ensure a previous,
        long write request has been fully output @ low freq before changing
        the frequency.

        Beware that only some exact frequencies can be generated. Contrary
        to the UART mode, an approximate frequency is always accepted for
        GPIO/bitbang mode. To get the actual frequency, and optionally abort
        if it is out-of-spec, use :py:meth:`frequency` property.

        :param frequency: the new frequency, in GPIO samples per second
        """
        self._frequency = float(self._ftdi.set_baudrate(int(frequency), False))

    def _configure(self, url: str, direction: int,
                   frequency: Union[int, float, None] = None,
                   **kwargs) -> float:
        """Open the FTDI port in asynchronous bit-bang mode.

        :param url: a FTDI URL selector
        :param direction: the GPIO direction bitfield
        :param frequency: the requested pace, if any
        :param kwargs: ``initial`` defines an optional initial output value,
            ``debug`` is discarded, any other option is forwarded to the
            FTDI open call
        :return: the pace reported by the FTDI open call, as a float
        """
        initial = kwargs.pop('initial', None)
        # debug is not implemented
        kwargs.pop('debug', None)
        baudrate = int(frequency) if frequency is not None else None
        baudrate = self._ftdi.open_bitbang_from_url(url,
                                                    direction=direction,
                                                    sync=False,
                                                    baudrate=baudrate,
                                                    **kwargs)
        self._width = 8
        self._mask = (1 << self._width) - 1
        self._direction = direction & self._mask
        if initial is not None:
            initial &= self._mask
            self.write(initial)
        return float(baudrate)

    def _update_direction(self) -> None:
        """Apply the current direction bitfield to the FTDI port."""
        self._ftdi.set_bitmode(self._direction, Ftdi.BitMode.BITBANG)

    # old API names
    open_from_url = GpioBaseController.configure
    read_port = read
    write_port = write
# old API compatibility: the legacy GpioController name is kept as an alias
# of the asynchronous bit-bang controller
GpioController = GpioAsyncController
class GpioSyncController(GpioBaseController):
    """GPIO controller for an FTDI port, in bit-bang synchronous mode.

    GPIO accessible pins are limited to the 8 lower pins of each GPIO port.

    Synchronous bitbang input and output are synchronized. Every time GPIO
    output is updated, the GPIO input is sampled and buffered.

    Update and sampling are clocked at the selected frequency. The GPIO
    samples are transferred in both directions with the :py:meth:`exchange`
    method, which therefore always returns as many input samples as output
    bytes.

    Note that FTDI internal clock divider cannot generate any arbitrary
    frequency, so the closest frequency to the request one that can be
    generated is selected. The actual :py:attr:`frequency` may be tested to
    check if it matches the board requirements.
    """

    def exchange(self,
                 out: Union[bytes, bytearray, Iterable[int], int]) -> bytes:
        """Set the GPIO output pin electrical level, or output a sequence of
        bytes @ constant frequency to GPIO output pins.

        :param out: the byte buffer to output as GPIO; a single integer or
            an iterable of integers is also accepted and converted to a
            byte buffer
        :return: a byte buffer of the same length as out buffer.
        :raise GpioException: if the controller is not connected, or if a
            value does not fit in the pin mask
        :raise TypeError: if out is neither an integer nor an iterable
        """
        if not self.is_connected:
            raise GpioException('Not connected')
        if not isinstance(out, (bytes, bytearray)):
            if isinstance(out, int):
                out = bytes([out])
            elif is_iterable(out):
                # materialize once: a one-shot iterator would otherwise be
                # exhausted by the validation loop
                out = list(out)
            else:
                raise TypeError('Invalid output value')
            for val in out:
                if val > self._mask:
                    raise GpioException("Invalid value")
            # always hand a real byte buffer to the FTDI layer, so that the
            # count of samples to read back is well defined
            out = bytes(out)
        self._ftdi.write_data(out)
        return self._ftdi.read_data_bytes(len(out), 4)

    def set_frequency(self, frequency: Union[int, float]) -> None:
        """Set the frequency at which sequence of GPIO samples are read
        and written.

        :param frequency: the new frequency, in GPIO samples per second
        """
        self._frequency = float(self._ftdi.set_baudrate(int(frequency), False))

    def _configure(self, url: str, direction: int,
                   frequency: Union[int, float, None] = None,
                   **kwargs) -> float:
        """Open the FTDI port in synchronous bit-bang mode.

        :param url: a FTDI URL selector
        :param direction: the GPIO direction bitfield
        :param frequency: the requested pace, if any
        :param kwargs: ``initial`` defines an optional initial output value,
            ``debug`` is discarded, any other option is forwarded to the
            FTDI open call
        :return: the pace reported by the FTDI open call, as a float
        """
        initial = kwargs.pop('initial', None)
        # debug is not implemented
        kwargs.pop('debug', None)
        baudrate = int(frequency) if frequency is not None else None
        baudrate = self._ftdi.open_bitbang_from_url(url,
                                                    direction=direction,
                                                    sync=True,
                                                    baudrate=baudrate,
                                                    **kwargs)
        self._width = 8
        self._mask = (1 << self._width) - 1
        self._direction = direction & self._mask
        if initial is not None:
            initial &= self._mask
            # the sampled input that comes back is of no use here
            self.exchange(initial)
        return float(baudrate)

    def _update_direction(self) -> None:
        """Apply the current direction bitfield to the FTDI port."""
        self._ftdi.set_bitmode(self._direction, Ftdi.BitMode.SYNCBB)
class GpioMpsseController(GpioBaseController):
    """GPIO controller for an FTDI port, in MPSSE mode.

    All GPIO pins are reachable, but MPSSE mode is slower than other modes.

    Beware that LSBs (b0..b7) and MSBs (b8..b15) are accessed with two
    subsequence commands, so a slight delay may occur when sampling or
    changing both groups at once. In other word, it is not possible to
    atomically read to / write from LSBs and MSBs. This might be worth
    checking the board design if atomic access to several lines is required.
    """

    MPSSE_PAYLOAD_MAX_LENGTH = 0xFF00  # 16 bits max (- spare for control)

    def read(self, readlen: int = 1, peek: Optional[bool] = None) \
            -> Union[int, bytes, Tuple[int, ...]]:
        """Read the GPIO input pin electrical level.

        :param readlen: how many GPIO samples to retrieve. Each sample is
                        :py:meth:`width` bit wide.
        :param peek: whether to peek current value from port, or to use
                     MPSSE stream and HW FIFO. When peek mode is selected,
                     readlen should be 1. It is not available with wide
                     ports if some of the MSB pins are configured as input
        :return: a :py:meth:`width` bit wide integer if direct mode is used,
                 a bytes buffer if :py:meth:`width` is a byte,
                 a list of integer otherwise (MPSSE mode only).
        :raise GpioException: if the controller is not connected
        :raise ValueError: if peek mode cannot be honored
        """
        if not self.is_connected:
            raise GpioException('Not connected')
        if peek:
            if readlen != 1:
                raise ValueError('Invalid read length with direct mode')
            # a non-zero value means that at least one pin of the high byte
            # is configured as an input
            if self._width > 8 and (0xFFFF & ~self._direction) >> 8:
                raise ValueError('Peek mode not available with selected '
                                 'input config')
            return self._ftdi.read_pins()
        return self._read_mpsse(readlen)

    def write(self, out: Union[bytes, bytearray, Iterable[int], int]) -> None:
        """Set the GPIO output pin electrical level, or output a sequence of
        bytes @ constant frequency to GPIO output pins.

        :param out: a bitfield of GPIO pins, or a sequence of them
        :raise GpioException: if the controller is not connected, or if a
            value does not fit in the pin mask
        :raise TypeError: if out is neither an integer nor an iterable
        """
        if not self.is_connected:
            raise GpioException('Not connected')
        if not isinstance(out, (bytes, bytearray)):
            if isinstance(out, int):
                out = [out]
            elif is_iterable(out):
                # materialize once: a one-shot iterator would otherwise be
                # exhausted by the validation loop and nothing would be sent
                out = list(out)
            else:
                raise TypeError('Invalid output value')
            for val in out:
                if val > self._mask:
                    raise GpioException("Invalid value")
        self._write_mpsse(out)

    def set_frequency(self, frequency: Union[int, float]) -> None:
        """Set the frequency of the MPSSE engine.

        The value reported by the FTDI layer is stored as the new
        :py:attr:`frequency`.

        :param frequency: the new frequency
        :raise GpioException: if the controller is not connected
        """
        if not self.is_connected:
            raise GpioException('Not connected')
        self._frequency = self._ftdi.set_frequency(float(frequency))

    def _update_direction(self) -> None:
        # nothing to do in MPSSE mode, as direction is updated with each
        # GPIO command
        pass

    def _configure(self, url: str, direction: int,
                   frequency: Union[int, float, None] = None, **kwargs):
        """Open the FTDI port in MPSSE mode.

        The port width, hence the pin mask, is obtained from the FTDI
        instance once the port is open.

        :param url: a FTDI URL selector
        :param direction: the GPIO direction bitfield
        :param frequency: the requested frequency, if any
        :param kwargs: forwarded to the FTDI open call
        :return: the frequency reported by the FTDI open call
        """
        frequency = self._ftdi.open_mpsse_from_url(url,
                                                   direction=direction,
                                                   frequency=frequency,
                                                   **kwargs)
        self._width = self._ftdi.port_width
        self._mask = (1 << self._width) - 1
        self._direction = direction & self._mask
        return frequency

    def _read_mpsse(self, count: int) -> Union[bytes, Tuple[int, ...]]:
        """Sample the GPIO pins ``count`` times through the MPSSE engine.

        :param count: the number of samples to retrieve
        :return: a tuple of integers for ports wider than 8 bits, the raw
            byte buffer otherwise
        :raise ValueError: if the command sequence exceeds
            :py:attr:`MPSSE_PAYLOAD_MAX_LENGTH`
        :raise FtdiError: if fewer bytes than expected are received
        """
        if self._width > 8:
            # one command per byte lane; each sample is decoded as a
            # little-endian 16-bit word
            cmd = bytearray([Ftdi.GET_BITS_LOW, Ftdi.GET_BITS_HIGH] * count)
            fmt = f'<{count}H'
        else:
            cmd = bytearray([Ftdi.GET_BITS_LOW] * count)
            fmt = None
        cmd.append(Ftdi.SEND_IMMEDIATE)
        if len(cmd) > self.MPSSE_PAYLOAD_MAX_LENGTH:
            raise ValueError('Too many samples')
        self._ftdi.write_data(cmd)
        size = scalc(fmt) if fmt else count
        data = self._ftdi.read_data_bytes(size, 4)
        if len(data) != size:
            raise FtdiError(f'Cannot read GPIO, recv {len(data)} '
                            f'out of {size} bytes')
        if fmt:
            return sunpack(fmt, data)
        return data

    def _write_mpsse(self,
                     out: Union[bytes, bytearray, Iterable[int], int]) -> None:
        """Emit one MPSSE "set bits" command sequence per output value.

        The current direction is sent along with each value.

        :param out: the sequence of GPIO values to output
        """
        cmd = []
        low_dir = self._direction & 0xFF
        if self._width > 8:
            high_dir = (self._direction >> 8) & 0xFF
            for data in out:
                low_data = data & 0xFF
                high_data = (data >> 8) & 0xFF
                cmd.extend([Ftdi.SET_BITS_LOW, low_data, low_dir,
                            Ftdi.SET_BITS_HIGH, high_data, high_dir])
        else:
            for data in out:
                cmd.extend([Ftdi.SET_BITS_LOW, data, low_dir])
        self._ftdi.write_data(bytes(cmd))