diff --git a/cpdctrl/utility/data.py b/cpdctrl/utility/data.py
index 5022814..fbcd3ae 100644
--- a/cpdctrl/utility/data.py
+++ b/cpdctrl/utility/data.py
@@ -9,7 +9,8 @@ log = logging.getLogger(__name__)
 from cpdctrl.utility.file_io import get_next_filename, sanitize_filename

 FLUSH_TYPE = "pickle-ndarray"
-METADATA_FILENAME = "_measurement_metadata.pkl"
+FLUSH_PREFIX = "PART_"
+METADATA_FILENAME = FLUSH_PREFIX + "measurement_metadata.pkl"

 class DataCollector:
@@ -45,30 +46,18 @@ class DataCollector:
         self.dirpath = dirpath
         self.assert_directory_exists()
         self.flushed = False
-
-
-    def _get_filename(self):
-        return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
-
-    def write_metadata(self):
-        f"""
-        Write the metadata to the disk as '{METADATA_FILENAME}'
-
-        Returns
-        -------
-        None.
-        """
-        filepath = os.path.join(self.dirpath, METADATA_FILENAME)
-        log.debug(f"Writing metadata to {filepath}")
-        with open(filepath, "wb") as file:
-            pickle.dump(self.metadata, file)
-
-    def assert_directory_exists(self):
-        if not os.path.isdir(self.dirpath):
-            os.makedirs(self.dirpath)
-    def flush(self, verbose:bool=False):
+    # OPERATION
+    def clear(self):
+        self.data = []
+        self.fulldata = None
+
+    def add_data(self, i, t, v, l):
+        self.data.append((i, t, v, l))
+        self.fulldata = None # no longer up to date
+
+    def flush(self, verbose: bool = False):
         """
         Write the current data to a file and clear the internal data
@@ -92,13 +81,13 @@ class DataCollector:
             return
         self.assert_directory_exists()
         if FLUSH_TYPE == "csv":
-            filename = self._get_filename() + ".csv"
+            filename = self._get_flush_filename() + ".csv"
             filepath = os.path.join(self.dirpath, filename)
             log.info(f"Flushing data to {filepath}")
             if verbose: print(f"Flushing data to {filepath}")
             self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
         elif FLUSH_TYPE == "pickle-ndarray":
-            filename = self._get_filename() + ".ndarray.pkl"
+            filename = self._get_flush_filename() + ".ndarray.pkl"
             filepath = os.path.join(self.dirpath, filename)
             log.info(f"Flushing data to {filepath}")
             if verbose: print(f"Flushing data to {filepath}")
@@ -107,32 +96,18 @@ class DataCollector:
         else:
             raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
         self.clear()
-
-    def clear(self):
-        self.data = []
-        self.fulldata = None
-
-    def add_data(self, i, t, v, l):
-        self.data.append((i, t, v, l))
-        self.fulldata = None # no longer up to date
-
+    # Convert data
     def to_dataframe(self):
         df = pd.DataFrame(self.data, columns=DataCollector.columns)
         df.meta = str(self.metadata)
         return df
-
+
     def to_csv(self, sep=","):
         # self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
         data, metadata = self.get_data()
         return DataCollector.get_csv(data, self.metadata, sep=sep)
-
-    def save_csv(self, sep=",", verbose=False):
-        filepath = os.path.join(self.path, self.dirname + ".csv")
-        if verbose: print(f"Writing csv to {filepath}")
-        log.info(f"Writing csv to {filepath}")
-        with open(filepath, "w") as file:
-            file.write(self.to_csv(sep=sep))
-
+
     def get_data(self) -> tuple[np.ndarray, dict]:
         """
         Load the full data and return it together with the metadata
@@ -145,7 +120,47 @@ class DataCollector:
         self.fulldata, new_mdata = DataCollector.load_data_from_dir(self.dirpath)
         self.metadata |= new_mdata
         return self.fulldata, self.metadata
-
+
+    # File IO
+    def _get_flush_filename(self):
+        """Get the filename of the next partial file, incrementing the number every time"""
+        return sanitize_filename(get_next_filename(FLUSH_PREFIX + self.name, self.dirpath, digits=5))
+
+    def assert_directory_exists(self):
+        if not os.path.isdir(self.dirpath):
+            os.makedirs(self.dirpath)
+
+
+    def save_csv_at(self, filepath, sep=",", verbose=False):
+        if verbose: print(f"Writing csv to {filepath}")
+        log.info(f"Writing csv to {filepath}")
+        with open(filepath, "w") as file:
+            file.write(self.to_csv(sep=sep))
+
+    def save_csv(self, sep=",", verbose=False):
+        """Save the csv inside the data directory"""
+        filepath = os.path.join(self.path, self.dirname + ".csv")
+        self.save_csv_at(filepath, sep, verbose)
+
+    def save_csv_in_dir(self, sep=",", verbose=False):
+        """Save the csv inside the directory with temporary data"""
+        filepath = os.path.join(self.dirpath, self.dirname + ".csv")
+        self.save_csv_at(filepath, sep, verbose)
+
+    def write_metadata(self):
+        """
+        Write the metadata to the disk as METADATA_FILENAME in the data directory
+
+        Returns
+        -------
+        None.
+        """
+        filepath = os.path.join(self.dirpath, METADATA_FILENAME)
+        log.debug(f"Writing metadata to {filepath}")
+        with open(filepath, "wb") as file:
+            pickle.dump(self.metadata, file)
+
+    # STATIC LOADERS
     @staticmethod
     def get_csv(data, metadata, sep=","):
         csv = ""
@@ -157,7 +172,7 @@ class DataCollector:
         return csv.strip("\n")

     @staticmethod
-    def load_data_from_csv(filepath:str) -> tuple[np.ndarray, dict]:
+    def load_data_from_csv(filepath: str, sep: str = ",") -> tuple[np.ndarray, dict]:
         """
         Loads data from a single csv file.
         Lines with this format are interpreted as metadata:
@@ -168,6 +183,8 @@ class DataCollector:
         ----------
         filepath
             Path to the csv file.
+        sep
+            CSV separator
 
         Returns
         -------
@@ -177,9 +194,9 @@ class DataCollector:
             Dictionary with metadata.
         """
         metadata = {}
-        data = np.empty((0, 4))
         with open(filepath, "r") as f:
-            for j, line in enumerate(f):
+            # this loop reads the metadata at the beginning of the file and also consumes the header row
+            for line in f:
                 if line.startswith("#"):
                     colon = line.find(":")
                     if colon == -1: # normal comment
@@ -187,20 +204,10 @@ class DataCollector:
                     key = line[1:colon].strip()
                     value = line[colon+1:].strip()
                     metadata[key] = value
-                    continue
-                if line.startswith("idx"): # header line
-                    continue
-                vals = line.split(",")
-                if len(vals) != 4:
-                    raise ValueError(f"Line {j+1}: Line must have 4 values, but has {len(vals)}: '{line}'")
-                try:
-                    i = int(vals[0])
-                    t = float(vals[1])
-                    cpd = float(vals[2])
-                    led = float(vals[3])
-                except ValueError:
-                    raise ValueError(f"Line {j+1}: Failed to convert values to numbers: '{line}'")
-                data = np.append(data, [[i, t, cpd, led]], axis=0)
+                else:
+                    break
+            # the file handle now yields only data lines; ndmin=2 keeps shape (N, 4) even for a single row
+            data = np.loadtxt(f, delimiter=sep, ndmin=2)
         return data, metadata

     @staticmethod
@@ -232,23 +239,26 @@ class DataCollector:
         metadata = {}
         for filename in files:
             filepath = os.path.join(dirpath, filename)
-            if filename.endswith(".csv"):
-                if verbose: print(f"Opening {filepath} as csv")
-                df = pd.read_csv(filepath)
-                arr = df.to_numpy()
-                data = np.concatenate((data, arr))
-            elif filename.endswith(".ndarray.pkl"):
-                with open(filepath, "rb") as file:
-                    arr = pickle.load(file)
-                if len(arr.shape) != 2 or arr.shape[1] != 4:
-                    print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
-                    continue
-                data = np.concatenate((data, arr))
-            elif filename == METADATA_FILENAME:
-                with open(filepath, "rb") as file:
-                    metadata = pickle.load(file)
+            if filename.startswith(FLUSH_PREFIX):
+                if filename.endswith(".csv"):
+                    if verbose: print(f"Opening {filepath} as csv")
+                    df = pd.read_csv(filepath)
+                    arr = df.to_numpy()
+                    data = np.concatenate((data, arr))
+                elif filename.endswith(".ndarray.pkl"):
+                    with open(filepath, "rb") as file:
+                        arr = pickle.load(file)
+                    if len(arr.shape) != 2 or arr.shape[1] != 4:
+                        print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
+                        continue
+                    data = np.concatenate((data, arr))
+                elif filename == METADATA_FILENAME: # Metadata filename must also start with FLUSH_PREFIX
+                    with open(filepath, "rb") as file:
+                        metadata = pickle.load(file)
+                else:
+                    raise NotImplementedError(f"Unknown file extension for file '{filepath}'")
             else:
-                raise NotImplementedError()
+                log.info(f"Skipping unknown file: '{filepath}'")
         return data, metadata
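Usage sketch (not part of the patch): the snippet below exercises the new metadata-plus-np.loadtxt path in load_data_from_csv. Only the "# key: value" metadata convention and the load_data_from_csv(filepath, sep) signature come from this patch; the file name and the header/column names are made up for illustration.

# Round-trip sketch for DataCollector.load_data_from_csv.
# The demo file content is hypothetical; header and column names are illustrative only.
import os
import tempfile

from cpdctrl.utility.data import DataCollector

csv_text = (
    "# sample: demo\n"            # '# key: value' line -> metadata entry
    "# plain comment, ignored\n"  # comment without a colon -> skipped
    "idx,t,cpd,led\n"             # header row, consumed by the metadata loop
    "0,0.00,0.101,0.0\n"
    "1,0.05,0.102,50.0\n"
)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.csv")
    with open(path, "w") as f:
        f.write(csv_text)
    data, metadata = DataCollector.load_data_from_csv(path)
    print(metadata)    # {'sample': 'demo'}
    print(data.shape)  # (2, 4)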