Compare commits
4 Commits
3e79e2ea4f
...
b526bfb913
Author | SHA1 | Date | |
---|---|---|---|
|
b526bfb913 | ||
|
0eed4d402b | ||
|
0494881232 | ||
|
a0f8a2a903 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
|||||||
__pycache__
|
__pycache__
|
||||||
.ipynb_checkpoints
|
.ipynb_checkpoints
|
||||||
|
testing
|
||||||
|
2
MANIFEST.in
Normal file
2
MANIFEST.in
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
include regina/package-data/*
|
||||||
|
include regina/sql/*.sql
|
1
bspc_rule.sh
Executable file
1
bspc_rule.sh
Executable file
@ -0,0 +1 @@
|
|||||||
|
bspc rule -a matplotlib desktop=^1 state=pseudo_tiled focus=off
|
@ -1,107 +0,0 @@
|
|||||||
from time import sleep
|
|
||||||
import numpy as np
|
|
||||||
from matplotlib import pyplot as plt
|
|
||||||
import pyvisa
|
|
||||||
|
|
||||||
from .keithley import reset
|
|
||||||
from ..utility import testing as _testing
|
|
||||||
|
|
||||||
def measure_count(instr, V=True, I=True, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True, testing=False):
|
|
||||||
"""
|
|
||||||
Take <count> measurements with <interval> inbetween
|
|
||||||
|
|
||||||
@details
|
|
||||||
Uses the devices overlappedY function to make the measurements asynchronosly
|
|
||||||
The update_func is optional and only used when I == True and V == True
|
|
||||||
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
|
|
||||||
@param instr: pyvisa instrument
|
|
||||||
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
|
||||||
@param update_interval: interval at which the update_func is called
|
|
||||||
"""
|
|
||||||
f_meas = None
|
|
||||||
if V and I:
|
|
||||||
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
|
|
||||||
elif V:
|
|
||||||
f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
|
|
||||||
elif I:
|
|
||||||
f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
|
|
||||||
else:
|
|
||||||
print("I and/or V needs to be set to True")
|
|
||||||
return
|
|
||||||
|
|
||||||
i = 0
|
|
||||||
if not testing:
|
|
||||||
reset(instr, verbose=verbose)
|
|
||||||
instr.write(f"smua.measure.count = {count}")
|
|
||||||
instr.write(f"smua.measure.interval = {interval}")
|
|
||||||
|
|
||||||
# start measurement
|
|
||||||
instr.write(f"smua.source.output = smua.OUTPUT_ON")
|
|
||||||
instr.write(f_meas)
|
|
||||||
|
|
||||||
condition = lambda: float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0
|
|
||||||
else:
|
|
||||||
condition = lambda: i < int(float(count) * interval / update_interval)
|
|
||||||
|
|
||||||
sleep(update_interval)
|
|
||||||
# for live viewing
|
|
||||||
|
|
||||||
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
|
|
||||||
# will return 2.0 while measruing
|
|
||||||
while condition():
|
|
||||||
if update_func and V and I:
|
|
||||||
try:
|
|
||||||
if not testing:
|
|
||||||
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
|
|
||||||
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
|
|
||||||
else:
|
|
||||||
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
|
|
||||||
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
|
|
||||||
update_func(i, ival, vval)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
sleep(update_interval)
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
if not testing:
|
|
||||||
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
|
|
||||||
|
|
||||||
if beep_done:
|
|
||||||
instr.write("beeper.beep(0.3, 1000)")
|
|
||||||
|
|
||||||
|
|
||||||
def measure(instr, interval, update_func=None, max_measurements=None, testing=False):
|
|
||||||
"""
|
|
||||||
@details:
|
|
||||||
- Resets the buffers
|
|
||||||
- Until KeyboardInterrupt:
|
|
||||||
- Take measurement
|
|
||||||
- Call update_func
|
|
||||||
- Wait interval
|
|
||||||
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
|
|
||||||
You can take the data from the buffer afterwards, using save_csv
|
|
||||||
@param instr: pyvisa instrument
|
|
||||||
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
|
||||||
@param max_measurements : maximum number of measurements. None means infinite
|
|
||||||
"""
|
|
||||||
if not testing:
|
|
||||||
reset(instr, verbose=True)
|
|
||||||
instr.write("smua.source.output = smua.OUTPUT_ON")
|
|
||||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
|
|
||||||
try:
|
|
||||||
i = 0
|
|
||||||
while max_measurements is None or i < max_measurements:
|
|
||||||
if testing:
|
|
||||||
ival = _testing.testcurve(i, peak_width=1, amplitude=5e-8)
|
|
||||||
vval = -_testing.testcurve(i, peak_width=2, amplitude=15)
|
|
||||||
else:
|
|
||||||
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
|
|
||||||
if update_func:
|
|
||||||
update_func(i, ival, vval)
|
|
||||||
sleep(interval)
|
|
||||||
i += 1
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
if not testing:
|
|
||||||
instr.write("smua.source.output = smua.OUTPUT_OFF")
|
|
||||||
print("Measurement stopped" + " "*50)
|
|
@ -1,4 +0,0 @@
|
|||||||
pdms
|
|
||||||
kapton
|
|
||||||
plastic
|
|
||||||
|
|
@ -1,49 +0,0 @@
|
|||||||
readings = [
|
|
||||||
4.974127e-10, -1.418591e-12, -1.573563e-12, -1.728535e-12, -1.704693e-12,
|
|
||||||
-1.847744e-12, -1.931190e-12, -1.788139e-12, -1.871586e-12, -2.169609e-12,
|
|
||||||
-2.074242e-12, -1.895428e-12, -1.895428e-12, -1.776218e-12, -1.990795e-12,
|
|
||||||
-1.788139e-12, -1.811981e-12, -1.657009e-12, -1.573563e-12, -2.396107e-12,
|
|
||||||
-2.515316e-12, -2.765656e-12, -2.825260e-12, -3.147125e-12, -2.300739e-12,
|
|
||||||
-2.825260e-12, -3.278255e-12, -5.257130e-12, -6.818771e-12, -8.916855e-12,
|
|
||||||
-7.712841e-12, 6.437302e-12, -1.142025e-11, -1.206398e-11, -4.649043e-10,
|
|
||||||
-3.427613e-09, -2.460408e-09, -2.340376e-09, -1.306653e-10, 1.496077e-11,
|
|
||||||
2.933741e-11, 1.953280e-09, 8.579970e-10, 9.226799e-12, -1.095533e-11,
|
|
||||||
-2.508163e-11, -2.776039e-09, -8.686423e-09, 4.935264e-12, 1.246929e-11,
|
|
||||||
3.225744e-09, 2.814472e-09, 1.877034e-09, 2.229273e-09, 1.713574e-09,
|
|
||||||
8.355618e-10, -4.332781e-10, 5.896091e-11, 5.762577e-11, 8.129537e-09,
|
|
||||||
4.044378e-09, 1.771629e-09, 7.849216e-10, 4.098892e-10, 3.390551e-10,
|
|
||||||
2.956390e-10, 3.033876e-10, 1.716256e-10, 1.463890e-11, -5.078316e-12,
|
|
||||||
-6.949902e-12, -8.106232e-12, -6.473065e-12, -4.506111e-12, 4.919767e-11,
|
|
||||||
3.052297e-08, 1.161162e-08, -9.892106e-09, -3.613818e-09, -5.004287e-09,
|
|
||||||
-2.015829e-11, -4.183054e-11, -1.810908e-10, -2.042532e-10, -3.516316e-10,
|
|
||||||
5.099773e-11, 1.921976e-08, -1.256589e-08, -4.242897e-10, -1.358986e-12,
|
|
||||||
-3.445148e-12, -3.838539e-12, -4.184246e-12, -7.402897e-12, -2.840877e-10,
|
|
||||||
-2.872229e-10, -2.730966e-10, -1.134396e-10, -4.376173e-11, -3.576279e-14
|
|
||||||
]
|
|
||||||
timestamps = [
|
|
||||||
0. , 0.05 , 0.1 , 0.15, 0.2, 0.25, 0.3, 0.35,
|
|
||||||
0.4 , 0.45 , 0.5 , 0.55, 0.6, 0.65, 0.7, 0.75,
|
|
||||||
0.8 , 0.85 , 0.9 , 0.95, 1. , 1.05, 1.1, 1.15,
|
|
||||||
1.2 , 1.25 , 1.3 , 1.35, 1.4, 1.45, 1.5, 1.55,
|
|
||||||
1.6 , 1.690891, 1.710923, 1.75, 1.8, 1.85, 1.9, 1.95,
|
|
||||||
2. , 2.05 , 2.1 , 2.15, 2.2, 2.25, 2.3, 2.35,
|
|
||||||
2.420332, 2.45 , 2.5 , 2.55, 2.6, 2.65, 2.7, 2.75,
|
|
||||||
2.820553, 2.890843, 2.910875, 2.95, 3. , 3.05, 3.1, 3.15,
|
|
||||||
3.2 , 3.25 , 3.3 , 3.35, 3.4, 3.45, 3.5, 3.55,
|
|
||||||
3.6 , 3.65 , 3.7 , 3.75, 3.8, 3.85, 3.9, 3.95,
|
|
||||||
4. , 4.05 , 4.1 , 4.15, 4.2, 4.25, 4.3, 4.35,
|
|
||||||
4.4 , 4.45 , 4.5 , 4.55, 4.6, 4.65, 4.7, 4.75,
|
|
||||||
4.8 , 4.85 , 4.9 , 4.95, ]
|
|
||||||
|
|
||||||
import pandas as pd
|
|
||||||
import numpy as np
|
|
||||||
import scipy as sp
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
df = pd.DataFrame(np.vstack((timestamps, readings)).T)
|
|
||||||
df.columns = ["Time [s]", "Voltage [V]"]
|
|
||||||
print(df)
|
|
||||||
df.to_csv("test.csv", sep=",", header=True, index=False)
|
|
||||||
|
|
||||||
df.plot(x='Time [s]', y="Voltage [V]")
|
|
||||||
plt.show()
|
|
||||||
|
|
@ -1,33 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
|
|
||||||
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
|
|
||||||
# want peak at n*time == frequency
|
|
||||||
nearest_peak = np.round(x / frequency, 0)
|
|
||||||
# if not peak at 0 and within peak_width
|
|
||||||
# print(x, nearest_peak)
|
|
||||||
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
|
|
||||||
# return sin that does one period within 2*peak_width
|
|
||||||
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
|
|
||||||
else:
|
|
||||||
return bias
|
|
||||||
|
|
||||||
# 0 = pk - width
|
|
||||||
# 2pi = pk + width
|
|
||||||
|
|
||||||
def baseline(data):
|
|
||||||
# find the value where the most values with low gradients are closest to
|
|
||||||
gradients = np.abs(np.gradient(data))
|
|
||||||
# consider the values where the absolute gradient is in the bottom 20%
|
|
||||||
n_gradients = len(data) // 20
|
|
||||||
consider_indices = np.argsort(gradients)[:n_gradients]
|
|
||||||
# of those, only consider values where the value
|
|
||||||
consider_values = data[consider_indices]
|
|
||||||
|
|
||||||
|
|
||||||
xdata = np.arange(0, 100, 0.01)
|
|
||||||
ydata = np.vectorize(testcurve)(xdata)
|
|
||||||
|
|
||||||
|
|
||||||
plt.plot(xdata, ydata)
|
|
||||||
plt.show()
|
|
@ -1,38 +0,0 @@
|
|||||||
|
|
||||||
import pandas as pd
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
def collect_buffer(instr, buffer_nr=1):
|
|
||||||
"""
|
|
||||||
Get the buffer as 2D - np.array
|
|
||||||
@param instr : pyvisa instrument
|
|
||||||
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
|
|
||||||
@returns 2D numpy array:
|
|
||||||
i - ith reading:
|
|
||||||
0: timestamps
|
|
||||||
1: readings
|
|
||||||
"""
|
|
||||||
if buffer_nr == 2: buffername = "smua.nvbuffer2"
|
|
||||||
else: buffername = "smua.nvbuffer1"
|
|
||||||
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
|
||||||
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
|
||||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
|
||||||
timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array)
|
|
||||||
readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array)
|
|
||||||
print(f"readings: {readings}, \ntimestamps: {timestamps}")
|
|
||||||
buffer = np.vstack((timestamps, readings)).T
|
|
||||||
return buffer
|
|
||||||
|
|
||||||
|
|
||||||
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
|
|
||||||
# want peak at n*time == frequency
|
|
||||||
nearest_peak = np.round(x / frequency, 0)
|
|
||||||
# if not peak at 0 and within peak_width
|
|
||||||
if nearest_peak > 0 and abs((x - nearest_peak * frequency)) < peak_width:
|
|
||||||
# return sin that does one period within 2*peak_width
|
|
||||||
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
|
|
||||||
else:
|
|
||||||
return bias
|
|
||||||
|
|
||||||
# 0 = pk - width
|
|
||||||
# 2pi = pk + width
|
|
155
m_teng/backends/arduino/arduino.py
Normal file
155
m_teng/backends/arduino/arduino.py
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
import bleak as b
|
||||||
|
import asyncio
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
TARGET_NAME = "ArduinoTENG"
|
||||||
|
|
||||||
|
# GATT service and characteristics UUIDs
|
||||||
|
TENG_SUUID = "00010000-9a74-4b30-9361-4a16ec09930f"
|
||||||
|
TENG_STATUS_CUUID = "00010001-9a74-4b30-9361-4a16ec09930f"
|
||||||
|
TENG_COMMAND_CUUID = "00010002-9a74-4b30-9361-4a16ec09930f"
|
||||||
|
TENG_READING_CUUID = "00010003-9a74-4b30-9361-4a16ec09930f"
|
||||||
|
TENG_COUNT_CUUID = "00010004-9a74-4b30-9361-4a16ec09930f"
|
||||||
|
TENG_INTERVAL_CUUID = "00010005-9a74-4b30-9361-4a16ec09930f"
|
||||||
|
|
||||||
|
TENG_COMMANDS = {
|
||||||
|
"STOP": int(0).to_bytes(1, signed=False),
|
||||||
|
"MEASURE_COUNT": int(1).to_bytes(1, signed=False),
|
||||||
|
"MEASURE": int(2).to_bytes(1, signed=False),
|
||||||
|
}
|
||||||
|
TENG_STATUS = ["ERROR", "BUSY", "WAIT_CONNECT", "CONNECTED", "MEASURING"]
|
||||||
|
|
||||||
|
# TODO save measurements on device buffer, transfer later
|
||||||
|
|
||||||
|
|
||||||
|
# wrapper for global variable
|
||||||
|
class Buffer:
|
||||||
|
def __init__(self):
|
||||||
|
self.data = None
|
||||||
|
_buffer = Buffer()
|
||||||
|
|
||||||
|
# class Runner:
|
||||||
|
# def __init__(self):
|
||||||
|
runner = asyncio.Runner()
|
||||||
|
|
||||||
|
def teng_status_callback(characteristic, data):
|
||||||
|
value = int.from_bytes(data, byteorder="big", signed=False)
|
||||||
|
if 0 <= value and value < len(TENG_STATUS):
|
||||||
|
print(f"Status change: {TENG_STATUS[value]}")
|
||||||
|
else:
|
||||||
|
print(f"Status change (invalid): status={value}")
|
||||||
|
|
||||||
|
|
||||||
|
def disconnect_callback(client):
|
||||||
|
raise Exception(f"The Bluetooth device {client.name} was disconnected")
|
||||||
|
|
||||||
|
|
||||||
|
async def init_arduino_async(n_tries: int=5) -> b.BleakClient:
|
||||||
|
n_try = 0
|
||||||
|
if n_tries <= 0: n_tries = "inf"
|
||||||
|
try:
|
||||||
|
target_device = None
|
||||||
|
while target_device is None and (n_tries == "inf" or n_try < n_tries):
|
||||||
|
print(f"Searching for Bluetooth device '{TARGET_NAME}' ({n_try+1}/{n_tries})", end="\r")
|
||||||
|
devices = await b.BleakScanner.discover(return_adv=True, timeout=1.5)
|
||||||
|
# print(devices)
|
||||||
|
for adr, (device, adv_data) in devices.items():
|
||||||
|
if device.name == TARGET_NAME:
|
||||||
|
# print(adv_data)
|
||||||
|
target_device = device
|
||||||
|
break
|
||||||
|
n_try += 1
|
||||||
|
if target_device is None:
|
||||||
|
raise Exception(f"Could not find Bluetooth device 'ArduinoTENG'")
|
||||||
|
# print(f"Found target device: {target_device.name}: {target_device.metadata}, {target_device.details}")
|
||||||
|
# print(target_device.name, target_device.details)
|
||||||
|
client = b.BleakClient(target_device, disconnect_callback=disconnect_callback)
|
||||||
|
await client.connect()
|
||||||
|
print(f"Connected to Bluetooth device '{TARGET_NAME}' at [{client.address}]")
|
||||||
|
return client
|
||||||
|
except asyncio.exceptions.CancelledError:
|
||||||
|
raise Exception(f"Cancelled")
|
||||||
|
|
||||||
|
|
||||||
|
def init(beep_success=True, n_tries: int=5) -> b.BleakClient:
|
||||||
|
"""
|
||||||
|
Connect to the arduino
|
||||||
|
@returns: BleakClient
|
||||||
|
"""
|
||||||
|
client = runner.run(init_arduino_async(n_tries=n_tries))
|
||||||
|
if beep_success: beep(client)
|
||||||
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
def exit(client):
|
||||||
|
try:
|
||||||
|
runner.run(stop_measurement(client))
|
||||||
|
runner.run(client.disconnect())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def set_interval(client, interval: float):
|
||||||
|
"""
|
||||||
|
Set the measurement interval
|
||||||
|
@param interval: interval in seconds
|
||||||
|
"""
|
||||||
|
interval = int(interval * 1000) # convert to ms for arduinos delay)
|
||||||
|
await client.write_gatt_char(TENG_INTERVAL_CUUID, interval.to_bytes(2, byteorder="little", signed=False))
|
||||||
|
|
||||||
|
async def set_count(client, count: int):
|
||||||
|
"""
|
||||||
|
Set the measurement count
|
||||||
|
@param count: number of measurements to take
|
||||||
|
"""
|
||||||
|
await client.write_gatt_char(TENG_COUNT_CUUID, count.to_bytes(2, byteorder="little", signed=False))
|
||||||
|
|
||||||
|
async def stop_measurement(client):
|
||||||
|
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["STOP"])
|
||||||
|
|
||||||
|
async def start_measure_count(client):
|
||||||
|
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE_COUNT"])
|
||||||
|
|
||||||
|
async def start_measure(client):
|
||||||
|
await client.write_gatt_char(TENG_COMMAND_CUUID, TENG_COMMANDS["MEASURE"])
|
||||||
|
|
||||||
|
|
||||||
|
# async def main():
|
||||||
|
# for service in client.services:
|
||||||
|
# print(f"Service: {service.uuid}: {service.description}")
|
||||||
|
# for c in service.characteristics:
|
||||||
|
# print(f"\t{c.uuid}: {c.properties}, {c.descriptors}")
|
||||||
|
# teng_status = client.services.get_characteristic(TENG_STATUS_CUUID)
|
||||||
|
# teng_command = client.services.get_characteristic(TENG_COMMAND_CUUID)
|
||||||
|
# teng_reading = client.services.get_characteristic(TENG_READING_CUUID)
|
||||||
|
# client.start_notify(teng_status, teng_status_callback)
|
||||||
|
|
||||||
|
# await client.write_gatt_char(teng_command, TENG_COMMANDS["NOOP"])
|
||||||
|
# await asyncio.sleep(5)
|
||||||
|
# await client.write_gatt_char(teng_command, TENG_COMMANDS["MEASURE_BASELINE"])
|
||||||
|
|
||||||
|
# while client.is_connected:
|
||||||
|
# data = await client.read_gatt_char(teng_reading)
|
||||||
|
|
||||||
|
|
||||||
|
# value = int.from_bytes(data, byteorder="little", signed=False)
|
||||||
|
# print(f"Reading: {value}")
|
||||||
|
# await asyncio.sleep(0.5)
|
||||||
|
# except KeyboardInterrupt:
|
||||||
|
# pass
|
||||||
|
# except asyncio.exceptions.CancelledError:
|
||||||
|
# pass
|
||||||
|
# print("Disconnected")
|
||||||
|
|
||||||
|
|
||||||
|
def collect_buffer(instr, buffer_nr=1):
|
||||||
|
"""
|
||||||
|
@param buffer_nr: 1 -> current, 2 -> voltage
|
||||||
|
"""
|
||||||
|
assert(buffer_nr in (1, 2))
|
||||||
|
return np.vstack((_buffer.data[:,0], _buffer.data[:,buffer_nr])).T
|
||||||
|
|
||||||
|
|
||||||
|
def beep(client):
|
||||||
|
# TODO connect beeper to arduino?
|
||||||
|
print("beep")
|
75
m_teng/backends/arduino/measure.py
Normal file
75
m_teng/backends/arduino/measure.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
import bleak as b
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from m_teng.backends.arduino.arduino import beep, set_interval, set_count, TENG_READING_CUUID, _buffer, start_measure, start_measure_count, stop_measurement, runner
|
||||||
|
|
||||||
|
|
||||||
|
async def _measure_count_async(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
|
||||||
|
global _buffer
|
||||||
|
_buffer.data = np.zeros((count, 3))
|
||||||
|
i = 0
|
||||||
|
t_start = datetime.datetime.now()
|
||||||
|
async def add_reading(teng_reading_cr, reading: bytearray):
|
||||||
|
nonlocal i, count
|
||||||
|
if i >= count: return
|
||||||
|
_buffer.data[i][0] = float((datetime.datetime.now() - t_start).microseconds) / 1000
|
||||||
|
# reading = await client.read_gatt_char(TENG_READING_CUUID)
|
||||||
|
_buffer.data[i][2] = int.from_bytes(reading, byteorder="little", signed=False)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
await set_interval(client, interval)
|
||||||
|
await set_count(client, count)
|
||||||
|
# TODO check if notify works when the same value is written again
|
||||||
|
await client.start_notify(TENG_READING_CUUID, add_reading)
|
||||||
|
await start_measure_count(client)
|
||||||
|
while i < count:
|
||||||
|
await asyncio.sleep(update_interval)
|
||||||
|
if update_func is not None and i > 0: # assume an update has occured
|
||||||
|
update_func(i-1, 0, _buffer.data[i-1, 2])
|
||||||
|
await client.stop_notify(TENG_READING_CUUID)
|
||||||
|
if beep_done: beep(client)
|
||||||
|
|
||||||
|
def measure_count(client, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
|
||||||
|
runner.run(_measure_count_async(client, count=count, interval=interval, update_func=update_func, update_interval=update_interval, beep_done=beep_done, verbose=verbose))
|
||||||
|
|
||||||
|
|
||||||
|
async def _measure_async(client, interval, update_func=None, max_measurements=None):
|
||||||
|
global _buffer
|
||||||
|
readings = []
|
||||||
|
timestamps = []
|
||||||
|
i = 0
|
||||||
|
t_start = datetime.datetime.now()
|
||||||
|
|
||||||
|
async def add_reading(teng_reading_cr, reading):
|
||||||
|
nonlocal i
|
||||||
|
timestamps.append(float((datetime.datetime.now() - t_start).microseconds) / 1000)
|
||||||
|
reading = int.from_bytes(reading, byteorder="little", signed=False)
|
||||||
|
readings.append(reading)
|
||||||
|
|
||||||
|
if update_func:
|
||||||
|
try:
|
||||||
|
update_func(i, 0, reading)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
raise asyncio.exceptions.CancelledError("KeyboardInterrupt in update_func")
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
await set_interval(client, interval)
|
||||||
|
await client.start_notify(TENG_READING_CUUID, add_reading)
|
||||||
|
await start_measure(client)
|
||||||
|
try:
|
||||||
|
while max_measurements is None or i < max_measurements:
|
||||||
|
await asyncio.sleep(0.1) #
|
||||||
|
except asyncio.exceptions.CancelledError:
|
||||||
|
pass
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
await client.stop_notify(TENG_READING_CUUID)
|
||||||
|
await stop_measurement(client)
|
||||||
|
_buffer.data = np.vstack((timestamps, np.zeros(len(timestamps)), readings)).T
|
||||||
|
print("Measurement stopped" + " "*50)
|
||||||
|
|
||||||
|
def measure(client, interval, update_func=None, max_measurements=None):
|
||||||
|
runner.run(_measure_async(client, interval=interval, update_func=update_func, max_measurements=max_measurements))
|
0
m_teng/backends/keithley/__init__.py
Normal file
0
m_teng/backends/keithley/__init__.py
Normal file
@ -1,20 +1,19 @@
|
|||||||
import pyvisa
|
import pyvisa
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
import pkg_resources
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Utility
|
Utility
|
||||||
"""
|
"""
|
||||||
script_dir = "../scripts/"
|
|
||||||
scripts = {
|
scripts = {
|
||||||
"buffer_reset": "buffer_reset.lua",
|
"buffer_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/buffer_reset.lua"),
|
||||||
"smua_reset": "smua_reset.lua",
|
"smua_reset": pkg_resources.resource_filename("m_teng", "keithley_scripts/smua_reset.lua"),
|
||||||
}
|
}
|
||||||
for key,val in scripts.items():
|
|
||||||
scripts[key] = script_dir + scripts[key]
|
|
||||||
|
|
||||||
|
|
||||||
def init_keithley(beep_success=True):
|
def init(beep_success=True):
|
||||||
rm = pyvisa.ResourceManager('@py')
|
rm = pyvisa.ResourceManager('@py')
|
||||||
resources = rm.list_resources()
|
resources = rm.list_resources()
|
||||||
if len(resources) < 1:
|
if len(resources) < 1:
|
||||||
@ -32,6 +31,11 @@ def init_keithley(beep_success=True):
|
|||||||
return keithley
|
return keithley
|
||||||
|
|
||||||
|
|
||||||
|
def exit(instr):
|
||||||
|
instr.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def run_lua(instr, script_path, verbose=False):
|
def run_lua(instr, script_path, verbose=False):
|
||||||
"""
|
"""
|
||||||
Run a lua script from the host on the instrument
|
Run a lua script from the host on the instrument
|
||||||
@ -52,6 +56,15 @@ def reset(instr, verbose=False):
|
|||||||
run_lua(instr, scripts["smua_reset"], verbose=verbose)
|
run_lua(instr, scripts["smua_reset"], verbose=verbose)
|
||||||
run_lua(instr, scripts["buffer_reset"], verbose=verbose)
|
run_lua(instr, scripts["buffer_reset"], verbose=verbose)
|
||||||
|
|
||||||
|
def get_buffer_name(buffer_nr: int):
|
||||||
|
if buffer_nr == 2: return "smua.nvbuffer2"
|
||||||
|
elif buffer_nr == 1: return "smua.nvbuffer1"
|
||||||
|
raise ValueError(f"Invalid buffer_nr: {buffer_nr}")
|
||||||
|
|
||||||
|
def get_buffer_size(instr, buffer_nr=1):
|
||||||
|
n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n")
|
||||||
|
return int(float(n))
|
||||||
|
|
||||||
|
|
||||||
def collect_buffer(instr, buffer_nr=1, verbose=False):
|
def collect_buffer(instr, buffer_nr=1, verbose=False):
|
||||||
"""
|
"""
|
||||||
@ -63,8 +76,7 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
|
|||||||
0: timestamps
|
0: timestamps
|
||||||
1: readings
|
1: readings
|
||||||
"""
|
"""
|
||||||
if buffer_nr == 2: buffername = "smua.nvbuffer2"
|
buffername = get_buffer_name(buffer_nr)
|
||||||
else: buffername = "smua.nvbuffer1"
|
|
||||||
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
||||||
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
||||||
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
||||||
@ -75,3 +87,30 @@ def collect_buffer(instr, buffer_nr=1, verbose=False):
|
|||||||
buffer = np.vstack((timestamps, readings)).T
|
buffer = np.vstack((timestamps, readings)).T
|
||||||
return buffer
|
return buffer
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False):
|
||||||
|
"""
|
||||||
|
Get the buffer as 2D - np.array
|
||||||
|
@param instr : pyvisa instrument
|
||||||
|
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
|
||||||
|
@returns 2D numpy array:
|
||||||
|
i - ith reading:
|
||||||
|
0: timestamps
|
||||||
|
1: readings
|
||||||
|
"""
|
||||||
|
buffername = get_buffer_name(buffer_nr)
|
||||||
|
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN")
|
||||||
|
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array)
|
||||||
|
if range_[1] == -1:
|
||||||
|
range_ = (range_[0], f"{buffername}.n")
|
||||||
|
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7")
|
||||||
|
timestamps = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.timestamps)", container=np.array)
|
||||||
|
readings = instr.query_ascii_values(f"printbuffer({range_[0]}, {range_[1]}, {buffername}.readings)", container=np.array)
|
||||||
|
if verbose:
|
||||||
|
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}")
|
||||||
|
buffer = np.vstack((timestamps, readings)).T
|
||||||
|
return buffer
|
||||||
|
|
||||||
|
|
||||||
|
|
93
m_teng/backends/keithley/measure.py
Normal file
93
m_teng/backends/keithley/measure.py
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
from time import sleep
|
||||||
|
import numpy as np
|
||||||
|
from matplotlib import pyplot as plt
|
||||||
|
import pyvisa
|
||||||
|
|
||||||
|
from m_teng.backends.keithley import reset
|
||||||
|
from m_teng.utility import testing as _testing
|
||||||
|
|
||||||
|
def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
|
||||||
|
"""
|
||||||
|
Take <count> measurements with <interval> inbetween
|
||||||
|
|
||||||
|
@details
|
||||||
|
Uses the devices overlappedY function to make the measurements asynchronosly
|
||||||
|
The update_func is optional and only used when I == True and V == True
|
||||||
|
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
|
||||||
|
@param instr: pyvisa instrument
|
||||||
|
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
||||||
|
@param update_interval: interval at which the update_func is called
|
||||||
|
"""
|
||||||
|
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
|
||||||
|
# if V and I:
|
||||||
|
# elif V:
|
||||||
|
# f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
|
||||||
|
# elif I:
|
||||||
|
# f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
|
||||||
|
# else:
|
||||||
|
# print("I and/or V needs to be set to True")
|
||||||
|
# return
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
reset(instr, verbose=verbose)
|
||||||
|
instr.write(f"smua.measure.count = {count}")
|
||||||
|
instr.write(f"smua.measure.interval = {interval}")
|
||||||
|
|
||||||
|
# start measurement
|
||||||
|
instr.write(f"smua.source.output = smua.OUTPUT_ON")
|
||||||
|
instr.write(f_meas)
|
||||||
|
|
||||||
|
sleep(update_interval)
|
||||||
|
# for live viewing
|
||||||
|
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
|
||||||
|
|
||||||
|
# will return 2.0 while measruing
|
||||||
|
while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0:
|
||||||
|
if update_func:
|
||||||
|
try:
|
||||||
|
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
|
||||||
|
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
|
||||||
|
update_func(i, ival, vval)
|
||||||
|
except ValueError as e:
|
||||||
|
if i != 0:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
print(f"measure_count: ValueError: {e}")
|
||||||
|
sleep(update_interval)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||||
|
|
||||||
|
if beep_done:
|
||||||
|
instr.write("beeper.beep(0.3, 1000)")
|
||||||
|
|
||||||
|
|
||||||
|
def measure(instr, interval, update_func=None, max_measurements=None):
|
||||||
|
"""
|
||||||
|
@details:
|
||||||
|
- Resets the buffers
|
||||||
|
- Until KeyboardInterrupt:
|
||||||
|
- Take measurement
|
||||||
|
- Call update_func
|
||||||
|
- Wait interval
|
||||||
|
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
|
||||||
|
You can take the data from the buffer afterwards, using save_csv
|
||||||
|
@param instr: pyvisa instrument
|
||||||
|
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
|
||||||
|
@param max_measurements : maximum number of measurements. None means infinite
|
||||||
|
"""
|
||||||
|
reset(instr, verbose=True)
|
||||||
|
instr.write("smua.source.output = smua.OUTPUT_ON")
|
||||||
|
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
|
||||||
|
try:
|
||||||
|
i = 0
|
||||||
|
while max_measurements is None or i < max_measurements:
|
||||||
|
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
|
||||||
|
if update_func:
|
||||||
|
update_func(i, ival, vval)
|
||||||
|
sleep(interval)
|
||||||
|
i += 1
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
instr.write("smua.source.output = smua.OUTPUT_OFF")
|
||||||
|
print("Measurement stopped" + " "*50)
|
@ -1,6 +1,6 @@
|
|||||||
"""
|
"""
|
||||||
run this before using this library:
|
run this before using this library:
|
||||||
ipython -i k_teng_interactive.py
|
ipython -i m_teng_interactive.py
|
||||||
|
|
||||||
always records iv-t curves
|
always records iv-t curves
|
||||||
i-data -> smua.nvbuffer1
|
i-data -> smua.nvbuffer1
|
||||||
@ -17,22 +17,56 @@ from time import sleep
|
|||||||
from os import path, makedirs
|
from os import path, makedirs
|
||||||
import pickle as pkl
|
import pickle as pkl
|
||||||
import json
|
import json
|
||||||
|
import atexit
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
import sys
|
||||||
if __package__ is None:
|
if __package__ is None:
|
||||||
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
||||||
__package__ = "k-teng"
|
__package__ = "m_teng"
|
||||||
import sys
|
|
||||||
from os import path
|
from os import path
|
||||||
filepath = path.realpath(path.abspath(__file__))
|
filepath = path.realpath(path.abspath(__file__))
|
||||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog="m-teng",
|
||||||
|
description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino",
|
||||||
|
)
|
||||||
|
backend_group = parser.add_mutually_exclusive_group(required=True)
|
||||||
|
backend_group.add_argument("-k", "--keithley", action="store_true")
|
||||||
|
backend_group.add_argument("-a", "--arduino", action="store_true")
|
||||||
|
backend_group.add_argument("-t", "--testing", action='store_true')
|
||||||
|
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
|
||||||
|
args = vars(parser.parse_args())
|
||||||
|
|
||||||
|
i = 1
|
||||||
|
while i < len(sys.argv):
|
||||||
|
if args["keithley"]:
|
||||||
|
import m_teng.backends.keithley.keithley as _backend
|
||||||
|
import m_teng.backends.keithley.measure as _measure
|
||||||
|
elif args["arduino"]:
|
||||||
|
import m_teng.backends.arduino.arduino as _backend
|
||||||
|
import m_teng.backends.arduino.measure as _measure
|
||||||
|
elif args["testing"]:
|
||||||
|
import m_teng.backends.testing.testing as _backend
|
||||||
|
import m_teng.backends.testing.measure as _measure
|
||||||
|
elif sys.argv[i] in ["-c", "--config"]:
|
||||||
|
if i+1 < len(sys.argv):
|
||||||
|
config_path = sys.argv[i+1]
|
||||||
|
else:
|
||||||
|
print("-c requires an extra argument: path of config file")
|
||||||
|
i += 1
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
|
||||||
from .keithley import keithley as _keithley
|
from m_teng.utility import data as _data
|
||||||
from .keithley.measure import measure_count as _measure_count, measure as _measure
|
from m_teng.utility.data import load_dataframe
|
||||||
from .utility import data as _data
|
from m_teng.utility import file_io
|
||||||
from .utility.data import load_dataframe
|
from m_teng.update_funcs import _Monitor, _ModelPredict, _update_print
|
||||||
from .utility import file_io
|
|
||||||
|
config_path = path.expanduser("~/.config/k-teng.json")
|
||||||
|
|
||||||
_runtime_vars = {
|
_runtime_vars = {
|
||||||
"last-measurement": ""
|
"last-measurement": ""
|
||||||
@ -44,66 +78,35 @@ settings = {
|
|||||||
"interval": 0.02,
|
"interval": 0.02,
|
||||||
"beep": True,
|
"beep": True,
|
||||||
}
|
}
|
||||||
config_path = path.expanduser("~/.config/k-teng.json")
|
|
||||||
|
|
||||||
test = False
|
test = False
|
||||||
|
|
||||||
# global variable for the instrument returned by pyvisa
|
# global variable for the instrument/client returned by pyvisa/bleak
|
||||||
k = None
|
dev = None
|
||||||
|
|
||||||
|
|
||||||
|
def monitor_predict(model_dir: str, count=5000, interval=settings["interval"], max_points_shown=160):
|
||||||
def _update_print(i, ival, vval):
|
|
||||||
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
|
|
||||||
|
|
||||||
class _Monitor:
|
|
||||||
"""
|
"""
|
||||||
Monitor v and i data
|
Take <count> measurements in <interval> and predict with a machine learning model
|
||||||
"""
|
"""
|
||||||
def __init__(self, max_points_shown=None, use_print=False):
|
model_predict = _ModelPredict(dev, model_dir)
|
||||||
self.max_points_shown = max_points_shown
|
plt_monitor = _Monitor(max_points_shown, use_print=False)
|
||||||
self.use_print = use_print
|
skip_n = 0
|
||||||
self.index = []
|
def update(i, ival, vval):
|
||||||
self.vdata = []
|
plt_monitor.update(i, ival, vval)
|
||||||
self.idata = []
|
if skip_n % 10 == 0:
|
||||||
|
model_predict.update(i, ival, vval)
|
||||||
plt.ion()
|
skip_n += 1
|
||||||
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
|
|
||||||
|
|
||||||
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
|
|
||||||
self.vax.set_ylabel("Voltage [V]")
|
|
||||||
self.vax.grid(True)
|
|
||||||
|
|
||||||
self.iline, = self.iax.plot(self.index, self.idata, color="m")
|
|
||||||
self.iax.set_ylabel("Current [A]")
|
|
||||||
self.iax.grid(True)
|
|
||||||
|
|
||||||
def update(self, i, ival, vval):
|
|
||||||
if self.use_print:
|
|
||||||
_update_print(i, ival, vval)
|
|
||||||
self.index.append(i)
|
|
||||||
self.idata.append(ival)
|
|
||||||
self.vdata.append(vval)
|
|
||||||
# update data
|
|
||||||
self.iline.set_xdata(self.index)
|
|
||||||
self.iline.set_ydata(self.idata)
|
|
||||||
self.vline.set_xdata(self.index)
|
|
||||||
self.vline.set_ydata(self.vdata)
|
|
||||||
# recalculate limits and set them for the view
|
|
||||||
self.iax.relim()
|
|
||||||
self.vax.relim()
|
|
||||||
if self.max_points_shown and i > self.max_points_shown:
|
|
||||||
self.iax.set_xlim(i - self.max_points_shown, i)
|
|
||||||
self.vax.set_xlim(i - self.max_points_shown, i)
|
|
||||||
self.iax.autoscale_view()
|
|
||||||
self.vax.autoscale_view()
|
|
||||||
# update plot
|
|
||||||
self.fig1.canvas.draw()
|
|
||||||
self.fig1.canvas.flush_events()
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
plt.close(self.fig1)
|
|
||||||
|
|
||||||
|
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
||||||
|
try:
|
||||||
|
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update, update_interval=0.1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
if args["keithley"]:
|
||||||
|
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||||
|
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
||||||
|
else:
|
||||||
|
print("Measurement finished" + " "*50)
|
||||||
|
|
||||||
def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160):
|
def monitor_count(count=5000, interval=settings["interval"], max_points_shown=160):
|
||||||
"""
|
"""
|
||||||
@ -123,10 +126,10 @@ def monitor_count(count=5000, interval=settings["interval"], max_points_shown=16
|
|||||||
|
|
||||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
||||||
try:
|
try:
|
||||||
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
|
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
if not test:
|
if args["keithley"]:
|
||||||
k.write(f"smua.source.output = smua.OUTPUT_OFF")
|
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||||
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
||||||
else:
|
else:
|
||||||
print("Measurement finished" + " "*50)
|
print("Measurement finished" + " "*50)
|
||||||
@ -147,10 +150,10 @@ def measure_count(count=5000, interval=settings["interval"]):
|
|||||||
|
|
||||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
print(f"Starting measurement with:\n\tinterval = {interval}s\nSave the data using 'save_csv()' afterwards.")
|
||||||
try:
|
try:
|
||||||
_measure_count(k, V=True, I=True, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05, testing=test)
|
_measure.measure_count(dev, count=count, interval=interval, beep_done=False, verbose=False, update_func=update_func, update_interval=0.05)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
if not test:
|
if args["keithley"]:
|
||||||
k.write(f"smua.source.output = smua.OUTPUT_OFF")
|
dev.write(f"smua.source.output = smua.OUTPUT_OFF")
|
||||||
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
print("Monitoring cancelled, measurement might still continue" + " "*50)
|
||||||
else:
|
else:
|
||||||
print("Measurement finished" + " "*50)
|
print("Measurement finished" + " "*50)
|
||||||
@ -176,7 +179,7 @@ def monitor(interval=settings["interval"], max_measurements=None, max_points_sho
|
|||||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
|
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
|
||||||
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
|
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
|
||||||
update_func = plt_monitor.update
|
update_func = plt_monitor.update
|
||||||
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
|
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
|
||||||
|
|
||||||
|
|
||||||
def measure(interval=settings["interval"], max_measurements=None):
|
def measure(interval=settings["interval"], max_measurements=None):
|
||||||
@ -195,7 +198,7 @@ def measure(interval=settings["interval"], max_measurements=None):
|
|||||||
_runtime_vars["last_measurement"] = dtime.now().isoformat()
|
_runtime_vars["last_measurement"] = dtime.now().isoformat()
|
||||||
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
|
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
|
||||||
update_func = _update_print
|
update_func = _update_print
|
||||||
_measure(k, interval=interval, max_measurements=max_measurements, update_func=update_func, testing=test)
|
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func)
|
||||||
|
|
||||||
|
|
||||||
def repeat(measure_func: callable, count: int, repeat_delay=0):
|
def repeat(measure_func: callable, count: int, repeat_delay=0):
|
||||||
@ -221,7 +224,7 @@ def repeat(measure_func: callable, count: int, repeat_delay=0):
|
|||||||
sleep(repeat_delay)
|
sleep(repeat_delay)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
if settings["beep"]: k.write("beeper.beep(0.3, 1000)")
|
if settings["beep"]: _backend.beep()
|
||||||
|
|
||||||
|
|
||||||
def get_dataframe():
|
def get_dataframe():
|
||||||
@ -229,21 +232,14 @@ def get_dataframe():
|
|||||||
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
|
Get a pandas dataframe from the data in smua.nvbuffer1 and smua.nvbuffer2
|
||||||
"""
|
"""
|
||||||
global k, settings, _runtime_vars
|
global k, settings, _runtime_vars
|
||||||
if test:
|
ibuffer = _backend.collect_buffer(dev, 1)
|
||||||
timestamps = np.arange(0, 50, 0.01)
|
vbuffer = _backend.collect_buffer(dev, 2)
|
||||||
ydata = np.array([testing.testcurve(t, amplitude=15, peak_width=2) for t in timestamps])
|
|
||||||
ibuffer = np.vstack((timestamps, ydata)).T
|
|
||||||
|
|
||||||
ydata = np.array([-testing.testcurve(t, amplitude=5e-8, peak_width=1) for t in timestamps])
|
|
||||||
vbuffer = np.vstack((timestamps, ydata)).T
|
|
||||||
else:
|
|
||||||
ibuffer = _keithley.collect_buffer(k, 1)
|
|
||||||
vbuffer = _keithley.collect_buffer(k, 2)
|
|
||||||
df = _data.buffers2dataframe(ibuffer, vbuffer)
|
df = _data.buffers2dataframe(ibuffer, vbuffer)
|
||||||
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
|
df.basename = file_io.get_next_filename(settings["name"], settings["datadir"])
|
||||||
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
|
df.name = f"{df.basename} @ {_runtime_vars['last-measurement']}"
|
||||||
return df
|
return df
|
||||||
|
|
||||||
|
|
||||||
def save_csv():
|
def save_csv():
|
||||||
"""
|
"""
|
||||||
Saves the contents of nvbuffer1 as .csv
|
Saves the contents of nvbuffer1 as .csv
|
||||||
@ -267,6 +263,7 @@ def save_pickle():
|
|||||||
df.to_pickle(filename)
|
df.to_pickle(filename)
|
||||||
print(f"Saved as '{filename}'")
|
print(f"Saved as '{filename}'")
|
||||||
|
|
||||||
|
|
||||||
def run_script(script_path):
|
def run_script(script_path):
|
||||||
"""
|
"""
|
||||||
Run a lua script on the Keithley device
|
Run a lua script on the Keithley device
|
||||||
@ -276,7 +273,7 @@ def run_script(script_path):
|
|||||||
if test:
|
if test:
|
||||||
print("run_script: Test mode enabled, ignoring call to run_script")
|
print("run_script: Test mode enabled, ignoring call to run_script")
|
||||||
else:
|
else:
|
||||||
_keithley.run_lua(k, script_path=script_path)
|
_keithley.run_lua(dev, script_path=script_path)
|
||||||
|
|
||||||
|
|
||||||
def set(setting, value):
|
def set(setting, value):
|
||||||
@ -305,16 +302,16 @@ def help(topic=None):
|
|||||||
if topic == None:
|
if topic == None:
|
||||||
print("""
|
print("""
|
||||||
Functions:
|
Functions:
|
||||||
measure - take measurements
|
measure [kat] - take measurements
|
||||||
monitor - take measurements with live monitoring in a matplotlib window
|
monitor [kat] - take measurements with live monitoring in a matplotlib window
|
||||||
measure_count - take a fixed number of measurements
|
measure_count [kat] - take a fixed number of measurements
|
||||||
monitor_count - take a fixed number of measurements with live monitoring in a matplotlib window
|
monitor_count [kat] - take a fixed number of measurements with live monitoring in a matplotlib window
|
||||||
repeat - measure and save to csv multiple times
|
repeat [kat] - measure and save to csv multiple times
|
||||||
get_dataframe - return smua.nvbuffer 1 and 2 as pandas dataframe
|
get_dataframe [kat] - return device internal buffer as pandas dataframe
|
||||||
save_csv - save the last measurement as csv file
|
save_csv [kat] - save the last measurement as csv file
|
||||||
save_pickle - save the last measurement as pickled pandas dataframe
|
save_pickle [kat] - save the last measurement as pickled pandas dataframe
|
||||||
load_dataframe - load a pandas dataframe from csv or pickle
|
load_dataframe [kat] - load a pandas dataframe from csv or pickle
|
||||||
run_script - run a lua script on the Keithely device
|
run_script [k ] - run a lua script on the Keithely device
|
||||||
Run 'help(function)' to see more information on a function
|
Run 'help(function)' to see more information on a function
|
||||||
|
|
||||||
Available topics:
|
Available topics:
|
||||||
@ -333,13 +330,13 @@ Run 'help("topic")' to see more information on a topic""")
|
|||||||
Functions:
|
Functions:
|
||||||
name("<name>") - short for set("name", "<name>")
|
name("<name>") - short for set("name", "<name>")
|
||||||
set("setting", value) - set a setting to a value
|
set("setting", value) - set a setting to a value
|
||||||
save_settings() - store the settings as "k-teng.conf" in the working directory
|
save_settings() - store the settings as "m-teng.json" in the working directory
|
||||||
load_settings() - load settings from a file
|
load_settings() - load settings from a file
|
||||||
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
|
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
|
||||||
The serach path is:
|
The serach path is:
|
||||||
<working-dir>/k-teng.json
|
<working-dir>/m-teng.json
|
||||||
$XDG_CONFIG_HOME/k-teng.json
|
$XDG_CONFIG_HOME/m-teng.json
|
||||||
~/.config/k-teng.json
|
~/.config/m-teng.json
|
||||||
""")
|
""")
|
||||||
elif topic == "imports":
|
elif topic == "imports":
|
||||||
print("""Imports:
|
print("""Imports:
|
||||||
@ -349,45 +346,37 @@ Functions:
|
|||||||
os.path """)
|
os.path """)
|
||||||
elif topic == "device":
|
elif topic == "device":
|
||||||
print("""Device:
|
print("""Device:
|
||||||
The opened pyvisa resource (Keithley device) is the global variable 'k'.
|
keithley backend:
|
||||||
|
The opened pyvisa resource (deveithley device) is the global variable 'dev'.
|
||||||
You can interact using pyvisa functions, such as
|
You can interact using pyvisa functions, such as
|
||||||
k.write("command"), k.query("command") etc. to interact with the device.""")
|
k.write("command"), k.query("command") etc. to interact with the device.
|
||||||
|
arduino backend:
|
||||||
|
The Arduino will be avaiable as BleakClient using the global variable 'dev'. """)
|
||||||
else:
|
else:
|
||||||
print(topic.__doc__)
|
print(topic.__doc__)
|
||||||
|
|
||||||
|
|
||||||
def init():
|
def init():
|
||||||
global k, settings, test, config_path
|
global dev, settings, config_path
|
||||||
print(r""" ____ __. ______________________ _______ ________
|
print(r""" ______________________ _______ ________
|
||||||
| |/ _| \__ ___/\_ _____/ \ \ / _____/
|
_____ \__ ___/\_ _____/ \ \ / _____/
|
||||||
| < ______ | | | __)_ / | \ / \ ___
|
/ \ ______| | | __)_ / | \ / \ ___
|
||||||
| | \ /_____/ | | | \/ | \\ \_\ \
|
| Y Y \/_____/| | | \/ | \\ \_\ \
|
||||||
|____|__ \ |____| /_______ /\____|__ / \______ /
|
|__|_| / |____| /_______ /\____|__ / \______ /
|
||||||
\/ \/ \/ \/ 1.1
|
\/ \/ \/ \/ 1.2
|
||||||
Interactive Shell for TENG measurements with Keithley 2600B
|
Interactive Shell for TENG measurements with Keithley 2600B
|
||||||
---
|
---
|
||||||
Enter 'help()' for a list of commands""")
|
Enter 'help()' for a list of commands""")
|
||||||
from os import environ
|
from os import environ
|
||||||
if path.isfile("k-teng.json"):
|
if path.isfile("m-teng.json"):
|
||||||
config_path = "k-teng.json"
|
config_path = "m-teng.json"
|
||||||
elif 'XDG_CONFIG_HOME' in environ.keys():
|
elif 'XDG_CONFIG_HOME' in environ.keys():
|
||||||
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/k-teng.json"):
|
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/m-teng.json"):
|
||||||
config_path = environ["XDG_CONFIG_HOME"] + "/k-teng.json"
|
config_path = environ["XDG_CONFIG_HOME"] + "/m-teng.json"
|
||||||
else:
|
else:
|
||||||
config_path = path.expanduser("~/.config/k-teng.json")
|
config_path = path.expanduser("~/.config/m-teng.json")
|
||||||
|
if args["config"]:
|
||||||
from sys import argv
|
config_path = args["config"]
|
||||||
i = 1
|
|
||||||
while i < len(argv):
|
|
||||||
if argv[i] in ["-t", "--test"]:
|
|
||||||
test = True
|
|
||||||
elif argv[i] in ["-c", "--config"]:
|
|
||||||
if i+1 < len(argv):
|
|
||||||
config_path = argv[i+1]
|
|
||||||
else:
|
|
||||||
print("-c requires an extra argument: path of config file")
|
|
||||||
i += 1
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
|
|
||||||
if not path.isdir(path.dirname(config_path)):
|
if not path.isdir(path.dirname(config_path)):
|
||||||
@ -399,15 +388,12 @@ Enter 'help()' for a list of commands""")
|
|||||||
if not path.isdir(settings["datadir"]):
|
if not path.isdir(settings["datadir"]):
|
||||||
makedirs(settings["datadir"])
|
makedirs(settings["datadir"])
|
||||||
|
|
||||||
if not test:
|
|
||||||
from .keithley.keithley import init_keithley
|
|
||||||
try:
|
try:
|
||||||
k = init_keithley(beep_success=settings["beep"])
|
dev = _backend.init(beep_success=settings["beep"])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
exit()
|
exit(1)
|
||||||
else:
|
atexit.register(_backend.exit, dev)
|
||||||
print("Running in test mode, device will not be connected.")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
123
m_teng/update_funcs.py
Normal file
123
m_teng/update_funcs.py
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import torch
|
||||||
|
|
||||||
|
from teng_ml.util import model_io as mio
|
||||||
|
from teng_ml.util.settings import MLSettings
|
||||||
|
from teng_ml.util.split import DataSplitter
|
||||||
|
|
||||||
|
from m_teng.backends.keithley import keithley
|
||||||
|
|
||||||
|
def _update_print(i, ival, vval):
|
||||||
|
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
|
||||||
|
|
||||||
|
class _Monitor:
|
||||||
|
"""
|
||||||
|
Monitor v and i data
|
||||||
|
"""
|
||||||
|
def __init__(self, max_points_shown=None, use_print=False):
|
||||||
|
self.max_points_shown = max_points_shown
|
||||||
|
self.use_print = use_print
|
||||||
|
self.index = []
|
||||||
|
self.vdata = []
|
||||||
|
self.idata = []
|
||||||
|
|
||||||
|
plt.ion()
|
||||||
|
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5))
|
||||||
|
|
||||||
|
self.vline, = self.vax.plot(self.index, self.vdata, color="g")
|
||||||
|
self.vax.set_ylabel("Voltage [V]")
|
||||||
|
self.vax.grid(True)
|
||||||
|
|
||||||
|
self.iline, = self.iax.plot(self.index, self.idata, color="m")
|
||||||
|
self.iax.set_ylabel("Current [A]")
|
||||||
|
self.iax.grid(True)
|
||||||
|
|
||||||
|
def update(self, i, ival, vval):
|
||||||
|
if self.use_print:
|
||||||
|
_update_print(i, ival, vval)
|
||||||
|
self.index.append(i)
|
||||||
|
self.idata.append(ival)
|
||||||
|
self.vdata.append(vval)
|
||||||
|
# update data
|
||||||
|
self.iline.set_xdata(self.index)
|
||||||
|
self.iline.set_ydata(self.idata)
|
||||||
|
self.vline.set_xdata(self.index)
|
||||||
|
self.vline.set_ydata(self.vdata)
|
||||||
|
# recalculate limits and set them for the view
|
||||||
|
self.iax.relim()
|
||||||
|
self.vax.relim()
|
||||||
|
if self.max_points_shown and i > self.max_points_shown:
|
||||||
|
self.iax.set_xlim(i - self.max_points_shown, i)
|
||||||
|
self.vax.set_xlim(i - self.max_points_shown, i)
|
||||||
|
self.iax.autoscale_view()
|
||||||
|
self.vax.autoscale_view()
|
||||||
|
# update plot
|
||||||
|
self.fig1.canvas.draw()
|
||||||
|
self.fig1.canvas.flush_events()
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
plt.close(self.fig1)
|
||||||
|
|
||||||
|
|
||||||
|
class _ModelPredict:
|
||||||
|
colors = ["red", "green", "purple", "blue", "orange", "grey", "cyan"]
|
||||||
|
def __init__(self, instr, model_dir):
|
||||||
|
"""
|
||||||
|
@param model_dir: directory where model.plk and settings.pkl are stored
|
||||||
|
|
||||||
|
Predict the values that are currently being recorded
|
||||||
|
@details:
|
||||||
|
Load the model and model settings from model dir
|
||||||
|
Wait until the number of recoreded points is >= the size of the models DataSplitter
|
||||||
|
Collect the data from the keithley, apply the transforms and predict the label with the model
|
||||||
|
Shows the prediction with a bar plot
|
||||||
|
"""
|
||||||
|
self.instr = instr
|
||||||
|
self.model = mio.load_model(model_dir)
|
||||||
|
self.model_settings: MLSettings = mio.load_settings(model_dir)
|
||||||
|
if type(self.model_settings.splitter) == DataSplitter:
|
||||||
|
self.data_length = self.model_settings.splitter.split_size
|
||||||
|
else:
|
||||||
|
self.data_length = 200
|
||||||
|
|
||||||
|
plt.ion()
|
||||||
|
self.fig1, (self.ax) = plt.subplots(1, 1, figsize=(8, 5))
|
||||||
|
|
||||||
|
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), [ 1 for _ in range(len(self.model_settings.labels))])
|
||||||
|
self.ax.set_ylabel("Prediction")
|
||||||
|
self.ax.grid(True)
|
||||||
|
|
||||||
|
def update(self, i, ival, vval):
|
||||||
|
buffer_size = keithley.get_buffer_size(self.instr, buffer_nr=1)
|
||||||
|
if buffer_size <= self.data_length:
|
||||||
|
print(f"ModelPredict.update: buffer_size={buffer_size} < {self.data_length}")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
ibuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=1)
|
||||||
|
vbuffer = keithley.collect_buffer_range(self.instr, (buffer_size-self.data_length, buffer_size), buffer_nr=2)
|
||||||
|
if self.model_settings.num_features == 1: # model uses only voltage
|
||||||
|
data = np.vstack((ibuffer[:,0], ibuffer[:,1], vbuffer[:,1])).T
|
||||||
|
# print(f"data.shape:", data.shape)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(f"Cant handle models with num_features != 1 yet")
|
||||||
|
for t in self.model_settings.transforms:
|
||||||
|
data = t(data)
|
||||||
|
data = np.reshape(data[:,2], (1, -1, 1)) # batch_size, seq, features
|
||||||
|
with torch.no_grad():
|
||||||
|
x = torch.FloatTensor(data) # select voltage data, without timestamps
|
||||||
|
# print(x.shape)
|
||||||
|
|
||||||
|
prediction = self.model(x) # (batch_size, label-predictions)
|
||||||
|
prediction = torch.nn.functional.softmax(prediction) # TODO remove when softmax is already applied by model
|
||||||
|
predicted = torch.argmax(prediction, dim=1, keepdim=False) # -> [ label_indices ]
|
||||||
|
# print(f"raw={prediction[0]}, predicted_index={predicted[0}")
|
||||||
|
label = self.model_settings.labels[predicted[0]]
|
||||||
|
# print(f"-> label={label}")
|
||||||
|
|
||||||
|
self.bar_cont.remove()
|
||||||
|
self.bar_cont = self.ax.bar(self.model_settings.labels.get_labels(), prediction[0], color=_ModelPredict.colors[:len(self.model_settings.labels)])
|
||||||
|
# update plot
|
||||||
|
self.fig1.canvas.draw()
|
||||||
|
self.fig1.canvas.flush_events()
|
0
m_teng/utility/__init__.py
Normal file
0
m_teng/utility/__init__.py
Normal file
17
m_teng/utility/testing.py
Normal file
17
m_teng/utility/testing.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
def testcurve(x, frequency=10, peak_width=2, amplitude=20, bias=0):
|
||||||
|
# 0 = pk - width
|
||||||
|
# 2pi = pk + width
|
||||||
|
# want peak at n*time == frequency
|
||||||
|
nearest_peak = np.round(x / frequency, 0)
|
||||||
|
# if not peak at 0 and within peak_width
|
||||||
|
if nearest_peak > 0 and np.abs((x - nearest_peak * frequency)) < peak_width:
|
||||||
|
# return sin that does one period within 2*peak_width
|
||||||
|
return amplitude * np.sin(2*np.pi * (x - nearest_peak * frequency - peak_width) / (2*peak_width)) + bias
|
||||||
|
else:
|
||||||
|
return bias
|
||||||
|
|
||||||
|
def get_testcurve(frequency=10, peak_width=2, amplitude=20, bias=0):
|
||||||
|
return np.vectorize(lambda x: testcurve(x, frequency=frequency, peak_width=peak_width, amplitude=amplitude, bias=bias))
|
||||||
|
|
File diff suppressed because one or more lines are too long
35
pyproject.toml
Normal file
35
pyproject.toml
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["setuptools"]
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "k_teng"
|
||||||
|
version = "1.0.0"
|
||||||
|
description = "Interactive utility for I-V measurements with a Keitley 2600 SMU"
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
readme = "readme.md"
|
||||||
|
license = {file = "LICENSE"}
|
||||||
|
authors = [
|
||||||
|
{ name = "Matthias Quintern", email = "matthias@quintern.xyz" }
|
||||||
|
]
|
||||||
|
classifiers = [
|
||||||
|
"Operating System :: POSIX :: Linux",
|
||||||
|
"Environment :: Console",
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
|
||||||
|
]
|
||||||
|
dependencies = [
|
||||||
|
"matplotlib>=3.6",
|
||||||
|
"numpy",
|
||||||
|
"pyvisa",
|
||||||
|
"pyvisa-py"
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.urls]
|
||||||
|
repository = "https://git.quintern.xyz/MatthiasQuintern/k-teng"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["."]
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
k-teng = "k_teng.k_teng_interactive:main"
|
44
readme.md
44
readme.md
@ -1,23 +1,39 @@
|
|||||||
# K-TENG
|
# m-TENG
|
||||||
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU using pyvisa
|
Helper scripts and shell for measuring **T**ribo**e**lectric **N**ano**g**enerator-based sensor output with a Keithley 2611B SMU or an Arduino
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
### Useful functions for scripts
|
|
||||||
- Measure Voltage and/or Current
|
|
||||||
- Transfer buffer from device to host
|
|
||||||
- Save/load as csv
|
|
||||||
- Run lua script on device
|
|
||||||
- Auto-filenames
|
|
||||||
### Interactive (shell) mode
|
### Interactive (shell) mode
|
||||||
- Live view
|
- Live view
|
||||||
- Press button to stop
|
- Press button to stop
|
||||||
- Save and load settings (default interval, data directory...)
|
- Save and load settings (default interval, data directory...)
|
||||||
- Easily run arbitrary command on device
|
- Easily run arbitrary command on device
|
||||||
|
|
||||||
## Shell mode
|
|
||||||
Start with:
|
|
||||||
```shell
|
|
||||||
ipython -i k_teng_interactive.py
|
|
||||||
```
|
|
||||||
|
|
||||||
Use `help()` to get a list of available commands
|
### Useful functions for scripts
|
||||||
|
- Measure voltage and/or current
|
||||||
|
- Transfer buffer from measurement device to host
|
||||||
|
- Save/load as csv
|
||||||
|
- Run lua script on Keithley SMU
|
||||||
|
- Auto-filenames
|
||||||
|
|
||||||
|
## Available backends
|
||||||
|
### keithley
|
||||||
|
Use a Keithley 2611B Source-Measure-Unit via *pyvisa*. This backend allows measuring both voltage and current simultaneously.
|
||||||
|
|
||||||
|
### arduino
|
||||||
|
Use a Bluetooth capable Arduino with [https://git.quintern.xyz/MatthiasQuintern/teng-arduino](this software on the arduino).
|
||||||
|
This backend only allows measuring voltage using an Arduinos analog input pin (0 - 3.3 V, 12 bit resolution).
|
||||||
|
|
||||||
|
### testing
|
||||||
|
Use the shell without measuring TENG output. When starting a measurement, sample data will be generated.
|
||||||
|
|
||||||
|
|
||||||
|
## Shell mode
|
||||||
|
It is recommended to run the shell with ipython:
|
||||||
|
```shell
|
||||||
|
ipython -i k_teng_interactive.py -- -*X*
|
||||||
|
```
|
||||||
|
Substitute *X* for `-k` for keithley backend, `-a` for arduino backend or `-t` for testing backend.
|
||||||
|
|
||||||
|
In the shell, run `help()` to get a list of available commands
|
||||||
|
Loading…
Reference in New Issue
Block a user