Add Arduino LED, Keithley2700 cleanup

This commit is contained in:
CPD 2025-01-17 15:59:12 +01:00
parent f7e85f6e73
commit a32f9e0183
10 changed files with 202 additions and 188 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@ __pycache__
.ipynb_checkpoints .ipynb_checkpoints
.vim .vim
.old .old
.spyproject
testing testing
bspc_rule.sh bspc_rule.sh
readings.txt readings.txt

View File

@ -1,60 +1,108 @@
import pyvisa import pyvisa
import numpy as np import numpy as np
from time import sleep
# import pkg_resources # import pkg_resources
import os
"""
Utility
"""
# scripts = {
# "buffer_reset": pkg_resources.resource_filename("cpdctrl", "keithley_scripts/buffer_reset.lua"),
# "instrument_reset": pkg_resources.resource_filename("cpdctrl", "keithley_scripts/smua_reset.lua"),
# }
scripts = {
"instrument_reset": "~/cpd-dev/cpdctrl/cpdctrl/keithley_scripts/reset.scpi",
}
class Keithley2007:
class Keithley2700:
""" """
Utility Wrapper class for the Keithley2700 SMU controlled via pyvisa
""" """
scripts = {}
def __init__(self, beep_success=True, visa_backend="", check_front_switch=True): def __init__(self, visa_backend="", check_front_switch=True):
rm = pyvisa.ResourceManager(visa_backend) rm = pyvisa.ResourceManager(visa_backend)
resources = rm.list_resources() resources = rm.list_resources()
if len(resources) < 1: if len(resources) < 1:
raise Exception("No resources found.") raise Exception("No resources found.")
elif len(resources) == 1: elif len(resources) == 1:
print(f"Opening the only resource found: {resources[0]}") print(f"Opening the only resource found: {resources[0]}")
self. = rm.open_resource(resources[0]) self.instr = rm.open_resource(resources[0])
if beep_success: keithley.write("beeper.beep(0.5, 1000)")
return keithley
elif len(resources) > 1: elif len(resources) > 1:
print(f"Resources:") print(f"Resources:")
for i, r in enumerate(resources): for i, r in enumerate(resources):
print(f"{i:02}: {r}") print(f"{i:02}: {r}")
instr = int(input("Select an instrument: ")) instr = int(input("Select an instrument: "))
self.instr = rm.open_resource(resources[instr]) self.instr = rm.open_resource(resources[instr])
if beep_success: self.beep()
if check_front_switch: if check_front_switch:
self._check_front_input_selected() self._check_front_input_selected()
def __del__(self):
"""Properly close the instrument connection"""
self.instr.close()
def _check_front_input_selected(self): def _check_front_input_selected(self):
"""
Make sure the front switch selecting the inputs selects the FRONT inputs
Raises
------
Exception
If front input state != 1
Returns
-------
switch : str
Front input state
"""
switch = self.instr.query("SYSTem:FRSwitch?").strip("\n") switch = self.instr.query("SYSTem:FRSwitch?").strip("\n")
if switch != "1": if switch != "1":
raise Exception("The Keithley's INPUT switch must select the [F]ront inputs") raise Exception("The Keithley's INPUT switch must select the [F]ront inputs")
return switch return switch
def run(self, code, verbose=False):
"""
Run SCPI code on the device by writing it.
Empty lines, leading whitespaces and lines starting with ' or # are ignored.
Parameters
----------
code : str
SCPI commands
"""
script = '\n'.join([l.strip(" ") for l in code.strip(" ").strip("\n").split("\n") if len(l) > 0 and l[0] not in "#'"])
if verbose:
print(f"Running code:\n{script}")
self.instr.write(script)
def run_script(self, script_path, verbose=False): def run_script(self, script_path, verbose=False):
""" """
Run a script from the host on the instrument Run a script from the host on the instrument
@param script_path : full path to the script @param script_path : full path to the script
Parameters
----------
script_path : str
full path to the script
verbose : bool, optional
If true, print a message when the script is run. The default is False.
Returns
-------
None.
""" """
with open(script_path, "r") as file: with open(os.path.expanduser(script_path), "r") as file:
script = file.read() script = file.read()
if verbose: print(f"Running script: {script_path}") if verbose: print(f"Running script: {script_path}")
self.instr.write(script) self.run(script)
def beep(self, length=0.5, pitch=1000): def beep(self, length=0.5, pitch=1000):
"""The Keithley2700 cant beep on command :(""" """The Keithley2700 cant beep on command :("""
pass pass
def __del__(self):
"""Properly close the instrument connection"""
self.instr.close()
def reset(self, verbose=False): def reset(self, verbose=False):
@ -62,40 +110,50 @@ class Keithley2007:
Reset smua and its buffers Reset smua and its buffers
@param instr : pyvisa instrument @param instr : pyvisa instrument
""" """
self.run_script(self.instr, scripts["smua_reset"], verbose=verbose) self.run_script(scripts["instrument_reset"], verbose=verbose)
run_lua(self.instr, scripts["buffer_reset"], verbose=verbose) self.buffer_reset()
def buffer_reset(self):
buffer_reset = """
TRACe:CLEar
TRACe:CLEar:AUTO ON
SYSTem:TSTamp:TYPE RELative
"""
self.run(buffer_reset)
def buffer_get_size(self, buffer_nr=1):
def get_buffer_size(instr, buffer_nr=1): n = self.instr.query(f"TRACe:POINts?").strip("\n")
n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n")
return int(float(n)) return int(float(n))
def buffer_set_size(s):
if not type(s) == int or s < 2 or s > 55000:
raise ValueError(f"Invalid buffer size: {s}. Must be int and between 2 and 55000")
self.instr.write(f"TRACe:POINts {s}")
def collect_buffer(instr, buffer_nr=1, verbose=False): def buffer_collect(self, verbose=False):
""" """
Get the buffer as 2D - np.array Get the buffer as 2D - np.array
@param instr : pyvisa instrument
@param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2 @param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2
@returns 2D numpy array: @returns 2D numpy array:
i - ith reading: i - ith reading:
0: timestamps 0: timestamps
1: readings 1: readings
""" """
buffername = get_buffer_name(buffer_nr) readings = self.instr.query("TRACe:DATA?")
# instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN") tdata = []
# buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array) vdata = []
instr.write("format.data = format.ASCII\nformat.asciiprecision = 7") for reading in readings.split("\n"):
timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array) t, v = self.process_reading(reading)
readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array) tdata.append(t)
vdata.append(v)
if verbose: if verbose:
print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}") print(f"readings from buffer:\n{vdata}\ntimestamps:\n{tdata}")
buffer = np.vstack((timestamps, readings)).T buffer = np.vstack((tdata, vdata)).T
return buffer return buffer
def collect_buffer_range(self, range_=(1, -1), buffer_nr=1, verbose=False):
def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False):
""" """
Get the buffer as 2D - np.array Get the buffer as 2D - np.array
@param instr : pyvisa instrument @param instr : pyvisa instrument
@ -118,7 +176,50 @@ class Keithley2007:
buffer = np.vstack((timestamps, readings)).T buffer = np.vstack((timestamps, readings)).T
return buffer return buffer
def process_reading(self, reading: str):
"""
process a reading. Only works with VDC and relative time stamps right now!
'-1.19655066E+01VDC,+9627.275SECS,+64993RDNG#\n'
"""
parts = reading.split(",")
if len(parts) != 3:
raise ValueError(f"Invalid reading: '{reading}'")
vdc = float(parts[0][:-3])
timestamp = float(parts[1][:-4])
# TODO last value
return timestamp, vdc
def read_value(self):
reading = self.instr.query("READ?")
return self.process_reading(reading)
def measure(self, interval, update_func=None, max_measurements=None):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
self.reset(verbose=True)
try:
i = 0
while max_measurements is None or i < max_measurements:
tval, vval = self.read_value()
if update_func:
update_func(i, tval, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
# instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)
def init():
return Keithley2700()

View File

@ -1,93 +0,0 @@
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
import pyvisa
from cpdctrl.backends.keithley.keithley import reset
from cpdctrl.utility import testing as _testing
def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True):
"""
Take <count> measurements with <interval> inbetween
@details
Uses the devices overlappedY function to make the measurements asynchronosly
The update_func is optional and only used when I == True and V == True
The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX)
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param update_interval: interval at which the update_func is called
"""
f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)"
# if V and I:
# elif V:
# f_meas = "smua.measure.overlappedv(smua.nvbuffer1)"
# elif I:
# f_meas = "smua.measure.overlappedi(smua.nvbuffer1)"
# else:
# print("I and/or V needs to be set to True")
# return
i = 0
reset(instr, verbose=verbose)
instr.write(f"smua.measure.count = {count}")
instr.write(f"smua.measure.interval = {interval}")
# start measurement
instr.write(f"smua.source.output = smua.OUTPUT_ON")
instr.write(f_meas)
sleep(update_interval)
# for live viewing
query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end"""
# will return 2.0 while measruing
while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0:
if update_func:
try:
ival = float(instr.query(query.replace("X", "1")).strip("\n"))
vval = float(instr.query(query.replace("X", "2")).strip("\n"))
update_func(i, ival, vval)
except ValueError as e:
if i != 0:
pass
else:
print(f"measure_count: ValueError: {e}")
sleep(update_interval)
i += 1
instr.write(f"smua.source.output = smua.OUTPUT_OFF")
if beep_done:
instr.write("beeper.beep(0.3, 1000)")
def measure(instr, interval, update_func=None, max_measurements=None):
"""
@details:
- Resets the buffers
- Until KeyboardInterrupt:
- Take measurement
- Call update_func
- Wait interval
Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision
You can take the data from the buffer afterwards, using save_csv
@param instr: pyvisa instrument
@param update_func: Callable that processes the measurements: (index, ival, vval) -> None
@param max_measurements : maximum number of measurements. None means infinite
"""
reset(instr, verbose=True)
instr.write("smua.source.output = smua.OUTPUT_ON")
instr.write("format.data = format.ASCII\nformat.asciiprecision = 12")
try:
i = 0
while max_measurements is None or i < max_measurements:
ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t'))
if update_func:
update_func(i, ival, vval)
sleep(interval)
i += 1
except KeyboardInterrupt:
pass
instr.write("smua.source.output = smua.OUTPUT_OFF")
print("Measurement stopped" + " "*50)

View File

@ -31,8 +31,8 @@ if __name__ == "__main__":
filepath = path.realpath(path.abspath(__file__)) filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath))) sys.path.insert(0, path.dirname(path.dirname(filepath)))
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="m-teng", prog="cpdctrl",
description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino", description="measure voltage using a Keithley SMU",
) )
backend_group = parser.add_mutually_exclusive_group(required=True) backend_group = parser.add_mutually_exclusive_group(required=True)
backend_group.add_argument("-k", "--keithley", action="store_true") backend_group.add_argument("-k", "--keithley", action="store_true")
@ -44,10 +44,7 @@ if __name__ == "__main__":
while i < len(sys.argv): while i < len(sys.argv):
if args["keithley"]: if args["keithley"]:
import cpdctrl.backends.keithley.keithley as _backend import cpdctrl.backends.keithley.keithley as _backend
import cpdctrl.backends.keithley.measure as _measure # import cpdctrl.backends.keithley.measure as _measure
elif args["arduino"]:
import cpdctrl.backends.arduino.arduino as _backend
import cpdctrl.backends.arduino.measure as _measure
elif args["testing"]: elif args["testing"]:
import cpdctrl.backends.testing.testing as _backend import cpdctrl.backends.testing.testing as _backend
import cpdctrl.backends.testing.measure as _measure import cpdctrl.backends.testing.measure as _measure
@ -158,7 +155,7 @@ def monitor(interval=None, max_measurements=None, max_points_shown=160):
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown) plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown)
update_func = plt_monitor.update update_func = plt_monitor.update
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func) dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func)
def measure(interval=None, max_measurements=None): def measure(interval=None, max_measurements=None):
@ -178,33 +175,7 @@ def measure(interval=None, max_measurements=None):
_runtime_vars["last_measurement"] = dtime.now().isoformat() _runtime_vars["last_measurement"] = dtime.now().isoformat()
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'save_csv()' afterwards.")
update_func = _update_print update_func = _update_print
_measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func) dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func)
def repeat(measure_func: callable, count: int, repeat_delay=0):
"""
Measure and save to csv multiple times
@details
Repeat count times:
- call measure_func
- call save_csv
- sleep for repeat_delay
@param measure_func: The measurement function to use. Use a lambda to bind your parameters!
@param count: Repeat count times
Example: Repeat 10 times:
repeat(lambda : monitor_count(count=6000, interval=0.02, max_points_shown=200), 10)
"""
try:
for _ in range(count):
measure_func()
save_csv()
sleep(repeat_delay)
except KeyboardInterrupt:
pass
if settings["beep"]: _backend.beep(dev)
def get_dataframe(): def get_dataframe():
@ -369,11 +340,11 @@ Enter 'help()' for a list of commands""")
makedirs(settings["datadir"]) makedirs(settings["datadir"])
try: try:
dev = _backend.init(beep_success=settings["beep"]) dev = _backend.init()
except Exception as e: except Exception as e:
print(e) print(e)
exit(1) exit(1)
atexit.register(_backend.exit, dev) # atexit.register(_backend.exit, dev)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -0,0 +1,2 @@
TRACe:CLEar
TRACe:CLEar:AUTO ON

