import logging
import os
import struct  # for converting bytes to float
from time import sleep
from typing import Callable
# import pkg_resources

import numpy as np
import pyvisa

from ..base import MeasurementDevice
from ...utility.visa import enumerate_devices

log = logging.getLogger(__name__)


class SR830(MeasurementDevice):
    """
    Wrapper class for the SR830 controlled via pyvisa
    """

    # Display sources selectable for each channel. The position in the list is
    # the value sent as the second argument of the DDEF command.
    CH1 = ["X", "R", "X Noise", "Aux In 1", "Aux In 2"]
    CH2 = ["Y", "Theta", "Y Noise", "Aux In 3", "Aux In 4"]
    # Sample rates in Hz. The position in the list is the value sent with SRAT.
    SRAT = [62.5e-3, 125e-3, 250e-3, 500e-3, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, "Trigger"]
    # Maximum number of points accepted for a buffer measurement.
    max_length = 16383

    def __init__(self, instr, check_front_switch=True):
        """
        Store the instrument handle and run the initialization script on the device.

        :param instr: Instrument handle used for all communication
            (connect_device() passes the result of ResourceManager.open_resource)
        :param check_front_switch: Currently unused
        """
        self._buffer_length = None
        self.instr = instr
        init_script = """
        ' INPUT
        ' set input A
        ISRC 0
        ' set float
        IGND 0
        ' set AC coupling
        ICPL 0
        ' Enable 1x and 2x Line notch filters
        ILIN 3
        ' Enable synchronous filter < 200Hz
        SYNC 1

        ' DATA COLLECTION
        ' The quantities on the displays are the ones stored in the buffer
        ' Show R and Phase on the displays
        DDEF 1,1,0
        DDEF 2,1,0
        ' Set sample rate to 256 Hz
        SRAT 12
        ' Set buffer loop mode
        SEND 1

        ' REFERENCE
        ' Set external
        FMOD 0
        ' Set Rising edge
        RSLP 1
        ' Set detection harmonic to 1'
        HARM 1
        """
        self.run(init_script)

    def __del__(self):
        """Properly close the instrument connection"""
        # instr is missing when the object was never fully initialized
        instr = getattr(self, "instr", None)
        if instr is not None:
            instr.close()

    # RUN COMMANDS ON THE DEVICE
    def run(self, code, split=True):
        """
        Run SCPI code on the device by writing it.
        Empty lines, leading whitespaces and lines starting with ' or # are ignored.

        Parameters
        ----------
        code : str
            SCPI commands
        split : bool, optional
            If True (default), every command line is written and checked on its own.
            If False, all command lines are joined (each terminated by a newline)
            and written and checked as a single string.
        """
        commands = []
        for raw_line in code.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith(("#", "'")):
                continue
            commands.append(line)
        if not commands:
            # nothing but comments / blank lines: do not touch the device
            return
        if split:
            for cmd in commands:
                self.run_and_check(cmd)
        else:
            self.run_and_check("".join(f"{cmd}\n" for cmd in commands))

    def run_and_check(self, cmd):
        """
        Run a command and check for errors afterwards by reading the standard event status byte

        :param cmd: Command string written to the device
        :raises RuntimeError: If bit 4 (EXE) or bit 5 (CMD) of the status is set,
            or if pyvisa raises a VisaIOError while writing or querying
        """
        log.info("Running command(s): %s", cmd)
        try:
            self.instr.write(cmd)
            status = int(self.query("*ESR?"))
        except pyvisa.VisaIOError as e:
            raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{cmd}'\n\nVisaIOError:\n{e}") from e
        if status & (1 << 4):
            raise RuntimeError(f"EXE bit set (Execution error) after command(s):\n{cmd}\n")
        if status & (1 << 5):
            raise RuntimeError(f"CMD bit set (Illegal command) after command(s):\n{cmd}\n")

    def get_frequency(self) -> float:
        """Query FREQ? and return the answer as float"""
        return float(self.query("FREQ?"))

    def query(self, query):
        """
        Send a query to the device and return the answer.

        :param query: Query string
        :return: The answer with leading and trailing newlines removed
        """
        return self.instr.query(query, delay=0.05).strip("\n")

    def snap(self, what="3,4,5,7"):
        """
        Query several values at once using SNAP?

        Parameters
        ----------
        what : TYPE, optional
            1 X
            2 Y
            3 R
            4 θ
            5 Aux In 1
            6 Aux In 2
            7 Aux In 3
            8 Aux In 4
            9 Reference Frequency
            10 CH1 display
            11 CH2 display
            The default is "3,4,5,7".

        Returns
        -------
        vals : list[float]
            Values converted to float
        """
        answer = self.query(f"SNAP? {what}")
        # build a real list, a map object could only be iterated once
        return [float(val) for val in answer.split(",")]

    @staticmethod
    def measureTODO():
        """Placeholder, not implemented yet"""
        # TODO: implement
        pass

    def read_value(self):
        """Read the value of R"""
        return float(self.query("OUTP? 3"))

    def reset(self):
        """Write *RST to the device"""
        self.instr.write("*RST")

    def test_connection(self):
        """Placeholder, not implemented yet"""
        pass

    def _require_buffer_setup(self):
        """Raise a RuntimeError if buffer_setup() has not stored a buffer length yet"""
        if self._buffer_length is None:
            raise RuntimeError("Buffer not set up, call buffer_setup() first.")

    def buffer_setup(self, CH1="R", CH2="Theta", length=None, sample_rate=512):
        """
        Prepare the device for a buffer (curve) measurement

        :param CH1: Source of channel 1: X, R, X Noise, Aux In 1, Aux In 2
        :param CH2: Source of channel 2: Y, Theta, Y Noise, Aux In 3, Aux In 4
        :param length: Number of values to store in the buffer, or None to use the maximum number of values
        :param sample_rate: Sample rate in Hz or "Trigger", see SR830 Manual
        :raises ValueError: If CH1 or CH2 is not a valid source or length exceeds the maximum
        :raises KeyError: If sample_rate is not a valid sample rate
        """
        # Validate all arguments before sending anything, so that an invalid
        # argument leaves the device and the stored buffer length unchanged.
        # list.index raises ValueError when the item is not found.
        try:
            i_CH1 = SR830.CH1.index(CH1)
        except ValueError:
            raise ValueError(f"CH1='{CH1}' but must be one of {SR830.CH1}") from None
        try:
            i_CH2 = SR830.CH2.index(CH2)
        except ValueError:
            raise ValueError(f"CH2='{CH2}' but must be one of {SR830.CH2}") from None
        try:
            srat = SR830.SRAT.index(sample_rate)
        except ValueError:
            raise KeyError(f"SRAT={sample_rate} but must be one of {SR830.SRAT}") from None
        buffer_length = SR830.max_length if length is None else length
        if buffer_length > SR830.max_length:
            raise ValueError(f"Maximum buffer length is {SR830.max_length} but {length} was given.")

        # stop data storage and clear buffer
        self.run("PAUS")
        self.run("REST")
        # set displays, whatever is shown on the display is stored in the buffers
        self.run(f"DDEF 1,{i_CH1},0")
        self.run(f"DDEF 2,{i_CH2},0")
        self.run(f"SRAT {srat}")
        self._buffer_length = buffer_length

    def buffer_start_fill(self):
        """
        Start storing data in the buffer.

        :raises RuntimeError: If buffer_setup() was not called before
        """
        self._require_buffer_setup()
        self.run("SEND 0")  # shot mode
        self.run("STRT")  # start take data

    def buffer_is_done(self) -> bool:
        """
        Check whether at least the number of points given to buffer_setup() is stored.

        :raises RuntimeError: If buffer_setup() was not called before
        """
        self._require_buffer_setup()
        return self.buffer_get_n_points() >= self._buffer_length

    def buffer_get_n_points(self) -> int:
        """Query SPTS? and return the answer as int"""
        return int(self.query("SPTS?"))

    def buffer_get_data(self, CH1=True, CH2=True):
        """
        Get the data from the buffer.

        :param CH1: Whether to read the buffer of channel 1
        :param CH2: Whether to read the buffer of channel 2
        :return: np.ndarray
            Returns a numpy of shape (<1 if one channel, 2 if both channels>, n_points)
        :raises RuntimeError: If buffer_setup() was not called before
        :raises ValueError: If neither CH1 nor CH2 is True
        """
        self._require_buffer_setup()
        channels = [ch for ch, enabled in ((1, CH1), (2, CH2)) if enabled]
        if not channels:
            raise ValueError("Either CH1 or CH2 must be set True.")
        self.run("PAUS")
        # there might be more points stored than was required
        take_n_points = min(self.buffer_get_n_points(), self._buffer_length)
        data = np.empty((len(channels), take_n_points), dtype=float)
        for row, channel in enumerate(channels):
            data[row, :] = self._buffer_get_data(channel, 0, take_n_points)
        return data

    def _buffer_get_data_slow(self, CH=1, start=0, n_points=None):
        """
        Read buffer values via TRCB? one value (4 bytes) at a time.

        :param CH: Channel number passed to TRCB?
        :param start: Index of the first point passed to TRCB?
        :param n_points: Number of points to read, or None to use max_length
        :return: np.ndarray of shape (n_points,)
        """
        if n_points is None:
            n_points = SR830.max_length
        self.instr.write(f"TRCB? {CH},{start},{n_points}")
        # reading all at once doesn't work because visa times out, even with increased timeout
        # reading one at a time works but is slow
        data = np.empty(n_points, dtype=float)
        for i in range(n_points):
            # read 4 bytes and "cast" them to a float
            data[i] = struct.unpack("=f", self.instr.read_bytes(4))[0]
        return data

    def _buffer_get_data(self, CH=1, start=0, n_points=None):
        """
        Read buffer values via TRCB? in chunks of up to 256 values.

        :param CH: Channel number passed to TRCB?
        :param start: Index of the first point passed to TRCB?
        :param n_points: Number of points to read, or None to use max_length
        :return: np.ndarray of shape (n_points,)
        """
        if n_points is None:
            n_points = SR830.max_length
        self.instr.write(f"TRCB? {CH},{start},{n_points}")
        data = np.empty(n_points, dtype=float)
        CHUNK_SIZE = 256
        for offset in range(0, n_points, CHUNK_SIZE):
            # the last chunk may be shorter than CHUNK_SIZE
            count = min(CHUNK_SIZE, n_points - offset)
            # read 4 bytes per value and interpret them as 32 bit floats
            raw = self.instr.read_bytes(count * 4)
            data[offset:offset + count] = np.frombuffer(raw, dtype=np.float32)
        return data

    def auto_gain(self):
        """
        Send the auto gain command and wait until the device responds again.

        :return: The answer to SENS? with leading and trailing newlines removed
        :raises RuntimeError: If no answer could be read after 50 attempts
        """
        self.instr.write("AGAN")
        # wait until the device responds again, meaning it is no longer busy
        self.instr.write("SENS?")
        last_error = None
        for _ in range(50):
            try:
                return self.instr.read().strip("\n")
            except pyvisa.VisaIOError as e:
                # no answer yet, try again
                last_error = e
        raise RuntimeError("Could not get response from device after sending auto gain command") from last_error

    @staticmethod
    def enumerate_devices(query="(GPIB)|(USB)?*:INSTR"):
        """Call the module level enumerate_devices with the name "SR830" and the given query"""
        return enumerate_devices("SR830", query)

    @staticmethod
    def connect_device(name):
        """
        Open the resource with the given name and wrap it in a SR830 object.

        :param name: Resource name passed to ResourceManager.open_resource
        :return: SR830
        """
        rm = pyvisa.ResourceManager()
        instr = rm.open_resource(name)
        return SR830(instr)

    def __str__(self):
        return "SR830"