MikrofonSensor und TemperaturSenor die zwei Python programme funktionieren. mit den jeweiligen 2 json Datein. Beim TemperaturSensor wird im Terminal keine Wertre ausgegeben aber in der json Datei kann man die Temp und Hum sehen.

This commit is contained in:
Chiara 2025-05-28 14:53:44 +02:00
parent 4c654ec969
commit 1751076592
2614 changed files with 349009 additions and 0 deletions

View file

@ -0,0 +1,102 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""Generic Linux I2C class using PureIO's smbus class"""
import warnings
from Adafruit_PureIO import smbus
class I2C:
"""
I2C class
Baudrate has no effect on Linux systems. The argument is only there for compatibility.
"""
MASTER = 0
SLAVE = 1
_baudrate = None
_mode = None
_i2c_bus = None
# pylint: disable=unused-argument
def __init__(self, bus_num, mode=MASTER, baudrate=None):
if mode != self.MASTER:
raise NotImplementedError("Only I2C Master supported!")
_mode = self.MASTER
if baudrate is not None:
warnings.warn(
"I2C frequency is not settable in python, ignoring!", RuntimeWarning
)
try:
self._i2c_bus = smbus.SMBus(bus_num)
except FileNotFoundError:
raise RuntimeError(
"I2C Bus #%d not found, check if enabled in config!" % bus_num
) from RuntimeError
# pylint: enable=unused-argument
def scan(self):
"""Try to read a byte from each address, if you get an OSError
it means the device isnt there"""
found = []
for addr in range(0, 0x80):
try:
self._i2c_bus.read_byte(addr)
except OSError:
continue
found.append(addr)
return found
# pylint: disable=unused-argument
def writeto(self, address, buffer, *, start=0, end=None, stop=True):
"""Write data from the buffer to an address"""
if end is None:
end = len(buffer)
self._i2c_bus.write_bytes(address, buffer[start:end])
def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
"""Read data from an address and into the buffer"""
if end is None:
end = len(buffer)
readin = self._i2c_bus.read_bytes(address, end - start)
for i in range(end - start):
buffer[i + start] = readin[i]
# pylint: enable=unused-argument
def writeto_then_readfrom(
self,
address,
buffer_out,
buffer_in,
*,
out_start=0,
out_end=None,
in_start=0,
in_end=None,
stop=False,
):
"""Write data from buffer_out to an address and then
read data from an address and into buffer_in
"""
if out_end is None:
out_end = len(buffer_out)
if in_end is None:
in_end = len(buffer_in)
if stop:
# To generate a stop in linux, do in two transactions
self.writeto(address, buffer_out, start=out_start, end=out_end, stop=True)
self.readfrom_into(address, buffer_in, start=in_start, end=in_end)
else:
# To generate without a stop, do in one block transaction
readin = self._i2c_bus.read_i2c_block_data(
address, buffer_out[out_start:out_end], in_end - in_start
)
for i in range(in_end - in_start):
buffer_in[i + in_start] = readin[i]

View file

@ -0,0 +1,130 @@
# SPDX-FileCopyrightText: 2025 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Pin class for use with lgpio."""
from pathlib import Path
import lgpio
def _get_gpiochip():
"""
Determines the handle of the GPIO chip device to access.
iterate through sysfs to find a GPIO chip device with a driver known to be
used for userspace GPIO access.
"""
for dev in Path("/sys/bus/gpio/devices").glob("gpiochip*"):
if Path(dev / "of_node/compatible").is_file():
drivers = set((dev / "of_node/compatible").read_text().split("\0"))
# check if driver names are intended for userspace control
if drivers & {
"raspberrypi,rp1-gpio",
"raspberrypi,bcm2835-gpio",
"raspberrypi,bcm2711-gpio",
}:
return lgpio.gpiochip_open(int(dev.name[-1]))
# return chip0 as a fallback
return lgpio.gpiochip_open(0)
CHIP = _get_gpiochip()
class Pin:
"""Pins dont exist in CPython so...lets make our own!"""
LOW = 0
HIGH = 1
OFF = LOW
ON = HIGH
# values of lg mode constants
PULL_NONE = 0x80
PULL_UP = 0x20
PULL_DOWN = 0x40
ACTIVE_LOW = 0x02
# drive mode lg constants
OPEN_DRAIN = 0x04
IN = 0x0100
OUT = 0x0200
# LG mode constants
_LG_ALERT = 0x400
_LG_GROUP = 0x800
_LG_MODES = IN | OUT | _LG_ALERT | _LG_GROUP
_LG_PULLS = PULL_NONE | PULL_UP | PULL_NONE | ACTIVE_LOW
_LG_DRIVES = OPEN_DRAIN
id = None
_value = LOW
_mode = IN
# we want exceptions
lgpio.exceptions = True
def __init__(self, bcm_number):
self.id = bcm_number
def __repr__(self):
return str(self.id)
def __eq__(self, other):
return self.id == other
def init(self, mode=IN, pull=None):
"""Initialize the Pin"""
if mode is not None:
if mode == Pin.IN:
self._mode = Pin.IN
self._set_gpio_mode_in()
elif mode == self.OUT:
self._mode = Pin.OUT
Pin._check_result(lgpio.gpio_claim_output(CHIP, self.id, Pin.LOW))
else:
raise RuntimeError(f"Invalid mode for pin: {self.id}")
if pull is not None:
if self._mode != Pin.IN:
raise RuntimeError("Can only set pull resistor on input")
if pull in {Pin.PULL_UP, Pin.PULL_DOWN, Pin.PULL_NONE}:
self._set_gpio_mode_in(lflags=pull)
else:
raise RuntimeError(f"Invalid pull for pin: {self.id}")
def value(self, val=None):
"""Set or return the Pin Value"""
if val is not None:
if val == Pin.LOW:
self._value = val
Pin._check_result(lgpio.gpio_write(CHIP, self.id, val))
elif val == Pin.HIGH:
self._value = val
Pin._check_result(lgpio.gpio_write(CHIP, self.id, val))
else:
raise RuntimeError("Invalid value for pin")
return None
return Pin._check_result(lgpio.gpio_read(CHIP, self.id))
@staticmethod
def _check_result(result):
"""
convert any result other than zero to a text message and pass it back
as a runtime exception. Typical usage: use the lgpio call as the
argument.
"""
if result < 0:
raise RuntimeError(lgpio.error_text(result))
return result
def _set_gpio_mode_in(self, lflags=0):
"""
claim a gpio as input, or modify the flags (PULL_UP, PULL_DOWN, ... )
"""
# This gpio_free may seem redundant, but is required when
# changing the line-flags of an already acquired input line
try:
lgpio.gpio_free(CHIP, self.id)
except lgpio.error:
pass
Pin._check_result(lgpio.gpio_claim_input(CHIP, self.id, lFlags=lflags))

View file

