Compare commits

...

4 Commits
main ... dev

Author SHA1 Message Date
CPD
3481563c81 Add device enumeration to LEDD1B 2025-03-26 17:46:15 +01:00
CPD
6a453da9bb Add connection checks 2025-03-19 17:01:33 +01:00
CPD
1fd09bee82 Add support for a usb power switch 2025-03-17 18:15:20 +01:00
CPD
b566ee4c67 Add log 2025-03-17 18:14:47 +01:00
13 changed files with 151 additions and 18 deletions

View File

@ -23,7 +23,8 @@ def list_devices() -> dict[str,list[str]]:
} }
try: try:
from .impl import thorlabs_ledd1b as th from .impl import thorlabs_ledd1b as th
devices[TYPENAME_LEDD1B] = ["Thorlabs LEDD1B"] #keithley2700.enumerate_devices() # devices[TYPENAME_LEDD1B] = ["Thorlabs LEDD1B"] #keithley2700.enumerate_devices()
devices[TYPENAME_LEDD1B] = th.LEDD1B.enumerate_devices()
except ImportError: except ImportError:
pass pass
try: try:
@ -52,7 +53,10 @@ def connect_device(type_name: str, device_name: str) -> LedControlDevice:
elif type_name == TYPENAME_LEDD1B: elif type_name == TYPENAME_LEDD1B:
try: try:
from .impl import thorlabs_ledd1b as th from .impl import thorlabs_ledd1b as th
return th.LEDD1B() dev = th.LEDD1B.connect_device(device_name)
# make sure a potential software mismatch is being logged
dev.check_has_correct_software(no_error=True)
return dev
except ImportError as e: except ImportError as e:
raise ValueError(f"Arduino devices not available: {e}") raise ValueError(f"Arduino devices not available: {e}")
elif type_name == TYPENAME_DC2200: elif type_name == TYPENAME_DC2200:

View File

@ -6,7 +6,15 @@ Created on Tue Jan 21 16:26:13 2025
""" """
class LedControlDevice(ABC): class LedControlDevice(ABC):
@abstractmethod @abstractmethod
def test_connection(self) -> None:
"""
Verify that the device is still properly connected.
If not, raises ConnectionError
"""
pass
@abstractmethod
def on(self): def on(self):
""" """
Set the led brightness to 100% Set the led brightness to 100%

View File

@ -4,6 +4,9 @@ class TestLedControlDevice(LedControlDevice):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
def test_connection(self) -> None:
pass
def on(self): def on(self):
pass pass

View File

@ -36,6 +36,13 @@ class DC2200(LedControlDevice):
self.off() self.off()
self.instr.close() self.instr.close()
def test_connection(self) -> None:
try:
res = self.instr.query('*IDN?')
except Exception as e:
log.error(f"Failed to query *IDN?:", e)
raise ConnectionError
def on(self): def on(self):
self.instr.write(f'SOURCE1:CBRightness:BRIGhtness 100') self.instr.write(f'SOURCE1:CBRightness:BRIGhtness 100')

View File

