auto update when file changes

This commit is contained in:
CPD 2025-02-04 16:57:09 +01:00
parent 7544d5ca08
commit 4782adbf6a

View File

@ -7,23 +7,48 @@ Created on Fri Jan 24 16:46:06 2025
import time
import re
import numpy as np
import watchdog
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
import os
class LedScriptUpdateHandler(FileSystemEventHandler):
def __init__(self, led_script, verbose=False):
super().__init__()
self.led_script = led_script
self.verbose = verbose
def on_modified(self, event):
if self.verbose:
print("File modified: ", event.src_path)
filename = os.path.basename(self.led_script.filepath)
if os.path.basename(event.src_path) == filename:
try:
self.led_script.update()
except ValueError as e:
print(e)
class LedScript:
def __init__(self, script:np.ndarray|str|int=0):
def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False):
"""
Parameters
----------
script : np.ndarray|str|int
If np.ndarray: numpy array in this form:
[(duration, cumulative time, state), ...]
Where <duration> is the duration of <state>, and <cumulative duration>
If np.ndarray: structured numpy array with
dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")]
Where <dt> is the duration of <led>, and <dtsum>
is the sum of all previous durations, including the current one.
The <line> field is the line number if the step in a script and is optional.
If str: path to a led script file
If int: constant led state value
constantValue : TYPE, optional
DESCRIPTION. The default is None.
auto_update: bool, optional
If True and script is a filepath, the script will automatically be reloaded when the file changes
Returns
-------
@ -31,12 +56,38 @@ class LedScript:
"""
self.t_start = 0
self.auto_update = False
self.filepath = None
if type(script) == int:
self.script = np.array([(0., 0., script)])
elif type(script) == np.ndarray:
self.script = script
elif type(script) == str:
self.script = LedScript.parse_script(script, ignore_errors=False)
self.filepath = script
self.auto_update = auto_update
if self.auto_update:
# event_handler = LoggingEventHandler()
event_handler = LedScriptUpdateHandler(self, verbose=True)
self.observer = Observer()
dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file
self.observer.schedule(event_handler, dirname)
self.observer.start()
if verbose: print(f"Led script is watching for updates on '{self.filepath}'")
else:
self.observer = None
self.current_dt = 0
assert(self.script.shape[0] > 0)
def __del__(self):
self.stop()
def stop(self):
print("Led script stopped watching for updates")
if self.observer is not None:
self.observer.stop()
self.observer.join()
def start(self) -> int:
"""
@ -85,12 +136,35 @@ class LedScript:
int
LED Intensity [0,100]
"""
if self.script.shape[0] == 1:
return self.script[0, 2]
distance = self.script[:,1] - dt
self.current_dt = dt
idx = self.get_current_index(dt)
return int(self.script["led"][idx])
@staticmethod
def _get_current_index(script, dt:float):
if script.shape[0] == 1:
return 0
distance = script["dtsum"] - dt
idx = np.where(distance >= 0, distance, np.inf).argmin()
return int(self.script[idx, 2])
return idx
def get_current_index(self, dt:float):
return LedScript._get_current_index(self.script, dt)
def update(self, verbose=True):
print(f"Updating led script from '{self.filepath}'")
newscript = LedScript.parse_script(self.filepath, ignore_errors=False)
idx = self.get_current_index(self.current_dt)
# check that all past command dts are the same
if newscript.shape[0] <= idx:
raise ValueError(f"The new script is too short, it has only {newscript.shape[0]} steps but we are already at state {idx+1}")
if not (newscript["dt"][:idx] == self.script["dt"][:idx]).all():
raise ValueError(f"The new script has to match the old script until before the current state, which is step {idx+1} in line {newscript['line'][idx]}")
newidx = LedScript._get_current_index(newscript, self.current_dt)
# check that the current step dt has not shortened so much that the state should have already changed
if idx != newidx:
raise ValueError(f"The duration of the current step {idx+1} in line {newscript['line'][idx]} in the new script is too short TODO")
self.script = newscript
@staticmethod
def parse_script(filepath, ignore_errors=False):
@ -180,8 +254,8 @@ class LedScript:
else:
cum_duration = states[-1][1] + duration
# 6) append
states.append((duration, cum_duration, state))
states = np.array(states)
states.append((duration, cum_duration, state, i+1))
states = np.array(states, dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")])
if ignore_errors:
return states, errors
return states