CPD 55b4c20865 Revert "Fix: Use correct timezone for dirname"
This reverts commit a9b6ce7ef726e1e0ef7953560aafdab4252f209f.
2025-03-12 16:44:45 +01:00

254 lines
8.3 KiB
Python

import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import datetime
import pickle
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
# On-disk format used by DataCollector.flush(); the values handled there are
# "csv" and "pickle-ndarray" (anything else raises ValueError in flush()).
FLUSH_TYPE = "pickle-ndarray"
# File name (inside the measurement directory) of the pickled metadata dict
# written by DataCollector.write_metadata() and read by DataCollector.load_data().
METADATA_FILENAME = "_measurement_metadata.pkl"
class DataCollector:
columns = ["idx", "t [s]", "V [V]", "LED [%]"]
def __init__(self,
data_path: str,
data_name: str="CPData",
metadata: dict[str, str]={},
dirname: str|None=None,
add_number_if_dir_exists=True,
):
self.data = []
self.fulldata = None # if loaded, this contains the final numpy array
self.name = data_name
self.metadata = metadata
self.path = os.path.abspath(os.path.expanduser(data_path))
if dirname is None:
self.dirname = sanitize_filename(datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d_%H-%M") + "_" + self.name)
else:
self.dirname = sanitize_filename(dirname)
self.dirpath = os.path.join(self.path, self.dirname)
if os.path.exists(self.dirpath):
if not add_number_if_dir_exists:
raise Exception(f"Directory '{self.dirname}' already exists. Provide a different directory or pass `add_number_if_dir_exists=True` to ignore this")
else:
i = 1
dirpath = f"{self.dirpath}-{i}"
while os.path.exists(dirpath):
i += 1
dirpath = f"{self.dirpath}-{i}"
print(f"Directory '{self.dirname}' already exists. Trying '{dirpath}' instead")
self.dirpath = dirpath
os.makedirs(self.dirpath)
self.flushed = False
def _get_filename(self):
return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
def write_metadata(self):
f"""
Write the metadata to the disk as '{METADATA_FILENAME}'
Returns
-------
None.
"""
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
with open(filepath, "wb") as file:
pickle.dump(self.metadata, file)
def flush(self, verbose:bool=False):
"""
Write the current data to a file and clear the internal data
Parameters
----------
verbose : bool, optional
If True, print a message when flushing data. The default is False.
Raises
------
ValueError
If the FLUSH_TYPE is invalid.
Returns
-------
None.
"""
# dont flush empty data
if len(self.data) == 0:
return
# TODO check if dir still exists
if FLUSH_TYPE == "csv":
filename = self._get_filename() + ".csv"
filepath = os.path.join(self.dirpath, filename)
if verbose: print(f"Flushing data to {filepath}")
self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
elif FLUSH_TYPE == "pickle-ndarray":
filename = self._get_filename() + ".ndarray.pkl"
filepath = os.path.join(self.dirpath, filename)
if verbose: print(f"Flushing data to {filepath}")
with open(filepath, "wb") as file:
pickle.dump(np.array(self.data), file)
else:
raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
self.clear()
def clear(self):
self.data = []
self.fulldata = None
def add_data(self, i, t, v, l):
self.data.append((i, t, v, l))
self.fulldata = None # no longer up to date
def to_dataframe(self):
df = pd.DataFrame(self.data, columns=DataCollector.columns)
df.meta = str(self.metadata)
return df
def to_csv(self, sep=","):
# self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
data, metadata = self.get_data()
return DataCollector.get_csv(data, self.metadata, sep=sep)
def save_csv(self, sep=",", verbose=False):
filepath = os.path.join(self.path, self.dirname + ".csv")
if verbose: print(f"Writing data to {filepath}")
with open(filepath, "w") as file:
file.write(self.to_csv(sep=sep))
def get_data(self) -> tuple[np.ndarray, dict]:
if self.fulldata is None:
return DataCollector.load_data(self.dirpath)
else:
return self.fulldata, self.metadata
@staticmethod
def get_csv(data, metadata, sep=","):
csv = ""
for k, v in metadata.items():
csv += f"# {k}: {v}\n"
csv += "".join(f"{colname}{sep}" for colname in DataCollector.columns).strip(sep) + "\n"
for i in range(data.shape[0]):
csv += f"{i}{sep}{data[i,1]}{sep}{data[i,2]}{sep}{data[i,3]}\n"
return csv.strip("\n")
@staticmethod
def load_data(dirpath:str, verbose:bool=False) -> tuple[np.ndarray, dict]:
"""
Combines all data files from a directory into a numpy array
Parameters
----------
dirpath : str
Path to the data directory
verbose : bool, optional
If True, print a message for every file that is opened. The default is False.
Raises
------
NotImplementedError
DESCRIPTION.
Returns
-------
data : ndarray
First index: Measurement
Second index: (index, timestamp [s], CPD [V], LED [%])
"""
files = os.listdir(dirpath)
files.sort()
data = np.empty((0, 4))
metadata = {}
for filename in files:
filepath = os.path.join(dirpath, filename)
if filename.endswith(".csv"):
if verbose: print(f"Opening {filepath} as csv")
df = pd.read_csv(filepath)
arr = df.to_numpy()
data = np.concatenate((data, arr))
elif filename.endswith(".ndarray.pkl"):
with open(filepath, "rb") as file:
arr = pickle.load(file)
if len(arr.shape) != 2 or arr.shape[1] != 4:
print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
continue
data = np.concatenate((data, arr))
elif filename == METADATA_FILENAME:
with open(filepath, "rb") as file:
metadata = pickle.load(file)
else:
raise NotImplementedError()
return data, metadata
def plot_cpd_data(data: str | pd.DataFrame | np.ndarray, t: str="seconds", title: str="", CPD:bool=True, LED:bool=True):
    """
    Plot recorded data

    Parameters
    ----------
    data : str or pd.DataFrame or np.ndarray
        Path to the data directory or
        DataFrame / numpy array with columns (idx, t [s], V [V], LED [%])
    t : str, optional
        Which timescale to use for the x axis:
        One of "seconds", "minutes", "hours"; any other value is treated
        like "seconds".
        The default is "seconds".
    title : str, optional
        Title for the plot. The default is "".
    CPD : bool, optional
        Whether to plot the voltage (CPD) line. The default is True.
    LED : bool, optional
        Whether to plot the LED state line. The default is True.

    Returns
    -------
    fig : matplotlib.figure.Figure
        Matplotlib figure object.
    """
    if isinstance(data, str):
        _data, _ = DataCollector.load_data(data)
    elif isinstance(data, pd.DataFrame):
        # positional [:, k] indexing below needs a plain array
        _data = data.to_numpy()
    else:
        _data = data

    fig, ax = plt.subplots()

    # Float copy of the time column: the input is left untouched and the
    # division below also works when the column has an integer dtype.
    xdata = _data[:, 1].astype(float)
    xlabel = "t [s]"
    if t == "minutes":
        xdata = xdata / 60
        xlabel = "t [minutes]"
    elif t == "hours":
        xdata = xdata / 3600
        xlabel = "t [hours]"
    ax.set_xlabel(xlabel)

    ax_cpd = ax
    # The LED line only gets its own y axis when both lines are drawn
    ax_led = ax.twinx() if (CPD and LED) else ax
    if CPD:
        ax_cpd.set_ylabel("CPD [V]")
        ax_cpd.plot(xdata, _data[:, 2], color="blue", label="CPD")
    if LED:
        ax_led.set_ylabel("LED [%]")
        ax_led.plot(xdata, _data[:, 3], color="orange", label="LED")
        # fixed percent scale with a small margin around 0..100
        ax_led.set_ylim(-2, 102)
        ax_led.set_yticks([0, 20, 40, 60, 80, 100])
    if title:
        ax.set_title(title)
    fig.tight_layout()
    return fig