@ -1,7 +1,12 @@
import serial import serial
import serial.tools.list_ports
from ..base import LedControlDevice from ..base import LedControlDevice
import logging
log = logging.getLogger(__name__)
ARDUINO_SOFTWARE_VERSION_STRING = "Arduino Nano CPD 1"
class LEDD1B(LedControlDevice): class LEDD1B(LedControlDevice):
""" """
Control a Thorlabs LEDD1B LED driver using an Arduino Nano. Control a Thorlabs LEDD1B LED driver using an Arduino Nano.
@ -10,27 +15,40 @@ class LEDD1B(LedControlDevice):
Note: This currently has COM4 hardcoded Note: This currently has COM4 hardcoded
""" """
def __init__(self, port="COM4"): def __init__(self, port="COM4"):
self.arduino = serial.Serial(port=port, baudrate=9600, timeout=.1) super().__init__()
# self._check_arduino_software() self.arduino = serial.Serial(port=port, baudrate=9600, timeout=0.1)
# flush input
while self.arduino.read(1):
pass
def __del__(self): def __del__(self):
self.off() self.off()
self.arduino.close() self.arduino.close()
def _check_arduino_software(self): def check_has_correct_software(self, no_error=False):
""" """
Run the identify command and raise an Exception Run the identify command and raise an Exception
if the Arduino does not reply with the expected output. if the Arduino does not reply with the expected output.
""" """
self._write('i') self._write('i')
lines = self.read() lines = self.read()
if len(lines) < 1 or not lines[-1].startswith(bytes("Arduino Nano CPD 1", "utf-8")): if len(lines) < 1 or not lines[-1].startswith(bytes(ARDUINO_SOFTWARE_VERSION_STRING, "utf-8")):
print(lines) log.error(f"Arduino did not return the expected output - does it have the correct software loaded?\nExpected: '{ARDUINO_SOFTWARE_VERSION_STRING}'\nReceived:{lines}")
raise Exception("Arduino did not return the expected output - does it have the correct software loaded?") if not no_error:
raise ConnectionError("Arduino did not return the expected output - does it have the correct software loaded?")
def test_connection(self) -> None:
try:
self._write('i')
lines = self.read()
except Exception as e:
log.error(f"Failed to query 'i':", e)
raise ConnectionError
def _write(self, val): def _write(self, val):
self.arduino.write(bytes(val, 'utf-8')) self.arduino.write(bytes(val, 'utf-8'))
self.arduino.flush()
def read(self): def read(self):
data = self.arduino.readlines() data = self.arduino.readlines()
@ -49,6 +67,23 @@ class LEDD1B(LedControlDevice):
def __str__(self): def __str__(self):
return "Thorlabs LEDD1B" return "Thorlabs LEDD1B"
@staticmethod
def enumerate_devices() -> list[str]:
devs = []
coms = serial.tools.list_ports.comports()
# Require a device with no product field and empty serial number
# Change these conditions to support different devices
for com in coms:
if com.product is not None: continue
if com.serial_number: continue
if com.manufacturer not in ["wch.cn"]: continue
devs.append(com.device)
return devs
@staticmethod
def connect_device(name: str):
return LEDD1B(port=name)
if __name__ == '__main__': if __name__ == '__main__':
led = LEDD1B() led = LEDD1B()

View File

@ -306,8 +306,8 @@ class LedScript:
raise InvalidScriptUpdate(f"Cannot update row {idx+1} because with the new duration it would be skipped") raise InvalidScriptUpdate(f"Cannot update row {idx+1} because with the new duration it would be skipped")
self.script["dt"][idx] = dt self.script["dt"][idx] = dt
self.script["led"][idx] = led self.script["led"][idx] = led
log.info(f"Updated row {idx+1} with values dt={dt}, led={led}")
self._update_script_dtsums(idx) self._update_script_dtsums(idx)
log.critical("Updated!")
def _update_script_dtsums(self, from_idx: int): def _update_script_dtsums(self, from_idx: int):
for i in range(from_idx, self.script["dtsum"].shape[0]): for i in range(from_idx, self.script["dtsum"].shape[0]):

View File

View File

@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
class PowerSwitchDevice(ABC):
@abstractmethod
def status(self) -> bool:
pass
@abstractmethod
def on(self):
pass
@abstractmethod
def off(self):
pass
@abstractmethod
def __str__(self):
pass

View File