@ -0,0 +1,158 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
""" PWMOut Class for lgpio lg library tx_pwm library """
import lgpio
from adafruit_blinka.microcontroller.generic_linux.lgpio_pin import CHIP
class PWMError(IOError):
"""Base class for PWM errors."""
class PWMOut:
"""Pulse Width Modulation Output Class"""
def __init__(self, pin, *, frequency=500, duty_cycle=0, variable_frequency=False):
if variable_frequency:
print("Variable Frequency is not supported, ignoring...")
self._pin = pin
result = lgpio.gpio_claim_output(CHIP, self._pin.id, lFlags=lgpio.SET_PULL_NONE)
if result < 0:
raise RuntimeError(lgpio.error_text(result))
self._enabled = False
self._deinited = False
self._period = 0
# set frequency
self._frequency = frequency
# set duty
self.duty_cycle = duty_cycle
self.enabled = True
def __del__(self):
self.deinit()
def __enter__(self):
return self
def __exit__(self, _exc_type, _exc_val, _exc_tb):
self.deinit()
def deinit(self):
"""Deinit the PWM."""
if not self._deinited:
if self.enabled:
self._enabled = False # turn off the pwm
self._deinited = True
def _is_deinited(self):
"""raise Value error if the object has been de-inited"""
if self._deinited:
raise ValueError(
"Object has been deinitialize and can no longer "
"be used. Create a new object."
)
@property
def period(self):
"""Get or set the PWM's output period in seconds.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
:type: int, float
"""
return 1.0 / self.frequency
@period.setter
def period(self, period):
if not isinstance(period, (int, float)):
raise TypeError("Invalid period type, should be int or float.")
self.frequency = 1.0 / period
@property
def duty_cycle(self):
"""Get or set the PWM's output duty cycle which is the fraction of
each pulse which is high. 16-bit
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
ValueError: if value is out of bounds of 0.0 to 1.0.
:type: int, float
"""
return int(self._duty_cycle * 65535)
@duty_cycle.setter
def duty_cycle(self, duty_cycle):
if not isinstance(duty_cycle, (int, float)):
raise TypeError("Invalid duty cycle type, should be int or float.")
if not 0 <= duty_cycle <= 65535:
raise ValueError("Invalid duty cycle value, should be between 0 and 65535")
# convert from 16-bit
duty_cycle /= 65535.0
self._duty_cycle = duty_cycle
if self._enabled:
self.enabled = True # turn on with new values
@property
def frequency(self):
"""Get or set the PWM's output frequency in Hertz.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
:type: int, float
"""
return self._frequency
@frequency.setter
def frequency(self, frequency):
if not isinstance(frequency, (int, float)):
raise TypeError("Invalid frequency type, should be int or float.")
self._frequency = frequency
if self.enabled:
self.enabled = True # turn on with new values
@property
def enabled(self):
"""Get or set the PWM's output enabled state.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not bool.
:type: bool
"""
return self._enabled
@enabled.setter
def enabled(self, value):
if not isinstance(value, bool):
raise TypeError("Invalid enabled type, should be bool.")
frequency = self._frequency if value else 0
duty_cycle = round(self._duty_cycle * 100)
self._enabled = value
result = lgpio.tx_pwm(CHIP, self._pin.id, frequency, duty_cycle)
if result < 0:
raise RuntimeError(lgpio.error_text(result))
return result
# String representation
def __str__(self):
return (
f"pin {self._pin} (freq={self.frequency:f} Hz, duty_cycle="
f"{self.duty_cycle}({round(self.duty_cycle / 655.35)}%)"
)

View file

@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Chip class for use with libgpiod 1.x."""
import gpiod
# pylint: disable=too-many-branches,too-many-statements
class Chip:
"""Abstraction for handling all breaking changes over the lifecycle of gpiod"""
_CONSUMER = "adafruit_blinka"
id: str = None
num_lines: int
def __init__(self, chip_id: str):
self.id = chip_id
if hasattr(gpiod, "Chip"):
self._chip = gpiod.Chip(self.id)
else:
self._chip = gpiod.chip(self.id)
if callable(self._chip.num_lines):
self.num_lines = self._chip.num_lines()
else:
self.num_lines = self.num_lines
def __repr__(self):
return self.id
def __eq__(self, other):
return self.id == other

View file

@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Chip class for use with libgpiod 2.x."""
import gpiod
# pylint: disable=too-many-branches,too-many-statements
class Chip:
"""Abstraction for handling all breaking changes over the lifecycle of gpiod"""
_CONSUMER = "adafruit_blinka"
id: str = None
num_lines: int
def __init__(self, chip_id: str):
self.id = chip_id
path = f"/dev/gpiochip{self.id}"
self._chip = gpiod.Chip(path)
info = self._chip.get_info()
self.num_lines = info.num_lines
def __repr__(self):
return self.id
def __eq__(self, other):
return self.id == other

View file

