diff --git a/cpdctrl/cpdctrl-interactive.py b/cpdctrl/cpdctrl-interactive.py index 5b387da..e5d70aa 100644 --- a/cpdctrl/cpdctrl-interactive.py +++ b/cpdctrl/cpdctrl-interactive.py @@ -79,7 +79,7 @@ led: LedControlDevice|None = None data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True) t0 = 0 -def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements=None, max_points_shown=None): +def monitor(script: str|int=0, interval=None, flush_after=None, use_buffer=False, max_measurements=None, max_points_shown=None): """ Monitor the voltage with matplotlib. @@ -99,21 +99,32 @@ def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown) led_script = LedScript(script=script, auto_update=True, verbose=True) data.clear() - queue = mp.Queue() - pipe_send, pipe_recv = mp.Pipe() - # TODO: pass instruments - proc_measure = mt.Thread(target=_measure, args=(dev, led, led_script, data, interval, flush_after, max_measurements, False, pipe_recv, queue)) + data_queue = mp.Queue() + command_queue = mp.Queue() + # Argument order must match the definition + proc_measure = mt.Thread(target=_measure, args=(dev, led, led_script, data, + interval, + flush_after, + use_buffer, + max_measurements, + False, # verbose + command_queue, + data_queue + )) proc_measure.start() try: - while True: - current_data = queue.get(block=True, timeout=30) - i, tval, vval, led_val = current_data - plt_monitor.update(i, tval, vval, led_val) + while proc_measure.is_alive(): + while not data_queue.empty(): + # print(data_queue.qsize(), "\n\n") + current_data = data_queue.get(block=False) + i, tval, vval, led_val = current_data + plt_monitor.update(i, tval, vval, led_val) + except KeyboardInterrupt: pass - pipe_send.send("stop") + command_queue.put("stop") proc_measure.join() - print(data.metadata) + led_script.stop_updating() # stop watching for file updates (if enabled) data.save_csv(verbose=True) @@ -231,12 +242,8 @@ Enter 'help()' for a list of commands""") try: pass - # dev = _volt.init("GPIB0::22::INSTR") - # TODO - # manager = BaseManager() - # manager.start() - # led = manager._led.LEDD1B() - # led = _led.LEDD1B() + dev = _volt.init("GPIB0::22::INSTR") + led = _led.LEDD1B() except Exception as e: print(e) exit(1) diff --git a/cpdctrl/keithley_scripts/scan.scpi b/cpdctrl/keithley_scripts/scan.scpi index ea997a1..7b3e4f8 100644 --- a/cpdctrl/keithley_scripts/scan.scpi +++ b/cpdctrl/keithley_scripts/scan.scpi @@ -2,5 +2,4 @@ INIT:CONT OFF ' two readings per second TRIGger:SOURce TIMer -TRIGger:TIMer 0.5 - +TRIGger:TIMer 0.5 \ No newline at end of file diff --git a/cpdctrl/led_script.py b/cpdctrl/led_script.py index 37ae0ec..231ab3e 100644 --- a/cpdctrl/led_script.py +++ b/cpdctrl/led_script.py @@ -4,13 +4,21 @@ Created on Fri Jan 24 16:46:06 2025 @author: CPD """ +import os import time import re import numpy as np -import watchdog from watchdog.observers import Observer -from watchdog.events import LoggingEventHandler, FileSystemEventHandler -import os +from watchdog.events import FileSystemEventHandler + + +class InvalidScript(Exception): + def __init__(self, lineNr, message, fix=""): + self.lineNr = lineNr + self.message = message + self.fix = fix + self.full_message = f"Line {lineNr}: {message} {fix}" + super().__init__(self.full_message) class LedScriptUpdateHandler(FileSystemEventHandler): def __init__(self, led_script, verbose=False): @@ -30,10 +38,10 @@ class LedScriptUpdateHandler(FileSystemEventHandler): class LedScript: + ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")] def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False): """ - Parameters ---------- script : np.ndarray|str|int @@ -45,49 +53,36 @@ class LedScript: The field is the line number if the step in a script and is optional. If str: path to a led script file If int: constant led state value - constantValue : TYPE, optional - DESCRIPTION. The default is None. auto_update: bool, optional If True and script is a filepath, the script will automatically be reloaded when the file changes - + verbose: bool, optional + If True, print messages when important operations occur Returns ------- None. """ + self.verbose = verbose self.t_start = 0 self.auto_update = False self.filepath = None if type(script) == int: - self.script = np.array([(0., 0., script)]) + self.script = np.array([(0., 0., script, 0)], dtype=LedScript.ARRAY_DTYPE) elif type(script) == np.ndarray: self.script = script elif type(script) == str: self.script = LedScript.parse_script(script, ignore_errors=False) self.filepath = script self.auto_update = auto_update + self.observer = None if self.auto_update: - # event_handler = LoggingEventHandler() - event_handler = LedScriptUpdateHandler(self, verbose=True) - self.observer = Observer() - dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file - self.observer.schedule(event_handler, dirname) - self.observer.start() - if verbose: print(f"Led script is watching for updates on '{self.filepath}'") - else: - self.observer = None + self.start_updating() self.current_dt = 0 assert(self.script.shape[0] > 0) def __del__(self): - self.stop() - - def stop(self): - print("Led script stopped watching for updates") - if self.observer is not None: - self.observer.stop() - self.observer.join() - + self.stop_updating() + def start(self) -> int: """ @@ -141,16 +136,71 @@ class LedScript: return int(self.script["led"][idx]) @staticmethod - def _get_current_index(script, dt:float): + def _get_current_index(script, dt:float) -> int: if script.shape[0] == 1: return 0 distance = script["dtsum"] - dt idx = np.where(distance >= 0, distance, np.inf).argmin() return idx - def get_current_index(self, dt:float): - return LedScript._get_current_index(self.script, dt) + def get_current_index(self, dt:float) -> int: + """ + Get the index into self.script at `dt` + Parameters + ---------- + dt : float + Time relative to script start. + + Returns + ------- + int + Index into self.script at relative time dt. + """ + return LedScript._get_current_index(self.script, dt) + + def start_updating(self): + """ + Start watching for updates to the script file. + + Raises + ------ + ValueError + If the LedScript object was initialized with a filepath or if already watching for script updates. + + Returns + ------- + None. + + """ + if self.observer is not None: + raise ValueError("Already watching for updates") + if self.filepath is None: + raise ValueError("Can not watch for updates if the LedScript was not initialized with a file path") + # event_handler = LoggingEventHandler() + event_handler = LedScriptUpdateHandler(self) #, verbose=True) + self.observer = Observer() + dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file + self.observer.schedule(event_handler, dirname) + self.observer.start() + if self.verbose: print(f"Led script is watching for updates on '{self.filepath}'") + + + def stop_updating(self): + """ + Stop watching for updates to the script file. + Does nothing if not currently watching for file updates. + + Returns + ------- + None. + """ + if self.observer is not None: + self.observer.stop() + self.observer.join() + self.observer = None + if self.verbose: print("Led script stopped watching for updates") + def update(self, verbose=True): print(f"Updating led script from '{self.filepath}'") newscript = LedScript.parse_script(self.filepath, ignore_errors=False) @@ -167,7 +217,31 @@ class LedScript: self.script = newscript @staticmethod - def parse_script(filepath, ignore_errors=False): + def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]: + """ + Parse a led script from a file into a structured array + + Parameters + ---------- + filepath : str + Path to the led script file. + ignore_errors : bool, optional + If True, does not throw an exception upon script errors. + Instead, ignores errornous lines and additionally returns all exceptions in a list. + The default is False. + + Raises + ------ + InvalidScript + If encountering invalid lines in the script. + + Returns + ------- + np.ndarray or tuple[np.ndarray, list[InvalidScript]] + Returns the script as structured array. For the format, see the + docstring of the LedString constructor. + If ignore_errors=True, additionally returns a list of all errors. + """ with open(filepath, "r") as file: lines = file.readlines() @@ -255,15 +329,8 @@ class LedScript: cum_duration = states[-1][1] + duration # 6) append states.append((duration, cum_duration, state, i+1)) - states = np.array(states, dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")]) + states = np.array(states, dtype=LedScript.ARRAY_DTYPE) if ignore_errors: return states, errors return states -class InvalidScript(Exception): - def __init__(self, lineNr, message, fix=""): - self.lineNr = lineNr - self.message = message - self.fix = fix - self.full_message = f"Line {lineNr}: {message} {fix}" - super().__init__(self.full_message) \ No newline at end of file diff --git a/cpdctrl/measurement.py b/cpdctrl/measurement.py index 25f90aa..d507f22 100644 --- a/cpdctrl/measurement.py +++ b/cpdctrl/measurement.py @@ -23,52 +23,108 @@ def measure( led_script: LedScript, data: DataCollector, delta_t: float=0.1, - flush_after:int|None=None, + flush_after:int|None=None, + use_buffer=False, max_measurements: int=None, verbose: bool=False, - pipe: None|Connection=None, - queue: None|Queue=None + command_queue: None|Queue=None, + data_queue: None|Queue=None ): - # TODO: find a way to move inherited objects into a process - if led_dev is None: - led_dev = LEDD1B() - if vm_dev is None: - vm_dev = init("GPIB0::22::INSTR") + """ + Perform a measurement + + Parameters + ---------- + vm_dev : VoltageMeasurementDevice + DESCRIPTION. + led_dev : LedControlDevice + DESCRIPTION. + led_script : LedScript + DESCRIPTION. + data : DataCollector + DESCRIPTION. + delta_t : float, optional + Target interval between measurements and led updates. The default is 0.1. + flush_after : int|None, optional + If int, flush values to disk after . The default is None. + use_buffer : TYPE, optional + If True, use the buffer measurement mode. The default is False. + max_measurements : int, optional + Number of measurements to perform before returning. + Note: If use_buffer=True, a few more than max_measurements might be performed + The default is None. + verbose : bool, optional + If True, print some messages. The default is False. + command_queue : None|Connection, optional + A queue to receive to commands from. + Commands may be: + "stop" -> stops the measurement + ("led_script", ) a new led script to use + The default is None. + data_queue : None|Queue, optional + A queue to put data in. The default is None. + + Returns + ------- + None. + + """ + # old hack when using multiprocessing instead of mulithreading: + # devices are not pickleable and thus cant be moved to / shared with the measurement process + # if led_dev is None: + # led_dev = LEDD1B() + # if vm_dev is None: + # vm_dev = init("GPIB0::22::INSTR") # if no "time" in metadata, set the current local time in ISO 8601 format # and without microseconds if not "time" in data.metadata: data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).isoformat() data.metadata["test"] = "TEST" vm_dev.reset(True) + if use_buffer: + vm_dev.buffer_measure(delta_t, verbose=True) try: i = 0 led_val = led_script.start() t_iter_start = time.time() while max_measurements is None or i < max_measurements: - # 1) read value - tval, vval = vm_dev.read_value() - if i == 0: - t0 = tval - tval -= t0 - current_data = (i, tval, vval, led_val) - data.add_data(*current_data) - # 2) write data - print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r') - if flush_after is not None and (i+1) % flush_after == 0: - data.flush(verbose=verbose) - # if a queue was given, put the data - if queue is not None: - queue.put(current_data) + # 1) read value(s) + if use_buffer: + try: + values = vm_dev.buffer_read_new_values() + except ValueError as e: + print(f"Error in buffer measurement {i}:", e) + values = [] + else: + values = [vm_dev.read_value()] + # print(values) + # 2) process value(s) + for (tval, vval) in values: + if i == 0: + t0 = tval + tval -= t0 + current_data = (i, tval, vval, led_val) + data.add_data(*current_data) + # 3) write data + print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r') + if flush_after is not None and (i+1) % flush_after == 0: + data.flush(verbose=verbose) + # if a queue was given, put the data + if data_queue is not None: + data_queue.put(current_data) + i += 1 + # if a pipe was given, check for messages - if pipe is not None and pipe.poll(0): - recv = pipe.recv() + if command_queue is not None and command_queue.qsize() > 0: + recv = command_queue.get(block=False) if recv == "stop": break elif type(recv) == tuple and recv[0] == "led_script": led_script = recv[1] else: print(f"Received invalid message: '{recv}'") - # 3) sleep + + # 4) sleep # substract the execution time from the sleep time for a more # acurate frequency dt_sleep = delta_t - (time.time() - t_iter_start) @@ -76,7 +132,7 @@ def measure( # print(f"Sleeping for {dt_sleep}") time.sleep(dt_sleep) t_iter_start = time.time() - # 4) update LED + # 5) update LED new_led_val = led_script.get_state() if new_led_val != led_val: try: @@ -85,10 +141,12 @@ def measure( except Exception as e: print(f"Error setting led to {new_led_val}%:") print(e) - i += 1 + except KeyboardInterrupt: pass data.flush(verbose=verbose) led_dev.off() print(data.metadata) - print("Measurement stopped" + " "*50) \ No newline at end of file + print("Measurement stopped" + " "*50) + + diff --git a/cpdctrl/voltage_measurement_device/impl/keithley2700.py b/cpdctrl/voltage_measurement_device/impl/keithley2700.py index aeb6d64..26209ea 100644 --- a/cpdctrl/voltage_measurement_device/impl/keithley2700.py +++ b/cpdctrl/voltage_measurement_device/impl/keithley2700.py @@ -52,6 +52,7 @@ class Keithley2700(VoltageMeasurementDevice): self.instr = instr if check_front_switch: self._check_front_input_selected() + self.buffer_next_idx = None def __del__(self): """Properly close the instrument connection""" @@ -75,6 +76,17 @@ class Keithley2700(VoltageMeasurementDevice): raise Exception("The Keithley's INPUT switch must select the [F]ront inputs") return switch + + def query(self, query): + try: + return self.instr.query(query).strip("\n") + except pyvisa.VisaIOError as e: + print(f"VisaIOError raised during query: '{query}'") + raise e + + def query_int(self, query): + return int(float(self.query(query))) + # RUN COMMANDS ON THE DEVICE def run(self, code, verbose=False): """ @@ -85,11 +97,19 @@ class Keithley2700(VoltageMeasurementDevice): code : str SCPI commands """ - script = '\n'.join([l.strip(" ") for l in code.strip(" ").strip("\n").split("\n") if len(l) > 0 and l[0] not in "#'"]) + script = '' + for line in code.strip(" ").split("\n"): + l = line.strip(" ") + if len(l) == 0 or l[0] in "#'": continue + script += l + "\n" if verbose: print(f"Running code:\n{script}") - self.instr.write(script) - + try: + self.instr.write(script) + except pyvisa.VisaIOError as e: + print(f"VisaIOError raised while writing command(s):\n'{script}'\n") + raise e + def run_script(self, script_path, verbose=False): """ Load the code from script_path and run it via self.run @@ -118,42 +138,101 @@ class Keithley2700(VoltageMeasurementDevice): def reset(self, verbose=False): """ - Reset smua and its buffers - @param instr : pyvisa instrument + Reset the device """ - self.run_script(scripts["instrument_reset"], verbose=verbose) - self.buffer_reset() + + reset_script = """ + VOLT:DC:RANGe:AUTO ON + ' DC Voltage measurement + SENSe:FUNC 'VOLT:DC' + ' Set voltage divider if required + ' SENSE:VOLT:DC:IDIVider OFF + ' Disable continuous initiation + INIT:CONT OFF + ' Disable Buffer and trigger + TRACe:FEED NONE + TRACe:FEED:CONTrol NEVer + TRIGger:SOURce IMMediate + ' Set timestamp format to relative + SYSTem:TSTamp:TYPE RELative + """ + self.run(reset_script) + # self.run_script(scripts["instrument_reset"], verbose=verbose) + self.buffer_clear() + # INTERACT WITH DEVICE BUFFER # might not be needed - def buffer_reset(self): - buffer_reset = """ + def buffer_clear(self): + buffer_reset_script = """ TRACe:CLEar TRACe:CLEar:AUTO ON - SYSTem:TSTamp:TYPE RELative """ - self.run(buffer_reset) + self.run(buffer_reset_script) - def buffer_get_size(self, buffer_nr=1): - n = self.instr.query("TRACe:POINts?").strip("\n") - return int(float(n)) + def buffer_get_size(self): + return self.query_int("TRACe:POINts?") def buffer_set_size(self, s): if not type(s) == int or s < 2 or s > 55000: raise ValueError(f"Invalid buffer size: {s}. Must be int and between 2 and 55000") self.instr.write(f"TRACe:POINts {s}") + + def buffer_measure(self, interval: float=0.5, verbose=False): + if interval < 0.001 or interval > 999999.999: + raise ValueError("Interval must be between 0.001 and 999999.999") + self.run(f""" + TRIGger:TIMer {interval} + TRIGger:COUNt INFinity + TRIGger:SOURce TIMer + """) + self.buffer_clear() + self.run(""" + TRACe:FEED SENSe + ' write continuously + TRACe:FEED:CONTrol ALWays + INITiate:CONTinuous ON + """) + self.buffer_next_idx = 0 + self.buffer_size = self.buffer_get_size() + if verbose: print("Started buffer measurement") + + def buffer_read_new_values(self): + if self.buffer_next_idx is None: + raise ValueError("You must first start a buffer measurement by calling buffer_measure()") + # TRACe:NEXT? returns the next index that will be written to + new_next_idx = self.query_int("TRACe:NEXT?") + if new_next_idx == self.buffer_next_idx: + raise ValueError(f"No new value or buffer has been filled completely (next reading at {self.buffer_next_idx})") + # if reached end of buffer, first read to end and then from beginning + vals = "" + if new_next_idx < self.buffer_next_idx: + count = self.buffer_size - self.buffer_next_idx + # print(f"start={self.buffer_next_idx}, stop={new_next_idx}, count={count}") + vals += self.query(f"TRACe:DATA:SELected? {self.buffer_next_idx}, {count}").strip("\n") + self.buffer_next_idx = 0 + count = new_next_idx - self.buffer_next_idx + if count > 0: + # print(f"start={self.buffer_next_idx}, stop={new_next_idx}, count={count}") + vals += self.query(f"TRACe:DATA:SELected? {self.buffer_next_idx}, {count}").strip("\n") + self.buffer_next_idx = new_next_idx + processed_vals = [self.process_reading(val) for val in vals.strip("#").split("#")] + return processed_vals + # MEASUREMENT - def process_reading(self, reading: str): + @staticmethod + def process_reading(reading: str): """ process a reading. Only works with VDC and relative time stamps right now! - '-1.19655066E+01VDC,+9627.275SECS,+64993RDNG#\n' + '-1.19655066E+01VDC,+9627.275SECS,+64993RDNG\n' + May have trailing commas and trailing '#' characters Returns ------- [timestamp, voltage] """ - parts = reading.split(",") + parts = reading.strip("#").strip(",").split(",") if len(parts) != 3: raise ValueError(f"Invalid reading: '{reading}'") vdc = float(parts[0][:-3])