connect to arduino

This commit is contained in:
Matthias@Dell 2023-06-16 17:59:45 +02:00
parent 16a43a5b3d
commit a0538dc4c3
2 changed files with 59 additions and 30 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*build* *build*
*venv*

View File

@ -2,40 +2,68 @@ import bleak as b
import asyncio import asyncio
TARGET_NAME = "ArduinoTENG" TARGET_NAME = "ArduinoTENG"
TARGET_ADRESS = "C8:C9:A3:E6:08:3A"
def disconnect_callback(client: b.BleakClient): TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f"
print(f"Disconnected {client}") TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f"
TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f"
TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f"
TENG_COMMANDS = {
"NOOP": int(0).to_bytes(1),
"MEASURE_BASELINE": int(1).to_bytes(1),
}
TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "MEASURING_BASELINE", "READING"]
def teng_status_callback(characteristic, data):
value = int.from_bytes(data, byteorder="big", signed=False)
if 0 <= value and value < len(TENG_STATUS):
print(f"Status change: {TENG_STATUS[value]}")
else:
print(f"Status change (invalid): status={value}")
async def main(): async def main():
# devices = await b.BleakScanner.discover(return_adv=False) devices = await b.BleakScanner.discover(return_adv=True)
# # print(devices) # print(devices)
# target_device = None target_device = None
# for adr, (device, adv_data) in devices.items(): for adr, (device, adv_data) in devices.items():
# if device.name == TARGET_NAME: if device.name == TARGET_NAME:
# print(adv_data) print(adv_data)
# target_device = device target_device = device
# break break
# if target_device is None: if target_device is None:
# print("ERROR: Could not find target device") print("ERROR: Could not find target device")
# return return
# print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}") print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}")
# print(target_device.name, target_device.details) print(target_device.name, target_device.details)
# async with b.BleakClient(target_device) as client:
# # print(f"Connected to {client}")
# print(f"Services: {await client.services}")
scanner = b.BleakScanner()
scanner.start()
target_device = TARGET_ADRESS
client = b.BleakClient(target_device)
try: try:
await client.connect() async with b.BleakClient(target_device) as client:
# print(client.services) for service in client.services:
except b.BleakError as e: print(f"Service: {service.uuid}: {service.description}")
print(e) for c in service.characteristics:
finally: print(f"\t{c.uuid}: {c.properties}, {c.descriptors}")
await client.disconnect() teng_status = client.services.get_characteristic(TENG_STATUS_CUUID)
scanner.stop() teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID)
teng_reading = client.services.get_characteristic(TENG_READING_CUUID)
client.start_notify(teng_status, teng_status_callback)
await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"])
await asyncio.sleep(5)
await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"])
while client.is_connected:
data = await client.read_gatt_char(teng_reading)
value = int.from_bytes(data, byteorder="little", signed=False)
print(f"Reading: {value}")
await asyncio.sleep(0.5)
except KeyboardInterrupt:
pass
except asyncio.exceptions.CancelledError:
pass
print("Disconnected")
if __name__ == "__main__": if __name__ == "__main__":