#!/home/pi/git/abschlussarbeit/bin/python3 """Simple Python serial terminal """ # Copyright (c) 2010-2024, Emmanuel Blot # Copyright (c) 2016, Emmanuel Bouaziz # All rights reserved. # # SPDX-License-Identifier: BSD-3-Clause # pylint: disable=broad-except # pylint: disable=wrong-import-position from argparse import ArgumentParser, FileType from atexit import register from collections import deque from logging import Formatter, StreamHandler, DEBUG, ERROR from os import environ, linesep, stat from re import search from sys import exit as sys_exit, modules, platform, stderr, stdout from time import sleep from threading import Event, Thread from traceback import format_exc from _thread import interrupt_main # pylint: disable=import-error # pylint: disable=import-outside-toplevel from pyftdi import FtdiLogger from pyftdi.ftdi import Ftdi from pyftdi.misc import to_bps, add_custom_devices from pyftdi.term import Terminal class MiniTerm: """A mini serial terminal to demonstrate pyserial extensions""" DEFAULT_BAUDRATE = 115200 def __init__(self, device, baudrate=None, parity=None, rtscts=False, debug=False): self._terminal = Terminal() self._device = device self._baudrate = baudrate or self.DEFAULT_BAUDRATE self._port = self._open_port(self._device, self._baudrate, parity, rtscts, debug) self._resume = False self._silent = False self._rxq = deque() self._rxe = Event() self._debug = debug register(self._cleanup) def run(self, fullmode=False, loopback=False, silent=False, localecho=False, autocr=False): """Switch to a pure serial terminal application""" self._terminal.init(fullmode) print(f'Entering minicom mode @ { self._port.baudrate} bps') stdout.flush() self._resume = True # start the reader (target to host direction) within a dedicated thread args = [loopback] if self._device.startswith('ftdi://'): # with pyftdi/pyusb/libusb stack, there is no kernel buffering # which means that a UART source with data burst may overflow the # FTDI HW buffer while the SW stack is dealing with formatting # and console output. Use an intermediate thread to pop out data # out from the HW as soon as it is made available, and use a deque # to serve the actual reader thread args.append(self._get_from_source) sourcer = Thread(target=self._sourcer, daemon=True) sourcer.start() else: # regular kernel buffered device args.append(self._get_from_port) reader = Thread(target=self._reader, args=tuple(args), daemon=True) reader.start() # start the writer (host to target direction) self._writer(fullmode, silent, localecho, autocr) def _sourcer(self): try: while self._resume: data = self._port.read(4096) if not data: continue self._rxq.append(data) self._rxe.set() except Exception as ex: self._resume = False print(str(ex), file=stderr) interrupt_main() def _get_from_source(self): while not self._rxq and self._resume: if self._rxe.wait(0.1): self._rxe.clear() break if not self._rxq: return bytearray() return self._rxq.popleft() def _get_from_port(self): try: return self._port.read(4096) except OSError as ex: self._resume = False print(str(ex), file=stderr) interrupt_main() return bytearray() except Exception as ex: print(str(ex), file=stderr) return bytearray() def _reader(self, loopback, getfunc): """Loop forever, processing received serial data in terminal mode""" try: # Try to read as many bytes as possible at once, and use a short # timeout to avoid blocking for more data self._port.timeout = 0.050 while self._resume: if self._silent: sleep(0.25) continue data = getfunc() if data: stdout.write(data.decode('utf8', errors='replace')) stdout.flush() if loopback: self._port.write(data) except KeyboardInterrupt: return except Exception as exc: print(f'Exception: {exc}') if self._debug: print(format_exc(chain=False), file=stderr) interrupt_main() def _writer(self, fullmode, silent, localecho, crlf=0): """Loop and copy console->serial until EOF character is found""" while self._resume: try: char = self._terminal.getkey() if fullmode and ord(char) == 0x2: # Ctrl+B self._cleanup(True) return if self._terminal.IS_MSWIN: if ord(char) in (0, 224): char = self._terminal.getkey() self._port.write(self._terminal.getch_to_escape(char)) continue if ord(char) == 0x3: # Ctrl+C raise KeyboardInterrupt('Ctrl-C break') if silent: if ord(char) == 0x6: # Ctrl+F self._silent = True print('Silent\n') continue if ord(char) == 0x7: # Ctrl+G self._silent = False print('Reg\n') continue if localecho: stdout.write(char.decode('utf8', errors='replace')) stdout.flush() if crlf: if char == b'\n': self._port.write(b'\r') if crlf > 1: continue self._port.write(char) except KeyError: continue except KeyboardInterrupt: if fullmode: if self._terminal.IS_MSWIN: self._port.write(b'\x03') continue self._cleanup(True) def _cleanup(self, *args): """Cleanup resource before exiting""" if args and args[0]: print(f'{linesep}Aborting...') try: self._resume = False if self._port: # wait till the other thread completes sleep(0.5) try: rem = self._port.inWaiting() except IOError: # maybe a bug in underlying wrapper... rem = 0 # consumes all the received bytes for _ in range(rem): self._port.read() self._port.close() self._port = None print('Bye.') except Exception as ex: print(str(ex), file=stderr) finally: if self._terminal: self._terminal.reset() self._terminal = None @staticmethod def _open_port(device, baudrate, parity, rtscts, debug=False): """Open the serial communication port""" try: from serial.serialutil import SerialException from serial import PARITY_NONE except ImportError as exc: raise ImportError("Python serial module not installed") from exc try: from serial import serial_for_url, VERSION as serialver # use a simple regex rather than adding a new dependency on the # more complete 'packaging' module vmo = search(r'^(\d+)\.(\d+)', serialver) if not vmo: # unable to parse version raise ValueError() if tuple(int(x) for x in vmo.groups()) < (3, 0): # pysrial version is too old raise ValueError() except (ValueError, IndexError, ImportError) as exc: raise ImportError("pyserial 3.0+ is required") from exc # the following import enables serial protocol extensions if device.startswith('ftdi:'): try: from pyftdi import serialext serialext.touch() except ImportError as exc: raise ImportError("PyFTDI module not installed") from exc try: port = serial_for_url(device, baudrate=baudrate, parity=parity or PARITY_NONE, rtscts=rtscts, timeout=0) if not port.is_open: port.open() if not port.is_open: raise IOError(f"Cannot open port '{device}'") if debug: backend = port.BACKEND if hasattr(port, 'BACKEND') else '?' print(f"Using serial backend '{backend}'") return port except SerialException as exc: raise IOError(str(exc)) from exc def get_default_device() -> str: """Return the default comm device, depending on the host/OS.""" envdev = environ.get('FTDI_DEVICE', '') if envdev: return envdev if platform == 'win32': device = 'COM1' elif platform == 'darwin': device = '/dev/cu.usbserial' elif platform == 'linux': device = '/dev/ttyS0' else: device = '' try: stat(device) except OSError: device = 'ftdi:///1' return device def main(): """Main routine""" debug = False try: default_device = get_default_device() argparser = ArgumentParser(description=modules[__name__].__doc__) argparser.add_argument('-f', '--fullmode', dest='fullmode', action='store_true', help='use full terminal mode, exit with ' '[Ctrl]+B') argparser.add_argument('device', nargs='?', default=default_device, help=f'serial port device name ' f'(default: {default_device}') argparser.add_argument('-b', '--baudrate', default=str(MiniTerm.DEFAULT_BAUDRATE), help=f'serial port baudrate ' f'(default: {MiniTerm.DEFAULT_BAUDRATE})') argparser.add_argument('-w', '--hwflow', action='store_true', help='hardware flow control') argparser.add_argument('-e', '--localecho', action='store_true', help='local echo mode (print all typed chars)') argparser.add_argument('-r', '--crlf', action='count', default=0, help='prefix LF with CR char, use twice to ' 'replace all LF with CR chars') argparser.add_argument('-l', '--loopback', action='store_true', help='loopback mode (send back all received ' 'chars)') argparser.add_argument('-s', '--silent', action='store_true', help='silent mode') argparser.add_argument('-P', '--vidpid', action='append', help='specify a custom VID:PID device ID, ' 'may be repeated') argparser.add_argument('-V', '--virtual', type=FileType('r'), help='use a virtual device, specified as YaML') argparser.add_argument('-v', '--verbose', action='count', help='increase verbosity') argparser.add_argument('-d', '--debug', action='store_true', help='enable debug mode') args = argparser.parse_args() debug = args.debug if not args.device: argparser.error('Serial device not specified') loglevel = max(DEBUG, ERROR - (10 * (args.verbose or 0))) loglevel = min(ERROR, loglevel) if debug: formatter = Formatter('%(asctime)s.%(msecs)03d %(name)-20s ' '%(message)s', '%H:%M:%S') else: formatter = Formatter('%(message)s') FtdiLogger.set_formatter(formatter) FtdiLogger.set_level(loglevel) FtdiLogger.log.addHandler(StreamHandler(stderr)) if args.virtual: from pyftdi.usbtools import UsbTools # Force PyUSB to use PyFtdi test framework for USB backends UsbTools.BACKENDS = ('pyftdi.tests.backend.usbvirt', ) # Ensure the virtual backend can be found and is loaded backend = UsbTools.find_backend() loader = backend.create_loader()() loader.load(args.virtual) try: add_custom_devices(Ftdi, args.vidpid, force_hex=True) except ValueError as exc: argparser.error(str(exc)) miniterm = MiniTerm(device=args.device, baudrate=to_bps(args.baudrate), parity='N', rtscts=args.hwflow, debug=args.debug) miniterm.run(args.fullmode, args.loopback, args.silent, args.localecho, args.crlf) except (IOError, ValueError) as exc: print(f'\nError: {exc}', file=stderr) if debug: print(format_exc(chain=False), file=stderr) sys_exit(1) except KeyboardInterrupt: sys_exit(2) if __name__ == '__main__': main()