Compare commits
No commits in common. "b526bfb9135b3295e266737b22c430c677977de1" and "3e79e2ea4fbb9d79f09505f04cec053a4420dc07" have entirely different histories.
b526bfb913
...
3e79e2ea4f
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,3 +1,2 @@
|
||||
__pycache__
|
||||
.ipynb_checkpoints
|
||||
testing
|
||||
|
@ -1,2 +0,0 @@
|
||||
include regina/package-data/*
|
||||
include regina/sql/*.sql
|
@ -1 +0,0 @@
|
||||
bspc rule -a matplotlib desktop=^1 state=pseudo_tiled focus=off
|
@ -1,6 +1,6 @@
|
||||
"""
|
||||
run this before using this library:
|
||||
ipython -i m_teng_interactive.py
|
||||
ipython -i k_teng_interactive.py
|
||||
|
||||
always records iv-t curves
|
||||
i-data -> smua.nvbuffer1
|
||||
@ -17,56 +17,22 @@ from time import sleep
|
||||
from os import path, makedirs
|
||||
import pickle as pkl
|
||||
import json
|
||||
import atexit
|
||||
|
||||
import argparse
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
if __package__ is None:
|
||||
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
||||
__package__ = "m_teng"
|
||||
__package__ = "k-teng"
|
||||
import sys
|
||||
from os import path
|
||||
filepath = path.realpath(path.abspath(__file__))
|
||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="m-teng",
|
||||
description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino",
|
||||
)
|
||||
backend_group = parser.add_mutually_exclusive_group(required=True)
|
||||
backend_group.add_argument("-k", "--keithley", action="store_true")
|
||||
backend_group.add_argument("-a", "--arduino", action="store_true")
|
||||
backend_group.add_argument("-t", "--testing", action='store_true')
|
||||
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
|
||||
args = vars(parser.parse_args())
|
||||
|
||||
i = 1
|
||||
while i < len(sys.argv):
|
||||
if args["keithley"]:
|
||||
import m_teng.backends.keithley.keithley as _backend
|
||||
import m_teng.backends.keithley.measure as _measure
|
||||
elif args["arduino"]:
|
||||
import m_teng.backends.arduino.arduino as _backend
|
||||
import m_teng.backends.arduino.measure as _measure
|
||||
elif args["testing"]:
|
||||
import m_teng.backends.testing.testing as _backend
|
||||
import m_teng.backends.testing.measure as _measure
|
||||
elif sys.argv[i] in ["-c", "--config"]:
|
||||
if i+1 < len(sys.argv):
|
||||
config_path = sys.argv[i+1]
|
||||
else:
|
||||
print("-c requires an extra argument: path of config file")
|
||||
i += 1
|
||||
i += 1
|
||||
|
||||
|
||||
from m_teng.utility import data as _data
|
||||
from m_teng.utility.data import load_dataframe
|
||||
from m_teng.utility import file_io
|
||||
from m_teng.update_funcs import _Monitor, _ModelPredict, _update_print
|
||||
|
||||
config_path = path.expanduser("~/.config/k-teng.json")
|
||||
from .keithley import keithley as _keithley
|
||||
from .keithley.measure import measure_count as _measure_count, measure as _measure
|
||||
from .utility import data as _data
|
||||
from .utility.data import load_dataframe
|
||||
from .utility import file_io
|
||||
|
||||
_runtime_vars = {
|
||||
"last-measurement": ""
|
||||
@ -78,35 +44,66 @@ settings = {
|
||||
"interval": 0.02,
|
||||
"beep": True,
|
||||
}
|
||||
config_path = path.expanduser("~/.config/k-teng.json")
|
||||
|
||||
test = False
|
||||
|
||||
# global variable for the instrument/client returned by pyvisa/bleak
|
||||
dev = None
|
||||
# global variable for the instrument returned by pyvisa
|
||||
k = None
|
||||
|
||||
|
||||
def monitor_predict(model_dir: str, count=5000, interval=settings["interval"], max_points_shown=160):
|
||||
|
||||
def _update_print(i, ival, vval):
|
||||
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
|
||||
|
||||
class _Monitor:
|
||||
"""
|
||||
Take <count> measurements in <interval> and predict with a machine learning model
|
||||
Monitor v and i data
|
||||
"""
|
||||
model_predict = _ModelPredict(dev, model_dir)
|
||||
plt_monitor = _Monitor(max_points_shown, use_print=False)
|
||||
skip_n = 0
|
||||
def update(i, ival, vval):
|
||||
plt_monitor.update(i, ival, vval)
|
||||
if skip_n % 10 == 0:
|
||||
model_predict.update(i, ival, vval)
|
||||
skip_n += 1
|
||||
def __init__(self, max_points_shown=None, use_print=False):
|
||||
self.max_points_shown = max_points_shown
|
||||
self.use_print = use_print
|
||||
self.index = []
|
||||
self.vdata = []
|
||||
self.idata = []
|
||||
|
||||
plt.ion()
|
||||
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
|
||||
|
||||
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
|
||||
self.vax.set_ylabel("Voltage [V]")
|
||||
self.vax.grid(True)
|
||||
|
||||
self.iline, = self.iax.plot(self.index, self.idata, color="m")
|
||||
self.iax.set_ylabel("Current [A]")
|
||||
self.iax.grid(True)
|
||||
|
||||
def update(self, i, ival, vval):
|
||||
if self.use_print:
|
||||
_update_print(i, ival, vval)
|
||||
self.index.append(i)
|
||||
self.idata.append(ival)
|
||||
self.vdata.append(vval)
|
||||
# update data
|
||||
self.iline.set_xdata(self.index)
|
||||
self.iline.set_ydata(self.idata)
|
||||
self.vline.set_xdata(self.index)
|
||||
self.vline.set_ydata(self.vdata)
|
||||
# recalculate limits and set them for the view
|
||||
self.iax.relim()
|
||||
self.vax.relim()
|
||||
if self.max_points_shown and i > self.max_points_shown:
|
||||
self.iax.set_xlim(i - self.max_points_shown, i)
|
||||
self.vax.set_xlim(i - self.max_points_shown, i)
|
||||
self.iax.autoscale_view()
|
||||
self.vax.autoscale_view()
|
||||
# update plot
|
||||
self.fig1.canvas.draw()
|
||||
self.fig1.canvas.flush_events()
|
||||
|
||||
def __del__(self):
|
||||
plt.close(self.fig1)
|
||||
|
||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
||||
try:
|
||||
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update, update_interval=0.1)
|
||||
except KeyboardInterrupt:
|
||||
if args["keithley"]:
|
||||
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
||||
else:
|
||||
print("Measurement finished" + " "*50)
|
||||
|
||||
def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160):
|
||||
"""
|
||||
@ -126,10 +123,10 @@ def monitor_count(count=5000, interval=settings["interval"], max_points_shown=16
|
||||
|
||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
||||
try:
|
||||
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
|
||||
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
|
||||
except KeyboardInterrupt:
|
||||
if args["keithley"]:
|
||||
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
if not test:
|
||||
k.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
||||
else:
|
||||
print("Measurement finished" + " "*50)
|
||||
@ -150,10 +147,10 @@ def measure_count(count=5000, interval=settings["interval"]):
|
||||
|
||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
||||
try:
|
||||
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
|
||||
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
|
||||
except KeyboardInterrupt:
|
||||
if args["keithley"]:
|
||||
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
if not test:
|
||||
k.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
||||
else:
|
||||
print("Measurement finished" + " "*50)
|
||||
@ -179,7 +176,7 @@ def monitor(interval=settings["interval"], max_measurements=None, max_points_sho
|
||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
|
||||
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
|
||||
update_func = plt_monitor.update
|
||||
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
|
||||
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
|
||||
|
||||
|
||||
def measure(interval=settings["interval"], max_measurements=None):
|
||||
@ -198,7 +195,7 @@ def measure(interval=settings["interval"], max_measurements=None):
|
||||
_runtime_vars["last_measurement"] = dtime.now().isoformat()
|
||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
|
||||
update_func = _update_print
|
||||
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
|
||||
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
|
||||
|
||||
|
||||
def repeat(measure_func: callable, count: int, repeat_delay=0):
|
||||
@ -224,7 +221,7 @@ def repeat(measure_func: callable, count: int, repeat_delay=0):
|
||||
sleep(repeat_delay)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
if settings["beep"]: _backend.beep()
|
||||
if settings["beep"]: k.write("beeper.beep(0.3, 1000)")
|
||||
|
||||
|
||||
def get_dataframe():
|
||||
@ -232,14 +229,21 @@ def get_dataframe():
|
||||
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
|
||||
"""
|
||||
global k, settings, _runtime_vars
|
||||
ibuffer = _backend.collect_buffer(dev, 1)
|
||||
vbuffer = _backend.collect_buffer(dev, 2)
|
||||
if test:
|
||||
timestamps = np.arange(0, 50, 0.01)
|
||||
ydata = np.array([testing.testcurve(t, amplitude=15, peak_width=2) for t in timestamps])
|
||||
ibuffer = np.vstack((timestamps, ydata)).T
|
||||
|
||||
ydata = np.array([-testing.testcurve(t, amplitude=5e-8, peak_width=1) for t in timestamps])
|
||||
vbuffer = np.vstack((timestamps, ydata)).T
|
||||
else:
|
||||
ibuffer = _keithley.collect_buffer(k, 1)
|
||||
vbuffer = _keithley.collect_buffer(k, 2)
|
||||
df = _data.buffers2dataframe(ibuffer, vbuffer)
|
||||
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
|
||||
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
|
||||
return df
|
||||
|
||||
|
||||
def save_csv():
|
||||
"""
|
||||
Saves the contents of nvbuffer1 as .csv
|
||||
@ -263,7 +267,6 @@ def save_pickle():
|
||||
df.to_pickle(filename)
|
||||
print(f"Saved as '{filename}'")
|
||||
|
||||
|
||||
def run_script(script_path):
|
||||
"""
|
||||
Run a lua script on the Keithley device
|
||||
@ -273,7 +276,7 @@ def run_script(script_path):
|
||||
if test:
|
||||
print("run_script: Test mode enabled, ignoring call to run_script")
|
||||
else:
|
||||
_keithley.run_lua(dev, script_path=script_path)
|
||||
_keithley.run_lua(k, script_path=script_path)
|
||||
|
||||
|
||||
def set(setting, value):
|
||||
@ -302,16 +305,16 @@ def help(topic=None):
|
||||
if topic == None:
|
||||
print("""
|
||||
Functions:
|
||||
measure [kat] - take measurements
|
||||
monitor [kat] - take measurements with live monitoring in a matplotlib window
|
||||
measure_count [kat] - take a fixed number of measurements
|
||||
monitor_count [kat] - take a fixed number of measurements with live monitoring in a matplotlib window
|
||||
repeat [kat] - measure and save to csv multiple times
|
||||
get_dataframe [kat] - return device internal buffer as pandas dataframe
|
||||
save_csv [kat] - save the last measurement as csv file
|
||||
save_pickle [kat] - save the last measurement as pickled pandas dataframe
|
||||
load_dataframe [kat] - load a pandas dataframe from csv or pickle
|
||||
run_script [k ] - run a lua script on the Keithely device
|
||||
measure - take measurements
|
||||
monitor - take measurements with live monitoring in a matplotlib window
|
||||
measure_count - take a fixed number of measurements
|
||||
monitor_count - take a fixed number of measurements with live monitoring in a matplotlib window
|
||||
repeat - measure and save to csv multiple times
|
||||
get_dataframe - return smua.nvbuffer 1 and 2 as pandas dataframe
|
||||
save_csv - save the last measurement as csv file
|
||||
save_pickle - save the last measurement as pickled pandas dataframe
|
||||
load_dataframe - load a pandas dataframe from csv or pickle
|
||||
run_script - run a lua script on the Keithely device
|
||||
Run 'help(function)' to see more information on a function
|
||||
|
||||
Available topics:
|
||||
@ -330,13 +333,13 @@ Run 'help("topic")' to see more information on a topic""")
|
||||
Functions:
|
||||
name("<name>") - short for set("name", "<name>")
|
||||
set("setting", value) - set a setting to a value
|
||||
save_settings() - store the settings as "m-teng.json" in the working directory
|
||||
save_settings() - store the settings as "k-teng.conf" in the working directory
|
||||
load_settings() - load settings from a file
|
||||
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
|
||||
The serach path is:
|
||||
<working-dir>/m-teng.json
|
||||
$XDG_CONFIG_HOME/m-teng.json
|
||||
~/.config/m-teng.json
|
||||
<working-dir>/k-teng.json
|
||||
$XDG_CONFIG_HOME/k-teng.json
|
||||
~/.config/k-teng.json
|
||||
""")
|
||||
elif topic == "imports":
|
||||
print("""Imports:
|
||||
@ -346,37 +349,45 @@ Functions:
|
||||
os.path """)
|
||||
elif topic == "device":
|
||||
print("""Device:
|
||||
keithley backend:
|
||||
The opened pyvisa resource (deveithley device) is the global variable 'dev'.
|
||||
You can interact using pyvisa functions, such as
|
||||
k.write("command"), k.query("command") etc. to interact with the device.
|
||||
arduino backend:
|
||||
The Arduino will be avaiable as BleakClient using the global variable 'dev'. """)
|
||||
The opened pyvisa resource (Keithley device) is the global variable 'k'.
|
||||
You can interact using pyvisa functions, such as
|
||||
k.write("command"), k.query("command") etc. to interact with the device.""")
|
||||
else:
|
||||
print(topic.__doc__)
|
||||
|
||||
|
||||
def init():
|
||||
global dev, settings, config_path
|
||||
print(r""" ______________________ _______ ________
|
||||
_____ \__ ___/\_ _____/ \ \ / _____/
|
||||
/ \ ______| | | __)_ / | \ / \ ___
|
||||
| Y Y \/_____/| | | \/ | \\ \_\ \
|
||||
|__|_| / |____| /_______ /\____|__ / \______ /
|
||||
\/ \/ \/ \/ 1.2
|
||||
global k, settings, test, config_path
|
||||
print(r""" ____ __. ______________________ _______ ________
|
||||
| |/ _| \__ ___/\_ _____/ \ \ / _____/
|
||||
| < ______ | | | __)_ / | \ / \ ___
|
||||
| | \ /_____/ | | | \/ | \\ \_\ \
|
||||
|____|__ \ |____| /_______ /\____|__ / \______ /
|
||||
\/ \/ \/ \/ 1.1
|
||||
Interactive Shell for TENG measurements with Keithley 2600B
|
||||
---
|
||||
Enter 'help()' for a list of commands""")
|
||||
from os import environ
|
||||
if path.isfile("m-teng.json"):
|
||||
config_path = "m-teng.json"
|
||||
if path.isfile("k-teng.json"):
|
||||
config_path = "k-teng.json"
|
||||
elif 'XDG_CONFIG_HOME' in environ.keys():
|
||||
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/m-teng.json"):
|
||||
config_path = environ["XDG_CONFIG_HOME"] + "/m-teng.json"
|
||||
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/k-teng.json"):
|
||||
config_path = environ["XDG_CONFIG_HOME"] + "/k-teng.json"
|
||||
else:
|
||||
config_path = path.expanduser("~/.config/m-teng.json")
|
||||
if args["config"]:
|
||||
config_path = args["config"]
|
||||
config_path = path.expanduser("~/.config/k-teng.json")
|
||||
|
||||
from sys import argv
|
||||
i = 1
|
||||
while i < len(argv):
|
||||
if argv[i] in ["-t", "--test"]:
|
||||
test = True
|
||||
elif argv[i] in ["-c", "--config"]:
|
||||
if i+1 < len(argv):
|
||||
config_path = argv[i+1]
|
||||
else:
|
||||
print("-c requires an extra argument: path of config file")
|
||||
i += 1
|
||||
i += 1
|
||||
|
||||
|
||||
if not path.isdir(path.dirname(config_path)):
|
||||
@ -388,12 +399,15 @@ Enter 'help()' for a list of commands""")
|
||||
if not path.isdir(settings["datadir"]):
|
||||
makedirs(settings["datadir"])
|
||||
|
||||
try:
|
||||
dev = _backend.init(beep_success=settings["beep"])
|
||||
except Exception as e:
|
||||
print(e)
|
||||
exit(1)
|
||||
atexit.register(_backend.exit, dev)
|
||||
if not test:
|
||||
from .keithley.keithley import init_keithley
|
||||
try:
|
||||
k = init_keithley(beep_success=settings["beep"])
|
||||
except Exception as e:
|
||||
print(e)
|
||||
exit()
|
||||
else:
|
||||
print("Running in test mode, device will not be connected.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
@ -1,19 +1,20 @@
|
||||
import pyvisa
|
||||
import numpy as np
|
||||
import pkg_resources
|
||||
|
||||
|
||||
"""
|
||||
Utility
|
||||
"""
|
||||
|
||||
script_dir = "../scripts/"
|
||||
scripts = {
|
||||
"buffer_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/buffer_reset.lua"),
|
||||
"smua_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/smua_reset.lua"),
|
||||
"buffer_reset": "buffer_reset.lua",
|
||||
"smua_reset": "smua_reset.lua",
|
||||
}
|
||||
for key,val in scripts.items():
|
||||
scripts[key] = script_dir + scripts[key]
|
||||
|
||||
|
||||
def init(beep_success=True):
|
||||
def init_keithley(beep_success=True):
|
||||
rm = pyvisa.ResourceManager('@py')
|
||||
resources = rm.list_resources()
|
||||
if len(resources) < 1:
|
||||
@ -31,11 +32,6 @@ def init(beep_success=True):
|
||||
return keithley
|
||||
|
||||
|
||||
def exit(instr):
|
||||
instr.close()
|
||||
|
||||
|
||||
|
||||
def run_lua(instr, script_path, verbose=False):
|
||||
"""
|
||||
Run a lua script from the host on the instrument
|
||||
@ -56,15 +52,6 @@ def reset(instr, verbose=False):
|
||||
run_lua(instr, scripts["smua_reset"], verbose=verbose)
|
||||
run_lua(instr, scripts["buffer_reset"], verbose=verbose)
|
||||
|
||||
def get_buffer_name(buffer_nr: int):
|
||||
if buffer_nr == 2: return "smua.nvbuffer2"
|
||||
elif buffer_nr == 1: return "smua.nvbuffer1"
|
||||
raise ValueError(f"Invalid buffer_nr: {buffer_nr}")
|
||||
|
||||
def get_buffer_size(instr, buffer_nr=1):
|
||||
n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n")
|
||||
return int(float(n))
|
||||
|
||||
|
||||
def collect_buffer(instr, buffer_nr=1, verbose=False):
|
||||
"""
|
||||
@ -76,7 +63,8 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
|
||||
0: timestamps
|
||||
1: readings
|
||||
"""
|
||||
buffername = get_buffer_name(buffer_nr)
|
||||
if buffer_nr == 2: buffername = "smua.nvbuffer2"
|
||||
else: buffername = "smua.nvbuffer1"
|
||||
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
||||
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
||||
@ -87,30 +75,3 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
|
||||
buffer = np.vstack((timestamps, readings)).T
|
||||
return buffer
|
||||
|
||||
|
||||
|
||||
def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False):
|
||||
"""
|
||||
Get the buffer as 2D - np.array
|
||||
@param instr : pyvisa instrument
|
||||
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
|
||||
@returns 2D numpy array:
|
||||
i - ith reading:
|
||||
0: timestamps
|
||||
1: readings
|
||||
"""
|
||||
buffername = get_buffer_name(buffer_nr)
|
||||
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
||||
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
||||
if range_[1] == -1:
|
||||
range_ = (range_[0], f"{buffername}.n")
|
||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
||||
timestamps = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.timestamps)", container=np.array)
|
||||
readings = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.readings)", container=np.array)
|
||||
if verbose:
|
||||
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}")
|
||||
buffer = np.vstack((timestamps, readings)).T
|
||||
return buffer
|
||||
|
||||
|
||||
|
107
k-teng/keithley/measure.py
Normal file
107
k-teng/keithley/measure.py
Normal file
@ -0,0 +1,107 @@
|
||||
from time import sleep
|
||||
import numpy as np
|
||||
from matplotlib import pyplot as plt
|
||||
import pyvisa
|
||||
|
||||
from .keithley import reset
|
||||
from ..utility import testing as _testing
|
||||
|
||||
def measure_count(instr, V=True, I=True, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True, testing=False):
|
||||
"""
|
||||
Take <count> measurements with <interval> inbetween
|
||||
|
||||
@details
|
||||
Uses the devices overlappedY function to make the measurements asynchronosly
|
||||
The update_func is optional and only used when I == True and V == True
|
||||
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
|
||||
@param instr: pyvisa instrument
|
||||
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
||||
@param update_interval: interval at which the update_func is called
|
||||
"""
|
||||
f_meas = None
|
||||
if V and I:
|
||||
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
|
||||
elif V:
|
||||
f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
|
||||
elif I:
|
||||
f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
|
||||
else:
|
||||
print("I and/or V needs to be set to True")
|
||||
return
|
||||
|
||||
i = 0
|
||||
if not testing:
|
||||
reset(instr, verbose=verbose)
|
||||
instr.write(f"smua.measure.count = {count}")
|
||||
instr.write(f"smua.measure.interval = {interval}")
|
||||
|
||||
# start measurement
|
||||
instr.write(f"smua.source.output = smua.OUTPUT_ON")
|
||||
instr.write(f_meas)
|
||||
|
||||
condition = lambda: float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0
|
||||
else:
|
||||
condition = lambda: i < int(float(count) * interval / update_interval)
|
||||
|
||||
sleep(update_interval)
|
||||
# for live viewing
|
||||
|
||||
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
|
||||
# will return 2.0 while measruing
|
||||
while condition():
|
||||
if update_func and V and I:
|
||||
try:
|
||||
if not testing:
|
||||
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
|
||||
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
|
||||
else:
|
||||
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
|
||||
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
|
||||
update_func(i, ival, vval)
|
||||
except ValueError:
|
||||
pass
|
||||
sleep(update_interval)
|
||||
i += 1
|
||||
|
||||
if not testing:
|
||||
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
|
||||
if beep_done:
|
||||
instr.write("beeper.beep(0.3, 1000)")
|
||||
|
||||
|
||||
def measure(instr, interval, update_func=None, max_measurements=None, testing=False):
|
||||
"""
|
||||
@details:
|
||||
- Resets the buffers
|
||||
- Until KeyboardInterrupt:
|
||||
- Take measurement
|
||||
- Call update_func
|
||||
- Wait interval
|
||||
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
|
||||
You can take the data from the buffer afterwards, using save_csv
|
||||
@param instr: pyvisa instrument
|
||||
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
||||
@param max_measurements : maximum number of measurements. None means infinite
|
||||
"""
|
||||
if not testing:
|
||||
reset(instr, verbose=True)
|
||||
instr.write("smua.source.output = smua.OUTPUT_ON")
|
||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
|
||||
try:
|
||||
i = 0
|
||||
while max_measurements is None or i < max_measurements:
|
||||
if testing:
|
||||
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
|
||||
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
|
||||
else:
|
||||
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
|
||||
if update_func:
|
||||
update_func(i, ival, vval)
|
||||
sleep(interval)
|
||||
i += 1
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
if not testing:
|
||||
instr.write("smua.source.output = smua.OUTPUT_OFF")
|
||||
print("Measurement stopped" + " "*50)
|
4
k-teng/materials
Normal file
4
k-teng/materials
Normal file
@ -0,0 +1,4 @@
|
||||
pdms
|
||||
kapton
|
||||
plastic
|
||||
|
49
k-teng/test.py
Normal file
49
k-teng/test.py
Normal file
@ -0,0 +1,49 @@
|
||||
readings = [
|
||||
4.974127e-10, -1.418591e-12, -1.573563e-12, -1.728535e-12, -1.704693e-12,
|
||||
-1.847744e-12, -1.931190e-12, -1.788139e-12, -1.871586e-12, -2.169609e-12,
|
||||
-2.074242e-12, -1.895428e-12, -1.895428e-12, -1.776218e-12, -1.990795e-12,
|
||||
-1.788139e-12, -1.811981e-12, -1.657009e-12, -1.573563e-12, -2.396107e-12,
|
||||
-2.515316e-12, -2.765656e-12, -2.825260e-12, -3.147125e-12, -2.300739e-12,
|
||||
-2.825260e-12, -3.278255e-12, -5.257130e-12, -6.818771e-12, -8.916855e-12,
|
||||
-7.712841e-12, 6.437302e-12, -1.142025e-11, -1.206398e-11, -4.649043e-10,
|
||||
-3.427613e-09, -2.460408e-09, -2.340376e-09, -1.306653e-10, 1.496077e-11,
|
||||
2.933741e-11, 1.953280e-09, 8.579970e-10, 9.226799e-12, -1.095533e-11,
|
||||
-2.508163e-11, -2.776039e-09, -8.686423e-09, 4.935264e-12, 1.246929e-11,
|
||||
3.225744e-09, 2.814472e-09, 1.877034e-09, 2.229273e-09, 1.713574e-09,
|
||||
8.355618e-10, -4.332781e-10, 5.896091e-11, 5.762577e-11, 8.129537e-09,
|
||||
4.044378e-09, 1.771629e-09, 7.849216e-10, 4.098892e-10, 3.390551e-10,
|
||||
2.956390e-10, 3.033876e-10, 1.716256e-10, 1.463890e-11, -5.078316e-12,
|
||||
-6.949902e-12, -8.106232e-12, -6.473065e-12, -4.506111e-12, 4.919767e-11,
|
||||
3.052297e-08, 1.161162e-08, -9.892106e-09, -3.613818e-09, -5.004287e-09,
|
||||
-2.015829e-11, -4.183054e-11, -1.810908e-10, -2.042532e-10, -3.516316e-10,
|
||||
5.099773e-11, 1.921976e-08, -1.256589e-08, -4.242897e-10, -1.358986e-12,
|
||||
-3.445148e-12, -3.838539e-12, -4.184246e-12, -7.402897e-12, -2.840877e-10,
|
||||
-2.872229e-10, -2.730966e-10, -1.134396e-10, -4.376173e-11, -3.576279e-14
|
||||
]
|
||||
timestamps = [
|
||||
0. , 0.05 , 0.1 , 0.15, 0.2, 0.25, 0.3, 0.35,
|
||||
0.4 , 0.45 , 0.5 , 0.55, 0.6, 0.65, 0.7, 0.75,
|
||||
0.8 , 0.85 , 0.9 , 0.95, 1. , 1.05, 1.1, 1.15,
|
||||
1.2 , 1.25 , 1.3 , 1.35, 1.4, 1.45, 1.5, 1.55,
|
||||
1.6 , 1.690891, 1.710923, 1.75, 1.8, 1.85, 1.9, 1.95,
|
||||
2. , 2.05 , 2.1 , 2.15, 2.2, 2.25, 2.3, 2.35,
|
||||
2.420332, 2.45 , 2.5 , 2.55, 2.6, 2.65, 2.7, 2.75,
|
||||
2.820553, 2.890843, 2.910875, 2.95, 3. , 3.05, 3.1, 3.15,
|
||||
3.2 , 3.25 , 3.3 , 3.35, 3.4, 3.45, 3.5, 3.55,
|
||||
3.6 , 3.65 , 3.7 , 3.75, 3.8, 3.85, 3.9, 3.95,
|
||||
4. , 4.05 , 4.1 , 4.15, 4.2, 4.25, 4.3, 4.35,
|
||||
4.4 , 4.45 , 4.5 , 4.55, 4.6, 4.65, 4.7, 4.75,
|
||||
4.8 , 4.85 , 4.9 , 4.95, ]
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import scipy as sp
|
||||
import matplotlib.pyplot as plt
|
||||
df = pd.DataFrame(np.vstack((timestamps, readings)).T)
|
||||
df.columns = ["Time [s]", "Voltage [V]"]
|
||||
print(df)
|
||||
df.to_csv("test.csv", sep=",", header=True, index=False)
|
||||
|
||||
df.plot(x='Time [s]', y="Voltage [V]")
|
||||
plt.show()
|
||||
|
33
k-teng/testcurve.py
Normal file
33
k-teng/testcurve.py
Normal file
@ -0,0 +1,33 @@
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
|
||||
# want peak at n*time == frequency
|
||||
nearest_peak = np.round(x / frequency, 0)
|
||||
# if not peak at 0 and within peak_width
|
||||
# print(x, nearest_peak)
|
||||
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
|
||||
# return sin that does one period within 2*peak_width
|
||||
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
|
||||
else:
|
||||
return bias
|
||||
|
||||
# 0 = pk - width
|
||||
# 2pi = pk + width
|
||||
|
||||
def baseline(data):
|
||||
# find the value where the most values with low gradients are closest to
|
||||
gradients = np.abs(np.gradient(data))
|
||||
# consider the values where the absolute gradient is in the bottom 20%
|
||||
n_gradients = len(data) // 20
|
||||
consider_indices = np.argsort(gradients)[:n_gradients]
|
||||
# of those, only consider values where the value
|
||||
consider_values = data[consider_indices]
|
||||
|
||||
|
||||
xdata = np.arange(0, 100, 0.01)
|
||||
ydata = np.vectorize(testcurve)(xdata)
|
||||
|
||||
|
||||
plt.plot(xdata, ydata)
|
||||
plt.show()
|
38
k-teng/utility/testing.py
Normal file
38
k-teng/utility/testing.py
Normal file
@ -0,0 +1,38 @@
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
def collect_buffer(instr, buffer_nr=1):
|
||||
"""
|
||||
Get the buffer as 2D - np.array
|
||||
@param instr : pyvisa instrument
|
||||
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
|
||||
@returns 2D numpy array:
|
||||
i - ith reading:
|
||||
0: timestamps
|
||||
1: readings
|
||||
"""
|
||||
if buffer_nr == 2: buffername = "smua.nvbuffer2"
|
||||
else: buffername = "smua.nvbuffer1"
|
||||
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
||||
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
||||
timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array)
|
||||
readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array)
|
||||
print(f"readings: {readings}, \ntimestamps: {timestamps}")
|
||||
buffer = np.vstack((timestamps, readings)).T
|
||||
return buffer
|
||||
|
||||
|
||||
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
|
||||
# want peak at n*time == frequency
|
||||
nearest_peak = np.round(x / frequency, 0)
|
||||
# if not peak at 0 and within peak_width
|
||||
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
|
||||
# return sin that does one period within 2*peak_width
|
||||
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
|
||||
else:
|
||||
return bias
|
||||
|
||||
# 0 = pk - width
|
||||
# 2pi = pk + width
|
@ -1,155 +0,0 @@
|
||||
import bleak as b
|
||||
import asyncio
|
||||
import numpy as np
|
||||
|
||||
TARGET_NAME = "ArduinoTENG"
|
||||
|
||||
# GATT service and characteristics UUIDs
|
||||
TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f"
|
||||
TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f"
|
||||
TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f"
|
||||
TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f"
|
||||
TENG_COUNT_CUUID = "00010004-9a74-4b30-9361-4a16ec09930f"
|
||||
TENG_INTERVAL_CUUID = "00010005-9a74-4b30-9361-4a16ec09930f"
|
||||
|
||||
TENG_COMMANDS = {
|
||||
"STOP": int(0).to_bytes(1, signed=False),
|
||||
"MEASURE_COUNT": int(1).to_bytes(1, signed=False),
|
||||
"MEASURE": int(2).to_bytes(1, signed=False),
|
||||
}
|
||||
TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "CONNECTED", "MEASURING"]
|
||||
|
||||
# TODO save measurements on device buffer, transfer later
|
||||
|
||||
|
||||
# wrapper for global variable
|
||||
class Buffer:
|
||||
def __init__(self):
|
||||
self.data = None
|
||||
_buffer = Buffer()
|
||||
|
||||
# class Runner:
|
||||
# def __init__(self):
|
||||
runner = asyncio.Runner()
|
||||
|
||||
def teng_status_callback(characteristic, data):
|
||||
value = int.from_bytes(data, byteorder="big", signed=False)
|
||||
if 0 <= value and value < len(TENG_STATUS):
|
||||
print(f"Status change: {TENG_STATUS[value]}")
|
||||
else:
|
||||
print(f"Status change (invalid): status={value}")
|
||||
|
||||
|
||||
def disconnect_callback(client):
|
||||
raise Exception(f"The Bluetooth device {client.name} was disconnected")
|
||||
|
||||
|
||||
async def init_arduino_async(n_tries: int=5) -> b.BleakClient:
|
||||
n_try = 0
|
||||
if n_tries <= 0: n_tries = "inf"
|
||||
try:
|
||||
target_device = None
|
||||
while target_device is None and (n_tries == "inf" or n_try < n_tries):
|
||||
print(f"Searching for Bluetooth device '{TARGET_NAME}' ({n_try+1}/{n_tries})", end="\r")
|
||||
devices = await b.BleakScanner.discover(return_adv=True, timeout=1.5)
|
||||
# print(devices)
|
||||
for adr, (device, adv_data) in devices.items():
|
||||
if device.name == TARGET_NAME:
|
||||
# print(adv_data)
|
||||
target_device = device
|
||||
break
|
||||
n_try += 1
|
||||
if target_device is None:
|
||||
raise Exception(f"Could not find Bluetooth device 'ArduinoTENG'")
|
||||
# print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}")
|
||||
# print(target_device.name, target_device.details)
|
||||
client = b.BleakClient(target_device, disconnect_callback=disconnect_callback)
|
||||
await client.connect()
|
||||
print(f"Connected to Bluetooth device '{TARGET_NAME}' at [{client.address}]")
|
||||
return client
|
||||
except asyncio.exceptions.CancelledError:
|
||||
raise Exception(f"Cancelled")
|
||||
|
||||
|
||||
def init(beep_success=True, n_tries: int=5) -> b.BleakClient:
|
||||
"""
|
||||
Connect to the arduino
|
||||
@returns: BleakClient
|
||||
"""
|
||||
client = runner.run(init_arduino_async(n_tries=n_tries))
|
||||
if beep_success: beep(client)
|
||||
return client
|
||||
|
||||
|
||||
def exit(client):
|
||||
try:
|
||||
runner.run(stop_measurement(client))
|
||||
runner.run(client.disconnect())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def set_interval(client, interval: float):
|
||||
"""
|
||||
Set the measurement interval
|
||||
@param interval: interval in seconds
|
||||
"""
|
||||
interval = int(interval * 1000) # convert to ms for arduinos delay)
|
||||
await client.write_gatt_char(TENG_INTERVAL_CUUID, interval.to_bytes(2, byteorder="little", signed=False))
|
||||
|
||||
async def set_count(client, count: int):
|
||||
"""
|
||||
Set the measurement count
|
||||
@param count: number of measurements to take
|
||||
"""
|
||||
await client.write_gatt_char(TENG_COUNT_CUUID, count.to_bytes(2, byteorder="little", signed=False))
|
||||
|
||||
async def stop_measurement(client):
|
||||
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["STOP"])
|
||||
|
||||
async def start_measure_count(client):
|
||||
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE_COUNT"])
|
||||
|
||||
async def start_measure(client):
|
||||
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE"])
|
||||
|
||||
|
||||
# async def main():
|
||||
# for service in client.services:
|
||||
# print(f"Service: {service.uuid}: {service.description}")
|
||||
# for c in service.characteristics:
|
||||
# print(f"\t{c.uuid}: {c.properties}, {c.descriptors}")
|
||||
# teng_status = client.services.get_characteristic(TENG_STATUS_CUUID)
|
||||
# teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID)
|
||||
# teng_reading = client.services.get_characteristic(TENG_READING_CUUID)
|
||||
# client.start_notify(teng_status, teng_status_callback)
|
||||
|
||||
# await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"])
|
||||
# await asyncio.sleep(5)
|
||||
# await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"])
|
||||
|
||||
# while client.is_connected:
|
||||
# data = await client.read_gatt_char(teng_reading)
|
||||
|
||||
|
||||
# value = int.from_bytes(data, byteorder="little", signed=False)
|
||||
# print(f"Reading: {value}")
|
||||
# await asyncio.sleep(0.5)
|
||||
# except KeyboardInterrupt:
|
||||
# pass
|
||||
# except asyncio.exceptions.CancelledError:
|
||||
# pass
|
||||
# print("Disconnected")
|
||||
|
||||
|
||||
def collect_buffer(instr, buffer_nr=1):
|
||||
"""
|
||||
@param buffer_nr: 1 -> current, 2 -> voltage
|
||||
"""
|
||||
assert(buffer_nr in (1, 2))
|
||||
return np.vstack((_buffer.data[:,0], _buffer.data[:,buffer_nr])).T
|
||||
|
||||
|
||||
def beep(client):
|
||||
# TODO connect beeper to arduino?
|
||||
print("beep")
|
@ -1,75 +0,0 @@
|
||||
import bleak as b
|
||||
import numpy as np
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
|
||||
from m_teng.backends.arduino.arduino import beep, set_interval, set_count, TENG_READING_CUUID, _buffer, start_measure, start_measure_count, stop_measurement, runner
|
||||
|
||||
|
||||
async def _measure_count_async(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
|
||||
global _buffer
|
||||
_buffer.data = np.zeros((count, 3))
|
||||
i = 0
|
||||
t_start = datetime.datetime.now()
|
||||
async def add_reading(teng_reading_cr, reading: bytearray):
|
||||
nonlocal i, count
|
||||
if i >= count: return
|
||||
_buffer.data[i][0] = float((datetime.datetime.now() - t_start).microseconds) / 1000
|
||||
# reading = await client.read_gatt_char(TENG_READING_CUUID)
|
||||
_buffer.data[i][2] = int.from_bytes(reading, byteorder="little", signed=False)
|
||||
i += 1
|
||||
|
||||
await set_interval(client, interval)
|
||||
await set_count(client, count)
|
||||
# TODO check if notify works when the same value is written again
|
||||
await client.start_notify(TENG_READING_CUUID, add_reading)
|
||||
await start_measure_count(client)
|
||||
while i < count:
|
||||
await asyncio.sleep(update_interval)
|
||||
if update_func is not None and i > 0: # assume an update has occured
|
||||
update_func(i-1, 0, _buffer.data[i-1, 2])
|
||||
await client.stop_notify(TENG_READING_CUUID)
|
||||
if beep_done: beep(client)
|
||||
|
||||
def measure_count(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
|
||||
runner.run(_measure_count_async(client, count=count, interval=interval, update_func=update_func, update_interval=update_interval, beep_done=beep_done, verbose=verbose))
|
||||
|
||||
|
||||
async def _measure_async(client, interval, update_func=None, max_measurements=None):
|
||||
global _buffer
|
||||
readings = []
|
||||
timestamps = []
|
||||
i = 0
|
||||
t_start = datetime.datetime.now()
|
||||
|
||||
async def add_reading(teng_reading_cr, reading):
|
||||
nonlocal i
|
||||
timestamps.append(float((datetime.datetime.now() - t_start).microseconds) / 1000)
|
||||
reading = int.from_bytes(reading, byteorder="little", signed=False)
|
||||
readings.append(reading)
|
||||
|
||||
if update_func:
|
||||
try:
|
||||
update_func(i, 0, reading)
|
||||
except KeyboardInterrupt:
|
||||
raise asyncio.exceptions.CancelledError("KeyboardInterrupt in update_func")
|
||||
i += 1
|
||||
|
||||
await set_interval(client, interval)
|
||||
await client.start_notify(TENG_READING_CUUID, add_reading)
|
||||
await start_measure(client)
|
||||
try:
|
||||
while max_measurements is None or i < max_measurements:
|
||||
await asyncio.sleep(0.1) #
|
||||
except asyncio.exceptions.CancelledError:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
await client.stop_notify(TENG_READING_CUUID)
|
||||
await stop_measurement(client)
|
||||
_buffer.data = np.vstack((timestamps, np.zeros(len(timestamps)), readings)).T
|
||||
print("Measurement stopped" + " "*50)
|
||||
|
||||
def measure(client, interval, update_func=None, max_measurements=None):
|
||||
runner.run(_measure_async(client, interval=interval, update_func=update_func, max_measurements=max_measurements))
|
@ -1,93 +0,0 @@
|
||||
from time import sleep
|
||||
import numpy as np
|
||||
from matplotlib import pyplot as plt
|
||||
import pyvisa
|
||||
|
||||
from m_teng.backends.keithley import reset
|
||||
from m_teng.utility import testing as _testing
|
||||
|
||||
def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
|
||||
"""
|
||||
Take <count> measurements with <interval> inbetween
|
||||
|
||||
@details
|
||||
Uses the devices overlappedY function to make the measurements asynchronosly
|
||||
The update_func is optional and only used when I == True and V == True
|
||||
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
|
||||
@param instr: pyvisa instrument
|
||||
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
||||
@param update_interval: interval at which the update_func is called
|
||||
"""
|
||||
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
|
||||
# if V and I:
|
||||
# elif V:
|
||||
# f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
|
||||
# elif I:
|
||||
# f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
|
||||
# else:
|
||||
# print("I and/or V needs to be set to True")
|
||||
# return
|
||||
|
||||
i = 0
|
||||
reset(instr, verbose=verbose)
|
||||
instr.write(f"smua.measure.count = {count}")
|
||||
instr.write(f"smua.measure.interval = {interval}")
|
||||
|
||||
# start measurement
|
||||
instr.write(f"smua.source.output = smua.OUTPUT_ON")
|
||||
instr.write(f_meas)
|
||||
|
||||
sleep(update_interval)
|
||||
# for live viewing
|
||||
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
|
||||
|
||||
# will return 2.0 while measruing
|
||||
while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0:
|
||||
if update_func:
|
||||
try:
|
||||
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
|
||||
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
|
||||
update_func(i, ival, vval)
|
||||
except ValueError as e:
|
||||
if i != 0:
|
||||
pass
|
||||
else:
|
||||
print(f"measure_count: ValueError: {e}")
|
||||
sleep(update_interval)
|
||||
i += 1
|
||||
|
||||
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||
|
||||
if beep_done:
|
||||
instr.write("beeper.beep(0.3, 1000)")
|
||||
|
||||
|
||||
def measure(instr, interval, update_func=None, max_measurements=None):
|
||||
"""
|
||||
@details:
|
||||
- Resets the buffers
|
||||
- Until KeyboardInterrupt:
|
||||
- Take measurement
|
||||
- Call update_func
|
||||
- Wait interval
|
||||
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
|
||||
You can take the data from the buffer afterwards, using save_csv
|
||||
@param instr: pyvisa instrument
|
||||
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
||||
@param max_measurements : maximum number of measurements. None means infinite
|
||||
"""
|
||||
reset(instr, verbose=True)
|
||||
instr.write("smua.source.output = smua.OUTPUT_ON")
|
||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
|
||||
try:
|
||||
i = 0
|
||||
while max_measurements is None or i < max_measurements:
|
||||
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
|
||||
if update_func:
|
||||
update_func(i, ival, vval)
|
||||
sleep(interval)
|
||||
i += 1
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
instr.write("smua.source.output = smua.OUTPUT_OFF")
|
||||
print("Measurement stopped" + " "*50)
|
@ -1,123 +0,0 @@
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
import torch
|
||||
|
||||
from teng_ml.util import model_io as mio
|
||||
from teng_ml.util.settings import MLSettings
|
||||
from teng_ml.util.split import DataSplitter
|
||||
|
||||
from m_teng.backends.keithley import keithley
|
||||
|
||||
def _update_print(i, ival, vval):
|
||||
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
|
||||
|
||||
class _Monitor:
|
||||
"""
|
||||
Monitor v and i data
|
||||
"""
|
||||
def __init__(self, max_points_shown=None, use_print=False):
|
||||
self.max_points_shown = max_points_shown
|
||||
self.use_print = use_print
|
||||
self.index = []
|
||||
self.vdata = []
|
||||
self.idata = []
|
||||
|
||||
plt.ion()
|
||||
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
|
||||
|
||||
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
|
||||
self.vax.set_ylabel("Voltage [V]")
|
||||
self.vax.grid(True)
|
||||
|
||||
self.iline, = self.iax.plot(self.index, self.idata, color="m")
|
||||
self.iax.set_ylabel("Current [A]")
|
||||
self.iax.grid(True)
|
||||
|
||||
def update(self, i, ival, vval):
|
||||
if self.use_print:
|
||||
_update_print(i, ival, vval)
|
||||
self.index.append(i)
|
||||
self.idata.append(ival)
|
||||
self.vdata.append(vval)
|
||||
# update data
|
||||
self.iline.set_xdata(self.index)
|
||||
self.iline.set_ydata(self.idata)
|
||||
self.vline.set_xdata(self.index)
|
||||
self.vline.set_ydata(self.vdata)
|
||||
# recalculate limits and set them for the view
|
||||
self.iax.relim()
|
||||
self.vax.relim()
|
||||
if self.max_points_shown and i > self.max_points_shown:
|
||||
self.iax.set_xlim(i - self.max_points_shown, i)
|
||||
self.vax.set_xlim(i - self.max_points_shown, i)
|
||||
self.iax.autoscale_view()
|
||||
self.vax.autoscale_view()
|
||||
# update plot
|
||||
self.fig1.canvas.draw()
|
||||
self.fig1.canvas.flush_events()
|
||||
|
||||
def __del__(self):
|
||||
plt.close(self.fig1)
|
||||
|
||||
|
||||
class _ModelPredict:
|
||||
colors = ["red", "green", "purple", "blue", "orange", "grey", "cyan"]
|
||||
def __init__(self, instr, model_dir):
|
||||
"""
|
||||
@param model_dir: directory where model.plk and settings.pkl are stored
|
||||
|
||||
Predict the values that are currently being recorded
|
||||
@details:
|
||||
Load the model and model settings from model dir
|
||||
Wait until the number of recoreded points is >= the size of the models DataSplitter
|
||||
Collect the data from the keithley, apply the transforms and predict the label with the model
|
||||
Shows the prediction with a bar plot
|
||||
"""
|
||||
self.instr = instr
|
||||
self.model = mio.load_model(model_dir)
|
||||
self.model_settings: MLSettings = mio.load_settings(model_dir)
|
||||
if type(self.model_settings.splitter) == DataSplitter:
|
||||
self.data_length = self.model_settings.splitter.split_size
|
||||
else:
|
||||
self.data_length = 200
|
||||
|
||||
plt.ion()
|
||||
self.fig1, (self.ax) = plt.subplots(1, 1, figsize=(8, 5))
|
||||
|
||||
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), [ 1 for _ in range(len(self.model_settings.labels))])
|
||||
self.ax.set_ylabel("Prediction")
|
||||
self.ax.grid(True)
|
||||
|
||||
def update(self, i, ival, vval):
|
||||
buffer_size = keithley.get_buffer_size(self.instr, buffer_nr=1)
|
||||
if buffer_size <= self.data_length:
|
||||
print(f"ModelPredict.update: buffer_size={buffer_size} < {self.data_length}")
|
||||
return
|
||||
else:
|
||||
ibuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=1)
|
||||
vbuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=2)
|
||||
if self.model_settings.num_features == 1: # model uses only voltage
|
||||
data = np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T
|
||||
# print(f"data.shape:", data.shape)
|
||||
else:
|
||||
raise NotImplementedError(f"Cant handle models with num_features != 1 yet")
|
||||
for t in self.model_settings.transforms:
|
||||
data = t(data)
|
||||
data = np.reshape(data[:,2], (1, -1, 1)) # batch_size, seq, features
|
||||
with torch.no_grad():
|
||||
x = torch.FloatTensor(data) # select voltage data, without timestamps
|
||||
# print(x.shape)
|
||||
|
||||
prediction = self.model(x) # (batch_size, label-predictions)
|
||||
prediction = torch.nn.functional.softmax(prediction) # TODO remove when softmax is already applied by model
|
||||
predicted = torch.argmax(prediction, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
# print(f"raw={prediction[0]}, predicted_index={predicted[0}")
|
||||
label = self.model_settings.labels[predicted[0]]
|
||||
# print(f"-> label={label}")
|
||||
|
||||
self.bar_cont.remove()
|
||||
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), prediction[0], color=_ModelPredict.colors[:len(self.model_settings.labels)])
|
||||
# update plot
|
||||
self.fig1.canvas.draw()
|
||||
self.fig1.canvas.flush_events()
|
@ -1,17 +0,0 @@
|
||||
import numpy as np
|
||||
|
||||
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
|
||||
# 0 = pk - width
|
||||
# 2pi = pk + width
|
||||
# want peak at n*time == frequency
|
||||
nearest_peak = np.round(x / frequency, 0)
|
||||
# if not peak at 0 and within peak_width
|
||||
if nearest_peak > 0 and np.abs((x - nearest_peak * frequency)) < peak_width:
|
||||
# return sin that does one period within 2*peak_width
|
||||
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
|
||||
else:
|
||||
return bias
|
||||
|
||||
def get_testcurve(frequency=10, peak_width=2, amplitude=20, bias=0):
|
||||
return np.vectorize(lambda x: testcurve(x, frequency=frequency, peak_width=peak_width, amplitude=amplitude, bias=bias))
|
||||
|
419
measurement.ipynb
Normal file
419
measurement.ipynb
Normal file
File diff suppressed because one or more lines are too long
@ -1,35 +0,0 @@
|
||||
[build-system]
|
||||
requires = ["setuptools"]
|
||||
|
||||
[project]
|
||||
name = "k_teng"
|
||||
version = "1.0.0"
|
||||
description = "Interactive utility for I-V measurements with a Keitley 2600 SMU"
|
||||
requires-python = ">=3.10"
|
||||
readme = "readme.md"
|
||||
license = {file = "LICENSE"}
|
||||
authors = [
|
||||
{ name = "Matthias Quintern", email = "matthias@quintern.xyz" }
|
||||
]
|
||||
classifiers = [
|
||||
"Operating System :: POSIX :: Linux",
|
||||
"Environment :: Console",
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
|
||||
]
|
||||
dependencies = [
|
||||
"matplotlib>=3.6",
|
||||
"numpy",
|
||||
"pyvisa",
|
||||
"pyvisa-py"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
repository = "https://git.quintern.xyz/MatthiasQuintern/k-teng"
|
||||
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
|
||||
[project.scripts]
|
||||
k-teng = "k_teng.k_teng_interactive:main"
|
38
readme.md
38
readme.md
@ -1,39 +1,23 @@
|
||||
# m-TENG
|
||||
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU or an Arduino
|
||||
# K-TENG
|
||||
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU using pyvisa
|
||||
|
||||
## Features
|
||||
|
||||
### Useful functions for scripts
|
||||
- Measure Voltage and/or Current
|
||||
- Transfer buffer from device to host
|
||||
- Save/load as csv
|
||||
- Run lua script on device
|
||||
- Auto-filenames
|
||||
### Interactive (shell) mode
|
||||
- Live view
|
||||
- Press button to stop
|
||||
- Save and load settings (default interval, data directory...)
|
||||
- Easily run arbitrary command on device
|
||||
|
||||
|
||||
### Useful functions for scripts
|
||||
- Measure voltage and/or current
|
||||
- Transfer buffer from measurement device to host
|
||||
- Save/load as csv
|
||||
- Run lua script on Keithley SMU
|
||||
- Auto-filenames
|
||||
|
||||
## Available backends
|
||||
### keithley
|
||||
Use a Keithley 2611B Source-Measure-Unit via *pyvisa*. This backend allows measuring both voltage and current simultaneously.
|
||||
|
||||
### arduino
|
||||
Use a Bluetooth capable Arduino with [https://git.quintern.xyz/MatthiasQuintern/teng-arduino](this software on the arduino).
|
||||
This backend only allows measuring voltage using an Arduinos analog input pin (0 - 3.3 V, 12 bit resolution).
|
||||
|
||||
### testing
|
||||
Use the shell without measuring TENG output. When starting a measurement, sample data will be generated.
|
||||
|
||||
|
||||
## Shell mode
|
||||
It is recommended to run the shell with ipython:
|
||||
Start with:
|
||||
```shell
|
||||
ipython -i k_teng_interactive.py -- -*X*
|
||||
ipython -i k_teng_interactive.py
|
||||
```
|
||||
Substitute *X* for `-k` for keithley backend, `-a` for arduino backend or `-t` for testing backend.
|
||||
|
||||
In the shell, run `help()` to get a list of available commands
|
||||
Use `help()` to get a list of available commands
|
||||
|
Loading…
Reference in New Issue
Block a user