This commit is contained in:
CPD 2025-02-07 17:13:04 +01:00
parent c5c016399b
commit 1e46aaa176
6 changed files with 150 additions and 77 deletions

View File

@ -1 +1,15 @@
# CPDCTRL # CPDCTRL
A interactive command-line utility for measuring CPD (contact potential difference).
To run it, launch **Powershell**, activate the python virtual environment and then run the script with `ipython`
```ps1
cd cpdctrl
.\venv\Scripts\Activate.ps1
ipython -i .\cpdctrl\cpdctrl\cpdctrl-interactive.py
```
## Devices
Currently requires
- *Keithley 2700 SMU* for measuring the voltage (CPD)
- *Arduino Nano* connected to a *Thorlabs LEDD1B* for controlling the light source (LED)

View File

@ -52,9 +52,7 @@ from .led_script import LedScript
from .measurement import measure as _measure from .measurement import measure as _measure
from .utility.data import DataCollector from .utility.data import DataCollector
from .utility.data import plot_cpd_data as data_plot
from .utility import data as _data
from .utility.data import load_dataframe, plot_cpd_data
from .utility import file_io from .utility import file_io
from .update_funcs import _Monitor, _update_print from .update_funcs import _Monitor, _update_print
@ -66,9 +64,11 @@ _runtime_vars = {
settings = { settings = {
"datadir": path.expanduser("~/data"), "datadir": path.expanduser("~/data"),
"name": "measurement", "name": "interactive-test",
"interval": 0.02, "led": "unkown",
"beep": True, "interval": 0.5,
"flush_after": 3000,
"use_buffer": False,
} }
test = False test = False
@ -76,33 +76,67 @@ test = False
# global variable for the instrument/client returned by pyvisa/bleak # global variable for the instrument/client returned by pyvisa/bleak
dev: VoltageMeasurementDevice|None = None dev: VoltageMeasurementDevice|None = None
led: LedControlDevice|None = None led: LedControlDevice|None = None
data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True) data_collector = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True)
t0 = 0 t0 = 0
data = None
md = None
def monitor(script: str|int=0, interval=None, flush_after=None, use_buffer=False, max_measurements=None, max_points_shown=None): def monitor(script: str|int=0, interval: float|None=None, metadata:dict={}, flush_after: int|None=None, use_buffer: bool|None=None, max_measurements=None, max_points_shown=None):
""" """
Monitor the voltage with matplotlib. Monitor the voltage with matplotlib.
- Opens a matplotlib window and takes measurements depending on settings["interval"]
- Waits for the user to press a key
@details: If use_buffer=False, uses python's time.sleep() for waiting the interval, which is not very precise.
- Resets the buffers With use_buffer=True, the timing of the voltage data readings will be very precise, however,
- Opens a matplotlib window and takes measurements depending on settings["interval"] the led updates may deviate up to <interval>.
- Waits for the user to press a key
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision. The data is automatically saved to "<date>_<time>_<name>" in the data directory.
You can take the data from the buffer afterwards, using save_csv.
@param max_points_shown : how many points should be shown at once. None means infinite Parameters
@param max_measurements : maximum number of measurements. None means infinite ----------
script : str|int
Path to a led script file or a constant value between 0 and 100 for the LED.
interval : float|None
Time between measurements.
If None, the value is taken from the settings.
metadata : dict
Metadata to append to the data header.
The set interval as well as the setting for 'name' and 'led' are automatically added.
flush_after : int|None
Flush the data to disk after <flush_after> readings
If None, the value is taken from the settings.
use_buffer : Bool
If True, use the voltage measurement device's internal buffer for readings, which leads to more accurate timings.
If None, the value is taken from the settings.
max_points_shown : int|None
how many points should be shown at once. None means infinite
max_measurements : maximum number of measurements. None means infinite
""" """
global _runtime_vars, data, dev, led global _runtime_vars, data_collector, dev, led
global data, md
_runtime_vars["last_measurement"] = dtime.now().isoformat() _runtime_vars["last_measurement"] = dtime.now().isoformat()
if not interval: interval = settings["interval"] if interval is None: interval = settings["interval"]
if flush_after is None: flush_after = settings["flush_after"]
if use_buffer is None: use_buffer = settings["use_buffer"]
# set metadata
metadata["interval"] = str(interval)
metadata["name"] = settings["name"]
metadata["led"] = settings["led"]
metadata["led_script"] = str(script)
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown) plt.ion()
plt_monitor = _Monitor(use_print=False, max_points_shown=max_points_shown)
led_script = LedScript(script=script, auto_update=True, verbose=True) led_script = LedScript(script=script, auto_update=True, verbose=True)
data.clear() data_collector = DataCollector(metadata=metadata, data_path=settings["datadir"], data_name=settings["name"])
# data_collector.clear()
data_queue = mp.Queue() data_queue = mp.Queue()
command_queue = mp.Queue() command_queue = mp.Queue()
# Argument order must match the definition # Argument order must match the definition
proc_measure = mt.Thread(target=_measure, args=(dev, led, led_script, data, proc_measure = mt.Thread(target=_measure, args=(dev,
led,
led_script,
data_collector,
interval, interval,
flush_after, flush_after,
use_buffer, use_buffer,
@ -125,9 +159,36 @@ def monitor(script: str|int=0, interval=None, flush_after=None, use_buffer=False
command_queue.put("stop") command_queue.put("stop")
proc_measure.join() proc_measure.join()
led_script.stop_updating() # stop watching for file updates (if enabled) led_script.stop_updating() # stop watching for file updates (if enabled)
data.save_csv(verbose=True) data_collector.save_csv(verbose=True)
data, metadata = data_collector.get_data()
fig = data_plot(data, CPD=True, LED=True)
plt.ioff()
fig_path = path.join(data_collector.path, data_collector.dirname + ".pdf")
fig.savefig(fig_path)
# DATA
def data_load(dirname:str) -> tuple[np.ndarray, dict]:
"""
Load data in directory <dirname> in the data directory as numpy array.
Sets the `data` and `md` variables
Parameters
----------
dirname : str
Absolute path to the directory containing the measurement or directory name in the data directory.
"""
global data, md
if path.isabs(dirname):
dirpath = dirname
else:
dirpath = path.join(settings["datadir"], dirname)
data, md = DataCollector.load_data(dirpath, verbose=True)
# data_plot imported
# SETTINGS
def set(setting, value): def set(setting, value):
global settings, config_path global settings, config_path
if setting in settings: if setting in settings:
@ -154,27 +215,20 @@ def help(topic=None):
if topic == None: if topic == None:
print(""" print("""
Functions: Functions:
measure [kat] - take measurements monitor - take measurements with live monitoring in a matplotlib window
monitor [kat] - take measurements with live monitoring in a matplotlib window data_load - load data from a directory
measure_count [kat] - take a fixed number of measurements data_plot - plot a data array
monitor_count [kat] - take a fixed number of measurements with live monitoring in a matplotlib window Run 'help(function)' to see more information on a function
repeat [kat] - measure and save to csv multiple times
get_dataframe [kat] - return device internal buffer as pandas dataframe
save_csv [kat] - save the last measurement as csv file
save_pickle [kat] - save the last measurement as pickled pandas dataframe
load_dataframe [kat] - load a pandas dataframe from csv or pickle
run_script [k ] - run a lua script on the Keithely device
Run 'help(function)' to see more information on a function
Available topics: Available topics:
imports imports
device
settings settings
Run 'help("topic")' to see more information on a topic""") Run 'help("topic")' to see more information on a topic""")
elif topic in [settings, "settings"]: elif topic in [settings, "settings"]:
print("""Settings: print("""Settings:
name: str - name of the measurement, determines filename of 'save_csv' name: str - name of the measurement, determines filename
led: str - name/model of the LED that is being used
datadir: str - output directory for the csv files datadir: str - output directory for the csv files
interval: int - interval (inverse frequency) of the measurements, in seconds interval: int - interval (inverse frequency) of the measurements, in seconds
beep: bool - wether the device should beep or not beep: bool - wether the device should beep or not
@ -196,16 +250,8 @@ Functions:
pandas as pd pandas as pd
matplotlib.pyplot as plt matplotlib.pyplot as plt
os.path """) os.path """)
elif topic == "device":
print("""Device:
keithley backend:
The opened pyvisa resource (deveithley device) is the global variable 'dev'.
You can interact using pyvisa functions, such as
k.write("command"), k.query("command") etc. to interact with the device.
arduino backend:
The Arduino will be avaiable as BleakClient using the global variable 'dev'. """)
else: else:
print(topic.__doc__) print(topic.__doc__.strip(" ").strip("\n"))
def init(): def init():
@ -230,7 +276,6 @@ Enter 'help()' for a list of commands""")
if args["config"]: if args["config"]:
config_path = args["config"] config_path = args["config"]
if not path.isdir(path.dirname(config_path)): if not path.isdir(path.dirname(config_path)):
makedirs(path.dirname(config_path)) makedirs(path.dirname(config_path))

View File

@ -1,5 +1,4 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Callable
""" """
Created on Tue Jan 21 16:26:13 2025 Created on Tue Jan 21 16:26:13 2025

View File

@ -13,8 +13,6 @@ from cpdctrl.utility.data import DataCollector
import time import time
import datetime import datetime
from multiprocessing import Pipe
from multiprocessing.connection import Connection
from queue import Queue from queue import Queue
def measure( def measure(
@ -78,7 +76,7 @@ def measure(
# if no "time" in metadata, set the current local time in ISO 8601 format # if no "time" in metadata, set the current local time in ISO 8601 format
# and without microseconds # and without microseconds
if not "time" in data.metadata: if not "time" in data.metadata:
data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).isoformat() data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).astimezone().isoformat()
data.metadata["test"] = "TEST" data.metadata["test"] = "TEST"
vm_dev.reset(True) vm_dev.reset(True)
if use_buffer: if use_buffer:

View File

@ -19,6 +19,7 @@ class DataCollector:
dir_exists_is_ok=False, dir_exists_is_ok=False,
): ):
self.data = [] self.data = []
self.fulldata = None # if loaded, this contains the final numpy array
self.name = data_name self.name = data_name
self.metadata = metadata self.metadata = metadata
self.path = os.path.abspath(os.path.expanduser(data_path)) self.path = os.path.abspath(os.path.expanduser(data_path))
@ -70,6 +71,9 @@ class DataCollector:
None. None.
""" """
# dont flush empty data
if len(self.data) == 0:
return
# TODO check if dir still exists # TODO check if dir still exists
if FLUSH_TYPE == "csv": if FLUSH_TYPE == "csv":
filename = self._get_filename() + ".csv" filename = self._get_filename() + ".csv"
@ -84,14 +88,15 @@ class DataCollector:
pickle.dump(np.array(self.data), file) pickle.dump(np.array(self.data), file)
else: else:
raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'") raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
self.data = [] self.clear()
self.flushed = True
def clear(self): def clear(self):
self.data = [] self.data = []
self.fulldata = None
def add_data(self, i, t, v, l): def add_data(self, i, t, v, l):
self.data.append((i, t, v, l)) self.data.append((i, t, v, l))
self.fulldata = None # no longer up to date
def to_dataframe(self): def to_dataframe(self):
df = pd.DataFrame(self.data, columns=DataCollector.columns) df = pd.DataFrame(self.data, columns=DataCollector.columns)
@ -104,13 +109,16 @@ class DataCollector:
return DataCollector.get_csv(data, self.metadata, sep=sep) return DataCollector.get_csv(data, self.metadata, sep=sep)
def save_csv(self, sep=",", verbose=False): def save_csv(self, sep=",", verbose=False):
filepath = os.path.join(self.path, self.name + ".csv") filepath = os.path.join(self.path, self.dirname + ".csv")
if verbose: print(f"Writing data to {filepath}") if verbose: print(f"Writing data to {filepath}")
with open(filepath, "w") as file: with open(filepath, "w") as file:
file.write(self.to_csv(sep=sep)) file.write(self.to_csv(sep=sep))
def get_data(self): def get_data(self) -> tuple[np.ndarray, dict]:
return DataCollector.load_data(self.dirpath) if self.fulldata is None:
return DataCollector.load_data(self.dirpath)
else:
return self.fulldata, self.metadata
@staticmethod @staticmethod
def get_csv(data, metadata, sep=","): def get_csv(data, metadata, sep=","):
@ -123,7 +131,7 @@ class DataCollector:
return csv.strip("\n") return csv.strip("\n")
@staticmethod @staticmethod
def load_data(dirpath:str, verbose:bool=False) -> np.ndarray: def load_data(dirpath:str, verbose:bool=False) -> tuple[np.ndarray, dict]:
""" """
Combines all data files from a directory into a numpy array Combines all data files from a directory into a numpy array
@ -159,6 +167,9 @@ class DataCollector:
elif filename.endswith(".ndarray.pkl"): elif filename.endswith(".ndarray.pkl"):
with open(filepath, "rb") as file: with open(filepath, "rb") as file:
arr = pickle.load(file) arr = pickle.load(file)
if len(arr.shape) != 2 or arr.shape[1] != 4:
print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
continue
data = np.concatenate((data, arr)) data = np.concatenate((data, arr))
elif filename == METADATA_FILENAME: elif filename == METADATA_FILENAME:
with open(filepath, "rb") as file: with open(filepath, "rb") as file:
@ -168,29 +179,33 @@ class DataCollector:
return data, metadata return data, metadata
def load_dataframe(p:str): def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t: str="seconds", title: str="", CPD:bool=True, LED:bool=True):
"""
Load a dataframe from file.
@param p : path of the file. If it has 'csv' extension, pandas.read_csv is used, pandas.read_pickle otherwise
"""
if not os.path.isfile(p):
print(f"ERROR: load_dataframe: File does not exist: {p}")
return None
if p.endswith(".csv"):
df = pd.read_csv(p)
else:
df = pd.read_pickle(p)
return df
def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t="seconds", title="", CPD:bool=True, LED:bool=False):
""" """
Plot recorded data Plot recorded data
@param data: filepath, dataframe or numpy array
Parameters
----------
data : str or np.ndarray
Path to the data directory or
numpy array with columns (idx, t [s], V [V], LED [%])
t : str, optional
Which timescale to use for the x axis:
Must be one of "seconds", "mintutes", "hours".
The default is "seconds".
title : str, optional
Title for the plot. The default is "".
CPD : bool, optional
Wether to plot the voltage (CPD) line. The default is True.
LED : bool, optional
Wether to plot the LED state line. The default is False.
Returns
-------
fig : TYPE
Matplotlib figure object.
""" """
if type(data) == str: if type(data) == str:
_data = load_dataframe(data).to_numpy() _data, _ = DataCollector.load_data(data)
elif type(data) == pd.DataFrame:
_data = data.to_numpy()
else: else:
_data = data _data = data
fig, ax = plt.subplots() fig, ax = plt.subplots()
@ -214,6 +229,8 @@ def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t="seconds", title=""
if LED: if LED:
ax_led.set_ylabel("LED [%]") ax_led.set_ylabel("LED [%]")
ax_led.plot(xdata, _data[:,3], color="orange", label="LED") ax_led.plot(xdata, _data[:,3], color="orange", label="LED")
ax_led.set_ylim(-2, 102)
ax_led.set_yticks([0, 20, 40, 60, 80, 100])
if CPD and LED: if CPD and LED:
# ax_led.legend() # ax_led.legend()
# ax_cpd.legend() # ax_cpd.legend()

View File

@ -4,9 +4,9 @@ requires = ["setuptools"]
[project] [project]
name = "cpdctrl" name = "cpdctrl"
version = "1.1.0" version = "1.1.0"
description = "Utility for voltage measurements with a Keitley 2700 SMU" description = "Utility for CPD measurements with a Keitley 2700 SMU and an Arduino-controlled light source"
requires-python = ">=3.10" requires-python = ">=3.10"
readme = "readme.md" readme = "README.md"
license = {file = "LICENSE"} license = {file = "LICENSE"}
authors = [ authors = [
{ name = "Matthias Quintern", email = "matthias.quintern@tum.de" } { name = "Matthias Quintern", email = "matthias.quintern@tum.de" }