View File

@ -3,3 +3,6 @@ VOLT:DC:RANGe:AUTO ON
SENSe:FUNC 'VOLT:DC' SENSe:FUNC 'VOLT:DC'
' Set voltage divider if required ' Set voltage divider if required
' SENSE:VOLT:DC:IDIVider OFF ' SENSE:VOLT:DC:IDIVider OFF
' Disable continuous initiation
INIT:CONT OFF

2
cpdctrl/led/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,35 @@
import serial
class LEDD1B:
def __init__(self, port="COM4"):
self.arduino = serial.Serial(port=port, baudrate=9600, timeout=.1)
self._check_arduino_software()
def __del__(self):
self.arduino.close()
def _check_arduino_software(self):
"""
Run the identify command and raise an Exception
if the Arduino does not reply with the expected output.
"""
self._write('i')
lines = self.read()
if len(lines) < 1 or not lines[-1].startswith(bytes("Arduino Nano CPD 1", "utf-8")):
print(lines)
raise Exception("Arduino did not return the expected output - does it have the correct software loaded?")
def _write(self, val):
self.arduino.write(bytes(val, 'utf-8'))
def read(self):
data = self.arduino.readlines()
return data
def on(self):
self._write("1")
def off(self):
self._write("0")
if __name__ == '__main__':
led = LEDD1B()