@ -0,0 +1,127 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Pin class for use with libgpiod 1.x."""
import gpiod
# pylint: disable=too-many-branches,too-many-statements
class Pin:
"""Pins dont exist in CPython so...lets make our own!"""
IN = 0
OUT = 1
LOW = 0
HIGH = 1
PULL_NONE = 0
PULL_UP = 1
PULL_DOWN = 2
_CONSUMER = "adafruit_blinka"
id = None
_value = LOW
_mode = IN
def __init__(self, pin_id):
self.id = pin_id
if isinstance(pin_id, tuple):
self._num = int(pin_id[1])
if hasattr(gpiod, "Chip"):
self._chip = gpiod.Chip(str(pin_id[0]), gpiod.Chip.OPEN_BY_NUMBER)
else:
self._chip = gpiod.chip(str(pin_id[0]), gpiod.chip.OPEN_BY_NUMBER)
else:
self._num = int(pin_id)
if hasattr(gpiod, "Chip"):
self._chip = gpiod.Chip("gpiochip0", gpiod.Chip.OPEN_BY_NAME)
else:
self._chip = gpiod.chip("gpiochip0", gpiod.chip.OPEN_BY_NAME)
self._line = None
def __repr__(self):
return str(self.id)
def __eq__(self, other):
return self.id == other
def init(self, mode=IN, pull=None):
"""Initialize the Pin"""
if not self._line:
self._line = self._chip.get_line(int(self._num))
# print("init line: ", self.id, self._line)
if mode is not None:
if mode == self.IN:
flags = 0
self._line.release()
if pull is not None:
if pull == self.PULL_UP:
if hasattr(gpiod, "LINE_REQ_FLAG_BIAS_PULL_UP"):
flags |= gpiod.LINE_REQ_FLAG_BIAS_PULL_UP
else:
raise NotImplementedError(
"Internal pullups not supported in this version of libgpiod, "
"use physical resistor instead!"
)
elif pull == self.PULL_DOWN:
if hasattr(gpiod, "line") and hasattr(
gpiod, "LINE_REQ_FLAG_BIAS_PULL_DOWN"
):
flags |= gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN
else:
raise NotImplementedError(
"Internal pulldowns not supported in this version of libgpiod, "
"use physical resistor instead!"
)
elif pull == self.PULL_NONE:
if hasattr(gpiod, "line") and hasattr(
gpiod, "LINE_REQ_FLAG_BIAS_DISABLE"
):
flags |= gpiod.LINE_REQ_FLAG_BIAS_DISABLE
else:
raise NotImplementedError(
"Internal pulldowns not supported in this version of libgpiod, "
"use physical resistor instead!"
)
else:
raise RuntimeError(f"Invalid pull for pin: {self.id}")
self._mode = self.IN
self._line.release()
if hasattr(gpiod, "LINE_REQ_DIR_IN"):
self._line.request(
consumer=self._CONSUMER, type=gpiod.LINE_REQ_DIR_IN, flags=flags
)
else:
config = gpiod.line_request()
config.consumer = self._CONSUMER
config.request_type = gpiod.line_request.DIRECTION_INPUT
self._line.request(config)
elif mode == self.OUT:
if pull is not None:
raise RuntimeError("Cannot set pull resistor on output")
self._mode = self.OUT
self._line.release()
if hasattr(gpiod, "LINE_REQ_DIR_OUT"):
self._line.request(
consumer=self._CONSUMER, type=gpiod.LINE_REQ_DIR_OUT
)
else:
config = gpiod.line_request()
config.consumer = self._CONSUMER
config.request_type = gpiod.line_request.DIRECTION_OUTPUT
self._line.request(config)
else:
raise RuntimeError("Invalid mode for pin: %s" % self.id)
def value(self, val=None):
"""Set or return the Pin Value"""
if val is None:
return self._line.get_value()
if val in (self.LOW, self.HIGH):
self._value = val
self._line.set_value(val)
return None
raise RuntimeError("Invalid value for pin")

View file

@ -0,0 +1,99 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Pin class for use with libgpiod 2.x."""
import gpiod
# pylint: disable=too-many-branches,too-many-statements
class Pin:
"""Pins dont exist in CPython so...lets make our own!"""
IN = 0
OUT = 1
LOW = 0
HIGH = 1
PULL_NONE = 0
PULL_UP = 1
PULL_DOWN = 2
_CONSUMER = "adafruit_blinka"
id = None
_value = LOW
_mode = IN
_value_map = (gpiod.line.Value.INACTIVE, gpiod.line.Value.ACTIVE)
def __init__(self, pin_id):
self.id = pin_id
chip_id = 0
if isinstance(pin_id, tuple):
chip_id, self._num = pin_id
if isinstance(chip_id, int):
chip_id = f"/dev/gpiochip{chip_id}"
self._chip = gpiod.Chip(chip_id)
self._line_request = None
def __del__(self):
if self._line_request:
self._line_request.release()
def __repr__(self):
return str(self.id)
def __eq__(self, other):
return self.id == other
def init(self, mode=IN, pull=None):
"""Initialize the Pin"""
# Input,
if not self._line_request:
self._line_request = self._chip.request_lines(
config={int(self._num): None},
consumer=self._CONSUMER,
)
# print("init line: ", self.id, self._line)
if mode is not None:
line_config = gpiod.LineSettings()
if mode == self.IN:
line_config.direction = gpiod.line.Direction.INPUT
if pull is not None:
if pull == self.PULL_UP:
line_config.bias = gpiod.line.Bias.PULL_UP
elif pull == self.PULL_DOWN:
line_config.bias = gpiod.line.Bias.PULL_DOWN
elif pull == self.PULL_NONE:
line_config.bias = gpiod.line.Bias.DISABLED
else:
raise RuntimeError(f"Invalid pull for pin: {self.id}")
self._mode = self.IN
self._line_request.reconfigure_lines(
{
int(self._num): line_config,
}
)
elif mode == self.OUT:
if pull is not None:
raise RuntimeError("Cannot set pull resistor on output")
self._mode = self.OUT
line_config.direction = gpiod.line.Direction.OUTPUT
self._line_request.reconfigure_lines(
{
int(self._num): line_config,
}
)
else:
raise RuntimeError("Invalid mode for pin: %s" % self.id)
def value(self, val=None):
"""Set or return the Pin Value"""
if val is None:
return bool(self._value_map.index(self._line_request.get_value(self._num)))
if val in (self.LOW, self.HIGH):
self._value = val
self._line_request.set_value(self._num, self._value_map[int(val)])
return None
raise RuntimeError("Invalid value for pin")

View file

@ -0,0 +1,22 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Chip class for use with libgpiod."""
try:
import gpiod
except ImportError:
raise ImportError(
"libgpiod Python bindings not found, please install and try again! See "
"https://github.com/adafruit/Raspberry-Pi-Installer-Scripts/blob/main/libgpiod.py"
) from ImportError
# Versions 1.5.4 and earlier have no __version__ attribute
if hasattr(gpiod, "__version__"):
version = gpiod.__version__
else:
version = "1.x"
if version.startswith("1."):
from .libgpiod.libgpiod_chip_1_x import Chip # pylint: disable=unused-import
else:
from .libgpiod.libgpiod_chip_2_x import Chip # pylint: disable=unused-import

View file

@ -0,0 +1,22 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Pin class for use with libgpiod."""
try:
import gpiod
except ImportError:
raise ImportError(
"libgpiod Python bindings not found, please install and try again! See "
"https://github.com/adafruit/Raspberry-Pi-Installer-Scripts/blob/main/libgpiod.py"
) from ImportError
# Versions 1.5.4 and earlier have no __version__ attribute
if hasattr(gpiod, "__version__"):
version = gpiod.__version__
else:
version = "1.x"
if version.startswith("1."):
from .libgpiod.libgpiod_pin_1_x import Pin # pylint: disable=unused-import
else:
from .libgpiod.libgpiod_pin_2_x import Pin # pylint: disable=unused-import

View file

