Long term measurement

This commit is contained in:
CPD 2025-01-24 18:29:59 +01:00
parent c0158e2478
commit c73928046f
12 changed files with 193 additions and 21 deletions

View File

@ -0,0 +1 @@
from cpdctrl.voltage_measurement_device.base import VoltageMeasurementDevice

View File

@ -40,10 +40,14 @@ if __name__ == "__main__":
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
args = vars(parser.parse_args())
from .voltage_measurement.base import VoltageMeasurementDevice as VDev
from .voltage_measurement.impl import keithley2700 as _volt
from .led_control.impl import thorlabs_ledd1b as _led
from .voltage_measurement_device.base import VoltageMeasurementDevice
from .voltage_measurement_device.impl import keithley2700 as _volt
from .led_control_device.base import LedControlDevice
from .led_control_device.impl import thorlabs_ledd1b as _led
from .led_script import LedScript
from .measurement import Measurement
from .utility.data import DataCollector
@ -69,8 +73,9 @@ settings = {
test = False
# global variable for the instrument/client returned by pyvisa/bleak
dev:VDev|None = None
data = DataCollector(settings["datadir"], settings["name"])
dev: VoltageMeasurementDevice|None = None
led: LedControlDevice|None = None
# data = DataCollector(settings["datadir"], settings["name"])
t0 = 0
def monitor(interval=None, max_measurements=None, max_points_shown=160):
@ -239,7 +244,7 @@ Functions:
def init():
global dev, settings, config_path
global dev, led, settings, config_path
print(r""" .___ __ .__
____ ______ __| _/_____/ |________| |
_/ ___\\____ \ / __ |/ ___\ __\_ __ \ |
@ -260,7 +265,7 @@ Enter 'help()' for a list of commands""")
if args["config"]:
config_path = args["config"]
if not path.isdir(path.dirname(config_path)):
makedirs(path.dirname(config_path))
@ -271,7 +276,7 @@ Enter 'help()' for a list of commands""")
makedirs(settings["datadir"])
try:
dev = _volt.init()
dev = _volt.init("GPIB0::22::INSTR")
led = _led.LEDD1B()
except Exception as e:
print(e)

View File

@ -1,2 +0,0 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1 @@
from cpdctrl.led_control_device.base import LedControlDevice

60
cpdctrl/led_script.py Normal file
View File

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 24 16:46:06 2025
@author: CPD
"""
import time
class LedScript:
def __init__(self):
self.t_start = 0
def start(self) -> int:
"""
Start the script and return the initial LED state
Returns
-------
int
LED Intensity [0,100]
"""
self.t_start = time.time()
return 0
def get_state(self, t: None|float=None) -> int:
"""
Get the LED statefrom an absolute time (relative to when `start` was called)
Parameters
----------
t : None|float, optional
Absolute time since epoch or None.
If None, the current time will be used (using python `time.time()`
The default is None.
Returns
-------
int
LED Intensity [0,100]
"""
if t is None:
t = time.time()
dt = t - self.t_start
return self.get_relative_state(dt)
def get_relative_state(self, dt: float) -> int:
"""
Get the LED state from a script-relative time
Parameters
----------
dt : float
Number of seconds from the starting point of the script
Returns
-------
int
LED Intensity [0,100]
"""
return 0

58
cpdctrl/measurement.py Normal file
View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 24 15:18:31 2025
@author: Matthias Quintern
"""
from cpdctrl.voltage_measurement_device.base import VoltageMeasurementDevice
from cpdctrl.led_control_device.base import LedControlDevice
from cpdctrl.led_script import LedScript
from cpdctrl.utility.data import DataCollector
import time
import datetime
class Measurement:
def __init__(self,
vm_dev: VoltageMeasurementDevice,
led_dev: LedControlDevice,
led_script: LedScript,
data: DataCollector,
):
self.vm_dev = vm_dev
self.led_dev = led_dev
self.led_script = led_script
self.data = data
def measure_finite(self, delta_t: 0.1, flush_after:int|None=None, max_measurements=None, verbose=False):
# if no "time" in header, set the current local time in ISO 8601 format
# and without microseconds
if not "time" in self.data.header:
self.data.header["time"] = datetime.datetime.now().replace(microsecond=0).isoformat()
self.vm_dev.reset(True)
try:
i = 0
led_val = self.led_script.start()
t_iter_start = time.time()
while max_measurements is None or i < max_measurements:
tval, vval = self.vm_dev.read_value()
if i == 0:
t0 = tval
tval -= t0
self.data.add_data(i, tval, vval, led_val)
print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r')
if flush_after is not None and (i+1) % flush_after == 0:
self.data.flush(verbose=verbose)
# substract the execution time from the sleep time for a more
# acurate frequency
dt_sleep = delta_t - (t_iter_start - time.time())
if dt_sleep > 0:
time.sleep(dt_sleep)
t_iter_start = time.time()
led_val = self.led_script.get_state()
i += 1
except KeyboardInterrupt:
pass
self.data.flush(verbose=verbose)
print("Measurement stopped" + " "*50)

View File

@ -1,25 +1,65 @@
import pandas as pd
import numpy as np
from os import path
import os
import matplotlib.pyplot as plt
import datetime
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
class DataCollector:
def __init__(self, data_name, data_path):
def __init__(self,
data_name: str,
data_path: str,
header: dict[str, str]={},
dirname: str|None=None,
dir_exists_is_ok=False,
):
self.data = []
self.name = data_name
self.path = data_path
self.header = header
self.path = os.path.abspath(os.path.expanduser(data_path))
if dirname is None:
self.dirname = sanitize_filename(datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d_%H-%M") + "_" + self.name)
else:
self.dirname = sanitize_filename(dirname)
self.dirpath = os.path.join(self.path, self.dirname)
if os.path.exists(self.dirpath):
if not dir_exists_is_ok:
raise Exception(f"Directory '{self.dirname}' already exists. Provide a different directory or pass `dir_exists_is_ok=True` to ignore this")
else:
os.makedirs(self.dirpath)
self.flushed = False
def _get_filename(self):
return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
def flush(self, verbose=False):
# TODO check if dir still exists
filename = self._get_filename() + ".csv"
filepath = os.path.join(self.dirpath, filename)
if verbose: print(f"Flushing data to {filepath}")
self.to_dataframe().to_csv(filepath, sep=",", index=False, header=True)
self.data = []
self.flushed = True
def finalize(self):
if self.flushed:
raise NotImplementedError()
def clear(self):
self.data = []
def add_data(self, data):
self.data.append(data)
def add_data(self, i, t, v, l):
self.data.append((i, t, v, l))
def to_dataframe(self):
return pd.DataFrame(self.data)
df = pd.DataFrame(self.data, columns=["idx", "t [s]", "V [V]", "LED [%]"])
df.meta = str(self.header)
return df
def save_csv(self):
self.to_dataframe().to_csv(path.join(self.path, self.name + ".csv"), index=False, header=True)
self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, header=True)
@ -28,7 +68,7 @@ class DataCollector:
# df = pd.DataFrame(buffer)
# df.colums = ["Time [s]", "Voltage [V]"]
# return df
# OLD STUFF
def buffers2dataframe(ibuffer, vbuffer):
"""
@param ibuffer : 2d - array: timestamps, current
@ -44,7 +84,7 @@ def load_dataframe(p:str):
Load a dataframe from file.
@param p : path of the file. If it has 'csv' extension, pandas.read_csv is used, pandas.read_pickle otherwise
"""
if not path.isfile(p):
if not os.path.isfile(p):
print(f"ERROR: load_dataframe: File does not exist: {p}")
return None
if p.endswith(".csv"):

View File

@ -1,4 +1,5 @@
from os import listdir, path
import re
def add_zeros(v: int, digits=3):
"""
@ -31,3 +32,6 @@ def get_next_filename(basename, directory=".", digits=3):
continue
return basename + add_zeros(lowest_number+1)
def sanitize_filename(filename):
return re.sub(r'[\\/*?:"<>|]',"", filename)

View File

@ -19,6 +19,7 @@ scripts = {
"instrument_reset": "~/cpd-dev/cpdctrl/cpdctrl/keithley_scripts/reset.scpi",
}
def select_visa_device(visa_backend=""):
rm = pyvisa.ResourceManager(visa_backend)
resources = rm.list_resources()
@ -202,7 +203,11 @@ class Keithley2700(VoltageMeasurementDevice):
pass
print("Measurement stopped" + " "*50)
def init():
instr = select_visa_device()
def init(name=None, visa_backend=""):
if name:
rm = pyvisa.ResourceManager(visa_backend)
instr = rm.open_resource(name)
else:
instr = select_visa_device(name=name)
return Keithley2700(instr)