use threads

This commit is contained in:
CPD 2025-02-04 16:56:46 +01:00
parent c0bc217d21
commit 7544d5ca08

View File

@ -19,8 +19,9 @@ from os import path, makedirs
import pickle as pkl import pickle as pkl
import json import json
import atexit import atexit
import threading as mt
import multiprocessing as mp import multiprocessing as mp
from multiprocessing.managers import BaseManager # from multiprocessing.managers import BaseManager
import argparse import argparse
@ -52,8 +53,6 @@ from .led_script import LedScript
from .measurement import measure as _measure from .measurement import measure as _measure
from .utility.data import DataCollector from .utility.data import DataCollector
from .utility import data as _data from .utility import data as _data
from .utility.data import load_dataframe, plot_cpd_data from .utility.data import load_dataframe, plot_cpd_data
from .utility import file_io from .utility import file_io
@ -74,14 +73,13 @@ settings = {
test = False test = False
BaseManager.register('LedControlDevice', LedControlDevice)
# global variable for the instrument/client returned by pyvisa/bleak # global variable for the instrument/client returned by pyvisa/bleak
dev: VoltageMeasurementDevice|None = None dev: VoltageMeasurementDevice|None = None
led: LedControlDevice|None = None led: LedControlDevice|None = None
data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True) data = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", dir_exists_is_ok=True)
t0 = 0 t0 = 0
def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements=None, max_points_shown=400): def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements=None, max_points_shown=None):
""" """
Monitor the voltage with matplotlib. Monitor the voltage with matplotlib.
@ -99,77 +97,24 @@ def monitor(script: str|int=0, interval=None, flush_after=None, max_measurements
if not interval: interval = settings["interval"] if not interval: interval = settings["interval"]
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown) plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
led_script = LedScript(script=script) led_script = LedScript(script=script, auto_update=True, verbose=True)
data.clear() data.clear()
queue = mp.Queue() queue = mp.Queue()
pipe_send, pipe_recv = mp.Pipe() pipe_send, pipe_recv = mp.Pipe()
# TODO: pass instruments # TODO: pass instruments
proc_measure = mp.Process(target=_measure, args=(None, None, led_script, data, interval, flush_after, max_measurements, False, pipe_recv, queue)) proc_measure = mt.Thread(target=_measure, args=(dev, led, led_script, data, interval, flush_after, max_measurements, False, pipe_recv, queue))
proc_measure.start() proc_measure.start()
try: try:
while True: while True:
data = queue.get(block=True, timeout=30) current_data = queue.get(block=True, timeout=30)
i, tval, vval, led_val = data i, tval, vval, led_val = current_data
plt_monitor.update(i, tval, vval, led_val) plt_monitor.update(i, tval, vval, led_val)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
pipe_send.send("stop") pipe_send.send("stop")
proc_measure.join() proc_measure.join()
print(data.metadata)
def measure(interval=None, max_measurements=None): data.save_csv(verbose=True)
"""
Measure voltages
@details:
- Resets the buffers
- Measure voltages
- Waits for the user to press a key
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision.
You can take the data from the buffer afterwards, using save_csv.
@param max_measurements : maximum number of measurements. None means infinite
"""
global _runtime_vars
if not interval: interval = settings["interval"]
_runtime_vars["last_measurement"] = dtime.now().isoformat()
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
update_func = _update_print
dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func)
def save_csv():
"""
Saves the contents of nvbuffer1 as .csv
The settings 'datadir' and 'name' are used for determining the filepath:
'datadir/nameXXX.csv', where XXX is the number of files that exist in datadir with the same name.
"""
df = get_dataframe()
filename = settings["datadir"] + "/" + df.basename + ".csv"
df.to_csv(filename, index=False, header=True)
print(f"Saved as '{filename}'")
def save_pickle():
"""
Saves the contents of nvbuffer1 as .pkl
The settings 'datadir' and 'name' are used for determining the filepath:
'datadir/nameXXX.pkl', where XXX is the number of files that exist in datadir with the same name.
"""
df = get_dataframe()
filename = settings["datadir"] + "/" + df.basename + ".pkl"
df.to_pickle(filename)
print(f"Saved as '{filename}'")
def run_script(script_path):
"""
Run a lua script on the Keithley device
@param script_path : relative or absolute path to the .lua script
"""
global k, settings
if test:
print("run_script: Test mode enabled, ignoring call to run_script")
else:
_backend.run_lua(dev, script_path=script_path)
def set(setting, value): def set(setting, value):