From c955d7a8f9f040f10981cf7dedf60e8ff709d9de Mon Sep 17 00:00:00 2001
From: CPD <CPD@TUZEWSI-2LN203M.ads.mwn.de>
Date: Mon, 17 Mar 2025 10:14:10 +0100
Subject: [PATCH] BREAKING: Change flush filenames; improve load_csv
 performance using numpy builtin

---
 cpdctrl/utility/data.py | 160 +++++++++++++++++++++-------------------
 1 file changed, 85 insertions(+), 75 deletions(-)

diff --git a/cpdctrl/utility/data.py b/cpdctrl/utility/data.py
index 5022814..fbcd3ae 100644
--- a/cpdctrl/utility/data.py
+++ b/cpdctrl/utility/data.py
@@ -9,7 +9,8 @@ log = logging.getLogger(__name__)
 
 from cpdctrl.utility.file_io import get_next_filename, sanitize_filename
 FLUSH_TYPE = "pickle-ndarray"
-METADATA_FILENAME = "_measurement_metadata.pkl"
+FLUSH_PREFIX = "PART_"
+METADATA_FILENAME = FLUSH_PREFIX + "measurement_metadata.pkl"
 
 
 class DataCollector:
@@ -45,30 +46,18 @@ class DataCollector:
                 self.dirpath = dirpath
         self.assert_directory_exists()
         self.flushed = False
-        
-    
-    def _get_filename(self):
-       return sanitize_filename(get_next_filename(self.name, self.dirpath, digits=5))
-
-    def write_metadata(self):
-        f"""
-        Write the metadata to the disk as '{METADATA_FILENAME}'
-
-        Returns
-        -------
-        None.
-        """
-        filepath = os.path.join(self.dirpath, METADATA_FILENAME)
-        log.debug(f"Writing metadata to {filepath}")
-        with open(filepath, "wb") as file:
-            pickle.dump(self.metadata, file)
-
-    def assert_directory_exists(self):
-        if not os.path.isdir(self.dirpath):
-            os.makedirs(self.dirpath)
 
 
-    def flush(self, verbose:bool=False):
+    # OPERATION
+    def clear(self):
+        self.data = []
+        self.fulldata = None
+
+    def add_data(self, i, t, v, l):
+        self.data.append((i, t, v, l))
+        self.fulldata = None  # no longer up to date
+
+    def flush(self, verbose: bool = False):
         """
         Write the current data to a file and clear the internal data
 
@@ -92,13 +81,13 @@ class DataCollector:
             return
         self.assert_directory_exists()
         if FLUSH_TYPE == "csv":
-            filename = self._get_filename() + ".csv"
+            filename = self._get_flush_filename() + ".csv"
             filepath = os.path.join(self.dirpath, filename)
             log.info(f"Flushing data to {filepath}")
             if verbose: print(f"Flushing data to {filepath}")
             self.to_dataframe().to_csv(filepath, sep=",", index=False, metadata=True)
         elif FLUSH_TYPE == "pickle-ndarray":
-            filename = self._get_filename() + ".ndarray.pkl"
+            filename = self._get_flush_filename() + ".ndarray.pkl"
             filepath = os.path.join(self.dirpath, filename)
             log.info(f"Flushing data to {filepath}")
             if verbose: print(f"Flushing data to {filepath}")
@@ -107,32 +96,18 @@ class DataCollector:
         else:
             raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
         self.clear()
-                
-    def clear(self):
-        self.data = []
-        self.fulldata = None
 
-    def add_data(self, i, t, v, l):
-        self.data.append((i, t, v, l))
-        self.fulldata = None  # no longer up to date
-    
+    # Convert data
     def to_dataframe(self):
         df = pd.DataFrame(self.data, columns=DataCollector.columns)
         df.meta = str(self.metadata)
         return df
-    
+
     def to_csv(self, sep=","):
         # self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
         data, metadata = self.get_data()
         return DataCollector.get_csv(data, self.metadata, sep=sep)
-    
-    def save_csv(self, sep=",", verbose=False):
-        filepath = os.path.join(self.path, self.dirname + ".csv")
-        if verbose: print(f"Writing csv to {filepath}")
-        log.info(f"Writing csv to {filepath}")
-        with open(filepath, "w") as file:
-            file.write(self.to_csv(sep=sep))
-        
+
     def get_data(self) -> tuple[np.ndarray, dict]:
         """
         Load the full data and return it together with the metadata
@@ -145,7 +120,47 @@ class DataCollector:
             self.fulldata, new_mdata = DataCollector.load_data_from_dir(self.dirpath)
             self.metadata |= new_mdata
         return self.fulldata, self.metadata
