Add option to stop when the script ends
This commit is contained in:
parent
1e46aaa176
commit
154235dbe5
@ -38,10 +38,12 @@ class LedScriptUpdateHandler(FileSystemEventHandler):
|
||||
|
||||
|
||||
class LedScript:
|
||||
"""
|
||||
Class representing a script to control the state of a LED
|
||||
"""
|
||||
ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")]
|
||||
def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
----------
|
||||
script : np.ndarray|str|int
|
||||
@ -95,6 +97,21 @@ class LedScript:
|
||||
"""
|
||||
self.t_start = time.time()
|
||||
return 0
|
||||
|
||||
def is_done(self, t: None|float=None) -> bool:
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
t : None|float, optional
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
Whether <t> is past the script length
|
||||
"""
|
||||
if t is None: t = time.time()
|
||||
return t >= self.script["dtsum"][-1]
|
||||
|
||||
|
||||
def get_state(self, t: None|float=None) -> int:
|
||||
"""
|
||||
@ -248,12 +265,12 @@ class LedScript:
|
||||
# for just checking scripts, with ignore_errors=True this function returns all errors in an array
|
||||
errors = []
|
||||
if ignore_errors:
|
||||
def raiseError(*args, **kwargs):
|
||||
def raise_error(*args, **kwargs):
|
||||
e = InvalidScript(*args, **kwargs)
|
||||
errors.append(e)
|
||||
print(e)
|
||||
else:
|
||||
def raiseError(*args, **kwargs):
|
||||
def raise_error(*args, **kwargs):
|
||||
raise InvalidScript(*args, **kwargs)
|
||||
|
||||
# parse into <cumulative t in seconds, t state in seconds, state>
|
||||
@ -266,13 +283,13 @@ class LedScript:
|
||||
# 1) Separate statements
|
||||
matches = [ match for match in re.finditer(r"[^ \t\n\r]+", l) ]
|
||||
if len(matches) < 2:
|
||||
raiseError(i+1, f"Line has only one statement: '{l}'.", fix="The line needs to have the duration and led state separated by a space.")
|
||||
raise_error(i+1, f"Line has only one statement: '{l}'.", fix="The line needs to have the duration and led state separated by a space.")
|
||||
continue
|
||||
# 2) parse the duration
|
||||
s_duration = matches[0].group()
|
||||
duration_match = re.fullmatch(r"(\d+[hms]?)+", s_duration)
|
||||
if not duration_match:
|
||||
raiseError(i+1, f"Invalid duration: '{s_duration}'.", fix="Duration needs to consist of one ore more <digits><h/m/s> pairs, without spaces.")
|
||||
raise_error(i+1, f"Invalid duration: '{s_duration}'.", fix="Duration needs to consist of one ore more <digits><h/m/s> pairs, without spaces.")
|
||||
continue
|
||||
duration = 0.0
|
||||
for m in re.findall(r"\d+[hms]?", s_duration):
|
||||
@ -280,21 +297,21 @@ class LedScript:
|
||||
try:
|
||||
t = int(m[:-1])
|
||||
except:
|
||||
raiseError(i+1, f"Invalid number in duration '{m[-1]}': '{m[:-1]}'")
|
||||
raise_error(i+1, f"Invalid number in duration '{m[-1]}': '{m[:-1]}'")
|
||||
continue
|
||||
t *= timesuffixes[m[-1]]
|
||||
else:
|
||||
try:
|
||||
t = int(m)
|
||||
except:
|
||||
raiseError(i+1, f"Invalid number in duration: '{m}'")
|
||||
raise_error(i+1, f"Invalid number in duration: '{m}'")
|
||||
continue
|
||||
duration += t
|
||||
# 3) parse the state
|
||||
s_state = matches[1].group()
|
||||
state_match = re.fullmatch(r"(?:(\d+%?)|(on)|(off))(#.*)?", s_state, re.IGNORECASE)
|
||||
if not state_match:
|
||||
raiseError(i+1, f"Not a valid LED state: '{s_state}'", fix="LED state needs to be either 'on', 'off', an integer between [0,100], optionally ending with '%'")
|
||||
raise_error(i+1, f"Not a valid LED state: '{s_state}'", fix="LED state needs to be either 'on', 'off', an integer between [0,100], optionally ending with '%'")
|
||||
continue
|
||||
state_percent, state_on, state_off, ends_with_comment = state_match.groups()
|
||||
if state_percent:
|
||||
@ -302,25 +319,25 @@ class LedScript:
|
||||
try:
|
||||
state = int(state_percent)
|
||||
except:
|
||||
raiseError(i+1, f"Invalid number in led state: '{m}'")
|
||||
raise_error(i+1, f"Invalid number in led state: '{m}'")
|
||||
continue
|
||||
elif state_on:
|
||||
state = 100
|
||||
elif state_off:
|
||||
state = 0
|
||||
else:
|
||||
raiseError(i+1, f"Not a valid LED state: '{s_state}'")
|
||||
raise_error(i+1, f"Not a valid LED state: '{s_state}'")
|
||||
continue
|
||||
if state > 100:
|
||||
raiseError(i+1, f"State is larger than 100%: '{state}'")
|
||||
raise_error(i+1, f"State is larger than 100%: '{state}'")
|
||||
continue
|
||||
elif state < 0:
|
||||
raiseError(i+1, f"State is smaller than 0%: '{state}'")
|
||||
raise_error(i+1, f"State is smaller than 0%: '{state}'")
|
||||
# 4) Check for missing comment symbol # AFTER parsing duration and state, since the source of an error might be an invalid duration or state
|
||||
if not ends_with_comment and len(matches) >= 3:
|
||||
rest = matches[2].group()
|
||||
if not rest.startswith("#"):
|
||||
raiseError(i+1, f"Garbage after statement: '{rest}'.", fix="If this was meant to be a comment, use '#'.")
|
||||
raise_error(i+1, f"Garbage after statement: '{rest}'.", fix="If this was meant to be a comment, use '#'.")
|
||||
continue
|
||||
# 5) Calculate cumulative duration
|
||||
if len(states) == 0:
|
||||
|
@ -24,6 +24,7 @@ def measure(
|
||||
flush_after:int|None=None,
|
||||
use_buffer=False,
|
||||
max_measurements: int=None,
|
||||
stop_on_script_end: bool=False,
|
||||
verbose: bool=False,
|
||||
command_queue: None|Queue=None,
|
||||
data_queue: None|Queue=None
|
||||
@ -51,6 +52,8 @@ def measure(
|
||||
Number of measurements to perform before returning.
|
||||
Note: If use_buffer=True, a few more than max_measurements might be performed
|
||||
The default is None.
|
||||
stop_on_script_end : bool, optional
|
||||
Stop when the script end is reached.
|
||||
verbose : bool, optional
|
||||
If True, print some messages. The default is False.
|
||||
command_queue : None|Connection, optional
|
||||
@ -77,7 +80,6 @@ def measure(
|
||||
# and without microseconds
|
||||
if not "time" in data.metadata:
|
||||
data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).astimezone().isoformat()
|
||||
data.metadata["test"] = "TEST"
|
||||
vm_dev.reset(True)
|
||||
if use_buffer:
|
||||
vm_dev.buffer_measure(delta_t, verbose=True)
|
||||
@ -131,7 +133,9 @@ def measure(
|
||||
time.sleep(dt_sleep)
|
||||
t_iter_start = time.time()
|
||||
# 5) update LED
|
||||
new_led_val = led_script.get_state()
|
||||
if stop_on_script_end and led_script.is_done(t_iter_start):
|
||||
break
|
||||
new_led_val = led_script.get_state(t_iter_start)
|
||||
if new_led_val != led_val:
|
||||
try:
|
||||
led_dev.set_level(new_led_val)
|
||||
@ -144,7 +148,6 @@ def measure(
|
||||
pass
|
||||
data.flush(verbose=verbose)
|
||||
led_dev.off()
|
||||
print(data.metadata)
|
||||
print("Measurement stopped" + " "*50)
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user