View File

@ -1,10 +1,8 @@
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from .backends.keithley import keithley def _update_print(i, tval, vval):
print(f"n = {i:5d}, t = {tval: .2f} s, U = {vval: .5f} V" + " "*10, end='\r')
def _update_print(i, ival, vval):
print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r')
class _Monitor: class _Monitor:
""" """
@ -15,37 +13,30 @@ class _Monitor:
self.use_print = use_print self.use_print = use_print
self.index = [] self.index = []
self.vdata = [] self.vdata = []
self.idata = [] self.tdata = []
plt.ion() plt.ion()
self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5)) self.fig1, self.vax = plt.subplots(1, 1, figsize=(8, 5))
self.vline, = self.vax.plot(self.index, self.vdata, color="g") self.vline, = self.vax.plot(self.tdata, self.vdata, color="g")
self.vax.set_xlabel("time [s]")
self.vax.set_ylabel("Voltage [V]") self.vax.set_ylabel("Voltage [V]")
self.vax.grid(True) self.vax.grid(True)
self.iline, = self.iax.plot(self.index, self.idata, color="m")
self.iax.set_ylabel("Current [A]")
self.iax.grid(True)
def update(self, i, ival, vval): def update(self, i, tval, vval):
if self.use_print: if self.use_print:
_update_print(i, ival, vval) _update_print(i, tval, vval)
self.index.append(i) self.index.append(i)
self.idata.append(ival) self.tdata.append(tval)
self.vdata.append(vval) self.vdata.append(vval)
# update data # update data
self.iline.set_xdata(self.index) self.vline.set_xdata(self.tdata)
self.iline.set_ydata(self.idata)
self.vline.set_xdata(self.index)
self.vline.set_ydata(self.vdata) self.vline.set_ydata(self.vdata)
# recalculate limits and set them for the view # recalculate limits and set them for the view
self.iax.relim()
self.vax.relim() self.vax.relim()
if self.max_points_shown and i > self.max_points_shown: if self.max_points_shown and i > self.max_points_shown:
self.iax.set_xlim(i - self.max_points_shown, i)
self.vax.set_xlim(i - self.max_points_shown, i) self.vax.set_xlim(i - self.max_points_shown, i)
self.iax.autoscale_view()
self.vax.autoscale_view() self.vax.autoscale_view()
# update plot # update plot
self.fig1.canvas.draw() self.fig1.canvas.draw()

View File

@ -19,6 +19,7 @@ classifiers = [
] ]
dependencies = [ dependencies = [
"matplotlib>=3.6", "matplotlib>=3.6",
"pyqt6",
"numpy", "numpy",
"pandas", "pandas",
] ]