# Source code for bricknil.messages
# Copyright 2019 Virantha N. Ekanayake
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Message parsers for each message type
"""
import struct, logging
from .const import DEVICES
logger = logging.getLogger(__name__)
class UnknownMessageError(Exception):
    """Raised by a parser when a message carries a field value it does not recognise."""


class Message:
    """Base class for each message parser.

    This class keeps track of each subclass and stores one instance of each
    subclass in the attribute `parsers`, keyed by that subclass's `msg_type`.
    None of these subclass instances should ever store any instance data,
    since they are shared across multiple hubs.

    Attributes:
        parsers (dict) : msg_type (int) -> Message parser
        msg_type (int) : msg_type of each subclassed message
    """

    parsers = {}

    def __init_subclass__(cls, **kwargs):
        """Register message subclasses.

        Runs once when a subclass is defined: an instance of the new subclass
        is created and stored in `Message.parsers` under `cls.msg_type`.
        A second subclass reusing an already registered `msg_type` trips the
        assertion below.
        """
        super().__init_subclass__(**kwargs)
        logger.debug(f"registering {cls}")
        assert cls.msg_type not in Message.parsers, f'Duplicate Message parser type {cls.msg_type} found in code!'
        Message.parsers[cls.msg_type] = cls()

    def _parse_msg_bytes(self, msg_bytes):
        """Return the bytes rendered as colon-separated hex values (e.g. ``0x1:0xff``)."""
        hex_bytes = ':'.join(hex(c) for c in msg_bytes)
        return hex_bytes

    def parse(self, msg_bytes, l, dispatcher):
        """Implement this to handle parsing of each message body type.

        The base implementation does nothing and returns None.

        Args:
            msg_bytes (bytearray): Message body
            l (list): text description of what's being parsed for logging (just append details as you go along)
            dispatcher (:class:`bricknil.message_dispatch.MessageDispatch`): The dispatch object that is sending messages.
                Call back into its methods to send messages back to the hub.
        """
        pass
class PortValueMessage(Message):
    """Single value update from a sensor."""

    msg_type = 0x45

    def parse(self, msg_bytes, l, dispatcher):
        """Hand a new value over to the peripheral on the reporting port.

        Args:
            msg_bytes (bytearray): Message body; the first byte is removed and
                used as the port, the remainder is passed on as the value.
            l (list): log description; one summary entry is appended.
            dispatcher: object whose `message_update_value_to_peripheral`
                method receives the port and the remaining bytes.
        """
        source_port = msg_bytes.pop(0)
        dispatcher.message_update_value_to_peripheral(source_port, msg_bytes)
        # The summary is built after the dispatcher call, exactly as before.
        summary = f'Port {source_port} changed value to {msg_bytes}'
        l.append(summary)
class PortComboValueMessage(Message):
    """Multiple (combination) value updates from different modes of a sensor."""

    msg_type = 0x46

    def parse(self, msg_bytes, l, dispatcher):
        """Hand a combined-mode value over to the peripheral on the reporting port.

        Args:
            msg_bytes (bytearray): Message body; the first byte is removed and
                used as the port, the remainder is passed on as the value.
            l (list): log description; one summary entry is appended.
            dispatcher: object whose `message_update_value_to_peripheral`
                method receives the port and the remaining bytes.
        """
        source_port = msg_bytes.pop(0)
        dispatcher.message_update_value_to_peripheral(source_port, msg_bytes)
        # The summary is built after the dispatcher call, exactly as before.
        summary = f'Port {source_port} changed combo value to {msg_bytes}'
        l.append(summary)
class HubPropertiesMessage(Message):
    """Used to get data on the hub as well as button press information on the hub."""

    msg_type = 0x01

    # Property id -> label used in the log description.
    prop_names = {
        0x01: 'Advertising Name',
        0x02: 'Button',
        0x03: 'FW Version',
        0x04: 'HW Version',
        0x05: 'RSSI',
        0x06: 'Battery Voltage',
        0x07: 'Battery Type',
        0x08: 'Manufacturer Name',
        0x09: 'Radio FW Version',
        0x0A: 'LEGO Wireles Protocol Version',
        0x0B: 'System Type ID',
        0x0C: 'HW Network ID',
        0x0D: 'Primary MAC address',
        0x0E: 'Seconary MAC address',
        0x0F: 'HW Network Family',
    }

    # Operation id -> label used in the log description.
    operation_names = {
        0x01: 'Set (downstream)',
        0x02: 'Enable Updates (Downstream)',
        0x03: 'Disable Updates (Downstream)',
        0x04: 'Reset (Downstream)',
        0x05: 'Request Update (Downstream)',
        0x06: 'Update (Upstream)',
    }

    def parse(self, msg_bytes, l, dispatcher):
        """Describe a hub property message and forward button updates.

        The first two bytes are removed and looked up as property and
        operation; the remaining bytes are logged in hex.  A button update
        (property 0x02 with operation 0x06) is then re-dispatched to the
        registered `PortValueMessage` parser with a dummy port of 255
        prepended to the payload.

        Args:
            msg_bytes (bytearray): Message body, consumed from the front.
            l (list): log description, appended to as parsing proceeds.
            dispatcher: passed through to the port value parser for button updates.

        Raises:
            UnknownMessageError: if the property or the operation id is not in
                the lookup tables above.
        """
        l.append('Hub property: ')

        prop = msg_bytes.pop(0)
        prop_label = self.prop_names.get(prop)
        if prop_label is None:
            raise UnknownMessageError
        l.append(prop_label)

        op = msg_bytes.pop(0)
        op_label = self.operation_names.get(op)
        if op_label is None:
            raise UnknownMessageError
        l.append(op_label)

        # Whatever is left is the property payload; log it as hex.
        l.append(self._parse_msg_bytes(msg_bytes))

        # Forward any button presses as if they were a "port value" change.
        is_button_update = (prop, op) == (0x02, 0x06)
        if is_button_update:
            msg_bytes.insert(0, 0xFF)  # Dummy port value of 255
            value_parser = Message.parsers[PortValueMessage.msg_type]
            value_parser.parse(msg_bytes, l, dispatcher)
class PortInformationMessage(Message):
    """Information on what modes are supported on a port and whether a port
    is input/output.
    """

    msg_type = 0x43

    def _pop_word(self, msg_bytes):
        """Remove two bytes from the front and combine them, low byte first."""
        low = msg_bytes.pop(0)
        high = msg_bytes.pop(0)
        return low + high * 256

    def _parse_mode_info(self, msg_bytes, l, port_info):
        """Decode the capabilities byte into per-attribute entries of `port_info`.

        Each entry holds the masked bit value (zero when the capability is
        absent); the first three letters of every set capability are logged.
        """
        l.append(' INFO:')
        capabilities = msg_bytes.pop(0)
        for bit, attr in enumerate(('output', 'input', 'combinable', 'synchronizable')):
            flag = capabilities & (1 << bit)
            port_info[attr] = flag
            if flag:
                l.append(attr[:3])

    def _parse_mode_info_input_output(self, msg_bytes, l, modes_info):
        """Mark which of the 16 possible modes are input and/or output modes.

        Two 16-bit masks are read (input first, then output); for every set
        bit the mode number is logged and the matching key is set to True in
        that mode's dict inside `modes_info`.
        """
        input_modes = self._pop_word(msg_bytes)
        output_modes = self._pop_word(msg_bytes)
        for direction, mask in (('input', input_modes), ('output', output_modes)):
            if direction == 'output':
                l.append(', output: ')
            for mode in range(16):
                if (mask >> mode) & 1:
                    l.append(mode)
                    modes_info.setdefault(mode, {})[direction] = True

    def _parse_combination_info(self, msg_bytes, l, port_info):
        """Collect the allowed mode combinations into `port_info['mode_combinations']`.

        Each combination is a 16-bit mask; reading stops at a zero mask or
        when no bytes are left.
        """
        combos = port_info['mode_combinations'] = []
        mode_combo = self._pop_word(msg_bytes)
        l.append('Combinations:')
        while mode_combo:
            cmodes = [bit for bit in range(16) if (mode_combo >> bit) & 1]
            l.append('+'.join(f'Mode {m}' for m in cmodes))
            combos.append(cmodes)
            # An exhausted buffer ends the list just like an explicit zero mask.
            mode_combo = self._pop_word(msg_bytes) if len(msg_bytes) != 0 else 0
            l.append(', ')

    def parse(self, msg_bytes, l, dispatcher):
        """Parse a port information reply and notify the peripheral.

        Args:
            msg_bytes (bytearray): Message body; port and information type are
                removed from the front before the type-specific payload.
            l (list): log description, appended to as parsing proceeds.
            dispatcher: provides the `port_info` dict that is filled in and the
                `message_port_info_to_peripheral` callback.

        Raises:
            UnknownMessageError: if the information type is neither 0x01 (mode
                info) nor 0x02 (combination info).
        """
        port = msg_bytes.pop(0)
        mode = msg_bytes.pop(0)
        l.append(f'Port {port} Mode {mode}:')

        # The per-port bookkeeping is created before the type is validated.
        port_info = dispatcher.port_info.setdefault(port, {})
        modes_info = port_info.setdefault('modes', {})

        if mode not in (0x01, 0x02):
            raise UnknownMessageError

        if mode == 0x02:  # Combination info
            self._parse_combination_info(msg_bytes, l, port_info)
            dispatcher.message_port_info_to_peripheral(port, 'port_combination_info_received')
            return

        # MODE INFO
        self._parse_mode_info(msg_bytes, l, port_info)
        n_modes = msg_bytes.pop(0)
        l.append(f'nModes:{n_modes}, input:')
        self._parse_mode_info_input_output(msg_bytes, l, modes_info)
        dispatcher.message_port_info_to_peripheral(port, 'port_info_received')
class PortOutputFeedbackMessage(Message):
    """Ack messages/error messages sent in response to a command being issued to the hub."""

    msg_type = 0x82

    def parse(self, msg_bytes, l, dispatcher):
        """Translate the feedback bit-field of a port into log text.

        Args:
            msg_bytes (bytearray): Message body; port and feedback byte are
                removed from the front.
            l (list): log description; one entry per set feedback bit is appended.
            dispatcher: not used by this parser.
        """
        port = msg_bytes.pop(0)
        l.append(f'Command feedback: Port {port}')
        feedback = msg_bytes.pop(0)

        # (bit mask, text) pairs, listed in the order they are reported.
        notes = (
            (1, 'Buffer empty, Command in progess'),
            (2, 'Buffer empty, Command completed'),
            (8, ': Idle '),
            (4, ': Command discarded'),
            (16, ': Busy/Full'),
        )
        for mask, text in notes:
            if feedback & mask:
                l.append(text)
class PortModeInformationMessage(Message):
    """Information on a specific mode

    This tells us a mode's name, what numeric format it uses, and its range.
    """

    msg_type = 0x44

    def parse(self, msg_bytes, l, dispatcher):
        """Parse one piece of information about a mode and notify the peripheral.

        Args:
            msg_bytes (bytearray): Message body; port, mode and information
                type are removed from the front before the payload is decoded.
            l (list): log description, appended to as parsing proceeds.
            dispatcher: provides the `port_info` dict that is filled in and the
                `message_port_info_to_peripheral` callback.

        Raises:
            UnknownMessageError: if the information type has no handler below.
        """
        port = msg_bytes.pop(0)
        mode = msg_bytes.pop(0)
        mode_type = msg_bytes.pop(0)

        port_info = dispatcher.port_info.setdefault(port, {})
        modes_info = port_info.setdefault('modes', {})
        mode_info = modes_info.setdefault(mode, {})
        l.append(f'MODE INFO Port:{port} Mode:{mode}')

        # Information type -> handler that decodes the remaining payload.
        mode_types = {
            0x00: self._parse_name,
            0x01: self._parse_raw_range,
            0x02: self._parse_pct_range,
            0x03: self._parse_si_range,
            0x04: self._parse_symbol,
            0x05: self._parse_mapping,
            0x80: self._parse_format,
        }
        if mode_type not in mode_types:
            raise UnknownMessageError
        mode_types[mode_type](msg_bytes, l, mode_info)

        dispatcher.message_port_info_to_peripheral(port, 'port_mode_info_received')

    def _parse_format(self, msg_bytes, l, mode_info):
        """Record the value format of a mode (removes four bytes from the front)."""
        # 4 bytes
        # [0] = Number of datasets (e.g. RGB has 3 for each color)
        # [1] = Dataset type.  00=byte, 01=16b, 10=32b, 11=float
        # [2] = Total figures
        # [3] = Decimals if any
        mode_info['datasets'] = msg_bytes.pop(0)
        dataset_types = ['8b', '16b', '32b', 'float']
        mode_info['dataset_type'] = dataset_types[msg_bytes.pop(0)]
        mode_info['dataset_total_figures'] = msg_bytes.pop(0)
        mode_info['dataset_decimals'] = msg_bytes.pop(0)

    def _parse_mapping(self, msg_bytes, l, mode_info):
        """Decode the input (byte 0) and output (byte 1) mapping bit-masks."""
        # Label for each bit position, least significant bit first.
        bits = ['NA', 'NA', 'Discrete', 'Relative', 'Absolute', 'NA', 'Supports Functional Mapping 2.0}', 'Supports NULL']
        for index, direction in enumerate(('Input', 'Output')):
            l.append(f'{direction} Mapping:')
            mask = msg_bytes[index]
            maps = [bits[i] for i in range(8) if (mask >> i) & 1]
            l.append(','.join(maps))
            mode_info[f'{direction.lower()}_mapping'] = maps

    def _decode_text(self, msg_bytes):
        """Return the payload as text, one character per byte, skipping zero bytes."""
        return ''.join(chr(b) for b in msg_bytes if b != 0)

    def _parse_symbol(self, msg_bytes, l, mode_info):
        """Record the symbol text of a mode under `mode_info['symbol']`."""
        l.append('Symbol:')
        symbol = self._decode_text(msg_bytes)
        l.append(symbol)
        mode_info['symbol'] = symbol

    def _unpack_float(self, b):
        """Decode the first four bytes of `b` as a little-endian float.

        Returns the 1-tuple produced by `struct.unpack`.
        """
        return struct.unpack('<f', bytearray(b[0:4]))

    def _parse_range(self, msg_bytes, l, mode_info, label, key):
        """Decode a (min, max) pair of little-endian floats shared by all range types.

        Bytes 0-3 hold the minimum and bytes 4-7 the maximum; anything after
        the eighth byte is ignored.  The pair is stored under `mode_info[key]`
        and logged after `label`.
        """
        l.append(label)
        low = self._unpack_float(msg_bytes[0:4])[0]
        high = self._unpack_float(msg_bytes[4:])[0]
        l.append(f'{low} to {high}')
        mode_info[key] = (low, high)

    def _parse_si_range(self, msg_bytes, l, mode_info):
        """Record the SI range of a mode under `mode_info['si_range']`."""
        self._parse_range(msg_bytes, l, mode_info, 'SI range:', 'si_range')

    def _parse_pct_range(self, msg_bytes, l, mode_info):
        """Record the percent range of a mode under `mode_info['pct_range']`."""
        self._parse_range(msg_bytes, l, mode_info, 'Pct range:', 'pct_range')

    def _parse_raw_range(self, msg_bytes, l, mode_info):
        """Record the raw range of a mode under `mode_info['raw_range']`."""
        self._parse_range(msg_bytes, l, mode_info, 'Raw range:', 'raw_range')

    def _parse_name(self, msg_bytes, l, mode_info):
        """Record the name of a mode under `mode_info['name']`."""
        l.append('Name:')
        name = self._decode_text(msg_bytes)
        l.append(name)
        mode_info['name'] = name
class AttachedIOMessage(Message):
    """Peripheral attach and detach message."""

    msg_type = 0x04

    def parse(self, msg_bytes, l, dispatcher):
        """Handle a peripheral being detached, attached or virtually attached.

        Event 0 (detach) is only logged.  Events 1 (attach) and 2 (virtual
        attach) record the device id and name for the port and tell the
        dispatcher to attach the device.  Any other event value is ignored.

        Args:
            msg_bytes (bytearray): Message body, consumed from the front.
            l (list): log description, appended to as parsing proceeds.
            dispatcher: provides the `port_info` dict that is filled in and the
                `message_attach_to_hub` callback.
        """
        # 5-bytes = detached
        # 15 bytes = attached
        # 9 bytes = virtual attached
        # Subtract 3 bytes for what we've already popped off
        port = msg_bytes.pop(0)
        event = msg_bytes.pop(0)

        if event == 0:
            l.append(f'Detached IO Port:{port}')
            return

        attach = event == 1
        virtual_attach = event == 2
        if not (attach or virtual_attach):
            return

        if attach:
            l.append(f'Attached IO Port:{port}')
        else:
            l.append(f'Attached VirtualIO Port:{port}')

        # Next two bytes (little-endian) are the device number (MSB is not used)
        device_id = msg_bytes.pop(0)
        assert device_id in DEVICES, f'Unknown device with id {device_id} being attached (port {port}'
        device_name = DEVICES[device_id]
        for info_key, info_val in (('id', device_id), ('name', device_name)):
            self._add_port_info(dispatcher, port, info_key, info_val)
        msg_bytes.pop(0)  # pop off MSB that's always 0
        l.append(f'{device_name}')

        # Register the handler for this IO
        dispatcher.message_attach_to_hub(device_name, port)

        if attach:
            # Next bytes are the version fields, four bytes per version type
            for ver_type in ('HW', 'SW'):
                build0, build1, bugfix, ver = [hex(msg_bytes.pop(0)) for _ in range(4)]
                l.append(f'{ver_type}:{ver}.{bugfix}.{build1}{build0}')
        else:
            # Virtual attach: the remaining two bytes are the combined ports
            assert len(msg_bytes) == 2
            port0, port1 = msg_bytes
            l.append(f'Port A: {port0}, Port B: {port1}')
            self._add_port_info(dispatcher, port, 'virtual', (port0, port1))

    def _add_port_info(self, dispatcher, port, info_key, info_val):
        """Store `info_val` under `info_key` in the dispatcher's info dict for `port`,
        creating that port's dict first when it does not exist yet.
        """
        dispatcher.port_info.setdefault(port, {})[info_key] = info_val
if __name__ == '__main__':
    # Manual smoke test: feed one port value message (0x45) and one combo
    # value message (0x46) for port 1 through a dispatcher and print what the
    # dispatcher's parse call returns.
    from unittest.mock import MagicMock

    # NOTE(review): MessageDispatch is assumed to live in the sibling
    # `message_dispatch` module -- confirm.  Because of the relative imports
    # this only works when run as a module (``python -m bricknil.messages``).
    from .message_dispatch import MessageDispatch

    hub = MagicMock()
    dis = MessageDispatch(hub)
    dis.port_to_peripheral[1] = MagicMock()

    for msg_type in (0x45, 0x46):
        msg = bytearray([0, 0, msg_type, 1, 9, 9, 9])
        parsed = dis.parse(msg)
        print(parsed)