360 lines
13 KiB
Python
360 lines
13 KiB
Python
"""
|
|
run this before using this library:
|
|
ipython -i cpdctrl_interactive.py
|
|
|
|
always records iv-t curves
|
|
i-data -> smua.nvbuffer1
|
|
v-data -> smua.nvbuffer2
|
|
"""
|
|
version = "0.1"
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import pandas as pd
|
|
|
|
from datetime import datetime as dtime
|
|
from sys import exit
|
|
from time import sleep
|
|
from os import path, makedirs
|
|
import pickle as pkl
|
|
import json
|
|
import atexit
|
|
import threading as mt
|
|
import multiprocessing as mp
|
|
# from multiprocessing.managers import BaseManager
|
|
|
|
import argparse
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
if __package__ is None:
|
|
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
|
__package__ = "cpdctrl"
|
|
from os import path
|
|
filepath = path.realpath(path.abspath(__file__))
|
|
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
|
|
|
|
|
from . import led_control_device
|
|
from . import voltage_measurement_device
|
|
from .voltage_measurement_device.base import VoltageMeasurementDevice
|
|
from .voltage_measurement_device.impl import keithley2700 as _volt
|
|
from .led_control_device.base import LedControlDevice
|
|
from .led_control_device.impl import thorlabs_ledd1b as _led
|
|
from .led_script import LedScript
|
|
|
|
from .measurement import measure as _measure
|
|
from .utility.data import DataCollector
|
|
from .utility.data import plot_cpd_data as data_plot
|
|
from .utility.config_file import ConfigFile
|
|
from .utility import file_io
|
|
from .utility.device_select import select_device_interactive
|
|
from .update_funcs import _Monitor, _update_print
|
|
|
|
import logging
|
|
log = logging.getLogger(__name__)
|
|
|
|
# CONFIGURATION
|
|
_runtime_vars = {
|
|
"last-measurement": ""
|
|
}
|
|
# defaults, these may be overridden by a config file
|
|
settings = {
|
|
"datadir": path.expanduser("~/data"),
|
|
"name": "interactive-test",
|
|
"led": "unknown",
|
|
"interval": 0.5,
|
|
"flush_after": 3000,
|
|
"use_buffer": False,
|
|
}
|
|
|
|
cfilename: str = "cpdctrl.yaml"
|
|
config_path: str = ""
|
|
config_file: ConfigFile = ConfigFile("")
|
|
|
|
test = False
|
|
|
|
# DEVICES
|
|
# global variable for the instrument/client returned by pyvisa/bleak
|
|
dev: VoltageMeasurementDevice|None = None
|
|
led: LedControlDevice|None = None
|
|
data_collector = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", add_number_if_dir_exists=True)
|
|
t0 = 0
|
|
data = None
|
|
md = None
|
|
|
|
def monitor(script: str|int=0, interval: float|None=None, metadata:dict={}, flush_after: int|None=None, use_buffer: bool|None=None, max_measurements=None, stop_on_script_end: bool=False, max_points_shown=None):
|
|
"""
|
|
Monitor the voltage with matplotlib.
|
|
- Opens a matplotlib window and takes measurements depending on settings["interval"]
|
|
- Waits for the user to press a key
|
|
|
|
If use_buffer=False, uses python's time.sleep() for waiting the interval, which is not very precise.
|
|
With use_buffer=True, the timing of the voltage data readings will be very precise, however,
|
|
the led updates may deviate up to <interval>.
|
|
|
|
The data is automatically saved to "<date>_<time>_<name>" in the data directory.
|
|
|
|
Parameters
|
|
----------
|
|
script : str|int
|
|
Path to a led script file, or a constant value between 0 and 100 for the LED.
|
|
interval : float|None
|
|
Time between measurements.
|
|
If None, the value is taken from the settings.
|
|
metadata : dict
|
|
Metadata to append to the data header.
|
|
The set interval as well as the setting for 'name' and 'led' are automatically added.
|
|
flush_after : int|None
|
|
Flush the data to disk after <flush_after> readings
|
|
If None, the value is taken from the settings.
|
|
use_buffer : bool
|
|
If True, use the voltage measurement device's internal buffer for readings, which leads to more accurate timings.
|
|
If None, the value is taken from the settings.
|
|
max_points_shown : int|None
|
|
how many points should be shown at once. None means infinite
|
|
max_measurements : int|None
|
|
maximum number of measurements. None means infinite
|
|
stop_on_script_end : bool, optional
|
|
Stop measurement when the script end is reached
|
|
"""
|
|
global _runtime_vars, data_collector, dev, led
|
|
global data, md
|
|
_runtime_vars["last_measurement"] = dtime.now().isoformat()
|
|
if interval is None: interval = settings["interval"]
|
|
if flush_after is None: flush_after = settings["flush_after"]
|
|
if use_buffer is None: use_buffer = settings["use_buffer"]
|
|
# set metadata
|
|
metadata["interval"] = str(interval)
|
|
metadata["name"] = settings["name"]
|
|
metadata["led"] = settings["led"]
|
|
metadata["led_script"] = str(script)
|
|
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.")
|
|
plt.ion()
|
|
plt_monitor = _Monitor(use_print=False, max_points_shown=max_points_shown)
|
|
led_script = LedScript(script=script, auto_update=True, verbose=True)
|
|
data_collector = DataCollector(metadata=metadata, data_path=settings["datadir"], data_name=settings["name"])
|
|
# data_collector.clear()
|
|
data_queue = mp.Queue()
|
|
command_queue = mp.Queue()
|
|
# Argument order must match the definition
|
|
proc_measure = mt.Thread(target=_measure, args=(dev,
|
|
led,
|
|
led_script,
|
|
data_collector,
|
|
interval,
|
|
flush_after,
|
|
use_buffer,
|
|
max_measurements,
|
|
stop_on_script_end,
|
|
False, # verbose
|
|
command_queue,
|
|
data_queue
|
|
))
|
|
proc_measure.start()
|
|
try:
|
|
while proc_measure.is_alive():
|
|
while not data_queue.empty():
|
|
# print(data_queue.qsize(), "\n\n")
|
|
current_data = data_queue.get(block=False)
|
|
i, tval, vval, led_val = current_data
|
|
plt_monitor.update(i, tval, vval, led_val)
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
command_queue.put("stop")
|
|
proc_measure.join()
|
|
print("Measurement stopped" + " "*50)
|
|
led_script.stop_updating() # stop watching for file updates (if enabled)
|
|
data_collector.save_csv(verbose=True)
|
|
data, metadata = data_collector.get_data()
|
|
fig = data_plot(data, CPD=True, LED=True)
|
|
plt.ioff()
|
|
fig_path = path.join(data_collector.path, data_collector.dirname + ".pdf")
|
|
fig.savefig(fig_path)
|
|
|
|
|
|
# DATA
|
|
def data_load(dirname:str) -> tuple[np.ndarray, dict]:
|
|
"""
|
|
Load data in directory <dirname> in the data directory as numpy array.
|
|
Sets the `data` and `md` variables
|
|
|
|
Parameters
|
|
----------
|
|
dirname : str
|
|
Absolute path to the directory containing the measurement or directory name in the data directory.
|
|
"""
|
|
global data, md
|
|
if path.isabs(dirname):
|
|
dirpath = dirname
|
|
else:
|
|
dirpath = path.join(settings["datadir"], dirname)
|
|
data, md = DataCollector.load_data_from_dir(dirpath, verbose=True)
|
|
|
|
# SETTINGS
|
|
def set(setting, value):
|
|
global settings, config_path
|
|
if setting in settings:
|
|
if type(value) != type(settings[setting]):
|
|
print(f"set: setting '{setting}' currently holds a value of type '{type(settings[setting])}'")
|
|
return
|
|
settings[setting] = value
|
|
config_file.set(setting, value)
|
|
|
|
def name(s:str):
|
|
global settings
|
|
settings["name"] = s
|
|
|
|
def save_settings():
|
|
global settings
|
|
config_file.set_values(settings)
|
|
config_file.save()
|
|
|
|
def load_settings():
|
|
global settings, config_path
|
|
settings = config_file.get_values()
|
|
settings["datadir"] = path.expanduser(settings["datadir"]) # replace ~
|
|
|
|
def help(topic=None):
|
|
if topic == None:
|
|
print("""
|
|
Functions:
|
|
monitor - take measurements with live monitoring in a matplotlib window
|
|
data_load - load data from a directory
|
|
data_plot - plot a data array
|
|
Run 'help(function)' to see more information on a function
|
|
|
|
Available topics:
|
|
imports
|
|
settings
|
|
Run 'help("topic")' to see more information on a topic""")
|
|
|
|
elif topic in [settings, "settings"]:
|
|
print(f"""Settings:
|
|
name: str - name of the measurement, determines filename
|
|
led: str - name/model of the LED that is being used
|
|
datadir: str - output directory for the csv files
|
|
interval: int - interval (inverse frequency) of the measurements, in seconds
|
|
beep: bool - whether the device should beep or not
|
|
|
|
|
|
Functions:
|
|
name("<name>") - short for set("name", "<name>")
|
|
set("setting", value) - set a setting to a value
|
|
save_settings() - store the settings as "cpdctrl.json" in the working directory
|
|
load_settings() - load settings from a file
|
|
|
|
Upon startup, settings are loaded from the config file.
|
|
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
|
|
The search path is:
|
|
<working-dir>/{cfilename}
|
|
$XDG_CONFIG_HOME/{cfilename}
|
|
~/.config/cpdctrl/{cfilename}
|
|
|
|
The current file path is:
|
|
{config_path}
|
|
""")
|
|
elif topic == "imports":
|
|
print("""Imports:
|
|
numpy as np
|
|
pandas as pd
|
|
matplotlib.pyplot as plt
|
|
os.path """)
|
|
else:
|
|
print(topic.__doc__.strip(" ").strip("\n"))
|
|
|
|
|
|
def init():
|
|
global dev, led, settings, config_path, config_file
|
|
print(r""" .___ __ .__
|
|
____ ______ __| _/_____/ |________| |
|
|
_/ ___\\____ \ / __ |/ ___\ __\_ __ \ |
|
|
\ \___| |_> > /_/ \ \___| | | | \/ |__
|
|
\___ > __/\____ |\___ >__| |__| |____/
|
|
\/|__| \/ \/ """ + f"""{version}
|
|
Interactive Shell for CPD measurements with Keithley 2700B
|
|
---
|
|
Enter 'help()' for a list of commands""")
|
|
parser = argparse.ArgumentParser(
|
|
prog="cpdctrl",
|
|
description="measure voltage using a Keithley SMU",
|
|
)
|
|
backend_group = parser.add_mutually_exclusive_group(required=False)
|
|
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
|
|
args = vars(parser.parse_args())
|
|
from os import environ
|
|
|
|
# Load config file
|
|
if path.isfile(cfilename):
|
|
config_path = cfilename
|
|
elif 'XDG_CONFIG_HOME' in environ.keys():
|
|
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/cpdctrl.json"):
|
|
config_path = path.join(environ["XDG_CONFIG_HOME"], "cpdctrl", cfilename)
|
|
else:
|
|
config_path = path.join(path.expanduser("~/.config/cpdctrl"), cfilename)
|
|
if args["config"]:
|
|
config_path = args["config"]
|
|
|
|
config_file = ConfigFile(config_path, init_values=settings)
|
|
load_settings()
|
|
|
|
# setup logging
|
|
log_path = path.expanduser(config_file.get_or("path_log", "~/.cache/cpdctrl-interactive.log"))
|
|
makedirs(path.dirname(log_path), exist_ok=True)
|
|
logging.basicConfig(
|
|
level=logging.WARN,
|
|
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
|
|
handlers=[
|
|
logging.FileHandler(log_path),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
|
|
if not path.isdir(settings["datadir"]):
|
|
makedirs(settings["datadir"])
|
|
|
|
# init the devices
|
|
last_vm_type = config_file.get_or("last_dev_vm_type", None)
|
|
last_vm_name = config_file.get_or("last_dev_vm_name", None)
|
|
if last_vm_name and last_vm_type:
|
|
try:
|
|
dev = voltage_measurement_device.connect_device(last_vm_type, last_vm_name)
|
|
except:
|
|
log.error(f"Failed to connect to last used device {last_vm_type}::{last_vm_name}")
|
|
while dev is None:
|
|
devs = voltage_measurement_device.list_devices()
|
|
print("-" * 50)
|
|
vm_dev_type, vm_dev_name = select_device_interactive(devs, "Select voltage measurement device: ")
|
|
try:
|
|
dev = voltage_measurement_device.connect_device(vm_dev_type, vm_dev_name)
|
|
except:
|
|
log.error(f"Failed to connect to device {vm_dev_type}::{vm_dev_name}")
|
|
config_file.set("last_dev_vm_type", vm_dev_type)
|
|
config_file.set("last_dev_vm_name", vm_dev_name)
|
|
|
|
# init the devices
|
|
last_led_type = config_file.get_or("last_dev_led_type", None)
|
|
last_led_name = config_file.get_or("last_dev_led_name", None)
|
|
if last_led_name and last_led_type:
|
|
try:
|
|
led = led_control_device.connect_device(last_led_type, last_led_name)
|
|
except:
|
|
log.error(f"Failed to connect to last used device {last_led_type}::{last_led_name}")
|
|
while led is None:
|
|
devs = led_control_device.list_devices()
|
|
print("-" * 50)
|
|
led_dev_type, led_dev_name = select_device_interactive(devs, "Select LED control device: ")
|
|
try:
|
|
led = led_control_device.connect_device(led_dev_type, led_dev_name)
|
|
except:
|
|
log.error(f"Failed to connect to device {led_dev_type}::{led_dev_name}")
|
|
config_file.set("last_dev_led_type", led_dev_type)
|
|
config_file.set("last_dev_led_name", led_dev_name)
|
|
|
|
# atexit.register(_backend.exit, dev)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
init()
|