diff --git a/cpdctrl/cpdctrl-interactive.py b/cpdctrl/cpdctrl-interactive.py index 2df9718..9535945 100644 --- a/cpdctrl/cpdctrl-interactive.py +++ b/cpdctrl/cpdctrl-interactive.py @@ -53,7 +53,7 @@ from .utility.data import DataCollector from .utility import data as _data -from .utility.data import load_dataframe +from .utility.data import load_dataframe, plot_cpd_data from .utility import file_io from .update_funcs import _Monitor, _update_print @@ -75,7 +75,7 @@ test = False # global variable for the instrument/client returned by pyvisa/bleak dev: VoltageMeasurementDevice|None = None led: LedControlDevice|None = None -# data = DataCollector(settings["datadir"], settings["name"]) +data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True) t0 = 0 def monitor(interval=None, max_measurements=None, max_points_shown=160): @@ -102,7 +102,7 @@ def monitor(interval=None, max_measurements=None, max_points_shown=160): if i == 0: t0 = t t -= t0 - data.add_data((t, v)) + data.add_data(i, t, v, 0) plt_monitor.update(i, t, v) # update_led() dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func) diff --git a/cpdctrl/led_control_device/base.py b/cpdctrl/led_control_device/base.py index 0f7b105..d343d47 100644 --- a/cpdctrl/led_control_device/base.py +++ b/cpdctrl/led_control_device/base.py @@ -29,7 +29,7 @@ class LedControlDevice(ABC): """ pass - # @abstractmethod + @abstractmethod def set_level(level:int): """ Set the led brightness to a certain level diff --git a/cpdctrl/led_control_device/impl/thorlabs_ledd1b.py b/cpdctrl/led_control_device/impl/thorlabs_ledd1b.py index c662fbe..5292011 100644 --- a/cpdctrl/led_control_device/impl/thorlabs_ledd1b.py +++ b/cpdctrl/led_control_device/impl/thorlabs_ledd1b.py @@ -32,6 +32,11 @@ class LEDD1B(LedControlDevice): self._write("1") def off(self): self._write("0") + def set_level(self, level:int): + if level == 0: self.off() + elif level == 100: self.on() + else: + raise ValueError(f"LEDD1B Led controller can only set 0% or 100%") if __name__ == '__main__': led = LEDD1B() \ No newline at end of file diff --git a/cpdctrl/led_script.py b/cpdctrl/led_script.py index e45569e..e97f4bf 100644 --- a/cpdctrl/led_script.py +++ b/cpdctrl/led_script.py @@ -57,4 +57,7 @@ class LedScript: int LED Intensity [0,100] """ + # TODO remove hard coded script + if (dt // 3600) % 2 == 0: + return 100 return 0 \ No newline at end of file diff --git a/cpdctrl/measurement.py b/cpdctrl/measurement.py index 1e35021..c728b4f 100644 --- a/cpdctrl/measurement.py +++ b/cpdctrl/measurement.py @@ -46,11 +46,19 @@ class Measurement: self.data.flush(verbose=verbose) # substract the execution time from the sleep time for a more # acurate frequency - dt_sleep = delta_t - (t_iter_start - time.time()) + dt_sleep = delta_t - (time.time() - t_iter_start) if dt_sleep > 0: + # print(f"Sleeping for {dt_sleep}") time.sleep(dt_sleep) t_iter_start = time.time() - led_val = self.led_script.get_state() + new_led_val = self.led_script.get_state() + if new_led_val != led_val: + try: + self.led_dev.set_level(new_led_val) + led_val = new_led_val + except Exception as e: + print(f"Error setting led to {new_led_val}%:") + print(e) i += 1 except KeyboardInterrupt: pass diff --git a/cpdctrl/utility/data.py b/cpdctrl/utility/data.py index 13860bb..e0c49c9 100644 --- a/cpdctrl/utility/data.py +++ b/cpdctrl/utility/data.py @@ -3,13 +3,16 @@ import numpy as np import os import matplotlib.pyplot as plt import datetime +import pickle from cpdctrl.utility.file_io import get_next_filename, sanitize_filename +FLUSH_TYPE = "pickle-ndarray" +METADATA_FILENAME = "_measurement_metadata.pkl" class DataCollector: def __init__(self, - data_name: str, data_path: str, + data_name: str="CPData", header: dict[str, str]={}, dirname: str|None=None, dir_exists_is_ok=False, @@ -34,19 +37,47 @@ class DataCollector: def _get_filename(self): return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5)) - def flush(self, verbose=False): + def _write_metadata(self): + filepath = os.path.join(self.dirpath, METADATA_FILENAME) + with open(filepath, "wb") as file: + pickle.dump(self.header, file) + + def flush(self, verbose:bool=False): + """ + Write the current data to a file and clear the internal data + + Parameters + ---------- + verbose : bool, optional + If True, print a message when flushing data. The default is False. + + Raises + ------ + ValueError + If the FLUSH_TYPE is invalid. + + Returns + ------- + None. + + """ # TODO check if dir still exists - filename = self._get_filename() + ".csv" - filepath = os.path.join(self.dirpath, filename) - if verbose: print(f"Flushing data to {filepath}") - self.to_dataframe().to_csv(filepath, sep=",", index=False, header=True) + if FLUSH_TYPE == "csv": + filename = self._get_filename() + ".csv" + filepath = os.path.join(self.dirpath, filename) + if verbose: print(f"Flushing data to {filepath}") + self.to_dataframe().to_csv(filepath, sep=",", index=False, header=True) + elif FLUSH_TYPE == "pickle-ndarray": + filename = self._get_filename() + ".ndarray.pkl" + filepath = os.path.join(self.dirpath, filename) + if verbose: print(f"Flushing data to {filepath}") + with open(filepath, "wb") as file: + pickle.dump(np.array(self.data), file) + else: + raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'") self.data = [] self.flushed = True - - def finalize(self): - if self.flushed: - raise NotImplementedError() - + def clear(self): self.data = [] @@ -61,23 +92,53 @@ class DataCollector: def save_csv(self): self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, header=True) - + def get_data(self): + return DataCollector.load_data(self.dirpath) + @staticmethod + def load_data(dirpath:str, verbose:bool=False) -> np.ndarray: + """ + Combines all data files from a directory into a numpy array + + Parameters + ---------- + dirpath : str + Path to the data directory + verbose : bool, optional + If True, print a message for every file that is opened. The default is False. + + Raises + ------ + NotImplementedError + DESCRIPTION. + + Returns + ------- + data : ndarray + First index: Measurement + Second index: (index, timestamp [s], CPD [V], LED [%]) + """ + files = os.listdir(dirpath) + files.sort() + data = np.empty((0, 4)) + metadata = {} + for filename in files: + filepath = os.path.join(dirpath, filename) + if filename.endswith(".csv"): + if verbose: print(f"Opening {filepath} as csv") + df = pd.read_csv(filepath) + arr = df.to_numpy() + data = np.concatenate((data, arr)) + elif filename.endswith(".ndarray.pkl"): + with open(filepath, "rb") as file: + arr = pickle.load(file) + data = np.concatenate((data, arr)) + elif filename == METADATA_FILENAME: + with open(filepath, "rb") as file: + metadata = pickle.load(file) + else: + raise NotImplementedError() + return data, metadata -# deprecated -# def buffer2dataframe(buffer): -# df = pd.DataFrame(buffer) -# df.colums = ["Time [s]", "Voltage [V]"] -# return df -# OLD STUFF -def buffers2dataframe(ibuffer, vbuffer): - """ - @param ibuffer : 2d - array: timestamps, current - @param vbuffer : 2d - array: timestamps, voltage - @returns DataFrame: timestamps, current, voltage - """ - df = pd.DataFrame(np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T) - df.columns = ["Time [s]", "Current [A]", "Voltage [V]"] - return df def load_dataframe(p:str): """ @@ -93,7 +154,7 @@ def load_dataframe(p:str): df = pd.read_pickle(p) return df -def plot(data: str or pd.DataFrame or np.ndarray, title="", U=True, I=False): +def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t="seconds", title="", CPD:bool=True, LED:bool=False): """ Plot recorded data @param data: filepath, dataframe or numpy array @@ -104,23 +165,34 @@ def plot(data: str or pd.DataFrame or np.ndarray, title="", U=True, I=False): _data = data.to_numpy() else: _data = data - print(_data[0]) - plt.ion() fig, ax = plt.subplots() - ax.set_xlabel("t [s]") - vax = ax - iax = ax - if U and I: - iax = ax.twinx() - if U: - vax = ax - vax.set_ylabel("U [V]") - vax.plot(_data[:,0], _data[:,2], color="blue", label="voltage") - if I: - iax.set_ylabel("I [A]") - iax.plot(_data[:,0], _data[:,1], color="orange", label="current") - if U and I: - plt.legend() + xdata = _data[:,1].copy() + xlabel = "t [s]" + if t == "minutes": + xdata /= 60 + xlabel = "t [minutes]" + elif t == "hours": + xdata /= 3600 + xlabel = "t [hours]" + ax.set_xlabel(xlabel) + ax_cpd = ax + ax_led = ax + if CPD and LED: + ax_led = ax.twinx() + if CPD: + ax_cpd = ax + ax_cpd.set_ylabel("CPD [V]") + ax_cpd.plot(xdata, _data[:,2], color="blue", label="CPD") + if LED: + ax_led.set_ylabel("LED [%]") + ax_led.plot(xdata, _data[:,3], color="orange", label="LED") + if CPD and LED: + # ax_led.legend() + # ax_cpd.legend() + pass + if title: + ax.set_title(title) + fig.tight_layout() return fig diff --git a/cpdctrl/utility/file_io.py b/cpdctrl/utility/file_io.py index 96a1fd8..dfcf767 100644 --- a/cpdctrl/utility/file_io.py +++ b/cpdctrl/utility/file_io.py @@ -14,7 +14,7 @@ def get_next_filename(basename, directory=".", digits=3): get the next filename (without extenstion). example: basename = file - directory has file001.csv, file002.pkl, file004.csv + directory has file001.csv, file002.bla.pkl, file004.csv -> return file005 """ files = listdir(directory) @@ -24,14 +24,14 @@ def get_next_filename(basename, directory=".", digits=3): for file in files: if not file.startswith(basename): continue try: - number = file[:file.rfind('.')].replace(basename, "") + number = file[:file.find('.')].replace(basename, "") number = int(number) if number < lowest_number: continue lowest_number = number except ValueError: continue - return basename + add_zeros(lowest_number+1) + return basename + add_zeros(lowest_number+1, digits) def sanitize_filename(filename): return re.sub(r'[\\/*?:"<>|]',"", filename) \ No newline at end of file