This commit is contained in:
CPD 2025-04-29 12:25:57 +02:00
commit afde2f85e2
11 changed files with 811 additions and 0 deletions

View File

@ -0,0 +1,34 @@
from .base import MeasurementDevice
TYPENAME_TEST = "Test"
TYPENAME_KEITHLEY2700 = "Keithley 2700"
try:
from .impl.sr830 import SR830
except ImportError:
pass
from .impl.test import TestVoltageMeasurementDevice
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_TEST: ["Voltage Measurement Dummy Device"],
}
try:
from .impl.sr830 import SR830
devices[TYPENAME_KEITHLEY2700] = SR830.enumerate_devices()
except ImportError:
pass
return devices
def connect_device(type_name: str, device_name: str) -> MeasurementDevice:
if type_name == TYPENAME_TEST:
return TestVoltageMeasurementDevice()
elif type_name == TYPENAME_KEITHLEY2700:
try:
from .impl.sr830 import SR830
return SR830.connect_device(device_name)
except ImportError as e:
raise ValueError(f"Keithley 2700 devices not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

View File

@ -0,0 +1,63 @@
from abc import ABC, abstractmethod
from typing import Callable
"""
Created on Tue Jan 21 16:19:01 2025
@author: Matthias Quintern
"""
class MeasurementDevice(ABC):
@abstractmethod
def test_connection(self) -> None:
"""
Verify that the device is still properly connected.
If not, raises ConnectionError
"""
pass
# RUN COMMANDS ON THE DEVICE
@abstractmethod
def run(self, code, verbose=False):
pass
@abstractmethod
def run_script(self, script_path, verbose=False):
pass
@abstractmethod
def reset(self, verbose=False):
pass
@abstractmethod
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
pass
@abstractmethod
def measure(self, interval: int, update_func: Callable[None, [int, float, float]]|None=None, max_measurements:int|None=None):
"""
Take voltage readings after <interval> milliseconds.
Parameters
----------
interval : int
Number of milliseconds to wait between readings.
update_func : Callable[None, [int, float, float]] or None, optional
A function that is called after each reading with parameters <n_reading>, <time>, <voltage>. The default is None.
max_measurements : int or None, optional
Number of readings to perform. Set to None for infinite. The default is None.
Returns
-------
None.
"""
pass
@abstractmethod
def __str__(self):
pass

View File

@ -0,0 +1,86 @@
import pyvisa
from time import sleep
# import pkg_resources
import os
from typing import Callable
from ..base import MeasurementDevice
from ...utility.visa import enumerate_devices
import logging
log = logging.getLogger(__name__)
class SR830(MeasurementDevice):
"""
Wrapper class for the Keithley2700 SMU controlled via pyvisa
"""
def __init__(self, instr, check_front_switch=True):
self.instr = instr
init_script = """
' set input A
ISRC 0
' set float
IGND 0
' set AC coupling
ICPL 0
' Enable 1x and 2x Line notch filters
ILIN 3
' Enable synchronous filter < 200Hz
SYNC 1
' Show R and Phase on the displays
DDEF 1 1 0
DDEF 2 1 0
' Set sample rate to 256 Hz
12
' Set buffer loop mode
SEND 1
"""
self.run(init_script)
def __del__(self):
"""Properly close the instrument connection"""
self.instr.close()
# RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False):
"""
Run SCPI code on the device by writing it.
Empty lines, leading whitespaces and lines starting with ' or # are ignored.
Parameters
----------
code : str
SCPI commands
"""
script = ''
for line in code.strip(" ").split("\n"):
l = line.strip(" ")
if len(l) == 0 or l[0] in "#'": continue
script += l + "\n"
if verbose:
print(f"Running code:\n{script}")
try:
ret = self.instr.write(script)
if ret != 8:
raise RuntimeError(f"Error while writing command(s):\n'{script}'\n\nDevice returned code {ret}")
except pyvisa.VisaIOError as e:
raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{script}'\n\nVisaIOError:\n{e}")
def get_frequency(self) -> float:
return float(self.query("FREQ?"))
def query(self, query):
return self.instr.query(query).strip("\n")
@staticmethod
def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
return enumerate_devices("SR830", query)
@staticmethod
def connect_device(name):
rm = pyvisa.ResourceManager()
instr = rm.open_resource(name)
return SR830(instr)
def __str__(self):
return "SR830"

View File

@ -0,0 +1,60 @@
from ..base import MeasurementDevice
from typing import Callable
from time import time as now
import numpy as np
class TestVoltageMeasurementDevice(MeasurementDevice):
def __init__(self, amplitude: float=1.0, frequency: float=20.0):
super().__init__()
self.amplitude = amplitude
self.frequency = frequency
self.t0 = now()
def test_connection(self) -> None:
pass
# RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False):
pass
def run_script(self, script_path, verbose=False):
pass
def reset(self, verbose=False):
pass
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
t = now() - self.t0
v = self.amplitude * np.sin(2 * np.pi * t / self.frequency)
return t, v
def measure(self, interval: int, update_func: Callable[None, [int, float, float]] | None = None,
max_measurements: int | None = None):
"""
Take voltage readings after <interval> milliseconds.
Parameters
----------
interval : int
Number of milliseconds to wait between readings.
update_func : Callable[None, [int, float, float]] or None, optional
A function that is called after each reading with parameters <n_reading>, <time>, <voltage>. The default is None.
max_measurements : int or None, optional
Number of readings to perform. Set to None for infinite. The default is None.
Returns
-------
None.
"""
pass
def __str__(self):
return "Simulated Voltage Measurement Device"

View File

View File

@ -0,0 +1,50 @@
from os import environ, makedirs, path
import yaml
import logging
log = logging.getLogger(__name__)
class ConfigFile:
"""
Class managing a yaml config file.
The file is loaded on creation and can be saved with .save().
You may initialize an instance with empty string as filepath,
in this case no file will not be loaded or saved.
"""
def __init__(self, filepath: str, init_values = None):
self.values = {}
if init_values:
self.values = init_values
self.filepath = filepath
if path.isfile(self.filepath):
log.debug(f"[{self.filepath}] loading from file")
with open(self.filepath, "r") as file:
self.values |= yaml.safe_load(file)
def save(self):
if not self.filepath: return
directory = path.dirname(self.filepath)
if not path.isdir(directory):
makedirs(directory)
log.debug(f"[{self.filepath}] saving to file")
with open(self.filepath, "w") as file:
yaml.dump(self.values, file)
def get_or(self, name: str, default):
if name in self.values: return self.values[name]
return default
def get(self, name: str):
if name in self.values: return self.values[name]
raise KeyError(f"Key '{name}' not found in config file '{self.filepath}'")
def set(self, name: str, value):
log.debug(f"[{self.filepath}] set {name} = {value}")
self.values[name] = value
def get_values(self):
return self.values.copy()
def set_values(self, values):
log.debug(f"[{self.filepath}] set values = {values}")
self.values = values

280
prsctl/utility/data.py Normal file
View File

@ -0,0 +1,280 @@
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import pickle
import logging
log = logging.getLogger(__name__)
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
FLUSH_TYPE = "pickle-ndarray"
FLUSH_PREFIX = "PART_"
METADATA_FILENAME = FLUSH_PREFIX + "measurement_metadata.pkl"
class CpdData:
"""
Class managing data and metadata.
Can be initialized from data directly, or a file or directory path.
"""
columns = ["idx", "t [s]", "V [V]", "LED [%]"]
def __init__(self, path:str|None=None, data:np.ndarray|None=None, metadata:dict|None=None, verbose=False):
self.data = data
if type(metadata) == dict:
self.metadata = metadata
else:
self.metadata = {}
if data is None and path is None:
raise ValueError("Either path or data must be defined.")
if data is not None and path is not None:
raise ValueError("Either path or data must be defined, but not both.")
if path is not None: # load from file
if os.path.isdir(path):
self.data, md = CpdData.load_data_from_dir(path, verbose=verbose)
self.metadata |= md
elif os.path.isfile(path):
if path.endswith(".csv"):
self.data, md = CpdData.load_data_from_csv(path)
self.metadata |= md
elif path.endswith(".pkl"):
self.data, md = CpdData.load_data_from_pkl(path)
self.metadata |= md
else:
raise NotImplementedError(f"Only .csv and .pkl files are supported")
else:
raise FileNotFoundError(f"Path '{path}' is neither a file nor a directory.")
else:
self.data = data
# Convert data
def to_dataframe(self):
df = pd.DataFrame(self.data, columns=CpdData.columns)
df.meta = str(self.metadata)
return df
def to_csv(self, sep=","):
# self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
return CpdData.get_csv(self.data, self.metadata, sep=sep)
def save_csv_at(self, filepath, sep=",", verbose=False):
if verbose: print(f"Writing csv to {filepath}")
log.info(f"Writing csv to {filepath}")
with open(filepath, "w") as file:
file.write(self.to_csv(sep=sep))
def save_csv(self, sep=",", verbose=False):
"""Save the csv inside the data directory"""
filepath = os.path.join(self.path, self.dirname + ".csv")
self.save_csv_at(filepath, sep, verbose)
# STATIC CONVERTER
@staticmethod
def get_csv(data, metadata, sep=","):
csv = ""
for k, v in metadata.items():
csv += f"# {k}: {v}\n"
csv += "".join(f"{colname}{sep}" for colname in CpdData.columns).strip(sep) + "\n"
for i in range(data.shape[0]):
csv += f"{i}{sep}{data[i,1]}{sep}{data[i,2]}{sep}{data[i,3]}\n"
return csv.strip("\n")
# STATIC LOADERS
@staticmethod
def load_data_from_csv(filepath:str, sep: str=",") -> tuple[np.ndarray, dict]:
"""
Loads data from a single csv file.
Lines with this format are interpreted as metadata:
# key: value
Lines with this format are interpreted as data:
index, timestamp [s], CPD [V], LED [%]
Parameters
----------
filepath
Path to the csv file.
sep
csv separator
Returns
-------
data
2D numpy array with shape (n, 4) where n is the number of data points.
metadata
Dictionary with metadata.
"""
metadata = {}
with open(filepath, "r") as f:
# this loop will read the metadata at the beginning and skip also the header row
for line in f:
if line.startswith("#"):
colon = line.find(":")
if colon == -1: # normal comment
continue
key = line[1:colon].strip()
value = line[colon+1:].strip()
metadata[key] = value
else:
break
# here, the generator has only data lines
data = np.loadtxt(f, delimiter=sep)
return data, metadata
@staticmethod
def load_data_from_pkl(filepath:str) -> tuple[np.ndarray, dict]:
"""
Loads data from a single csv file.
Lines with this format are interpreted as metadata:
# key: value
Lines with this format are interpreted as data:
index, timestamp [s], CPD [V], LED [%]
Parameters
----------
filepath
Path to the csv file.
sep
csv separator
Returns
-------
data
2D numpy array with shape (n, 4) where n is the number of data points.
metadata
Dictionary with metadata.
"""
data = None
metadata = {}
with open(filepath, "rb") as f:
obj = pickle.load(f)
if isinstance(obj, tuple):
if not len(obj) == 2:
raise ValueError(f"Pickle file is a tuple with length {len(obj)}, however it must be 2: (data, metadata)")
data = obj[0]
metadata = obj[1]
if not isinstance(data, np.ndarray):
raise ValueError(f"First object in tuple is not a numpy.ndarray")
elif isinstance(obj, np.ndarray):
data = obj
else:
raise ValueError(f"Pickled object must be either numpy.ndarray or (numpy.ndarray, dict), but is of type {type(obj)}")
# must be loaded by now
if not len(data.shape) == 2 and data.shape[1] == 4:
raise ValueError(f"numpy.ndarray has invalid shape: {data.shape}, however the shape must be (N, 4)")
if not isinstance(metadata, dict):
raise ValueError(f"Metadata is not a of type dict")
return data, metadata
@staticmethod
def load_data_from_dir(dirpath:str, verbose:bool=False) -> tuple[np.ndarray, dict]:
"""
Combines all data files with the FLUSH_PREFIX from a directory into a numpy array
Parameters
----------
dirpath : str
Path to the data directory
verbose : bool, optional
If True, print a message for every file that is opened. The default is False.
Raises
------
NotImplementedError
DESCRIPTION.
Returns
-------
data : ndarray
First index: Measurement
Second index: (index, timestamp [s], CPD [V], LED [%])
"""
files = os.listdir(dirpath)
files.sort()
data = np.empty((0, 4))
metadata = {}
for filename in files:
filepath = os.path.join(dirpath, filename)
if filename.startswith(FLUSH_PREFIX):
if filename.endswith(".csv"):
if verbose: print(f"Opening {filepath} as csv")
df = pd.read_csv(filepath)
arr = df.to_numpy()
data = np.concatenate((data, arr))
elif filename.endswith(".ndarray.pkl"):
with open(filepath, "rb") as file:
arr = pickle.load(file)
if len(arr.shape) != 2 or arr.shape[1] != 4:
print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
continue
data = np.concatenate((data, arr))
elif filename == METADATA_FILENAME: # Metadata filename must also start with FLUSH_PREFIX
with open(filepath, "rb") as file:
metadata = pickle.load(file)
else:
raise NotImplementedError(f"Unknown file extension for file '{filepath}'")
else:
log.info(f"Skipping unknown file: '{filepath}'")
return data, metadata
def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t: str="seconds", title: str="", CPD:bool=True, LED:bool=True):
"""
Plot recorded data
Parameters
----------
data : str or np.ndarray
Path to the data directory or
numpy array with columns (idx, t [s], V [V], LED [%])
t : str, optional
Which timescale to use for the x axis:
Must be one of "seconds", "mintutes", "hours".
The default is "seconds".
title : str, optional
Title for the plot. The default is "".
CPD : bool, optional
Wether to plot the voltage (CPD) line. The default is True.
LED : bool, optional
Wether to plot the LED state line. The default is False.
Returns
-------
fig : TYPE
Matplotlib figure object.
"""
if type(data) == str:
_data, _ = CpdData.load_data_from_dir(data)
else:
_data = data
fig, ax = plt.subplots()
xdata = _data[:,1].copy()
xlabel = "t [s]"
if t == "minutes":
xdata /= 60
xlabel = "t [minutes]"
elif t == "hours":
xdata /= 3600
xlabel = "t [hours]"
ax.set_xlabel(xlabel)
ax_cpd = ax
ax_led = ax
if CPD and LED:
ax_led = ax.twinx()
if CPD:
ax_cpd = ax
ax_cpd.set_ylabel("CPD [V]")
ax_cpd.plot(xdata, _data[:,2], color="blue", label="CPD")
if LED:
ax_led.set_ylabel("LED [%]")
ax_led.plot(xdata, _data[:,3], color="orange", label="LED")
ax_led.set_ylim(-2, 102)
ax_led.set_yticks([0, 20, 40, 60, 80, 100])
if CPD and LED:
# ax_led.legend()
# ax_cpd.legend()
pass
if title:
ax.set_title(title)
fig.tight_layout()
return fig

