Split load in two funcitons
This commit is contained in:
parent
4fc43e7db3
commit
60eee2c54c
@ -11,6 +11,8 @@ import numpy as np
|
|||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.events import FileSystemEventHandler
|
from watchdog.events import FileSystemEventHandler
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class InvalidScript(Exception):
|
class InvalidScript(Exception):
|
||||||
@ -242,19 +244,34 @@ class LedScript:
|
|||||||
if self.verbose: print("Led script stopped watching for updates")
|
if self.verbose: print("Led script stopped watching for updates")
|
||||||
|
|
||||||
def update_from_file(self, verbose=True):
|
def update_from_file(self, verbose=True):
|
||||||
|
"""
|
||||||
|
Update the led script by reloading the initially loaded file
|
||||||
|
"""
|
||||||
if verbose: print(f"Updating led script from '{self.filepath}'")
|
if verbose: print(f"Updating led script from '{self.filepath}'")
|
||||||
|
log.info(f"Updating led script from '{self.filepath}'")
|
||||||
newscript = LedScript.parse_script(self.filepath, ignore_errors=False)
|
newscript = LedScript.parse_script(self.filepath, ignore_errors=False)
|
||||||
|
self.update_from_script(newscript)
|
||||||
|
|
||||||
|
def update_from_script(self, newscript: np.ndarray):
|
||||||
|
"""
|
||||||
|
Update the led script from a new led script
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
newscript
|
||||||
|
The new led script, must be structured array of type LedScript.ARRAY_DTYPE
|
||||||
|
"""
|
||||||
idx = self.get_current_index(self.current_dt)
|
idx = self.get_current_index(self.current_dt)
|
||||||
# check that all past command dts are the same
|
# check that all past command dts are the same
|
||||||
if newscript.shape[0] <= idx:
|
if newscript.shape[0] <= idx:
|
||||||
raise ValueError(f"The new script is too short, it has only {newscript.shape[0]} steps but we are already at state {idx+1}")
|
raise ValueError(f"The new script is too short, it has only {newscript.shape[0]} steps but we are already at state {idx+1}")
|
||||||
if not (newscript["dt"][:idx] == self.script["dt"][:idx]).all():
|
if not (newscript["dt"][:idx] == self.script["dt"][:idx]).all():
|
||||||
raise ValueError(f"The new script has to match the old script until before the current state, which is step {idx+1} in line {newscript['line'][idx]}")
|
raise ValueError(f"The new script has to match the old script until before the current state, which is step {idx+1} in line {newscript['line'][idx]}")
|
||||||
newidx = LedScript._get_current_index(newscript, self.current_dt)
|
new_idx = LedScript._get_current_index(newscript, self.current_dt)
|
||||||
# check that the current step dt has not shortened so much that the state should have already changed
|
# check that the current step dt has not shortened so much that the state should have already changed
|
||||||
if idx != newidx:
|
if idx != new_idx:
|
||||||
raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO")
|
raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO")
|
||||||
self.script = newscript
|
self.script = newscript
|
||||||
|
log.info("Updated led script from new script")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]:
|
def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user