-    
+
+    # File IO
+    def _get_flush_filename(self):
+        """Get the filename of the next partial file, incrementing the number every time"""
+        return sanitize_filename(get_next_filename(FLUSH_PREFIX + self.name, self.dirpath, digits=5))
+
+    def assert_directory_exists(self):
+        if not os.path.isdir(self.dirpath):
+            os.makedirs(self.dirpath)
+
+
+    def save_csv_at(self, filepath, sep=",", verbose=False):
+        if verbose: print(f"Writing csv to {filepath}")
+        log.info(f"Writing csv to {filepath}")
+        with open(filepath, "w") as file:
+            file.write(self.to_csv(sep=sep))
+
+    def save_csv(self, sep=",", verbose=False):
+        """Save the csv inside the data directory"""
+        filepath = os.path.join(self.path, self.dirname + ".csv")
+        self.save_csv_at(filepath, sep, verbose)
+
+    def save_csv_in_dir(self, sep=",", verbose=False):
+        """Save the csv inside the directory with temporary data"""
+        filepath = os.path.join(self.dirpath, self.dirname + ".csv")
+        self.save_csv_at(filepath, sep, verbose)
+
+    def write_metadata(self):
+        f"""
+        Write the metadata to the disk as '{METADATA_FILENAME}'
+
+        Returns
+        -------
+        None.
+        """
+        filepath = os.path.join(self.dirpath, METADATA_FILENAME)
+        log.debug(f"Writing metadata to {filepath}")
+        with open(filepath, "wb") as file:
+            pickle.dump(self.metadata, file)
+
+    # STATIC LOADERS
     @staticmethod 
     def get_csv(data, metadata, sep=","):
         csv = ""
@@ -157,7 +172,7 @@ class DataCollector:
         return csv.strip("\n")
 
     @staticmethod
-    def load_data_from_csv(filepath:str) -> tuple[np.ndarray, dict]:
+    def load_data_from_csv(filepath:str, sep: str=",") -> tuple[np.ndarray, dict]:
         """
         Loads data from a single csv file.
         Lines with this format are interpreted as metadata:
@@ -168,6 +183,8 @@ class DataCollector:
         ----------
         filepath
            Path to the csv file.
+        sep
+            csv separator
 
         Returns
         -------
@@ -177,9 +194,9 @@ class DataCollector:
             Dictionary with metadata.
         """
         metadata = {}
-        data = np.empty((0, 4))
         with open(filepath, "r") as f:
-            for j, line in enumerate(f):
+            # this loop will read the metadata at the beginning and skip also the header row
+            for line in f:
                 if line.startswith("#"):
                     colon =  line.find(":")
                     if colon == -1:  # normal comment
@@ -187,20 +204,10 @@ class DataCollector:
                     key = line[1:colon].strip()
                     value = line[colon+1:].strip()
                     metadata[key] = value
-                    continue
-                if line.startswith("idx"):   # header line
-                    continue
-                vals = line.split(",")
-                if len(vals) != 4:
-                    raise ValueError(f"Line {j+1}: Line must have 4 values, but has {len(vals)}: '{line}'")
-                try:
-                    i = int(vals[0])
-                    t = float(vals[1])
-                    cpd = float(vals[2])
-                    led = float(vals[3])
-                except ValueError:
-                    raise ValueError(f"Line {j+1}: Failed to convert values to numbers: '{line}'")
-                data = np.append(data, [[i, t, cpd, led]], axis=0)
+                else:
+                    break
+            # here, the generator has only data lines
+            data = np.loadtxt(f, delimiter=sep)
         return data, metadata
 
     @staticmethod
@@ -232,23 +239,26 @@ class DataCollector:
         metadata = {}
         for filename in files:
             filepath = os.path.join(dirpath, filename)
-            if filename.endswith(".csv"):
-                if verbose: print(f"Opening {filepath} as csv")
-                df = pd.read_csv(filepath)
-                arr = df.to_numpy()
-                data = np.concatenate((data, arr))
-            elif filename.endswith(".ndarray.pkl"):
-                with open(filepath, "rb") as file:
-                    arr = pickle.load(file)
-                    if len(arr.shape) != 2 or arr.shape[1] != 4:
-                        print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
-                        continue
+            if filename.startswith(FLUSH_PREFIX):
+                if filename.endswith(".csv"):
+                    if verbose: print(f"Opening {filepath} as csv")
+                    df = pd.read_csv(filepath)
+                    arr = df.to_numpy()
                     data = np.concatenate((data, arr))
-            elif filename == METADATA_FILENAME:
-                with open(filepath, "rb") as file:
-                    metadata = pickle.load(file)
+                elif filename.endswith(".ndarray.pkl"):
+                    with open(filepath, "rb") as file:
+                        arr = pickle.load(file)
+                        if len(arr.shape) != 2 or arr.shape[1] != 4:
+                            print(f"Skipping file '{filepath}' with invalid array shape: {arr.shape}")
+                            continue
+                        data = np.concatenate((data, arr))
+                elif filename == METADATA_FILENAME:  # Metadata filename must also start with FLUSH_PREFIX
+                    with open(filepath, "rb") as file:
+                        metadata = pickle.load(file)
+                else:
+                    raise NotImplementedError(f"Unknown file extension for file '{filepath}'")
             else:
-                raise NotImplementedError()
+                log.info(f"Skipping unknown file: '{filepath}'")
         return data, metadata