Merge remote-tracking branch 'origin/dev' into dev
This commit is contained in:
commit
c6c7501b0f
@ -33,17 +33,10 @@ if __name__ == "__main__":
|
|||||||
from os import path
|
from os import path
|
||||||
filepath = path.realpath(path.abspath(__file__))
|
filepath = path.realpath(path.abspath(__file__))
|
||||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
prog="cpdctrl",
|
|
||||||
description="measure voltage using a Keithley SMU",
|
|
||||||
)
|
|
||||||
backend_group = parser.add_mutually_exclusive_group(required=False)
|
|
||||||
backend_group.add_argument("-k", "--keithley", action="store_true")
|
|
||||||
# backend_group.add_argument("-t", "--testing", action='store_true')
|
|
||||||
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
|
|
||||||
args = vars(parser.parse_args())
|
|
||||||
|
|
||||||
|
|
||||||
|
from . import led_control_device
|
||||||
|
from . import voltage_measurement_device
|
||||||
from .voltage_measurement_device.base import VoltageMeasurementDevice
|
from .voltage_measurement_device.base import VoltageMeasurementDevice
|
||||||
from .voltage_measurement_device.impl import keithley2700 as _volt
|
from .voltage_measurement_device.impl import keithley2700 as _volt
|
||||||
from .led_control_device.base import LedControlDevice
|
from .led_control_device.base import LedControlDevice
|
||||||
@ -55,8 +48,12 @@ from .utility.data import DataCollector
|
|||||||
from .utility.data import plot_cpd_data as data_plot
|
from .utility.data import plot_cpd_data as data_plot
|
||||||
from .utility.config_file import ConfigFile
|
from .utility.config_file import ConfigFile
|
||||||
from .utility import file_io
|
from .utility import file_io
|
||||||
|
from .utility.device_select import select_device_interactive
|
||||||
from .update_funcs import _Monitor, _update_print
|
from .update_funcs import _Monitor, _update_print
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
# CONFIGURATION
|
# CONFIGURATION
|
||||||
_runtime_vars = {
|
_runtime_vars = {
|
||||||
"last-measurement": ""
|
"last-measurement": ""
|
||||||
@ -279,8 +276,16 @@ _/ ___\\____ \ / __ |/ ___\ __\_ __ \ |
|
|||||||
Interactive Shell for CPD measurements with Keithley 2700B
|
Interactive Shell for CPD measurements with Keithley 2700B
|
||||||
---
|
---
|
||||||
Enter 'help()' for a list of commands""")
|
Enter 'help()' for a list of commands""")
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog="cpdctrl",
|
||||||
|
description="measure voltage using a Keithley SMU",
|
||||||
|
)
|
||||||
|
backend_group = parser.add_mutually_exclusive_group(required=False)
|
||||||
|
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
|
||||||
|
args = vars(parser.parse_args())
|
||||||
from os import environ
|
from os import environ
|
||||||
|
|
||||||
|
# Load config file
|
||||||
if path.isfile(cfilename):
|
if path.isfile(cfilename):
|
||||||
config_path = cfilename
|
config_path = cfilename
|
||||||
elif 'XDG_CONFIG_HOME' in environ.keys():
|
elif 'XDG_CONFIG_HOME' in environ.keys():
|
||||||
@ -294,16 +299,59 @@ Enter 'help()' for a list of commands""")
|
|||||||
config_file = ConfigFile(config_path, init_values=settings)
|
config_file = ConfigFile(config_path, init_values=settings)
|
||||||
load_settings()
|
load_settings()
|
||||||
|
|
||||||
|
# setup logging
|
||||||
|
log_path = path.expanduser(config_file.get_or("path_log", "~/.cache/cpdctrl-interactive.log"))
|
||||||
|
makedirs(path.dirname(log_path), exist_ok=True)
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.WARN,
|
||||||
|
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
|
||||||
|
handlers=[
|
||||||
|
logging.FileHandler(log_path),
|
||||||
|
logging.StreamHandler()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
if not path.isdir(settings["datadir"]):
|
if not path.isdir(settings["datadir"]):
|
||||||
makedirs(settings["datadir"])
|
makedirs(settings["datadir"])
|
||||||
|
|
||||||
try:
|
# init the devices
|
||||||
pass
|
last_vm_type = config_file.get_or("last_dev_vm_type", None)
|
||||||
dev = _volt.init("GPIB0::22::INSTR")
|
last_vm_name = config_file.get_or("last_dev_vm_name", None)
|
||||||
led = _led.LEDD1B()
|
if last_vm_name and last_vm_type:
|
||||||
except Exception as e:
|
try:
|
||||||
print(e)
|
dev = voltage_measurement_device.connect_device(last_vm_type, last_vm_name)
|
||||||
exit(1)
|
except:
|
||||||
|
log.error(f"Failed to connect to last used device {last_vm_type}::{last_vm_name}")
|
||||||
|
while dev is None:
|
||||||
|
devs = voltage_measurement_device.list_devices()
|
||||||
|
print("-" * 50)
|
||||||
|
vm_dev_type, vm_dev_name = select_device_interactive(devs, "Select voltage measurement device: ")
|
||||||
|
try:
|
||||||
|
dev = voltage_measurement_device.connect_device(vm_dev_type, vm_dev_name)
|
||||||
|
except:
|
||||||
|
log.error(f"Failed to connect to device {vm_dev_type}::{vm_dev_name}")
|
||||||
|
config_file.set("last_dev_vm_type", vm_dev_type)
|
||||||
|
config_file.set("last_dev_vm_name", vm_dev_name)
|
||||||
|
|
||||||
|
# init the devices
|
||||||
|
last_led_type = config_file.get_or("last_dev_led_type", None)
|
||||||
|
last_led_name = config_file.get_or("last_dev_led_name", None)
|
||||||
|
if last_led_name and last_led_type:
|
||||||
|
try:
|
||||||
|
led = led_control_device.connect_device(last_led_type, last_led_name)
|
||||||
|
except:
|
||||||
|
log.error(f"Failed to connect to last used device {last_led_type}::{last_led_name}")
|
||||||
|
while led is None:
|
||||||
|
devs = led_control_device.list_devices()
|
||||||
|
print("-" * 50)
|
||||||
|
led_dev_type, led_dev_name = select_device_interactive(devs, "Select LED control device: ")
|
||||||
|
try:
|
||||||
|
led = led_control_device.connect_device(led_dev_type, led_dev_name)
|
||||||
|
except:
|
||||||
|
log.error(f"Failed to connect to device {led_dev_type}::{led_dev_name}")
|
||||||
|
config_file.set("last_dev_led_type", led_dev_type)
|
||||||
|
config_file.set("last_dev_led_name", led_dev_name)
|
||||||
|
|
||||||
# atexit.register(_backend.exit, dev)
|
# atexit.register(_backend.exit, dev)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,25 +1,64 @@
|
|||||||
from cpdctrl.led_control_device.base import LedControlDevice
|
from cpdctrl.led_control_device.base import LedControlDevice
|
||||||
from cpdctrl.led_control_device.impl.test import TestLedControlDevice
|
from cpdctrl.led_control_device.impl.test import TestLedControlDevice
|
||||||
|
|
||||||
|
# can not use a static class member as name since the import might fail
|
||||||
|
TYPENAME_DC2200 = "Thorlabs DC2200"
|
||||||
|
TYPENAME_LEDD1B = "Thorlabs LEDD1B"
|
||||||
|
TYPENAME_TEST = "Test"
|
||||||
|
|
||||||
def list_devices() -> dict[str,list[str]]:
|
def list_devices() -> dict[str,list[str]]:
|
||||||
|
"""
|
||||||
|
Get a list of all devices that are available to connect
|
||||||
|
The returned device names may be passed to connect_device() to connect the device and acquire the handle
|
||||||
|
|
||||||
|
If a module is not installed, it will not be listed.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
dict[str,list[str]]
|
||||||
|
A dictionary with keys being the typename and values being a list of device names
|
||||||
|
"""
|
||||||
devices = {
|
devices = {
|
||||||
"TEST": ["Led Control Dummy Device"],
|
TYPENAME_TEST: ["Led Control Dummy Device"],
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
from .impl import thorlabs_ledd1b as th
|
from .impl import thorlabs_ledd1b as th
|
||||||
devices["ARDUINO"] = ["Thorlabs LEDD1B"] #keithley2700.enumerate_devices()
|
devices[TYPENAME_LEDD1B] = ["Thorlabs LEDD1B"] #keithley2700.enumerate_devices()
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
from .impl.thorlabs_dc2200 import DC2200
|
||||||
|
devices[TYPENAME_DC2200] = DC2200.enumerate_devices()
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
return devices
|
return devices
|
||||||
|
|
||||||
def connect_device(typename: str, devicename: str) -> LedControlDevice:
|
def connect_device(type_name: str, device_name: str) -> LedControlDevice:
|
||||||
if typename == "TEST":
|
"""
|
||||||
|
Connect to a device
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
type_name
|
||||||
|
Type of the device, first key of the dictionary returned by list_devices()
|
||||||
|
device_name
|
||||||
|
Name of the device, element of the list belonging to <type_name> in the dictionary returned by list_devices()
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
An LedControlDevice object
|
||||||
|
"""
|
||||||
|
if type_name == TYPENAME_TEST:
|
||||||
return TestLedControlDevice()
|
return TestLedControlDevice()
|
||||||
elif typename == "ARDUINO":
|
elif type_name == TYPENAME_LEDD1B:
|
||||||
try:
|
try:
|
||||||
from .impl import thorlabs_ledd1b as th
|
from .impl import thorlabs_ledd1b as th
|
||||||
return th.LEDD1B()
|
return th.LEDD1B()
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ValueError(f"Arduino devices not available: {e}")
|
raise ValueError(f"Arduino devices not available: {e}")
|
||||||
raise ValueError(f"Unknown device type {typename}")
|
elif type_name == TYPENAME_DC2200:
|
||||||
|
try:
|
||||||
|
from .impl.thorlabs_dc2200 import DC2200
|
||||||
|
return DC2200.connect_device(device_name)
|
||||||
|
except ImportError as e:
|
||||||
|
raise ValueError(f"DC2200 devices not available: {e}")
|
||||||
|
raise ValueError(f"Unknown device type {type_name}")
|
@ -42,4 +42,11 @@ class LedControlDevice(ABC):
|
|||||||
-------
|
-------
|
||||||
None.
|
None.
|
||||||
"""
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_led_name(self) -> None|str:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def __str__(self):
|
||||||
pass
|
pass
|
@ -12,3 +12,6 @@ class TestLedControlDevice(LedControlDevice):
|
|||||||
|
|
||||||
def set_level(self, level: int):
|
def set_level(self, level: int):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "Dummy Led Controller"
|
57
cpdctrl/led_control_device/impl/thorlabs_dc2200.py
Normal file
57
cpdctrl/led_control_device/impl/thorlabs_dc2200.py
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
from ..base import LedControlDevice
|
||||||
|
from ...utility.visa import enumerate_devices
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
import pyvisa
|
||||||
|
|
||||||
|
class DC2200(LedControlDevice):
|
||||||
|
"""
|
||||||
|
Class for controlling Thorlabs DC2200 LED controller.
|
||||||
|
This works only when the LED is connected to Terminal 2
|
||||||
|
Args:
|
||||||
|
instr (pyvisa.Resource): pyvisa resource object for the LED controller.
|
||||||
|
"""
|
||||||
|
def __init__(self, instr: pyvisa.Resource):
|
||||||
|
super().__init__()
|
||||||
|
self.instr = instr
|
||||||
|
# led presence test
|
||||||
|
self.instr.write('OUTPut:TERMinal2:TEST:INITiate')
|
||||||
|
presence = self.instr.query('OUTPut:TERMinal2:TEST:STATus?')
|
||||||
|
# Led name, format:
|
||||||
|
# "<vendor_name_string>,<led_head_model_name_string >, < led_head_serial_no_string >, < fw_version_major_num >, < fw_version_minor_num >, < fw_version_subminor_num > "
|
||||||
|
self.led_name = self.instr.query('SYSTem:TERMinal2:HTYPe?').strip("\n")
|
||||||
|
# constant brightness
|
||||||
|
self.instr.write('SOURCE1:MODE CB')
|
||||||
|
# turn off
|
||||||
|
self.instr.write(f'SOURCE1:CBRightness:BRIGhtness 0')
|
||||||
|
self.instr.write('OUTPUT1:STATE ON')
|
||||||
|
for _ in range(10): # print max 10 errors
|
||||||
|
error = self.instr.query('SYSTem:ERRor?')
|
||||||
|
if error.startswith('+0'): break
|
||||||
|
log.error(f"DC2200 device error during initialization: {error}")
|
||||||
|
def on(self):
|
||||||
|
self.instr.write(f'SOURCE1:CBRightness:BRIGhtness 100')
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.instr.write(f'SOURCE1:CBRightness:BRIGhtness 0')
|
||||||
|
|
||||||
|
def set_level(self, level:int):
|
||||||
|
self.instr.write(f'SOURCE1:CBRightness:BRIGhtness {level}')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
|
||||||
|
return enumerate_devices("DC2200", query)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def connect_device(name):
|
||||||
|
rm = pyvisa.ResourceManager()
|
||||||
|
instr = rm.open_resource(name)
|
||||||
|
return DC2200(instr)
|
||||||
|
|
||||||
|
def get_led_name(self) -> None|str:
|
||||||
|
return self.led_name
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "Thorlabs DC2200"
|
@ -3,6 +3,13 @@ import serial
|
|||||||
from ..base import LedControlDevice
|
from ..base import LedControlDevice
|
||||||
|
|
||||||
class LEDD1B(LedControlDevice):
|
class LEDD1B(LedControlDevice):
|
||||||
|
"""
|
||||||
|
Control a Thorlabs LEDD1B LED driver using an Arduino Nano.
|
||||||
|
The arduino must have the correct software loaded on it.
|
||||||
|
(See `arduino-thorlabs-ledd1b`project directory.)
|
||||||
|
|
||||||
|
Note: This currently has COM4 hardcoded
|
||||||
|
"""
|
||||||
def __init__(self, port="COM4"):
|
def __init__(self, port="COM4"):
|
||||||
self.arduino = serial.Serial(port=port, baudrate=9600, timeout=.1)
|
self.arduino = serial.Serial(port=port, baudrate=9600, timeout=.1)
|
||||||
# self._check_arduino_software()
|
# self._check_arduino_software()
|
||||||
@ -25,8 +32,8 @@ class LEDD1B(LedControlDevice):
|
|||||||
self.arduino.write(bytes(val, 'utf-8'))
|
self.arduino.write(bytes(val, 'utf-8'))
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
data = self.arduino.readlines()
|
data = self.arduino.readlines()
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def on(self):
|
def on(self):
|
||||||
self._write("1")
|
self._write("1")
|
||||||
@ -37,6 +44,10 @@ class LEDD1B(LedControlDevice):
|
|||||||
elif level == 100: self.on()
|
elif level == 100: self.on()
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"LEDD1B Led controller can only set 0% or 100%")
|
raise ValueError(f"LEDD1B Led controller can only set 0% or 100%")
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "Thorlabs LEDD1B"
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
led = LEDD1B()
|
led = LEDD1B()
|
@ -5,9 +5,7 @@ Created on Fri Jan 24 15:18:31 2025
|
|||||||
@author: Matthias Quintern
|
@author: Matthias Quintern
|
||||||
"""
|
"""
|
||||||
from cpdctrl.voltage_measurement_device.base import VoltageMeasurementDevice
|
from cpdctrl.voltage_measurement_device.base import VoltageMeasurementDevice
|
||||||
from cpdctrl.voltage_measurement_device.impl.keithley2700 import init
|
|
||||||
from cpdctrl.led_control_device.base import LedControlDevice
|
from cpdctrl.led_control_device.base import LedControlDevice
|
||||||
from cpdctrl.led_control_device.impl.thorlabs_ledd1b import LEDD1B # TODO: remove!
|
|
||||||
from cpdctrl.led_script import LedScript
|
from cpdctrl.led_script import LedScript
|
||||||
from cpdctrl.utility.data import DataCollector
|
from cpdctrl.utility.data import DataCollector
|
||||||
|
|
||||||
@ -15,6 +13,9 @@ import time
|
|||||||
import datetime
|
import datetime
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
def measure(
|
def measure(
|
||||||
vm_dev: VoltageMeasurementDevice,
|
vm_dev: VoltageMeasurementDevice,
|
||||||
led_dev: LedControlDevice,
|
led_dev: LedControlDevice,
|
||||||
@ -27,7 +28,8 @@ def measure(
|
|||||||
stop_on_script_end: bool=False,
|
stop_on_script_end: bool=False,
|
||||||
verbose: bool=False,
|
verbose: bool=False,
|
||||||
command_queue: None|Queue=None,
|
command_queue: None|Queue=None,
|
||||||
data_queue: None|Queue=None
|
data_queue: None|Queue=None,
|
||||||
|
add_measurement_info_to_metadata=True
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Perform a measurement
|
Perform a measurement
|
||||||
@ -64,22 +66,26 @@ def measure(
|
|||||||
The default is None.
|
The default is None.
|
||||||
data_queue : None|Queue, optional
|
data_queue : None|Queue, optional
|
||||||
A queue to put data in. The default is None.
|
A queue to put data in. The default is None.
|
||||||
|
add_measurement_info_to_metadata : bool, optional
|
||||||
|
If True, add measurement info to the metadata:
|
||||||
|
time, measurement_interval, measurement_use_buffer, measurement_voltage_device, measurement_led_device
|
||||||
|
The default is True.
|
||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
None.
|
None.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# old hack when using multiprocessing instead of mulithreading:
|
get_time = lambda: datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
|
||||||
# devices are not pickleable and thus cant be moved to / shared with the measurement process
|
if add_measurement_info_to_metadata:
|
||||||
# if led_dev is None:
|
data.metadata["measurement_interval"] = str(delta_t) + " s"
|
||||||
# led_dev = LEDD1B()
|
data.metadata["measurement_use_buffer"] = str(use_buffer)
|
||||||
# if vm_dev is None:
|
data.metadata["measurement_voltage_measurement_device"] = str(vm_dev)
|
||||||
# vm_dev = init("GPIB0::22::INSTR")
|
data.metadata["measurement_led_control_device"] = str(led_dev)
|
||||||
# if no "time" in metadata, set the current local time in ISO 8601 format
|
led_name = led_dev.get_led_name()
|
||||||
# and without microseconds
|
if led_name: data.metadata["measurement_led_lamp"] = led_name
|
||||||
if not "time" in data.metadata:
|
data.metadata["measurement_time_start"] = get_time()
|
||||||
data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).astimezone().isoformat()
|
# write metadata to disk
|
||||||
|
data.write_metadata()
|
||||||
vm_dev.reset(True)
|
vm_dev.reset(True)
|
||||||
if use_buffer:
|
if use_buffer:
|
||||||
vm_dev.buffer_measure(delta_t, verbose=True)
|
vm_dev.buffer_measure(delta_t, verbose=True)
|
||||||
@ -90,13 +96,17 @@ def measure(
|
|||||||
i = 0
|
i = 0
|
||||||
led_val = led_script.start()
|
led_val = led_script.start()
|
||||||
t_iter_start = time.time()
|
t_iter_start = time.time()
|
||||||
while max_measurements is None or i < max_measurements:
|
while True:
|
||||||
|
# using while True and if, to be able to log the stop reason
|
||||||
|
if max_measurements is not None and i >= max_measurements:
|
||||||
|
log.info(f"Reached maximum number of measurements ({i}{max_measurements}), stopping measurement")
|
||||||
|
break
|
||||||
# 1) read value(s)
|
# 1) read value(s)
|
||||||
if use_buffer:
|
if use_buffer:
|
||||||
try:
|
try:
|
||||||
values = vm_dev.buffer_read_new_values()
|
values = vm_dev.buffer_read_new_values()
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
print(f"Error in buffer measurement {i}:", e)
|
# print(f"Error in buffer measurement {i}:", e)
|
||||||
values = []
|
values = []
|
||||||
else:
|
else:
|
||||||
values = [vm_dev.read_value()]
|
values = [vm_dev.read_value()]
|
||||||
@ -109,7 +119,7 @@ def measure(
|
|||||||
current_data = (i, tval, vval, led_val)
|
current_data = (i, tval, vval, led_val)
|
||||||
data.add_data(*current_data)
|
data.add_data(*current_data)
|
||||||
# 3) write data
|
# 3) write data
|
||||||
print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r')
|
if verbose: print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r')
|
||||||
if flush_after is not None and (i+1) % flush_after == 0:
|
if flush_after is not None and (i+1) % flush_after == 0:
|
||||||
data.flush(verbose=verbose)
|
data.flush(verbose=verbose)
|
||||||
# if a queue was given, put the data
|
# if a queue was given, put the data
|
||||||
@ -121,15 +131,16 @@ def measure(
|
|||||||
if command_queue is not None and command_queue.qsize() > 0:
|
if command_queue is not None and command_queue.qsize() > 0:
|
||||||
recv = command_queue.get(block=False)
|
recv = command_queue.get(block=False)
|
||||||
if recv == "stop":
|
if recv == "stop":
|
||||||
|
log.info(f"Received 'stop', stopping measurement")
|
||||||
break
|
break
|
||||||
elif type(recv) == tuple and recv[0] == "led_script":
|
elif type(recv) == tuple and recv[0] == "led_script":
|
||||||
led_script = recv[1]
|
led_script = recv[1]
|
||||||
else:
|
else:
|
||||||
print(f"Received invalid message: '{recv}'")
|
log.error(f"Received invalid message: '{recv}'")
|
||||||
|
|
||||||
# 4) sleep
|
# 4) sleep
|
||||||
# substract the execution time from the sleep time for a more
|
# subtract the execution time from the sleep time for a more
|
||||||
# acurate frequency
|
# accurate frequency
|
||||||
dt_sleep = delta_t - (time.time() - t_iter_start)
|
dt_sleep = delta_t - (time.time() - t_iter_start)
|
||||||
if dt_sleep > 0:
|
if dt_sleep > 0:
|
||||||
# print(f"Sleeping for {dt_sleep}")
|
# print(f"Sleeping for {dt_sleep}")
|
||||||
@ -137,7 +148,7 @@ def measure(
|
|||||||
t_iter_start = time.time()
|
t_iter_start = time.time()
|
||||||
# 5) update LED
|
# 5) update LED
|
||||||
if stop_on_script_end and led_script.is_done(t_iter_start):
|
if stop_on_script_end and led_script.is_done(t_iter_start):
|
||||||
if verbose: print("Reached script end")
|
log.info("Reached led script end, stopping measurement")
|
||||||
break
|
break
|
||||||
new_led_val = led_script.get_state(t_iter_start)
|
new_led_val = led_script.get_state(t_iter_start)
|
||||||
if new_led_val != led_val:
|
if new_led_val != led_val:
|
||||||
@ -145,12 +156,18 @@ def measure(
|
|||||||
led_dev.set_level(new_led_val)
|
led_dev.set_level(new_led_val)
|
||||||
led_val = new_led_val
|
led_val = new_led_val
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error setting led to {new_led_val}%:")
|
log.error(f"Error setting led to {new_led_val:03}%: {e}")
|
||||||
print(e)
|
raise e
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
log.info("Keyboard interrupt, stopping measurement")
|
||||||
data.flush(verbose=verbose)
|
except Exception as e:
|
||||||
|
log.critical(f"Unexpected error, stopping measurement. Error: {e}")
|
||||||
|
if add_measurement_info_to_metadata:
|
||||||
|
data.metadata["measurement_time_stop"] = get_time()
|
||||||
|
# Write again after having updated the stop time
|
||||||
|
data.write_metadata()
|
||||||
|
data.flush()
|
||||||
led_dev.off()
|
led_dev.off()
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
from os import environ, makedirs, path
|
from os import environ, makedirs, path
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
class ConfigFile:
|
class ConfigFile:
|
||||||
"""
|
"""
|
||||||
Class managing a yaml config file.
|
Class managing a yaml config file.
|
||||||
@ -15,6 +17,7 @@ class ConfigFile:
|
|||||||
self.values = init_values
|
self.values = init_values
|
||||||
self.filepath = filepath
|
self.filepath = filepath
|
||||||
if path.isfile(self.filepath):
|
if path.isfile(self.filepath):
|
||||||
|
log.debug(f"[{self.filepath}] loading from file")
|
||||||
with open(self.filepath, "r") as file:
|
with open(self.filepath, "r") as file:
|
||||||
self.values |= yaml.safe_load(file)
|
self.values |= yaml.safe_load(file)
|
||||||
|
|
||||||
@ -23,6 +26,7 @@ class ConfigFile:
|
|||||||
directory = path.dirname(self.filepath)
|
directory = path.dirname(self.filepath)
|
||||||
if not path.isdir(directory):
|
if not path.isdir(directory):
|
||||||
makedirs(directory)
|
makedirs(directory)
|
||||||
|
log.debug(f"[{self.filepath}] saving to file")
|
||||||
with open(self.filepath, "w") as file:
|
with open(self.filepath, "w") as file:
|
||||||
yaml.dump(self.values, file)
|
yaml.dump(self.values, file)
|
||||||
|
|
||||||
@ -35,10 +39,12 @@ class ConfigFile:
|
|||||||
raise KeyError(f"Key '{name}' not found in config file '{self.filepath}'")
|
raise KeyError(f"Key '{name}' not found in config file '{self.filepath}'")
|
||||||
|
|
||||||
def set(self, name: str, value):
|
def set(self, name: str, value):
|
||||||
|
log.debug(f"[{self.filepath}] set {name} = {value}")
|
||||||
self.values[name] = value
|
self.values[name] = value
|
||||||
|
|
||||||
def get_values(self):
|
def get_values(self):
|
||||||
return self.values.copy()
|
return self.values.copy()
|
||||||
|
|
||||||
def set_values(self, values):
|
def set_values(self, values):
|
||||||
|
log.debug(f"[{self.filepath}] set values = {values}")
|
||||||
self.values = values
|
self.values = values
|
@ -4,11 +4,14 @@ import os
|
|||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
import datetime
|
import datetime
|
||||||
import pickle
|
import pickle
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
|
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
|
||||||
FLUSH_TYPE = "pickle-ndarray"
|
FLUSH_TYPE = "pickle-ndarray"
|
||||||
METADATA_FILENAME = "_measurement_metadata.pkl"
|
METADATA_FILENAME = "_measurement_metadata.pkl"
|
||||||
|
|
||||||
|
|
||||||
class DataCollector:
|
class DataCollector:
|
||||||
columns = ["idx", "t [s]", "V [V]", "LED [%]"]
|
columns = ["idx", "t [s]", "V [V]", "LED [%]"]
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
@ -24,7 +27,7 @@ class DataCollector:
|
|||||||
self.metadata = metadata
|
self.metadata = metadata
|
||||||
self.path = os.path.abspath(os.path.expanduser(data_path))
|
self.path = os.path.abspath(os.path.expanduser(data_path))
|
||||||
if dirname is None:
|
if dirname is None:
|
||||||
self.dirname = sanitize_filename(datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d_%H-%M") + "_" + self.name)
|
self.dirname = sanitize_filename(datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") + "_" + self.name)
|
||||||
else:
|
else:
|
||||||
self.dirname = sanitize_filename(dirname)
|
self.dirname = sanitize_filename(dirname)
|
||||||
self.dirpath = os.path.join(self.path, self.dirname)
|
self.dirpath = os.path.join(self.path, self.dirname)
|
||||||
@ -56,6 +59,7 @@ class DataCollector:
|
|||||||
None.
|
None.
|
||||||
"""
|
"""
|
||||||
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
|
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
|
||||||
|
log.debug(f"Writing metadata to {filepath}")
|
||||||
with open(filepath, "wb") as file:
|
with open(filepath, "wb") as file:
|
||||||
pickle.dump(self.metadata, file)
|
pickle.dump(self.metadata, file)
|
||||||
|
|
||||||
@ -85,11 +89,13 @@ class DataCollector:
|
|||||||
if FLUSH_TYPE == "csv":
|
if FLUSH_TYPE == "csv":
|
||||||
filename = self._get_filename() + ".csv"
|
filename = self._get_filename() + ".csv"
|
||||||
filepath = os.path.join(self.dirpath, filename)
|
filepath = os.path.join(self.dirpath, filename)
|
||||||
|
log.info(f"Flushing data to {filepath}")
|
||||||
if verbose: print(f"Flushing data to {filepath}")
|
if verbose: print(f"Flushing data to {filepath}")
|
||||||
self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
|
self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
|
||||||
elif FLUSH_TYPE == "pickle-ndarray":
|
elif FLUSH_TYPE == "pickle-ndarray":
|
||||||
filename = self._get_filename() + ".ndarray.pkl"
|
filename = self._get_filename() + ".ndarray.pkl"
|
||||||
filepath = os.path.join(self.dirpath, filename)
|
filepath = os.path.join(self.dirpath, filename)
|
||||||
|
log.info(f"Flushing data to {filepath}")
|
||||||
if verbose: print(f"Flushing data to {filepath}")
|
if verbose: print(f"Flushing data to {filepath}")
|
||||||
with open(filepath, "wb") as file:
|
with open(filepath, "wb") as file:
|
||||||
pickle.dump(np.array(self.data), file)
|
pickle.dump(np.array(self.data), file)
|
||||||
@ -117,7 +123,8 @@ class DataCollector:
|
|||||||
|
|
||||||
def save_csv(self, sep=",", verbose=False):
|
def save_csv(self, sep=",", verbose=False):
|
||||||
filepath = os.path.join(self.path, self.dirname + ".csv")
|
filepath = os.path.join(self.path, self.dirname + ".csv")
|
||||||
if verbose: print(f"Writing data to {filepath}")
|
if verbose: print(f"Writing csv to {filepath}")
|
||||||
|
log.info(f"Writing csv to {filepath}")
|
||||||
with open(filepath, "w") as file:
|
with open(filepath, "w") as file:
|
||||||
file.write(self.to_csv(sep=sep))
|
file.write(self.to_csv(sep=sep))
|
||||||
|
|
||||||
|
26
cpdctrl/utility/device_select.py
Normal file
26
cpdctrl/utility/device_select.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
def select_device_interactive(type_devices_dict: dict[str, list[str]], prompt="Select an instrument: ") -> tuple[str, str]:
|
||||||
|
"""
|
||||||
|
Select a device interactively from the command line
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
type_devices_dict
|
||||||
|
A dictionary of device types and their corresponding device names
|
||||||
|
-------
|
||||||
|
The type and name of the selected device.
|
||||||
|
These can be passed to the connect_device method of the led_control_device or voltage_measurement_device libraries
|
||||||
|
"""
|
||||||
|
res = type_devices_dict
|
||||||
|
flat_res = [ (t, v) for t, l in res.items() for v in l ]
|
||||||
|
for i, (t,v) in enumerate(flat_res):
|
||||||
|
print(f"{i+1:02}: {t} - {v}")
|
||||||
|
while len(flat_res) > 0:
|
||||||
|
try:
|
||||||
|
instr = int(input(prompt)) - 1
|
||||||
|
if instr < 0 or instr >= len(flat_res):
|
||||||
|
raise ValueError
|
||||||
|
return flat_res[instr]
|
||||||
|
except ValueError:
|
||||||
|
print(f"Enter a number between 1 and {len(flat_res)}")
|
||||||
|
continue
|
||||||
|
raise Exception("No devices found")
|
34
cpdctrl/utility/visa.py
Normal file
34
cpdctrl/utility/visa.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
import pyvisa
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def enumerate_devices(device_name, query="(GPIB)|(USB)?*:INSTR", visa_backend=""):
|
||||||
|
"""
|
||||||
|
Return all available visa resources that match the query and the device name
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
device_name
|
||||||
|
A part of the name that the device is supposed upon the '*IDN?' query
|
||||||
|
query
|
||||||
|
A query to the visa resource manager, to filter the resources
|
||||||
|
visa_backend
|
||||||
|
The visa backend to use, if not the default one
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
List of visa resource names
|
||||||
|
|
||||||
|
"""
|
||||||
|
rm = pyvisa.ResourceManager(visa_backend)
|
||||||
|
res = []
|
||||||
|
for r in rm.list_resources(query):
|
||||||
|
try:
|
||||||
|
instr = rm.open_resource(r)
|
||||||
|
name = instr.query('*IDN?')
|
||||||
|
if device_name in name:
|
||||||
|
res.append(r)
|
||||||
|
instr.close()
|
||||||
|
except:
|
||||||
|
log.debug(f"Could not open Visa resources {r}")
|
||||||
|
return res
|
@ -1,6 +1,9 @@
|
|||||||
|
|
||||||
from .base import VoltageMeasurementDevice
|
from .base import VoltageMeasurementDevice
|
||||||
|
|
||||||
|
TYPENAME_TEST = "Test"
|
||||||
|
TYPENAME_KEITHLEY2700 = "Keithley 2700"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .impl.keithley2700 import Keithley2700
|
from .impl.keithley2700 import Keithley2700
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@ -10,22 +13,22 @@ from .impl.test import TestVoltageMeasurementDevice
|
|||||||
|
|
||||||
def list_devices() -> dict[str,list[str]]:
|
def list_devices() -> dict[str,list[str]]:
|
||||||
devices = {
|
devices = {
|
||||||
"TEST": ["Voltage Measurement Dummy Device"],
|
TYPENAME_TEST: ["Voltage Measurement Dummy Device"],
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
from .impl import keithley2700
|
from .impl.keithley2700 import Keithley2700
|
||||||
devices["VISA"] = keithley2700.enumerate_devices()
|
devices[TYPENAME_KEITHLEY2700] = Keithley2700.enumerate_devices()
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
return devices
|
return devices
|
||||||
|
|
||||||
def connect_device(typename: str, devicename: str) -> VoltageMeasurementDevice:
|
def connect_device(type_name: str, device_name: str) -> VoltageMeasurementDevice:
|
||||||
if typename == "TEST":
|
if type_name == TYPENAME_TEST:
|
||||||
return TestVoltageMeasurementDevice()
|
return TestVoltageMeasurementDevice()
|
||||||
elif typename == "VISA":
|
elif type_name == TYPENAME_KEITHLEY2700:
|
||||||
try:
|
try:
|
||||||
from .impl import keithley2700
|
from .impl.keithley2700 import Keithley2700
|
||||||
return keithley2700.init(devicename)
|
return Keithley2700.connect_device(device_name)
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ValueError(f"VISA devices not available: {e}")
|
raise ValueError(f"Keithley 2700 devices not available: {e}")
|
||||||
raise ValueError(f"Unknown device type {typename}")
|
raise ValueError(f"Unknown device type {type_name}")
|
@ -50,4 +50,7 @@ class VoltageMeasurementDevice(ABC):
|
|||||||
None.
|
None.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
pass
|
||||||
|
@abstractmethod
|
||||||
|
def __str__(self):
|
||||||
pass
|
pass
|
@ -5,78 +5,7 @@ import os
|
|||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
from ..base import VoltageMeasurementDevice
|
from ..base import VoltageMeasurementDevice
|
||||||
|
from ...utility.visa import enumerate_devices
|
||||||
"""
|
|
||||||
Utility
|
|
||||||
"""
|
|
||||||
|
|
||||||
# scripts = {
|
|
||||||
# "buffer_reset": pkg_resources.resource_filename("cpdctrl", "keithley_scripts/buffer_reset.lua"),
|
|
||||||
# "instrument_reset": pkg_resources.resource_filename("cpdctrl", "keithley_scripts/smua_reset.lua"),
|
|
||||||
# }
|
|
||||||
scripts = {
|
|
||||||
|
|
||||||
"instrument_reset": "~/cpd-dev/cpdctrl/cpdctrl/keithley_scripts/reset.scpi",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def enumerate_devices(visa_backend="", query="GPIB?*::INSTR") -> list[str]:
|
|
||||||
"""
|
|
||||||
Enumerate all devices matching the query.
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
visa_backend
|
|
||||||
The Visa backend to use (eg. "@py" for pyvisa-py, "@sim" for pyvisa-sim).
|
|
||||||
If not specified, the default backend is used.
|
|
||||||
query
|
|
||||||
The query to use to find devices. To list all, use "?*::INSTR".
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
|
|
||||||
"""
|
|
||||||
rm = pyvisa.ResourceManager(visa_backend)
|
|
||||||
resources = rm.list_resources(query=query)
|
|
||||||
return resources
|
|
||||||
|
|
||||||
|
|
||||||
def select_visa_device(visa_backend="", query="GPIB?*::INSTR") -> pyvisa.resources.Resource:
|
|
||||||
"""
|
|
||||||
Select a Visa device interactively from the command line
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
visa_backend
|
|
||||||
The Visa backend to use (eg. "@py" for pyvisa-py, "@sim" for pyvisa-sim).
|
|
||||||
If not specified, the default backend is used.
|
|
||||||
query
|
|
||||||
The query to use to find devices. To list all, use "?*::INSTR".
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
pyvisa.resources.Resource : The selected Visa device
|
|
||||||
"""
|
|
||||||
rm = pyvisa.ResourceManager(visa_backend)
|
|
||||||
resources = rm.list_resources(query=query)
|
|
||||||
if len(resources) < 1:
|
|
||||||
raise Exception("No resources found.")
|
|
||||||
elif len(resources) == 1:
|
|
||||||
print(f"Opening the only resource found: {resources[0]}")
|
|
||||||
return rm.open_resource(resources[0])
|
|
||||||
else: # len(resources) > 1:
|
|
||||||
print("Resources:")
|
|
||||||
for i, r in enumerate(resources):
|
|
||||||
print(f"{i+1:02}: {r}")
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
instr = int(input("Select an instrument: ")) - 1
|
|
||||||
if instr < 0 or instr >= len(resources):
|
|
||||||
raise ValueError
|
|
||||||
return rm.open_resource(resources[instr])
|
|
||||||
except ValueError:
|
|
||||||
print(f"Enter a number between 1 and {len(resources)}")
|
|
||||||
continue
|
|
||||||
raise Exception("This should never happen")
|
|
||||||
|
|
||||||
|
|
||||||
class Keithley2700(VoltageMeasurementDevice):
|
class Keithley2700(VoltageMeasurementDevice):
|
||||||
"""
|
"""
|
||||||
@ -317,11 +246,15 @@ class Keithley2700(VoltageMeasurementDevice):
|
|||||||
pass
|
pass
|
||||||
print("Measurement stopped" + " "*50)
|
print("Measurement stopped" + " "*50)
|
||||||
|
|
||||||
def init(name=None, visa_backend=""):
|
@staticmethod
|
||||||
if name:
|
def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
|
||||||
rm = pyvisa.ResourceManager(visa_backend)
|
return enumerate_devices("MODEL 2700", query)
|
||||||
instr = rm.open_resource(name)
|
|
||||||
else:
|
|
||||||
instr = select_visa_device(name=name)
|
|
||||||
return Keithley2700(instr)
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def connect_device(name):
|
||||||
|
rm = pyvisa.ResourceManager()
|
||||||
|
instr = rm.open_resource(name)
|
||||||
|
return Keithley2700(instr)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "Keithley 2700"
|
||||||
|
@ -52,3 +52,6 @@ class TestVoltageMeasurementDevice(VoltageMeasurementDevice):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "Simulated Voltage Measurement Device"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user