diff --git a/cpdctrl/led_script.py b/cpdctrl/led_script.py index 320d220..37ae0ec 100644 --- a/cpdctrl/led_script.py +++ b/cpdctrl/led_script.py @@ -7,23 +7,48 @@ Created on Fri Jan 24 16:46:06 2025 import time import re import numpy as np +import watchdog +from watchdog.observers import Observer +from watchdog.events import LoggingEventHandler, FileSystemEventHandler +import os + +class LedScriptUpdateHandler(FileSystemEventHandler): + def __init__(self, led_script, verbose=False): + super().__init__() + self.led_script = led_script + self.verbose = verbose + + def on_modified(self, event): + if self.verbose: + print("File modified: ", event.src_path) + filename = os.path.basename(self.led_script.filepath) + if os.path.basename(event.src_path) == filename: + try: + self.led_script.update() + except ValueError as e: + print(e) + class LedScript: - def __init__(self, script:np.ndarray|str|int=0): + def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False): """ Parameters ---------- script : np.ndarray|str|int - If np.ndarray: numpy array in this form: - [(duration, cumulative time, state), ...] - Where is the duration of , and + If np.ndarray: structured numpy array with + dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] + + Where
is the duration of , and is the sum of all previous durations, including the current one. + The field is the line number if the step in a script and is optional. If str: path to a led script file If int: constant led state value constantValue : TYPE, optional DESCRIPTION. The default is None. + auto_update: bool, optional + If True and script is a filepath, the script will automatically be reloaded when the file changes Returns ------- @@ -31,12 +56,38 @@ class LedScript: """ self.t_start = 0 + self.auto_update = False + self.filepath = None if type(script) == int: self.script = np.array([(0., 0., script)]) elif type(script) == np.ndarray: self.script = script elif type(script) == str: self.script = LedScript.parse_script(script, ignore_errors=False) + self.filepath = script + self.auto_update = auto_update + if self.auto_update: + # event_handler = LoggingEventHandler() + event_handler = LedScriptUpdateHandler(self, verbose=True) + self.observer = Observer() + dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file + self.observer.schedule(event_handler, dirname) + self.observer.start() + if verbose: print(f"Led script is watching for updates on '{self.filepath}'") + else: + self.observer = None + self.current_dt = 0 + assert(self.script.shape[0] > 0) + + def __del__(self): + self.stop() + + def stop(self): + print("Led script stopped watching for updates") + if self.observer is not None: + self.observer.stop() + self.observer.join() + def start(self) -> int: """ @@ -85,12 +136,35 @@ class LedScript: int LED Intensity [0,100] """ - if self.script.shape[0] == 1: - return self.script[0, 2] - distance = self.script[:,1] - dt + self.current_dt = dt + idx = self.get_current_index(dt) + return int(self.script["led"][idx]) + + @staticmethod + def _get_current_index(script, dt:float): + if script.shape[0] == 1: + return 0 + distance = script["dtsum"] - dt idx = np.where(distance >= 0, distance, np.inf).argmin() - return int(self.script[idx, 2]) + return idx + + def get_current_index(self, dt:float): + return LedScript._get_current_index(self.script, dt) + def update(self, verbose=True): + print(f"Updating led script from '{self.filepath}'") + newscript = LedScript.parse_script(self.filepath, ignore_errors=False) + idx = self.get_current_index(self.current_dt) + # check that all past command dts are the same + if newscript.shape[0] <= idx: + raise ValueError(f"The new script is too short, it has only {newscript.shape[0]} steps but we are already at state {idx+1}") + if not (newscript["dt"][:idx] == self.script["dt"][:idx]).all(): + raise ValueError(f"The new script has to match the old script until before the current state, which is step {idx+1} in line {newscript['line'][idx]}") + newidx = LedScript._get_current_index(newscript, self.current_dt) + # check that the current step dt has not shortened so much that the state should have already changed + if idx != newidx: + raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO") + self.script = newscript @staticmethod def parse_script(filepath, ignore_errors=False): @@ -180,8 +254,8 @@ class LedScript: else: cum_duration = states[-1][1] + duration # 6) append - states.append((duration, cum_duration, state)) - states = np.array(states) + states.append((duration, cum_duration, state, i+1)) + states = np.array(states, dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")]) if ignore_errors: return states, errors return states