use interfaces

This commit is contained in:
CPD 2025-01-21 18:43:07 +01:00
parent a32f9e0183
commit c0158e2478
11 changed files with 236 additions and 201 deletions

View File

@ -46,7 +46,6 @@ void loop() {
default:
blink_internal_led(5);
break;
}
}
void blink_internal_led(unsigned N) {

View File

@ -6,6 +6,7 @@ always records iv-t curves
i-data -> smua.nvbuffer1
v-data -> smua.nvbuffer2
"""
version = "0.1"
import numpy as np
import matplotlib.pyplot as plt
@ -21,7 +22,6 @@ import atexit
import argparse
if __name__ == "__main__":
import sys
if __package__ is None:
@ -34,35 +34,26 @@ if __name__ == "__main__":
prog="cpdctrl",
description="measure voltage using a Keithley SMU",
)
backend_group = parser.add_mutually_exclusive_group(required=True)
backend_group = parser.add_mutually_exclusive_group(required=False)
backend_group.add_argument("-k", "--keithley", action="store_true")
backend_group.add_argument("-t", "--testing", action='store_true')
# backend_group.add_argument("-t", "--testing", action='store_true')
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
args = vars(parser.parse_args())
i = 1
while i < len(sys.argv):
if args["keithley"]:
import cpdctrl.backends.keithley.keithley as _backend
# import cpdctrl.backends.keithley.measure as _measure
elif args["testing"]:
import cpdctrl.backends.testing.testing as _backend
import cpdctrl.backends.testing.measure as _measure
elif sys.argv[i] in ["-c", "--config"]:
if i+1 < len(sys.argv):
config_path = sys.argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
from .voltage_measurement.base import VoltageMeasurementDevice as VDev
from .voltage_measurement.impl import keithley2700 as _volt
from .led_control.impl import thorlabs_ledd1b as _led
from .utility.data import DataCollector
from cpdctrl.utility import data as _data
from cpdctrl.utility.data import load_dataframe
from cpdctrl.utility import file_io
from cpdctrl.update_funcs import _Monitor, _update_print
config_path = path.expanduser("~/.config/m-teng.json")
from .utility import data as _data
from .utility.data import load_dataframe
from .utility import file_io
from .update_funcs import _Monitor, _update_print
config_path = path.expanduser("~/.config/cpdctrl.json")
_runtime_vars = {
"last-measurement": ""
@ -78,63 +69,9 @@ settings = {
test = False
# global variable for the instrument/client returned by pyvisa/bleak
dev = None
def monitor_count(count=5000, interval=None, max_points_shown=160):
"""
Take <count> measurements in <interval> and monitor live with matplotlib.
@details:
- Resets the buffers
- Opens a matplotlib window and takes measurements depending on settings["interval"]
Uses the device internal overlappedY measurement method, which allows for greater precision
You can take the data from the buffer afterwards, using save_csv
@param count: count
@param interval: interval, defaults to settings["interval"]
@param max_points_shown: how many points should be shown at once. None means infinite
"""
if not interval: interval = settings["interval"]
plt_monitor = _Monitor(max_points_shown, use_print=True)
update_func = plt_monitor.update
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt:
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
def measure_count(count=5000, interval=None):
"""
Take <count> measurements in <interval>
@details:
- Resets the buffers
- Takes <count> measurements depending on settings["interval"]
Uses the device internal overlappedY measurement method, which allows for greater precision
You can take the data from the buffer afterwards, using save_csv
@param count: count
@param interval: interval, defaults to settings["interval"]
"""
if not interval: interval = settings["interval"]
update_func = _update_print
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt:
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
dev:VDev|None = None
data = DataCollector(settings["datadir"], settings["name"])
t0 = 0
def monitor(interval=None, max_measurements=None, max_points_shown=160):
"""
@ -154,10 +91,17 @@ def monitor(interval=None, max_measurements=None, max_points_shown=160):
if not interval: interval = settings["interval"]
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
update_func = plt_monitor.update
data.clear()
def update_func(i, t, v):
global t0
if i == 0:
t0 = t
t -= t0
data.add_data((t, v))
plt_monitor.update(i, t, v)
# update_led()
dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func)
def measure(interval=None, max_measurements=None):
"""
Measure voltages
@ -178,19 +122,6 @@ def measure(interval=None, max_measurements=None):
dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func)
def get_dataframe():
"""
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
"""
global k, settings, _runtime_vars
ibuffer = _backend.collect_buffer(dev, 1)
vbuffer = _backend.collect_buffer(dev, 2)
df = _data.buffers2dataframe(ibuffer, vbuffer)
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
return df
def save_csv():
"""
Saves the contents of nvbuffer1 as .csv
@ -281,13 +212,13 @@ Run 'help("topic")' to see more information on a topic""")
Functions:
name("<name>") - short for set("name", "<name>")
set("setting", value) - set a setting to a value
save_settings() - store the settings as "m-teng.json" in the working directory
save_settings() - store the settings as "cpdctrl.json" in the working directory
load_settings() - load settings from a file
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
The serach path is:
<working-dir>/m-teng.json
$XDG_CONFIG_HOME/m-teng.json
~/.config/m-teng.json
<working-dir>/cpdctrl.json
$XDG_CONFIG_HOME/cpdctrl.json
~/.config/cpdctrl.json
""")
elif topic == "imports":
print("""Imports:
@ -309,23 +240,23 @@ Functions:
def init():
global dev, settings, config_path
print(r""" ______________________ _______ ________
_____ \__ ___/\_ _____/ \ \ / _____/
/ \ ______| | | __)_ / | \ / \ ___
| Y Y \/_____/| | | \/ | \\ \_\ \
|__|_| / |____| /_______ /\____|__ / \______ /
\/ \/ \/ \/ 1.2
Interactive Shell for TENG measurements with Keithley 2600B
print(r""" .___ __ .__
____ ______ __| _/_____/ |________| |
_/ ___\\____ \ / __ |/ ___\ __\_ __ \ |
\ \___| |_> > /_/ \ \___| | | | \/ |__
\___ > __/\____ |\___ >__| |__| |____/
\/|__| \/ \/ """ + f"""{version}
Interactive Shell for CPD measurements with Keithley 2700B
---
Enter 'help()' for a list of commands""")
from os import environ
if path.isfile("m-teng.json"):
config_path = "m-teng.json"
if path.isfile("cpdctrl.json"):
config_path = "cpdctrl.json"
elif 'XDG_CONFIG_HOME' in environ.keys():
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/m-teng.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/m-teng.json"
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/cpdctrl.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/cpdctrl.json"
else:
config_path = path.expanduser("~/.config/m-teng.json")
config_path = path.expanduser("~/.config/cpdctrl.json")
if args["config"]:
config_path = args["config"]
@ -340,7 +271,8 @@ Enter 'help()' for a list of commands""")
makedirs(settings["datadir"])
try:
dev = _backend.init()
dev = _volt.init()
led = _led.LEDD1B()
except Exception as e:
print(e)
exit(1)

View File

@ -0,0 +1,46 @@
from abc import ABC, abstractmethod
from typing import Callable
"""
Created on Tue Jan 21 16:26:13 2025
@author: Matthias Quintern
"""
class LedControlDevice(ABC):
@abstractmethod
def on(self):
"""
Set the led brightness to 100%
Returns
-------
None.
"""
pass
@abstractmethod
def off(self):
"""
Set the led brightness to 0%
Returns
-------
None.
"""
pass
# @abstractmethod
def set_level(level:int):
"""
Set the led brightness to a certain level
Parameters
----------
level : int
Brightness level in percent.
Returns
-------
None.
"""
pass

View File

@ -1,9 +1,11 @@
import serial
class LEDD1B:
from ..base import LedControlDevice
class LEDD1B(LedControlDevice):
def __init__(self, port="COM4"):
self.arduino = serial.Serial(port=port, baudrate=9600, timeout=.1)
self._check_arduino_software()
# self._check_arduino_software()
def __del__(self):
self.arduino.close()

View File

@ -6,7 +6,7 @@ def _update_print(i, tval, vval):
class _Monitor:
"""
Monitor v and i data
Monitor v and i data in a matplotlib window
"""
def __init__(self, max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown

View File

@ -3,6 +3,26 @@ import numpy as np
from os import path
import matplotlib.pyplot as plt
class DataCollector:
def __init__(self, data_name, data_path):
self.data = []
self.name = data_name
self.path = data_path
def clear(self):
self.data = []
def add_data(self, data):
self.data.append(data)
def to_dataframe(self):
return pd.DataFrame(self.data)
def save_csv(self):
self.to_dataframe().to_csv(path.join(self.path, self.name + ".csv"), index=False, header=True)
# deprecated
# def buffer2dataframe(buffer):
# df = pd.DataFrame(buffer)

View File

@ -0,0 +1,53 @@
from abc import ABC, abstractmethod
from typing import Callable
"""
Created on Tue Jan 21 16:19:01 2025
@author: Matthias Quintern
"""
class VoltageMeasurementDevice(ABC):
# RUN COMMANDS ON THE DEVICE
@abstractmethod
def run(self, code, verbose=False):
pass
@abstractmethod
def run_script(self, script_path, verbose=False):
pass
@abstractmethod
def reset(self, verbose=False):
pass
@abstractmethod
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
pass
@abstractmethod
def measure(self, interval: int, update_func: Callable[None, [int, float, float]]|None=None, max_measurements:int|None=None):
"""
Take voltage readings after <interval> milliseconds.
Parameters
----------
interval : int
Number of milliseconds to wait between readings.
update_func : Callable[None, [int, float, float]] or None, optional
A function that is called after each reading with parameters <n_reading>, <time>, <voltage>. The default is None.
max_measurements : int or None, optional
Number of readings to perform. Set to None for infinite. The default is None.
Returns
-------
None.
"""
pass

View File

@ -1,8 +1,10 @@
import pyvisa
import numpy as np
from time import sleep
# import pkg_resources
import os
from typing import Callable
from ..base import VoltageMeasurementDevice
"""
Utility
@ -17,27 +19,36 @@ scripts = {
"instrument_reset": "~/cpd-dev/cpdctrl/cpdctrl/keithley_scripts/reset.scpi",
}
def select_visa_device(visa_backend=""):
rm = pyvisa.ResourceManager(visa_backend)
resources = rm.list_resources()
if len(resources) < 1:
raise Exception("No resources found.")
elif len(resources) == 1:
print(f"Opening the only resource found: {resources[0]}")
return rm.open_resource(resources[0])
else: # len(resources) > 1:
print("Resources:")
for i, r in enumerate(resources):
print(f"{i+1:02}: {r}")
while True:
try:
instr = int(input("Select an instrument: ")) - 1
if instr < 0 or instr >= len(resources):
raise ValueError
return rm.open_resource(resources[instr])
except ValueError:
print(f"Enter a number between 1 and {len(resources)}")
continue
class Keithley2700:
class Keithley2700(VoltageMeasurementDevice):
"""
Wrapper class for the Keithley2700 SMU controlled via pyvisa
"""
def __init__(self, visa_backend="", check_front_switch=True):
rm = pyvisa.ResourceManager(visa_backend)
resources = rm.list_resources()
if len(resources) < 1:
raise Exception("No resources found.")
elif len(resources) == 1:
print(f"Opening the only resource found: {resources[0]}")
self.instr = rm.open_resource(resources[0])
elif len(resources) > 1:
print(f"Resources:")
for i, r in enumerate(resources):
print(f"{i:02}: {r}")
instr = int(input("Select an instrument: "))
self.instr = rm.open_resource(resources[instr])
def __init__(self, instr, check_front_switch=True):
self.instr = instr
if check_front_switch:
self._check_front_input_selected()
@ -63,6 +74,7 @@ class Keithley2700:
raise Exception("The Keithley's INPUT switch must select the [F]ront inputs")
return switch
# RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False):
"""
Run SCPI code on the device by writing it.
@ -79,8 +91,7 @@ class Keithley2700:
def run_script(self, script_path, verbose=False):
"""
Run a script from the host on the instrument
@param script_path : full path to the script
Load the code from script_path and run it via self.run
Parameters
----------
@ -104,7 +115,6 @@ class Keithley2700:
pass
def reset(self, verbose=False):
"""
Reset smua and its buffers
@ -113,6 +123,8 @@ class Keithley2700:
self.run_script(scripts["instrument_reset"], verbose=verbose)
self.buffer_reset()
# INTERACT WITH DEVICE BUFFER
# might not be needed
def buffer_reset(self):
buffer_reset = """
TRACe:CLEar
@ -122,64 +134,23 @@ class Keithley2700:
self.run(buffer_reset)
def buffer_get_size(self, buffer_nr=1):
n = self.instr.query(f"TRACe:POINts?").strip("\n")
n = self.instr.query("TRACe:POINts?").strip("\n")
return int(float(n))
def buffer_set_size(s):
def buffer_set_size(self, s):
if not type(s) == int or s < 2 or s > 55000:
raise ValueError(f"Invalid buffer size: {s}. Must be int and between 2 and 55000")
self.instr.write(f"TRACe:POINts {s}")
def buffer_collect(self, verbose=False):
"""
Get the buffer as 2D - np.array
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
readings = self.instr.query("TRACe:DATA?")
tdata = []
vdata = []
for reading in readings.split("\n"):
t, v = self.process_reading(reading)
tdata.append(t)
vdata.append(v)
if verbose:
print(f"readings from buffer:\n{vdata}\ntimestamps:\n{tdata}")
buffer = np.vstack((tdata, vdata)).T
return buffer
def collect_buffer_range(self, range_=(1, -1), buffer_nr=1, verbose=False):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
buffername = get_buffer_name(buffer_nr)
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
if range_[1] == -1:
range_ = (range_[0], f"{buffername}.n")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.readings)", container=np.array)
if verbose:
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer
# MEASUREMENT
def process_reading(self, reading: str):
"""
process a reading. Only works with VDC and relative time stamps right now!
'-1.19655066E+01VDC,+9627.275SECS,+64993RDNG#\n'
Returns
-------
[timestamp, voltage]
"""
parts = reading.split(",")
if len(parts) != 3:
@ -190,22 +161,33 @@ class Keithley2700:
return timestamp, vdc
def read_value(self):
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
reading = self.instr.query("READ?")
return self.process_reading(reading)
def measure(self, interval, update_func=None, max_measurements=None):
def measure(self, interval: int, update_func: Callable[None, [int, float, float]]|None=None, max_measurements:int|None=None):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
Take voltage readings after <interval> milliseconds.
Parameters
----------
interval : int
Number of milliseconds to wait between readings.
update_func : Callable[None, [int, float, float]] or None, optional
A function that is called after each reading with parameters <n_reading>, <time>, <voltage>. The default is None.
max_measurements : int or None, optional
Number of readings to perform. Set to None for infinite. The default is None.
Returns
-------
None.
"""
self.reset(verbose=True)
try:
@ -218,8 +200,9 @@ class Keithley2700:
i += 1
except KeyboardInterrupt:
pass
# instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)
def init():
return Keithley2700()
instr = select_visa_device()
return Keithley2700(instr)