renamed to m-teng

This commit is contained in:
Matthias@Dell 2023-06-18 17:38:10 +02:00
parent 3e79e2ea4f
commit a0f8a2a903
27 changed files with 622 additions and 787 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
__pycache__ __pycache__
.ipynb_checkpoints .ipynb_checkpoints
testing

2
MANIFEST.in Normal file
View File

@ -0,0 +1,2 @@
include regina/package-data/*
include regina/sql/*.sql

1
bspc_rule.sh Executable file
View File

@ -0,0 +1 @@
bspc rule -a matplotlib desktop=^1 state=pseudo_tiled focus=off

View File

@ -1,107 +0,0 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from .keithley import reset
from ..utility import testing as _testing
def measure_count(instr, V=True, I=True, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True, testing=False):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = None
if V and I:
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
elif V:
f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
elif I:
f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
else:
print("I and/or V needs to be set to True")
return
i = 0
if not testing:
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
condition = lambda: float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0
else:
condition = lambda: i < int(float(count) * interval / update_interval)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while condition():
if update_func and V and I:
try:
if not testing:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
else:
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
update_func(i, ival, vval)
except ValueError:
pass
sleep(update_interval)
i += 1
if not testing:
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None, testing=False):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
if not testing:
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
if testing:
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
else:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
if not testing:
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -1,4 +0,0 @@
pdms
kapton
plastic

View File

@ -1,49 +0,0 @@
readings = [
4.974127e-10, -1.418591e-12, -1.573563e-12, -1.728535e-12, -1.704693e-12,
-1.847744e-12, -1.931190e-12, -1.788139e-12, -1.871586e-12, -2.169609e-12,
-2.074242e-12, -1.895428e-12, -1.895428e-12, -1.776218e-12, -1.990795e-12,
-1.788139e-12, -1.811981e-12, -1.657009e-12, -1.573563e-12, -2.396107e-12,
-2.515316e-12, -2.765656e-12, -2.825260e-12, -3.147125e-12, -2.300739e-12,
-2.825260e-12, -3.278255e-12, -5.257130e-12, -6.818771e-12, -8.916855e-12,
-7.712841e-12, 6.437302e-12, -1.142025e-11, -1.206398e-11, -4.649043e-10,
-3.427613e-09, -2.460408e-09, -2.340376e-09, -1.306653e-10, 1.496077e-11,
2.933741e-11, 1.953280e-09, 8.579970e-10, 9.226799e-12, -1.095533e-11,
-2.508163e-11, -2.776039e-09, -8.686423e-09, 4.935264e-12, 1.246929e-11,
3.225744e-09, 2.814472e-09, 1.877034e-09, 2.229273e-09, 1.713574e-09,
8.355618e-10, -4.332781e-10, 5.896091e-11, 5.762577e-11, 8.129537e-09,
4.044378e-09, 1.771629e-09, 7.849216e-10, 4.098892e-10, 3.390551e-10,
2.956390e-10, 3.033876e-10, 1.716256e-10, 1.463890e-11, -5.078316e-12,
-6.949902e-12, -8.106232e-12, -6.473065e-12, -4.506111e-12, 4.919767e-11,
3.052297e-08, 1.161162e-08, -9.892106e-09, -3.613818e-09, -5.004287e-09,
-2.015829e-11, -4.183054e-11, -1.810908e-10, -2.042532e-10, -3.516316e-10,
5.099773e-11, 1.921976e-08, -1.256589e-08, -4.242897e-10, -1.358986e-12,
-3.445148e-12, -3.838539e-12, -4.184246e-12, -7.402897e-12, -2.840877e-10,
-2.872229e-10, -2.730966e-10, -1.134396e-10, -4.376173e-11, -3.576279e-14
]
timestamps = [
0. , 0.05 , 0.1 , 0.15, 0.2, 0.25, 0.3, 0.35,
0.4 , 0.45 , 0.5 , 0.55, 0.6, 0.65, 0.7, 0.75,
0.8 , 0.85 , 0.9 , 0.95, 1. , 1.05, 1.1, 1.15,
1.2 , 1.25 , 1.3 , 1.35, 1.4, 1.45, 1.5, 1.55,
1.6 , 1.690891, 1.710923, 1.75, 1.8, 1.85, 1.9, 1.95,
2. , 2.05 , 2.1 , 2.15, 2.2, 2.25, 2.3, 2.35,
2.420332, 2.45 , 2.5 , 2.55, 2.6, 2.65, 2.7, 2.75,
2.820553, 2.890843, 2.910875, 2.95, 3. , 3.05, 3.1, 3.15,
3.2 , 3.25 , 3.3 , 3.35, 3.4, 3.45, 3.5, 3.55,
3.6 , 3.65 , 3.7 , 3.75, 3.8, 3.85, 3.9, 3.95,
4. , 4.05 , 4.1 , 4.15, 4.2, 4.25, 4.3, 4.35,
4.4 , 4.45 , 4.5 , 4.55, 4.6, 4.65, 4.7, 4.75,
4.8 , 4.85 , 4.9 , 4.95, ]
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
df = pd.DataFrame(np.vstack((timestamps, readings)).T)
df.columns = ["Time [s]", "Voltage [V]"]
print(df)
df.to_csv("test.csv", sep=",", header=True, index=False)
df.plot(x='Time [s]', y="Voltage [V]")
plt.show()

View File

@ -1,33 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
# print(x, nearest_peak)
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
# 0 = pk - width
# 2pi = pk + width
def baseline(data):
# find the value where the most values with low gradients are closest to
gradients = np.abs(np.gradient(data))
# consider the values where the absolute gradient is in the bottom 20%
n_gradients = len(data) // 20
consider_indices = np.argsort(gradients)[:n_gradients]
# of those, only consider values where the value
consider_values = data[consider_indices]
xdata = np.arange(0, 100, 0.01)
ydata = np.vectorize(testcurve)(xdata)
plt.plot(xdata, ydata)
plt.show()

View File

@ -1,38 +0,0 @@
import pandas as pd
import numpy as np
def collect_buffer(instr, buffer_nr=1):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
if buffer_nr == 2: buffername = "smua.nvbuffer2"
else: buffername = "smua.nvbuffer1"
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array)
print(f"readings: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
# 0 = pk - width
# 2pi = pk + width

View File

@ -0,0 +1,123 @@
import bleak as b
import asyncio
import numpy as np
TARGET_NAME = "ArduinoTENG"
# GATT service and characteristics UUIDs
TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f"
TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f"
TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f"
TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f"
TENG_SETTING_INTERVAL_CUUID = "00010004-9a74-4b30-9361-4a16ec09930f"
TENG_COMMANDS = {
"NOOP": int(0).to_bytes(1),
"MEASURE_BASELINE": int(1).to_bytes(1),
}
TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "MEASURING_BASELINE", "READING"]
# TODO save measurements on device buffer, transfer later
# wrapper for global variable
class Buffer:
def __init__(self):
self.data = None
_buffer = Buffer()
def teng_status_callback(characteristic, data):
value = int.from_bytes(data, byteorder="big", signed=False)
if 0 <= value and value < len(TENG_STATUS):
print(f"Status change: {TENG_STATUS[value]}")
else:
print(f"Status change (invalid): status={value}")
def disconnect_callback(client):
raise Exception(f"The bluetooth device {client.name} was disconnected")
async def init_arduino_async(n_tries: int=5) -> b.BleakClient:
n_try = 0
if n_tries <= 0: n_tries = "inf"
try:
target_device = None
while target_device is None and (n_tries == "inf" or n_try < n_tries):
print(f"Searching for bluetooth device '{TARGET_NAME}' ({n_try}/{n_tries})", end="\r")
devices = await b.BleakScanner.discover(return_adv=True)
# print(devices)
for adr, (device, adv_data) in devices.items():
if device.name == TARGET_NAME:
# print(adv_data)
target_device = device
break
n_try += 1
if target_device is None:
raise Exception(f"Could not find bluetooth device 'ArduinoTENG'")
# print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}")
# print(target_device.name, target_device.details)
client = b.BleakClient(target_device, disconnect_callback=disconnect_callback)
return client
except asyncio.exceptions.CancelledError:
raise Exception(f"Cancelled")
def init(beep_success=True, n_tries: int=5) -> b.BleakClient:
"""
Connect to the arduino
@returns: BleakClient
"""
client = asyncio.run(init_arduino_async(n_tries=n_tries))
if beep_success: beep(client)
return client
def set_interval(client, interval: float):
"""
Set the measurement interval
@param interval: interval in seconds
"""
interval = int(interval * 1000) # convert to ms for arduinos delay)
await client.write_gatt_char(TENG_SETTING_INTERVAL_CUUID, interval.to_bytes(2, byteorder=LITTLEENDIAN, signed=False))
# async def main():
# for service in client.services:
# print(f"Service: {service.uuid}: {service.description}")
# for c in service.characteristics:
# print(f"\t{c.uuid}: {c.properties}, {c.descriptors}")
# teng_status = client.services.get_characteristic(TENG_STATUS_CUUID)
# teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID)
# teng_reading = client.services.get_characteristic(TENG_READING_CUUID)
# client.start_notify(teng_status, teng_status_callback)
# await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"])
# await asyncio.sleep(5)
# await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"])
# while client.is_connected:
# data = await client.read_gatt_char(teng_reading)
# value = int.from_bytes(data, byteorder="little", signed=False)
# print(f"Reading: {value}")
# await asyncio.sleep(0.5)
# except KeyboardInterrupt:
# pass
# except asyncio.exceptions.CancelledError:
# pass
# print("Disconnected")
def collect_buffer(instr, buffer_nr=1):
"""
@param buffer_nr: 1 -> current, 2 -> voltage
"""
assert(buffer_nr in (1, 2))
return np.vstack((_buffer.data[:,0], _buffer._data[:,buffer_nr])).T
def beep(client):
# TODO connect beeper to arduino?
print(beep)

View File

@ -0,0 +1,59 @@
import bleak as b
import numpy as np
import asyncio
import datetime
from .arduino.arduino import beep, set_interval, TENG_READING_CUUID, _buffer
# equivalent to internal keithley buffer: write to this value and collect afterwards
def measure_count(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True, testing=False):
global _buffer
_buffer.data = np.zeros((count, 3))
i = 0
t_start = datetime.datetime.now()
async def add_buffer.data(client):
if i >= count: return
_buffer.data[i][0] = float((datetime.datetime.now() - t_start).microseconds) / 1000
reading = await client.read_gatt_char(TENG_READING_CUUID)
_buffer.data[i][2] = int.from_bytes(reading, byteorder=LITTLEENDIAN, signed=False).
i += 1
set_interval(client, interval)
# TODO check if notify works when the same value is written again
client.start_notify(TENG_READING_CUUID, add_buffer.data)
while i < count:
asyncio.sleep(update_interval)
if update_func is not None: # assume an update has occured
update_func(i, 0, _buffer.data[i, 2])
if beep_done: beep(client)
def measure(client, interval, update_func=None, max_measurements=None, testing=False):
global _buffer
_buffer.data = np.zeros((count, 3))
i = 0
t_start = datetime.datetime.now()
async def add_buffer.data(client):
if i >= count: return
_buffer.data[i][0] = datetime.datetime.now() - t_start
vval = await client.read_gatt_char(TENG_READING_CUUID)
vval = int.from_bytes(reading, byteorder=LITTLEENDIAN, signed=False).
_buffer.data[i][2] = vval
if update_func:
update_func(i, 0, vval)
i += 1
set_interval(client, interval)
client.start_notify(TENG_READING_CUUID, add_buffer.data)
try:
while max_measurements is None or i < max_measurements:
asyncio.sleep(interval / 2) #
except asyncio.exceptions.CancelledError:
pass
print("Measurement stopped" + " "*50)

View File

View File

@ -1,20 +1,19 @@
import pyvisa import pyvisa
import numpy as np import numpy as np
import pkg_resources
""" """
Utility Utility
""" """
script_dir = "../scripts/"
scripts = { scripts = {
"buffer_reset": "buffer_reset.lua", "buffer_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/buffer_reset.lua"),
"smua_reset": "smua_reset.lua", "smua_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/smua_reset.lua"),
} }
for key,val in scripts.items():
scripts[key] = script_dir + scripts[key]
def init_keithley(beep_success=True): def init(beep_success=True):
rm = pyvisa.ResourceManager('@py') rm = pyvisa.ResourceManager('@py')
resources = rm.list_resources() resources = rm.list_resources()
if len(resources) < 1: if len(resources) < 1:
@ -52,6 +51,15 @@ def reset(instr, verbose=False):
run_lua(instr, scripts["smua_reset"], verbose=verbose) run_lua(instr, scripts["smua_reset"], verbose=verbose)
run_lua(instr, scripts["buffer_reset"], verbose=verbose) run_lua(instr, scripts["buffer_reset"], verbose=verbose)
def get_buffer_name(buffer_nr: int):
if buffer_nr == 2: return "smua.nvbuffer2"
elif buffer_nr == 1: return "smua.nvbuffer1"
raise ValueError(f"Invalid buffer_nr: {buffer_nr}")
def get_buffer_size(instr, buffer_nr=1):
n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n")
return int(float(n))
def collect_buffer(instr, buffer_nr=1, verbose=False): def collect_buffer(instr, buffer_nr=1, verbose=False):
""" """
@ -63,8 +71,7 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
0: timestamps 0: timestamps
1: readings 1: readings
""" """
if buffer_nr == 2: buffername = "smua.nvbuffer2" buffername = get_buffer_name(buffer_nr)
else: buffername = "smua.nvbuffer1"
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN") # instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array) # buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7") instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
@ -75,3 +82,30 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
buffer = np.vstack((timestamps, readings)).T buffer = np.vstack((timestamps, readings)).T
return buffer return buffer
def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
buffername = get_buffer_name(buffer_nr)
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
if range_[1] == -1:
range_ = (range_[0], f"{buffername}.n")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.readings)", container=np.array)
if verbose:
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer

View File

@ -0,0 +1,93 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from .keithley import reset
from ..utility import testing as _testing
def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
# if V and I:
# elif V:
# f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
# elif I:
# f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
# else:
# print("I and/or V needs to be set to True")
# return
i = 0
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0:
if update_func:
try:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
update_func(i, ival, vval)
except ValueError as e:
if i != 0:
pass
else:
print(f"measure_count: ValueError: {e}")
sleep(update_interval)
i += 1
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -1,6 +1,6 @@
""" """
run this before using this library: run this before using this library:
ipython -i k_teng_interactive.py ipython -i m_teng_interactive.py
always records iv-t curves always records iv-t curves
i-data -> smua.nvbuffer1 i-data -> smua.nvbuffer1
@ -18,21 +18,54 @@ from os import path, makedirs
import pickle as pkl import pickle as pkl
import json import json
import argparse
if __name__ == "__main__": if __name__ == "__main__":
import sys
if __package__ is None: if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change # make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "k-teng" __package__ = "m_teng"
import sys
from os import path from os import path
filepath = path.realpath(path.abspath(__file__)) filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath))) sys.path.insert(0, path.dirname(path.dirname(filepath)))
parser = argparse.ArgumentParser(
prog="m-teng",
description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino",
)
backend_group = parser.add_mutually_exclusive_group(required=True)
backend_group.add_argument("-k", "--keithley", action="store_true")
backend_group.add_argument("-a", "--arduino", action="store_true")
backend_group.add_argument("-t", "--testing", action='store_true')
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
args = vars(parser.parse_args())
i = 1
while i < len(sys.argv):
if args["keithley"]:
import m_teng.backends.keithley.keithley as _backend
import m_teng.backends.keithley.measure as _measure
elif args["arduino"]:
import m_teng.backends.arduino.arduino as _backend
import m_teng.backends.arduino.measure as _measure
elif args["testing"]:
import m_teng.backends.testing.testing as _backend
import m_teng.backends.testing.measure as _measure
elif sys.argv[i] in ["-c", "--config"]:
if i+1 < len(sys.argv):
config_path = sys.argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
from .keithley import keithley as _keithley from m_teng.utility import data as _data
from .keithley.measure import measure_count as _measure_count, measure as _measure from m_teng.utility.data import load_dataframe
from .utility import data as _data from m_teng.utility import file_io
from .utility.data import load_dataframe from m_teng.update_funcs import _Monitor, _ModelPredict, _update_print
from .utility import file_io
config_path = path.expanduser("~/.config/k-teng.json")
_runtime_vars = { _runtime_vars = {
"last-measurement": "" "last-measurement": ""
@ -44,66 +77,35 @@ settings = {
"interval": 0.02, "interval": 0.02,
"beep": True, "beep": True,
} }
config_path = path.expanduser("~/.config/k-teng.json")
test = False test = False
# global variable for the instrument returned by pyvisa # global variable for the instrument/client returned by pyvisa/bleak
k = None dev = None
def monitor_predict(model_dir: str, count=5000, interval=settings["interval"], max_points_shown=160):
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor:
""" """
Monitor v and i data Take <count> measurements in <interval> and predict with a machine learning model
""" """
def __init__(self, max_points_shown=None, use_print=False): model_predict = _ModelPredict(dev, model_dir)
self.max_points_shown = max_points_shown plt_monitor = _Monitor(max_points_shown, use_print=False)
self.use_print = use_print skip_n = 0
self.index = [] def update(i, ival, vval):
self.vdata = [] plt_monitor.update(i, ival, vval)
self.idata = [] if skip_n % 10 == 0:
model_predict.update(i, ival, vval)
plt.ion() skip_n += 1
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval):
if self.use_print:
_update_print(i, ival, vval)
self.index.append(i)
self.idata.append(ival)
self.vdata.append(vval)
# update data
self.iline.set_xdata(self.index)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view
self.iax.relim()
self.vax.relim()
if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure.measure_count(dev, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update, update_interval=0.1)
except KeyboardInterrupt:
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160): def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160):
""" """
@ -123,10 +125,10 @@ def monitor_count(count=5000, interval=settings["interval"], max_points_shown=16
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try: try:
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test) _measure.measure_count(dev, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt: except KeyboardInterrupt:
if not test: if args["keithley"]:
k.write(f"smua.source.output = smua.OUTPUT_OFF") dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50) print("Monitoring cancelled, measurement might still continue" + " "*50)
else: else:
print("Measurement finished" + " "*50) print("Measurement finished" + " "*50)
@ -147,10 +149,10 @@ def measure_count(count=5000, interval=settings["interval"]):
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try: try:
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test) _measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt: except KeyboardInterrupt:
if not test: if args["keithley"]:
k.write(f"smua.source.output = smua.OUTPUT_OFF") dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50) print("Monitoring cancelled, measurement might still continue" + " "*50)
else: else:
print("Measurement finished" + " "*50) print("Measurement finished" + " "*50)
@ -176,7 +178,7 @@ def monitor(interval=settings["interval"], max_measurements=None, max_points_sho
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown) plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
update_func = plt_monitor.update update_func = plt_monitor.update
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test) _measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
def measure(interval=settings["interval"], max_measurements=None): def measure(interval=settings["interval"], max_measurements=None):
@ -195,7 +197,7 @@ def measure(interval=settings["interval"], max_measurements=None):
_runtime_vars["last_measurement"] = dtime.now().isoformat() _runtime_vars["last_measurement"] = dtime.now().isoformat()
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
update_func = _update_print update_func = _update_print
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test) _measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
def repeat(measure_func: callable, count: int, repeat_delay=0): def repeat(measure_func: callable, count: int, repeat_delay=0):
@ -221,7 +223,7 @@ def repeat(measure_func: callable, count: int, repeat_delay=0):
sleep(repeat_delay) sleep(repeat_delay)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
if settings["beep"]: k.write("beeper.beep(0.3, 1000)") if settings["beep"]: _backend.beep()
def get_dataframe(): def get_dataframe():
@ -229,21 +231,14 @@ def get_dataframe():
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2 Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
""" """
global k, settings, _runtime_vars global k, settings, _runtime_vars
if test: ibuffer = _backend.collect_buffer(dev, 1)
timestamps = np.arange(0, 50, 0.01) vbuffer = _backend.collect_buffer(dev, 2)
ydata = np.array([testing.testcurve(t, amplitude=15, peak_width=2) for t in timestamps])
ibuffer = np.vstack((timestamps, ydata)).T
ydata = np.array([-testing.testcurve(t, amplitude=5e-8, peak_width=1) for t in timestamps])
vbuffer = np.vstack((timestamps, ydata)).T
else:
ibuffer = _keithley.collect_buffer(k, 1)
vbuffer = _keithley.collect_buffer(k, 2)
df = _data.buffers2dataframe(ibuffer, vbuffer) df = _data.buffers2dataframe(ibuffer, vbuffer)
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"]) df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}" df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
return df return df
def save_csv(): def save_csv():
""" """
Saves the contents of nvbuffer1 as .csv Saves the contents of nvbuffer1 as .csv
@ -267,6 +262,7 @@ def save_pickle():
df.to_pickle(filename) df.to_pickle(filename)
print(f"Saved as '{filename}'") print(f"Saved as '{filename}'")
def run_script(script_path): def run_script(script_path):
""" """
Run a lua script on the Keithley device Run a lua script on the Keithley device
@ -276,7 +272,7 @@ def run_script(script_path):
if test: if test:
print("run_script: Test mode enabled, ignoring call to run_script") print("run_script: Test mode enabled, ignoring call to run_script")
else: else:
_keithley.run_lua(k, script_path=script_path) _keithley.run_lua(dev, script_path=script_path)
def set(setting, value): def set(setting, value):
@ -333,13 +329,13 @@ Run 'help("topic")' to see more information on a topic""")
Functions: Functions:
name("<name>") - short for set("name", "<name>") name("<name>") - short for set("name", "<name>")
set("setting", value) - set a setting to a value set("setting", value) - set a setting to a value
save_settings() - store the settings as "k-teng.conf" in the working directory save_settings() - store the settings as "m-teng.json" in the working directory
load_settings() - load settings from a file load_settings() - load settings from a file
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path. The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
The serach path is: The serach path is:
<working-dir>/k-teng.json <working-dir>/m-teng.json
$XDG_CONFIG_HOME/k-teng.json $XDG_CONFIG_HOME/m-teng.json
~/.config/k-teng.json ~/.config/m-teng.json
""") """)
elif topic == "imports": elif topic == "imports":
print("""Imports: print("""Imports:
@ -349,7 +345,7 @@ Functions:
os.path """) os.path """)
elif topic == "device": elif topic == "device":
print("""Device: print("""Device:
The opened pyvisa resource (Keithley device) is the global variable 'k'. The opened pyvisa resource (deveithley device) is the global variable 'k'.
You can interact using pyvisa functions, such as You can interact using pyvisa functions, such as
k.write("command"), k.query("command") etc. to interact with the device.""") k.write("command"), k.query("command") etc. to interact with the device.""")
else: else:
@ -357,37 +353,26 @@ Functions:
def init(): def init():
global k, settings, test, config_path global dev, settings, config_path
print(r""" ____ __. ______________________ _______ ________ print(r""" ______________________ _______ ________
| |/ _| \__ ___/\_ _____/ \ \ / _____/ _____ \__ ___/\_ _____/ \ \ / _____/
| < ______ | | | __)_ / | \ / \ ___ / \ ______| | | __)_ / | \ / \ ___
| | \ /_____/ | | | \/ | \\ \_\ \ | Y Y \/_____/| | | \/ | \\ \_\ \
|____|__ \ |____| /_______ /\____|__ / \______ / |__|_| / |____| /_______ /\____|__ / \______ /
\/ \/ \/ \/ 1.1 \/ \/ \/ \/ 1.2
Interactive Shell for TENG measurements with Keithley 2600B Interactive Shell for TENG measurements with Keithley 2600B
--- ---
Enter 'help()' for a list of commands""") Enter 'help()' for a list of commands""")
from os import environ from os import environ
if path.isfile("k-teng.json"): if path.isfile("m-teng.json"):
config_path = "k-teng.json" config_path = "m-teng.json"
elif 'XDG_CONFIG_HOME' in environ.keys(): elif 'XDG_CONFIG_HOME' in environ.keys():
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/k-teng.json"): # and path.isfile(environ["XDG_CONFIG_HOME"] + "/m-teng.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/k-teng.json" config_path = environ["XDG_CONFIG_HOME"] + "/m-teng.json"
else: else:
config_path = path.expanduser("~/.config/k-teng.json") config_path = path.expanduser("~/.config/m-teng.json")
if args["config"]:
from sys import argv config_path = args["config"]
i = 1
while i < len(argv):
if argv[i] in ["-t", "--test"]:
test = True
elif argv[i] in ["-c", "--config"]:
if i+1 < len(argv):
config_path = argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
if not path.isdir(path.dirname(config_path)): if not path.isdir(path.dirname(config_path)):
@ -399,15 +384,11 @@ Enter 'help()' for a list of commands""")
if not path.isdir(settings["datadir"]): if not path.isdir(settings["datadir"]):
makedirs(settings["datadir"]) makedirs(settings["datadir"])
if not test:
from .keithley.keithley import init_keithley
try: try:
k = init_keithley(beep_success=settings["beep"]) dev = _backend.init(beep_success=settings["beep"])
except Exception as e: except Exception as e:
print(e) print(e)
exit() exit(1)
else:
print("Running in test mode, device will not be connected.")
if __name__ == "__main__": if __name__ == "__main__":

123
m_teng/update_funcs.py Normal file
View File

@ -0,0 +1,123 @@
import matplotlib.pyplot as plt
import numpy as np
import torch
from teng_ml.util import model_io as mio
from teng_ml.util.settings import MLSettings
from teng_ml.util.split import DataSplitter
from m_teng.backends.keithley import keithley
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor:
"""
Monitor v and i data
"""
def __init__(self, max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown
self.use_print = use_print
self.index = []
self.vdata = []
self.idata = []
plt.ion()
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval):
if self.use_print:
_update_print(i, ival, vval)
self.index.append(i)
self.idata.append(ival)
self.vdata.append(vval)
# update data
self.iline.set_xdata(self.index)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view
self.iax.relim()
self.vax.relim()
if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)
class _ModelPredict:
colors = ["red", "green", "purple", "blue", "orange", "grey", "cyan"]
def __init__(self, instr, model_dir):
"""
@param model_dir: directory where model.plk and settings.pkl are stored
Predict the values that are currently being recorded
@details:
Load the model and model settings from model dir
Wait until the number of recoreded points is >= the size of the models DataSplitter
Collect the data from the keithley, apply the transforms and predict the label with the model
Shows the prediction with a bar plot
"""
self.instr = instr
self.model = mio.load_model(model_dir)
self.model_settings: MLSettings = mio.load_settings(model_dir)
if type(self.model_settings.splitter) == DataSplitter:
self.data_length = self.model_settings.splitter.split_size
else:
self.data_length = 200
plt.ion()
self.fig1, (self.ax) = plt.subplots(1, 1, figsize=(8, 5))
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), [ 1 for _ in range(len(self.model_settings.labels))])
self.ax.set_ylabel("Prediction")
self.ax.grid(True)
def update(self, i, ival, vval):
buffer_size = keithley.get_buffer_size(self.instr, buffer_nr=1)
if buffer_size <= self.data_length:
print(f"ModelPredict.update: buffer_size={buffer_size} < {self.data_length}")
return
else:
ibuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=1)
vbuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=2)
if self.model_settings.num_features == 1: # model uses only voltage
data = np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T
# print(f"data.shape:", data.shape)
else:
raise NotImplementedError(f"Cant handle models with num_features != 1 yet")
for t in self.model_settings.transforms:
data = t(data)
data = np.reshape(data[:,2], (1, -1, 1)) # batch_size, seq, features
with torch.no_grad():
x = torch.FloatTensor(data) # select voltage data, without timestamps
# print(x.shape)
prediction = self.model(x) # (batch_size, label-predictions)
prediction = torch.nn.functional.softmax(prediction) # TODO remove when softmax is already applied by model
predicted = torch.argmax(prediction, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"raw={prediction[0]}, predicted_index={predicted[0}")
label = self.model_settings.labels[predicted[0]]
# print(f"-> label={label}")
self.bar_cont.remove()
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), prediction[0], color=_ModelPredict.colors[:len(self.model_settings.labels)])
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()

View File

17
m_teng/utility/testing.py Normal file
View File

@ -0,0 +1,17 @@
import numpy as np
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# 0 = pk - width
# 2pi = pk + width
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
if nearest_peak > 0 and np.abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
def get_testcurve(frequency=10, peak_width=2, amplitude=20, bias=0):
return np.vectorize(lambda x: testcurve(x, frequency=frequency, peak_width=peak_width, amplitude=amplitude, bias=bias))

File diff suppressed because one or more lines are too long

35
pyproject.toml Normal file
View File

@ -0,0 +1,35 @@
[build-system]
requires = ["setuptools"]
[project]
name = "k_teng"
version = "1.0.0"
description = "Interactive utility for I-V measurements with a Keitley 2600 SMU"
requires-python = ">=3.10"
readme = "readme.md"
license = {file = "LICENSE"}
authors = [
{ name = "Matthias Quintern", email = "matthias@quintern.xyz" }
]
classifiers = [
"Operating System :: POSIX :: Linux",
"Environment :: Console",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
]
dependencies = [
"matplotlib>=3.6",
"numpy",
"pyvisa",
"pyvisa-py"
]
[project.urls]
repository = "https://git.quintern.xyz/MatthiasQuintern/k-teng"
[tool.setuptools.packages.find]
where = ["."]
[project.scripts]
k-teng = "k_teng.k_teng_interactive:main"

View File

@ -1,23 +1,39 @@
# K-TENG # m-TENG
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU using pyvisa Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU or an Arduino
## Features ## Features
### Useful functions for scripts
- Measure Voltage and/or Current
- Transfer buffer from device to host
- Save/load as csv
- Run lua script on device
- Auto-filenames
### Interactive (shell) mode ### Interactive (shell) mode
- Live view - Live view
- Press button to stop - Press button to stop
- Save and load settings (default interval, data directory...) - Save and load settings (default interval, data directory...)
- Easily run arbitrary command on device - Easily run arbitrary command on device
### Useful functions for scripts
- Measure voltage and/or current
- Transfer buffer from measurement device to host
- Save/load as csv
- Run lua script on Keithley SMU
- Auto-filenames
## Available backends
### keithley
Use a Keithley 2611B Source-Measure-Unit via *pyvisa*. This backend allows measuring both voltage and current simultaneously.
### arduino
Use a Bluetooth capable Arduino with [https://git.quintern.xyz/MatthiasQuintern/teng-arduino](this software on the arduino).
This backend only allows measuring voltage using an Arduinos analog input pin (0-5 V, 10 bit resolution).
### testing
Use the shell without measuring TENG output. When starting a measurement, sample data will be generated.
## Shell mode ## Shell mode
Start with: It is recommended to run the shell with ipython:
```shell ```shell
ipython -i k_teng_interactive.py ipython -i k_teng_interactive.py -- -*X*
``` ```
Substitute *X* for `-k` for keithley backend, `-a` for arduino backend or `-t` for testing backend.
Use `help()` to get a list of available commands Use `help()` to get a list of available commands