View File

@ -0,0 +1,141 @@
import pandas as pd
import numpy as np
import os
import datetime
import pickle
import logging
log = logging.getLogger(__name__)
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
from cpdctrl.utility.data import CpdData, FLUSH_TYPE, FLUSH_PREFIX, METADATA_FILENAME
class DataCollector:
"""
Class managing data collection and partial storage
"""
def __init__(self,
data_path: str,
data_name: str="CPData",
metadata: dict[str, str]={},
dirname: str|None=None,
add_number_if_dir_exists=True,
):
self.data = []
self.cpd_data = None # if loaded, this contains the final numpy array
self.name = data_name
self.metadata = metadata
self.path = os.path.abspath(os.path.expanduser(data_path))
if dirname is None:
self.dirname = sanitize_filename(datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") + "_" + self.name)
else:
self.dirname = sanitize_filename(dirname)
self.dirpath = os.path.join(self.path, self.dirname)
if os.path.exists(self.dirpath):
if not add_number_if_dir_exists:
raise Exception(f"Directory '{self.dirname}' already exists. Provide a different directory or pass `add_number_if_dir_exists=True` to ignore this")
else:
i = 1
dirpath = f"{self.dirpath}-{i}"
while os.path.exists(dirpath):
i += 1
dirpath = f"{self.dirpath}-{i}"
print(f"Directory '{self.dirname}' already exists. Trying '{dirpath}' instead")
self.dirpath = dirpath
self.assert_directory_exists()
self.flushed = False
# OPERATION
def clear(self):
self.data = []
self.cpd_data = None
def add_data(self, i, t, v, l):
self.data.append((i, t, v, l))
self.cpd_data = None # no longer up to date
def flush(self, verbose: bool = False):
"""
Write the current data to a file and clear the internal data
Parameters
----------
verbose : bool, optional
If True, print a message when flushing data. The default is False.
Raises
------
ValueError
If the FLUSH_TYPE is invalid.
Returns
-------
None.
"""
# dont flush empty data
if len(self.data) == 0:
return
self.assert_directory_exists()
if FLUSH_TYPE == "csv":
filename = self._get_flush_filename() + ".csv"
filepath = os.path.join(self.dirpath, filename)
log.info(f"Flushing data to {filepath}")
if verbose: print(f"Flushing data to {filepath}")
df = pd.DataFrame(self.data, columns=CpdData.columns)
df.meta = str(self.metadata)
df.to_csv(filepath, sep=",", index=False, metadata=True)
elif FLUSH_TYPE == "pickle-ndarray":
filename = self._get_flush_filename() + ".ndarray.pkl"
filepath = os.path.join(self.dirpath, filename)
log.info(f"Flushing data to {filepath}")
if verbose: print(f"Flushing data to {filepath}")
with open(filepath, "wb") as file:
pickle.dump(np.array(self.data), file)
else:
raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
self.clear()
# File IO
def _get_flush_filename(self):
"""Get the filename of the next partial file, incrementing the number every time"""
return sanitize_filename(get_next_filename(FLUSH_PREFIX + self.name, self.dirpath, digits=5))
def assert_directory_exists(self):
if not os.path.isdir(self.dirpath):
os.makedirs(self.dirpath)
def get_data(self) -> CpdData:
"""
Load the full data and return it together with the metadata
Returns
-------
tuple[np.ndarray, dict]
The full data and the metadata
"""
if self.cpd_data is None:
self.cpd_data = CpdData(path=self.dirpath, metadata=self.metadata)
return self.cpd_data
def save_csv_in_dir(self, sep=",", verbose=False):
"""Save full data as csv inside the directory with temporary data"""
self.get_data()
filepath = os.path.join(self.dirpath, self.dirname + ".csv")
self.cpd_data.save_csv_at(filepath, sep, verbose)
def write_metadata(self):
f"""
Write the metadata to the disk as '{METADATA_FILENAME}'
Returns
-------
None.
"""
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
log.debug(f"Writing metadata to {filepath}")
with open(filepath, "wb") as file:
pickle.dump(self.metadata, file)

View File

@ -0,0 +1,26 @@
def select_device_interactive(type_devices_dict: dict[str, list[str]], prompt="Select an instrument: ") -> tuple[str, str]:
"""
Select a device interactively from the command line
Parameters
----------
type_devices_dict
A dictionary of device types and their corresponding device names
-------
The type and name of the selected device.
These can be passed to the connect_device method of the led_control_device or voltage_measurement_device libraries
"""
res = type_devices_dict
flat_res = [ (t, v) for t, l in res.items() for v in l ]
for i, (t,v) in enumerate(flat_res):
print(f"{i+1:02}: {t} - {v}")
while len(flat_res) > 0:
try:
instr = int(input(prompt)) - 1
if instr < 0 or instr >= len(flat_res):
raise ValueError
return flat_res[instr]
except ValueError:
print(f"Enter a number between 1 and {len(flat_res)}")
continue
raise Exception("No devices found")

37
prsctl/utility/file_io.py Normal file
View File

@ -0,0 +1,37 @@
from os import listdir, path
import re
def add_zeros(v: int, digits=3):
"""
return v as string, add leading zeros if len(str(v)) < digits
"""
s = str(v)
return '0' * (max(digits - len(s), 0)) + s
def get_next_filename(basename, directory=".", digits=3):
"""
get the next filename (without extenstion).
example:
basename = file
directory has file001.csv, file002.bla.pkl, file004.csv
-> return file005
"""
files = listdir(directory)
files.sort()
files.reverse()
lowest_number = -1
for file in files:
if not file.startswith(basename): continue
try:
number = file[:file.find('.')].replace(basename, "")
number = int(number)
if number < lowest_number: continue
lowest_number = number
except ValueError:
continue
return basename + add_zeros(lowest_number+1, digits)
def sanitize_filename(filename):
return re.sub(r'[\\/*?:"<>|]',"", filename)

34
prsctl/utility/visa.py Normal file
View File

@ -0,0 +1,34 @@
import pyvisa
import logging
log = logging.getLogger(__name__)
def enumerate_devices(device_name, query="(GPIB)|(USB)?*:INSTR", visa_backend=""):
"""
Return all available visa resources that match the query and the device name
Parameters
----------
device_name
A part of the name that the device is supposed upon the '*IDN?' query
query
A query to the visa resource manager, to filter the resources
visa_backend
The visa backend to use, if not the default one
Returns
-------
List of visa resource names
"""
rm = pyvisa.ResourceManager(visa_backend)
res = []
for r in rm.list_resources(query):
try:
instr = rm.open_resource(r)
name = instr.query('*IDN?')
if device_name in name:
res.append(r)
instr.close()
except:
log.debug(f"Could not open Visa resources {r}")
return res