Add dedicated update methods

This commit is contained in:
CPD 2025-03-17 15:09:33 +01:00
parent 60eee2c54c
commit 041fcf6b71

View File

@ -14,6 +14,7 @@ from typing import Callable
import logging
log = logging.getLogger(__name__)
NO_LINE_NR = -1
class InvalidScript(Exception):
def __init__(self, lineNr, message, fix=""):
@ -23,6 +24,13 @@ class InvalidScript(Exception):
self.full_message = f"Line {lineNr}: {message} {fix}"
super().__init__(self.full_message)
class InvalidScriptUpdate(Exception):
def __init__(self, message, fix=""):
self.message = message
self.fix = fix
self.full_message = f"{message} {fix}"
super().__init__(self.full_message)
class LedScriptUpdateHandler(FileSystemEventHandler):
def __init__(self, led_script, verbose=False):
super().__init__()
@ -91,10 +99,6 @@ class LedScript:
def __del__(self):
self.stop_updating()
def reset(self):
self.current_dt = 0
self.t_start = None
def copy(self):
"""
Copy-construct a LedScript, always without auto-updates
@ -102,11 +106,16 @@ class LedScript:
-------
New LedScript instance with the same script and same start time
"""
script = LedScript(self.script.copy(), auto_update=False)
script = LedScript(self.script.copy(), auto_update=False)
script.t_start = self.t_start
script.current_dt = self.current_dt
return script
# OPERATION
def reset(self):
self.current_dt = 0
self.t_start = None
def start(self) -> int:
"""
Start the script and return the initial LED state
@ -200,7 +209,8 @@ class LedScript:
Index into self.script at relative time dt.
"""
return LedScript._get_current_index(self.script, dt)
# UPDATING
def start_updating(self):
"""
Start watching for updates to the script file.
@ -273,6 +283,59 @@ class LedScript:
self.script = newscript
log.info("Updated led script from new script")
def update_script_set_row(self, idx: int, dt: float, led: int):
"""
Update a single line in the script.
"""
n_rows = self.script["dt"].shape[0]
if idx >= n_rows:
raise InvalidScriptUpdate(f"Cannot update row {idx+1} the script only has {n_rows} rows")
if not (led >= 0 and led <= 100):
raise InvalidScriptUpdate(f"Cannot update row {idx+1} because the led value {led}% is not in the range of 0%-100%")
if self.current_dt:
current_idx = self.get_current_index(self.current_dt)
if current_idx < idx:
raise InvalidScriptUpdate(f"Cannot update row {idx+1} because the current state is already at row {current_idx+1}")
elif current_idx == idx:
dtsum = 0
if current_idx > 0:
dtsum = self.script["dtsum"][current_idx-1]
if dtsum + dt <= self.current_dt:
raise InvalidScriptUpdate(f"Cannot update row {idx+1} because with the new duration it would be skipped")
self.script["dt"][idx] = dt
self.script["led"][idx] = led
self._update_script_dtsums(idx)
log.critical("Updated!")
def _update_script_dtsums(self, from_idx: int):
for i in range(from_idx, self.script["dtsum"].shape[0]):
if i == 0:
self.script["dtsum"][i] = self.script["dt"][i]
else:
self.script["dtsum"][i] = self.script["dtsum"][i-1] + self.script["dt"][i]
def update_script_remove_row(self, idx):
"""
Remove a single line from the script.
"""
if len(self) >= idx:
raise InvalidScriptUpdate(f"Cannot remove row {idx+1} because the script only has {len(self)} rows")
self.script = np.delete(self.script, idx)
self._update_script_dtsums(idx)
def update_script_insert_row(self, idx, dt, led):
"""
Insert row at index idx
"""
if idx > len(self):
raise InvalidScriptUpdate(f"Cannot insert row {idx+1} because the script only has {len(self)} rows")
np.insert(self.script, idx, (dt, led, NO_LINE_NR))
self._update_script_dtsums(idx)
def __len__(self):
return self.script.shape[0]
# LOADING
@staticmethod
def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]:
"""