add data loading

This commit is contained in:
CPD 2025-01-27 18:35:51 +01:00
parent c73928046f
commit dc8d5a6349
7 changed files with 141 additions and 53 deletions

View File

@ -53,7 +53,7 @@ from .utility.data import DataCollector
from .utility import data as _data
from .utility.data import load_dataframe
from .utility.data import load_dataframe, plot_cpd_data
from .utility import file_io
from .update_funcs import _Monitor, _update_print
@ -75,7 +75,7 @@ test = False
# global variable for the instrument/client returned by pyvisa/bleak
dev: VoltageMeasurementDevice|None = None
led: LedControlDevice|None = None
# data = DataCollector(settings["datadir"], settings["name"])
data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True)
t0 = 0
def monitor(interval=None, max_measurements=None, max_points_shown=160):
@ -102,7 +102,7 @@ def monitor(interval=None, max_measurements=None, max_points_shown=160):
if i == 0:
t0 = t
t -= t0
data.add_data((t, v))
data.add_data(i, t, v, 0)
plt_monitor.update(i, t, v)
# update_led()
dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func)

View File

@ -29,7 +29,7 @@ class LedControlDevice(ABC):
"""
pass
# @abstractmethod
@abstractmethod
def set_level(level:int):
"""
Set the led brightness to a certain level

View File

@ -32,6 +32,11 @@ class LEDD1B(LedControlDevice):
self._write("1")
def off(self):
self._write("0")
def set_level(self, level:int):
if level == 0: self.off()
elif level == 100: self.on()
else:
raise ValueError(f"LEDD1B Led controller can only set 0% or 100%")
if __name__ == '__main__':
led = LEDD1B()

View File

