diff --git a/cpdctrl/led_script.py b/cpdctrl/led_script.py index ffce452..4ae2827 100644 --- a/cpdctrl/led_script.py +++ b/cpdctrl/led_script.py @@ -10,6 +10,7 @@ import re import numpy as np from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler +from typing import Callable class InvalidScript(Exception): @@ -32,7 +33,7 @@ class LedScriptUpdateHandler(FileSystemEventHandler): filename = os.path.basename(self.led_script.filepath) if os.path.basename(event.src_path) == filename: try: - self.led_script.update() + self.led_script.update_from_file() except ValueError as e: print(e) @@ -42,7 +43,7 @@ class LedScript: Class representing a script to control the state of a LED """ ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] - def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False): + def __init__(self, script:np.ndarray|str|int=0, auto_update=False, on_update_callback:Callable[[], None]|None=None, verbose=False): """ Parameters ---------- @@ -65,7 +66,7 @@ class LedScript: """ self.verbose = verbose - self.t_start = 0 + self.t_start = None self.auto_update = False self.filepath = None if type(script) == int: @@ -83,10 +84,26 @@ class LedScript: self.start_updating() self.current_dt = 0 assert(self.script.shape[0] > 0) + self.on_update_callback = on_update_callback def __del__(self): self.stop_updating() - + + def reset(self): + self.current_dt = 0 + self.t_start = None + + def copy(self): + """ + Copy-construct a LedScript, always without auto-updates + Returns + ------- + New LedScript instance with the same script and same start time + """ + script = LedScript(self.script.copy(), auto_update=False) + script.t_start = self.t_start + script.current_dt = self.current_dt + return script def start(self) -> int: """ @@ -98,7 +115,7 @@ class LedScript: LED Intensity [0,100] """ self.t_start = time.time() - return 0 + return self.get_relative_state(0) def is_done(self, t: None|float=None) -> bool: """ @@ -115,6 +132,7 @@ class LedScript: return dt >= self.script["dtsum"][-1] def get_dt(self, t: None|float=None) -> float: + if self.t_start is None: raise RuntimeError("Script must be started before get_dt can be used") if t is None: t = time.time() return t - self.t_start @@ -223,8 +241,8 @@ class LedScript: self.observer = None if self.verbose: print("Led script stopped watching for updates") - def update(self, verbose=True): - print(f"Updating led script from '{self.filepath}'") + def update_from_file(self, verbose=True): + if verbose: print(f"Updating led script from '{self.filepath}'") newscript = LedScript.parse_script(self.filepath, ignore_errors=False) idx = self.get_current_index(self.current_dt) # check that all past command dts are the same @@ -237,6 +255,7 @@ class LedScript: if idx != newidx: raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO") self.script = newscript + if self.on_update_callback is not None: self.on_update_callback() @staticmethod def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]: