add buffer measurement mode
This commit is contained in:
parent
4782adbf6a
commit
c5c016399b
@ -79,7 +79,7 @@ led: LedControlDevice|None = None
|
||||
data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True)
|
||||
t0 = 0
|
||||
|
||||
def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements=None, max_points_shown=None):
|
||||
def monitor(script: str|int=0, interval=None, flush_after=None, use_buffer=False, max_measurements=None, max_points_shown=None):
|
||||
"""
|
||||
Monitor the voltage with matplotlib.
|
||||
|
||||
@ -99,21 +99,32 @@ def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements
|
||||
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
|
||||
led_script = LedScript(script=script, auto_update=True, verbose=True)
|
||||
data.clear()
|
||||
queue = mp.Queue()
|
||||
pipe_send, pipe_recv = mp.Pipe()
|
||||
# TODO: pass instruments
|
||||
proc_measure = mt.Thread(target=_measure, args=(dev, led, led_script, data, interval, flush_after, max_measurements, False, pipe_recv, queue))
|
||||
data_queue = mp.Queue()
|
||||
command_queue = mp.Queue()
|
||||
# Argument order must match the definition
|
||||
proc_measure = mt.Thread(target=_measure, args=(dev, led, led_script, data,
|
||||
interval,
|
||||
flush_after,
|
||||
use_buffer,
|
||||
max_measurements,
|
||||
False, # verbose
|
||||
command_queue,
|
||||
data_queue
|
||||
))
|
||||
proc_measure.start()
|
||||
try:
|
||||
while True:
|
||||
current_data = queue.get(block=True, timeout=30)
|
||||
i, tval, vval, led_val = current_data
|
||||
plt_monitor.update(i, tval, vval, led_val)
|
||||
while proc_measure.is_alive():
|
||||
while not data_queue.empty():
|
||||
# print(data_queue.qsize(), "\n\n")
|
||||
current_data = data_queue.get(block=False)
|
||||
i, tval, vval, led_val = current_data
|
||||
plt_monitor.update(i, tval, vval, led_val)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
pipe_send.send("stop")
|
||||
command_queue.put("stop")
|
||||
proc_measure.join()
|
||||
print(data.metadata)
|
||||
led_script.stop_updating() # stop watching for file updates (if enabled)
|
||||
data.save_csv(verbose=True)
|
||||
|
||||
|
||||
@ -231,12 +242,8 @@ Enter 'help()' for a list of commands""")
|
||||
|
||||
try:
|
||||
pass
|
||||
# dev = _volt.init("GPIB0::22::INSTR")
|
||||
# TODO
|
||||
# manager = BaseManager()
|
||||
# manager.start()
|
||||
# led = manager._led.LEDD1B()
|
||||
# led = _led.LEDD1B()
|
||||
dev = _volt.init("GPIB0::22::INSTR")
|
||||
led = _led.LEDD1B()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
exit(1)
|
||||
|
@ -2,5 +2,4 @@
|
||||
INIT:CONT OFF
|
||||
' two readings per second
|
||||
TRIGger:SOURce TIMer
|
||||
TRIGger:TIMer 0.5
|
||||
|
||||
TRIGger:TIMer 0.5
|
@ -4,13 +4,21 @@ Created on Fri Jan 24 16:46:06 2025
|
||||
|
||||
@author: CPD
|
||||
"""
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
import numpy as np
|
||||
import watchdog
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
|
||||
import os
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
|
||||
|
||||
class InvalidScript(Exception):
|
||||
def __init__(self, lineNr, message, fix=""):
|
||||
self.lineNr = lineNr
|
||||
self.message = message
|
||||
self.fix = fix
|
||||
self.full_message = f"Line {lineNr}: {message} {fix}"
|
||||
super().__init__(self.full_message)
|
||||
|
||||
class LedScriptUpdateHandler(FileSystemEventHandler):
|
||||
def __init__(self, led_script, verbose=False):
|
||||
@ -30,10 +38,10 @@ class LedScriptUpdateHandler(FileSystemEventHandler):
|
||||
|
||||
|
||||
class LedScript:
|
||||
ARRAY_DTYPE = [("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")]
|
||||
def __init__(self, script:np.ndarray|str|int=0, auto_update=False, verbose=False):
|
||||
"""
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
script : np.ndarray|str|int
|
||||
@ -45,49 +53,36 @@ class LedScript:
|
||||
The <line> field is the line number if the step in a script and is optional.
|
||||
If str: path to a led script file
|
||||
If int: constant led state value
|
||||
constantValue : TYPE, optional
|
||||
DESCRIPTION. The default is None.
|
||||
auto_update: bool, optional
|
||||
If True and script is a filepath, the script will automatically be reloaded when the file changes
|
||||
|
||||
verbose: bool, optional
|
||||
If True, print messages when important operations occur
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
self.verbose = verbose
|
||||
self.t_start = 0
|
||||
self.auto_update = False
|
||||
self.filepath = None
|
||||
if type(script) == int:
|
||||
self.script = np.array([(0., 0., script)])
|
||||
self.script = np.array([(0., 0., script, 0)], dtype=LedScript.ARRAY_DTYPE)
|
||||
elif type(script) == np.ndarray:
|
||||
self.script = script
|
||||
elif type(script) == str:
|
||||
self.script = LedScript.parse_script(script, ignore_errors=False)
|
||||
self.filepath = script
|
||||
self.auto_update = auto_update
|
||||
self.observer = None
|
||||
if self.auto_update:
|
||||
# event_handler = LoggingEventHandler()
|
||||
event_handler = LedScriptUpdateHandler(self, verbose=True)
|
||||
self.observer = Observer()
|
||||
dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file
|
||||
self.observer.schedule(event_handler, dirname)
|
||||
self.observer.start()
|
||||
if verbose: print(f"Led script is watching for updates on '{self.filepath}'")
|
||||
else:
|
||||
self.observer = None
|
||||
self.start_updating()
|
||||
self.current_dt = 0
|
||||
assert(self.script.shape[0] > 0)
|
||||
|
||||
def __del__(self):
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
print("Led script stopped watching for updates")
|
||||
if self.observer is not None:
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
|
||||
self.stop_updating()
|
||||
|
||||
|
||||
def start(self) -> int:
|
||||
"""
|
||||
@ -141,16 +136,71 @@ class LedScript:
|
||||
return int(self.script["led"][idx])
|
||||
|
||||
@staticmethod
|
||||
def _get_current_index(script, dt:float):
|
||||
def _get_current_index(script, dt:float) -> int:
|
||||
if script.shape[0] == 1:
|
||||
return 0
|
||||
distance = script["dtsum"] - dt
|
||||
idx = np.where(distance >= 0, distance, np.inf).argmin()
|
||||
return idx
|
||||
|
||||
def get_current_index(self, dt:float):
|
||||
return LedScript._get_current_index(self.script, dt)
|
||||
def get_current_index(self, dt:float) -> int:
|
||||
"""
|
||||
Get the index into self.script at `dt`
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dt : float
|
||||
Time relative to script start.
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
Index into self.script at relative time dt.
|
||||
"""
|
||||
return LedScript._get_current_index(self.script, dt)
|
||||
|
||||
def start_updating(self):
|
||||
"""
|
||||
Start watching for updates to the script file.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the LedScript object was initialized with a filepath or if already watching for script updates.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
if self.observer is not None:
|
||||
raise ValueError("Already watching for updates")
|
||||
if self.filepath is None:
|
||||
raise ValueError("Can not watch for updates if the LedScript was not initialized with a file path")
|
||||
# event_handler = LoggingEventHandler()
|
||||
event_handler = LedScriptUpdateHandler(self) #, verbose=True)
|
||||
self.observer = Observer()
|
||||
dirname = os.path.dirname(os.path.abspath(self.filepath)) # directory of the file
|
||||
self.observer.schedule(event_handler, dirname)
|
||||
self.observer.start()
|
||||
if self.verbose: print(f"Led script is watching for updates on '{self.filepath}'")
|
||||
|
||||
|
||||
def stop_updating(self):
|
||||
"""
|
||||
Stop watching for updates to the script file.
|
||||
Does nothing if not currently watching for file updates.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
"""
|
||||
if self.observer is not None:
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
self.observer = None
|
||||
if self.verbose: print("Led script stopped watching for updates")
|
||||
|
||||
def update(self, verbose=True):
|
||||
print(f"Updating led script from '{self.filepath}'")
|
||||
newscript = LedScript.parse_script(self.filepath, ignore_errors=False)
|
||||
@ -167,7 +217,31 @@ class LedScript:
|
||||
self.script = newscript
|
||||
|
||||
@staticmethod
|
||||
def parse_script(filepath, ignore_errors=False):
|
||||
def parse_script(filepath: str, ignore_errors:bool=False) -> np.ndarray|tuple[np.ndarray, list[InvalidScript]]:
|
||||
"""
|
||||
Parse a led script from a file into a structured array
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filepath : str
|
||||
Path to the led script file.
|
||||
ignore_errors : bool, optional
|
||||
If True, does not throw an exception upon script errors.
|
||||
Instead, ignores errornous lines and additionally returns all exceptions in a list.
|
||||
The default is False.
|
||||
|
||||
Raises
|
||||
------
|
||||
InvalidScript
|
||||
If encountering invalid lines in the script.
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray or tuple[np.ndarray, list[InvalidScript]]
|
||||
Returns the script as structured array. For the format, see the
|
||||
docstring of the LedString constructor.
|
||||
If ignore_errors=True, additionally returns a list of all errors.
|
||||
"""
|
||||
with open(filepath, "r") as file:
|
||||
lines = file.readlines()
|
||||
|
||||
@ -255,15 +329,8 @@ class LedScript:
|
||||
cum_duration = states[-1][1] + duration
|
||||
# 6) append
|
||||
states.append((duration, cum_duration, state, i+1))
|
||||
states = np.array(states, dtype=[("dt", "f8"), ("dtsum", "f8"), ("led", "i4"), ("line", "i4")])
|
||||
states = np.array(states, dtype=LedScript.ARRAY_DTYPE)
|
||||
if ignore_errors:
|
||||
return states, errors
|
||||
return states
|
||||
|
||||
class InvalidScript(Exception):
|
||||
def __init__(self, lineNr, message, fix=""):
|
||||
self.lineNr = lineNr
|
||||
self.message = message
|
||||
self.fix = fix
|
||||
self.full_message = f"Line {lineNr}: {message} {fix}"
|
||||
super().__init__(self.full_message)
|
@ -23,52 +23,108 @@ def measure(
|
||||
led_script: LedScript,
|
||||
data: DataCollector,
|
||||
delta_t: float=0.1,
|
||||
flush_after:int|None=None,
|
||||
flush_after:int|None=None,
|
||||
use_buffer=False,
|
||||
max_measurements: int=None,
|
||||
verbose: bool=False,
|
||||
pipe: None|Connection=None,
|
||||
queue: None|Queue=None
|
||||
command_queue: None|Queue=None,
|
||||
data_queue: None|Queue=None
|
||||
):
|
||||
# TODO: find a way to move inherited objects into a process
|
||||
if led_dev is None:
|
||||
led_dev = LEDD1B()
|
||||
if vm_dev is None:
|
||||
vm_dev = init("GPIB0::22::INSTR")
|
||||
"""
|
||||
Perform a measurement
|
||||
|
||||
Parameters
|
||||
----------
|
||||
vm_dev : VoltageMeasurementDevice
|
||||
DESCRIPTION.
|
||||
led_dev : LedControlDevice
|
||||
DESCRIPTION.
|
||||
led_script : LedScript
|
||||
DESCRIPTION.
|
||||
data : DataCollector
|
||||
DESCRIPTION.
|
||||
delta_t : float, optional
|
||||
Target interval between measurements and led updates. The default is 0.1.
|
||||
flush_after : int|None, optional
|
||||
If int, flush values to disk after <flush_after>. The default is None.
|
||||
use_buffer : TYPE, optional
|
||||
If True, use the buffer measurement mode. The default is False.
|
||||
max_measurements : int, optional
|
||||
Number of measurements to perform before returning.
|
||||
Note: If use_buffer=True, a few more than max_measurements might be performed
|
||||
The default is None.
|
||||
verbose : bool, optional
|
||||
If True, print some messages. The default is False.
|
||||
command_queue : None|Connection, optional
|
||||
A queue to receive to commands from.
|
||||
Commands may be:
|
||||
"stop" -> stops the measurement
|
||||
("led_script", <LedScript object>) a new led script to use
|
||||
The default is None.
|
||||
data_queue : None|Queue, optional
|
||||
A queue to put data in. The default is None.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
# old hack when using multiprocessing instead of mulithreading:
|
||||
# devices are not pickleable and thus cant be moved to / shared with the measurement process
|
||||
# if led_dev is None:
|
||||
# led_dev = LEDD1B()
|
||||
# if vm_dev is None:
|
||||
# vm_dev = init("GPIB0::22::INSTR")
|
||||
# if no "time" in metadata, set the current local time in ISO 8601 format
|
||||
# and without microseconds
|
||||
if not "time" in data.metadata:
|
||||
data.metadata["time"] = datetime.datetime.now().replace(microsecond=0).isoformat()
|
||||
data.metadata["test"] = "TEST"
|
||||
vm_dev.reset(True)
|
||||
if use_buffer:
|
||||
vm_dev.buffer_measure(delta_t, verbose=True)
|
||||
try:
|
||||
i = 0
|
||||
led_val = led_script.start()
|
||||
t_iter_start = time.time()
|
||||
while max_measurements is None or i < max_measurements:
|
||||
# 1) read value
|
||||
tval, vval = vm_dev.read_value()
|
||||
if i == 0:
|
||||
t0 = tval
|
||||
tval -= t0
|
||||
current_data = (i, tval, vval, led_val)
|
||||
data.add_data(*current_data)
|
||||
# 2) write data
|
||||
print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r')
|
||||
if flush_after is not None and (i+1) % flush_after == 0:
|
||||
data.flush(verbose=verbose)
|
||||
# if a queue was given, put the data
|
||||
if queue is not None:
|
||||
queue.put(current_data)
|
||||
# 1) read value(s)
|
||||
if use_buffer:
|
||||
try:
|
||||
values = vm_dev.buffer_read_new_values()
|
||||
except ValueError as e:
|
||||
print(f"Error in buffer measurement {i}:", e)
|
||||
values = []
|
||||
else:
|
||||
values = [vm_dev.read_value()]
|
||||
# print(values)
|
||||
# 2) process value(s)
|
||||
for (tval, vval) in values:
|
||||
if i == 0:
|
||||
t0 = tval
|
||||
tval -= t0
|
||||
current_data = (i, tval, vval, led_val)
|
||||
data.add_data(*current_data)
|
||||
# 3) write data
|
||||
print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r')
|
||||
if flush_after is not None and (i+1) % flush_after == 0:
|
||||
data.flush(verbose=verbose)
|
||||
# if a queue was given, put the data
|
||||
if data_queue is not None:
|
||||
data_queue.put(current_data)
|
||||
i += 1
|
||||
|
||||
# if a pipe was given, check for messages
|
||||
if pipe is not None and pipe.poll(0):
|
||||
recv = pipe.recv()
|
||||
if command_queue is not None and command_queue.qsize() > 0:
|
||||
recv = command_queue.get(block=False)
|
||||
if recv == "stop":
|
||||
break
|
||||
elif type(recv) == tuple and recv[0] == "led_script":
|
||||
led_script = recv[1]
|
||||
else:
|
||||
print(f"Received invalid message: '{recv}'")
|
||||
# 3) sleep
|
||||
|
||||
# 4) sleep
|
||||
# substract the execution time from the sleep time for a more
|
||||
# acurate frequency
|
||||
dt_sleep = delta_t - (time.time() - t_iter_start)
|
||||
@ -76,7 +132,7 @@ def measure(
|
||||
# print(f"Sleeping for {dt_sleep}")
|
||||
time.sleep(dt_sleep)
|
||||
t_iter_start = time.time()
|
||||
# 4) update LED
|
||||
# 5) update LED
|
||||
new_led_val = led_script.get_state()
|
||||
if new_led_val != led_val:
|
||||
try:
|
||||
@ -85,10 +141,12 @@ def measure(
|
||||
except Exception as e:
|
||||
print(f"Error setting led to {new_led_val}%:")
|
||||
print(e)
|
||||
i += 1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
data.flush(verbose=verbose)
|
||||
led_dev.off()
|
||||
print(data.metadata)
|
||||
print("Measurement stopped" + " "*50)
|
||||
print("Measurement stopped" + " "*50)
|
||||
|
||||
|
||||
|
@ -52,6 +52,7 @@ class Keithley2700(VoltageMeasurementDevice):
|
||||
self.instr = instr
|
||||
if check_front_switch:
|
||||
self._check_front_input_selected()
|
||||
self.buffer_next_idx = None
|
||||
|
||||
def __del__(self):
|
||||
"""Properly close the instrument connection"""
|
||||
@ -75,6 +76,17 @@ class Keithley2700(VoltageMeasurementDevice):
|
||||
raise Exception("The Keithley's INPUT switch must select the [F]ront inputs")
|
||||
return switch
|
||||
|
||||
|
||||
def query(self, query):
|
||||
try:
|
||||
return self.instr.query(query).strip("\n")
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"VisaIOError raised during query: '{query}'")
|
||||
raise e
|
||||
|
||||
def query_int(self, query):
|
||||
return int(float(self.query(query)))
|
||||
|
||||
# RUN COMMANDS ON THE DEVICE
|
||||
def run(self, code, verbose=False):
|
||||
"""
|
||||
@ -85,11 +97,19 @@ class Keithley2700(VoltageMeasurementDevice):
|
||||
code : str
|
||||
SCPI commands
|
||||
"""
|
||||
script = '\n'.join([l.strip(" ") for l in code.strip(" ").strip("\n").split("\n") if len(l) > 0 and l[0] not in "#'"])
|
||||
script = ''
|
||||
for line in code.strip(" ").split("\n"):
|
||||
l = line.strip(" ")
|
||||
if len(l) == 0 or l[0] in "#'": continue
|
||||
script += l + "\n"
|
||||
if verbose:
|
||||
print(f"Running code:\n{script}")
|
||||
self.instr.write(script)
|
||||
|
||||
try:
|
||||
self.instr.write(script)
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"VisaIOError raised while writing command(s):\n'{script}'\n")
|
||||
raise e
|
||||
|
||||
def run_script(self, script_path, verbose=False):
|
||||
"""
|
||||
Load the code from script_path and run it via self.run
|
||||
@ -118,42 +138,101 @@ class Keithley2700(VoltageMeasurementDevice):
|
||||
|
||||
def reset(self, verbose=False):
|
||||
"""
|
||||
Reset smua and its buffers
|
||||
@param instr : pyvisa instrument
|
||||
Reset the device
|
||||
"""
|
||||
self.run_script(scripts["instrument_reset"], verbose=verbose)
|
||||
self.buffer_reset()
|
||||
|
||||
reset_script = """
|
||||
VOLT:DC:RANGe:AUTO ON
|
||||
' DC Voltage measurement
|
||||
SENSe:FUNC 'VOLT:DC'
|
||||
' Set voltage divider if required
|
||||
' SENSE:VOLT:DC:IDIVider OFF
|
||||
' Disable continuous initiation
|
||||
INIT:CONT OFF
|
||||
' Disable Buffer and trigger
|
||||
TRACe:FEED NONE
|
||||
TRACe:FEED:CONTrol NEVer
|
||||
TRIGger:SOURce IMMediate
|
||||
' Set timestamp format to relative
|
||||
SYSTem:TSTamp:TYPE RELative
|
||||
"""
|
||||
self.run(reset_script)
|
||||
# self.run_script(scripts["instrument_reset"], verbose=verbose)
|
||||
self.buffer_clear()
|
||||
|
||||
|
||||
# INTERACT WITH DEVICE BUFFER
|
||||
# might not be needed
|
||||
def buffer_reset(self):
|
||||
buffer_reset = """
|
||||
def buffer_clear(self):
|
||||
buffer_reset_script = """
|
||||
TRACe:CLEar
|
||||
TRACe:CLEar:AUTO ON
|
||||
SYSTem:TSTamp:TYPE RELative
|
||||
"""
|
||||
self.run(buffer_reset)
|
||||
self.run(buffer_reset_script)
|
||||
|
||||
def buffer_get_size(self, buffer_nr=1):
|
||||
n = self.instr.query("TRACe:POINts?").strip("\n")
|
||||
return int(float(n))
|
||||
def buffer_get_size(self):
|
||||
return self.query_int("TRACe:POINts?")
|
||||
|
||||
def buffer_set_size(self, s):
|
||||
if not type(s) == int or s < 2 or s > 55000:
|
||||
raise ValueError(f"Invalid buffer size: {s}. Must be int and between 2 and 55000")
|
||||
self.instr.write(f"TRACe:POINts {s}")
|
||||
|
||||
def buffer_measure(self, interval: float=0.5, verbose=False):
|
||||
if interval < 0.001 or interval > 999999.999:
|
||||
raise ValueError("Interval must be between 0.001 and 999999.999")
|
||||
self.run(f"""
|
||||
TRIGger:TIMer {interval}
|
||||
TRIGger:COUNt INFinity
|
||||
TRIGger:SOURce TIMer
|
||||
""")
|
||||
self.buffer_clear()
|
||||
self.run("""
|
||||
TRACe:FEED SENSe
|
||||
' write continuously
|
||||
TRACe:FEED:CONTrol ALWays
|
||||
INITiate:CONTinuous ON
|
||||
""")
|
||||
self.buffer_next_idx = 0
|
||||
self.buffer_size = self.buffer_get_size()
|
||||
if verbose: print("Started buffer measurement")
|
||||
|
||||
def buffer_read_new_values(self):
|
||||
if self.buffer_next_idx is None:
|
||||
raise ValueError("You must first start a buffer measurement by calling buffer_measure()")
|
||||
# TRACe:NEXT? returns the next index that will be written to
|
||||
new_next_idx = self.query_int("TRACe:NEXT?")
|
||||
if new_next_idx == self.buffer_next_idx:
|
||||
raise ValueError(f"No new value or buffer has been filled completely (next reading at {self.buffer_next_idx})")
|
||||
# if reached end of buffer, first read to end and then from beginning
|
||||
vals = ""
|
||||
if new_next_idx < self.buffer_next_idx:
|
||||
count = self.buffer_size - self.buffer_next_idx
|
||||
# print(f"start={self.buffer_next_idx}, stop={new_next_idx}, count={count}")
|
||||
vals += self.query(f"TRACe:DATA:SELected? {self.buffer_next_idx}, {count}").strip("\n")
|
||||
self.buffer_next_idx = 0
|
||||
count = new_next_idx - self.buffer_next_idx
|
||||
if count > 0:
|
||||
# print(f"start={self.buffer_next_idx}, stop={new_next_idx}, count={count}")
|
||||
vals += self.query(f"TRACe:DATA:SELected? {self.buffer_next_idx}, {count}").strip("\n")
|
||||
self.buffer_next_idx = new_next_idx
|
||||
processed_vals = [self.process_reading(val) for val in vals.strip("#").split("#")]
|
||||
return processed_vals
|
||||
|
||||
|
||||
# MEASUREMENT
|
||||
def process_reading(self, reading: str):
|
||||
@staticmethod
|
||||
def process_reading(reading: str):
|
||||
"""
|
||||
process a reading. Only works with VDC and relative time stamps right now!
|
||||
'-1.19655066E+01VDC,+9627.275SECS,+64993RDNG#\n'
|
||||
'-1.19655066E+01VDC,+9627.275SECS,+64993RDNG\n'
|
||||
May have trailing commas and trailing '#' characters
|
||||
|
||||
Returns
|
||||
-------
|
||||
[timestamp, voltage]
|
||||
"""
|
||||
parts = reading.split(",")
|
||||
parts = reading.strip("#").strip(",").split(",")
|
||||
if len(parts) != 3:
|
||||
raise ValueError(f"Invalid reading: '{reading}'")
|
||||
vdc = float(parts[0][:-3])
|
||||
|
Loading…
x
Reference in New Issue
Block a user