# This module uses the Nordic UART Service (NUS) to receive gamepad data over
# BLE and hand it to the robot controller for interpretation.
import bluetooth
from machine import UART
from micropython import const

# Arni
from ble_advertising import advertising_payload
from robot import ArniRobot, RobotController

__version__ = "0.1.0"

# BLE IRQ event codes handled by BLEUART._irq.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

# Characteristic property flags.
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

# The 128-bit vendor-specific service UUID is
# 6E400001-B5A3-F393-E0A9-E50E24DCCA9E (16-bit offset: 0x0001).
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")

# Defines the UART_TX characteristic.
# Notify
_UART_TX = (
    bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_NOTIFY,
)

# Defines the UART_RX characteristic.
# Write or Write Without Response
# Writes data to the RX Characteristic to send it on to the UART interface.
_UART_RX = (
    bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_WRITE,
)

_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)


class BLEUART:
    """UART-like byte stream on top of the Nordic UART Service.

    Registers the NUS service on the given BLE object, advertises under
    ``name``, buffers every write made to the RX characteristic, and sends
    outgoing data as notifications on the TX characteristic.
    """

    def __init__(self, ble, name="arni-robot", rxbuf=100):
        """Activate ``ble``, register the service and start advertising.

        Args:
            ble: BLE object (``bluetooth.BLE()`` in ``main``).
            name: Device name placed in the advertising payload.
            rxbuf: Buffer size, in bytes, set for the RX characteristic.
        """
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services(
            (_UART_SERVICE,)
        )
        # Increase the size of the rx buffer and enable append mode.
        self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True)
        self._connections = set()
        self._rx_buffer = bytearray()
        self._handler = None
        # Optionally add services=[_UART_UUID], but this is likely to make the
        # payload too large.
        self._payload = advertising_payload(
            name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER
        )
        self._advertise()

    def irq(self, handler):
        """Set the zero-argument callable invoked after each buffered RX write."""
        self._handler = handler

    def _irq(self, event, data):
        """Handle BLE events: track connections and buffer RX writes."""
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard() is a no-op for handles we never tracked.
            self._connections.discard(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            if conn_handle in self._connections and value_handle == self._rx_handle:
                self._rx_buffer += self._ble.gatts_read(self._rx_handle)
                if self._handler:
                    self._handler()

    def any(self):
        """Return the number of bytes waiting in the receive buffer."""
        return len(self._rx_buffer)

    def read(self, sz=None):
        """Remove and return up to ``sz`` bytes from the receive buffer.

        A falsy ``sz`` (``None`` or ``0``) drains the whole buffer.
        """
        if not sz:
            sz = len(self._rx_buffer)
        result = self._rx_buffer[0:sz]
        self._rx_buffer = self._rx_buffer[sz:]
        return result

    def write(self, data):
        """Notify every tracked connection with ``data`` on the TX characteristic."""
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle, self._tx_handle, data)

    def close(self):
        """Request a disconnect for every tracked connection and forget them."""
        # Iterate over a snapshot: the disconnect handler in _irq also mutates
        # the set, and a set must not change size while it is being iterated.
        for conn_handle in tuple(self._connections):
            self._ble.gap_disconnect(conn_handle)
        self._connections.clear()

    def _advertise(self, interval_us=500000):
        """Start advertising the stored payload at ``interval_us`` microseconds."""
        self._ble.gap_advertise(interval_us, adv_data=self._payload)


def main():
    """Wire the BLE UART to the robot controller and run until interrupted."""
    import time

    robot_uart = UART(2, baudrate=115200, tx=17, rx=16)
    arni = ArniRobot(robot_uart)
    robot_controller = RobotController(arni)

    # BT
    ble = bluetooth.BLE()
    uart = BLEUART(ble)

    def on_rx():
        """Drain the receive buffer and forward 8-byte packets to the controller."""
        data = uart.read()
        # Only packets of exactly 8 bytes are handled; anything else is dropped.
        if len(data) == 8:
            print("Recieved {} data".format(data))
            # NOTE(review): bytes 5 and 6 are presumably the gamepad fields the
            # controller expects - confirm against the sender's packet layout.
            robot_controller.interpret((data[5], data[6]))

    # Will execute this function on every received packet.
    uart.irq(handler=on_rx)

    try:
        print("Starting BLE Nordic UART Service.")
        while True:
            # Just keeps the program alive.
            time.sleep_ms(1000)
    except KeyboardInterrupt:
        pass
    finally:
        # Drop BLE connections however the loop ends, not only on Ctrl-C.
        uart.close()


if __name__ == "__main__":
    main()