abschlussarbeit/bin/pyterm.py

367 lines
14 KiB
Python
Executable file

#!/home/pi/git/abschlussarbeit/bin/python3
"""Simple Python serial terminal
"""
# Copyright (c) 2010-2024, Emmanuel Blot <emmanuel.blot@free.fr>
# Copyright (c) 2016, Emmanuel Bouaziz <ebouaziz@free.fr>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# pylint: disable=broad-except
# pylint: disable=wrong-import-position
from argparse import ArgumentParser, FileType
from atexit import register
from collections import deque
from logging import Formatter, StreamHandler, DEBUG, ERROR
from os import environ, linesep, stat
from re import search
from sys import exit as sys_exit, modules, platform, stderr, stdout
from time import sleep
from threading import Event, Thread
from traceback import format_exc
from _thread import interrupt_main
# pylint: disable=import-error
# pylint: disable=import-outside-toplevel
from pyftdi import FtdiLogger
from pyftdi.ftdi import Ftdi
from pyftdi.misc import to_bps, add_custom_devices
from pyftdi.term import Terminal
class MiniTerm:
"""A mini serial terminal to demonstrate pyserial extensions"""
DEFAULT_BAUDRATE = 115200
def __init__(self, device, baudrate=None, parity=None, rtscts=False,
debug=False):
self._terminal = Terminal()
self._device = device
self._baudrate = baudrate or self.DEFAULT_BAUDRATE
self._port = self._open_port(self._device, self._baudrate, parity,
rtscts, debug)
self._resume = False
self._silent = False
self._rxq = deque()
self._rxe = Event()
self._debug = debug
register(self._cleanup)
def run(self, fullmode=False, loopback=False, silent=False,
localecho=False, autocr=False):
"""Switch to a pure serial terminal application"""
self._terminal.init(fullmode)
print(f'Entering minicom mode @ { self._port.baudrate} bps')
stdout.flush()
self._resume = True
# start the reader (target to host direction) within a dedicated thread
args = [loopback]
if self._device.startswith('ftdi://'):
# with pyftdi/pyusb/libusb stack, there is no kernel buffering
# which means that a UART source with data burst may overflow the
# FTDI HW buffer while the SW stack is dealing with formatting
# and console output. Use an intermediate thread to pop out data
# out from the HW as soon as it is made available, and use a deque
# to serve the actual reader thread
args.append(self._get_from_source)
sourcer = Thread(target=self._sourcer, daemon=True)
sourcer.start()
else:
# regular kernel buffered device
args.append(self._get_from_port)
reader = Thread(target=self._reader, args=tuple(args), daemon=True)
reader.start()
# start the writer (host to target direction)
self._writer(fullmode, silent, localecho, autocr)
def _sourcer(self):
try:
while self._resume:
data = self._port.read(4096)
if not data:
continue
self._rxq.append(data)
self._rxe.set()
except Exception as ex:
self._resume = False
print(str(ex), file=stderr)
interrupt_main()
def _get_from_source(self):
while not self._rxq and self._resume:
if self._rxe.wait(0.1):
self._rxe.clear()
break
if not self._rxq:
return bytearray()
return self._rxq.popleft()
def _get_from_port(self):
try:
return self._port.read(4096)
except OSError as ex:
self._resume = False
print(str(ex), file=stderr)
interrupt_main()
return bytearray()
except Exception as ex:
print(str(ex), file=stderr)
return bytearray()
def _reader(self, loopback, getfunc):
"""Loop forever, processing received serial data in terminal mode"""
try:
# Try to read as many bytes as possible at once, and use a short
# timeout to avoid blocking for more data
self._port.timeout = 0.050
while self._resume:
if self._silent:
sleep(0.25)
continue
data = getfunc()
if data:
stdout.write(data.decode('utf8', errors='replace'))
stdout.flush()
if loopback:
self._port.write(data)
except KeyboardInterrupt:
return
except Exception as exc:
print(f'Exception: {exc}')
if self._debug:
print(format_exc(chain=False), file=stderr)
interrupt_main()
def _writer(self, fullmode, silent, localecho, crlf=0):
"""Loop and copy console->serial until EOF character is found"""
while self._resume:
try:
char = self._terminal.getkey()
if fullmode and ord(char) == 0x2: # Ctrl+B
self._cleanup(True)
return
if self._terminal.IS_MSWIN:
if ord(char) in (0, 224):
char = self._terminal.getkey()
self._port.write(self._terminal.getch_to_escape(char))
continue
if ord(char) == 0x3: # Ctrl+C
raise KeyboardInterrupt('Ctrl-C break')
if silent:
if ord(char) == 0x6: # Ctrl+F
self._silent = True
print('Silent\n')
continue
if ord(char) == 0x7: # Ctrl+G
self._silent = False
print('Reg\n')
continue
if localecho:
stdout.write(char.decode('utf8', errors='replace'))
stdout.flush()
if crlf:
if char == b'\n':
self._port.write(b'\r')
if crlf > 1:
continue
self._port.write(char)
except KeyError:
continue
except KeyboardInterrupt:
if fullmode:
if self._terminal.IS_MSWIN:
self._port.write(b'\x03')
continue
self._cleanup(True)
def _cleanup(self, *args):
"""Cleanup resource before exiting"""
if args and args[0]:
print(f'{linesep}Aborting...')
try:
self._resume = False
if self._port:
# wait till the other thread completes
sleep(0.5)
try:
rem = self._port.inWaiting()
except IOError:
# maybe a bug in underlying wrapper...
rem = 0
# consumes all the received bytes
for _ in range(rem):
self._port.read()
self._port.close()
self._port = None
print('Bye.')
except Exception as ex:
print(str(ex), file=stderr)
finally:
if self._terminal:
self._terminal.reset()
self._terminal = None
@staticmethod
def _open_port(device, baudrate, parity, rtscts, debug=False):
"""Open the serial communication port"""
try:
from serial.serialutil import SerialException
from serial import PARITY_NONE
except ImportError as exc:
raise ImportError("Python serial module not installed") from exc
try:
from serial import serial_for_url, VERSION as serialver
# use a simple regex rather than adding a new dependency on the
# more complete 'packaging' module
vmo = search(r'^(\d+)\.(\d+)', serialver)
if not vmo:
# unable to parse version
raise ValueError()
if tuple(int(x) for x in vmo.groups()) < (3, 0):
# pysrial version is too old
raise ValueError()
except (ValueError, IndexError, ImportError) as exc:
raise ImportError("pyserial 3.0+ is required") from exc
# the following import enables serial protocol extensions
if device.startswith('ftdi:'):
try:
from pyftdi import serialext
serialext.touch()
except ImportError as exc:
raise ImportError("PyFTDI module not installed") from exc
try:
port = serial_for_url(device,
baudrate=baudrate,
parity=parity or PARITY_NONE,
rtscts=rtscts,
timeout=0)
if not port.is_open:
port.open()
if not port.is_open:
raise IOError(f"Cannot open port '{device}'")
if debug:
backend = port.BACKEND if hasattr(port, 'BACKEND') else '?'
print(f"Using serial backend '{backend}'")
return port
except SerialException as exc:
raise IOError(str(exc)) from exc
def get_default_device() -> str:
"""Return the default comm device, depending on the host/OS."""
envdev = environ.get('FTDI_DEVICE', '')
if envdev:
return envdev
if platform == 'win32':
device = 'COM1'
elif platform == 'darwin':
device = '/dev/cu.usbserial'
elif platform == 'linux':
device = '/dev/ttyS0'
else:
device = ''
try:
stat(device)
except OSError:
device = 'ftdi:///1'
return device
def main():
"""Main routine"""
debug = False
try:
default_device = get_default_device()
argparser = ArgumentParser(description=modules[__name__].__doc__)
argparser.add_argument('-f', '--fullmode', dest='fullmode',
action='store_true',
help='use full terminal mode, exit with '
'[Ctrl]+B')
argparser.add_argument('device', nargs='?', default=default_device,
help=f'serial port device name '
f'(default: {default_device}')
argparser.add_argument('-b', '--baudrate',
default=str(MiniTerm.DEFAULT_BAUDRATE),
help=f'serial port baudrate '
f'(default: {MiniTerm.DEFAULT_BAUDRATE})')
argparser.add_argument('-w', '--hwflow',
action='store_true',
help='hardware flow control')
argparser.add_argument('-e', '--localecho',
action='store_true',
help='local echo mode (print all typed chars)')
argparser.add_argument('-r', '--crlf',
action='count', default=0,
help='prefix LF with CR char, use twice to '
'replace all LF with CR chars')
argparser.add_argument('-l', '--loopback',
action='store_true',
help='loopback mode (send back all received '
'chars)')
argparser.add_argument('-s', '--silent', action='store_true',
help='silent mode')
argparser.add_argument('-P', '--vidpid', action='append',
help='specify a custom VID:PID device ID, '
'may be repeated')
argparser.add_argument('-V', '--virtual', type=FileType('r'),
help='use a virtual device, specified as YaML')
argparser.add_argument('-v', '--verbose', action='count',
help='increase verbosity')
argparser.add_argument('-d', '--debug', action='store_true',
help='enable debug mode')
args = argparser.parse_args()
debug = args.debug
if not args.device:
argparser.error('Serial device not specified')
loglevel = max(DEBUG, ERROR - (10 * (args.verbose or 0)))
loglevel = min(ERROR, loglevel)
if debug:
formatter = Formatter('%(asctime)s.%(msecs)03d %(name)-20s '
'%(message)s', '%H:%M:%S')
else:
formatter = Formatter('%(message)s')
FtdiLogger.set_formatter(formatter)
FtdiLogger.set_level(loglevel)
FtdiLogger.log.addHandler(StreamHandler(stderr))
if args.virtual:
from pyftdi.usbtools import UsbTools
# Force PyUSB to use PyFtdi test framework for USB backends
UsbTools.BACKENDS = ('pyftdi.tests.backend.usbvirt', )
# Ensure the virtual backend can be found and is loaded
backend = UsbTools.find_backend()
loader = backend.create_loader()()
loader.load(args.virtual)
try:
add_custom_devices(Ftdi, args.vidpid, force_hex=True)
except ValueError as exc:
argparser.error(str(exc))
miniterm = MiniTerm(device=args.device,
baudrate=to_bps(args.baudrate),
parity='N',
rtscts=args.hwflow,
debug=args.debug)
miniterm.run(args.fullmode, args.loopback, args.silent, args.localecho,
args.crlf)
except (IOError, ValueError) as exc:
print(f'\nError: {exc}', file=stderr)
if debug:
print(format_exc(chain=False), file=stderr)
sys_exit(1)
except KeyboardInterrupt:
sys_exit(2)
if __name__ == '__main__':
main()