import pyvisa import struct # for converting bytes to float # import pkg_resources import numpy as np from ..base import LockInAmp from prsctrl.utility.visa import enumerate_devices import logging log = logging.getLogger(__name__) class SR830(LockInAmp): """ Wrapper class for the SR830 controlled via pyvisa """ def __init__(self, instr, check_front_switch=True): self._buffer_length = None self.instr = instr init_script = """ ' INPUT ' set input A ISRC 0 ' set float IGND 0 ' set AC coupling ICPL 0 ' Enable 1x and 2x Line notch filters ILIN 3 ' Enable synchronous filter < 200Hz SYNC 1 ' DATA COLLECTION ' The quantities on the displays are the ones stored in the buffer ' Show R and Phase on the displays DDEF 1,1,0 DDEF 2,1,0 ' Set sample rate to 256 Hz SRAT 12 ' Set buffer loop mode SEND 1 ' REFERENCE ' Set external FMOD 0 ' Set Rising edge RSLP 1 ' Set detection harmonic to 1' HARM 1 """ self.run(init_script) def __del__(self): """Properly close the instrument connection""" self.instr.close() # RUN COMMANDS ON THE DEVICE def run(self, code, split=True): """ Run SCPI code on the device by writing it. Empty lines, leading whitespaces and lines starting with ' or # are ignored. Parameters ---------- code : str SCPI commands """ script = [] if split else '' for line in code.strip(" ").split("\n"): l = line.strip(" ") if len(l) == 0 or l[0] in "#'": continue if split: script.append(l) else: script += l + "\n" if split: for cmd in script: self.run_and_check(cmd) else: self.run_and_check(cmd) def run_and_check(self, cmd): """Run a command and check for errors afterwards by reading the standard event status byte""" try: log.info(f"Running command(s): {cmd}") ret = self.instr.write(cmd) status = int(self.query("*ESR?")) if status & (1 << 4): raise RuntimeError(f"EXE bit set (Execution error) after command(s):\n{cmd}\n") elif status & (1 << 5): raise RuntimeError(f"CMD bit set (Illegal command) after command(s):\n{cmd}\n") # if ret != 8: # raise RuntimeError(f"Error while writing command(s):\n'{script}'\n\nDevice returned code {ret}") except pyvisa.VisaIOError as e: raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{cmd}'\n\nVisaIOError:\n{e}") def query(self, query): return self.instr.query(query, delay=0.05).strip("\n") def snap(self, what="3,4,5,7"): """ Parameters ---------- what : TYPE, optional 1 X 2 Y 3 R 4 θ 5 Aux In 1 6 Aux In 2 7 Aux In 3 8 Aux In 4 9 Reference Frequency 10 CH1 display 11 CH2 display The default is "3,4,5,7". Returns ------- vals : list[float] Values converted to float """ vals = self.query(f"SNAP? {what}").split(",") vals = map(float, vals) return vals def try_recover_from_communication_error(self, original_error): """ Try to get into a normal state by flushing the output queue and reading the instrument status """ log.warning(f"Trying to recover from communication error: {original_error}") # flush whatever is in the queue try: self.instr.read_raw() except pyvisa.VisaIOError as e: pass try: status = int(self.query("*ESR?")) if status & 0b00110101 > 0: # check if INP, QRY, EXE or CMD is set raise RuntimeError(f"Failed to recover from exception, device returned status {status:08b}:\n{original_error}") except Exception as e: raise RuntimeError(f"Failed to recover from the following exception:\n{original_error}\nThe following exception occurred while querying the device status:\n{e}") log.info(f"Recovered from error") def check_overloads(self) -> bool|str: status_lia = int(self.query("LIAS?")) if status_lia & (1 << 0): # input amplifier overload return "Input Amplifier" elif status_lia & (1 << 1): # time constant filter overlaid return "Time Constant Filter" elif status_lia & (1 << 2): # output overload return "Output" return False OUTP = ["X", "Y", "R", "theta"] def read_value(self, which: str): if which not in self.OUTP: raise ValueError(f"Invalid output: {which}. Must be one of {self.OUTP}") outp = self.OUTP.index(which) + 1 return float(self.query(f"OUTP? {outp}")) def reset(self): self.instr.write("*RST") def test_connection(self): pass # REFERENCE FMOD = ["External", "Internal"] def set_reference(self, reference): if not reference in self.FMOD: raise ValueError(f"Invalid reference: {reference}. Must be one of {self.FMOD}") fmod = self.FMOD.index(reference) self.run(f"FMOD {fmod}") def set_frequency_Hz(self, frequency_Hz): self.run(f"FREQ {frequency_Hz}") def get_frequency_Hz(self) -> float: return float(self.query("FREQ?")) RSLP = ["Sine", "Rising Edge", "Falling Edge"] def set_reference_trigger(self, trigger): if not trigger in self.RSLP: raise ValueError(f"Invalid trigger: {trigger}. Must be one of {self.RSLP}") rslp = self.RSLP.index(trigger) self.run(f"RSLP {rslp}") # PARAMETERS SENS = [ 2e-9, 5e-9, 10e-9, 20e-9, 50e-9, 100e-9, 200e-9, 500e-9, 1e-6, 2e-6, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6, 500e-6, 1e-3, 2e-3, 5e-3, 10e-3, 20e-3, 50e-3, 100e-3, 200e-3, 500e-3, 1 ] def set_sensitivity_volt(self, volt): if volt not in SR830.SENS: raise ValueError(f"Invalid sensitivity voltage value: {volt}") sens = SR830.SENS.index(volt) self.run(f"SENS {sens}") def get_sensitivity_volt(self): sens = int(self.query(f"SENS")) return SR830.SENS[sens] OFLT =[ 10e-6, 30e-6, 100e-6, 300e-6, 1e-3, 3e-3, 10e-3, 30e-3, 100e-3, 300e-3, 1, 3, 10, 30, 100, 300, 1e3, 3e3, 10e3, 30e3, ] def set_time_constant_s(self, dt): if dt not in SR830.OFLT: raise ValueError(f"Invalid time constant value: {dt}. Must be one of {SR830.OFLT}") oflt = SR830.OFLT.index(dt) self.run(f"OFLT {oflt}") def get_time_constant_s(self): oflt = int(self.query("OFLT?")) return SR830.OFLT[oflt] OFSL = [6, 12, 18, 24] def set_filter_slope(self, slope_db_oct): if slope_db_oct not in SR830.OFSL: raise ValueError(f"Invalid filter slope value: {slope_db_oct}. Must be one of {SR830.OFSL}") ofsl = SR830.OFSL.index(slope_db_oct) self.run(f"OFSL {ofsl}") def get_filter_slope(self): ofsl = int(self.query("OFSL?")) return SR830.OFSL[ofsl] def get_wait_time_s(self, time_const: float|None=None, filter_slope: int|None=None): """ Get the wait time required to reach 99% of the final value. :param time_const: If not passed, the value will be read from the device :param filter_slop: If not passed, the value will be read from the device See Manual 3-21 :return: """ if not time_const: time_const = self.get_time_constant_s() if not filter_slope: filter_slope = self.get_filter_slope() if filter_slope == 6: return 5 * time_const elif filter_slope == 12: return 7 * time_const elif filter_slope == 18: return 9 * time_const elif filter_slope == 24: return 10 * time_const else: raise ValueError(f"Invalid filter slope value: {filter_slope}") def set_sync_filter(self, sync): if sync not in [0, 1]: raise ValueError(f"Invalid sync value: {sync}, must be 0 (off) or 1 (on)") self.run(f"SYNC {sync}") def get_sync_filter(self): sync = int(self.query("SYNC?")) return sync RMOD = ["High Reserve", "Normal", "Low Noise"] def set_reserve(self, reserve): if not reserve in SR830.RMOD: raise ValueError(f"Invalid reserve value: {reserve}. Must be one of {SR830.RMOD}") rmod = SR830.RMOD.index(reserve) self.run(f"RMOD {rmod}") def get_reserve(self): rmod = int(self.query("RMOD?")) return SR830.RMOD[rmod] CH1 = ["X", "R", "X Noise", "Aux In 1", "Aux In 2"] CH2 = ["Y", "Theta", "Y Noise", "Aux In 3", "Aux In 4"] SRAT = [62.5e-3, 125e-3, 250e-3, 500e-3, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, "Trigger"] def buffer_setup(self, CH1="R", CH2="Theta", length=None, sample_rate=512): """ Prepare the device for a buffer (curve) measurement :param CH1: Source of channel 1: X, R, X Noise, Aux In 1, Aux In 2 :param CH2: Source of channel 2: Y, Theta, Y Noise, Aux In 3, Aux In 4 :param length: Number of values to store in the buffer, or None to use the maximum number of values :param sample_rate: Sample rate in Hz or "Trigger", see SR830 Manual """ # stop data storage and clear buffer self.run(f"PAUS") self.run(f"REST") # set displays, whatever is shown on the display is stored in the buffers try: i_CH1 = SR830.CH1.index(CH1) except KeyError: raise KeyError(f"CH1='{CH1}' but must be one of {SR830.CH1}") try: i_CH2 = SR830.CH2.index(CH2) except KeyError: raise KeyError(f"CH2='{CH2}' but must be one of {SR830.CH2}") self.run(f"DDEF 1,{i_CH1},0") self.run(f"DDEF 2,{i_CH2},0") try: srat = SR830.SRAT.index(sample_rate) except: raise KeyError(f"SRAT={sample_rate} but must be one of {SR830.SRAT}") self.run(f"SRAT {srat}") # length if length is None: self._buffer_length = SR830.max_length else: self._buffer_length = length if self._buffer_length > SR830.max_length: raise ValueError(f"Maximum buffer length is {SR830.max_length} but {length} was given.") max_length = 16383 def buffer_start_fill(self): if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.") self.run("SEND 0") # shot mode self.run("STRT") # start take data def buffer_is_done(self) -> bool: return self.buffer_get_n_points() >= self._buffer_length def buffer_get_n_points(self) -> int: return int(self.query("SPTS?")) def buffer_get_data(self, CH1=True, CH2=True): """ Get the data from the buffer. :return: np.ndarray | tuple[np.ndarray, np.ndarray] """ if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.") self.run("PAUS") take_n_points = min(self.buffer_get_n_points(), self._buffer_length) # there might be more points stored then was required # if CH1 and CH2: # data = (np.empty((2, take_n_points), dtype=float) for _ in range(2)) # elif CH1 or CH2: # data = np.empty((1, take_n_points), dtype=float) if (not CH1) and (not CH2): raise ValueError("Either CH1 or CH2 must be set True.") data = [] if CH1: data.append(self._buffer_get_data(1, 0, take_n_points)) if CH2: data.append(self._buffer_get_data(2, 0, take_n_points)) return data def _buffer_get_data_slow(self, CH=1, start=0, n_points=None): if n_points is None: n_points = SR830.max_length self.instr.write(f"TRCB? {CH},{start},{n_points}") # reading all ot once doesnt work because visa times out, even with increased timeout # data = self.instr.read_binary_values(datatype="f", header_fmt="empty", data_points=n_points, container=np.ndarray) # reading one at a time works but is slow data = np.empty(n_points, dtype=float) for i in range(n_points): # read 4 bytes and "cast" them to a float data[i] = struct.unpack("=f", self.instr.read_bytes(4))[0] return data def _buffer_get_data(self, CH=1, start=0, n_points=None): if n_points is None: n_points = SR830.max_length self.instr.write(f"TRCB? {CH},{start},{n_points}") data = np.empty(n_points, dtype=float) CHUNK_SIZE = 256 chunks = n_points // CHUNK_SIZE remainder = n_points % CHUNK_SIZE for i in range(chunks): # read 4 bytes and "cast" them to a float data[i*CHUNK_SIZE:(i+1)*CHUNK_SIZE] = np.frombuffer(self.instr.read_bytes(CHUNK_SIZE*4), dtype=np.float32) # struct.unpack("=f", self.instr.read_bytes(4))[0] if remainder > 0: data[chunks * CHUNK_SIZE:] = np.frombuffer(self.instr.read_bytes(remainder * 4), dtype=np.float32) try: read = self.instr.read_raw() log.warning(f"Unexpected read from device: {read}") except pyvisa.VisaIOError: pass return data def auto_gain(self): self.instr.write("AGAN") # wait until the device responds again, meaning it is no longer busy self.instr.write("SENS?") i = 0 while i < 50: try: gain = self.instr.read().strip("\n") return gain except: i += 1 raise RuntimeError("Could not get response from device after sending auto gain command") @staticmethod def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"): return enumerate_devices("SR830", query) @staticmethod def connect_device(name): rm = pyvisa.ResourceManager() instr = rm.open_resource(name) return SR830(instr) def __str__(self): return "SR830" def set_measurement_params(lockin: SR830, p: dict={}, **kwargs): params = p | kwargs key_to_setter = { "time_constant_s": lockin.set_time_constant_s, "filter_slope": lockin.set_filter_slope, "sync_filter": lockin.set_sync_filter, "reserve": lockin.set_reserve, "sensitivity_volt": lockin.set_sensitivity_volt, "frequency_Hz": lockin.set_frequency_Hz, "reference": lockin.set_reference, "reference_trigger": lockin.set_reference_trigger, } for k, v in params.items(): if k not in key_to_setter.keys(): raise KeyError(f"Invalid parameter {k}") key_to_setter[k](v)