# File listing metadata (residue from the web viewer): 230 lines, 7.2 KiB, Python
import pandas as pd
|
|
import numpy as np
|
|
import os
|
|
import matplotlib.pyplot as plt
|
|
import datetime
|
|
import pickle
|
|
|
|
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
|
|
# Storage format used by DataCollector.flush: "csv" or "pickle-ndarray"
FLUSH_TYPE = "pickle-ndarray"
# Name of the file inside a data directory that holds the pickled metadata dict
METADATA_FILENAME = "_measurement_metadata.pkl"
|
|
|
|
class DataCollector:
|
|
columns = ["idx", "t [s]", "V [V]", "LED [%]"]
|
|
def __init__(self,
|
|
data_path: str,
|
|
data_name: str="CPData",
|
|
metadata: dict[str, str]={},
|
|
dirname: str|None=None,
|
|
dir_exists_is_ok=False,
|
|
):
|
|
self.data = []
|
|
self.name = data_name
|
|
self.metadata = metadata
|
|
self.path = os.path.abspath(os.path.expanduser(data_path))
|
|
if dirname is None:
|
|
self.dirname = sanitize_filename(datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d_%H-%M") + "_" + self.name)
|
|
else:
|
|
self.dirname = sanitize_filename(dirname)
|
|
self.dirpath = os.path.join(self.path, self.dirname)
|
|
|
|
if os.path.exists(self.dirpath):
|
|
if not dir_exists_is_ok:
|
|
raise Exception(f"Directory '{self.dirname}' already exists. Provide a different directory or pass `dir_exists_is_ok=True` to ignore this")
|
|
else:
|
|
os.makedirs(self.dirpath)
|
|
self.flushed = False
|
|
|
|
|
|
def _get_filename(self):
|
|
return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
|
|
|
|
def write_metadata(self):
|
|
f"""
|
|
Write the metadata to the disk as '{METADATA_FILENAME}'
|
|
|
|
Returns
|
|
-------
|
|
None.
|
|
"""
|
|
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
|
|
with open(filepath, "wb") as file:
|
|
pickle.dump(self.metadata, file)
|
|
|
|
def flush(self, verbose:bool=False):
|
|
"""
|
|
Write the current data to a file and clear the internal data
|
|
|
|
Parameters
|
|
----------
|
|
verbose : bool, optional
|
|
If True, print a message when flushing data. The default is False.
|
|
|
|
Raises
|
|
------
|
|
ValueError
|
|
If the FLUSH_TYPE is invalid.
|
|
|
|
Returns
|
|
-------
|
|
None.
|
|
|
|
"""
|
|
# TODO check if dir still exists
|
|
if FLUSH_TYPE == "csv":
|
|
filename = self._get_filename() + ".csv"
|
|
filepath = os.path.join(self.dirpath, filename)
|
|
if verbose: print(f"Flushing data to {filepath}")
|
|
self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
|
|
elif FLUSH_TYPE == "pickle-ndarray":
|
|
filename = self._get_filename() + ".ndarray.pkl"
|
|
filepath = os.path.join(self.dirpath, filename)
|
|
if verbose: print(f"Flushing data to {filepath}")
|
|
with open(filepath, "wb") as file:
|
|
pickle.dump(np.array(self.data), file)
|
|
else:
|
|
raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
|
|
self.data = []
|
|
self.flushed = True
|
|
|
|
def clear(self):
|
|
self.data = []
|
|
|
|
def add_data(self, i, t, v, l):
|
|
self.data.append((i, t, v, l))
|
|
|
|
def to_dataframe(self):
|
|
df = pd.DataFrame(self.data, columns=DataCollector.columns)
|
|
df.meta = str(self.metadata)
|
|
return df
|
|
|
|
def to_csv(self, sep=","):
|
|
# self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
|
|
data, metadata = self.get_data()
|
|
return DataCollector.get_csv(data, self.metadata, sep=sep)
|
|
|
|
def save_csv(self, sep=",", verbose=False):
|
|
filepath = os.path.join(self.path, self.name + ".csv")
|
|
if verbose: print(f"Writing data to {filepath}")
|
|
with open(filepath, "w") as file:
|
|
file.write(self.to_csv(sep=sep))
|
|
|
|
def get_data(self):
|
|
return DataCollector.load_data(self.dirpath)
|
|
|
|
@staticmethod
|
|
def get_csv(data, metadata, sep=","):
|
|
csv = ""
|
|
for k, v in metadata.items():
|
|
csv += f"# {k}: {v}\n"
|
|
csv += "".join(f"{colname}{sep}" for colname in DataCollector.columns).strip(sep) + "\n"
|
|
for i in range(data.shape[0]):
|
|
csv += f"{i}{sep}{data[i,1]}{sep}{data[i,2]}{sep}{data[i,3]}\n"
|
|
return csv.strip("\n")
|
|
|
|
@staticmethod
|
|
def load_data(dirpath:str, verbose:bool=False) -> np.ndarray:
|
|
"""
|
|
Combines all data files from a directory into a numpy array
|
|
|
|
Parameters
|
|
----------
|
|
dirpath : str
|
|
Path to the data directory
|
|
verbose : bool, optional
|
|
If True, print a message for every file that is opened. The default is False.
|
|
|
|
Raises
|
|
------
|
|
NotImplementedError
|
|
DESCRIPTION.
|
|
|
|
Returns
|
|
-------
|
|
data : ndarray
|
|
First index: Measurement
|
|
Second index: (index, timestamp [s], CPD [V], LED [%])
|
|
"""
|
|
files = os.listdir(dirpath)
|
|
files.sort()
|
|
data = np.empty((0, 4))
|
|
metadata = {}
|
|
for filename in files:
|
|
filepath = os.path.join(dirpath, filename)
|
|
if filename.endswith(".csv"):
|
|
if verbose: print(f"Opening {filepath} as csv")
|
|
df = pd.read_csv(filepath)
|
|
arr = df.to_numpy()
|
|
data = np.concatenate((data, arr))
|
|
elif filename.endswith(".ndarray.pkl"):
|
|
with open(filepath, "rb") as file:
|
|
arr = pickle.load(file)
|
|
data = np.concatenate((data, arr))
|
|
elif filename == METADATA_FILENAME:
|
|
with open(filepath, "rb") as file:
|
|
metadata = pickle.load(file)
|
|
else:
|
|
raise NotImplementedError()
|
|
return data, metadata
|
|
|
|
|
|
def load_dataframe(p:str):
|
|
"""
|
|
Load a dataframe from file.
|
|
@param p : path of the file. If it has 'csv' extension, pandas.read_csv is used, pandas.read_pickle otherwise
|
|
"""
|
|
if not os.path.isfile(p):
|
|
print(f"ERROR: load_dataframe: File does not exist: {p}")
|
|
return None
|
|
if p.endswith(".csv"):
|
|
df = pd.read_csv(p)
|
|
else:
|
|
df = pd.read_pickle(p)
|
|
return df
|
|
|
|
def plot_cpd_data(data: str | pd.DataFrame | np.ndarray, t="seconds", title="", CPD: bool = True, LED: bool = False):
    """
    Plot recorded data

    Parameters
    ----------
    data : str, pandas.DataFrame or numpy.ndarray
        Filepath (loaded with `load_dataframe`), dataframe or numpy array.
        Column 1 is used as time in seconds, column 2 as CPD and column 3
        as LED value.
    t : str, optional
        Unit of the time axis: "seconds", "minutes" or "hours". Any other
        value is treated like "seconds". The default is "seconds".
    title : str, optional
        Title of the plot, no title is set if empty. The default is "".
    CPD : bool, optional
        If True, plot the CPD column. The default is True.
    LED : bool, optional
        If True, plot the LED column. If CPD is plotted as well, the LED
        data gets its own y-axis on the right. The default is False.

    Raises
    ------
    FileNotFoundError
        If `data` is a path to a file that does not exist.

    Returns
    -------
    fig : matplotlib figure
    """
    if isinstance(data, str):
        df = load_dataframe(data)
        if df is None:
            raise FileNotFoundError(f"File does not exist: {data}")
        _data = df.to_numpy()
    elif isinstance(data, pd.DataFrame):
        _data = data.to_numpy()
    else:
        _data = data

    # (divisor, axis label) for each supported time unit
    time_units = {
        "minutes": (60, "t [minutes]"),
        "hours": (3600, "t [hours]"),
    }
    divisor, xlabel = time_units.get(t, (1, "t [s]"))
    xdata = _data[:, 1]
    if divisor != 1:
        # Out-of-place float division: in-place `/=` fails for integer
        # arrays and would modify the caller's data without a copy
        xdata = np.asarray(xdata, dtype=float) / divisor

    fig, ax = plt.subplots()
    ax.set_xlabel(xlabel)
    ax_cpd = ax
    # A second y-axis is only needed when both quantities are shown
    ax_led = ax.twinx() if (CPD and LED) else ax
    if CPD:
        ax_cpd.set_ylabel("CPD [V]")
        ax_cpd.plot(xdata, _data[:, 2], color="blue", label="CPD")
    if LED:
        ax_led.set_ylabel("LED [%]")
        ax_led.plot(xdata, _data[:, 3], color="orange", label="LED")
    if title:
        ax.set_title(title)
    fig.tight_layout()
    return fig
|
|
|
|
|
|
|
|
|
|
|