Source code for bricknil.message_dispatch
"""Parse incoming BLE Lego messages from hubs
Each hub has one of these objects to control access to the underlying BLE library notification thread.
Communication back into the hub (running in python async-land) is through a :class:`curio.UniversalQueue`
object.
Todo:
* The message parsers need to handle detaching of peripherals
"""
import struct, logging
from .const import DEVICES
from .messages import Message, UnknownMessageError
logger = logging.getLogger(__name__)
[docs]class MessageDispatch:
"""Parse messages (bytearray)
Once the :meth:`parse` method is called, the message header will be parsed, and based on the msg_type
byte, the processing of the message body will be dispatched to the `parse` method of the matching Message body parser.
Message body parsers are subclasses of :class:`bricknil.messages.Message`, and will call back
to the `message*` methods below. This object will then send a message to the connected :class:`bricknil.hub.Hub`
object.
"""
def __init__(self, hub):
"""
Args:
hub (:class:`bricknil.hub.Hub`) : The hub that will be sending messages
Attributes:
port_info (dict): A mirror copy of the :py:attr:`bricknil.hub.Hub.port_info` object. This object is sent every time
an update on the port meta data is made.
"""
self.hub = hub
self.port_info = {}
[docs] def parse(self, msg:bytearray):
"""Parse the header of the message and dispatch message body processing
`l` is only used to build up a log message to display during operation, telling
the user what kind of message was received and how it was parsed. If the message
cannot be parsed, then `l` contains the remaining unparsed raw message that was received from the
hardware ble device.
"""
msg_bytes = list(msg)
msg_bytes = msg_bytes[2:] # Skip the first two bytes (msg length and hub id (always 0) )
msg_type = msg_bytes.pop(0)
l = [] # keep track of the parsed return message
try:
if msg_type in Message.parsers:
Message.parsers[msg_type].parse(msg_bytes, l, self)
else:
raise UnknownMessageError
except UnknownMessageError:
l.append(self._parse_msg_bytes(msg))
return ' '.join([str(x) for x in l])
def _parse_msg_bytes(self, msg_bytes):
hex_bytes = ':'.join(hex(c) for c in msg_bytes)
return hex_bytes
[docs] def message_update_value_to_peripheral(self, port, value):
"""Called whenever a peripheral on the hub reports a change in its sensed value
"""
self.hub.peripheral_queue.put( ('value_change', (port, value)) )
[docs] def message_port_info_to_peripheral(self, port, message):
"""Called whenever a peripheral needs to update its meta-data
"""
self.hub.peripheral_queue.put( ('update_port', (port, self.port_info[port])) )
self.hub.peripheral_queue.put( (message, port) )
[docs] def message_attach_to_hub(self, device_name, port):
"""Called whenever a peripheral is attached to the hub
"""
# Now, we should activate updates from this sensor
self.hub.peripheral_queue.put( ('attach', (port, device_name)) )
# Send a message to update the information on this port
self.hub.peripheral_queue.put( ('update_port', (port, self.port_info[port])) )
# Send a message saying this port is detected, in case the hub
# wants to query for more properties. (Since an attach message
# doesn't do anything if the user hasn't @attach'ed a peripheral to it)
self.hub.peripheral_queue.put( ('port_detected', port))