From 154235dbe55926656191dc30e3af9dcbe2fa26e5 Mon Sep 17 00:00:00 2001 From: CPD Date: Thu, 13 Feb 2025 12:15:33 +0100 Subject: [PATCH] Add option to stop when the script ends --- cpdctrl/led_script.py | 43 +++++++++++++++++++++++++++++------------- cpdctrl/measurement.py | 11 +++++++---- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/cpdctrl/led_script.py b/cpdctrl/led_script.py index 231ab3e..48f8961 100644 --- a/cpdctrl/led_script.py +++ b/cpdctrl/led_script.py @@ -38,10 +38,12 @@ class LedScriptUpdateHandler(FileSystemEventHandler): class LedScript: + """ + Class representing a script to control the state of a LED + """ ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False): """ - Parameters ---------- script : np.ndarray|str|int @@ -95,6 +97,21 @@ class LedScript: """ self.t_start = time.time() return 0 + + def is_done(self, t: None|float=None) -> bool: + """ + Parameters + ---------- + t : None|float, optional + + Returns + ------- + bool + Whether is past the script length + """ + if t is None: t = time.time() + return t >= self.script["dtsum"][-1] + def get_state(self, t: None|float=None) -> int: """ @@ -248,12 +265,12 @@ class LedScript: # for just checking scripts, with ignore_errors=True this function returns all errors in an array errors = [] if ignore_errors: - def raiseError(*args, **kwargs): + def raise_error(*args, **kwargs): e = InvalidScript(*args, **kwargs) errors.append(e) print(e) else: - def raiseError(*args, **kwargs): + def raise_error(*args, **kwargs): raise InvalidScript(*args, **kwargs) # parse into @@ -266,13 +283,13 @@ class LedScript: # 1) Separate statements matches = [ match for match in re.finditer(r"[^ \t\n\r]+", l) ] if len(matches) < 2: - raiseError(i+1, f"Line has only one statement: '{l}'.", fix="The line needs to have the duration and led state separated by a space.") + raise_error(i+1, f"Line has only one statement: '{l}'.", fix="The line needs to have the duration and led state separated by a space.") continue # 2) parse the duration s_duration = matches[0].group() duration_match = re.fullmatch(r"(\d+[hms]?)+", s_duration) if not duration_match: - raiseError(i+1, f"Invalid duration: '{s_duration}'.", fix="Duration needs to consist of one ore more pairs, without spaces.") + raise_error(i+1, f"Invalid duration: '{s_duration}'.", fix="Duration needs to consist of one ore more pairs, without spaces.") continue duration = 0.0 for m in re.findall(r"\d+[hms]?", s_duration): @@ -280,21 +297,21 @@ class LedScript: try: t = int(m[:-1]) except: - raiseError(i+1, f"Invalid number in duration '{m[-1]}': '{m[:-1]}'") + raise_error(i+1, f"Invalid number in duration '{m[-1]}': '{m[:-1]}'") continue t *= timesuffixes[m[-1]] else: try: t = int(m) except: - raiseError(i+1, f"Invalid number in duration: '{m}'") + raise_error(i+1, f"Invalid number in duration: '{m}'") continue duration += t # 3) parse the state s_state = matches[1].group() state_match = re.fullmatch(r"(?:(\d+%?)|(on)|(off))(#.*)?", s_state, re.IGNORECASE) if not state_match: - raiseError(i+1, f"Not a valid LED state: '{s_state}'", fix="LED state needs to be either 'on', 'off', an integer between [0,100], optionally ending with '%'") + raise_error(i+1, f"Not a valid LED state: '{s_state}'", fix="LED state needs to be either 'on', 'off', an integer between [0,100], optionally ending with '%'") continue state_percent, state_on, state_off, ends_with_comment = state_match.groups() if state_percent: @@ -302,25 +319,25 @@ class LedScript: try: state = int(state_percent) except: - raiseError(i+1, f"Invalid number in led state: '{m}'") + raise_error(i+1, f"Invalid number in led state: '{m}'") continue elif state_on: state = 100 elif state_off: state = 0 else: - raiseError(i+1, f"Not a valid LED state: '{s_state}'") + raise_error(i+1, f"Not a valid LED state: '{s_state}'") continue if state > 100: - raiseError(i+1, f"State is larger than 100%: '{state}'") + raise_error(i+1, f"State is larger than 100%: '{state}'") continue elif state < 0: - raiseError(i+1, f"State is smaller than 0%: '{state}'") + raise_error(i+1, f"State is smaller than 0%: '{state}'") # 4) Check for missing comment symbol # AFTER parsing duration and state, since the source of an error might be an invalid duration or state if not ends_with_comment and len(matches) >= 3: rest = matches[2].group() if not rest.startswith("#"): - raiseError(i+1, f"Garbage after statement: '{rest}'.", fix="If this was meant to be a comment, use '#'.") + raise_error(i+1, f"Garbage after statement: '{rest}'.", fix="If this was meant to be a comment, use '#'.") continue # 5) Calculate cumulative duration if len(states) == 0: diff --git a/cpdctrl/measurement.py b/cpdctrl/measurement.py index da3fd29..25b11fa 100644 --- a/cpdctrl/measurement.py +++ b/cpdctrl/measurement.py @@ -24,6 +24,7 @@ def measure( flush_after:int|None=None, use_buffer=False, max_measurements: int=None, + stop_on_script_end: bool=False, verbose: bool=False, command_queue: None|Queue=None, data_queue: None|Queue=None @@ -51,6 +52,8 @@ def measure( Number of measurements to perform before returning. Note: If use_buffer=True, a few more than max_measurements might be performed The default is None. + stop_on_script_end : bool, optional + Stop when the script end is reached. verbose : bool, optional If True, print some messages. The default is False. command_queue : None|Connection, optional @@ -77,7 +80,6 @@ def measure( # and without microseconds if not "time" in data.metadata: data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).astimezone().isoformat() - data.metadata["test"] = "TEST" vm_dev.reset(True) if use_buffer: vm_dev.buffer_measure(delta_t, verbose=True) @@ -131,7 +133,9 @@ def measure( time.sleep(dt_sleep) t_iter_start = time.time() # 5) update LED - new_led_val = led_script.get_state() + if stop_on_script_end and led_script.is_done(t_iter_start): + break + new_led_val = led_script.get_state(t_iter_start) if new_led_val != led_val: try: led_dev.set_level(new_led_val) @@ -144,7 +148,6 @@ def measure( pass data.flush(verbose=verbose) led_dev.off() - print(data.metadata) - print("Measurement stopped" + " "*50) +