From a32f9e0183f08fb1158385fb06e05e17e02b9383 Mon Sep 17 00:00:00 2001 From: CPD Date: Fri, 17 Jan 2025 15:59:12 +0100 Subject: [PATCH] Add Arduino LED, Keithley2700 cleanup --- .gitignore | 1 + cpdctrl/backends/keithley/keithley.py | 181 ++++++++++++++++----- cpdctrl/backends/keithley/measure.py | 93 ----------- cpdctrl/cpdctrl-interactive.py | 43 +---- cpdctrl/keithley_scripts/buffer_reset.scpi | 2 + cpdctrl/keithley_scripts/reset.scpi | 3 + cpdctrl/led/__init__.py | 2 + cpdctrl/led/thorlabs_ledd1b.py | 35 ++++ cpdctrl/update_funcs.py | 29 ++-- pyproject.toml | 1 + 10 files changed, 202 insertions(+), 188 deletions(-) delete mode 100644 cpdctrl/backends/keithley/measure.py create mode 100644 cpdctrl/keithley_scripts/buffer_reset.scpi create mode 100644 cpdctrl/led/__init__.py create mode 100644 cpdctrl/led/thorlabs_ledd1b.py diff --git a/.gitignore b/.gitignore index f3d3be5..9f2c774 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__ .ipynb_checkpoints .vim .old +.spyproject testing bspc_rule.sh readings.txt diff --git a/cpdctrl/backends/keithley/keithley.py b/cpdctrl/backends/keithley/keithley.py index 4003b93..3998d3d 100644 --- a/cpdctrl/backends/keithley/keithley.py +++ b/cpdctrl/backends/keithley/keithley.py @@ -1,101 +1,159 @@ import pyvisa import numpy as np +from time import sleep # import pkg_resources +import os + +""" +Utility +""" + +# scripts = { +# "buffer_reset": pkg_resources.resource_filename("cpdctrl", "keithley_scripts/buffer_reset.lua"), +# "instrument_reset": pkg_resources.resource_filename("cpdctrl", "keithley_scripts/smua_reset.lua"), +# } +scripts = { + + "instrument_reset": "~/cpd-dev/cpdctrl/cpdctrl/keithley_scripts/reset.scpi", +} -class Keithley2007: - + +class Keithley2700: """ - Utility + Wrapper class for the Keithley2700 SMU controlled via pyvisa """ - scripts = {} - def __init__(self, beep_success=True, visa_backend="", check_front_switch=True): + def __init__(self, visa_backend="", check_front_switch=True): rm = pyvisa.ResourceManager(visa_backend) resources = rm.list_resources() if len(resources) < 1: raise Exception("No resources found.") elif len(resources) == 1: print(f"Opening the only resource found: {resources[0]}") - self. = rm.open_resource(resources[0]) - if beep_success: keithley.write("beeper.beep(0.5, 1000)") - return keithley + self.instr = rm.open_resource(resources[0]) elif len(resources) > 1: print(f"Resources:") for i, r in enumerate(resources): print(f"{i:02}: {r}") instr = int(input("Select an instrument: ")) self.instr = rm.open_resource(resources[instr]) - if beep_success: self.beep() if check_front_switch: self._check_front_input_selected() + + def __del__(self): + """Properly close the instrument connection""" + self.instr.close() def _check_front_input_selected(self): + """ + Make sure the front switch selecting the inputs selects the FRONT inputs + Raises + ------ + Exception + If front input state != 1 + + Returns + ------- + switch : str + Front input state + """ switch = self.instr.query("SYSTem:FRSwitch?").strip("\n") if switch != "1": raise Exception("The Keithley's INPUT switch must select the [F]ront inputs") return switch - - def run_script(self, script_path, verbose=False): + def run(self, code, verbose=False): + """ + Run SCPI code on the device by writing it. + Empty lines, leading whitespaces and lines starting with ' or # are ignored. + Parameters + ---------- + code : str + SCPI commands + """ + script = '\n'.join([l.strip(" ") for l in code.strip(" ").strip("\n").split("\n") if len(l) > 0 and l[0] not in "#'"]) + if verbose: + print(f"Running code:\n{script}") + self.instr.write(script) + + def run_script(self, script_path, verbose=False): """ Run a script from the host on the instrument @param script_path : full path to the script + + Parameters + ---------- + script_path : str + full path to the script + verbose : bool, optional + If true, print a message when the script is run. The default is False. + + Returns + ------- + None. """ - with open(script_path, "r") as file: + with open(os.path.expanduser(script_path), "r") as file: script = file.read() + if verbose: print(f"Running script: {script_path}") - self.instr.write(script) + self.run(script) def beep(self, length=0.5, pitch=1000): """The Keithley2700 cant beep on command :(""" pass - - def __del__(self): - """Properly close the instrument connection""" - self.instr.close() - + def reset(self, verbose=False): """ Reset smua and its buffers @param instr : pyvisa instrument """ - self.run_script(self.instr, scripts["smua_reset"], verbose=verbose) - run_lua(self.instr, scripts["buffer_reset"], verbose=verbose) + self.run_script(scripts["instrument_reset"], verbose=verbose) + self.buffer_reset() - - - def get_buffer_size(instr, buffer_nr=1): - n = instr.query(f"print({get_buffer_name(buffer_nr)}.n)").strip("\n") + def buffer_reset(self): + buffer_reset = """ + TRACe:CLEar + TRACe:CLEar:AUTO ON + SYSTem:TSTamp:TYPE RELative + """ + self.run(buffer_reset) + + def buffer_get_size(self, buffer_nr=1): + n = self.instr.query(f"TRACe:POINts?").strip("\n") return int(float(n)) - - def collect_buffer(instr, buffer_nr=1, verbose=False): + def buffer_set_size(s): + if not type(s) == int or s < 2 or s > 55000: + raise ValueError(f"Invalid buffer size: {s}. Must be int and between 2 and 55000") + self.instr.write(f"TRACe:POINts {s}") + + def buffer_collect(self, verbose=False): """ Get the buffer as 2D - np.array - @param instr : pyvisa instrument @param buffer_nr : 1 or 2, for smua.nvbuffer1 or 2 @returns 2D numpy array: i - ith reading: 0: timestamps 1: readings """ - buffername = get_buffer_name(buffer_nr) - # instr.write("format.data = format.DREAL\nformat.byteorder = format.LITTLEENDIAN") - # buffer = instr.query_binary_values(f"printbuffer(1, {buffername}.n, {buffername})", datatype='d', container=np.array) - instr.write("format.data = format.ASCII\nformat.asciiprecision = 7") - timestamps = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.timestamps)", container=np.array) - readings = instr.query_ascii_values(f"printbuffer(1, {buffername}.n, {buffername}.readings)", container=np.array) + readings = self.instr.query("TRACe:DATA?") + tdata = [] + vdata = [] + for reading in readings.split("\n"): + t, v = self.process_reading(reading) + tdata.append(t) + vdata.append(v) + if verbose: - print(f"readings from {buffername}: {readings}, \ntimestamps: {timestamps}") - buffer = np.vstack((timestamps, readings)).T + print(f"readings from buffer:\n{vdata}\ntimestamps:\n{tdata}") + buffer = np.vstack((tdata, vdata)).T return buffer - - def collect_buffer_range(instr, range_=(1, -1), buffer_nr=1, verbose=False): + def collect_buffer_range(self, range_=(1, -1), buffer_nr=1, verbose=False): """ Get the buffer as 2D - np.array @param instr : pyvisa instrument @@ -118,7 +176,50 @@ class Keithley2007: buffer = np.vstack((timestamps, readings)).T return buffer + def process_reading(self, reading: str): + """ + process a reading. Only works with VDC and relative time stamps right now! + '-1.19655066E+01VDC,+9627.275SECS,+64993RDNG#\n' + """ + parts = reading.split(",") + if len(parts) != 3: + raise ValueError(f"Invalid reading: '{reading}'") + vdc = float(parts[0][:-3]) + timestamp = float(parts[1][:-4]) + # TODO last value + return timestamp, vdc + + def read_value(self): + reading = self.instr.query("READ?") + return self.process_reading(reading) - - - + def measure(self, interval, update_func=None, max_measurements=None): + """ + @details: + - Resets the buffers + - Until KeyboardInterrupt: + - Take measurement + - Call update_func + - Wait interval + Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision + You can take the data from the buffer afterwards, using save_csv + @param instr: pyvisa instrument + @param update_func: Callable that processes the measurements: (index, ival, vval) -> None + @param max_measurements : maximum number of measurements. None means infinite + """ + self.reset(verbose=True) + try: + i = 0 + while max_measurements is None or i < max_measurements: + tval, vval = self.read_value() + if update_func: + update_func(i, tval, vval) + sleep(interval) + i += 1 + except KeyboardInterrupt: + pass + # instr.write("smua.source.output = smua.OUTPUT_OFF") + print("Measurement stopped" + " "*50) + +def init(): + return Keithley2700() diff --git a/cpdctrl/backends/keithley/measure.py b/cpdctrl/backends/keithley/measure.py deleted file mode 100644 index f688c8e..0000000 --- a/cpdctrl/backends/keithley/measure.py +++ /dev/null @@ -1,93 +0,0 @@ -from time import sleep -import numpy as np -from matplotlib import pyplot as plt -import pyvisa - -from cpdctrl.backends.keithley.keithley import reset -from cpdctrl.utility import testing as _testing - -def measure_count(instr, count=100, interval=0.05, update_func=None, update_interval=0.5, beep_done=True, verbose=True): - """ - Take measurements with inbetween - - @details - Uses the devices overlappedY function to make the measurements asynchronosly - The update_func is optional and only used when I == True and V == True - The update_func does not necessarily get all the values that are measured. To obtain the whole measurement, get them from the device buffers (smua.nvbufferX) - @param instr: pyvisa instrument - @param update_func: Callable that processes the measurements: (index, ival, vval) -> None - @param update_interval: interval at which the update_func is called - """ - f_meas = "smua.measure.overlappediv(smua.nvbuffer1, smua.nvbuffer2)" - # if V and I: - # elif V: - # f_meas = "smua.measure.overlappedv(smua.nvbuffer1)" - # elif I: - # f_meas = "smua.measure.overlappedi(smua.nvbuffer1)" - # else: - # print("I and/or V needs to be set to True") - # return - - i = 0 - reset(instr, verbose=verbose) - instr.write(f"smua.measure.count = {count}") - instr.write(f"smua.measure.interval = {interval}") - - # start measurement - instr.write(f"smua.source.output = smua.OUTPUT_ON") - instr.write(f_meas) - - sleep(update_interval) - # for live viewing - query = """if smua.nvbufferX.n > 0 then print(smua.nvbufferX.readings[smua.nvbufferX.n]) else print(0) end""" - - # will return 2.0 while measruing - while float(instr.query("print(status.operation.measuring.condition)").strip("\n ")) != 0: - if update_func: - try: - ival = float(instr.query(query.replace("X", "1")).strip("\n")) - vval = float(instr.query(query.replace("X", "2")).strip("\n")) - update_func(i, ival, vval) - except ValueError as e: - if i != 0: - pass - else: - print(f"measure_count: ValueError: {e}") - sleep(update_interval) - i += 1 - - instr.write(f"smua.source.output = smua.OUTPUT_OFF") - - if beep_done: - instr.write("beeper.beep(0.3, 1000)") - - -def measure(instr, interval, update_func=None, max_measurements=None): - """ - @details: - - Resets the buffers - - Until KeyboardInterrupt: - - Take measurement - - Call update_func - - Wait interval - Uses python's time.sleep() for waiting the interval, which is not very precise. Use measure_count for better precision - You can take the data from the buffer afterwards, using save_csv - @param instr: pyvisa instrument - @param update_func: Callable that processes the measurements: (index, ival, vval) -> None - @param max_measurements : maximum number of measurements. None means infinite - """ - reset(instr, verbose=True) - instr.write("smua.source.output = smua.OUTPUT_ON") - instr.write("format.data = format.ASCII\nformat.asciiprecision = 12") - try: - i = 0 - while max_measurements is None or i < max_measurements: - ival, vval = tuple(float(v) for v in instr.query("print(smua.measure.iv(smua.nvbuffer1, smua.nvbuffer2))").strip('\n').split('\t')) - if update_func: - update_func(i, ival, vval) - sleep(interval) - i += 1 - except KeyboardInterrupt: - pass - instr.write("smua.source.output = smua.OUTPUT_OFF") - print("Measurement stopped" + " "*50) diff --git a/cpdctrl/cpdctrl-interactive.py b/cpdctrl/cpdctrl-interactive.py index c5a4c53..2759e00 100644 --- a/cpdctrl/cpdctrl-interactive.py +++ b/cpdctrl/cpdctrl-interactive.py @@ -31,8 +31,8 @@ if __name__ == "__main__": filepath = path.realpath(path.abspath(__file__)) sys.path.insert(0, path.dirname(path.dirname(filepath))) parser = argparse.ArgumentParser( - prog="m-teng", - description="measure triboelectric nanogenerator output using a Keithley SMU or an Arduino", + prog="cpdctrl", + description="measure voltage using a Keithley SMU", ) backend_group = parser.add_mutually_exclusive_group(required=True) backend_group.add_argument("-k", "--keithley", action="store_true") @@ -44,10 +44,7 @@ if __name__ == "__main__": while i < len(sys.argv): if args["keithley"]: import cpdctrl.backends.keithley.keithley as _backend - import cpdctrl.backends.keithley.measure as _measure - elif args["arduino"]: - import cpdctrl.backends.arduino.arduino as _backend - import cpdctrl.backends.arduino.measure as _measure + # import cpdctrl.backends.keithley.measure as _measure elif args["testing"]: import cpdctrl.backends.testing.testing as _backend import cpdctrl.backends.testing.measure as _measure @@ -158,7 +155,7 @@ def monitor(interval=None, max_measurements=None, max_points_shown=160): print(f"Starting measurement with:\n\tinterval = {interval}s\nUse to stop. Save the data using 'save_csv()' afterwards.") plt_monitor = _Monitor(use_print=True, max_points_shown=max_points_shown) update_func = plt_monitor.update - _measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func) + dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func) def measure(interval=None, max_measurements=None): @@ -178,33 +175,7 @@ def measure(interval=None, max_measurements=None): _runtime_vars["last_measurement"] = dtime.now().isoformat() print(f"Starting measurement with:\n\tinterval = {interval}s\nUse to stop. Save the data using 'save_csv()' afterwards.") update_func = _update_print - _measure.measure(dev, interval=interval, max_measurements=max_measurements, update_func=update_func) - - -def repeat(measure_func: callable, count: int, repeat_delay=0): - """ - Measure and save to csv multiple times - - @details - Repeat count times: - - call measure_func - - call save_csv - - sleep for repeat_delay - - @param measure_func: The measurement function to use. Use a lambda to bind your parameters! - @param count: Repeat count times - - Example: Repeat 10 times: - repeat(lambda : monitor_count(count=6000, interval=0.02, max_points_shown=200), 10) - """ - try: - for _ in range(count): - measure_func() - save_csv() - sleep(repeat_delay) - except KeyboardInterrupt: - pass - if settings["beep"]: _backend.beep(dev) + dev.measure(interval=interval, max_measurements=max_measurements, update_func=update_func) def get_dataframe(): @@ -369,11 +340,11 @@ Enter 'help()' for a list of commands""") makedirs(settings["datadir"]) try: - dev = _backend.init(beep_success=settings["beep"]) + dev = _backend.init() except Exception as e: print(e) exit(1) - atexit.register(_backend.exit, dev) + # atexit.register(_backend.exit, dev) if __name__ == "__main__": diff --git a/cpdctrl/keithley_scripts/buffer_reset.scpi b/cpdctrl/keithley_scripts/buffer_reset.scpi new file mode 100644 index 0000000..4b5aad3 --- /dev/null +++ b/cpdctrl/keithley_scripts/buffer_reset.scpi @@ -0,0 +1,2 @@ +TRACe:CLEar +TRACe:CLEar:AUTO ON diff --git a/cpdctrl/keithley_scripts/reset.scpi b/cpdctrl/keithley_scripts/reset.scpi index 5ccb722..f51a436 100644 --- a/cpdctrl/keithley_scripts/reset.scpi +++ b/cpdctrl/keithley_scripts/reset.scpi @@ -3,3 +3,6 @@ VOLT:DC:RANGe:AUTO ON SENSe:FUNC 'VOLT:DC' ' Set voltage divider if required ' SENSE:VOLT:DC:IDIVider OFF + +' Disable continuous initiation +INIT:CONT OFF \ No newline at end of file diff --git a/cpdctrl/led/__init__.py b/cpdctrl/led/__init__.py new file mode 100644 index 0000000..633f866 --- /dev/null +++ b/cpdctrl/led/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- + diff --git a/cpdctrl/led/thorlabs_ledd1b.py b/cpdctrl/led/thorlabs_ledd1b.py new file mode 100644 index 0000000..91e0ec1 --- /dev/null +++ b/cpdctrl/led/thorlabs_ledd1b.py @@ -0,0 +1,35 @@ +import serial + +class LEDD1B: + def __init__(self, port="COM4"): + self.arduino = serial.Serial(port=port, baudrate=9600, timeout=.1) + self._check_arduino_software() + + def __del__(self): + self.arduino.close() + + def _check_arduino_software(self): + """ + Run the identify command and raise an Exception + if the Arduino does not reply with the expected output. + """ + self._write('i') + lines = self.read() + if len(lines) < 1 or not lines[-1].startswith(bytes("Arduino Nano CPD 1", "utf-8")): + print(lines) + raise Exception("Arduino did not return the expected output - does it have the correct software loaded?") + + def _write(self, val): + self.arduino.write(bytes(val, 'utf-8')) + + def read(self): + data = self.arduino.readlines() + return data + + def on(self): + self._write("1") + def off(self): + self._write("0") + +if __name__ == '__main__': + led = LEDD1B() diff --git a/cpdctrl/update_funcs.py b/cpdctrl/update_funcs.py index 3524db0..8f66141 100644 --- a/cpdctrl/update_funcs.py +++ b/cpdctrl/update_funcs.py @@ -1,10 +1,8 @@ import matplotlib.pyplot as plt import numpy as np -from .backends.keithley import keithley - -def _update_print(i, ival, vval): - print(f"n = {i:5d}, I = {ival: .12f} A, U = {vval: .5f} V" + " "*10, end='\r') +def _update_print(i, tval, vval): + print(f"n = {i:5d}, t = {tval: .2f} s, U = {vval: .5f} V" + " "*10, end='\r') class _Monitor: """ @@ -15,37 +13,30 @@ class _Monitor: self.use_print = use_print self.index = [] self.vdata = [] - self.idata = [] + self.tdata = [] plt.ion() - self.fig1, (self.vax, self.iax) = plt.subplots(2, 1, figsize=(8, 5)) + self.fig1, self.vax = plt.subplots(1, 1, figsize=(8, 5)) - self.vline, = self.vax.plot(self.index, self.vdata, color="g") + self.vline, = self.vax.plot(self.tdata, self.vdata, color="g") + self.vax.set_xlabel("time [s]") self.vax.set_ylabel("Voltage [V]") self.vax.grid(True) - self.iline, = self.iax.plot(self.index, self.idata, color="m") - self.iax.set_ylabel("Current [A]") - self.iax.grid(True) - def update(self, i, ival, vval): + def update(self, i, tval, vval): if self.use_print: - _update_print(i, ival, vval) + _update_print(i, tval, vval) self.index.append(i) - self.idata.append(ival) + self.tdata.append(tval) self.vdata.append(vval) # update data - self.iline.set_xdata(self.index) - self.iline.set_ydata(self.idata) - self.vline.set_xdata(self.index) + self.vline.set_xdata(self.tdata) self.vline.set_ydata(self.vdata) # recalculate limits and set them for the view - self.iax.relim() self.vax.relim() if self.max_points_shown and i > self.max_points_shown: - self.iax.set_xlim(i - self.max_points_shown, i) self.vax.set_xlim(i - self.max_points_shown, i) - self.iax.autoscale_view() self.vax.autoscale_view() # update plot self.fig1.canvas.draw() diff --git a/pyproject.toml b/pyproject.toml index 5067227..bdaa90e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ classifiers = [ ] dependencies = [ "matplotlib>=3.6", + "pyqt6", "numpy", "pandas", ]