From a0538dc4c381e641f51fc7f1063cceefe9ff2cdd Mon Sep 17 00:00:00 2001 From: "Matthias@Dell" Date: Fri, 16 Jun 2023 17:59:45 +0200 Subject: [PATCH] connect to arduino --- .gitignore | 1 + host/main.py | 88 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 59 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index f35e3c0..495abce 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *build* +*venv* diff --git a/host/main.py b/host/main.py index 0524ba0..f12a6aa 100644 --- a/host/main.py +++ b/host/main.py @@ -2,40 +2,68 @@ import bleak as b import asyncio TARGET_NAME = "ArduinoTENG" -TARGET_ADRESS = "C8:C9:A3:E6:08:3A" -def disconnect_callback(client: b.BleakClient): - print(f"Disconnected {client}") +TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f" +TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f" +TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f" +TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f" + +TENG_COMMANDS = { + "NOOP": int(0).to_bytes(1), + "MEASURE_BASELINE": int(1).to_bytes(1), +} +TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "MEASURING_BASELINE", "READING"] + + + +def teng_status_callback(characteristic, data): + value = int.from_bytes(data, byteorder="big", signed=False) + if 0 <= value and value < len(TENG_STATUS): + print(f"Status change: {TENG_STATUS[value]}") + else: + print(f"Status change (invalid): status={value}") async def main(): - # devices = await b.BleakScanner.discover(return_adv=False) - # # print(devices) - # target_device = None - # for adr, (device, adv_data) in devices.items(): - # if device.name == TARGET_NAME: - # print(adv_data) - # target_device = device - # break - # if target_device is None: - # print("ERROR: Could not find target device") - # return - # print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}") - # print(target_device.name, target_device.details) - # async with b.BleakClient(target_device) as client: - # # print(f"Connected to {client}") - # print(f"Services: {await client.services}") - scanner = b.BleakScanner() - scanner.start() - target_device = TARGET_ADRESS - client = b.BleakClient(target_device) + devices = await b.BleakScanner.discover(return_adv=True) + # print(devices) + target_device = None + for adr, (device, adv_data) in devices.items(): + if device.name == TARGET_NAME: + print(adv_data) + target_device = device + break + if target_device is None: + print("ERROR: Could not find target device") + return + print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}") + print(target_device.name, target_device.details) try: - await client.connect() - # print(client.services) - except b.BleakError as e: - print(e) - finally: - await client.disconnect() - scanner.stop() + async with b.BleakClient(target_device) as client: + for service in client.services: + print(f"Service: {service.uuid}: {service.description}") + for c in service.characteristics: + print(f"\t{c.uuid}: {c.properties}, {c.descriptors}") + teng_status = client.services.get_characteristic(TENG_STATUS_CUUID) + teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID) + teng_reading = client.services.get_characteristic(TENG_READING_CUUID) + client.start_notify(teng_status, teng_status_callback) + + await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"]) + await asyncio.sleep(5) + await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"]) + + while client.is_connected: + data = await client.read_gatt_char(teng_reading) + + + value = int.from_bytes(data, byteorder="little", signed=False) + print(f"Reading: {value}") + await asyncio.sleep(0.5) + except KeyboardInterrupt: + pass + except asyncio.exceptions.CancelledError: + pass + print("Disconnected") if __name__ == "__main__":