Add copy, reset, small changes

This commit is contained in:
CPD 2025-03-13 14:35:57 +01:00
parent c3832725c5
commit f858dd6916

View File

@ -10,6 +10,7 @@ import re
import numpy as np import numpy as np
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from typing import Callable
class InvalidScript(Exception): class InvalidScript(Exception):
@ -32,7 +33,7 @@ class LedScriptUpdateHandler(FileSystemEventHandler):
filename = os.path.basename(self.led_script.filepath) filename = os.path.basename(self.led_script.filepath)
if os.path.basename(event.src_path) == filename: if os.path.basename(event.src_path) == filename:
try: try:
self.led_script.update() self.led_script.update_from_file()
except ValueError as e: except ValueError as e:
print(e) print(e)
@ -42,7 +43,7 @@ class LedScript:
Class representing a script to control the state of a LED Class representing a script to control the state of a LED
""" """
ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")]
def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False): def __init__(self, script:np.ndarray|str|int=0, auto_update=False, on_update_callback:Callable[[], None]|None=None, verbose=False):
""" """
Parameters Parameters
---------- ----------
@ -65,7 +66,7 @@ class LedScript:
""" """
self.verbose = verbose self.verbose = verbose
self.t_start = 0 self.t_start = None
self.auto_update = False self.auto_update = False
self.filepath = None self.filepath = None
if type(script) == int: if type(script) == int:
@ -83,10 +84,26 @@ class LedScript:
self.start_updating() self.start_updating()
self.current_dt = 0 self.current_dt = 0
assert(self.script.shape[0] > 0) assert(self.script.shape[0] > 0)
self.on_update_callback = on_update_callback
def __del__(self): def __del__(self):
self.stop_updating() self.stop_updating()
def reset(self):
self.current_dt = 0
self.t_start = None
def copy(self):
"""
Copy-construct a LedScript, always without auto-updates
Returns
-------
New LedScript instance with the same script and same start time
"""
script = LedScript(self.script.copy(), auto_update=False)
script.t_start = self.t_start
script.current_dt = self.current_dt
return script
def start(self) -> int: def start(self) -> int:
""" """
@ -98,7 +115,7 @@ class LedScript:
LED Intensity [0,100] LED Intensity [0,100]
""" """
self.t_start = time.time() self.t_start = time.time()
return 0 return self.get_relative_state(0)
def is_done(self, t: None|float=None) -> bool: def is_done(self, t: None|float=None) -> bool:
""" """
@ -115,6 +132,7 @@ class LedScript:
return dt >= self.script["dtsum"][-1] return dt >= self.script["dtsum"][-1]
def get_dt(self, t: None|float=None) -> float: def get_dt(self, t: None|float=None) -> float:
if self.t_start is None: raise RuntimeError("Script must be started before get_dt can be used")
if t is None: t = time.time() if t is None: t = time.time()
return t - self.t_start return t - self.t_start
@ -223,8 +241,8 @@ class LedScript:
self.observer = None self.observer = None
if self.verbose: print("Led script stopped watching for updates") if self.verbose: print("Led script stopped watching for updates")
def update(self, verbose=True): def update_from_file(self, verbose=True):
print(f"Updating led script from '{self.filepath}'") if verbose: print(f"Updating led script from '{self.filepath}'")
newscript = LedScript.parse_script(self.filepath, ignore_errors=False) newscript = LedScript.parse_script(self.filepath, ignore_errors=False)
idx = self.get_current_index(self.current_dt) idx = self.get_current_index(self.current_dt)
# check that all past command dts are the same # check that all past command dts are the same
@ -237,6 +255,7 @@ class LedScript:
if idx != newidx: if idx != newidx:
raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO") raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO")
self.script = newscript self.script = newscript
if self.on_update_callback is not None: self.on_update_callback()
@staticmethod @staticmethod
def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]: def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]: