BREAKING: Change flush filenames

Improve load_csv performance using numpy's builtin loadtxt
CPD 2025-03-17 10:14:10 +01:00
parent 51dd329bfc
commit c955d7a8f9
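
Editor's note: the breaking part is the file naming. Flushed chunks used to be named after the collector alone (e.g. measurement00000.ndarray.pkl) and now carry the PART_ prefix; the metadata pickle moves from _measurement_metadata.pkl to PART_measurement_metadata.pkl, so directories written by older versions will no longer load. A minimal sketch of the scheme, assuming get_next_filename (from cpdctrl.utility.file_io, not shown in this diff) simply appends the lowest unused zero-padded counter:

import os

FLUSH_PREFIX = "PART_"

def next_filename_sketch(name: str, dirpath: str, digits: int = 5) -> str:
    # Hypothetical stand-in for cpdctrl's get_next_filename: pick the
    # lowest counter whose filename is not taken yet in dirpath.
    taken = set(os.listdir(dirpath))
    i = 0
    while any(f.startswith(f"{name}{i:0{digits}d}") for f in taken):
        i += 1
    return f"{name}{i:0{digits}d}"

# old flush filenames: measurement00000.ndarray.pkl, measurement00001.ndarray.pkl, ...
# new flush filenames: PART_measurement00000.ndarray.pkl, PART_measurement00001.ndarray.pkl, ...
# new usage would be: next_filename_sketch(FLUSH_PREFIX + "measurement", dirpath)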


@@ -9,7 +9,8 @@ log = logging.getLogger(__name__)
 from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
 FLUSH_TYPE = "pickle-ndarray"
-METADATA_FILENAME = "_measurement_metadata.pkl"
+FLUSH_PREFIX = "PART_"
+METADATA_FILENAME = FLUSH_PREFIX + "measurement_metadata.pkl"
 class DataCollector:
@@ -45,30 +46,18 @@ class DataCollector:
         self.dirpath = dirpath
         self.assert_directory_exists()
         self.flushed = False
-    def _get_filename(self):
-        return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
-    def write_metadata(self):
-        f"""
-        Write the metadata to the disk as '{METADATA_FILENAME}'
-        Returns
-        -------
-        None.
-        """
-        filepath = os.path.join(self.dirpath, METADATA_FILENAME)
-        log.debug(f"Writing metadata to {filepath}")
-        with open(filepath, "wb") as file:
-            pickle.dump(self.metadata, file)
-    def assert_directory_exists(self):
-        if not os.path.isdir(self.dirpath):
-            os.makedirs(self.dirpath)
-    def flush(self, verbose:bool=False):
+    # OPERATION
+    def clear(self):
+        self.data = []
+        self.fulldata = None
+    def add_data(self, i, t, v, l):
+        self.data.append((i, t, v, l))
+        self.fulldata = None  # no longer up to date
+    def flush(self, verbose: bool = False):
         """
         Write the current data to a file and clear the internal data
@@ -92,13 +81,13 @@ class DataCollector:
             return
         self.assert_directory_exists()
         if FLUSH_TYPE == "csv":
-            filename = self._get_filename() + ".csv"
+            filename = self._get_flush_filename() + ".csv"
             filepath = os.path.join(self.dirpath, filename)
             log.info(f"Flushing data to {filepath}")
             if verbose: print(f"Flushing data to {filepath}")
             self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
         elif FLUSH_TYPE == "pickle-ndarray":
-            filename = self._get_filename() + ".ndarray.pkl"
+            filename = self._get_flush_filename() + ".ndarray.pkl"
             filepath = os.path.join(self.dirpath, filename)
             log.info(f"Flushing data to {filepath}")
             if verbose: print(f"Flushing data to {filepath}")
@@ -107,32 +96,18 @@
         else:
             raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
         self.clear()
-    def clear(self):
-        self.data = []
-        self.fulldata = None
-    def add_data(self, i, t, v, l):
-        self.data.append((i, t, v, l))
-        self.fulldata = None  # no longer up to date
     # Convert data
     def to_dataframe(self):
         df = pd.DataFrame(self.data, columns=DataCollector.columns)
         df.meta = str(self.metadata)
         return df
     def to_csv(self, sep=","):
-        # self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
         data, metadata = self.get_data()
         return DataCollector.get_csv(data, self.metadata, sep=sep)
-    def save_csv(self, sep=",", verbose=False):
-        filepath = os.path.join(self.path, self.dirname + ".csv")
-        if verbose: print(f"Writing csv to {filepath}")
-        log.info(f"Writing csv to {filepath}")
-        with open(filepath, "w") as file:
-            file.write(self.to_csv(sep=sep))
     def get_data(self) -> tuple[np.ndarray, dict]:
         """
         Load the full data and return it together with the metadata
@@ -145,7 +120,47 @@ class DataCollector:
         self.fulldata, new_mdata = DataCollector.load_data_from_dir(self.dirpath)
         self.metadata |= new_mdata
         return self.fulldata, self.metadata
+    # File IO
+    def _get_flush_filename(self):
+        """Get the filename of the next partial file, incrementing the number every time"""
+        return sanitize_filename(get_next_filename(FLUSH_PREFIX + self.name, self.dirpath, digits=5))
+    def assert_directory_exists(self):
+        if not os.path.isdir(self.dirpath):
+            os.makedirs(self.dirpath)
+    def save_csv_at(self, filepath, sep=",", verbose=False):
+        if verbose: print(f"Writing csv to {filepath}")
+        log.info(f"Writing csv to {filepath}")
+        with open(filepath, "w") as file:
+            file.write(self.to_csv(sep=sep))
+    def save_csv(self, sep=",", verbose=False):
+        """Save the csv inside the data directory"""
+        filepath = os.path.join(self.path, self.dirname + ".csv")
+        self.save_csv_at(filepath, sep, verbose)
+    def save_csv_in_dir(self, sep=",", verbose=False):
+        """Save the csv inside the directory with temporary data"""
+        filepath = os.path.join(self.dirpath, self.dirname + ".csv")
+        self.save_csv_at(filepath, sep, verbose)
+    def write_metadata(self):
+        f"""
+        Write the metadata to the disk as '{METADATA_FILENAME}'
+        Returns
+        -------
+        None.
+        """
+        filepath = os.path.join(self.dirpath, METADATA_FILENAME)
+        log.debug(f"Writing metadata to {filepath}")
+        with open(filepath, "wb") as file:
+            pickle.dump(self.metadata, file)
     # STATIC LOADERS
     @staticmethod
     def get_csv(data, metadata, sep=","):
         csv = ""
@@ -157,7 +172,7 @@ class DataCollector:
         return csv.strip("\n")
     @staticmethod
-    def load_data_from_csv(filepath:str) -> tuple[np.ndarray, dict]:
+    def load_data_from_csv(filepath:str, sep: str=",") -> tuple[np.ndarray, dict]:
         """
         Loads data from a single csv file.
         Lines with this format are interpreted as metadata:
@@ -168,6 +183,8 @@
         ----------
         filepath
             Path to the csv file.
+        sep
+            csv separator
         Returns
         -------
@@ -177,9 +194,9 @@
             Dictionary with metadata.
         """
         metadata = {}
-        data = np.empty((0, 4))
         with open(filepath, "r") as f:
-            for j, line in enumerate(f):
+            # this loop will read the metadata at the beginning and skip also the header row
+            for line in f:
                 if line.startswith("#"):
                     colon = line.find(":")
                     if colon == -1: # normal comment
@@ -187,20 +204,10 @@
                     key = line[1:colon].strip()
                     value = line[colon+1:].strip()
                     metadata[key] = value
-                    continue
-                if line.startswith("idx"): # header line
-                    continue
-                vals = line.split(",")
-                if len(vals) != 4:
-                    raise ValueError(f"Line {j+1}: Line must have 4 values, but has {len(vals)}: '{line}'")
-                try:
-                    i = int(vals[0])
-                    t = float(vals[1])
-                    cpd = float(vals[2])
-                    led = float(vals[3])
-                except ValueError:
-                    raise ValueError(f"Line {j+1}: Failed to convert values to numbers: '{line}'")
-                data = np.append(data, [[i, t, cpd, led]], axis=0)
+                else:
+                    break
+            # here, the generator has only data lines
+            data = np.loadtxt(f, delimiter=sep)
         return data, metadata
     @staticmethod
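
Editor's note: the hunk above is the performance fix named in the commit message. The old loop grew the array one row at a time with np.append, which reallocates and copies the whole array on every call (quadratic in the number of rows), and parsed every field by hand. The new version consumes only the metadata comments and the header row from the open file handle, then hands the same handle to np.loadtxt, which parses all remaining rows in a single pass. A self-contained sketch of the pattern on hypothetical sample data:

import io
import numpy as np

text = (
    "#key: value\n"
    "# a plain comment\n"
    "idx,time,cpd,led\n"
    "0,0.00,0.51,0.0\n"
    "1,0.10,0.52,1.0\n"
)

metadata = {}
f = io.StringIO(text)
for line in f:
    if line.startswith("#"):
        colon = line.find(":")
        if colon != -1:  # '#key: value' lines become metadata entries
            metadata[line[1:colon].strip()] = line[colon + 1:].strip()
    else:
        break  # first non-comment line is the header row; consumed here

# np.loadtxt continues from the same iterator, so it sees only data rows.
data = np.loadtxt(f, delimiter=",")
print(metadata)    # {'key': 'value'}
print(data.shape)  # (2, 4)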
@@ -232,23 +239,26 @@ class DataCollector:
         metadata = {}
         for filename in files:
             filepath = os.path.join(dirpath, filename)
-            if filename.endswith(".csv"):
-                if verbose: print(f"Opening {filepath} as csv")
-                df = pd.read_csv(filepath)
-                arr = df.to_numpy()
-                data = np.concatenate((data, arr))
-            elif filename.endswith(".ndarray.pkl"):
-                with open(filepath, "rb") as file:
-                    arr = pickle.load(file)
-                if len(arr.shape) != 2 or arr.shape[1] != 4:
-                    print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
-                    continue
-                data = np.concatenate((data, arr))
-            elif filename == METADATA_FILENAME:
-                with open(filepath, "rb") as file:
-                    metadata = pickle.load(file)
-            else:
-                raise NotImplementedError()
+            if filename.startswith(FLUSH_PREFIX):
+                if filename.endswith(".csv"):
+                    if verbose: print(f"Opening {filepath} as csv")
+                    df = pd.read_csv(filepath)
+                    arr = df.to_numpy()
+                    data = np.concatenate((data, arr))
+                elif filename.endswith(".ndarray.pkl"):
+                    with open(filepath, "rb") as file:
+                        arr = pickle.load(file)
+                    if len(arr.shape) != 2 or arr.shape[1] != 4:
+                        print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
+                        continue
+                    data = np.concatenate((data, arr))
+                elif filename == METADATA_FILENAME: # Metadata filename must also start with FLUSH_PREFIX
+                    with open(filepath, "rb") as file:
+                        metadata = pickle.load(file)
+                else:
+                    raise NotImplementedError(f"Unknown file extension for file '{filepath}'")
+            else:
+                log.info(f"Skipping unknown file: '{filepath}'")
         return data, metadata
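
Editor's note: with the FLUSH_PREFIX filter, the loader only merges files it wrote itself and logs everything else instead of raising, so the merged csv that save_csv_in_dir places in the same directory no longer breaks reloading. A hypothetical round trip, assuming the verbose flag used in the body is a keyword argument and the directory name is illustrative:

data, metadata = DataCollector.load_data_from_dir("data/measurement", verbose=True)
print(data.shape)  # (n_rows, 4): idx, time, cpd, led
print(metadata)    # dict loaded from PART_measurement_metadata.pkl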