@ -57,4 +57,7 @@ class LedScript:
int
LED Intensity [0,100]
"""
# TODO remove hard coded script
if (dt // 3600) % 2 == 0:
return 100
return 0

View File

@ -46,11 +46,19 @@ class Measurement:
self.data.flush(verbose=verbose)
# substract the execution time from the sleep time for a more
# acurate frequency
dt_sleep = delta_t - (t_iter_start - time.time())
dt_sleep = delta_t - (time.time() - t_iter_start)
if dt_sleep > 0:
# print(f"Sleeping for {dt_sleep}")
time.sleep(dt_sleep)
t_iter_start = time.time()
led_val = self.led_script.get_state()
new_led_val = self.led_script.get_state()
if new_led_val != led_val:
try:
self.led_dev.set_level(new_led_val)
led_val = new_led_val
except Exception as e:
print(f"Error setting led to {new_led_val}%:")
print(e)
i += 1
except KeyboardInterrupt:
pass

View File

@ -3,13 +3,16 @@ import numpy as np
import os
import matplotlib.pyplot as plt
import datetime
import pickle
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
FLUSH_TYPE = "pickle-ndarray"
METADATA_FILENAME = "_measurement_metadata.pkl"
class DataCollector:
def __init__(self,
data_name: str,
data_path: str,
data_name: str="CPData",
header: dict[str, str]={},
dirname: str|None=None,
dir_exists_is_ok=False,
@ -34,19 +37,47 @@ class DataCollector:
def _get_filename(self):
return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
def flush(self, verbose=False):
def _write_metadata(self):
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
with open(filepath, "wb") as file:
pickle.dump(self.header, file)
def flush(self, verbose:bool=False):
"""
Write the current data to a file and clear the internal data
Parameters
----------
verbose : bool, optional
If True, print a message when flushing data. The default is False.
Raises
------
ValueError
If the FLUSH_TYPE is invalid.
Returns
-------
None.
"""
# TODO check if dir still exists
filename = self._get_filename() + ".csv"
filepath = os.path.join(self.dirpath, filename)
if verbose: print(f"Flushing data to {filepath}")
self.to_dataframe().to_csv(filepath, sep=",", index=False, header=True)
if FLUSH_TYPE == "csv":
filename = self._get_filename() + ".csv"
filepath = os.path.join(self.dirpath, filename)
if verbose: print(f"Flushing data to {filepath}")
self.to_dataframe().to_csv(filepath, sep=",", index=False, header=True)
elif FLUSH_TYPE == "pickle-ndarray":
filename = self._get_filename() + ".ndarray.pkl"
filepath = os.path.join(self.dirpath, filename)
if verbose: print(f"Flushing data to {filepath}")
with open(filepath, "wb") as file:
pickle.dump(np.array(self.data), file)
else:
raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
self.data = []
self.flushed = True
def finalize(self):
if self.flushed:
raise NotImplementedError()
def clear(self):
self.data = []
@ -61,23 +92,53 @@ class DataCollector:
def save_csv(self):
self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, header=True)
def get_data(self):
return DataCollector.load_data(self.dirpath)
@staticmethod
def load_data(dirpath:str, verbose:bool=False) -> np.ndarray:
"""
Combines all data files from a directory into a numpy array
Parameters
----------
dirpath : str
Path to the data directory
verbose : bool, optional
If True, print a message for every file that is opened. The default is False.
Raises
------
NotImplementedError
DESCRIPTION.
Returns
-------
data : ndarray
First index: Measurement
Second index: (index, timestamp [s], CPD [V], LED [%])
"""
files = os.listdir(dirpath)
files.sort()
data = np.empty((0, 4))
metadata = {}
for filename in files:
filepath = os.path.join(dirpath, filename)
if filename.endswith(".csv"):
if verbose: print(f"Opening {filepath} as csv")
df = pd.read_csv(filepath)
arr = df.to_numpy()
data = np.concatenate((data, arr))
elif filename.endswith(".ndarray.pkl"):
with open(filepath, "rb") as file:
arr = pickle.load(file)
data = np.concatenate((data, arr))
elif filename == METADATA_FILENAME:
with open(filepath, "rb") as file:
metadata = pickle.load(file)
else:
raise NotImplementedError()
return data, metadata
# deprecated
# def buffer2dataframe(buffer):
# df = pd.DataFrame(buffer)
# df.colums = ["Time [s]", "Voltage [V]"]
# return df
# OLD STUFF
def buffers2dataframe(ibuffer, vbuffer):
"""
@param ibuffer : 2d - array: timestamps, current
@param vbuffer : 2d - array: timestamps, voltage
@returns DataFrame: timestamps, current, voltage
"""
df = pd.DataFrame(np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T)
df.columns = ["Time [s]", "Current [A]", "Voltage [V]"]
return df
def load_dataframe(p:str):
"""
@ -93,7 +154,7 @@ def load_dataframe(p:str):
df = pd.read_pickle(p)
return df
def plot(data: str or pd.DataFrame or np.ndarray, title="", U=True, I=False):
def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t="seconds", title="", CPD:bool=True, LED:bool=False):
"""
Plot recorded data
@param data: filepath, dataframe or numpy array
@ -104,23 +165,34 @@ def plot(data: str or pd.DataFrame or np.ndarray, title="", U=True, I=False):
_data = data.to_numpy()
else:
_data = data
print(_data[0])
plt.ion()
fig, ax = plt.subplots()
ax.set_xlabel("t [s]")
vax = ax
iax = ax
if U and I:
iax = ax.twinx()
if U:
vax = ax
vax.set_ylabel("U [V]")
vax.plot(_data[:,0], _data[:,2], color="blue", label="voltage")
if I:
iax.set_ylabel("I [A]")
iax.plot(_data[:,0], _data[:,1], color="orange", label="current")
if U and I:
plt.legend()
xdata = _data[:,1].copy()
xlabel = "t [s]"
if t == "minutes":
xdata /= 60
xlabel = "t [minutes]"
elif t == "hours":
xdata /= 3600
xlabel = "t [hours]"
ax.set_xlabel(xlabel)
ax_cpd = ax
ax_led = ax
if CPD and LED:
ax_led = ax.twinx()
if CPD:
ax_cpd = ax
ax_cpd.set_ylabel("CPD [V]")
ax_cpd.plot(xdata, _data[:,2], color="blue", label="CPD")
if LED:
ax_led.set_ylabel("LED [%]")
ax_led.plot(xdata, _data[:,3], color="orange", label="LED")
if CPD and LED:
# ax_led.legend()
# ax_cpd.legend()
pass
if title:
ax.set_title(title)
fig.tight_layout()
return fig

View File

@ -14,7 +14,7 @@ def get_next_filename(basename, directory=".", digits=3):
get the next filename (without extenstion).
example:
basename = file
directory has file001.csv, file002.pkl, file004.csv
directory has file001.csv, file002.bla.pkl, file004.csv
-> return file005
"""
files = listdir(directory)
@ -24,14 +24,14 @@ def get_next_filename(basename, directory=".", digits=3):
for file in files:
if not file.startswith(basename): continue
try:
number = file[:file.rfind('.')].replace(basename, "")
number = file[:file.find('.')].replace(basename, "")
number = int(number)
if number < lowest_number: continue
lowest_number = number
except ValueError:
continue
return basename + add_zeros(lowest_number+1)
return basename + add_zeros(lowest_number+1, digits)
def sanitize_filename(filename):
return re.sub(r'[\\/*?:"<>|]',"", filename)