Compare commits

...

4 Commits

Author SHA1 Message Date
Matthias@Dell
b526bfb913 restructured using backends 2023-06-24 12:28:21 +02:00
Matthias@Dell
0eed4d402b added help for different backends 2023-06-24 12:28:12 +02:00
Matthias@Dell
0494881232 updated readme 2023-06-24 12:27:45 +02:00
Matthias@Dell
a0f8a2a903 renamed to m-teng 2023-06-18 17:38:10 +02:00
27 changed files with 696 additions and 803 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
__pycache__
.ipynb_checkpoints
testing

2
MANIFEST.in Normal file
View File

@ -0,0 +1,2 @@
include regina/package-data/*
include regina/sql/*.sql

1
bspc_rule.sh Executable file
View File

@ -0,0 +1 @@
bspc rule -a matplotlib desktop=^1 state=pseudo_tiled focus=off

View File

@ -1,107 +0,0 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from .keithley import reset
from ..utility import testing as _testing
def measure_count(instr, V=True, I=True, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True, testing=False):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = None
if V and I:
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
elif V:
f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
elif I:
f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
else:
print("I and/or V needs to be set to True")
return
i = 0
if not testing:
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
condition = lambda: float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0
else:
condition = lambda: i < int(float(count) * interval / update_interval)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while condition():
if update_func and V and I:
try:
if not testing:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
else:
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
update_func(i, ival, vval)
except ValueError:
pass
sleep(update_interval)
i += 1
if not testing:
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None, testing=False):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
if not testing:
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
if testing:
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
else:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
if not testing:
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -1,4 +0,0 @@
pdms
kapton
plastic

View File

@ -1,49 +0,0 @@
readings = [
4.974127e-10, -1.418591e-12, -1.573563e-12, -1.728535e-12, -1.704693e-12,
-1.847744e-12, -1.931190e-12, -1.788139e-12, -1.871586e-12, -2.169609e-12,
-2.074242e-12, -1.895428e-12, -1.895428e-12, -1.776218e-12, -1.990795e-12,
-1.788139e-12, -1.811981e-12, -1.657009e-12, -1.573563e-12, -2.396107e-12,
-2.515316e-12, -2.765656e-12, -2.825260e-12, -3.147125e-12, -2.300739e-12,
-2.825260e-12, -3.278255e-12, -5.257130e-12, -6.818771e-12, -8.916855e-12,
-7.712841e-12, 6.437302e-12, -1.142025e-11, -1.206398e-11, -4.649043e-10,
-3.427613e-09, -2.460408e-09, -2.340376e-09, -1.306653e-10, 1.496077e-11,
2.933741e-11, 1.953280e-09, 8.579970e-10, 9.226799e-12, -1.095533e-11,
-2.508163e-11, -2.776039e-09, -8.686423e-09, 4.935264e-12, 1.246929e-11,
3.225744e-09, 2.814472e-09, 1.877034e-09, 2.229273e-09, 1.713574e-09,
8.355618e-10, -4.332781e-10, 5.896091e-11, 5.762577e-11, 8.129537e-09,
4.044378e-09, 1.771629e-09, 7.849216e-10, 4.098892e-10, 3.390551e-10,
2.956390e-10, 3.033876e-10, 1.716256e-10, 1.463890e-11, -5.078316e-12,
-6.949902e-12, -8.106232e-12, -6.473065e-12, -4.506111e-12, 4.919767e-11,
3.052297e-08, 1.161162e-08, -9.892106e-09, -3.613818e-09, -5.004287e-09,
-2.015829e-11, -4.183054e-11, -1.810908e-10, -2.042532e-10, -3.516316e-10,
5.099773e-11, 1.921976e-08, -1.256589e-08, -4.242897e-10, -1.358986e-12,
-3.445148e-12, -3.838539e-12, -4.184246e-12, -7.402897e-12, -2.840877e-10,
-2.872229e-10, -2.730966e-10, -1.134396e-10, -4.376173e-11, -3.576279e-14
]
timestamps = [
0. , 0.05 , 0.1 , 0.15, 0.2, 0.25, 0.3, 0.35,
0.4 , 0.45 , 0.5 , 0.55, 0.6, 0.65, 0.7, 0.75,
0.8 , 0.85 , 0.9 , 0.95, 1. , 1.05, 1.1, 1.15,
1.2 , 1.25 , 1.3 , 1.35, 1.4, 1.45, 1.5, 1.55,
1.6 , 1.690891, 1.710923, 1.75, 1.8, 1.85, 1.9, 1.95,
2. , 2.05 , 2.1 , 2.15, 2.2, 2.25, 2.3, 2.35,
2.420332, 2.45 , 2.5 , 2.55, 2.6, 2.65, 2.7, 2.75,
2.820553, 2.890843, 2.910875, 2.95, 3. , 3.05, 3.1, 3.15,
3.2 , 3.25 , 3.3 , 3.35, 3.4, 3.45, 3.5, 3.55,
3.6 , 3.65 , 3.7 , 3.75, 3.8, 3.85, 3.9, 3.95,
4. , 4.05 , 4.1 , 4.15, 4.2, 4.25, 4.3, 4.35,
4.4 , 4.45 , 4.5 , 4.55, 4.6, 4.65, 4.7, 4.75,
4.8 , 4.85 , 4.9 , 4.95, ]
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
df = pd.DataFrame(np.vstack((timestamps, readings)).T)
df.columns = ["Time [s]", "Voltage [V]"]
print(df)
df.to_csv("test.csv", sep=",", header=True, index=False)
df.plot(x='Time [s]', y="Voltage [V]")
plt.show()

View File

@ -1,33 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
# print(x, nearest_peak)
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
# 0 = pk - width
# 2pi = pk + width
def baseline(data):
# find the value where the most values with low gradients are closest to
gradients = np.abs(np.gradient(data))
# consider the values where the absolute gradient is in the bottom 20%
n_gradients = len(data) // 20
consider_indices = np.argsort(gradients)[:n_gradients]
# of those, only consider values where the value
consider_values = data[consider_indices]
xdata = np.arange(0, 100, 0.01)
ydata = np.vectorize(testcurve)(xdata)
plt.plot(xdata, ydata)
plt.show()

View File

@ -1,38 +0,0 @@
import pandas as pd
import numpy as np
def collect_buffer(instr, buffer_nr=1):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
if buffer_nr == 2: buffername = "smua.nvbuffer2"
else: buffername = "smua.nvbuffer1"
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array)
print(f"readings: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
# 0 = pk - width
# 2pi = pk + width

View File

@ -0,0 +1,155 @@
import bleak as b
import asyncio
import numpy as np
TARGET_NAME = "ArduinoTENG"
# GATT service and characteristics UUIDs
TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f"
TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f"
TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f"
TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f"
TENG_COUNT_CUUID = "00010004-9a74-4b30-9361-4a16ec09930f"
TENG_INTERVAL_CUUID = "00010005-9a74-4b30-9361-4a16ec09930f"
TENG_COMMANDS = {
"STOP": int(0).to_bytes(1, signed=False),
"MEASURE_COUNT": int(1).to_bytes(1, signed=False),
"MEASURE": int(2).to_bytes(1, signed=False),
}
TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "CONNECTED", "MEASURING"]
# TODO save measurements on device buffer, transfer later
# wrapper for global variable
class Buffer:
def __init__(self):
self.data = None
_buffer = Buffer()
# class Runner:
# def __init__(self):
runner = asyncio.Runner()
def teng_status_callback(characteristic, data):
value = int.from_bytes(data, byteorder="big", signed=False)
if 0 <= value and value < len(TENG_STATUS):
print(f"Status change: {TENG_STATUS[value]}")
else:
print(f"Status change (invalid): status={value}")
def disconnect_callback(client):
raise Exception(f"The Bluetooth device {client.name} was disconnected")
async def init_arduino_async(n_tries: int=5) -> b.BleakClient:
n_try = 0
if n_tries <= 0: n_tries = "inf"
try:
target_device = None
while target_device is None and (n_tries == "inf" or n_try < n_tries):
print(f"Searching for Bluetooth device '{TARGET_NAME}' ({n_try+1}/{n_tries})", end="\r")
devices = await b.BleakScanner.discover(return_adv=True, timeout=1.5)
# print(devices)
for adr, (device, adv_data) in devices.items():
if device.name == TARGET_NAME:
# print(adv_data)
target_device = device
break
n_try += 1
if target_device is None:
raise Exception(f"Could not find Bluetooth device 'ArduinoTENG'")
# print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}")
# print(target_device.name, target_device.details)
client = b.BleakClient(target_device, disconnect_callback=disconnect_callback)
await client.connect()
print(f"Connected to Bluetooth device '{TARGET_NAME}' at [{client.address}]")
return client
except asyncio.exceptions.CancelledError:
raise Exception(f"Cancelled")
def init(beep_success=True, n_tries: int=5) -> b.BleakClient:
"""
Connect to the arduino
@returns: BleakClient
"""
client = runner.run(init_arduino_async(n_tries=n_tries))
if beep_success: beep(client)
return client
def exit(client):
try:
runner.run(stop_measurement(client))
runner.run(client.disconnect())
except Exception:
pass
async def set_interval(client, interval: float):
"""
Set the measurement interval
@param interval: interval in seconds
"""
interval = int(interval * 1000) # convert to ms for arduinos delay)
await client.write_gatt_char(TENG_INTERVAL_CUUID, interval.to_bytes(2, byteorder="little", signed=False))
async def set_count(client, count: int):
"""
Set the measurement count
@param count: number of measurements to take
"""
await client.write_gatt_char(TENG_COUNT_CUUID, count.to_bytes(2, byteorder="little", signed=False))
async def stop_measurement(client):
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["STOP"])
async def start_measure_count(client):
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE_COUNT"])
async def start_measure(client):
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE"])
# async def main():
# for service in client.services:
# print(f"Service: {service.uuid}: {service.description}")
# for c in service.characteristics:
# print(f"\t{c.uuid}: {c.properties}, {c.descriptors}")
# teng_status = client.services.get_characteristic(TENG_STATUS_CUUID)
# teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID)
# teng_reading = client.services.get_characteristic(TENG_READING_CUUID)
# client.start_notify(teng_status, teng_status_callback)
# await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"])
# await asyncio.sleep(5)
# await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"])
# while client.is_connected:
# data = await client.read_gatt_char(teng_reading)
# value = int.from_bytes(data, byteorder="little", signed=False)
# print(f"Reading: {value}")
# await asyncio.sleep(0.5)
# except KeyboardInterrupt:
# pass
# except asyncio.exceptions.CancelledError:
# pass
# print("Disconnected")
def collect_buffer(instr, buffer_nr=1):
"""
@param buffer_nr: 1 -> current, 2 -> voltage
"""
assert(buffer_nr in (1, 2))
return np.vstack((_buffer.data[:,0], _buffer.data[:,buffer_nr])).T
def beep(client):
# TODO connect beeper to arduino?
print("beep")

View File

@ -0,0 +1,75 @@
import bleak as b
import numpy as np
import asyncio
import datetime
from m_teng.backends.arduino.arduino import beep, set_interval, set_count, TENG_READING_CUUID, _buffer, start_measure, start_measure_count, stop_measurement, runner
async def _measure_count_async(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
global _buffer
_buffer.data = np.zeros((count, 3))
i = 0
t_start = datetime.datetime.now()
async def add_reading(teng_reading_cr, reading: bytearray):
nonlocal i, count
if i >= count: return
_buffer.data[i][0] = float((datetime.datetime.now() - t_start).microseconds) / 1000
# reading = await client.read_gatt_char(TENG_READING_CUUID)
_buffer.data[i][2] = int.from_bytes(reading, byteorder="little", signed=False)
i += 1
await set_interval(client, interval)
await set_count(client, count)
# TODO check if notify works when the same value is written again
await client.start_notify(TENG_READING_CUUID, add_reading)
await start_measure_count(client)
while i < count:
await asyncio.sleep(update_interval)
if update_func is not None and i > 0: # assume an update has occured
update_func(i-1, 0, _buffer.data[i-1, 2])
await client.stop_notify(TENG_READING_CUUID)
if beep_done: beep(client)
def measure_count(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
runner.run(_measure_count_async(client, count=count, interval=interval, update_func=update_func, update_interval=update_interval, beep_done=beep_done, verbose=verbose))
async def _measure_async(client, interval, update_func=None, max_measurements=None):
global _buffer
readings = []
timestamps = []
i = 0
t_start = datetime.datetime.now()
async def add_reading(teng_reading_cr, reading):
nonlocal i
timestamps.append(float((datetime.datetime.now() - t_start).microseconds) / 1000)
reading = int.from_bytes(reading, byteorder="little", signed=False)
readings.append(reading)
if update_func:
try:
update_func(i, 0, reading)
except KeyboardInterrupt:
raise asyncio.exceptions.CancelledError("KeyboardInterrupt in update_func")
i += 1
await set_interval(client, interval)
await client.start_notify(TENG_READING_CUUID, add_reading)
await start_measure(client)
try:
while max_measurements is None or i < max_measurements:
await asyncio.sleep(0.1) #
except asyncio.exceptions.CancelledError:
pass
except KeyboardInterrupt:
pass
await client.stop_notify(TENG_READING_CUUID)
await stop_measurement(client)
_buffer.data = np.vstack((timestamps, np.zeros(len(timestamps)), readings)).T
print("Measurement stopped" + " "*50)
def measure(client, interval, update_func=None, max_measurements=None):
runner.run(_measure_async(client, interval=interval, update_func=update_func, max_measurements=max_measurements))

View File

View File

@ -1,20 +1,19 @@
import pyvisa
import numpy as np
import pkg_resources
"""
Utility
"""
script_dir = "../scripts/"
scripts = {
"buffer_reset": "buffer_reset.lua",
"smua_reset": "smua_reset.lua",
"buffer_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/buffer_reset.lua"),
"smua_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/smua_reset.lua"),
}
for key,val in scripts.items():
scripts[key] = script_dir + scripts[key]
def init_keithley(beep_success=True):
def init(beep_success=True):
rm = pyvisa.ResourceManager('@py')
resources = rm.list_resources()
if len(resources) < 1:
@ -32,6 +31,11 @@ def init_keithley(beep_success=True):
return keithley
def exit(instr):
instr.close()
def run_lua(instr, script_path, verbose=False):
"""
Run a lua script from the host on the instrument
@ -52,6 +56,15 @@ def reset(instr, verbose=False):
run_lua(instr, scripts["smua_reset"], verbose=verbose)
run_lua(instr, scripts["buffer_reset"], verbose=verbose)
def get_buffer_name(buffer_nr: int):
if buffer_nr == 2: return "smua.nvbuffer2"
elif buffer_nr == 1: return "smua.nvbuffer1"
raise ValueError(f"Invalid buffer_nr: {buffer_nr}")
def get_buffer_size(instr, buffer_nr=1):
n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n")
return int(float(n))
def collect_buffer(instr, buffer_nr=1, verbose=False):
"""
@ -63,8 +76,7 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
0: timestamps
1: readings
"""
if buffer_nr == 2: buffername = "smua.nvbuffer2"
else: buffername = "smua.nvbuffer1"
buffername = get_buffer_name(buffer_nr)
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
@ -75,3 +87,30 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
buffer = np.vstack((timestamps, readings)).T
return buffer
def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False):
"""
Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array:
i - ith reading:
0: timestamps
1: readings
"""
buffername = get_buffer_name(buffer_nr)
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
if range_[1] == -1:
range_ = (range_[0], f"{buffername}.n")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
timestamps = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.timestamps)", container=np.array)
readings = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.readings)", container=np.array)
if verbose:
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}")
buffer = np.vstack((timestamps, readings)).T
return buffer

View File

@ -0,0 +1,93 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from m_teng.backends.keithley import reset
from m_teng.utility import testing as _testing
def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
# if V and I:
# elif V:
# f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
# elif I:
# f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
# else:
# print("I and/or V needs to be set to True")
# return
i = 0
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0:
if update_func:
try:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
update_func(i, ival, vval)
except ValueError as e:
if i != 0:
pass
else:
print(f"measure_count: ValueError: {e}")
sleep(update_interval)
i += 1
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -1,6 +1,6 @@
"""
run this before using this library:
ipython -i k_teng_interactive.py
ipython -i m_teng_interactive.py
always records iv-t curves
i-data -> smua.nvbuffer1
@ -17,22 +17,56 @@ from time import sleep
from os import path, makedirs
import pickle as pkl
import json
import atexit
import argparse
if __name__ == "__main__":
import sys
if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "k-teng"
import sys
__package__ = "m_teng"
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
parser = argparse.ArgumentParser(
prog="m-teng",
description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino",
)
backend_group = parser.add_mutually_exclusive_group(required=True)
backend_group.add_argument("-k", "--keithley", action="store_true")
backend_group.add_argument("-a", "--arduino", action="store_true")
backend_group.add_argument("-t", "--testing", action='store_true')
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
args = vars(parser.parse_args())
i = 1
while i < len(sys.argv):
if args["keithley"]:
import m_teng.backends.keithley.keithley as _backend
import m_teng.backends.keithley.measure as _measure
elif args["arduino"]:
import m_teng.backends.arduino.arduino as _backend
import m_teng.backends.arduino.measure as _measure
elif args["testing"]:
import m_teng.backends.testing.testing as _backend
import m_teng.backends.testing.measure as _measure
elif sys.argv[i] in ["-c", "--config"]:
if i+1 < len(sys.argv):
config_path = sys.argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
from .keithley import keithley as _keithley
from .keithley.measure import measure_count as _measure_count, measure as _measure
from .utility import data as _data
from .utility.data import load_dataframe
from .utility import file_io
from m_teng.utility import data as _data
from m_teng.utility.data import load_dataframe
from m_teng.utility import file_io
from m_teng.update_funcs import _Monitor, _ModelPredict, _update_print
config_path = path.expanduser("~/.config/k-teng.json")
_runtime_vars = {
"last-measurement": ""
@ -44,66 +78,35 @@ settings = {
"interval": 0.02,
"beep": True,
}
config_path = path.expanduser("~/.config/k-teng.json")
test = False
# global variable for the instrument returned by pyvisa
k = None
# global variable for the instrument/client returned by pyvisa/bleak
dev = None
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor:
def monitor_predict(model_dir: str, count=5000, interval=settings["interval"], max_points_shown=160):
"""
Monitor v and i data
Take <count> measurements in <interval> and predict with a machine learning model
"""
def __init__(self, max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown
self.use_print = use_print
self.index = []
self.vdata = []
self.idata = []
plt.ion()
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval):
if self.use_print:
_update_print(i, ival, vval)
self.index.append(i)
self.idata.append(ival)
self.vdata.append(vval)
# update data
self.iline.set_xdata(self.index)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view
self.iax.relim()
self.vax.relim()
if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)
model_predict = _ModelPredict(dev, model_dir)
plt_monitor = _Monitor(max_points_shown, use_print=False)
skip_n = 0
def update(i, ival, vval):
plt_monitor.update(i, ival, vval)
if skip_n % 10 == 0:
model_predict.update(i, ival, vval)
skip_n += 1
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update, update_interval=0.1)
except KeyboardInterrupt:
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160):
"""
@ -123,10 +126,10 @@ def monitor_count(count=5000, interval=settings["interval"], max_points_shown=16
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt:
if not test:
k.write(f"smua.source.output = smua.OUTPUT_OFF")
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
@ -147,10 +150,10 @@ def measure_count(count=5000, interval=settings["interval"]):
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
try:
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
except KeyboardInterrupt:
if not test:
k.write(f"smua.source.output = smua.OUTPUT_OFF")
if args["keithley"]:
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
print("Monitoring cancelled, measurement might still continue" + " "*50)
else:
print("Measurement finished" + " "*50)
@ -176,7 +179,7 @@ def monitor(interval=settings["interval"], max_measurements=None, max_points_sho
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
update_func = plt_monitor.update
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
def measure(interval=settings["interval"], max_measurements=None):
@ -195,7 +198,7 @@ def measure(interval=settings["interval"], max_measurements=None):
_runtime_vars["last_measurement"] = dtime.now().isoformat()
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
update_func = _update_print
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
def repeat(measure_func: callable, count: int, repeat_delay=0):
@ -221,7 +224,7 @@ def repeat(measure_func: callable, count: int, repeat_delay=0):
sleep(repeat_delay)
except KeyboardInterrupt:
pass
if settings["beep"]: k.write("beeper.beep(0.3, 1000)")
if settings["beep"]: _backend.beep()
def get_dataframe():
@ -229,21 +232,14 @@ def get_dataframe():
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
"""
global k, settings, _runtime_vars
if test:
timestamps = np.arange(0, 50, 0.01)
ydata = np.array([testing.testcurve(t, amplitude=15, peak_width=2) for t in timestamps])
ibuffer = np.vstack((timestamps, ydata)).T
ydata = np.array([-testing.testcurve(t, amplitude=5e-8, peak_width=1) for t in timestamps])
vbuffer = np.vstack((timestamps, ydata)).T
else:
ibuffer = _keithley.collect_buffer(k, 1)
vbuffer = _keithley.collect_buffer(k, 2)
ibuffer = _backend.collect_buffer(dev, 1)
vbuffer = _backend.collect_buffer(dev, 2)
df = _data.buffers2dataframe(ibuffer, vbuffer)
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
return df
def save_csv():
"""
Saves the contents of nvbuffer1 as .csv
@ -267,6 +263,7 @@ def save_pickle():
df.to_pickle(filename)
print(f"Saved as '{filename}'")
def run_script(script_path):
"""
Run a lua script on the Keithley device
@ -276,7 +273,7 @@ def run_script(script_path):
if test:
print("run_script: Test mode enabled, ignoring call to run_script")
else:
_keithley.run_lua(k, script_path=script_path)
_keithley.run_lua(dev, script_path=script_path)
def set(setting, value):
@ -305,16 +302,16 @@ def help(topic=None):
if topic == None:
print("""
Functions:
measure - take measurements
monitor - take measurements with live monitoring in a matplotlib window
measure_count - take a fixed number of measurements
monitor_count - take a fixed number of measurements with live monitoring in a matplotlib window
repeat - measure and save to csv multiple times
get_dataframe - return smua.nvbuffer 1 and 2 as pandas dataframe
save_csv - save the last measurement as csv file
save_pickle - save the last measurement as pickled pandas dataframe
load_dataframe - load a pandas dataframe from csv or pickle
run_script - run a lua script on the Keithely device
measure [kat] - take measurements
monitor [kat] - take measurements with live monitoring in a matplotlib window
measure_count [kat] - take a fixed number of measurements
monitor_count [kat] - take a fixed number of measurements with live monitoring in a matplotlib window
repeat [kat] - measure and save to csv multiple times
get_dataframe [kat] - return device internal buffer as pandas dataframe
save_csv [kat] - save the last measurement as csv file
save_pickle [kat] - save the last measurement as pickled pandas dataframe
load_dataframe [kat] - load a pandas dataframe from csv or pickle
run_script [k ] - run a lua script on the Keithely device
Run 'help(function)' to see more information on a function
Available topics:
@ -333,13 +330,13 @@ Run 'help("topic")' to see more information on a topic""")
Functions:
name("<name>") - short for set("name", "<name>")
set("setting", value) - set a setting to a value
save_settings() - store the settings as "k-teng.conf" in the working directory
save_settings() - store the settings as "m-teng.json" in the working directory
load_settings() - load settings from a file
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
The serach path is:
<working-dir>/k-teng.json
$XDG_CONFIG_HOME/k-teng.json
~/.config/k-teng.json
<working-dir>/m-teng.json
$XDG_CONFIG_HOME/m-teng.json
~/.config/m-teng.json
""")
elif topic == "imports":
print("""Imports:
@ -349,45 +346,37 @@ Functions:
os.path """)
elif topic == "device":
print("""Device:
The opened pyvisa resource (Keithley device) is the global variable 'k'.
You can interact using pyvisa functions, such as
k.write("command"), k.query("command") etc. to interact with the device.""")
keithley backend:
The opened pyvisa resource (deveithley device) is the global variable 'dev'.
You can interact using pyvisa functions, such as
k.write("command"), k.query("command") etc. to interact with the device.
arduino backend:
The Arduino will be avaiable as BleakClient using the global variable 'dev'. """)
else:
print(topic.__doc__)
def init():
global k, settings, test, config_path
print(r""" ____ __. ______________________ _______ ________
| |/ _| \__ ___/\_ _____/ \ \ / _____/
| < ______ | | | __)_ / | \ / \ ___
| | \ /_____/ | | | \/ | \\ \_\ \
|____|__ \ |____| /_______ /\____|__ / \______ /
\/ \/ \/ \/ 1.1
global dev, settings, config_path
print(r""" ______________________ _______ ________
_____ \__ ___/\_ _____/ \ \ / _____/
/ \ ______| | | __)_ / | \ / \ ___
| Y Y \/_____/| | | \/ | \\ \_\ \
|__|_| / |____| /_______ /\____|__ / \______ /
\/ \/ \/ \/ 1.2
Interactive Shell for TENG measurements with Keithley 2600B
---
Enter 'help()' for a list of commands""")
from os import environ
if path.isfile("k-teng.json"):
config_path = "k-teng.json"
if path.isfile("m-teng.json"):
config_path = "m-teng.json"
elif 'XDG_CONFIG_HOME' in environ.keys():
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/k-teng.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/k-teng.json"
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/m-teng.json"):
config_path = environ["XDG_CONFIG_HOME"] + "/m-teng.json"
else:
config_path = path.expanduser("~/.config/k-teng.json")
from sys import argv
i = 1
while i < len(argv):
if argv[i] in ["-t", "--test"]:
test = True
elif argv[i] in ["-c", "--config"]:
if i+1 < len(argv):
config_path = argv[i+1]
else:
print("-c requires an extra argument: path of config file")
i += 1
i += 1
config_path = path.expanduser("~/.config/m-teng.json")
if args["config"]:
config_path = args["config"]
if not path.isdir(path.dirname(config_path)):
@ -399,15 +388,12 @@ Enter 'help()' for a list of commands""")
if not path.isdir(settings["datadir"]):
makedirs(settings["datadir"])
if not test:
from .keithley.keithley import init_keithley
try:
k = init_keithley(beep_success=settings["beep"])
except Exception as e:
print(e)
exit()
else:
print("Running in test mode, device will not be connected.")
try:
dev = _backend.init(beep_success=settings["beep"])
except Exception as e:
print(e)
exit(1)
atexit.register(_backend.exit, dev)
if __name__ == "__main__":

123
m_teng/update_funcs.py Normal file
View File

@ -0,0 +1,123 @@
import matplotlib.pyplot as plt
import numpy as np
import torch
from teng_ml.util import model_io as mio
from teng_ml.util.settings import MLSettings
from teng_ml.util.split import DataSplitter
from m_teng.backends.keithley import keithley
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor:
"""
Monitor v and i data
"""
def __init__(self, max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown
self.use_print = use_print
self.index = []
self.vdata = []
self.idata = []
plt.ion()
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval):
if self.use_print:
_update_print(i, ival, vval)
self.index.append(i)
self.idata.append(ival)
self.vdata.append(vval)
# update data
self.iline.set_xdata(self.index)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view
self.iax.relim()
self.vax.relim()
if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)
class _ModelPredict:
colors = ["red", "green", "purple", "blue", "orange", "grey", "cyan"]
def __init__(self, instr, model_dir):
"""
@param model_dir: directory where model.plk and settings.pkl are stored
Predict the values that are currently being recorded
@details:
Load the model and model settings from model dir
Wait until the number of recoreded points is >= the size of the models DataSplitter
Collect the data from the keithley, apply the transforms and predict the label with the model
Shows the prediction with a bar plot
"""
self.instr = instr
self.model = mio.load_model(model_dir)
self.model_settings: MLSettings = mio.load_settings(model_dir)
if type(self.model_settings.splitter) == DataSplitter:
self.data_length = self.model_settings.splitter.split_size
else:
self.data_length = 200
plt.ion()
self.fig1, (self.ax) = plt.subplots(1, 1, figsize=(8, 5))
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), [ 1 for _ in range(len(self.model_settings.labels))])
self.ax.set_ylabel("Prediction")
self.ax.grid(True)
def update(self, i, ival, vval):
buffer_size = keithley.get_buffer_size(self.instr, buffer_nr=1)
if buffer_size <= self.data_length:
print(f"ModelPredict.update: buffer_size={buffer_size} < {self.data_length}")
return
else:
ibuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=1)
vbuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=2)
if self.model_settings.num_features == 1: # model uses only voltage
data = np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T
# print(f"data.shape:", data.shape)
else:
raise NotImplementedError(f"Cant handle models with num_features != 1 yet")
for t in self.model_settings.transforms:
data = t(data)
data = np.reshape(data[:,2], (1, -1, 1)) # batch_size, seq, features
with torch.no_grad():
x = torch.FloatTensor(data) # select voltage data, without timestamps
# print(x.shape)
prediction = self.model(x) # (batch_size, label-predictions)
prediction = torch.nn.functional.softmax(prediction) # TODO remove when softmax is already applied by model
predicted = torch.argmax(prediction, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"raw={prediction[0]}, predicted_index={predicted[0}")
label = self.model_settings.labels[predicted[0]]
# print(f"-> label={label}")
self.bar_cont.remove()
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), prediction[0], color=_ModelPredict.colors[:len(self.model_settings.labels)])
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()

View File

17
m_teng/utility/testing.py Normal file
View File

@ -0,0 +1,17 @@
import numpy as np
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
# 0 = pk - width
# 2pi = pk + width
# want peak at n*time == frequency
nearest_peak = np.round(x / frequency, 0)
# if not peak at 0 and within peak_width
if nearest_peak > 0 and np.abs((x - nearest_peak * frequency)) < peak_width:
# return sin that does one period within 2*peak_width
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
else:
return bias
def get_testcurve(frequency=10, peak_width=2, amplitude=20, bias=0):
return np.vectorize(lambda x: testcurve(x, frequency=frequency, peak_width=peak_width, amplitude=amplitude, bias=bias))

File diff suppressed because one or more lines are too long

35
pyproject.toml Normal file
View File

@ -0,0 +1,35 @@
[build-system]
requires = ["setuptools"]
[project]
name = "k_teng"
version = "1.0.0"
description = "Interactive utility for I-V measurements with a Keitley 2600 SMU"
requires-python = ">=3.10"
readme = "readme.md"
license = {file = "LICENSE"}
authors = [
{ name = "Matthias Quintern", email = "matthias@quintern.xyz" }
]
classifiers = [
"Operating System :: POSIX :: Linux",
"Environment :: Console",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
]
dependencies = [
"matplotlib>=3.6",
"numpy",
"pyvisa",
"pyvisa-py"
]
[project.urls]
repository = "https://git.quintern.xyz/MatthiasQuintern/k-teng"
[tool.setuptools.packages.find]
where = ["."]
[project.scripts]
k-teng = "k_teng.k_teng_interactive:main"

View File

@ -1,23 +1,39 @@
# K-TENG
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU using pyvisa
# m-TENG
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU or an Arduino
## Features
### Useful functions for scripts
- Measure Voltage and/or Current
- Transfer buffer from device to host
- Save/load as csv
- Run lua script on device
- Auto-filenames
### Interactive (shell) mode
- Live view
- Press button to stop
- Save and load settings (default interval, data directory...)
- Easily run arbitrary command on device
## Shell mode
Start with:
```shell
ipython -i k_teng_interactive.py
```
Use `help()` to get a list of available commands
### Useful functions for scripts
- Measure voltage and/or current
- Transfer buffer from measurement device to host
- Save/load as csv
- Run lua script on Keithley SMU
- Auto-filenames
## Available backends
### keithley
Use a Keithley 2611B Source-Measure-Unit via *pyvisa*. This backend allows measuring both voltage and current simultaneously.
### arduino
Use a Bluetooth capable Arduino with [https://git.quintern.xyz/MatthiasQuintern/teng-arduino](this software on the arduino).
This backend only allows measuring voltage using an Arduinos analog input pin (0 - 3.3 V, 12 bit resolution).
### testing
Use the shell without measuring TENG output. When starting a measurement, sample data will be generated.
## Shell mode
It is recommended to run the shell with ipython:
```shell
ipython -i k_teng_interactive.py -- -*X*
```
Substitute *X* for `-k` for keithley backend, `-a` for arduino backend or `-t` for testing backend.
In the shell, run `help()` to get a list of available commands