@ -0,0 +1,86 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Pin class for use with periphery."""
try:
from periphery import GPIO
except ImportError:
raise ImportError(
"Periphery Python bindings not found, please install and try again! "
"Try running 'pip3 install python-periphery'"
) from ImportError
class Pin:
"""Pins dont exist in CPython so...lets make our own!"""
IN = "in"
OUT = "out"
LOW = 0
HIGH = 1
PULL_NONE = 0
PULL_UP = 1
PULL_DOWN = 2
id = None
_value = LOW
_mode = IN
def __init__(self, pin_id):
self.id = pin_id
if isinstance(pin_id, tuple):
self._num = int(pin_id[1])
self._chippath = "/dev/gpiochip{}".format(pin_id[0])
else:
self._num = int(pin_id)
self._chippath = "/dev/gpiochip0"
self._line = None
def __repr__(self):
return str(self.id)
def __eq__(self, other):
return self.id == other
def init(self, mode=IN, pull=None):
"""Initialize the Pin"""
if mode is not None:
if mode == self.IN:
self._mode = self.IN
if self._line is not None:
self._line.close()
self._line = GPIO(self._chippath, int(self._num), self.IN)
elif mode == self.OUT:
self._mode = self.OUT
if self._line is not None:
self._line.close()
self._line = GPIO(self._chippath, int(self._num), self.OUT)
else:
raise RuntimeError("Invalid mode for pin: %s" % self.id)
if pull is not None:
if pull == self.PULL_UP:
raise NotImplementedError(
"Internal pullups not supported in periphery, "
"use physical resistor instead!"
)
if pull == self.PULL_DOWN:
raise NotImplementedError(
"Internal pulldowns not supported in periphery, "
"use physical resistor instead!"
)
raise RuntimeError("Invalid pull for pin: %s" % self.id)
def value(self, val=None):
"""Set or return the Pin Value"""
if val is not None:
if val == self.LOW:
self._value = val
self._line.write(False)
return None
if val == self.HIGH:
self._value = val
self._line.write(True)
return None
raise RuntimeError("Invalid value for pin")
return self.HIGH if self._line.read() else self.LOW

View file

@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: 2025 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`rotaryio` - Support for reading rotation sensors
===========================================================
See `CircuitPython:rotaryio` in CircuitPython for more details.
Generic Threading/DigitalIO implementation for Linux
* Author(s): Melissa LeBlanc-Williams
"""
from __future__ import annotations
import threading
import microcontroller
import digitalio
# Define the state transition table for the quadrature encoder
transitions = [
0, # 00 -> 00 no movement
-1, # 00 -> 01 3/4 ccw (11 detent) or 1/4 ccw (00 at detent)
+1, # 00 -> 10 3/4 cw or 1/4 cw
0, # 00 -> 11 non-Gray-code transition
+1, # 01 -> 00 2/4 or 4/4 cw
0, # 01 -> 01 no movement
0, # 01 -> 10 non-Gray-code transition
-1, # 01 -> 11 4/4 or 2/4 ccw
-1, # 10 -> 00 2/4 or 4/4 ccw
0, # 10 -> 01 non-Gray-code transition
0, # 10 -> 10 no movement
+1, # 10 -> 11 4/4 or 2/4 cw
0, # 11 -> 00 non-Gray-code transition
+1, # 11 -> 01 1/4 or 3/4 cw
-1, # 11 -> 10 1/4 or 3/4 ccw
0, # 11 -> 11 no movement
]
class IncrementalEncoder:
"""
IncrementalEncoder determines the relative rotational position based on two series of
pulses. It assumes that the encoders common pin(s) are connected to ground,and enables
pull-ups on pin_a and pin_b.
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
"""
def __init__(
self, pin_a: microcontroller.Pin, pin_b: microcontroller.Pin, divisor: int = 4
):
"""
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
:param microcontroller.Pin pin_a: The first pin connected to the encoder.
:param microcontroller.Pin pin_b: The second pin connected to the encoder.
:param int divisor: The number of pulses per encoder step. Default is 4.
"""
self._pin_a = digitalio.DigitalInOut(pin_a)
self._pin_a.switch_to_input(pull=digitalio.Pull.UP)
self._pin_b = digitalio.DigitalInOut(pin_b)
self._pin_b.switch_to_input(pull=digitalio.Pull.UP)
self._position = 0
self._last_state = 0
self._divisor = divisor
self._sub_count = 0
self._poll_thread = threading.Thread(target=self._polling_loop, daemon=True)
self._poll_thread.start()
def deinit(self):
"""Deinitializes the IncrementalEncoder and releases any hardware resources for reuse."""
self._pin_a.deinit()
self._pin_b.deinit()
if self._poll_thread.is_alive():
self._poll_thread.join()
def __enter__(self) -> IncrementalEncoder:
"""No-op used by Context Managers."""
return self
def __exit__(self, _type, _value, _traceback):
"""
Automatically deinitializes when exiting a context. See
:ref:`lifetime-and-contextmanagers` for more info.
"""
self.deinit()
@property
def divisor(self) -> int:
"""The divisor of the quadrature signal. Use 1 for encoders without detents, or encoders
with 4 detents per cycle. Use 2 for encoders with 2 detents per cycle. Use 4 for encoders
with 1 detent per cycle."""
return self._divisor
@divisor.setter
def divisor(self, value: int):
self._divisor = value
@property
def position(self) -> int:
"""The current position in terms of pulses. The number of pulses per rotation is defined
by the specific hardware and by the divisor."""
return self._position
@position.setter
def position(self, value: int):
self._position = value
def _get_pin_state(self) -> int:
"""Returns the current state of the pins."""
return self._pin_a.value << 1 | self._pin_b.value
def _polling_loop(self):
while True:
self._poll_encoder()
def _poll_encoder(self):
# Check the state of the pins
# if either pin has changed, update the state
new_state = self._get_pin_state()
if new_state != self._last_state:
self._state_update(new_state)
self._last_state = new_state
def _state_update(self, new_state: int):
new_state &= 3
index = self._last_state << 2 | new_state
sub_increment = transitions[index]
self._sub_count += sub_increment
if self._sub_count >= self._divisor:
self._position += 1
self._sub_count = 0
elif self._sub_count <= -self._divisor:
self._position -= 1
self._sub_count = 0

View file

