finish initial setup

This commit is contained in:
JohannesDittloff 2025-05-02 15:48:36 +02:00
parent 6c9e68d05c
commit a5e2105668
40 changed files with 1715 additions and 100 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
.idea
.spyproject
.venv
__pychache__/
__pychache__/**
cpdctrl
benhw64.dll
test2.py

56
Bentham.py Normal file
View File

@ -0,0 +1,56 @@
# proprietary package :/
import pyBen
from os import path
class Bentham():
"""
Controls the Bentham TMC300 monochromator.
This is the source of the probe light beam
"""
def __init__(self):
self.path_cfg = path.abspath('system.cfg')
self.path_atr = path.abspath('systemAll-1nm.atr')
self.parked = -1
self.wavelength = -1
def return_parameters(self):
"""
Returns parameters relevant for this device.
New parameters to be saved can be added here
"""
parameters = {'bentham cfg_path': self.path_cfg,
'bentham atr_path': self.path_atr,
'bentham wavelength': self.wavelength}
return parameters
def park(self):
"""
Parks the TMC300. Important to do at least once after new lamp was started
"""
pyBen.build_system_model(self.path_cfg)
pyBen.load_setup(self.path_atr)
pyBen.initialise()
pyBen.park()
print('TMC300 initialized and parked')
self.parked = 1
def drive(self, wavelength):
"""
Changes the probe wavelength
"""
pyBen.build_system_model(self.path_cfg)
pyBen.load_setup(self.path_atr)
pyBen.initialise()
pyBen.select_wavelength(wavelength, 0)
print('TMC300 wavelength set to: %0.1f' %wavelength + 'nm')
self.wavelength = wavelength
self.parked = 0
if __name__ == '__main__':
from DeviceManager import DeviceManager
dm = DeviceManager()
dm.bentham.park()
dm.bentham.drive(510)

59
devices/Shutter.py Normal file
View File

@ -0,0 +1,59 @@
from pylablib.devices import NI
class Shutter():
def __init__(self):
self.daq_name = 'Dev1'
class ShutterPump(Shutter):
"""
Controls shutter between opo and sample.
Intended to be used instead of switching laser on and off.
Not yet in use.
"""
def __init__(self):
super().__init__()
def open_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao1', (0,5))
shutter.set_voltage_outputs('vout', 5)
print('Pump shutter opened')
def close_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao1', (0,5))
shutter.set_voltage_outputs('vout', 0)
print('Pump shutter closed')
class ShutterProbe(Shutter):
"""
Controls shutter between bentham TMC300 and sample.
Blocks probe light when not needed.
"""
def __init__(self):
super().__init__()
def open_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao0', (0,5))
shutter.set_voltage_outputs('vout', 5)
print('Probe shutter opened')
def close_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao0', (0,5))
shutter.set_voltage_outputs('vout', 0)
print('Probe shutter closed')
if __name__ == '__main__':
from DeviceManager import DeviceManager
dm = DeviceManager()
#dm.shutter_probe.open_()
dm.shutter_probe.close_()
#dm.shutter_pump.open_()
#dm.shutter_pump.close_()

94
devices/Xenon.py Normal file
View File

@ -0,0 +1,94 @@
import bendev
from Bentham import Bentham
import time
import datetime
class Xenon():
def __init__(self):
self.ps_serial_num = '32540/2'
self.state = 0
self.default_current = 5.4010
def start(self):
"""
Starts xenon probe light lamp. Assumes lamp is connected
to given power supply.
Bentham TMC300 is also parked to ensure later operation.
"""
psu = self.apply_current(self.default_current)
psu.query(":SYST:ERR:COUNT?")
psu.write("OUTP ON"); psu.query("OUTP?")
print('Xenon lamp switched on')
Bentham().park()
self.state = 1
def stop(self):
psu = bendev.Device(self.ps_serial_num)
psu.query(":SYST:ERR:COUNT?")
psu.write("OUTP OFF"); psu.query("OUTP?")
print('Xenon lamp switched off')
self.state = 0
def reset_burntime(self):
# Burntime of first lamp: 2900h
# New lamp 25% light intensity increase
psu = bendev.Device(self.ps_serial_num)
psu.query(":SYST:ERR:COUNT?")
psu.write(":RESEt:BURNtime"); psu.query("OUTP?")
print('Xenon burntime reset')
def apply_current(self, current_to_set):
psu = bendev.Device(self.ps_serial_num)
argument = ":SOUR:CURR " + str(current_to_set)
psu.write(argument); psu.query(":CURR?")
return psu
def __status(self):
psu = bendev.Device(self.ps_serial_num)
if float(psu.query(":CURR?")) < 1:
self.state = 0
else:
self.state = 1
def initialize(self):
"""
Checks if xenon is switched on.
If not switches xenon on.
"""
self.__status()
if self.state == 0:
print('Xenon lamp was just started. A warm up time of at least 60min is necessary!')
self.start()
else:
self.apply_current(self.default_current)
print('Xenon current reset to default.')
def preset_start(self, hour=6, minute=00):
"""
Allows for delayed xenon start. Can be used to allow for warmup time in morning.
Max delay 23h 59m.
"""
current_time = datetime.datetime.today()
desired_day = current_time.day
if current_time.hour > hour:
desired_day += 1
desired_time = datetime.datetime(current_time.year, current_time.month, desired_day, hour, minute)
print('Xenon will be started: ', desired_time)
while int(desired_time.timestamp()) - int(datetime.datetime.today().timestamp()) > 0:
remaining = int(desired_time.timestamp()) - int(datetime.datetime.today().timestamp())
print('\rWaiting: ' + time.strftime('%H:%M:%S', time.gmtime(remaining)) + ' remaining until xenon startup', end='')
time.sleep(1)
print('')
self.start()
if __name__ == '__main__':
from DeviceManager import DeviceManager
dm = DeviceManager()
#dm.xenon.apply_current(5.41)
#dm.xenon.initialize()
dm.xenon.stop()
#dm.xenon.initialize()
#dm.xenon.preset_start(8,00)

Binary file not shown.

Binary file not shown.

View File

@ -2,7 +2,8 @@
from .base import MeasurementDevice
TYPENAME_TEST = "Test"
TYPENAME_KEITHLEY2700 = "Keithley 2700"
TYPENAME_SR830 = "SR830"
TYPENAME_MODEL7260 = "Model7260"
try:
from .impl.sr830 import SR830
@ -13,11 +14,16 @@ from .impl.test import TestVoltageMeasurementDevice
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_TEST: ["Voltage Measurement Dummy Device"],
TYPENAME_TEST: ["Measurement Dummy Device"],
}
try:
from .impl.sr830 import SR830
devices[TYPENAME_KEITHLEY2700] = SR830.enumerate_devices()
devices[TYPENAME_SR830] = SR830.enumerate_devices()
except ImportError:
pass
try:
from .impl.model7260 import Model7260
devices[TYPENAME_MODEL7260] = Model7260.enumerate_devices()
except ImportError:
pass
return devices
@ -25,10 +31,10 @@ def list_devices() -> dict[str,list[str]]:
def connect_device(type_name: str, device_name: str) -> MeasurementDevice:
if type_name == TYPENAME_TEST:
return TestVoltageMeasurementDevice()
elif type_name == TYPENAME_KEITHLEY2700:
elif type_name == TYPENAME_SR830:
try:
from .impl.sr830 import SR830
return SR830.connect_device(device_name)
except ImportError as e:
raise ValueError(f"Keithley 2700 devices not available: {e}")
raise ValueError(f"SR830 not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

Binary file not shown.

Binary file not shown.

View File

@ -19,9 +19,6 @@ class MeasurementDevice(ABC):
@abstractmethod
def run(self, code, verbose=False):
pass
@abstractmethod
def run_script(self, script_path, verbose=False):
pass
@abstractmethod
def reset(self, verbose=False):
@ -38,8 +35,9 @@ class MeasurementDevice(ABC):
"""
pass
# def measure(self, interval: int, update_func: Callable[None, [int, float, float]]|None=None, max_measurements:int|None=None):
@abstractmethod
def measure(self, interval: int, update_func: Callable[None, [int, float, float]]|None=None, max_measurements:int|None=None):
def measureTODO():
"""
Take voltage readings after <interval> milliseconds.
@ -58,6 +56,7 @@ class MeasurementDevice(ABC):
"""
pass
@abstractmethod
def __str__(self):
pass

View File

@ -0,0 +1,266 @@
import pyvisa
from time import sleep
# import pkg_resources
import os
from typing import Callable
from ..base import MeasurementDevice
from ...utility.visa import enumerate_devices
import logging
log = logging.getLogger(__name__)
import numpy as np
class Model7260(MeasurementDevice):
"""
Wrapper class for the Model 7260 DSP Lock-In controlled via pyvisa
"""
def __init__(self, instr, check_front_switch=True):
self.instr = instr
init_script = """
' INPUT
' voltage
IMODE 0
' set input A
VMODE 1
' set float
FLOAT 1
' set AC coupling
CP 0
' Enable 1x and 2x Line notch filters
LF 4
' Enable synchronous filter < 200Hz
' SYNC 1
' REFERENCE
' Set external TTL rear panel
IE 1
' Set detection harmonic to 1'
REFN 1
' TODO time constant, filter slope,
"""
self.run(init_script)
self._buffer_length = None
def __del__(self):
"""Properly close the instrument connection"""
self.instr.close()
# RUN COMMANDS ON THE DEVICE
def run(self, code, split=True):
"""
Run SCPI code on the device by writing it.
Empty lines, leading whitespaces and lines starting with ' or # are ignored.
Parameters
----------
code : str
SCPI commands
"""
script = [] if split else ''
for line in code.strip(" ").split("\n"):
l = line.strip(" ")
if len(l) == 0 or l[0] in "#'": continue
if split:
script.append(l)
else:
script += l + "\n"
if split:
for cmd in script:
self.run_and_check(cmd)
else:
self.run_and_check(script)
def run_and_check(self, cmd):
"""Run a command and check for errors afterwards by reading the
standard event status byte"""
try:
log.debug(f"Running command(s): --BEGIN--\n{cmd}\n--END--")
ret = self.instr.write(cmd)
status = int(self.query("ST"))
if status & (1 << 1):
raise RuntimeError(f"Invalid command(s):\n{cmd}\n")
elif status & (1 << 5):
raise RuntimeError(f"Command parameter error after command(s):\n{cmd}\n")
# if ret != 8:
# raise RuntimeError(f"Error while writing command(s):\n'{script}'\n\nDevice returned code {ret}")
except pyvisa.VisaIOError as e:
raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{cmd}'\n\nVisaIOError:\n{e}")
def get_frequency(self) -> float:
# TODo
pass
def query(self, query):
return self.instr.query(query, delay=0.05).strip("\n")
def snap(self, what="3,4,5,7"):
"""
Parameters
----------
what : TYPE, optional
1 X
2 Y
3 R
4 θ
5 Aux In 1
6 Aux In 2
7 Aux In 3
8 Aux In 4
9 Reference Frequency
10 CH1 display
11 CH2 display
The default is "3,4,5,7".
Returns
-------
vals : list[float]
Values converted to float
"""
vals = self.query(f"SNAP? {what}").split(",")
vals = map(float, vals)
return vals
def measureTODO(): pass
def read_value(self):
"""Read the value of R"""
return float(self.query("MAG."))
def reset(self):
# TODO
pass
def test_connection(self):
pass
def buffer_setup(self, X=0, Y=0, MAG=1, PHASE=1, SENS=0, ADC1=1, ADC2=2, length=None, interval_ms=5):
"""
Prepare the device for a buffer (curve) measurement
:param X: 1 if the output should be stored, 0 else
:param Y: 1 if the output should be stored, 0 else
:param MAG: 1 if the output should be stored, 0 else
:param PHASE: 1 if the output should be stored, 0 else
:param SENS: 1 if the output should be stored, 0 else
:param ADC1: 1 if the output should be stored, 0 else
:param ADC2: 1 if the output should be stored, 0 else
:param length: Number of values to store in the buffer, or None to use the maximum number of values
:param interval_ms: Interval in ms with resolution of 5ms
"""
self.buffer_cbd = (X << 0) | (Y << 1) | (MAG << 2) | (PHASE << 3) | (SENS << 4) | (ADC1 << 5) | (ADC2 << 6)
self.run(f"CBD {self.buffer_cbd}")
n_outputs = self.buffer_cbd.bit_count()
# length
max_length = 32768 // n_outputs
log.info(f"Setting up buffer, n_outputs={n_outputs}, max_length={max_length}, CBD={self.buffer_cbd:016b}")
if length is None:
self._buffer_length = max_length
else:
self._buffer_length = length
if self._buffer_length > max_length:
raise ValueError(f"For {n_outputs} outputs, the maximum length is {max_length} but {length} was given.")
self.run(f"LEN {self._buffer_length}")
# set the interval
self.run(f"STR {interval_ms}")
# init the buffer (new curve)
self.run("NC")
def buffer_start_fill(self):
if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.")
self.run("TD") # take data
def buffer_is_done(self) -> bool:
vals = self.query("M").split(",")
vals = list(map(int, vals))
log.info(f"buffer status: {vals[0]}, {vals[1]}, {vals[2]:08b}, {vals[3]}")
return vals[0] == 0
def buffer_get_data(self, raw=False):
"""
Get the data from the buffer.
:return:
"""
if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.")
n_points = int(self.query("M").split(",")[3])
data = np.empty((n_points, self.buffer_cbd.bit_count()), dtype=int)
self.instr.write(f"DCT {self.buffer_cbd}") # this is a query, we need to read the next <n_points> lines now
for i in range(n_points):
vals = self.instr.read().strip("\n").split(",")
data[i,:] = [int(x) for x in vals]
# check if all values have been transmitted by issuing another command
length = self.query("LEN")
try:
length = int(length)
except ValueError:
raise RuntimeError(f"Error buffer data transmission: Buffer length {length} is not an integer.")
if length != self._buffer_length:
raise RuntimeError(f"Error buffer data transmission: Buffer length reported as {length}, but should be {self._buffer_length}.")
if raw:
return data
return self._buffer_convert_from_full_scale(data)
def _buffer_convert_from_full_scale(self, data):
"""
Convert values from fullscale to Volts and Degrees
See section 6.4.09 on page 141 in the manual
:param data: full scale data
:return: data converted to Volt and Degrees
"""
# get full voltage scale
# if the sensitivity would have been stored (bit 4), its value could be used too
fs = float(self.query("SEN."))
scales = [
1E-4*fs, # X
1E-4*fs, # Y
1E-4*fs, # MAG
1E-2, # Phase FS=18000 = 180°
1, # Sens
1E-3, # ADC1 FS=10000 = 10 V
1E-3, # ADC2 FS=10000 = 10 V
1E-3, # ADC3 FS=10000 = 10 V
]
data_conv = np.empty(data.shape, dtype=float)
i_bit = 0
for i_val in range(self.buffer_cbd.bit_count()):
while i_bit < len(scales) and self.buffer_cbd & (1 << i_bit) == 0:
i_bit += 1
if i_bit == len(scales):
raise RuntimeError(f"Could not find the output of value number {i_val+1}")
# i_valth value is the quantity belonging to bit i_bit
data_conv[:,i_val] = data[:,i_val] * scales[i_bit]
return data_conv
def auto_gain(self):
self.instr.write("AGAN")
# wait until the device responds again, meaning it is no longer busy
self.instr.write("SENS?")
i = 0
while i < 50:
try:
gain = self.instr.read().strip("\n")
return gain
except:
i += 1
raise RuntimeError("Could not get response from device after sending auto gain command")
@staticmethod
def enumerate_devices(query="(GPIB)?*:INSTR"):
return enumerate_devices("7260", query, cmd="ID")
@staticmethod
def connect_device(name):
rm = pyvisa.ResourceManager()
instr = rm.open_resource(name)
return Model7260(instr)
def __str__(self):
return "SR830"

View File

@ -0,0 +1,272 @@
import pyvisa
import struct # for converting bytes to float
from time import sleep
# import pkg_resources
import os
from typing import Callable
import numpy as np
from ..base import MeasurementDevice
from ...utility.visa import enumerate_devices
import logging
log = logging.getLogger(__name__)
class SR830(MeasurementDevice):
"""
Wrapper class for the SR830 controlled via pyvisa
"""
def __init__(self, instr, check_front_switch=True):
self._buffer_length = None
self.instr = instr
init_script = """
' INPUT
' set input A
ISRC 0
' set float
IGND 0
' set AC coupling
ICPL 0
' Enable 1x and 2x Line notch filters
ILIN 3
' Enable synchronous filter < 200Hz
SYNC 1
' DATA COLLECTION
' The quantities on the displays are the ones stored in the buffer
' Show R and Phase on the displays
DDEF 1,1,0
DDEF 2,1,0
' Set sample rate to 256 Hz
SRAT 12
' Set buffer loop mode
SEND 1
' REFERENCE
' Set external
FMOD 0
' Set Rising edge
RSLP 1
' Set detection harmonic to 1'
HARM 1
"""
self.run(init_script)
def __del__(self):
"""Properly close the instrument connection"""
self.instr.close()
# RUN COMMANDS ON THE DEVICE
def run(self, code, split=True):
"""
Run SCPI code on the device by writing it.
Empty lines, leading whitespaces and lines starting with ' or # are ignored.
Parameters
----------
code : str
SCPI commands
"""
script = [] if split else ''
for line in code.strip(" ").split("\n"):
l = line.strip(" ")
if len(l) == 0 or l[0] in "#'": continue
if split:
script.append(l)
else:
script += l + "\n"
if split:
for cmd in script:
self.run_and_check(cmd)
else:
self.run_and_check(cmd)
def run_and_check(self, cmd):
"""Run a command and check for errors afterwards by reading the
standard event status byte"""
try:
log.info(f"Running command(s): {cmd}")
ret = self.instr.write(cmd)
status = int(self.query("*ESR?"))
if status & (1 << 4):
raise RuntimeError(f"EXE bit set (Execution error) after command(s):\n{cmd}\n")
elif status & (1 << 5):
raise RuntimeError(f"CMD bit set (Illegal command) after command(s):\n{cmd}\n")
# if ret != 8:
# raise RuntimeError(f"Error while writing command(s):\n'{script}'\n\nDevice returned code {ret}")
except pyvisa.VisaIOError as e:
raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{cmd}'\n\nVisaIOError:\n{e}")
def get_frequency(self) -> float:
return float(self.query("FREQ?"))
def query(self, query):
return self.instr.query(query, delay=0.05).strip("\n")
def snap(self, what="3,4,5,7"):
"""
Parameters
----------
what : TYPE, optional
1 X
2 Y
3 R
4 θ
5 Aux In 1
6 Aux In 2
7 Aux In 3
8 Aux In 4
9 Reference Frequency
10 CH1 display
11 CH2 display
The default is "3,4,5,7".
Returns
-------
vals : list[float]
Values converted to float
"""
vals = self.query(f"SNAP? {what}").split(",")
vals = map(float, vals)
return vals
def measureTODO(): pass
def read_value(self):
"""Read the value of R"""
return float(self.query("OUTP? 3"))
def reset(self):
self.instr.write("*RST")
def test_connection(self):
pass
CH1 = ["X", "R", "X Noise", "Aux In 1", "Aux In 2"]
CH2 = ["Y", "Theta", "Y Noise", "Aux In 3", "Aux In 4"]
SRAT = [62.5e-3, 125e-3, 250e-3, 500e-3, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, "Trigger"]
def buffer_setup(self, CH1="R", CH2="Theta", length=None, sample_rate=512):
"""
Prepare the device for a buffer (curve) measurement
:param CH1: Source of channel 1: X, R, X Noise, Aux In 1, Aux In 2
:param CH2: Source of channel 2: Y, Theta, Y Noise, Aux In 3, Aux In 4
:param length: Number of values to store in the buffer, or None to use the maximum number of values
:param sample_rate: Sample rate in Hz or "Trigger", see SR830 Manual
"""
# stop data storage and clear buffer
self.run(f"PAUS")
self.run(f"REST")
# set displays, whatever is shown on the display is stored in the buffers
try:
i_CH1 = SR830.CH1.index(CH1)
except KeyError:
raise KeyError(f"CH1='{CH1}' but must be one of {SR830.CH1}")
try:
i_CH2 = SR830.CH2.index(CH2)
except KeyError:
raise KeyError(f"CH2='{CH2}' but must be one of {SR830.CH2}")
self.run(f"DDEF 1,{i_CH1},0")
self.run(f"DDEF 2,{i_CH2},0")
try:
srat = SR830.SRAT.index(sample_rate)
except:
raise KeyError(f"SRAT={sample_rate} but must be one of {SR830.SRAT}")
self.run(f"SRAT {srat}")
# length
if length is None:
self._buffer_length = SR830.max_length
else:
self._buffer_length = length
if self._buffer_length > SR830.max_length:
raise ValueError(f"Maximum buffer length is {SR830.max_length} but {length} was given.")
max_length = 16383
def buffer_start_fill(self):
if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.")
self.run("SEND 0") # shot mode
self.run("STRT") # start take data
def buffer_is_done(self) -> bool:
return self.buffer_get_n_points() >= self._buffer_length
def buffer_get_n_points(self) -> int:
return int(self.query("SPTS?"))
def buffer_get_data(self, CH1=True, CH2=True):
"""
Get the data from the buffer.
:return: np.ndarray
Returns a numpy of shape (<1 if one channel, 2 if both channels>, n_points)
"""
if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.")
self.run("PAUS")
take_n_points = min(self.buffer_get_n_points(), self._buffer_length) # there might be more points stored then was required
if CH1 and CH2:
data = np.empty((2, take_n_points), dtype=float)
elif CH1 or CH2:
data = np.empty((1, take_n_points), dtype=float)
else:
raise ValueError("Either CH1 or CH2 must be set True.")
if CH1:
data[ 0, :] = self._buffer_get_data(1, 0, take_n_points)[:]
if CH2:
data[-1, :] = self._buffer_get_data(2, 0, take_n_points)[:]
return data
def _buffer_get_data_slow(self, CH=1, start=0, n_points=None):
if n_points is None: n_points = SR830.max_length
self.instr.write(f"TRCB? {CH},{start},{n_points}")
# reading all ot once doesnt work because visa times out, even with increased timeout
# data = self.instr.read_binary_values(datatype="f", header_fmt="empty", data_points=n_points, container=np.ndarray)
# reading one at a time works but is slow
data = np.empty(n_points, dtype=float)
for i in range(n_points):
# read 4 bytes and "cast" them to a float
data[i] = struct.unpack("=f", self.instr.read_bytes(4))[0]
return data
def _buffer_get_data(self, CH=1, start=0, n_points=None):
if n_points is None: n_points = SR830.max_length
self.instr.write(f"TRCB? {CH},{start},{n_points}")
data = np.empty(n_points, dtype=float)
CHUNK_SIZE = 256
chunks = n_points // CHUNK_SIZE
remainder = n_points % CHUNK_SIZE
for i in range(chunks):
# read 4 bytes and "cast" them to a float
data[i*CHUNK_SIZE:(i+1)*CHUNK_SIZE] = np.frombuffer(self.instr.read_bytes(CHUNK_SIZE*4), dtype=np.float32)
# struct.unpack("=f", self.instr.read_bytes(4))[0]
if remainder > 0:
data[chunks * CHUNK_SIZE:] = np.frombuffer(self.instr.read_bytes(remainder * 4), dtype=np.float32)
return data
def auto_gain(self):
self.instr.write("AGAN")
# wait until the device responds again, meaning it is no longer busy
self.instr.write("SENS?")
i = 0
while i < 50:
try:
gain = self.instr.read().strip("\n")
return gain
except:
i += 1
raise RuntimeError("Could not get response from device after sending auto gain command")
@staticmethod
def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
return enumerate_devices("SR830", query)
@staticmethod
def connect_device(name):
rm = pyvisa.ResourceManager()
instr = rm.open_resource(name)
return SR830(instr)
def __str__(self):
return "SR830"

View File

@ -35,8 +35,9 @@ class TestVoltageMeasurementDevice(MeasurementDevice):
v = self.amplitude * np.sin(2 * np.pi * t / self.frequency)
return t, v
def measure(self, interval: int, update_func: Callable[None, [int, float, float]] | None = None,
max_measurements: int | None = None):
# def measure(self, interval: int, update_func: Callable[None, [int, float, float]] | None = None,
# max_measurements: int | None = None):
def measureTODO():
"""
Take voltage readings after <interval> milliseconds.

View File

@ -1,86 +0,0 @@
import pyvisa
from time import sleep
# import pkg_resources
import os
from typing import Callable
from ..base import MeasurementDevice
from ...utility.visa import enumerate_devices
import logging
log = logging.getLogger(__name__)
class SR830(MeasurementDevice):
"""
Wrapper class for the Keithley2700 SMU controlled via pyvisa
"""
def __init__(self, instr, check_front_switch=True):
self.instr = instr
init_script = """
' set input A
ISRC 0
' set float
IGND 0
' set AC coupling
ICPL 0
' Enable 1x and 2x Line notch filters
ILIN 3
' Enable synchronous filter < 200Hz
SYNC 1
' Show R and Phase on the displays
DDEF 1 1 0
DDEF 2 1 0
' Set sample rate to 256 Hz
12
' Set buffer loop mode
SEND 1
"""
self.run(init_script)
def __del__(self):
"""Properly close the instrument connection"""
self.instr.close()
# RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False):
"""
Run SCPI code on the device by writing it.
Empty lines, leading whitespaces and lines starting with ' or # are ignored.
Parameters
----------
code : str
SCPI commands
"""
script = ''
for line in code.strip(" ").split("\n"):
l = line.strip(" ")
if len(l) == 0 or l[0] in "#'": continue
script += l + "\n"
if verbose:
print(f"Running code:\n{script}")
try:
ret = self.instr.write(script)
if ret != 8:
raise RuntimeError(f"Error while writing command(s):\n'{script}'\n\nDevice returned code {ret}")
except pyvisa.VisaIOError as e:
raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{script}'\n\nVisaIOError:\n{e}")
def get_frequency(self) -> float:
return float(self.query("FREQ?"))
def query(self, query):
return self.instr.query(query).strip("\n")
@staticmethod
def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
return enumerate_devices("SR830", query)
@staticmethod
def connect_device(name):
rm = pyvisa.ResourceManager()
instr = rm.open_resource(name)
return SR830(instr)
def __str__(self):
return "SR830"

359
pr_interactive.py Normal file
View File

@ -0,0 +1,359 @@
"""
run this before using this library:
ipython -i cpdctrl_interactive.py
always records iv-t curves
i-data -> smua.nvbuffer1
v-data -> smua.nvbuffer2
"""
version = "0.1"
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime as dtime
from sys import exit
from time import sleep
from os import path, makedirs
import pickle as pkl
import json
import atexit
import threading as mt
import multiprocessing as mp
# from multiprocessing.managers import BaseManager
import argparse
if __name__ == "__main__":
import sys
if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "cpdctrl"
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
from . import led_control_device
from . import voltage_measurement_device
from .voltage_measurement_device.base import VoltageMeasurementDevice
from .voltage_measurement_device.impl import keithley2700 as _volt
from .led_control_device.base import LedControlDevice
from .led_control_device.impl import thorlabs_ledd1b as _led
from .led_script import LedScript
from .measurement import measure as _measure
from .utility.data import DataCollector
from .utility.data import plot_cpd_data as data_plot
from .utility.config_file import ConfigFile
from .utility import file_io
from .utility.device_select import select_device_interactive
from .update_funcs import _Monitor, _update_print
import logging
log = logging.getLogger(__name__)
# CONFIGURATION
_runtime_vars = {
"last-measurement": ""
}
# defaults, these may be overridden by a config file
settings = {
"datadir": path.expanduser("~/data"),
"name": "interactive-test",
"led": "unknown",
"interval": 0.5,
"flush_after": 3000,
"use_buffer": False,
}
cfilename: str = "cpdctrl.yaml"
config_path: str = ""
config_file: ConfigFile = ConfigFile("")
test = False
# DEVICES
# global variable for the instrument/client returned by pyvisa/bleak
dev: VoltageMeasurementDevice|None = None
led: LedControlDevice|None = None
data_collector = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", add_number_if_dir_exists=True)
t0 = 0
data = None
md = None
def monitor(script: str|int=0, interval: float|None=None, metadata:dict={}, flush_after: int|None=None, use_buffer: bool|None=None, max_measurements=None, stop_on_script_end: bool=False, max_points_shown=None):
"""
Monitor the voltage with matplotlib.
- Opens a matplotlib window and takes measurements depending on settings["interval"]
- Waits for the user to press a key
If use_buffer=False, uses python's time.sleep() for waiting the interval, which is not very precise.
With use_buffer=True, the timing of the voltage data readings will be very precise, however,
the led updates may deviate up to <interval>.
The data is automatically saved to "<date>_<time>_<name>" in the data directory.
Parameters
----------
script : str|int
Path to a led script file, or a constant value between 0 and 100 for the LED.
interval : float|None
Time between measurements.
If None, the value is taken from the settings.
metadata : dict
Metadata to append to the data header.
The set interval as well as the setting for 'name' and 'led' are automatically added.
flush_after : int|None
Flush the data to disk after <flush_after> readings
If None, the value is taken from the settings.
use_buffer : bool
If True, use the voltage measurement device's internal buffer for readings, which leads to more accurate timings.
If None, the value is taken from the settings.
max_points_shown : int|None
how many points should be shown at once. None means infinite
max_measurements : int|None
maximum number of measurements. None means infinite
stop_on_script_end : bool, optional
Stop measurement when the script end is reached
"""
global _runtime_vars, data_collector, dev, led
global data, md
_runtime_vars["last_measurement"] = dtime.now().isoformat()
if interval is None: interval = settings["interval"]
if flush_after is None: flush_after = settings["flush_after"]
if use_buffer is None: use_buffer = settings["use_buffer"]
# set metadata
metadata["interval"] = str(interval)
metadata["name"] = settings["name"]
metadata["led"] = settings["led"]
metadata["led_script"] = str(script)
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.")
plt.ion()
plt_monitor = _Monitor(use_print=False, max_points_shown=max_points_shown)
led_script = LedScript(script=script, auto_update=True, verbose=True)
data_collector = DataCollector(metadata=metadata, data_path=settings["datadir"], data_name=settings["name"])
# data_collector.clear()
data_queue = mp.Queue()
command_queue = mp.Queue()
# Argument order must match the definition
proc_measure = mt.Thread(target=_measure, args=(dev,
led,
led_script,
data_collector,
interval,
flush_after,
use_buffer,
max_measurements,
stop_on_script_end,
False, # verbose
command_queue,
data_queue
))
proc_measure.start()
try:
while proc_measure.is_alive():
while not data_queue.empty():
# print(data_queue.qsize(), "\n\n")
current_data = data_queue.get(block=False)
i, tval, vval, led_val = current_data
plt_monitor.update(i, tval, vval, led_val)
except KeyboardInterrupt:
pass
command_queue.put("stop")
proc_measure.join()
print("Measurement stopped" + " "*50)
led_script.stop_updating() # stop watching for file updates (if enabled)
data_collector.save_csv(verbose=True)
data, metadata = data_collector.get_data()
fig = data_plot(data, CPD=True, LED=True)
plt.ioff()
fig_path = path.join(data_collector.path, data_collector.dirname + ".pdf")
fig.savefig(fig_path)
# DATA
def data_load(dirname:str) -> tuple[np.ndarray, dict]:
"""
Load data in directory <dirname> in the data directory as numpy array.
Sets the `data` and `md` variables
Parameters
----------
dirname : str
Absolute path to the directory containing the measurement or directory name in the data directory.
"""
global data, md
if path.isabs(dirname):
dirpath = dirname
else:
dirpath = path.join(settings["datadir"], dirname)
data, md = DataCollector.load_data_from_dir(dirpath, verbose=True)
# SETTINGS
def set(setting, value):
global settings, config_path
if setting in settings:
if type(value) != type(settings[setting]):
print(f"set: setting '{setting}' currently holds a value of type '{type(settings[setting])}'")
return
settings[setting] = value
config_file.set(setting, value)
def name(s:str):
global settings
settings["name"] = s
def save_settings():
global settings
config_file.set_values(settings)
config_file.save()
def load_settings():
global settings, config_path
settings = config_file.get_values()
settings["datadir"] = path.expanduser(settings["datadir"]) # replace ~
def help(topic=None):
if topic == None:
print("""
Functions:
monitor - take measurements with live monitoring in a matplotlib window
data_load - load data from a directory
data_plot - plot a data array
Run 'help(function)' to see more information on a function
Available topics:
imports
settings
Run 'help("topic")' to see more information on a topic""")
elif topic in [settings, "settings"]:
print(f"""Settings:
name: str - name of the measurement, determines filename
led: str - name/model of the LED that is being used
datadir: str - output directory for the csv files
interval: int - interval (inverse frequency) of the measurements, in seconds
beep: bool - whether the device should beep or not
Functions:
name("<name>") - short for set("name", "<name>")
set("setting", value) - set a setting to a value
save_settings() - store the settings as "cpdctrl.json" in the working directory
load_settings() - load settings from a file
Upon startup, settings are loaded from the config file.
The global variable 'config_path' determines the path used by save/load_settings. Use -c '<path>' to set another path.
The search path is:
<working-dir>/{cfilename}
$XDG_CONFIG_HOME/{cfilename}
~/.config/cpdctrl/{cfilename}
The current file path is:
{config_path}
""")
elif topic == "imports":
print("""Imports:
numpy as np
pandas as pd
matplotlib.pyplot as plt
os.path """)
else:
print(topic.__doc__.strip(" ").strip("\n"))
def init():
global dev, led, settings, config_path, config_file
print(r""" .___ __ .__
____ ______ __| _/_____/ |________| |
_/ ___\\____ \ / __ |/ ___\ __\_ __ \ |
\ \___| |_> > /_/ \ \___| | | | \/ |__
\___ > __/\____ |\___ >__| |__| |____/
\/|__| \/ \/ """ + f"""{version}
Interactive Shell for CPD measurements with Keithley 2700B
---
Enter 'help()' for a list of commands""")
parser = argparse.ArgumentParser(
prog="cpdctrl",
description="measure voltage using a Keithley SMU",
)
backend_group = parser.add_mutually_exclusive_group(required=False)
parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
args = vars(parser.parse_args())
from os import environ
# Load config file
if path.isfile(cfilename):
config_path = cfilename
elif 'XDG_CONFIG_HOME' in environ.keys():
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/cpdctrl.json"):
config_path = path.join(environ["XDG_CONFIG_HOME"], "cpdctrl", cfilename)
else:
config_path = path.join(path.expanduser("~/.config/cpdctrl"), cfilename)
if args["config"]:
config_path = args["config"]
config_file = ConfigFile(config_path, init_values=settings)
load_settings()
# setup logging
log_path = path.expanduser(config_file.get_or("path_log", "~/.cache/cpdctrl-interactive.log"))
makedirs(path.dirname(log_path), exist_ok=True)
logging.basicConfig(
level=logging.WARN,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
handlers=[
logging.FileHandler(log_path),
logging.StreamHandler()
]
)
if not path.isdir(settings["datadir"]):
makedirs(settings["datadir"])
# init the devices
last_vm_type = config_file.get_or("last_dev_vm_type", None)
last_vm_name = config_file.get_or("last_dev_vm_name", None)
if last_vm_name and last_vm_type:
try:
dev = voltage_measurement_device.connect_device(last_vm_type, last_vm_name)
except:
log.error(f"Failed to connect to last used device {last_vm_type}::{last_vm_name}")
while dev is None:
devs = voltage_measurement_device.list_devices()
print("-" * 50)
vm_dev_type, vm_dev_name = select_device_interactive(devs, "Select voltage measurement device: ")
try:
dev = voltage_measurement_device.connect_device(vm_dev_type, vm_dev_name)
except:
log.error(f"Failed to connect to device {vm_dev_type}::{vm_dev_name}")
config_file.set("last_dev_vm_type", vm_dev_type)
config_file.set("last_dev_vm_name", vm_dev_name)
# init the devices
last_led_type = config_file.get_or("last_dev_led_type", None)
last_led_name = config_file.get_or("last_dev_led_name", None)
if last_led_name and last_led_type:
try:
led = led_control_device.connect_device(last_led_type, last_led_name)
except:
log.error(f"Failed to connect to last used device {last_led_type}::{last_led_name}")
while led is None:
devs = led_control_device.list_devices()
print("-" * 50)
led_dev_type, led_dev_name = select_device_interactive(devs, "Select LED control device: ")
try:
led = led_control_device.connect_device(led_dev_type, led_dev_name)
except:
log.error(f"Failed to connect to device {led_dev_type}::{led_dev_name}")
config_file.set("last_dev_led_type", led_dev_type)
config_file.set("last_dev_led_name", led_dev_name)
# atexit.register(_backend.exit, dev)
if __name__ == "__main__":
init()

11
system.cfg Normal file
View File

@ -0,0 +1,11 @@
#
USB imac pid $1705, vid $04D8 , drives 4
#
SAM source drive 11
MVSS_MK2 slit1 drive 3
MVSS_MK2 slit2 drive 4
FW252 fwheel drive 2
#
TM300 mono use fwheel, use slit1, use slit2, use source
#
#

145
systemAll-1nm.atr Normal file
View File

@ -0,0 +1,145 @@
#------------------------------------------------------------------------------
# Bentham Instruments system model attributes file
# Created 11/11/2020 16:00:38
# This file should NOT be edited by hand
#------------------------------------------------------------------------------
mono
cos alpha = 9.72700000000000E-0001
ZordSwitch SAM for turret,1 = -1
end
monoTurret 1
grating,1 = 2400
minimum wavelength,1 = 200
maximum wavelength,1 = 320
sine correction,1 = 0
zord,1,1 = 36764
alpha,1,1 = 1.00190000000000E+0000
phase,1,1 = 0.00000000000000E+0000
amplitude,1,1 = 0.00000000000000E+0000
zord,1,2 = 36764
alpha,1,2 = 1.00190000000000E+0000
phase,1,2 = 0.00000000000000E+0000
amplitude,1,2 = 0.00000000000000E+0000
grating,2 = 1200
minimum wavelength,2 = 250
maximum wavelength,2 = 1101
sine correction,2 = 0
zord,2,1 = 892414
alpha,2,1 = 1.00180000000000E+0000
phase,2,1 = 0.00000000000000E+0000
amplitude,2,1 = 0.00000000000000E+0000
zord,2,2 = 892414
alpha,2,2 = 1.00180000000000E+0000
phase,2,2 = 0.00000000000000E+0000
amplitude,2,2 = 0.00000000000000E+0000
grating,3 = 2400
minimum wavelength,3 = 100
maximum wavelength,3 = 150
sine correction,3 = 0
zord,3,1 = 1745300
alpha,3,1 = 1.00190000000000E+0000
phase,3,1 = 0.00000000000000E+0000
amplitude,3,1 = 0.00000000000000E+0000
zord,3,2 = 1745300
alpha,3,2 = 1.00190000000000E+0000
phase,3,2 = 0.00000000000000E+0000
amplitude,3,2 = 0.00000000000000E+0000
high current threshold = 0
move speed = 8
move current = 1
idle current = 8
microstepping = 128
settle time = 100
end
fwheel
settle time = 2000
filter,1 = 0.00000000000000E+0000
filter,2 = 2.50000000000000E+0002
filter,3 = 3.60000000000000E+0002
filter,4 = 6.30000000000000E+0002
filter,5 = 0.00000000000000E+0000
filter,6 = 0.00000000000000E+0000
move at each wavelength = 1
filter park offset = 0
end
source
settle time = 2000
settle time,1 = 500
initial state = 1
switch wavelength,1 = 0.00000000000000E+0000
state,1 = 1
switch wavelength,2 = 8.00000000000000E+0002
state,2 = 0
deflect = Deflect
no deflect = No Deflection
move at each wavelength = 1
end
slit1
( nm = 1 ) ( mm = 0 )
Current Slit mode = 1
Constant bandwidth (nm) = 1.00000000000000E+0000
Constant width (mm) = 3.0000000000000E+0000
Slit Type =
Minimum slit width = 0.00000000000000E+0000
Backlash mm = 1.00000000000000E+0000
Slit offset (mm) = 0.00000000000000E+0000
Use Bandwidth Fit = 0
Bandwidth Granularity = 1.00000000000000E-0003
BW Slit Width Fit,111 = 0.00000000000000E+0000
BW Slit Width Fit,112 = 1.00000000000000E+0000
BW Slit Width Fit,121 = 0.00000000000000E+0000
BW Slit Width Fit,122 = 1.00000000000000E+0000
BW Slit Width Fit,131 = 0.00000000000000E+0000
BW Slit Width Fit,132 = 1.00000000000000E+0000
BW Slit Width Fit,211 = 0.00000000000000E+0000
BW Slit Width Fit,212 = 1.00000000000000E+0000
BW Slit Width Fit,221 = 0.00000000000000E+0000
BW Slit Width Fit,222 = 1.00000000000000E+0000
BW Slit Width Fit,231 = 0.00000000000000E+0000
BW Slit Width Fit,232 = 1.00000000000000E+0000
Slit park offset (steps) = 1340
(positive park offset: more closed)
end
slit2
( nm = 1 ) ( mm = 0 )
Current Slit mode = 1
Constant bandwidth (nm) = 1.00000E+0000
Constant width (mm) = 8.0000000000000E+0000
Slit Type =
Minimum slit width = 0.00000000000000E+0000
Backlash mm = 1.00000000000000E+0000
Slit offset (mm) = 0.00000000000000E+0000
Use Bandwidth Fit = 0
Bandwidth Granularity = 1.00000000000000E-0003
BW Slit Width Fit,111 = 0.00000000000000E+0000
BW Slit Width Fit,112 = 1.00000000000000E+0000
BW Slit Width Fit,121 = 0.00000000000000E+0000
BW Slit Width Fit,122 = 1.00000000000000E+0000
BW Slit Width Fit,131 = 0.00000000000000E+0000
BW Slit Width Fit,132 = 1.00000000000000E+0000
BW Slit Width Fit,211 = 0.00000000000000E+0000
BW Slit Width Fit,212 = 1.00000000000000E+0000
BW Slit Width Fit,221 = 0.00000000000000E+0000
BW Slit Width Fit,222 = 1.00000000000000E+0000
BW Slit Width Fit,231 = 0.00000000000000E+0000
BW Slit Width Fit,232 = 1.00000000000000E+0000
Slit park offset (steps) = 1400
(positive park offset: more closed)
end
~System
dispersion multiplier = 0.000
stop count = 1.000
dark I integration time = 5.000
dark I delay = 5.000
225 277 input = 1
allow negative = -1
virtual instruments dll =
group = ,,,,,,,mono,
end

145
systemAll-new.atr Normal file
View File

@ -0,0 +1,145 @@
#------------------------------------------------------------------------------
# Bentham Instruments system model attributes file
# Created 11/11/2020 16:00:38
# This file should NOT be edited by hand
#------------------------------------------------------------------------------
mono
cos alpha = 9.72700000000000E-0001
ZordSwitch SAM for turret,1 = -1
end
monoTurret 1
grating,1 = 2400
minimum wavelength,1 = 200
maximum wavelength,1 = 320
sine correction,1 = 0
zord,1,1 = 36764
alpha,1,1 = 1.00190000000000E+0000
phase,1,1 = 0.00000000000000E+0000
amplitude,1,1 = 0.00000000000000E+0000
zord,1,2 = 36764
alpha,1,2 = 1.00190000000000E+0000
phase,1,2 = 0.00000000000000E+0000
amplitude,1,2 = 0.00000000000000E+0000
grating,2 = 1200
minimum wavelength,2 = 250
maximum wavelength,2 = 1101
sine correction,2 = 0
zord,2,1 = 892414
alpha,2,1 = 1.00180000000000E+0000
phase,2,1 = 0.00000000000000E+0000
amplitude,2,1 = 0.00000000000000E+0000
zord,2,2 = 892414
alpha,2,2 = 1.00180000000000E+0000
phase,2,2 = 0.00000000000000E+0000
amplitude,2,2 = 0.00000000000000E+0000
grating,3 = 2400
minimum wavelength,3 = 100
maximum wavelength,3 = 150
sine correction,3 = 0
zord,3,1 = 1745300
alpha,3,1 = 1.00190000000000E+0000
phase,3,1 = 0.00000000000000E+0000
amplitude,3,1 = 0.00000000000000E+0000
zord,3,2 = 1745300
alpha,3,2 = 1.00190000000000E+0000
phase,3,2 = 0.00000000000000E+0000
amplitude,3,2 = 0.00000000000000E+0000
high current threshold = 0
move speed = 8
move current = 1
idle current = 8
microstepping = 128
settle time = 100
end
fwheel
settle time = 2000
filter,1 = 0.00000000000000E+0000
filter,2 = 2.50000000000000E+0002
filter,3 = 3.60000000000000E+0002
filter,4 = 6.30000000000000E+0002
filter,5 = 0.00000000000000E+0000
filter,6 = 0.00000000000000E+0000
move at each wavelength = 1
filter park offset = 0
end
source
settle time = 2000
settle time,1 = 500
initial state = 1
switch wavelength,1 = 0.00000000000000E+0000
state,1 = 1
switch wavelength,2 = 8.00000000000000E+0002
state,2 = 0
deflect = Deflect
no deflect = No Deflection
move at each wavelength = 1
end
slit1
( nm = 1 ) ( mm = 0 )
Current Slit mode = 0
Constant bandwidth (nm) = 2.00000000000000E+0000
Constant width (mm) = 3.0000000000000E+0000
Slit Type =
Minimum slit width = 0.00000000000000E+0000
Backlash mm = 1.00000000000000E+0000
Slit offset (mm) = 0.00000000000000E+0000
Use Bandwidth Fit = 0
Bandwidth Granularity = 1.00000000000000E-0003
BW Slit Width Fit,111 = 0.00000000000000E+0000
BW Slit Width Fit,112 = 1.00000000000000E+0000
BW Slit Width Fit,121 = 0.00000000000000E+0000
BW Slit Width Fit,122 = 1.00000000000000E+0000
BW Slit Width Fit,131 = 0.00000000000000E+0000
BW Slit Width Fit,132 = 1.00000000000000E+0000
BW Slit Width Fit,211 = 0.00000000000000E+0000
BW Slit Width Fit,212 = 1.00000000000000E+0000
BW Slit Width Fit,221 = 0.00000000000000E+0000
BW Slit Width Fit,222 = 1.00000000000000E+0000
BW Slit Width Fit,231 = 0.00000000000000E+0000
BW Slit Width Fit,232 = 1.00000000000000E+0000
Slit park offset (steps) = 1340
(positive park offset: more closed)
end
slit2
( nm = 1 ) ( mm = 0 )
Current Slit mode = 0
Constant bandwidth (nm) = 4.00000E+0000
Constant width (mm) = 8.0000000000000E+0000
Slit Type =
Minimum slit width = 0.00000000000000E+0000
Backlash mm = 1.00000000000000E+0000
Slit offset (mm) = 0.00000000000000E+0000
Use Bandwidth Fit = 0
Bandwidth Granularity = 1.00000000000000E-0003
BW Slit Width Fit,111 = 0.00000000000000E+0000
BW Slit Width Fit,112 = 1.00000000000000E+0000
BW Slit Width Fit,121 = 0.00000000000000E+0000
BW Slit Width Fit,122 = 1.00000000000000E+0000
BW Slit Width Fit,131 = 0.00000000000000E+0000
BW Slit Width Fit,132 = 1.00000000000000E+0000
BW Slit Width Fit,211 = 0.00000000000000E+0000
BW Slit Width Fit,212 = 1.00000000000000E+0000
BW Slit Width Fit,221 = 0.00000000000000E+0000
BW Slit Width Fit,222 = 1.00000000000000E+0000
BW Slit Width Fit,231 = 0.00000000000000E+0000
BW Slit Width Fit,232 = 1.00000000000000E+0000
Slit park offset (steps) = 1400
(positive park offset: more closed)
end
~System
dispersion multiplier = 0.000
stop count = 1.000
dark I integration time = 5.000
dark I delay = 5.000
225 277 input = 1
allow negative = -1
virtual instruments dll =
group = ,,,,,,,mono,
end

195
test.py Normal file
View File

@ -0,0 +1,195 @@
from zipfile import stringFileHeader
if __name__ == "__main__":
import sys
if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "photoreflectance"
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, 'C:\\Users\Administrator\Desktop\Software\Python\Python\github')
from time import sleep
import numpy as np
import scipy as scp
from Bentham import Bentham
from devices.Xenon import Xenon
from devices.Shutter import ShutterProbe
from .update_funcs import Monitor
from .measurement_device.impl.sr830 import SR830
from .measurement_device.impl.model7260 import Model7260
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
handlers=[
# logging.FileHandler(log_path),
logging.StreamHandler()
]
)
def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25), AC=True, DC=True, monitor=None):
mon = monitor if monitor is not None else Monitor(r"$\lambda$ [nm]", [
# dict(ax=0, ylabel="Wavelength [nm]", color="red"),
# dict(ax=1, ylabel="Ref", color="blue", lim=(0, 5)),
dict(ax=0, ylabel=r"$\Delta R$", color="green"),
dict(ax=1, ylabel=r"$\sigma_{\Delta R}$", color="green"),
dict(ax=2, ylabel=r"$R$", color="blue"),
dict(ax=3, ylabel=r"$\sigma_R$", color="blue"),
dict(ax=4, ylabel=r"$\Delta R/R$", color="red"),
# dict(ax=3, ylabel="R", color="blue"),
dict(ax=5, ylabel="Phase", color="orange", lim=(-180, 180))
])
N_bins = 512
shutter.open_()
data_raw = []
data_wl = {}
sample_rate = 512
timeout_s = 60
timeout_interval = 0.5
lockin.run("SENS 17") # 1 mV/nA
# lockin.run("OFLT 5") # 3 ms
# since we dont expect changes in our signal, we can use large time constants and aggresive filter slope
# for better signal to noise
lockin.run("OFLT 8") # 100 ms
lockin.run("OFSL 3") # 24dB/Oct ms
if AC:
input("Plug the detector into lock-in port 'A/I' (front panel) and press enter > ")
input("Make sure the laser is turned on and press enter > ")
for i_wl, wl in enumerate(range(*wl_range)):
lockin.buffer_setup(CH1="R", CH2="Theta", length=N_bins, sample_rate=sample_rate)
monochromator.drive(wl)
sleep(1.5) # need to wait until lock-in R signal is stable
lockin.buffer_start_fill()
t = timeout_s
while t > 0:
t -= timeout_interval
sleep(timeout_interval)
if lockin.buffer_is_done():
break
if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish")
arr = lockin.buffer_get_data(CH1=True, CH2=True)
data_raw.append([wl, arr])
arr[1] += 180
# calculate means, for theta use circular mean
dR = np.mean(arr[0,:])
sdR = np.std(arr[0,:])
theta = scp.stats.circmean(arr[1,:], low=-180, high=180)
stheta = scp.stats.circstd(arr[1,:], low=-180, high=180)
data_wl[wl] = {"dR": dR, "Theta": theta, "sdR": sdR, "sTheta": stheta}
# wl - dR, sdR, R, sR, dR/R, Theta
mon.update(wl, dR, sdR, None, None, None, stheta)
if DC:
input("Turn off the laser and press enter > ")
input("Plug the detector into lock-in port 'Aux In 1' (rear panel) and press enter > ")
for i_wl, wl in enumerate(range(*wl_range)):
lockin.buffer_setup(CH1="Aux In 1", CH2="Theta", length=N_bins, sample_rate=sample_rate)
monochromator.drive(wl)
sleep(0.5)
lockin.buffer_start_fill()
t = timeout_s
while t > 0:
t -= timeout_interval
sleep(timeout_interval)
if lockin.buffer_is_done():
break
if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish")
arr = lockin.buffer_get_data(CH1=True, CH2=False)
if AC:
data_raw[i_wl].append(arr)
else:
data_raw.append([wl, arr])
means = np.mean(arr, axis=1)
errs = np.std(arr, axis=1)
if not wl in data_wl: data_wl[wl] = {}
data_wl[wl] |= {"R": means[0], "sR": errs[0]}
# wl - dR, sdR, R, sR, dR/R, Theta
if AC:
dR_R = data_wl[wl]["dR"] / data_wl[wl]["R"]
mon.override(wl, None, None, data_wl[wl]["R"], data_wl[wl]["sR"], dR_R, None)
else:
mon.update(wl, None, None, data_wl[wl]["R"], data_wl[wl]["sR"], None, None)
return data_wl, data_raw, mon
def _measure(monochromator: Bentham, lamp: Xenon, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25)):
data = []
mon = Monitor(r"$\lambda$ [nm]", [
# dict(ax=0, ylabel="Wavelength [nm]", color="red"),
# dict(ax=1, ylabel="Ref", color="blue", lim=(0, 5)),
dict(ax=0, ylabel=r"$\Delta R$", color="green"),
dict(ax=1, ylabel=r"$\sigma_{\Delta R}$", color="green"),
# dict(ax=3, ylabel="R", color="blue"),
dict(ax=2, ylabel="Phase", color="orange", lim=(-180, 180))
])
N_bins = 100
dt = 0.01
i = 0
shutter.open_()
if isinstance(lockin, SR830):
lockin.run("SENS 17") # 1 mV/nA
lockin.run("OFLT 5")
for wl in range(*wl_range):
arr = np.empty((N_bins, 2))
monochromator.drive(wl)
# lockin.auto_gain()
for j in range(N_bins):
dR, theta = lockin.snap(what="3,4")
arr[j,:] = (dR, theta)
i += 1
# sleep(dt)
means = np.mean(arr, axis=0)
errs = np.std(arr, axis=0)
mon.update(wl, means[0], errs[0], means[1])
data.append((wl, arr))
elif isinstance(lockin, Model7260):
# lockin.run("SEN 21") # 10 mV/nA
lockin.run("SEN 24") # 100 mV/nA
for wl in range(*wl_range):
monochromator.drive(wl)
lockin.buffer_setup(MAG=1, PHASE=1, ADC1=1, ADC2=1, interval_ms=5, length=1000)
lockin.buffer_start_fill()
timeout_s = 60
timeout_interval = 0.5
while timeout_s > 0:
timeout_s -= timeout_interval
sleep(timeout_interval)
if lockin.buffer_is_done():
break
arr = lockin.buffer_get_data()
length = arr.shape[0]
# for j in range(length):
# # wl, ref, dR, R, theta
# mon.update(i*length+j, wl, arr[j][3], arr[j][0], arr[j][2], arr[j][1])
mon.update_array(range(i * length, (i+1)*length), [wl for _ in range(length)], arr[:,3], arr[:,0], arr[:,2], arr[:,1])
data.append((wl, arr))
i += 1
shutter.close_()
return data, mon
lockin = None
lamp = None
mcm = None
shutter = None
def measure(wl_range=(400, 500, 2)):
return _measure(mcm, lamp, lockin, shutter, wl_range=wl_range)
def measure_both(**kwargs):
return _measure_both(mcm, lockin, shutter, **kwargs)
if __name__ == "__main__":
mcm = Bentham()
# mcm.park()
lamp = Xenon()
lockin = SR830.connect_device(SR830.enumerate_devices()[0])
# lockin = Model7260.connect_device(Model7260.enumerate_devices()[0])
shutter = ShutterProbe()

83
update_funcs.py Normal file
View File

@ -0,0 +1,83 @@
import matplotlib.pyplot as plt
import numpy as np
class Monitor:
"""
Monitor v and i data in a matplotlib window
"""
def __init__(self, xlabel: str, datas: list[dict], max_points_shown=None, use_print=False):
self.max_points_shown = max_points_shown
self.use_print = use_print
self.index = []
self.xdata = []
self.options = [{"ax": 0, "ylabel": None, "lim": None, "color": "orange", "grid": True, "label": None} | datas[i] for i in range(len(datas))]
self.ydatas = [[] for _ in range(len(datas))]
self.lines = []
n_ax = max(self.options, key=lambda v: v["ax"])["ax"] + 1
plt.ion()
self.fig1, self.ax = plt.subplots(n_ax, 1, figsize=(7, 8), sharex=True)
for i in range(len(self.ydatas)):
opt = self.options[i]
ax = self.ax[opt["ax"]]
self.lines.append(ax.plot(self.xdata, self.ydatas[i], color=opt["color"], label=opt["label"])[0])
if opt["label"]:
ax.legend()
if opt["grid"]:
ax.grid(True)
if opt["lim"]:
ax.set_ylim(opt["lim"][0], opt["lim"][1])
if opt["ylabel"]:
ax.set_ylabel(opt["ylabel"])
def override(self, xval, *yvals):
idx = self.xdata.index(xval)
for i, y in enumerate(yvals):
if y is None: continue
self.ydatas[i][idx] = y
self.lines[i].set_ydata(self.ydatas[i])
self._post_update()
def update(self, xval, *yvals):
n = len(self.xdata)
if self.use_print:
s = f"n = {n}"
for i, y in enumerate(yvals):
s += f", {self.options[i]['label']} = {y}"
print(s + " "*10, end='\r')
self.xdata.append(xval)
for i, y in enumerate(yvals):
if y is None: y = np.nan
self.ydatas[i].append(y)
self.lines[i].set_xdata(self.xdata)
self.lines[i].set_ydata(self.ydatas[i])
self._post_update()
def update_array(self, xdata, *ydatas):
self.xdata.extend(xdata)
for i, y in enumerate(ydatas):
self.ydatas[i].extend(y)
self.lines[i].set_xdata(self.xdata)
self.lines[i].set_ydata(self.ydatas[i])
self._post_update()
def _post_update(self):
n = len(self.xdata)
if self.max_points_shown and n > self.max_points_shown:
# recalculate limits and set them for the view
for ax in self.ax:
self.ax[-1].set_xlim(self.xdata[n - self.max_points_shown], self.xdata[n])
for ax in self.ax:
ax.relim() # required for autoscale_view to work
ax.autoscale_view()
self.ax[-1].autoscale_view()
# update plot
self.fig1.canvas.draw()
self.fig1.canvas.flush_events()
def __del__(self):
plt.close(self.fig1)

0
utility/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -3,13 +3,15 @@ import pyvisa
import logging
log = logging.getLogger(__name__)
def enumerate_devices(device_name, query="(GPIB)|(USB)?*:INSTR", visa_backend=""):
def enumerate_devices(device_name, query="(GPIB)|(USB)?*:INSTR", cmd="*IDN?", visa_backend=""):
"""
Return all available visa resources that match the query and the device name
Parameters
----------
device_name
A part of the name that the device is supposed upon the '*IDN?' query
A part of the name that the device is supposed to return upon the '*IDN?' command
cmd
The command to send to ALL devices matching the query
query
A query to the visa resource manager, to filter the resources
visa_backend
@ -25,7 +27,7 @@ def enumerate_devices(device_name, query="(GPIB)|(USB)?*:INSTR", visa_backend=""
for r in rm.list_resources(query):
try:
instr = rm.open_resource(r)
name = instr.query('*IDN?')
name = instr.query(cmd)
if device_name in name:
res.append(r)
instr.close()