2025-05-06 15:45:38 +02:00

389 lines
13 KiB
Python

import pyvisa
import struct # for converting bytes to float
from time import sleep
# import pkg_resources
import os
from typing import Callable
import numpy as np
from ..base import MeasurementDevice
from ...utility.visa import enumerate_devices
import logging
log = logging.getLogger(__name__)
class SR830(MeasurementDevice):
    """
    Wrapper class for the SR830 lock-in amplifier controlled via pyvisa.

    The class sends the instrument's text commands through an already opened
    pyvisa resource and checks the standard event status register after each
    command so that rejected commands surface as Python exceptions.

    The class level lists (``SENS``, ``OFLT``, ``OFSL``, ``RMOD``, ``CH1``,
    ``CH2``, ``SRAT``) translate between physical values and the integer
    indices used by the corresponding device commands: the position of a
    value in the list is the index that is sent to / received from the device.
    """

    # Maximum number of points that can be requested from the data buffer
    max_length = 16383

    def __init__(self, instr, check_front_switch=True):
        """
        Parameters
        ----------
        instr
            Opened pyvisa resource used for all communication.
        check_front_switch : bool, optional
            Accepted for interface compatibility; not used by this class.
        """
        self._buffer_length = None
        self.instr = instr
        # Lines starting with ' are comments and leading spaces are ignored,
        # see run().
        init_script = """
        ' INPUT
        ' set input A
        ISRC 0
        ' set float
        IGND 0
        ' set AC coupling
        ICPL 0
        ' Enable 1x and 2x Line notch filters
        ILIN 3
        ' Enable synchronous filter < 200Hz
        SYNC 1
        ' DATA COLLECTION
        ' The quantities on the displays are the ones stored in the buffer
        ' Show R and Phase on the displays
        DDEF 1,1,0
        DDEF 2,1,0
        ' Set sample rate to 256 Hz
        SRAT 12
        ' Set buffer loop mode
        SEND 1
        ' REFERENCE
        ' Set external
        FMOD 0
        ' Set Rising edge
        RSLP 1
        ' Set detection harmonic to 1'
        HARM 1
        """
        self.run(init_script)

    def __del__(self):
        """Properly close the instrument connection"""
        # __del__ may run on a partially constructed object, so do not assume
        # that the attribute exists.
        instr = getattr(self, "instr", None)
        if instr is not None:
            instr.close()

    # RUN COMMANDS ON THE DEVICE
    def run(self, code, split=True):
        """
        Run device commands by writing them to the instrument.

        Empty lines, leading/trailing whitespace and lines starting with
        ' or # are ignored.

        Parameters
        ----------
        code : str
            One command per line.
        split : bool, optional
            If True (default), every command is written and checked
            individually. If False, all commands are joined with newlines and
            written (and checked) as a single message.
        """
        commands = []
        for line in code.splitlines():
            stripped = line.strip()
            if not stripped or stripped[0] in "#'":
                continue
            commands.append(stripped)

        if split:
            for cmd in commands:
                self.run_and_check(cmd)
        elif commands:
            self.run_and_check("".join(cmd + "\n" for cmd in commands))

    def run_and_check(self, cmd):
        """
        Run a command and check for errors afterwards by reading the
        standard event status byte.

        Parameters
        ----------
        cmd : str
            Command(s) to write to the device.

        Raises
        ------
        RuntimeError
            If writing/querying raised a VisaIOError, or if the EXE (bit 4) or
            CMD (bit 5) bit of the standard event status byte is set
            afterwards.
        """
        try:
            log.info(f"Running command(s): {cmd}")
            self.instr.write(cmd)
            status = int(self.query("*ESR?"))
        except pyvisa.VisaIOError as e:
            raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{cmd}'\n\nVisaIOError:\n{e}") from e

        if status & (1 << 4):
            raise RuntimeError(f"EXE bit set (Execution error) after command(s):\n{cmd}\n")
        if status & (1 << 5):
            raise RuntimeError(f"CMD bit set (Illegal command) after command(s):\n{cmd}\n")

    def query(self, query):
        """
        Send a query and return the answer without trailing newlines.

        Parameters
        ----------
        query : str
            Query string, e.g. ``"FREQ?"``.

        Returns
        -------
        str
            The device response with newline characters stripped from both ends.
        """
        return self.instr.query(query, delay=0.05).strip("\n")

    def snap(self, what="3,4,5,7"):
        """
        Read several values at once using the SNAP? query.

        Parameters
        ----------
        what : str, optional
            Comma separated list of the values to record:
            1 X
            2 Y
            3 R
            4 θ
            5 Aux In 1
            6 Aux In 2
            7 Aux In 3
            8 Aux In 4
            9 Reference Frequency
            10 CH1 display
            11 CH2 display
            The default is "3,4,5,7".

        Returns
        -------
        vals : list[float]
            Values converted to float, in the order given by `what`.
        """
        answer = self.query(f"SNAP? {what}")
        # Build a real list: a bare map() object could only be consumed once
        # and supports neither len() nor indexing.
        return [float(val) for val in answer.split(",")]

    def try_recover_from_communication_error(self, original_error):
        """
        Try to get into a normal state by flushing the output queue
        and reading the instrument status.

        Parameters
        ----------
        original_error
            The error that triggered the recovery; only used in log and
            exception messages.

        Raises
        ------
        RuntimeError
            If the status could not be queried or if one of the INP, QRY, EXE
            or CMD status bits is set.
        """
        log.warning(f"Trying to recover from communication error: {original_error}")
        # flush whatever is in the queue
        try:
            self.instr.read_raw()
        except pyvisa.VisaIOError:
            # Best effort only: a failing read (e.g. nothing left to read)
            # must not abort the recovery.
            pass

        try:
            status = int(self.query("*ESR?"))
        except Exception as e:
            raise RuntimeError(f"Failed to recover from the following exception:\n{original_error}\nThe following exception occurred while querying the device status:\n{e}") from e

        # Checked outside of the try block so that this error is not wrapped
        # a second time by the handler above.
        if status & 0b00110101:  # check if INP, QRY, EXE or CMD is set
            raise RuntimeError(f"Failed to recover from exception, device returned status {status:08b}:\n{original_error}")
        log.info("Recovered from error")

    def check_overloads(self) -> bool:
        """
        Check the lock-in status byte for overloads.

        Returns
        -------
        bool
            True if any of the following bits of the LIAS? answer is set:
            bit 0 (input amplifier overload), bit 1 (time constant filter
            overload) or bit 2 (output overload).
        """
        status_lia = int(self.query("LIAS?"))
        return bool(status_lia & 0b111)

    def measureTODO(self): pass

    def read_value(self):
        """Read the value of R"""
        return float(self.query("OUTP? 3"))

    def reset(self):
        """Send the *RST command to the device."""
        self.instr.write("*RST")

    def test_connection(self):
        """Placeholder, currently does nothing."""
        pass

    # PARAMETERS
    def get_frequency(self) -> float:
        """Return the answer of the FREQ? query as float."""
        return float(self.query("FREQ?"))

    # Sensitivity values; the list position is the index used by SENS
    SENS = [
        2e-9, 5e-9, 10e-9, 20e-9, 50e-9, 100e-9, 200e-9, 500e-9,
        1e-6, 2e-6, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6, 500e-6,
        1e-3, 2e-3, 5e-3, 10e-3, 20e-3, 50e-3, 100e-3, 200e-3, 500e-3,
        1
    ]

    def set_sensitivity_volt(self, volt):
        """
        Set the sensitivity.

        Parameters
        ----------
        volt : float
            Must be one of the values in `SR830.SENS`.

        Raises
        ------
        ValueError
            If `volt` is not in `SR830.SENS`.
        """
        if volt not in SR830.SENS:
            raise ValueError(f"Invalid sensitivity voltage value: {volt}")
        sens = SR830.SENS.index(volt)
        self.run(f"SENS {sens}")

    def get_sensitivity_volt(self):
        """Return the current sensitivity as a value from `SR830.SENS`."""
        # The question mark is required, "SENS" without it is not a query
        sens = int(self.query("SENS?"))
        return SR830.SENS[sens]

    # Time constants; the list position is the index used by OFLT
    OFLT = [
        10e-6, 30e-6, 100e-6, 300e-6,
        1e-3, 3e-3, 10e-3, 30e-3, 100e-3, 300e-3,
        1, 3, 10, 30, 100, 300,
        1e3, 3e3, 10e3, 30e3,
    ]

    def set_time_constant_s(self, dt):
        """
        Set the time constant.

        Parameters
        ----------
        dt : float
            Must be one of the values in `SR830.OFLT`.

        Raises
        ------
        ValueError
            If `dt` is not in `SR830.OFLT`.
        """
        if dt not in SR830.OFLT:
            raise ValueError(f"Invalid time constant value: {dt}. Must be one of {SR830.OFLT}")
        oflt = SR830.OFLT.index(dt)
        self.run(f"OFLT {oflt}")

    def get_time_constant_s(self):
        """Return the current time constant as a value from `SR830.OFLT`."""
        oflt = int(self.query("OFLT?"))
        return SR830.OFLT[oflt]

    # Filter slopes; the list position is the index used by OFSL
    OFSL = [6, 12, 18, 24]

    def set_filter_slope(self, slope_db_oct):
        """
        Set the low pass filter slope.

        Parameters
        ----------
        slope_db_oct : int
            Must be one of the values in `SR830.OFSL`.

        Raises
        ------
        ValueError
            If `slope_db_oct` is not in `SR830.OFSL`.
        """
        if slope_db_oct not in SR830.OFSL:
            raise ValueError(f"Invalid filter slope value: {slope_db_oct}. Must be one of {SR830.OFSL}")
        ofsl = SR830.OFSL.index(slope_db_oct)
        self.run(f"OFSL {ofsl}")

    def get_filter_slope(self):
        """Return the current filter slope as a value from `SR830.OFSL`."""
        ofsl = int(self.query("OFSL?"))
        return SR830.OFSL[ofsl]

    def get_wait_time_s(self):
        """
        Get the wait time required to reach 99% of the final value.
        See Manual 3-21

        Returns
        -------
        float
            The current time constant multiplied by a factor that depends on
            the current filter slope (6: 5x, 12: 7x, 18: 9x, 24: 10x).

        Raises
        ------
        ValueError
            If the filter slope is none of 6, 12, 18, 24.
        """
        time_const = self.get_time_constant_s()
        filter_slope = self.get_filter_slope()
        factors = {6: 5, 12: 7, 18: 9, 24: 10}
        if filter_slope not in factors:
            raise ValueError(f"Invalid filter slope value: {filter_slope}")
        return factors[filter_slope] * time_const

    def set_sync_filter(self, sync):
        """
        Switch the synchronous filter on or off.

        Parameters
        ----------
        sync : int
            0 (off) or 1 (on).

        Raises
        ------
        ValueError
            If `sync` is neither 0 nor 1.
        """
        if sync not in [0, 1]:
            raise ValueError(f"Invalid sync value: {sync}, must be 0 (off) or 1 (on)")
        # int() because True/False pass the check above but would otherwise
        # be formatted as "True"/"False"
        self.run(f"SYNC {int(sync)}")

    def get_sync_filter(self):
        """Return the answer of the SYNC? query as int."""
        sync = int(self.query("SYNC?"))
        return sync

    # Reserve modes; the list position is the index used by RMOD
    RMOD = ["High Reserve", "Normal", "Low Noise"]

    def set_reserve(self, reserve):
        """
        Set the reserve mode.

        Parameters
        ----------
        reserve : str
            Must be one of the values in `SR830.RMOD`.

        Raises
        ------
        ValueError
            If `reserve` is not in `SR830.RMOD`.
        """
        if reserve not in SR830.RMOD:
            raise ValueError(f"Invalid reserve value: {reserve}. Must be one of {SR830.RMOD}")
        rmod = SR830.RMOD.index(reserve)
        self.run(f"RMOD {rmod}")

    def get_reserve(self):
        """Return the current reserve mode as a value from `SR830.RMOD`."""
        rmod = int(self.query("RMOD?"))
        return SR830.RMOD[rmod]

    # Display sources and sample rates; the list position is the index used
    # by DDEF (CH1, CH2) and SRAT
    CH1 = ["X", "R", "X Noise", "Aux In 1", "Aux In 2"]
    CH2 = ["Y", "Theta", "Y Noise", "Aux In 3", "Aux In 4"]
    SRAT = [62.5e-3, 125e-3, 250e-3, 500e-3, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, "Trigger"]

    def buffer_setup(self, CH1="R", CH2="Theta", length=None, sample_rate=512):
        """
        Prepare the device for a buffer (curve) measurement.

        All arguments are validated before anything is sent to the device, so
        an invalid call changes neither the device nor this object.

        Parameters
        ----------
        CH1 : str, optional
            Source of channel 1: X, R, X Noise, Aux In 1, Aux In 2
        CH2 : str, optional
            Source of channel 2: Y, Theta, Y Noise, Aux In 3, Aux In 4
        length : int, optional
            Number of values to store in the buffer, or None to use the
            maximum number of values.
        sample_rate : float or str, optional
            Sample rate in Hz or "Trigger", must be one of `SR830.SRAT`.

        Raises
        ------
        ValueError
            If `CH1` or `CH2` is not a valid source, or if `length` exceeds
            `SR830.max_length`.
        KeyError
            If `sample_rate` is not one of `SR830.SRAT`.
        """
        # list.index() raises ValueError (not KeyError) for unknown values
        try:
            i_CH1 = SR830.CH1.index(CH1)
        except ValueError:
            raise ValueError(f"CH1='{CH1}' but must be one of {SR830.CH1}") from None
        try:
            i_CH2 = SR830.CH2.index(CH2)
        except ValueError:
            raise ValueError(f"CH2='{CH2}' but must be one of {SR830.CH2}") from None
        try:
            srat = SR830.SRAT.index(sample_rate)
        except ValueError:
            raise KeyError(f"SRAT={sample_rate} but must be one of {SR830.SRAT}") from None

        buffer_length = SR830.max_length if length is None else length
        if buffer_length > SR830.max_length:
            raise ValueError(f"Maximum buffer length is {SR830.max_length} but {length} was given.")

        # stop data storage and clear buffer
        self.run("PAUS")
        self.run("REST")
        # set displays, whatever is shown on the display is stored in the buffers
        self.run(f"DDEF 1,{i_CH1},0")
        self.run(f"DDEF 2,{i_CH2},0")
        self.run(f"SRAT {srat}")
        self._buffer_length = buffer_length

    def buffer_start_fill(self):
        """
        Start storing data in the buffer.

        Raises
        ------
        RuntimeError
            If `buffer_setup()` was not called before.
        """
        if self._buffer_length is None:
            raise RuntimeError("Buffer not set up, call buffer_setup() first.")
        self.run("SEND 0")  # shot mode
        self.run("STRT")  # start take data

    def buffer_is_done(self) -> bool:
        """
        Check whether at least the requested number of points is stored.

        Raises
        ------
        RuntimeError
            If `buffer_setup()` was not called before.
        """
        if self._buffer_length is None:
            raise RuntimeError("Buffer not set up, call buffer_setup() first.")
        return self.buffer_get_n_points() >= self._buffer_length

    def buffer_get_n_points(self) -> int:
        """Return the answer of the SPTS? query (number of stored points) as int."""
        return int(self.query("SPTS?"))

    def buffer_get_data(self, CH1=True, CH2=True):
        """
        Pause data storage and get the data from the buffer.

        Parameters
        ----------
        CH1 : bool, optional
            Whether to read the channel 1 buffer.
        CH2 : bool, optional
            Whether to read the channel 2 buffer.

        Returns
        -------
        np.ndarray
            Array of shape (<1 if one channel, 2 if both channels>, n_points).
            If both channels are read, row 0 is channel 1 and row 1 is
            channel 2.

        Raises
        ------
        RuntimeError
            If `buffer_setup()` was not called before.
        ValueError
            If neither `CH1` nor `CH2` is True.
        """
        if self._buffer_length is None:
            raise RuntimeError("Buffer not set up, call buffer_setup() first.")
        channels = [channel for channel, wanted in ((1, CH1), (2, CH2)) if wanted]
        if not channels:
            raise ValueError("Either CH1 or CH2 must be set True.")

        self.run("PAUS")
        # there might be more points stored than was required
        take_n_points = min(self.buffer_get_n_points(), self._buffer_length)
        data = np.empty((len(channels), take_n_points), dtype=float)
        if take_n_points == 0:
            # nothing stored yet, do not request zero points from the device
            return data
        for row, channel in enumerate(channels):
            data[row, :] = self._buffer_get_data(channel, 0, take_n_points)
        return data

    def _buffer_get_data_slow(self, CH=1, start=0, n_points=None):
        """
        Read buffer data with TRCB?, one 4 byte float at a time.

        Parameters
        ----------
        CH : int, optional
            Channel number passed to TRCB?.
        start : int, optional
            Index of the first point to read.
        n_points : int, optional
            Number of points to read, defaults to `SR830.max_length`.

        Returns
        -------
        np.ndarray
            1D float array with `n_points` values.
        """
        if n_points is None:
            n_points = SR830.max_length
        self.instr.write(f"TRCB? {CH},{start},{n_points}")
        # reading all at once doesn't work because visa times out, even with
        # increased timeout; reading one value at a time works but is slow
        data = np.empty(n_points, dtype=float)
        for i in range(n_points):
            # read 4 bytes and "cast" them to a float
            data[i] = struct.unpack("=f", self.instr.read_bytes(4))[0]
        return data

    def _buffer_get_data(self, CH=1, start=0, n_points=None):
        """
        Read buffer data with TRCB? in chunks of up to 256 values.

        Parameters
        ----------
        CH : int, optional
            Channel number passed to TRCB?.
        start : int, optional
            Index of the first point to read.
        n_points : int, optional
            Number of points to read, defaults to `SR830.max_length`.

        Returns
        -------
        np.ndarray
            1D float array with `n_points` values.
        """
        if n_points is None:
            n_points = SR830.max_length
        self.instr.write(f"TRCB? {CH},{start},{n_points}")
        data = np.empty(n_points, dtype=float)
        CHUNK_SIZE = 256
        for offset in range(0, n_points, CHUNK_SIZE):
            # the last chunk may be shorter than CHUNK_SIZE
            count = min(CHUNK_SIZE, n_points - offset)
            # every value is transferred as a 4 byte float
            raw = self.instr.read_bytes(count * 4)
            data[offset:offset + count] = np.frombuffer(raw, dtype=np.float32)
        # Nothing should be left to read now. An error from read_raw() is
        # therefore the expected case and anything that is read is reported.
        try:
            read = self.instr.read_raw()
            log.warning(f"Unexpected read from device: {read}")
        except pyvisa.VisaIOError:
            pass
        return data

    def auto_gain(self):
        """
        Run the auto gain function and wait until the device responds again.

        Returns
        -------
        str
            The answer to the SENS? query sent after the auto gain command.

        Raises
        ------
        RuntimeError
            If the device did not answer within 50 read attempts.
        """
        self.instr.write("AGAN")
        # wait until the device responds again, meaning it is no longer busy
        self.instr.write("SENS?")
        for _ in range(50):
            try:
                return self.instr.read().strip("\n")
            except pyvisa.VisaIOError:
                # no answer yet, try again
                continue
        raise RuntimeError("Could not get response from device after sending auto gain command")

    @staticmethod
    def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
        """
        Forward to the module level `enumerate_devices` helper with the
        device name "SR830".

        Parameters
        ----------
        query : str, optional
            Resource query passed on to the helper.
        """
        return enumerate_devices("SR830", query)

    @staticmethod
    def connect_device(name):
        """
        Open the visa resource `name` and wrap it in a SR830 object.

        Parameters
        ----------
        name : str
            Resource name passed to `pyvisa.ResourceManager.open_resource`.

        Returns
        -------
        SR830
        """
        rm = pyvisa.ResourceManager()
        instr = rm.open_resource(name)
        return SR830(instr)

    def __str__(self):
        return "SR830"