Source code for bricknil.sensor.motor

# Copyright 2019 Virantha N. Ekanayake 
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
# http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""All motor related peripherals including base motor classes"""

from curio import sleep, current_task, spawn  # Needed for motor speed ramp

from enum import Enum
from struct import pack

from .peripheral import Peripheral

[docs]class Motor(Peripheral): """Utility class for common functions shared between Train Motors, Internal Motors, and External Motors """ def __init__(self, name, port=None, capabilities=[]): self.speed = 0 # Initialize current speed to 0 self.ramp_in_progress_task = None super().__init__(name, port, capabilities)
[docs] async def set_speed(self, speed): """ Validate and set the train speed If there is an in-progress ramp, and this command is not part of that ramp, then cancel that in-progress ramp first, before issuing this set_speed command. Args: speed (int) : Range -100 to 100 where negative numbers are reverse. Use 0 to put the motor into neutral. 255 will do a hard brake """ await self._cancel_existing_differet_ramp() self.speed = speed self.message_info(f'Setting speed to {speed}') await self.set_output(0, self._convert_speed_to_val(speed))
async def _cancel_existing_differet_ramp(self): """Cancel the existing speed ramp if it was from a different task Remember that speed ramps must be a task with daemon=True, so there is no one awaiting its future. """ # Check if there's a ramp task in progress if self.ramp_in_progress_task: # Check if it's this current task or not current = await current_task() if current != self.ramp_in_progress_task: # We're trying to set the speed # outside a previously in-progress ramp, so cancel the previous ramp await self.ramp_in_progress_task.cancel() self.ramp_in_progress_task = None self.message_debug(f'Canceling previous speed ramp in progress')
[docs] async def ramp_speed(self, target_speed, ramp_time_ms): """Ramp the speed by 10 units in the time given in milliseconds """ TIME_STEP_MS = 100 await self._cancel_existing_differet_ramp() assert ramp_time_ms > 100, f'Ramp speed time must be greater than 100ms ({ramp_time_ms}ms used)' # 500ms ramp time, 100ms per step # Therefore, number of steps = 500/100 = 5 # Therefore speed_step = speed_diff/5 number_of_steps = ramp_time_ms/TIME_STEP_MS speed_diff = target_speed - self.speed speed_step = speed_diff/number_of_steps start_speed = self.speed self.message_debug(f'ramp_speed steps: {number_of_steps}, speed_diff: {speed_diff}, speed_step: {speed_step}') current_step = 0 async def _ramp_speed(): nonlocal current_step # Since this is being assigned to, we need to mark it as coming from the enclosed scope while current_step < number_of_steps: next_speed = int(start_speed + current_step*speed_step) self.message_debug(f'Setting next_speed: {next_speed}') current_step +=1 if current_step == number_of_steps: next_speed = target_speed await self.set_speed(next_speed) await sleep(TIME_STEP_MS/1000) await self.set_speed(target_speed) self.ramp_in_progress_task = None self.message_debug(f'Starting ramp of speed: {start_speed} -> {target_speed} ({ramp_time_ms/1000}s)') self.ramp_in_progress_task = await spawn(_ramp_speed, daemon = True)
[docs]class TachoMotor(Motor): capability = Enum("capability", {"sense_speed":1, "sense_pos":2}) datasets = { capability.sense_speed: (1, 1), capability.sense_pos: (1, 4), } """ Dict of (num_datasets, bytes_per_dataset). `sense_speed` (1-byte), and `sense_pos` (uint32)""" allowed_combo = [ capability.sense_speed, capability.sense_pos, ]
[docs] async def set_pos(self, pos, speed=50, max_power=50): """Set the absolute position of the motor Everytime the hub is powered up, the zero-angle reference will be reset to the motor's current position. When you issue this command, the motor will rotate to the position given in degrees. The sign of the pos tells you which direction to rotate: (1) a positive number will rotate clockwise as looking from end of shaft towards the motor, (2) a negative number will rotate counter-clockwise Examples:: await self.motor.set_pos(90) # Rotate 90 degrees clockwise (looking from end of shaft towards motor) await self.motor.set_pos(-90) # Rotate conter-clockwise 90 degrees await self.motor.set_pos(720) # Rotate two full circles clockwise Args: pos (int) : Absolute position in degrees. speed (int) : Absolute value from 0-100 max_power (int): Max percentage power that will be applied (0-100%) Notes: Use command GotoAbsolutePosition * 0x00 = hub id * 0x81 = Port Output command * port * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback) * 0x0d = Subcommand * abs_pos (int32) * speed -100 - 100 * max_power abs(0-100%) * endstate = 0 (float), 126 (hold), 127 (brake) * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile) * """ abs_pos = list(pack('i', pos)) speed = self._convert_speed_to_val(speed) b = [0x00, 0x81, self.port, 0x01, 0x0d] + abs_pos + [speed, max_power, 126, 3] await self.send_message(f'set pos {pos} with speed {speed}', b)
[docs] async def rotate(self, degrees, speed, max_power=50): """Rotate the given number of degrees from current position, with direction given by sign of speed Examples:: await self.motor.rotate(90, speed=50) # Rotate 90 degrees clockwise (looking from end of shaft towards motor) await self.motor.set_pos(90, speed=-50) # Rotate conter-clockwise 90 degrees await self.motor.set_pos(720, speed=50) # Rotate two full circles clockwise Args: degrees (uint) : Relative number of degrees to rotate speed (int) : -100 to 100 max_power (int): Max percentage power that will be applied (0-100%) Notes: Use command StartSpeedForDegrees * 0x00 = hub id * 0x81 = Port Output command * port * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback) * 0x0b = Subcommand * degrees (int32) 0..1000000 * speed -100 - 100% * max_power abs(0-100%) * endstate = 0 (float), 126 (hold), 127 (brake) * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile) * """ degrees = list(pack('i', degrees)) speed = self._convert_speed_to_val(speed) b = [0x00, 0x81, self.port, 0x01, 0x0b] + degrees + [speed, max_power, 126, 3] await self.send_message(f'rotate {degrees} deg with speed {speed}', b)
[docs] async def ramp_speed2(self, target_speed, ramp_time_ms): # pragma: no cover """Experimental function, not implemented yet DO NOT USE """ # Set acceleration profile delta_speed = target_speed - self.speed zero_100_ramp_time_ms = int(ramp_time_ms/delta_speed * 100.0) zero_100_ramp_time_ms = zero_100_ramp_time_ms % 10000 # limit time to 10s hi = (zero_100_ramp_time_ms >> 8) & 255 lo = zero_100_ramp_time_ms & 255 profile = 1 b = [0x00, 0x81, self.port, 0x01, 0x05, 10, 10, profile] await self.send_message(f'set accel profile {zero_100_ramp_time_ms} {hi} {lo} ', b) b = [0x00, 0x81, self.port, 0x01, 0x07, self._convert_speed_to_val(target_speed), 80, 1] await self.send_message('set speed', b)
[docs]class InternalMotor(TachoMotor): """ Access the internal motor(s) in the Boost Move Hub. Unlike the train motors, these motors (as well as the stand-alone Boost motors :class:`ExternalMotor`) have a built-in sensor/tachometer for sending back the motor's current speed and position. However, you don't need to use the sensors, and can treat this motor strictly as an output device. Examples:: # Basic connection to the motor on Port A @attach(InternalMotor, name='left_motor', port=InternalMotor.Port.A) # Basic connection to both motors at the same time (virtual I/O port). # Any speed command will cause both motors to rotate at the same speed @attach(InternalMotor, name='motors', port=InternalMotor.Port.AB) # Report back when motor speed changes. You must have a motor_change method defined @attach(InternalMotor, name='motor', port=InternalMotor.Port.A, capabilities=['sense_speed']) # Only report back when speed change exceeds 5 units @attach(InternalMotor, name='motors', port=InternalMotor.Port.A, capabilities=[('sense_speed', 5)]) And within the run body you can control the motor output:: await self.motor.set_speed(50) # Setting the speed await self.motor.ramp_speed(80, 2000) # Ramp speed to 80 over 2 seconds await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position See Also: * :class:`TrainMotor` for connecting to a train motor * :class:`ExternalMotor` for connecting to a Boost tacho motor """ _sensor_id = 0x0027 _DEFAULT_THRESHOLD=2 """Set to 2 to avoid a lot of updates since the speed seems to oscillate a lot""" Port = Enum('Port', 'A B AB', start=0) """Address either motor A or Motor B, or both AB at the same time""" def __init__(self, name, port=None, capabilities=[]): """Maps the port names `A`, `B`, `AB` to hard-coded port numbers""" if port: port_map = [55, 56, 57] port = port_map[port.value] self.speed = 0 super().__init__(name, port, capabilities)
[docs]class ExternalMotor(TachoMotor): """ Access the stand-alone Boost motors These are similar to the :class:`InternalMotor` with build-in tachometer and sensor for sending back the motor's current speed and position. You don't need to use the sensors, and can treat this as strictly an output. Examples:: # Basic connection to the motor on Port A @attach(ExternalMotor, name='motor') # Report back when motor speed changes. You must have a motor_change method defined @attach(ExternalMotor, name='motor', capabilities=['sense_speed']) # Only report back when speed change exceeds 5 units, and position changes (degrees) @attach(ExternalMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos']) And then within the run body:: await self.motor.set_speed(50) # Setting the speed await self.motor.ramp_speed(80, 2000) # Ramp speed to 80 over 2 seconds await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position See Also: * :class:`TrainMotor` for connecting to a train motor * :class:`InternalMotor` for connecting to the Boost hub built-in motors """ _sensor_id = 0x26
[docs]class CPlusLargeMotor(TachoMotor): """ Access the Technic Control Plus Large motors These are similar to the :class:`InternalMotor` with build-in tachometer and sensor for sending back the motor's current speed and position. You don't need to use the sensors, and can treat this as strictly an output. Examples:: # Basic connection to the motor on Port A @attach(CPlusLargeMotor, name='motor') # Report back when motor speed changes. You must have a motor_change method defined @attach(CPlusLargeMotor, name='motor', capabilities=['sense_speed']) # Only report back when speed change exceeds 5 units, and position changes (degrees) @attach(CPlusLargeMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos']) And then within the run body:: await self.motor.set_speed(50) # Setting the speed await self.motor.ramp_speed(80, 2000) # Ramp speed to 80 over 2 seconds await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position See Also: * :class:`TrainMotor` for connecting to a train motor * :class:`InternalMotor` for connecting to the Boost hub built-in motors """ _sensor_id = 0x2E
[docs]class CPlusXLMotor(TachoMotor): """ Access the Technic Control Plus XL motors These are similar to the :class:`InternalMotor` with build-in tachometer and sensor for sending back the motor's current speed and position. You don't need to use the sensors, and can treat this as strictly an output. Examples:: # Basic connection to the motor on Port A @attach(CPlusXLMotor, name='motor') # Report back when motor speed changes. You must have a motor_change method defined @attach(CPlusXLMotor, name='motor', capabilities=['sense_speed']) # Only report back when speed change exceeds 5 units, and position changes (degrees) @attach(CPlusXLMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos']) And then within the run body:: await self.motor.set_speed(50) # Setting the speed await self.motor.ramp_speed(80, 2000) # Ramp speed to 80 over 2 seconds await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position See Also: * :class:`TrainMotor` for connecting to a train motor * :class:`InternalMotor` for connecting to the Boost hub built-in motors """ _sensor_id = 0x2F
[docs]class TrainMotor(Motor): """ Connects to the train motors. TrainMotor has no sensing capabilities and only supports a single output mode that sets the speed. Examples:: @attach(TrainMotor, name='train') And then within the run body, use:: await self.train.set_speed(speed) Attributes: speed (int) : Keep track of the current speed in order to ramp it See Also: :class:`InternalMotor` """ _sensor_id = 0x0002
[docs]class WedoMotor(Motor): """ Connects to the Wedo motors. WedoMotor has no sensing capabilities and only supports a single output mode that sets the speed. Examples:: @attach(WedoMotor, name='motor') And then within the run body, use:: await self.motor.set_speed(speed) Attributes: speed (int) : Keep track of the current speed in order to ramp it See Also: * :class:`InternalMotor` * :class:`TrainMotor` """ _sensor_id = 0x0001
[docs]class DuploTrainMotor(Motor): """Train Motor on Duplo Trains Make sure that the train is sitting on the ground (the front wheels need to keep rotating) in order to keep the train motor powered. If you pick up the train, the motor will stop operating withina few seconds. Examples:: @attach(DuploTrainMotor, name='motor') And then within the run body, use:: await self.train.set_speed(speed) Attributes: speed (int): Keep track of the current speed in order to ramp it See Also: :class:`TrainMotor` for connecting to a PoweredUp train motor """ _sensor_id = 0x0029