@ -0,0 +1,69 @@
# SPDX-FileCopyrightText: 2025 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A Pin class for use with Rpi.GPIO."""
from RPi import GPIO
GPIO.setmode(GPIO.BCM) # Use BCM pins D4 = GPIO #4
GPIO.setwarnings(False) # shh!
class Pin:
"""Pins dont exist in CPython so...lets make our own!"""
IN = 0
OUT = 1
LOW = 0
HIGH = 1
PULL_NONE = 0
PULL_UP = 1
PULL_DOWN = 2
id = None
_value = LOW
_mode = IN
def __init__(self, bcm_number):
self.id = bcm_number
def __repr__(self):
return str(self.id)
def __eq__(self, other):
return self.id == other
def init(self, mode=IN, pull=None):
"""Initialize the Pin"""
if mode is not None:
if mode == self.IN:
self._mode = self.IN
GPIO.setup(self.id, GPIO.IN)
elif mode == self.OUT:
self._mode = self.OUT
GPIO.setup(self.id, GPIO.OUT)
else:
raise RuntimeError("Invalid mode for pin: %s" % self.id)
if pull is not None:
if self._mode != self.IN:
raise RuntimeError("Cannot set pull resistor on output")
if pull == self.PULL_UP:
GPIO.setup(self.id, GPIO.IN, pull_up_down=GPIO.PUD_UP)
elif pull == self.PULL_DOWN:
GPIO.setup(self.id, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
else:
raise RuntimeError("Invalid pull for pin: %s" % self.id)
def value(self, val=None):
"""Set or return the Pin Value"""
if val is not None:
if val == self.LOW:
self._value = val
GPIO.output(self.id, val)
elif val == self.HIGH:
self._value = val
GPIO.output(self.id, val)
else:
raise RuntimeError("Invalid value for pin")
return None
return GPIO.input(self.id)

View file

@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""Custom PWMOut Wrapper for Rpi.GPIO PWM Class"""
from RPi import GPIO
GPIO.setmode(GPIO.BCM) # Use BCM pins D4 = GPIO #4
GPIO.setwarnings(False) # shh!
# pylint: disable=unnecessary-pass
class PWMError(IOError):
"""Base class for PWM errors."""
pass
# pylint: enable=unnecessary-pass
class PWMOut:
"""Pulse Width Modulation Output Class"""
def __init__(self, pin, *, frequency=500, duty_cycle=0, variable_frequency=False):
self._pwmpin = None
self._period = 0
self._open(pin, duty_cycle, frequency, variable_frequency)
def __del__(self):
self.deinit()
def __enter__(self):
return self
def __exit__(self, t, value, traceback):
self.deinit()
def _open(self, pin, duty=0, freq=500, variable_frequency=False):
self._pin = pin
GPIO.setup(pin.id, GPIO.OUT)
self._pwmpin = GPIO.PWM(pin.id, freq)
if variable_frequency:
print("Variable Frequency is not supported, continuing without it...")
# set frequency
self.frequency = freq
# set duty
self.duty_cycle = duty
self.enabled = True
def deinit(self):
"""Deinit the PWM."""
if self._pwmpin is not None:
self._pwmpin.stop()
GPIO.cleanup(self._pin.id)
self._pwmpin = None
def _is_deinited(self):
if self._pwmpin is None:
raise ValueError(
"Object has been deinitialize and can no longer "
"be used. Create a new object."
)
@property
def period(self):
"""Get or set the PWM's output period in seconds.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
:type: int, float
"""
return 1.0 / self.frequency
@period.setter
def period(self, period):
if not isinstance(period, (int, float)):
raise TypeError("Invalid period type, should be int or float.")
self.frequency = 1.0 / period
@property
def duty_cycle(self):
"""Get or set the PWM's output duty cycle which is the fraction of
each pulse which is high. 16-bit
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
ValueError: if value is out of bounds of 0.0 to 1.0.
:type: int, float
"""
return int(self._duty_cycle * 65535)
@duty_cycle.setter
def duty_cycle(self, duty_cycle):
if not isinstance(duty_cycle, (int, float)):
raise TypeError("Invalid duty cycle type, should be int or float.")
if not 0 <= duty_cycle <= 65535:
raise ValueError("Invalid duty cycle value, should be between 0 and 65535")
# convert from 16-bit
duty_cycle /= 65535.0
self._duty_cycle = duty_cycle
self._pwmpin.ChangeDutyCycle(round(self._duty_cycle * 100))
@property
def frequency(self):
"""Get or set the PWM's output frequency in Hertz.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
:type: int, float
"""
return self._frequency
@frequency.setter
def frequency(self, frequency):
if not isinstance(frequency, (int, float)):
raise TypeError("Invalid frequency type, should be int or float.")
self._pwmpin.ChangeFrequency(round(frequency))
self._frequency = frequency
@property
def enabled(self):
"""Get or set the PWM's output enabled state.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not bool.
:type: bool
"""
return self._enabled
@enabled.setter
def enabled(self, value):
if not isinstance(value, bool):
raise TypeError("Invalid enabled type, should be string.")
if value:
self._pwmpin.start(round(self._duty_cycle * 100))
else:
self._pwmpin.stop()
self._enabled = value
# String representation
def __str__(self):
return "pin %s (freq=%f Hz, duty_cycle=%f%%)" % (
self._pin,
self.frequency,
self.duty_cycle,
)

View file

@ -0,0 +1,143 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""Generic Linux SPI class using PureIO's SPI class"""
from Adafruit_PureIO import spi
from adafruit_blinka.agnostic import detector
class SPI:
"""SPI Class"""
MSB = 0
LSB = 1
CPHA = 1
CPOL = 2
baudrate = 100000
mode = 0
bits = 8
def __init__(self, portid):
if isinstance(portid, tuple):
self._spi = spi.SPI(device=portid)
else:
self._spi = spi.SPI(device=(portid, 0))
self.clock_pin = None
self.mosi_pin = None
self.miso_pin = None
self.chip = None
# pylint: disable=too-many-arguments,unused-argument
def init(
self,
baudrate=100000,
polarity=0,
phase=0,
bits=8,
firstbit=MSB,
sck=None,
mosi=None,
miso=None,
):
"""Initialize SPI"""
mode = 0
if polarity:
mode |= self.CPOL
if phase:
mode |= self.CPHA
self.baudrate = baudrate
self.mode = mode
self.bits = bits
self.chip = detector.chip
# Pins are not used
self.clock_pin = sck
self.mosi_pin = mosi
self.miso_pin = miso
# pylint: enable=too-many-arguments,unused-argument
# pylint: disable=unnecessary-pass
def set_no_cs(self):
"""Setting so that SPI doesn't automatically set the CS pin"""
# No kernel seems to support this, so we're just going to pass
pass
# pylint: enable=unnecessary-pass
@property
def frequency(self):
"""Return the current baudrate"""
return self.baudrate
def write(self, buf, start=0, end=None):
"""Write data from the buffer to SPI"""
if buf is None or len(buf) < 1:
return
if end is None:
end = len(buf)
try:
# self._spi.open(self._port, 0)
self.set_no_cs()
self._spi.max_speed_hz = self.baudrate
self._spi.mode = self.mode
self._spi.bits_per_word = self.bits
self._spi.writebytes(buf[start:end])
# self._spi.close()
except FileNotFoundError:
print("Could not open SPI device - check if SPI is enabled in kernel!")
raise
def readinto(self, buf, start=0, end=None, write_value=0):
"""Read data from SPI and into the buffer"""
if buf is None or len(buf) < 1:
return
if end is None:
end = len(buf)
try:
# self._spi.open(self._port, 0)
# self.set_no_cs()
self._spi.max_speed_hz = self.baudrate
self._spi.mode = self.mode
self._spi.bits_per_word = self.bits
data = self._spi.transfer([write_value] * (end - start))
for i in range(end - start): # 'readinto' the given buffer
buf[start + i] = data[i]
# self._spi.close()
except FileNotFoundError:
print("Could not open SPI device - check if SPI is enabled in kernel!")
raise
# pylint: disable=too-many-arguments
def write_readinto(
self, buffer_out, buffer_in, out_start=0, out_end=None, in_start=0, in_end=None
):
"""Perform a half-duplex write from buffer_out and then
read data into buffer_in
"""
if buffer_out is None or buffer_in is None:
return
if len(buffer_out) < 1 or len(buffer_in) < 1:
return
if out_end is None:
out_end = len(buffer_out)
if in_end is None:
in_end = len(buffer_in)
if out_end - out_start != in_end - in_start:
raise RuntimeError("Buffer slices must be of equal length.")
try:
# self._spi.open(self._port, 0)
# self.set_no_cs()
self._spi.max_speed_hz = self.baudrate
self._spi.mode = self.mode
self._spi.bits_per_word = self.bits
data = self._spi.transfer(list(buffer_out[out_start : out_end + 1]))
for i in range((in_end - in_start)):
buffer_in[i + in_start] = data[i]
# self._spi.close()
except FileNotFoundError:
print("Could not open SPI device - check if SPI is enabled in kernel!")
raise
# pylint: enable=too-many-arguments

View file

@ -0,0 +1,98 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`analogio` - Analog input control
=================================================
Read the SysFS ADC using IIO (Industrial Input/Output) and return the value
See `CircuitPython:analogio` in CircuitPython for more details.
* Author(s): Melissa LeBlanc-Williams
"""
import os
from adafruit_blinka import ContextManaged
try:
from microcontroller.pin import analogIns
except ImportError:
raise RuntimeError("No Analog Inputs defined for this board") from ImportError
class AnalogIn(ContextManaged):
"""Analog Input Class"""
# Sysfs paths
_sysfs_path = "/sys/bus/iio/devices/"
_device_path = "iio:device{}"
# Channel paths
_channel_path = "in_voltage{}_raw"
_scale_path = "in_voltage_scale"
def __init__(self, adc_id):
"""Instantiate an AnalogIn object and verify the sysfs IIO
corresponding to the specified channel and pin.
Args:
adc_id (int): Analog Input ID as defined in microcontroller.pin
Returns:
AnalogIn: AnalogIn object.
Raises:
TypeError: if `channel` or `pin` types are invalid.
ValueError: if AnalogIn channel does not exist.
"""
self.id = adc_id
self._device = None
self._channel = None
self._open(adc_id)
def __enter__(self):
return self
def _open(self, adc_id):
self._device = None
for adcpair in analogIns:
if adcpair[0] == adc_id:
self._device = adcpair[1]
self._channel = adcpair[2]
if self._device is None:
raise RuntimeError("No AnalogIn device found for the given ID")
device_path = os.path.join(
self._sysfs_path, self._device_path.format(self._device)
)
if not os.path.isdir(device_path):
raise ValueError(
"AnalogIn device does not exist, check that the required modules are loaded."
)
@property
def value(self):
"""Read the ADC and return the value as an integer"""
path = os.path.join(
self._sysfs_path,
self._device_path.format(self._device),
self._channel_path.format(self._channel),
)
with open(path, "r", encoding="utf-8") as analog_in:
return int(analog_in.read().strip())
# pylint: disable=no-self-use
@value.setter
def value(self, value):
# emulate what CircuitPython does
raise AttributeError("'AnalogIn' object has no attribute 'value'")
# pylint: enable=no-self-use
def deinit(self):
self._device = None
self._channel = None

View file

@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`analogio` - Analog output control
=================================================
Write the SysFS DAC using IIO (Industrial Input/Output)
See `CircuitPython:analogio` in CircuitPython for more details.
* Author(s): Melissa LeBlanc-Williams
"""
import os
from adafruit_blinka import ContextManaged
try:
from microcontroller.pin import analogOuts
except ImportError:
raise RuntimeError("No Analog Outputs defined for this board") from ImportError
class AnalogOut(ContextManaged):
"""Analog Output Class"""
# Sysfs paths
_sysfs_path = "/sys/bus/iio/devices/"
_device_path = "iio:device{}"
# Channel paths
_channel_path = "out_voltage{}_raw"
_scale_path = "out_voltage_scale"
def __init__(self, dac_id):
"""Instantiate an AnalogOut object and verify the sysfs IIO
corresponding to the specified channel and pin.
Args:
dac_id (int): Analog Output ID as defined in microcontroller.pin
Returns:
AnalogOut: AnalogOut object.
Raises:
TypeError: if `channel` or `pin` types are invalid.
ValueError: if AnalogOut channel does not exist.
"""
self.id = dac_id
self._device = None
self._channel = None
self._open(dac_id)
def __enter__(self):
return self
def _open(self, dac_id):
self._device = None
for dacpair in analogOuts:
if dacpair[0] == dac_id:
self._device = dacpair[1]
self._channel = dacpair[2]
if self._device is None:
raise RuntimeError("No AnalogOut device found for the given ID")
device_path = os.path.join(
self._sysfs_path, self._device_path.format(self._device)
)
if not os.path.isdir(device_path):
raise ValueError(
"AnalogOut device does not exist, check that the required modules are loaded."
)
@property
def value(self):
"""Return an error. This is output only."""
# emulate what CircuitPython does
raise AttributeError("unreadable attribute")
@value.setter
def value(self, value):
"""Write to the DAC"""
path = os.path.join(
self._sysfs_path,
self._device_path.format(self._device),
self._channel_path.format(self._channel),
)
with open(path, "w", encoding="utf-8") as analog_out:
return analog_out.write(value + "\n")
def deinit(self):
self._device = None
self._channel = None

View file

@ -0,0 +1,347 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Much code from https://github.com/vsergeev/python-periphery/blob/master/periphery/gpio.py
Copyright (c) 2015-2019 vsergeev / Ivan (Vanya) A. Sergeev
License: MIT
"""
import os
import errno
import time
# pylint: disable=unnecessary-pass
class GPIOError(IOError):
"""Base class for GPIO errors."""
pass
# pylint: enable=unnecessary-pass
class Pin:
"""SysFS GPIO Pin Class"""
# Number of retries to check for GPIO export or direction write on open
GPIO_OPEN_RETRIES = 10
# Delay between check for GPIO export or direction write on open (100ms)
GPIO_OPEN_DELAY = 0.1
IN = "in"
OUT = "out"
LOW = 0
HIGH = 1
PULL_NONE = 0
PULL_UP = 1
PULL_DOWN = 2
id = None
_value = LOW
_mode = IN
# Sysfs paths
_sysfs_path = "/sys/class/gpio/"
_channel_path = "gpiochip{}"
# Channel paths
_export_path = "export"
_unexport_path = "unexport"
_pin_path = "gpio{}"
def __init__(self, pin_id):
"""Instantiate a Pin object and open the sysfs GPIO with the specified
pin number.
Args:
pin_id (int): GPIO pin number.
Returns:
SysfsGPIO: GPIO object.
Raises:
GPIOError: if an I/O or OS error occurs.
TypeError: if `line` or `direction` types are invalid.
ValueError: if `direction` value is invalid.
TimeoutError: if waiting for GPIO export times out.
"""
if not isinstance(pin_id, int):
raise TypeError("Invalid Pin ID, should be integer.")
self.id = pin_id
self._fd = None
self._line = None
self._path = None
def __del__(self):
self._close()
def __enter__(self):
return self
def __exit__(self, t, value, traceback):
self._close()
def init(self, mode=IN, pull=None):
"""Initialize the Pin"""
if mode is not None:
if mode == self.IN:
self._mode = self.IN
self._open(self.IN)
elif mode == self.OUT:
self._mode = self.OUT
self._open(self.OUT)
else:
raise RuntimeError("Invalid mode for pin: %s" % self.id)
if pull is not None:
if pull == self.PULL_UP:
raise NotImplementedError(
"Internal pullups not supported in periphery, "
"use physical resistor instead!"
)
if pull == self.PULL_DOWN:
raise NotImplementedError(
"Internal pulldowns not supported in periphery, "
"use physical resistor instead!"
)
raise RuntimeError("Invalid pull for pin: %s" % self.id)
def value(self, val=None):
"""Set or return the Pin Value"""
if val is not None:
if val == self.LOW:
self._value = val
self._write(False)
return None
if val == self.HIGH:
self._value = val
self._write(True)
return None
raise RuntimeError("Invalid value for pin")
return self.HIGH if self._read() else self.LOW
# pylint: disable=too-many-branches
def _open(self, direction):
if not isinstance(direction, str):
raise TypeError("Invalid direction type, should be string.")
if direction.lower() not in ["in", "out", "high", "low"]:
raise ValueError('Invalid direction, can be: "in", "out", "high", "low".')
gpio_path = "/sys/class/gpio/gpio{:d}".format(self.id)
if not os.path.isdir(gpio_path):
# Export the line
try:
with open("/sys/class/gpio/export", "w", encoding="utf-8") as f_export:
f_export.write("{:d}\n".format(self.id))
except IOError as e:
raise GPIOError(e.errno, "Exporting GPIO: " + e.strerror) from IOError
# Loop until GPIO is exported
exported = False
for i in range(self.GPIO_OPEN_RETRIES):
if os.path.isdir(gpio_path):
exported = True
break
time.sleep(self.GPIO_OPEN_DELAY)
if not exported:
raise TimeoutError(
'Exporting GPIO: waiting for "{:s}" timed out'.format(gpio_path)
)
# Write direction, looping in case of EACCES errors due to delayed udev
# permission rule application after export
for i in range(self.GPIO_OPEN_RETRIES):
try:
with open(
os.path.join(gpio_path, "direction"), "w", encoding="utf-8"
) as f_direction:
f_direction.write(direction.lower() + "\n")
break
except IOError as e:
if e.errno != errno.EACCES or (
e.errno == errno.EACCES and i == self.GPIO_OPEN_RETRIES - 1
):
raise GPIOError(
e.errno, "Setting GPIO direction: " + e.strerror
) from IOError
time.sleep(self.GPIO_OPEN_DELAY)
else:
# Write direction
try:
with open(
os.path.join(gpio_path, "direction"), "w", encoding="utf-8"
) as f_direction:
f_direction.write(direction.lower() + "\n")
except IOError as e:
raise GPIOError(
e.errno, "Setting GPIO direction: " + e.strerror
) from IOError
# Open value
try:
self._fd = os.open(os.path.join(gpio_path, "value"), os.O_RDWR)
except OSError as e:
raise GPIOError(e.errno, "Opening GPIO: " + e.strerror) from OSError
self._path = gpio_path
# pylint: enable=too-many-branches
def _close(self):
if self._fd is None:
return
try:
os.close(self._fd)
except OSError as e:
raise GPIOError(e.errno, "Closing GPIO: " + e.strerror) from OSError
self._fd = None
# Unexport the line
try:
unexport_fd = os.open("/sys/class/gpio/unexport", os.O_WRONLY)
os.write(unexport_fd, "{:d}\n".format(self.id).encode())
os.close(unexport_fd)
except OSError as e:
raise GPIOError(e.errno, "Unexporting GPIO: " + e.strerror) from OSError
def _read(self):
# Read value
try:
buf = os.read(self._fd, 2)
except OSError as e:
raise GPIOError(e.errno, "Reading GPIO: " + e.strerror) from OSError
# Rewind
try:
os.lseek(self._fd, 0, os.SEEK_SET)
except OSError as e:
raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror) from OSError
if buf[0] == b"0"[0]:
return False
if buf[0] == b"1"[0]:
return True
raise GPIOError(None, "Unknown GPIO value: {}".format(buf))
def _write(self, value):
if not isinstance(value, bool):
raise TypeError("Invalid value type, should be bool.")
# Write value
try:
if value:
os.write(self._fd, b"1\n")
else:
os.write(self._fd, b"0\n")
except OSError as e:
raise GPIOError(e.errno, "Writing GPIO: " + e.strerror) from OSError
# Rewind
try:
os.lseek(self._fd, 0, os.SEEK_SET)
except OSError as e:
raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror) from OSError
@property
def chip_name(self):
"""Return the Chip Name"""
gpio_path = os.path.join(self._path, "device")
gpiochip_path = os.readlink(gpio_path)
if "/" not in gpiochip_path:
raise GPIOError(
None,
'Reading gpiochip name: invalid device symlink "{:s}"'.format(
gpiochip_path
),
)
return gpiochip_path.split("/")[-1]
@property
def chip_label(self):
"""Return the Chip Label"""
gpio_path = "/sys/class/gpio/{:s}/label".format(self.chip_name)
try:
with open(gpio_path, "r", encoding="utf-8") as f_label:
label = f_label.read()
except (GPIOError, IOError) as e:
if isinstance(e, IOError):
raise GPIOError(
e.errno, "Reading gpiochip label: " + e.strerror
) from IOError
raise GPIOError(
None, "Reading gpiochip label: " + e.strerror
) from GPIOError
return label.strip()
# Mutable properties
def _get_direction(self):
# Read direction
try:
with open(
os.path.join(self._path, "direction"), "r", encoding="utf-8"
) as f_direction:
direction = f_direction.read()
except IOError as e:
raise GPIOError(
e.errno, "Getting GPIO direction: " + e.strerror
) from IOError
return direction.strip()
def _set_direction(self, direction):
if not isinstance(direction, str):
raise TypeError("Invalid direction type, should be string.")
if direction.lower() not in ["in", "out", "high", "low"]:
raise ValueError('Invalid direction, can be: "in", "out", "high", "low".')
# Write direction
try:
with open(
os.path.join(self._path, "direction"), "w", encoding="utf-8"
) as f_direction:
f_direction.write(direction.lower() + "\n")
except IOError as e:
raise GPIOError(
e.errno, "Setting GPIO direction: " + e.strerror
) from IOError
direction = property(_get_direction, _set_direction)
def __str__(self):
try:
str_direction = self.direction
except GPIOError:
str_direction = "<error>"
try:
str_chip_name = self.chip_name
except GPIOError:
str_chip_name = "<error>"
try:
str_chip_label = self.chip_label
except GPIOError:
str_chip_label = "<error>"
output = "Pin {:d} (dev={:s}, fd={:d}, dir={:s}, chip_name='{:s}', chip_label='{:s}')"
return output.format(
self.id, self._path, self._fd, str_direction, str_chip_name, str_chip_label
)

View file

@ -0,0 +1,355 @@
# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Much code from https://github.com/vsergeev/python-periphery/blob/master/periphery/pwm.py
Copyright (c) 2015-2016 vsergeev / Ivan (Vanya) A. Sergeev
License: MIT
"""
import os
from time import sleep
from errno import EACCES
try:
from microcontroller.pin import pwmOuts
except ImportError:
raise RuntimeError("No PWM outputs defined for this board") from ImportError
# pylint: disable=unnecessary-pass
class PWMError(IOError):
"""Base class for PWM errors."""
pass
# pylint: enable=unnecessary-pass
class PWMOut:
"""Pulse Width Modulation Output Class"""
# Number of retries to check for successful PWM export on open
PWM_STAT_RETRIES = 10
# Delay between check for scucessful PWM export on open (100ms)
PWM_STAT_DELAY = 0.1
# Sysfs paths
_sysfs_path = "/sys/class/pwm/"
_channel_path = "pwmchip{}"
# Channel paths
_export_path = "export"
_unexport_path = "unexport"
_pin_path = "pwm{}"
# Pin attribute paths
_pin_period_path = "period"
_pin_duty_cycle_path = "duty_cycle"
_pin_polarity_path = "polarity"
_pin_enable_path = "enable"
def __init__(self, pin, *, frequency=500, duty_cycle=0, variable_frequency=False):
"""Instantiate a PWM object and open the sysfs PWM corresponding to the
specified channel and pin.
Args:
pin (Pin): CircuitPython Pin object to output to
duty_cycle (int) : The fraction of each pulse which is high. 16-bit
frequency (int) : target frequency in Hertz (32-bit)
variable_frequency (bool) : True if the frequency will change over time
Returns:
PWMOut: PWMOut object.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if `channel` or `pin` types are invalid.
ValueError: if PWM channel does not exist.
"""
self._pwmpin = None
self._channel = None
self._period = 0
self._open(pin, duty_cycle, frequency, variable_frequency)
def __del__(self):
self.deinit()
def __enter__(self):
return self
def __exit__(self, t, value, traceback):
self.deinit()
def _open(self, pin, duty=0, freq=500, variable_frequency=False):
self._channel = None
for pwmpair in pwmOuts:
if pwmpair[1] == pin:
self._channel = pwmpair[0][0]
self._pwmpin = pwmpair[0][1]
self._pin = pin
if self._channel is None:
raise RuntimeError("No PWM channel found for this Pin")
if variable_frequency:
print("Variable Frequency is not supported, continuing without it...")
channel_path = os.path.join(
self._sysfs_path, self._channel_path.format(self._channel)
)
if not os.path.isdir(channel_path):
raise ValueError(
"PWM channel does not exist, check that the required modules are loaded."
)
try:
with open(
os.path.join(channel_path, self._unexport_path), "w", encoding="utf-8"
) as f_unexport:
f_unexport.write("%d\n" % self._pwmpin)
except IOError:
pass # not unusual, it doesnt already exist
try:
with open(
os.path.join(channel_path, self._export_path), "w", encoding="utf-8"
) as f_export:
f_export.write("%d\n" % self._pwmpin)
except IOError as e:
raise PWMError(e.errno, "Exporting PWM pin: " + e.strerror) from IOError
# Loop until 'period' is writable, because application of udev rules
# after the above pin export is asynchronous.
# Without this loop, the following properties may not be writable yet.
for i in range(PWMOut.PWM_STAT_RETRIES):
try:
with open(
os.path.join(
channel_path, self._pin_path.format(self._pwmpin), "period"
),
"w",
encoding="utf-8",
):
break
except IOError as e:
if e.errno != EACCES or (
e.errno == EACCES and i == PWMOut.PWM_STAT_RETRIES - 1
):
raise PWMError(e.errno, "Opening PWM period: " + e.strerror) from e
sleep(PWMOut.PWM_STAT_DELAY)
# self._set_enabled(False) # This line causes a write error when trying to enable
# Look up the period, for fast duty cycle updates
self._period = self._get_period()
# self.duty_cycle = 0 # This line causes a write error when trying to enable
# set frequency
self.frequency = freq
# set duty
self.duty_cycle = duty
self._set_enabled(True)
def deinit(self):
"""Deinit the sysfs PWM."""
if self._channel is not None:
try:
channel_path = os.path.join(
self._sysfs_path, self._channel_path.format(self._channel)
)
with open(
os.path.join(channel_path, self._unexport_path),
"w",
encoding="utf-8",
) as f_unexport:
f_unexport.write("%d\n" % self._pwmpin)
except IOError as e:
raise PWMError(
e.errno, "Unexporting PWM pin: " + e.strerror
) from IOError
self._channel = None
self._pwmpin = None
def _is_deinited(self):
if self._pwmpin is None:
raise ValueError(
"Object has been deinitialize and can no longer "
"be used. Create a new object."
)
def _write_pin_attr(self, attr, value):
# Make sure the pin is active
self._is_deinited()
path = os.path.join(
self._sysfs_path,
self._channel_path.format(self._channel),
self._pin_path.format(self._pwmpin),
attr,
)
with open(path, "w", encoding="utf-8") as f_attr:
# print(value, path)
f_attr.write(value + "\n")
def _read_pin_attr(self, attr):
# Make sure the pin is active
self._is_deinited()
path = os.path.join(
self._sysfs_path,
self._channel_path.format(self._channel),
self._pin_path.format(self._pwmpin),
attr,
)
with open(path, "r", encoding="utf-8") as f_attr:
return f_attr.read().strip()
# Mutable properties
def _get_period(self):
period_ns = self._read_pin_attr(self._pin_period_path)
try:
period_ns = int(period_ns)
except ValueError:
raise PWMError(
None, 'Unknown period value: "%s"' % period_ns
) from ValueError
# Convert period from nanoseconds to seconds
period = period_ns / 1e9
# Update our cached period
self._period = period
return period
def _set_period(self, period):
if not isinstance(period, (int, float)):
raise TypeError("Invalid period type, should be int or float.")
# Convert period from seconds to integer nanoseconds
period_ns = int(period * 1e9)
self._write_pin_attr(self._pin_period_path, "{}".format(period_ns))
# Update our cached period
self._period = float(period)
period = property(_get_period, _set_period)
"""Get or set the PWM's output period in seconds.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
:type: int, float
"""
def _get_duty_cycle(self):
duty_cycle_ns = self._read_pin_attr(self._pin_duty_cycle_path)
try:
duty_cycle_ns = int(duty_cycle_ns)
except ValueError:
raise PWMError(
None, 'Unknown duty cycle value: "%s"' % duty_cycle_ns
) from ValueError
# Convert duty cycle from nanoseconds to seconds
duty_cycle = duty_cycle_ns / 1e9
# Convert duty cycle to ratio from 0.0 to 1.0
duty_cycle = duty_cycle / self._period
# convert to 16-bit
duty_cycle = int(duty_cycle * 65535)
return duty_cycle
def _set_duty_cycle(self, duty_cycle):
if not isinstance(duty_cycle, (int, float)):
raise TypeError("Invalid duty cycle type, should be int or float.")
# convert from 16-bit
duty_cycle /= 65535.0
if not 0.0 <= duty_cycle <= 1.0:
raise ValueError("Invalid duty cycle value, should be between 0.0 and 1.0.")
# Convert duty cycle from ratio to seconds
duty_cycle = duty_cycle * self._period
# Convert duty cycle from seconds to integer nanoseconds
duty_cycle_ns = int(duty_cycle * 1e9)
self._write_pin_attr(self._pin_duty_cycle_path, "{}".format(duty_cycle_ns))
duty_cycle = property(_get_duty_cycle, _set_duty_cycle)
"""Get or set the PWM's output duty cycle as a ratio from 0.0 to 1.0.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
ValueError: if value is out of bounds of 0.0 to 1.0.
:type: int, float
"""
def _get_frequency(self):
return 1.0 / self._get_period()
def _set_frequency(self, frequency):
if not isinstance(frequency, (int, float)):
raise TypeError("Invalid frequency type, should be int or float.")
self._set_period(1.0 / frequency)
frequency = property(_get_frequency, _set_frequency)
"""Get or set the PWM's output frequency in Hertz.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not int or float.
:type: int, float
"""
def _get_enabled(self):
enabled = self._read_pin_attr(self._pin_enable_path)
if enabled == "1":
return True
if enabled == "0":
return False
raise PWMError(None, 'Unknown enabled value: "%s"' % enabled)
def _set_enabled(self, value):
"""Get or set the PWM's output enabled state.
Raises:
PWMError: if an I/O or OS error occurs.
TypeError: if value type is not bool.
:type: bool
"""
if not isinstance(value, bool):
raise TypeError("Invalid enabled type, should be string.")
self._write_pin_attr(self._pin_enable_path, "1" if value else "0")
# String representation
def __str__(self):
return "PWM%d, pin %s (freq=%f Hz, duty_cycle=%f%%)" % (
self._channel,
self._pin,
self.frequency,
self.duty_cycle * 100,
)