# -*- coding: utf-8 -*- """ Created on Fri Jan 24 16:46:06 2025 @author: CPD """ import os import time import re import numpy as np from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from typing import Callable import logging log = logging.getLogger(__name__) NO_LINE_NR = -1 class InvalidScript(Exception): def __init__(self, lineNr, message, fix=""): self.lineNr = lineNr self.message = message self.fix = fix self.full_message = f"Line {lineNr}: {message} {fix}" super().__init__(self.full_message) class InvalidScriptUpdate(Exception): def __init__(self, message, fix=""): self.message = message self.fix = fix self.full_message = f"{message} {fix}" super().__init__(self.full_message) class LedScriptUpdateHandler(FileSystemEventHandler): def __init__(self, led_script, verbose=False): super().__init__() self.led_script = led_script self.verbose = verbose def on_modified(self, event): filename = os.path.basename(self.led_script.filepath) # print(event.event_type, event.src_path, os.path.basename(event.src_path), filename) if os.path.basename(event.src_path) == filename: if self.verbose: print("File modified: ", event.src_path) try: self.led_script.update_from_file() except ValueError as e: print(e) class LedScript: """ Class representing a script to control the state of a LED """ ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False): """ Parameters ---------- script : np.ndarray|str|int If np.ndarray: structured numpy array with dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] Where
is the duration of , and is the sum of all previous durations, including the current one. The field is the line number if the step in a script and is optional. If str: path to a led script file If int: constant led state value auto_update: bool, optional If True and script is a filepath, the script will automatically be reloaded when the file changes verbose: bool, optional If True, print messages when important operations occur Returns ------- None. """ self.verbose = verbose self.t_start = None self.auto_update = False self.filepath = None if type(script) == int: self.script = np.array([(0., 0., script, 0)], dtype=LedScript.ARRAY_DTYPE) elif type(script) == np.ndarray: self.script = script elif type(script) == str: self.script = LedScript.parse_script(script, ignore_errors=False) self.filepath = script self.auto_update = auto_update else: raise ValueError("Invalid value for script: Muse be int, np.ndarray or str") self.observer = None if self.auto_update: self.start_updating() self.current_dt = 0 assert(self.script.shape[0] > 0) def __del__(self): self.stop_updating() def copy(self): """ Copy-construct a LedScript, always without auto-updates Returns ------- New LedScript instance with the same script and same start time """ script = LedScript(self.script.copy(), auto_update=False) script.t_start = self.t_start script.current_dt = self.current_dt return script # OPERATION def reset(self): self.current_dt = 0 self.t_start = None def start(self) -> int: """ Start the script and return the initial LED state Returns ------- int LED Intensity [0,100] """ self.t_start = time.time() return self.get_relative_state(0) def is_done(self, t: None|float=None) -> bool: """ Parameters ---------- t : None|float, optional Returns ------- bool Whether is past the script length """ dt = self.get_dt(t) return dt >= self.script["dtsum"][-1] def get_dt(self, t: None|float=None) -> float: if self.t_start is None: raise RuntimeError("Script must be started before get_dt can be used") if t is None: t = time.time() return t - self.t_start def get_state(self, t: None|float=None) -> int: """ Get the LED statefrom an absolute time (relative to when `start` was called) Parameters ---------- t : None|float, optional Absolute time since epoch or None. If None, the current time will be used (using python `time.time()` The default is None. Returns ------- int LED Intensity [0,100] """ dt = self.get_dt(t) return self.get_relative_state(dt) def get_relative_state(self, dt: float) -> int: """ Get the LED state from a script-relative time Parameters ---------- dt : float Number of seconds from the starting point of the script Returns ------- int LED Intensity [0,100] """ self.current_dt = dt idx = self.get_current_index(dt) return int(self.script["led"][idx]) @staticmethod def _get_current_index(script, dt:float) -> int: if script.shape[0] == 1: return 0 if dt > script["dtsum"][-1]: # if time is past the last index return script["dtsum"].shape[0] - 1 distance = script["dtsum"] - dt idx = int(np.where(distance >= 0, distance, np.inf).argmin()) return idx def get_current_index(self, dt:float) -> int: """ Get the index into self.script at `dt` Parameters ---------- dt : float Time relative to script start. Returns ------- int Index into self.script at relative time dt. """ return LedScript._get_current_index(self.script, dt) # UPDATING def start_updating(self): """ Start watching for updates to the script file. Raises ------ ValueError If the LedScript object was initialized with a filepath or if already watching for script updates. Returns ------- None. """ if self.observer is not None: raise RuntimeError("Already watching for updates") if self.filepath is None: raise RuntimeError("Can not watch for updates if the LedScript was not initialized with a file path") # event_handler = LoggingEventHandler() event_handler = LedScriptUpdateHandler(self) #, verbose=True) self.observer = Observer() dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file self.observer.schedule(event_handler, dirname) self.observer.start() if self.verbose: print(f"Led script is watching for updates on '{self.filepath}'") def stop_updating(self): """ Stop watching for updates to the script file. Does nothing if not currently watching for file updates. Returns ------- None. """ if self.observer is not None: self.observer.stop() self.observer.join() self.observer = None if self.verbose: print("Led script stopped watching for updates") def update_from_file(self, verbose=True): """ Update the led script by reloading the initially loaded file """ if verbose: print(f"Updating led script from '{self.filepath}'") log.info(f"Updating led script from '{self.filepath}'") newscript = LedScript.parse_script(self.filepath, ignore_errors=False) self.update_from_script(newscript) def update_from_script(self, newscript: np.ndarray): """ Update the led script from a new led script Parameters ---------- newscript The new led script, must be structured array of type LedScript.ARRAY_DTYPE """ idx = self.get_current_index(self.current_dt) # check that all past command dts are the same if newscript.shape[0] <= idx: raise ValueError(f"The new script is too short, it has only {newscript.shape[0]} steps but we are already at state {idx+1}") if not (newscript["dt"][:idx] == self.script["dt"][:idx]).all(): raise ValueError(f"The new script has to match the old script until before the current state, which is step {idx+1} in line {newscript['line'][idx]}") new_idx = LedScript._get_current_index(newscript, self.current_dt) # check that the current step dt has not shortened so much that the state should have already changed if idx != new_idx: raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO") self.script = newscript log.info("Updated led script from new script") def update_script_set_row(self, idx: int, dt: float, led: int): """ Update a single line in the script. """ n_rows = self.script["dt"].shape[0] if idx >= n_rows: raise InvalidScriptUpdate(f"Cannot update row {idx+1} the script only has {n_rows} rows") if not (led >= 0 and led <= 100): raise InvalidScriptUpdate(f"Cannot update row {idx+1} because the led value {led}% is not in the range of 0%-100%") if dt < 0: raise InvalidScriptUpdate(f"Cannot update row {idx+1} because the dt value '{dt}' < 0") if self.current_dt: current_idx = self.get_current_index(self.current_dt) if current_idx < idx: raise InvalidScriptUpdate(f"Cannot update row {idx+1} because the current state is already at row {current_idx+1}") elif current_idx == idx: dtsum = 0 if current_idx > 0: dtsum = self.script["dtsum"][current_idx-1] if dtsum + dt <= self.current_dt: raise InvalidScriptUpdate(f"Cannot update row {idx+1} because with the new duration it would be skipped") self.script["dt"][idx] = dt self.script["led"][idx] = led self._update_script_dtsums(idx) log.critical("Updated!") def _update_script_dtsums(self, from_idx: int): for i in range(from_idx, self.script["dtsum"].shape[0]): if i == 0: self.script["dtsum"][i] = self.script["dt"][i] else: self.script["dtsum"][i] = self.script["dtsum"][i-1] + self.script["dt"][i] def update_script_remove_row(self, idx): """ Remove a single line from the script. """ if len(self) >= idx: raise InvalidScriptUpdate(f"Cannot remove row {idx+1} because the script only has {len(self)} rows") self.script = np.delete(self.script, idx) self._update_script_dtsums(idx) def update_script_insert_row(self, idx, dt, led): """ Insert row at index idx """ if idx > len(self): raise InvalidScriptUpdate(f"Cannot insert row {idx+1} because the script only has {len(self)} rows") np.insert(self.script, idx, (dt, led, NO_LINE_NR)) self._update_script_dtsums(idx) def __len__(self): return self.script.shape[0] # LOADING @staticmethod def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]: """ Parse a led script from a file into a structured array Parameters ---------- filepath : str Path to the led script file. ignore_errors : bool, optional If True, does not throw an exception upon script errors. Instead, ignores errornous lines and additionally returns all exceptions in a list. The default is False. Raises ------ InvalidScript If encountering invalid lines in the script. Returns ------- np.ndarray or tuple[np.ndarray, list[InvalidScript]] Returns the script as structured array. For the format, see the docstring of the LedString constructor. If ignore_errors=True, additionally returns a list of all errors. """ with open(filepath, "r") as file: lines = file.readlines() # for just checking scripts, with ignore_errors=True this function returns all errors in an array errors = [] if ignore_errors: def raise_error(*args, **kwargs): e = InvalidScript(*args, **kwargs) errors.append(e) print(e) else: def raise_error(*args, **kwargs): raise InvalidScript(*args, **kwargs) # parse into states = [] timesuffixes = {"h": 3600, "m": 60, "s": 1} for i, l in enumerate(lines): # 0) Check if empty or comment l = l.strip("\n").strip(" ") if l.startswith("#") or len(l) == 0: continue # 1) Separate statements matches = [ match for match in re.finditer(r"[^ \t\n\r]+", l) ] if len(matches) < 2: raise_error(i+1, f"Line has only one statement: '{l}'.", fix="The line needs to have the duration and led state separated by a space.") continue # 2) parse the duration s_duration = matches[0].group() duration_match = re.fullmatch(r"(\d+[hms]?)+", s_duration) if not duration_match: raise_error(i+1, f"Invalid duration: '{s_duration}'.", fix="Duration needs to consist of one ore more pairs, without spaces.") continue duration = 0.0 for m in re.findall(r"\d+[hms]?", s_duration): if m[-1] in timesuffixes: try: t = int(m[:-1]) except: raise_error(i+1, f"Invalid number in duration '{m[-1]}': '{m[:-1]}'") continue t *= timesuffixes[m[-1]] else: try: t = int(m) except: raise_error(i+1, f"Invalid number in duration: '{m}'") continue duration += t # 3) parse the state s_state = matches[1].group() state_match = re.fullmatch(r"(?:(\d+%?)|(on)|(off))(#.*)?", s_state, re.IGNORECASE) if not state_match: raise_error(i+1, f"Not a valid LED state: '{s_state}'", fix="LED state needs to be either 'on', 'off', an integer between [0,100], optionally ending with '%'") continue state_percent, state_on, state_off, ends_with_comment = state_match.groups() if state_percent: state_percent = state_percent.strip("%") try: state = int(state_percent) except: raise_error(i+1, f"Invalid number in led state: '{m}'") continue elif state_on: state = 100 elif state_off: state = 0 else: raise_error(i+1, f"Not a valid LED state: '{s_state}'") continue if state > 100: raise_error(i+1, f"State is larger than 100%: '{state}'") continue elif state < 0: raise_error(i+1, f"State is smaller than 0%: '{state}'") # 4) Check for missing comment symbol # AFTER parsing duration and state, since the source of an error might be an invalid duration or state if not ends_with_comment and len(matches) >= 3: rest = matches[2].group() if not rest.startswith("#"): raise_error(i+1, f"Garbage after statement: '{rest}'.", fix="If this was meant to be a comment, use '#'.") continue # 5) Calculate cumulative duration if len(states) == 0: cum_duration = duration else: cum_duration = states[-1][1] + duration # 6) append states.append((duration, cum_duration, state, i+1)) states = np.array(states, dtype=LedScript.ARRAY_DTYPE) if ignore_errors: return states, errors return states @staticmethod def script_to_file(script: np.ndarray, add_default_header=True, add_extra_header=[]): """Convert a script array to a script text file.""" lines = [] if add_default_header: lines.append("# Led script file automatically generated at the end of the measurement.") lines.append("# It may contain changes made during the measurement, which means") lines.append("# it might not represented the actual values during the measurement.") lines.append(f"# Duration: {int(script['dtsum'][-1])} s") lines.append(f"# Number of steps: {script['dtsum'].shape[0]}") for line in add_extra_header: if line.startswith("#"): lines.append(line) else: lines.append(f"# {line}") lines.append("# Time LED") for i in range(script['dt'].shape[0]): lines.append(f"{split_seconds_into_units_str(int(script['dt'][i]))} {int(script['led'][i])}") return '\n'.join(lines) def to_file(self, add_default_header=True, add_extra_header=[]): """Convert the script to a script text file.""" return LedScript.script_to_file(self.script, add_default_header, add_extra_header) def split_seconds_into_units_str(t: int): """Split seconds into hours, minutes, seconds.""" h = t // 3600 m = (t % 3600) // 60 s = t % 60 ret = "" if h != 0: ret += f"{h}h" if m != 0: ret += f"{m}m" if len(ret) == 0 or s != 0: ret += f"{s}s" return ret