# -*- coding: utf-8 -*- """ Created on Fri Jan 24 15:18:31 2025 @author: Matthias Quintern """ from cpdctrl.voltage_measurement_device.base import VoltageMeasurementDevice from cpdctrl.led_control_device.base import LedControlDevice from cpdctrl.led_script import LedScript from cpdctrl.utility.data import DataCollector import time import datetime from queue import Queue def measure( vm_dev: VoltageMeasurementDevice, led_dev: LedControlDevice, led_script: LedScript, data: DataCollector, delta_t: float=0.1, flush_after:int|None=None, use_buffer=False, max_measurements: int=None, stop_on_script_end: bool=False, verbose: bool=False, command_queue: None|Queue=None, data_queue: None|Queue=None ): """ Perform a measurement Parameters ---------- vm_dev : VoltageMeasurementDevice DESCRIPTION. led_dev : LedControlDevice DESCRIPTION. led_script : LedScript DESCRIPTION. data : DataCollector DESCRIPTION. delta_t : float, optional Target interval between measurements and led updates. The default is 0.1. flush_after : int|None, optional If int, flush values to disk after . The default is None. use_buffer : TYPE, optional If True, use the buffer measurement mode. The default is False. max_measurements : int, optional Number of measurements to perform before returning. Note: If use_buffer=True, a few more than max_measurements might be performed The default is None. stop_on_script_end : bool, optional Stop when the script end is reached. verbose : bool, optional If True, print some messages. The default is False. command_queue : None|Connection, optional A queue to receive to commands from. Commands may be: "stop" -> stops the measurement ("led_script", ) a new led script to use The default is None. data_queue : None|Queue, optional A queue to put data in. The default is None. Returns ------- None. """ # old hack when using multiprocessing instead of mulithreading: # devices are not pickleable and thus cant be moved to / shared with the measurement process # if led_dev is None: # led_dev = LEDD1B() # if vm_dev is None: # vm_dev = init("GPIB0::22::INSTR") # if no "time" in metadata, set the current local time in ISO 8601 format # and without microseconds if not "time" in data.metadata: data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).astimezone().isoformat() vm_dev.reset(True) if use_buffer: vm_dev.buffer_measure(delta_t, verbose=True) # allow 0 instead of None if max_measurements == 0: max_measurements = None if flush_after == 0: flush_after = None try: i = 0 led_val = led_script.start() t_iter_start = time.time() while max_measurements is None or i < max_measurements: # 1) read value(s) if use_buffer: try: values = vm_dev.buffer_read_new_values() except ValueError as e: print(f"Error in buffer measurement {i}:", e) values = [] else: values = [vm_dev.read_value()] # print(values) # 2) process value(s) for (tval, vval) in values: if i == 0: t0 = tval tval -= t0 current_data = (i, tval, vval, led_val) data.add_data(*current_data) # 3) write data print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r') if flush_after is not None and (i+1) % flush_after == 0: data.flush(verbose=verbose) # if a queue was given, put the data if data_queue is not None: data_queue.put(current_data) i += 1 # if a pipe was given, check for messages if command_queue is not None and command_queue.qsize() > 0: recv = command_queue.get(block=False) if recv == "stop": break elif type(recv) == tuple and recv[0] == "led_script": led_script = recv[1] else: print(f"Received invalid message: '{recv}'") # 4) sleep # substract the execution time from the sleep time for a more # acurate frequency dt_sleep = delta_t - (time.time() - t_iter_start) if dt_sleep > 0: # print(f"Sleeping for {dt_sleep}") time.sleep(dt_sleep) t_iter_start = time.time() # 5) update LED if stop_on_script_end and led_script.is_done(t_iter_start): if verbose: print("Reached script end") break new_led_val = led_script.get_state(t_iter_start) if new_led_val != led_val: try: led_dev.set_level(new_led_val) led_val = new_led_val except Exception as e: print(f"Error setting led to {new_led_val}%:") print(e) except KeyboardInterrupt: pass data.flush(verbose=verbose) led_dev.off()