Compare commits

..

4 Commits

Author SHA1 Message Date
Matthias@Dell
b526bfb913 restructured using backends 2023-06-24 12:28:21 +02:00
Matthias@Dell
0eed4d402b added help for different backends 2023-06-24 12:28:12 +02:00
Matthias@Dell
0494881232 updated readme 2023-06-24 12:27:45 +02:00
Matthias@Dell
a0f8a2a903 renamed to m-teng 2023-06-18 17:38:10 +02:00
27 changed files with 696 additions and 803 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
__pycache__
.ipynb_checkpoints
testing

2
MANIFEST.in Normal file
View File

@ -0,0 +1,2 @@
include regina/package-data/*
include regina/sql/*.sql

1
bspc_rule.sh Executable file
View File

@ -0,0 +1 @@
bspc rule -a matplotlib desktop=^1 state=pseudo_tiled focus=off

View File

@ -1,107 +0,0 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from .keithley import reset
from ..utility import testing as _testing
def measure_count(instr, V=True, I=True, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True, testing=False):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = None
if V and I:
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
elif V:
f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
elif I:
f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
else:
print("I and/or V needs to be set to True")
return
i = 0
if not testing:
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
condition = lambda: float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0
else:
condition = lambda: i < int(float(count) * interval / update_interval)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while condition():
if update_func and V and I:
try:
if not testing:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
else:
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
update_func(i, ival, vval)
except ValueError:
pass
sleep(update_interval)
i += 1
if not testing:
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None, testing=False):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
if not testing:
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
if testing:
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
else:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
if not testing:
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -1,4 +0,0 @@
pdms
kapton
plastic

View File

@ -1,49 +0,0 @@
readings = [
4.974127e-10, -1.418591e-12, -1.573563e-12, -1.728535e-12, -1.704693e-12,
-1.847744e-12, -1.931190e-12, -1.788139e-12, -1.871586e-12, -2.169609e-12,
-2.074242e-12, -1.895428e-12, -1.895428e-12, -1.776218e-12, -1.990795e-12,
-1.788139e-12, -1.811981e-12, -1.657009e-12, -1.573563e-12, -2.396107e-12,
-2.515316e-12, -2.765656e-12, -2.825260e-12, -3.147125e-12, -2.300739e-12,
-2.825260e-12, -3.278255e-12, -5.257130e-12, -6.818771e-12, -8.916855e-12,
-7.712841e-12, 6.437302e-12, -1.142025e-11, -1.206398e-11, -4.649043e-10,
-3.427613e-09, -2.460408e-09, -2.340376e-09, -1.306653e-10, 1.496077e-11,
2.933741e-11, 1.953280e-09, 8.579970e-10, 9.226799e-12, -1.095533e-11,
-2.508163e-11, -2.776039e-09, -8.686423e-09, 4.935264e-12, 1.246929e-11,
3.225744e-09, 2.814472e-09, 1.877034e-09, 2.229273e-09, 1.713574e-09,
8.355618e-10, -4.332781e-10, 5.896091e-11, 5.762577e-11, 8.129537e-09,
4.044378e-09, 1.771629e-09, 7.849216e-10, 4.098892e-10, 3.390551e-10,
2.956390e-10, 3.033876e-10, 1.716256e-10, 1.463890e-11, -5.078316e-12,
-6.949902e-12, -8.106232e-12, -6.473065e-12, -4.506111e-12, 4.919767e-11,
3.052297e-08, 1.161162e-08, -9.892106e-09, -3.613818e-09, -5.004287e-09,
-2.015829e-11, -4.183054e-11, -1.810908e-10, -2.042532e-10, -3.516316e-10,
5.099773e-11, 1.921976e-08, -1.256589e-08, -4.242897e-10, -1.358986e-12,
-3.445148e-12, -3.838539e-12, -4.184246e-12, -7.402897e-12, -2.840877e-10,
-2.872229e-10, -2.730966e-10, -1.134396e-10, -4.376173e-11, -3.576279e-14
]
timestamps = [
0. , 0.05 , 0.1 , 0.15, 0.2, 0.25, 0.3, 0.35,
0.4 , 0.45 , 0.5 , 0.55, 0.6, 0.65, 0.7, 0.75,
0.8 , 0.85 , 0.9 , 0.95, 1. , 1.05, 1.1, 1.15,
1.2 , 1.25 , 1.3 , 1.35, 1.4, 1.45, 1.5, 1.55,
1.6 , 1.690891, 1.710923, 1.75, 1.8, 1.85, 1.9, 1.95,
2. , 2.05 , 2.1 , 2.15, 2.2, 2.25, 2.3, 2.35,
2.420332, 2.45 , 2.5 , 2.55, 2.6, 2.65, 2.7, 2.75,
2.820553, 2.890843, 2.910875, 2.95, 3. , 3.05, 3.1, 3.15,
3.2 , 3.25 , 3.3 , 3.35, 3.4, 3.45, 3.5, 3.55,
3.6 , 3.65 , 3.7 , 3.75, 3.8, 3.85, 3.9, 3.95,
4. , 4.05 , 4.1 , 4.15, 4.2, 4.25, 4.3, 4.35,
4.4 , 4.45 , 4.5 , 4.55, 4.6, 4.65, 4.7, 4.75,
4.8 , 4.85 , 4.9 , 4.95, ]
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
df = pd.DataFrame(np.vstack((timestamps, readings)).T)
df.columns = ["Time [s]", "Voltage [V]"]
print(df)
df.to_csv("test.csv", sep=",", header=True, index=False)
df.plot(x='Time [s]', y="Voltage [V]")
plt.show()

View File

@ -1,33 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
# print(x, nearest_peak)
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
# 0 = pk - width
# 2pi = pk + width
def baseline(data):
# find the value where the most values with low gradients are closest to
gradients = np.abs(np.gradient(data))
# consider the values where the absolute gradient is in the bottom 20%
n_gradients = len(data) // 20
consider_indices = np.argsort(gradients)[:n_gradients]
# of those, only consider values where the value
consider_values = data[consider_indices]
xdata = np.arange(0, 100, 0.01)
ydata = np.vectorize(testcurve)(xdata)
plt.plot(xdata, ydata)
plt.show()

View File

@ -1,38 +0,0 @@
import pandas as pd
import numpy as np
def collect_buffer(instr, buffer_nr=1):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
if buffer_nr == 2: buffername = "smua.nvbuffer2"
else: buffername = "smua.nvbuffer1"
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array)
print(f"readings: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
# 0 = pk - width
# 2pi = pk + width

View File

@ -0,0 +1,155 @@
import bleak as b
import asyncio
import numpy as np
TARGET_NAME = "ArduinoTENG"
# GATT service and characteristics UUIDs
TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f"
TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f"
TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f"
TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f"
TENG_COUNT_CUUID = "00010004-9a74-4b30-9361-4a16ec09930f"
TENG_INTERVAL_CUUID = "00010005-9a74-4b30-9361-4a16ec09930f"
TENG_COMMANDS = {
"STOP": int(0).to_bytes(1, signed=False),
"MEASURE_COUNT": int(1).to_bytes(1, signed=False),
"MEASURE": int(2).to_bytes(1, signed=False),
}
TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "CONNECTED", "MEASURING"]
# TODO save measurements on device buffer, transfer later
# wrapper for global variable
class Buffer:
def __init__(self):
self.data = None
_buffer = Buffer()
# class Runner:
# def __init__(self):
runner = asyncio.Runner()
def teng_status_callback(characteristic, data):
value = int.from_bytes(data, byteorder="big", signed=False)
if 0 <= value and value < len(TENG_STATUS):
print(f"Status change: {TENG_STATUS[value]}")
else:
print(f"Status change (invalid): status={value}")
def disconnect_callback(client):
raise Exception(f"The Bluetooth device {client.name} was disconnected")
async def init_arduino_async(n_tries: int=5) -> b.BleakClient:
n_try = 0
if n_tries <= 0: n_tries = "inf"
try:
target_device = None
while target_device is None and (n_tries == "inf" or n_try < n_tries):
print(f"Searching for Bluetooth device '{TARGET_NAME}' ({n_try+1}/{n_tries})", end="\r")
devices = await b.BleakScanner.discover(return_adv=True, timeout=1.5)
# print(devices)
for adr, (device, adv_data) in devices.items():
if device.name == TARGET_NAME:
# print(adv_data)
target_device = device
break
n_try += 1
if target_device is None:
raise Exception(f"Could not find Bluetooth device 'ArduinoTENG'")
# print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}")
# print(target_device.name, target_device.details)
client = b.BleakClient(target_device, disconnect_callback=disconnect_callback)
await client.connect()
print(f"Connected to Bluetooth device '{TARGET_NAME}' at [{client.address}]")
return client
except asyncio.exceptions.CancelledError:
raise Exception(f"Cancelled")
def init(beep_success=True, n_tries: int=5) -> b.BleakClient:
"""
Connect to the arduino
@returns: BleakClient
"""
client = runner.run(init_arduino_async(n_tries=n_tries))
if beep_success: beep(client)
return client
def exit(client):
try:
runner.run(stop_measurement(client))
runner.run(client.disconnect())
except Exception:
pass
async def set_interval(client, interval: float):
"""
Set the measurement interval
@param interval: interval in seconds
"""
interval = int(interval * 1000) # convert to ms for arduinos delay)
await client.write_gatt_char(TENG_INTERVAL_CUUID, interval.to_bytes(2, byteorder="little", signed=False))
async def set_count(client, count: int):
"""
Set the measurement count
@param count: number of measurements to take
"""
await client.write_gatt_char(TENG_COUNT_CUUID, count.to_bytes(2, byteorder="little", signed=False))
async def stop_measurement(client):
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["STOP"])
async def start_measure_count(client):
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE_COUNT"])
async def start_measure(client):
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE"])
# async def main():
# for service in client.services:
# print(f"Service: {service.uuid}: {service.description}")
# for c in service.characteristics:
# print(f"\t{c.uuid}: {c.properties}, {c.descriptors}")
# teng_status = client.services.get_characteristic(TENG_STATUS_CUUID)
# teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID)
# teng_reading = client.services.get_characteristic(TENG_READING_CUUID)
# client.start_notify(teng_status, teng_status_callback)
# await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"])
# await asyncio.sleep(5)
# await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"])
# while client.is_connected:
# data = await client.read_gatt_char(teng_reading)
# value = int.from_bytes(data, byteorder="little", signed=False)
# print(f"Reading: {value}")
# await asyncio.sleep(0.5)
# except KeyboardInterrupt:
# pass
# except asyncio.exceptions.CancelledError:
# pass
# print("Disconnected")
def collect_buffer(instr, buffer_nr=1):
"""
@param buffer_nr: 1 -> current, 2 -> voltage
"""
assert(buffer_nr in (1, 2))
return np.vstack((_buffer.data[:,0], _buffer.data[:,buffer_nr])).T
def beep(client):
# TODO connect beeper to arduino?
print("beep")

View File

@ -0,0 +1,75 @@
import bleak as b
import numpy as np
import asyncio
import datetime
from m_teng.backends.arduino.arduino import beep, set_interval, set_count, TENG_READING_CUUID, _buffer, start_measure, start_measure_count, stop_measurement, runner
async def _measure_count_async(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
global _buffer
_buffer.data = np.zeros((count, 3))
i = 0
t_start = datetime.datetime.now()
async def add_reading(teng_reading_cr, reading: bytearray):
nonlocal i, count
if i >= count: return
_buffer.data[i][0] = float((datetime.datetime.now() - t_start).microseconds) / 1000
# reading = await client.read_gatt_char(TENG_READING_CUUID)
_buffer.data[i][2] = int.from_bytes(reading, byteorder="little", signed=False)
i += 1
await set_interval(client, interval)
await set_count(client, count)
# TODO check if notify works when the same value is written again
await client.start_notify(TENG_READING_CUUID, add_reading)
await start_measure_count(client)
while i < count:
await asyncio.sleep(update_interval)
if update_func is not None and i > 0: # assume an update has occured
update_func(i-1, 0, _buffer.data[i-1, 2])
await client.stop_notify(TENG_READING_CUUID)
if beep_done: beep(client)
def measure_count(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
runner.run(_measure_count_async(client, count=count, interval=interval, update_func=update_func, update_interval=update_interval, beep_done=beep_done, verbose=verbose))
async def _measure_async(client, interval, update_func=None, max_measurements=None):
global _buffer
readings = []
timestamps = []
i = 0
t_start = datetime.datetime.now()
async def add_reading(teng_reading_cr, reading):
nonlocal i
timestamps.append(float((datetime.datetime.now() - t_start).microseconds) / 1000)
reading = int.from_bytes(reading, byteorder="little", signed=False)
readings.append(reading)
if update_func:
try:
update_func(i, 0, reading)
except KeyboardInterrupt:
raise asyncio.exceptions.CancelledError("KeyboardInterrupt in update_func")
i += 1
await set_interval(client, interval)
await client.start_notify(TENG_READING_CUUID, add_reading)
await start_measure(client)
try:
while max_measurements is None or i < max_measurements:
await asyncio.sleep(0.1) #
except asyncio.exceptions.CancelledError:
pass
except KeyboardInterrupt:
pass
await client.stop_notify(TENG_READING_CUUID)
await stop_measurement(client)
_buffer.data = np.vstack((timestamps, np.zeros(len(timestamps)), readings)).T
print("Measurement stopped" + " "*50)
def measure(client, interval, update_func=None, max_measurements=None):
runner.run(_measure_async(client, interval=interval, update_func=update_func, max_measurements=max_measurements))

View File

View File

@ -1,20 +1,19 @@
import pyvisa
import numpy as np
import pkg_resources
"""
Utility
"""
script_dir = "../scripts/"
scripts = {
"buffer_reset": "buffer_reset.lua",
"smua_reset": "smua_reset.lua",
"buffer_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/buffer_reset.lua"),
"smua_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/smua_reset.lua"),
}
for key,val in scripts.items():
scripts[key] = script_dir + scripts[key]
def init_keithley(beep_success=True):
def init(beep_success=True):
rm = pyvisa.ResourceManager('@py')
resources = rm.list_resources()
if len(resources) < 1:
@ -32,6 +31,11 @@ def init_keithley(beep_success=True):
return keithley
def exit(instr):
instr.close()
def run_lua(instr, script_path, verbose=False):
"""
Run a lua script from the host on the instrument
@ -52,6 +56,15 @@ def reset(instr, verbose=False):
run_lua(instr, scripts["smua_reset"], verbose=verbose)
run_lua(instr, scripts["buffer_reset"], verbose=verbose)
def get_buffer_name(buffer_nr: int):
if buffer_nr == 2: return "smua.nvbuffer2"
elif buffer_nr == 1: return "smua.nvbuffer1"
raise ValueError(f"Invalid buffer_nr: {buffer_nr}")
def get_buffer_size(instr, buffer_nr=1):
n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n")
return int(float(n))
def collect_buffer(instr, buffer_nr=1, verbose=False):
"""
@ -63,8 +76,7 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
0: timestamps
1: readings
"""
if buffer_nr == 2: buffername = "smua.nvbuffer2"
else: buffername = "smua.nvbuffer1"
buffername = get_buffer_name(buffer_nr)
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
@ -75,3 +87,30 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
buffer = np.vstack((timestamps, readings)).T
return buffer
def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
buffername = get_buffer_name(buffer_nr)
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
if range_[1] == -1:
range_ = (range_[0], f"{buffername}.n")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.readings)", container=np.array)
if verbose:
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer

View File

@ -0,0 +1,93 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from m_teng.backends.keithley import reset
from m_teng.utility import testing as _testing
def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
# if V and I:
# elif V:
# f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
# elif I:
# f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
# else:
# print("I and/or V needs to be set to True")
# return
i = 0
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0:
if update_func:
try:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
update_func(i, ival, vval)
except ValueError as e:
if i != 0:
pass
else:
print(f"measure_count: ValueError: {e}")
sleep(update_interval)
i += 1
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -1,6 +1,6 @@
"""
run this before using this library:
ipython -i k_teng_interactive.py
ipython -i m_teng_interactive.py
always records iv-t curves
i-data -> smua.nvbuffer1
@ -17,22 +17,56 @@ from time import sleep
from os import path, makedirs
import pickle as pkl
import json
import atexit
import argparse
if __name__ == "__main__":
import sys
if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "k-teng"
import sys
__package__ = "m_teng"
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
parser = argparse.ArgumentParser(
prog="m-teng",
description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino",
)
backend_group = parser.add_mutually_exclusive_group(required=True)
backend_group.add_argument("-k", "--keithley", action="store_true")
backend_group.add_argument("-a", "--arduino", action="store_true")
backend_group.add_argument("-t", "--testing", action='store_true')
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
args = vars(parser.parse_args())
i = 1
while i < len(sys.argv):
if args["keithley"]:
import m_teng.backends.keithley.keithley as _backend
import m_teng.backends.keithley.measure as _measure
elif args["arduino"]:
import m_teng.backends.arduino.arduino as _backend
import m_teng.backends.arduino.measure as _measure
elif args["testing"]:
import m_teng.backends.testing.testing as _backend
import m_teng.backends.testing.measure as _measure
elif sys.argv[i] in ["-c", "--config"]:
if i+1 < len(sys.argv):
config_path = sys.argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
from .keithley import keithley as _keithley
from .keithley.measure import measure_count as _measure_count, measure as _measure
from .utility import data as _data
from .utility.data import load_dataframe
from .utility import file_io
from m_teng.utility import data as _data
from m_teng.utility.data import load_dataframe
from m_teng.utility import file_io
from m_teng.update_funcs import _Monitor, _ModelPredict, _update_print
config_path = path.expanduser("~/.config/k-teng.json")
_runtime_vars = {
"last-measurement": ""
@ -44,66 +78,35 @@ settings = {
"interval": 0.02,
"beep": True,
}
config_path = path.expanduser("~/.config/k-teng.json")
test = False
# global variable for the instrument returned by pyvisa
k = None
# global variable for the instrument/client returned by pyvisa/bleak
dev = None
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor:
def monitor_predict(model_dir: str, count=5000, interval=settings["interval"], max_points_shown=160):
"""
Monitor v and i data
Take <count> measurements in <interval> and predict with a machine learning model
"""
def __init__(self, max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown
self.use_print = use_print
self.index = []
self.vdata = []
self.idata = []
plt.ion()
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval):
if self.use_print:
_update_print(i, ival, vval)
self.index.append(i)
self.idata.append(ival)
self.vdata.append(vval)
# update data
self.iline.set_xdata(self.index)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view
self.iax.relim()
self.vax.relim()
if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)
model_predict = _ModelPredict(dev, model_dir)
plt_monitor = _Monitor(max_points_shown, use_print=False)
skip_n = 0
def update(i, ival, vval):
plt_monitor.update(i, ival, vval)
if skip_n % 10 == 0:
model_predict.update(i, ival, vval)
skip_n += 1
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update, update_interval=0.1)
except KeyboardInterrupt:
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160):
"""
@ -123,10 +126,10 @@ def monitor_count(count=5000, interval=settings["interval"], max_points_shown=16
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt:
if not test:
k.write(f"smua.source.output = smua.OUTPUT_OFF")
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
@ -147,10 +150,10 @@ def measure_count(count=5000, interval=settings["interval"]):
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt:
if not test:
k.write(f"smua.source.output = smua.OUTPUT_OFF")
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
@ -176,7 +179,7 @@ def monitor(interval=settings["interval"], max_measurements=None, max_points_sho
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
update_func = plt_monitor.update
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
def measure(interval=settings["interval"], max_measurements=None):
@ -195,7 +198,7 @@ def measure(interval=settings["interval"], max_measurements=None):
_runtime_vars["last_measurement"] = dtime.now().isoformat()
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
update_func = _update_print
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
def repeat(measure_func: callable, count: int, repeat_delay=0):
@ -221,7 +224,7 @@ def repeat(measure_func: callable, count: int, repeat_delay=0):
sleep(repeat_delay)
except KeyboardInterrupt:
pass
if settings["beep"]: k.write("beeper.beep(0.3, 1000)")
if settings["beep"]: _backend.beep()
def get_dataframe():
@ -229,21 +232,14 @@ def get_dataframe():
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
"""
global k, settings, _runtime_vars
if test:
timestamps = np.arange(0, 50, 0.01)
ydata = np.array([testing.testcurve(t, amplitude=15, peak_width=2) for t in timestamps])
ibuffer = np.vstack((timestamps, ydata)).T
ydata = np.array([-testing.testcurve(t, amplitude=5e-8, peak_width=1) for t in timestamps])
vbuffer = np.vstack((timestamps, ydata)).T
else:
ibuffer = _keithley.collect_buffer(k, 1)
vbuffer = _keithley.collect_buffer(k, 2)
ibuffer = _backend.collect_buffer(dev, 1)
vbuffer = _backend.collect_buffer(dev, 2)
df = _data.buffers2dataframe(ibuffer, vbuffer)
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
return df
def save_csv():
"""
Saves the contents of nvbuffer1 as .csv
@ -267,6 +263,7 @@ def save_pickle():
df.to_pickle(filename)
print(f"Saved as '{filename}'")
def run_script(script_path):
"""
Run a lua script on the Keithley device
@ -276,7 +273,7 @@ def run_script(script_path):
if test:
print("run_script: Test mode enabled, ignoring call to run_script")
else:
_keithley.run_lua(k, script_path=script_path)
_keithley.run_lua(dev, script_path=script_path)
def set(setting, value):
@ -305,16 +302,16 @@ def help(topic=None):
if topic == None:
print("""
Functions:
measure - take measurements
monitor - take measurements with live monitoring in a matplotlib window
measure_count - take a fixed number of measurements
monitor_count - take a fixed number of measurements with live monitoring in a matplotlib window
repeat - measure and save to csv multiple times
get_dataframe - return smua.nvbuffer 1 and 2 as pandas dataframe
save_csv - save the last measurement as csv file
save_pickle - save the last measurement as pickled pandas dataframe
load_dataframe - load a pandas dataframe from csv or pickle
run_script - run a lua script on the Keithely device
measure [kat] - take measurements
monitor [kat] - take measurements with live monitoring in a matplotlib window
measure_count [kat] - take a fixed number of measurements
monitor_count [kat] - take a fixed number of measurements with live monitoring in a matplotlib window
repeat [kat] - measure and save to csv multiple times
get_dataframe [kat] - return device internal buffer as pandas dataframe
save_csv [kat] - save the last measurement as csv file
save_pickle [kat] - save the last measurement as pickled pandas dataframe
load_dataframe [kat] - load a pandas dataframe from csv or pickle
run_script [k ] - run a lua script on the Keithely device
Run 'help(function)' to see more information on a function
Available topics:
@ -333,13 +330,13 @@ Run 'help("topic")' to see more information on a topic""")
Functions:
name("<name>") - short for set("name", "<name>")
set("setting", value) - set a setting to a value
save_settings() - store the settings as "k-teng.conf" in the working directory
save_settings() - store the settings as "m-teng.json" in the working directory
load_settings() - load settings from a file
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
The serach path is:
<working-dir>/k-teng.json
$XDG_CONFIG_HOME/k-teng.json
~/.config/k-teng.json
<working-dir>/m-teng.json
$XDG_CONFIG_HOME/m-teng.json
~/.config/m-teng.json
""")
elif topic == "imports":
print("""Imports:
@ -349,45 +346,37 @@ Functions:
os.path """)
elif topic == "device":
print("""Device:
The opened pyvisa resource (Keithley device) is the global variable 'k'.
keithley backend:
The opened pyvisa resource (deveithley device) is the global variable 'dev'.
You can interact using pyvisa functions, such as
k.write("command"), k.query("command") etc. to interact with the device.""")
k.write("command"), k.query("command") etc. to interact with the device.
arduino backend:
The Arduino will be avaiable as BleakClient using the global variable 'dev'. """)
else:
print(topic.__doc__)
def init():
global k, settings, test, config_path
print(r""" ____ __. ______________________ _______ ________
| |/ _| \__ ___/\_ _____/ \ \ / _____/
| < ______ | | | __)_ / | \ / \ ___
| | \ /_____/ | | | \/ | \\ \_\ \
|____|__ \ |____| /_______ /\____|__ / \______ /
\/ \/ \/ \/ 1.1
global dev, settings, config_path
print(r""" ______________________ _______ ________
_____ \__ ___/\_ _____/ \ \ / _____/
/ \ ______| | | __)_ / | \ / \ ___
| Y Y \/_____/| | | \/ | \\ \_\ \
|__|_| / |____| /_______ /\____|__ / \______ /
\/ \/ \/ \/ 1.2
Interactive Shell for TENG measurements with Keithley 2600B
---
Enter 'help()' for a list of commands""")
from os import environ
if path.isfile("k-teng.json"):
config_path = "k-teng.json"
if path.isfile("m-teng.json"):
config_path = "m-teng.json"
elif 'XDG_CONFIG_HOME' in environ.keys():
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/k-teng.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/k-teng.json"
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/m-teng.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/m-teng.json"
else:
config_path = path.expanduser("~/.config/k-teng.json")
from sys import argv
i = 1
while i < len(argv):
if argv[i] in ["-t", "--test"]:
test = True
elif argv[i] in ["-c", "--config"]:
if i+1 < len(argv):
config_path = argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
config_path = path.expanduser("~/.config/m-teng.json")
if args["config"]:
config_path = args["config"]
if not path.isdir(path.dirname(config_path)):
@ -399,15 +388,12 @@ Enter 'help()' for a list of commands""")
if not path.isdir(settings["datadir"]):
makedirs(settings["datadir"])
if not test:
from .keithley.keithley import init_keithley
try:
k = init_keithley(beep_success=settings["beep"])
dev = _backend.init(beep_success=settings["beep"])
except Exception as e:
print(e)
exit()
else:
print("Running in test mode, device will not be connected.")
exit(1)
atexit.register(_backend.exit, dev)
if __name__ == "__main__":

123
m_teng/update_funcs.py Normal file
View File

@ -0,0 +1,123 @@
import matplotlib.pyplot as plt
import numpy as np
import torch
from teng_ml.util import model_io as mio
from teng_ml.util.settings import MLSettings
from teng_ml.util.split import DataSplitter
from m_teng.backends.keithley import keithley
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor:
"""
Monitor v and i data
"""
def __init__(self, max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown
self.use_print = use_print
self.index = []
self.vdata = []
self.idata = []
plt.ion()
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval):
if self.use_print:
_update_print(i, ival, vval)
self.index.append(i)
self.idata.append(ival)
self.vdata.append(vval)
# update data
self.iline.set_xdata(self.index)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view
self.iax.relim()
self.vax.relim()
if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)
class _ModelPredict:
colors = ["red", "green", "purple", "blue", "orange", "grey", "cyan"]
def __init__(self, instr, model_dir):
"""
@param model_dir: directory where model.plk and settings.pkl are stored
Predict the values that are currently being recorded
@details:
Load the model and model settings from model dir
Wait until the number of recoreded points is >= the size of the models DataSplitter
Collect the data from the keithley, apply the transforms and predict the label with the model
Shows the prediction with a bar plot
"""
self.instr = instr
self.model = mio.load_model(model_dir)
self.model_settings: MLSettings = mio.load_settings(model_dir)
if type(self.model_settings.splitter) == DataSplitter:
self.data_length = self.model_settings.splitter.split_size
else:
self.data_length = 200
plt.ion()
self.fig1, (self.ax) = plt.subplots(1, 1, figsize=(8, 5))
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), [ 1 for _ in range(len(self.model_settings.labels))])
self.ax.set_ylabel("Prediction")
self.ax.grid(True)
def update(self, i, ival, vval):
buffer_size = keithley.get_buffer_size(self.instr, buffer_nr=1)
if buffer_size <= self.data_length:
print(f"ModelPredict.update: buffer_size={buffer_size} < {self.data_length}")
return
else:
ibuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=1)
vbuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=2)
if self.model_settings.num_features == 1: # model uses only voltage
data = np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T
# print(f"data.shape:", data.shape)
else:
raise NotImplementedError(f"Cant handle models with num_features != 1 yet")
for t in self.model_settings.transforms:
data = t(data)
data = np.reshape(data[:,2], (1, -1, 1)) # batch_size, seq, features
with torch.no_grad():
x = torch.FloatTensor(data) # select voltage data, without timestamps
# print(x.shape)
prediction = self.model(x) # (batch_size, label-predictions)
prediction = torch.nn.functional.softmax(prediction) # TODO remove when softmax is already applied by model
predicted = torch.argmax(prediction, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"raw={prediction[0]}, predicted_index={predicted[0}")
label = self.model_settings.labels[predicted[0]]
# print(f"-> label={label}")
self.bar_cont.remove()
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), prediction[0], color=_ModelPredict.colors[:len(self.model_settings.labels)])
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()

View File

17
m_teng/utility/testing.py Normal file
View File

@ -0,0 +1,17 @@
import numpy as np
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# 0 = pk - width
# 2pi = pk + width
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
if nearest_peak > 0 and np.abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
def get_testcurve(frequency=10, peak_width=2, amplitude=20, bias=0):
return np.vectorize(lambda x: testcurve(x, frequency=frequency, peak_width=peak_width, amplitude=amplitude, bias=bias))

File diff suppressed because one or more lines are too long

35
pyproject.toml Normal file
View File

@ -0,0 +1,35 @@
[build-system]
requires = ["setuptools"]
[project]
name = "k_teng"
version = "1.0.0"
description = "Interactive utility for I-V measurements with a Keitley 2600 SMU"
requires-python = ">=3.10"
readme = "readme.md"
license = {file = "LICENSE"}
authors = [
{ name = "Matthias Quintern", email = "matthias@quintern.xyz" }
]
classifiers = [
"Operating System :: POSIX :: Linux",
"Environment :: Console",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
]
dependencies = [
"matplotlib>=3.6",
"numpy",
"pyvisa",
"pyvisa-py"
]
[project.urls]
repository = "https://git.quintern.xyz/MatthiasQuintern/k-teng"
[tool.setuptools.packages.find]
where = ["."]
[project.scripts]
k-teng = "k_teng.k_teng_interactive:main"

View File

@ -1,23 +1,39 @@
# K-TENG
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU using pyvisa
# m-TENG
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU or an Arduino
## Features
### Useful functions for scripts
- Measure Voltage and/or Current
- Transfer buffer from device to host
- Save/load as csv
- Run lua script on device
- Auto-filenames
### Interactive (shell) mode
- Live view
- Press button to stop
- Save and load settings (default interval, data directory...)
- Easily run arbitrary command on device
## Shell mode
Start with:
```shell
ipython -i k_teng_interactive.py
```
Use `help()` to get a list of available commands
### Useful functions for scripts
- Measure voltage and/or current
- Transfer buffer from measurement device to host
- Save/load as csv
- Run lua script on Keithley SMU
- Auto-filenames
## Available backends
### keithley
Use a Keithley 2611B Source-Measure-Unit via *pyvisa*. This backend allows measuring both voltage and current simultaneously.
### arduino
Use a Bluetooth capable Arduino with [https://git.quintern.xyz/MatthiasQuintern/teng-arduino](this software on the arduino).
This backend only allows measuring voltage using an Arduinos analog input pin (0 - 3.3 V, 12 bit resolution).
### testing
Use the shell without measuring TENG output. When starting a measurement, sample data will be generated.
## Shell mode
It is recommended to run the shell with ipython:
```shell
ipython -i k_teng_interactive.py -- -*X*
```
Substitute *X* for `-k` for keithley backend, `-a` for arduino backend or `-t` for testing backend.
In the shell, run `help()` to get a list of available commands