#!/usr/bin/python3 -B

# Copyright 2021 Josh Pieper, jjp@pobox.com.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This example commands a single servo at ID #1 using the default
transport to hold the current position indefinitely, and prints the
state of the servo to the console.

When SIGINT is received, the polling loop is cancelled and a stop
command is sent to the servo before the script exits.
"""

import asyncio
import math
import signal
import sys
from typing import Awaitable, Callable, Optional

import moteus
import logging


class BLDCMotor:
    """Thin wrapper around a single `moteus.Controller`."""

    def __init__(self) -> None:
        # Constructed with no arguments, i.e. the library defaults.
        self.controller = moteus.Controller()

    async def set_stop(self, *args, **kwargs):
        """Send a stop command to the controller.

        All arguments are forwarded unchanged to
        `moteus.Controller.set_stop`.
        """
        await self.controller.set_stop(*args, **kwargs)

    async def set_position(self, *args, **kwargs):
        """Send a position command and return the controller's reply.

        All arguments are forwarded unchanged to
        `moteus.Controller.set_position`; its result is returned as-is.
        """
        return await self.controller.set_position(*args, **kwargs)


def create_async_signal_handler(
        coro_func: Callable[[], Awaitable[None]],
) -> Callable[[signal.Signals, asyncio.AbstractEventLoop], None]:
    """Build a signal callback that shuts the application down cleanly.

    The returned callable is meant to be registered with
    `loop.add_signal_handler(sig, handler, sig, loop)`.  The first time
    it is invoked it schedules a task on `loop` which:

    1. cancels every other task on the loop and waits for them to
       finish, then
    2. awaits `coro_func()`.

    The cleanup coroutine runs only after the other tasks are done so
    that none of them can issue further commands after it.  The handler
    does not stop or close the loop; the code that owns the loop must
    let the scheduled task run to completion before closing it.

    Args:
        coro_func: Zero-argument callable returning an awaitable that
            performs the final cleanup (e.g. stopping a motor).

    Returns:
        A synchronous callback taking the received signal and the event
        loop on which the shutdown task should be scheduled.
    """
    # Holds the shutdown task once one has been scheduled.  This keeps
    # a strong reference to the task and makes the handler one-shot, so
    # repeated signals cannot start competing shutdowns.
    shutdown_task: Optional[asyncio.Task] = None

    async def _async_signal_handler(signal_type: signal.Signals) -> None:
        logging.info("Received exit signal %s...", signal_type.name)

        current = asyncio.current_task()
        tasks = [t for t in asyncio.all_tasks() if t is not current]
        logging.info("Cancelling %d outstanding tasks", len(tasks))
        for task in tasks:
            task.cancel()

        # return_exceptions=True: the CancelledError raised by each
        # cancelled task is collected rather than propagated here,
        # which would otherwise abort the shutdown before cleanup.
        await asyncio.gather(*tasks, return_exceptions=True)

        await coro_func()

    def _signal_handler(signal_type: signal.Signals,
                        loop: asyncio.AbstractEventLoop) -> None:
        nonlocal shutdown_task

        print(f"Signal {signal_type.name} received.")

        if shutdown_task is not None:
            # A shutdown is already in progress (or has finished).
            return

        shutdown_task = loop.create_task(_async_signal_handler(signal_type))

    return _signal_handler


async def main(motor: BLDCMotor) -> None:
    """Command `motor` to hold position forever, printing its state.

    This coroutine only returns by being cancelled or by an exception
    raised from the motor calls.
    """
    # In case the controller had faulted previously, at the start of
    # this script we send the stop command in order to clear it.
    await motor.set_stop()

    while True:
        # `set_position` accepts an optional keyword argument for each
        # possible position mode register as described in the moteus
        # reference manual.  If a given register is omitted, then that
        # register is omitted from the command itself, with semantics
        # as described in the reference manual.
        #
        # The return type of 'set_position' is a moteus.Result type.
        # It has a __repr__ method, and has a 'values' field which can
        # be used to examine individual result registers.
        state = await motor.set_position(position=math.nan, query=True)

        # Print out everything.
        print(state)

        # Print out just the position register.
        print("Position:", state.values[moteus.Register.POSITION])

        # And a blank line so we can separate one iteration from the
        # next.
        print()

        # Wait 20ms between iterations.  By default, when commanded
        # over CAN, there is a watchdog which requires commands to be
        # sent at least every 100ms or the controller will enter a
        # latched fault state.
        await asyncio.sleep(0.02)


if __name__ == '__main__':
    motor = BLDCMotor()
    loop = asyncio.new_event_loop()

    signal_handler = create_async_signal_handler(motor.set_stop)

    # Register the handler for SIGINT (keyboard interrupt).  Add
    # signal.SIGTERM to this tuple to also handle termination requests.
    # The trailing comma matters: without it this is not a tuple.
    for sig in (signal.SIGINT,):
        loop.add_signal_handler(sig, signal_handler, sig, loop)

    try:
        loop.run_until_complete(main(motor))
    except asyncio.CancelledError:
        # Expected when the signal handler cancels the main task.
        pass
    finally:
        # The main task finishing stops the loop, possibly before the
        # shutdown task has sent the stop command.  Let whatever is
        # still pending run to completion before closing the loop.
        pending = asyncio.all_tasks(loop)
        if pending:
            loop.run_until_complete(asyncio.wait(pending))
        loop.close()