@ -0,0 +1,40 @@
from ..base import PowerSwitchDevice
import os
from subprocess import Popen, PIPE
class ClewareSwitch(PowerSwitchDevice):
def __init__(self, usbswitchcmd_exe_path: str):
super().__init__()
self.exe = usbswitchcmd_exe_path
def run_cmd(self, cmd) -> (int, str, str):
args = [self.exe, cmd]
p = Popen(args, stdout=PIPE, stderr=PIPE)
p.wait()
return p.returncode, p.stdout.read().decode("utf-8").strip("\n"), p.stderr.read().decode("utf-8").strip("\n")
def status(self) -> bool:
cmd = "-r"
ret, out, err = self.run_cmd(cmd)
print(f"status: ret: {ret}, out: '{out}', err: '{err}'")
if out == "1":
return True
elif out == "0":
return False
else:
raise RuntimeError(f"Failed to read USB switch status.\nCommand: '{cmd}'\nCode: {ret}, stdout: '{out}', stderr: '{err}'")
def on(self):
cmd = "1"
ret, out, err = self.run_cmd(cmd)
if ret != 1 or out or err: # yes, it actually returns 1 when it turns on ._.
raise RuntimeError(f"Failed to turn on USB switch.\nCommand: '{cmd}'\nCode: {ret}, stdout: '{out}', stderr: '{err}'")
def off(self):
cmd = "0"
ret, out, err = self.run_cmd(cmd)
if ret != 0 or out or err:
raise RuntimeError(f"Failed to turn on USB switch.\nCommand: '{cmd}'\nCode: {ret}, stdout: '{out}', stderr: '{err}'")
def __str__(self):
return f"Cleware USB Switch"

View File

@ -7,7 +7,14 @@ Created on Tue Jan 21 16:19:01 2025
@author: Matthias Quintern @author: Matthias Quintern
""" """
class VoltageMeasurementDevice(ABC): class VoltageMeasurementDevice(ABC):
@abstractmethod
def test_connection(self) -> None:
"""
Verify that the device is still properly connected.
If not, raises ConnectionError
"""
pass
# RUN COMMANDS ON THE DEVICE # RUN COMMANDS ON THE DEVICE
@abstractmethod @abstractmethod
def run(self, code, verbose=False): def run(self, code, verbose=False):

View File

@ -7,11 +7,13 @@ from typing import Callable
from ..base import VoltageMeasurementDevice from ..base import VoltageMeasurementDevice
from ...utility.visa import enumerate_devices from ...utility.visa import enumerate_devices
import logging
log = logging.getLogger(__name__)
class Keithley2700(VoltageMeasurementDevice): class Keithley2700(VoltageMeasurementDevice):
""" """
Wrapper class for the Keithley2700 SMU controlled via pyvisa Wrapper class for the Keithley2700 SMU controlled via pyvisa
""" """
def __init__(self, instr, check_front_switch=True): def __init__(self, instr, check_front_switch=True):
self.instr = instr self.instr = instr
if check_front_switch: if check_front_switch:
@ -21,7 +23,8 @@ class Keithley2700(VoltageMeasurementDevice):
def __del__(self): def __del__(self):
"""Properly close the instrument connection""" """Properly close the instrument connection"""
self.instr.close() self.instr.close()
def _check_front_input_selected(self): def _check_front_input_selected(self):
""" """
Make sure the front switch selecting the inputs selects the FRONT inputs Make sure the front switch selecting the inputs selects the FRONT inputs
@ -39,8 +42,14 @@ class Keithley2700(VoltageMeasurementDevice):
if switch != "1": if switch != "1":
raise Exception("The Keithley's INPUT switch must select the [F]ront inputs") raise Exception("The Keithley's INPUT switch must select the [F]ront inputs")
return switch return switch
def test_connection(self) -> None:
try:
res = self.query("*IDN?")
except Exception as e:
log.error(f"Failed to query *IDN?:", e)
raise ConnectionError
def query(self, query): def query(self, query):
try: try:
return self.instr.query(query).strip("\n") return self.instr.query(query).strip("\n")

View File

@ -10,6 +10,9 @@ class TestVoltageMeasurementDevice(VoltageMeasurementDevice):
self.frequency = frequency self.frequency = frequency
self.t0 = now() self.t0 = now()
def test_connection(self) -> None:
pass
# RUN COMMANDS ON THE DEVICE # RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False): def run(self, code, verbose=False):
pass pass