# -*- coding: utf-8 -*-
# :Project: LasarApp -- serial communication management
# :Created: Sun Mar 18 18:04:23 2012 +0100
# :Author: Stefan Hechenberger <stefan@nortd.com>
# :License: GNU General Public License version 3 or later
# :Copyright: © 2012-2016 Stefan Hechenberger <stefan@nortd.com> and others,
# see AUTHORS.txt
#
"""
Serial Manager
--------------
This module manages the communication between the `frontend` and the `ATmega`
that *speaks* only serial.
An instance of the `SerialManager` is istanciated by the main module logic.
Its primary functions are those of sending `GCODE` commands to te
microcontroller and read back status information about the just executed
operation and the various subsystems and sensors.
The main stuff is in `SerialManager.queue_gcode` which is the entry point
from the frontend logic that enqueues new commands to be sent.
The other interesting code is in `SerialManager.send_queue_as_ready` method
which polls the serial line for statuses and send new commands if there are any
waiting in the send queue.
The protocol employed uses two special control characters (`ASCII`'s ``DC2``
and ``DC4``, used as ``READY_CHAR`` and ``REQUEST_READY_CHAR`` in the code) to
control the flow on the wire. ``REQUEST_READY_CHAR`` is sent by the tx part of
`SerialManager.send_queue_as_ready` and then the next chunk of data (as much
as `SerialManager.TX_CHUNK_SIZE`) is sent as soon as a ``READY_CHAR`` is read
from the rx part of the same method. It's a bit like half an `XON/XOFF` flow
control with the logic reversed.
THE `GCODE` commands are sent as-is or an error detection byte or a so-called
error correction line is added to the bytes to be sent.
"""
import collections
import logging
import os
import time
import serial
from serial.tools import list_ports
log = logging.getLogger(__name__)
FEC_TYPES = collections.namedtuple(
'FecTypes',
['NONE', 'ERROR_DETECTION', 'ERROR_CORRECTION']
)(0, 1, 2)
"""Types of Forward Error Correction.
NONE
No error correction is applied or detected.
ERROR_DETECTION
A checksum is added to the sent lines so that the other endpoint can detect
RX errors.
ERROR_CORRECTION
Every sent line is doubled enabling some kind of error correction.
"""
SERIAL_MANAGER = None
[docs]class SerialManager:
"""Manages the serial communication with the `ATmega`"""
TX_CHUNK_SIZE = 16
"""This is the number of bytes to be written to the device in one
go. It needs to match the `firmware`.
"""
RX_CHUNK_SIZE = 16
"""Number of bytes read from the device in one go.
"""
LASAURGRBL_FIRST_STRING = b"LasaurGrbl"
"""String to find at connection initialization time, sent by the `ATmega`.
"""
READY_CHAR = b'\x12'
"""Character sent by the `ATmega` in response to the reception of a
`REQUEST_READY_CHAR`, it's like a pong in a ping-pong protocol.
"""
REQUEST_READY_CHAR = b'\x14'
"""Character sent by this code to the `ATmega` to be sure that the other other
side is still functional, it like a ping in a ping-pong protocol.
"""
def __init__(self):
self.device = None
self.rx_buffer = bytearray()
self.tx_buffer = bytearray()
self.tx_index = 0
self.nRequested = 0
# used for calculating percentage done
self.job_active = False
# status flags
self.status = {}
"""Dictionary member containing status information decoded by parsing the line
sent by the `ATmega`, see `process_status_line`.
"""
self.reset_status()
self.fec_redundancy = FEC_TYPES.ERROR_CORRECTION
"""Forward Error Detection.
By default enable error correction.
See `FEC_TYPES`
"""
self.last_request_ready = 0
def reset_status(self):
self.status = {
'ready': True, # turns True by querying status
'paused': False, # this is also a control flag
'buffer_overflow': False,
'transmission_error': False,
'bad_number_format_error': False,
'expected_command_letter_error': False,
'unsupported_statement_error': False,
'power_off': False,
'limit_hit': False,
'serial_stop_request': False,
'door_open': False,
'chiller_off': False,
'x': False,
'y': False,
'firmware_version': None
}
def list_devices(self, baudrate):
ports = []
if os.name == 'posix':
iterator = sorted(list_ports.grep('tty'))
log.debug("Found ports:")
for port, desc, hwid in iterator:
ports.append(port)
log.debug("%-20s", port)
log.debug(" desc: %s", desc)
log.debug(" hwid: %s", hwid)
else:
# scan for available ports. return a list of tuples (num, name)
available = []
for i in range(24):
try:
s = serial.Serial(port=i, baudrate=baudrate)
ports.append(s.portstr)
available.append( (i, s.portstr))
s.close()
except serial.SerialException:
pass
log.debug("Found ports:")
for n,s in available: log.debug("(%d) %s", n, s)
return ports
def match_device(self, search_regex, baudrate):
if os.name == 'posix':
matched_ports = list_ports.grep(search_regex)
if matched_ports:
for match_tuple in matched_ports:
if match_tuple:
return match_tuple[0]
log.debug("No serial port match for anything like: " + search_regex)
return None
else:
# windows hack because pyserial does not enumerate USB-style com ports
log.info("Trying to find Controller ...")
for i in range(24):
try:
s = serial.Serial(port=i, baudrate=baudrate, timeout=2.0)
lasaur_hello = s.read(32)
if lasaur_hello.find(self.LASAURGRBL_FIRST_STRING) > -1:
return s.portstr
s.close()
except serial.SerialException:
pass
return None
def connect(self, port, baudrate):
self.rx_buffer = bytearray()
self.tx_buffer = bytearray()
self.tx_index = 0
self.reset_status()
# Create serial device with both read timeout set to 0.
# This results in the read() being non-blocking
# Write on the other hand uses a large timeout but should not be blocking
# much because we ask it only to write TX_CHUNK_SIZE at a time.
# BUG WARNING: the pyserial write function does not report how
# many bytes were actually written if this is different from requested.
# Work around: use a big enough timeout and a small enough chunk size.
self.device = serial.Serial(port, baudrate, timeout=0, write_timeout=1)
log.debug('Connect: (%s) %r', bool(self.device), self.device)
def close(self):
if self.device:
try:
self.device.flushOutput()
self.device.flushInput()
self.device.close()
self.device = None
except:
self.device = None
self.status['ready'] = False
return True
else:
return False
def is_connected(self):
return bool(self.device)
def get_hardware_status(self):
if self.is_queue_empty():
# trigger a status report
# will update for the next status request
self.queue_gcode(b'?')
return self.status
def flush_input(self):
if self.device:
self.device.flushInput()
def flush_output(self):
if self.device:
self.device.flushOutput()
[docs] def queue_gcode(self, gcode):
"""Processes a group of `GCODE` instructions, add redundancy for error
detection and correction and queue them."""
if isinstance(gcode, str):
gcode = gcode.encode('ascii')
lines = gcode.split(b'\n')
log.debug("Adding to queue %s lines" % len(lines))
job_list = []
for line in lines:
line = line.strip()
if line == b'' or line[0] == b'%':
continue
if line[0] == b'!':
self.cancel_queue()
self.reset_status()
job_list.append(b'!')
else:
if line != b'?': # not ready unless just a ?-query
self.status['ready'] = False
if self.fec_redundancy > FEC_TYPES.NONE: # using error correction
# prepend marker and checksum
checksum = 0
for c in line:
if c > ord(b' ') and c != ord(b'~') and c != ord(b'!'): # ignore 32 and lower, ~, !
checksum += c
if checksum >= 128:
checksum -= 128
checksum = (checksum >> 1) + 128
line_redundant = bytearray()
if self.fec_redundancy == FEC_TYPES.ERROR_CORRECTION:
line_redundant += b'^' + bytes([checksum]) + line + b'\n'
line = line_redundant + b'*' + bytes([checksum]) + line
job_list.append(line)
gcode_processed = b'\n'.join(job_list) + b'\n'
self.tx_buffer += gcode_processed
self.job_active = True
[docs] def cancel_queue(self):
"""Removes all the instructions from the queue"""
self.tx_buffer = bytearray()
self.tx_index = 0
self.job_active = False
def is_queue_empty(self):
return self.tx_index >= len(self.tx_buffer)
def get_queue_percentage_done(self):
buflen = len(self.tx_buffer)
if buflen == 0:
return ""
return str(100 * self.tx_index / float(buflen))
def set_pause(self, flag):
# returns pause status
if self.is_queue_empty():
return False
else:
if flag: # pause
self.status['paused'] = True
return True
else: # unpause
self.status['paused'] = False
return False
[docs] def send_queue_as_ready(self):
"""This is the communication workhorse, it reads and sends return-terminated
lines via the serial interface. It gets polled from the main app code.
"""
if self.device and not self.status['paused']:
try:
### receiving
chars = self.device.read(self.RX_CHUNK_SIZE)
if len(chars) > 0:
## check for data request
if self.READY_CHAR in chars:
# print "=========================== READY"
self.nRequested = self.TX_CHUNK_SIZE
# remove control chars
chars = chars.replace(self.READY_CHAR, b'')
## assemble lines
self.rx_buffer += chars
while True: # process all lines in buffer
posNewline = self.rx_buffer.find(b'\n')
if posNewline == -1:
break # no more complete lines
else: # we got a line
line = self.rx_buffer[:posNewline]
self.rx_buffer = self.rx_buffer[posNewline + 1:]
log.debug("RX < DATA: %s" % line.decode('ascii'))
self.process_status_line(line)
else:
if self.nRequested == 0:
time.sleep(0.001) # no rx/tx, rest a bit
### sending
if self.tx_index < len(self.tx_buffer):
if self.nRequested > 0:
try:
t_prewrite = time.time()
actuallySent = self.device.write(
self.tx_buffer[self.tx_index:self.tx_index +
self.nRequested])
log.debug("TX > DATA: %s",
self.tx_buffer[self.tx_index:self.tx_index +
actuallySent].decode(
'ascii',
errors='backslashreplace')
)
if time.time()-t_prewrite > 0.02:
log.warn("TX > DATA: Delay ")
except serial.SerialTimeoutException:
# skip, report
actuallySent = 0 # assume nothing has been sent
log.exception("TX > DATA: Timeout!")
self.tx_index += actuallySent
self.nRequested -= actuallySent
if self.nRequested <= 0:
self.last_request_ready = 0 # make sure to request ready
elif self.tx_buffer[self.tx_index] in [b'!', b'~']: # send
# control chars no matter what
try:
t_prewrite = time.time()
actuallySent = self.device.write(self.tx_buffer[self.tx_index])
if time.time() - t_prewrite > 0.02:
log.warn("TX > CONTROL_CHAR: Delay ")
except serial.SerialTimeoutException:
actuallySent = 0 # assume nothing has been sent
log.exception("TX > CONTROL_CHAR: Timeout!")
self.tx_index += actuallySent
else:
if (time.time() - self.last_request_ready) > 2.0:
# ask to send a ready byte
# only ask for this when sending is on hold
# only ask once (and after a big time out)
# print "=========================== REQUEST READY"
try:
t_prewrite = time.time()
actuallySent = self.device.write(self.REQUEST_READY_CHAR)
if time.time() - t_prewrite > 0.02:
log.warn("TX > REQUEST_READY: Delay ")
except serial.SerialTimeoutException:
# skip, report
actuallySent = self.nRequested # pyserial
# does not report this sufficiently
log.exception("TX > REQUEST_READY: Timeout!")
if actuallySent == 1:
self.last_request_ready = time.time()
else:
if self.job_active:
# print "\nG-code stream finished!"
# print "(LasaurGrbl may take some extra time to finalize)"
self.tx_buffer = bytearray()
self.tx_index = 0
self.job_active = False
# ready whenever a job is done, including a status
# request via '?'
self.status['ready'] = True
except OSError:
# Serial port appears closed => reset
log.exception('Error in sqar()')
self.close()
except ValueError:
# Serial port appears closed => reset
log.exception('Error in sqar()')
self.close()
else:
# serial disconnected
self.status['ready'] = False
[docs] def process_status_line(self, line):
"""Process a line read from the serial interface and transform single byte
status reports into flags inside the status mapping.
"""
if b'#' in line[:3]:
# print and ignore
log.debug('Status: ignored %s', line.decode('ascii'))
elif b'^' in line:
log.debug("Status: FEC Correction")
else:
if b'!' in line:
# in stop mode
self.cancel_queue()
# not ready whenever in stop mode
self.status['ready'] = False
log.info('Status: stop')
else:
log.info("Status: run")
if b'N' in line:
self.status['bad_number_format_error'] = True
if b'E' in line:
self.status['expected_command_letter_error'] = True
if b'U' in line:
self.status['unsupported_statement_error'] = True
if b'B' in line: # Stop: Buffer Overflow
self.status['buffer_overflow'] = True
else:
self.status['buffer_overflow'] = False
if b'T' in line: # Stop: Transmission Error
self.status['transmission_error'] = True
else:
self.status['transmission_error'] = False
if b'P' in line: # Stop: Power is off
self.status['power_off'] = True
else:
self.status['power_off'] = False
if b'L' in line: # Stop: A limit was hit
self.status['limit_hit'] = True
else:
self.status['limit_hit'] = False
if b'R' in line: # Stop: by serial requested
self.status['serial_stop_request'] = True
else:
self.status['serial_stop_request'] = False
if b'D' in line: # Warning: Door Open
self.status['door_open'] = True
else:
self.status['door_open'] = False
if b'C' in line: # Warning: Chiller Off
self.status['chiller_off'] = True
else:
self.status['chiller_off'] = False
if b'X' in line:
self.status['x'] = line[line.find(b'X') + 1:line.find(b'Y')].decode('utf-8')
# else:
# self.status['x'] = False
if b'Y' in line:
self.status['y'] = line[line.find(b'Y') + 1:line.find(b'V')].decode('utf-8')
# else:
# self.status['y'] = False
if b'V' in line:
self.status['firmware_version'] = line[line.find(b'V') + 1:].decode('utf-8')
def get_serial_manager():
global SERIAL_MANAGER
if not SERIAL_MANAGER:
SERIAL_MANAGER = SerialManager()
return SERIAL_MANAGER