diff --git a/Bentham.py b/Bentham.py index 69758b3..124a283 100644 --- a/Bentham.py +++ b/Bentham.py @@ -2,6 +2,13 @@ import pyBen from os import path +class DummyBentham: + def __init__(self): + pass + def return_parameters(self): return {} + def park(self): print("Parked") + def drive(self, wl): print(f"Set to {wl} nm") + class Bentham(): """ Controls the Bentham TMC300 monochromator. diff --git a/devices/Shutter.py b/devices/Shutter.py index 99fce9e..2108f95 100644 --- a/devices/Shutter.py +++ b/devices/Shutter.py @@ -4,6 +4,11 @@ class Shutter(): def __init__(self): self.daq_name = 'Dev1' +class DummyShutter(Shutter): + def __init__(self): + super().__init__() + def open_(self): print("Dummy shutter open") + def close_(self): print("Dummy shutter close") class ShutterPump(Shutter): """ diff --git a/devices/__pycache__/Shutter.cpython-311.pyc b/devices/__pycache__/Shutter.cpython-311.pyc index ad0ec69..f910937 100644 Binary files a/devices/__pycache__/Shutter.cpython-311.pyc and b/devices/__pycache__/Shutter.cpython-311.pyc differ diff --git a/measurement_device/impl/__pycache__/sr830.cpython-311.pyc b/measurement_device/impl/__pycache__/sr830.cpython-311.pyc index 3a2f972..af25bee 100644 Binary files a/measurement_device/impl/__pycache__/sr830.cpython-311.pyc and b/measurement_device/impl/__pycache__/sr830.cpython-311.pyc differ diff --git a/measurement_device/impl/sr830.py b/measurement_device/impl/sr830.py index 1f3c92c..767598b 100644 --- a/measurement_device/impl/sr830.py +++ b/measurement_device/impl/sr830.py @@ -95,8 +95,6 @@ class SR830(MeasurementDevice): except pyvisa.VisaIOError as e: raise RuntimeError(f"VisaIOError raised while writing command(s):\n'{cmd}'\n\nVisaIOError:\n{e}") - def get_frequency(self) -> float: - return float(self.query("FREQ?")) def query(self, query): return self.instr.query(query, delay=0.05).strip("\n") @@ -127,8 +125,37 @@ class SR830(MeasurementDevice): vals = self.query(f"SNAP? {what}").split(",") vals = map(float, vals) return vals - - def measureTODO(): pass + + def try_recover_from_communication_error(self, original_error): + """ + Try to get into a normal state by flushing the output queue + and reading the instrument status + """ + log.warning(f"Trying to recover from communication error: {original_error}") + # flush whatever is in the queue + try: + self.instr.read_raw() + except pyvisa.VisaIOError as e: + pass + try: + status = int(self.query("*ESR?")) + if status & 0b00110101 > 0: # check if INP, QRY, EXE or CMD is set + raise RuntimeError(f"Failed to recover from exception, device returned status {status:08b}:\n{original_error}") + except Exception as e: + raise RuntimeError(f"Failed to recover from the following exception:\n{original_error}\nThe following exception occurred while querying the device status:\n{e}") + log.info(f"Recovered from error") + + def check_overloads(self) -> bool: + status_lia = int(self.query("LIAS?")) + if status_lia & (1 << 0): # input amplifier overload + return True + elif status_lia & (1 << 1): # time constant filter overlaid + return True + elif status_lia & (1 << 2): # output overload + return True + return False + + def measureTODO(self): pass def read_value(self): """Read the value of R""" return float(self.query("OUTP? 3")) @@ -139,10 +166,94 @@ class SR830(MeasurementDevice): def test_connection(self): pass + # PARAMETERS + def get_frequency(self) -> float: + return float(self.query("FREQ?")) + + SENS = [ + 2e-9, 5e-9, 10e-9, 20e-9, 50e-9, 100e-9, 200e-9, 500e-9, + 1e-6, 2e-6, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6, 500e-6, + 1e-3, 2e-3, 5e-3, 10e-3, 20e-3, 50e-3, 100e-3, 200e-3, 500e-3, + 1 + ] + def set_sensitivity_volt(self, volt): + if volt not in SR830.SENS: + raise ValueError(f"Invalid sensitivity voltage value: {volt}") + sens = SR830.SENS.index(volt) + self.run(f"SENS {sens}") + + def get_sensitivity_volt(self): + sens = int(self.query(f"SENS")) + return SR830.SENS[sens] + + OFLT =[ + 10e-6, 30e-6, 100e-6, 300e-6, + 1e-3, 3e-3, 10e-3, 30e-3, 100e-3, 300e-3, + 1, 3, 10, 30, 100, 300, + 1e3, 3e3, 10e3, 30e3, + ] + def set_time_constant_s(self, dt): + if dt not in SR830.OFLT: + raise ValueError(f"Invalid time constant value: {dt}. Must be one of {SR830.OFLT}") + oflt = SR830.OFLT.index(dt) + self.run(f"OFLT {oflt}") + + def get_time_constant_s(self): + oflt = int(self.query("OFLT?")) + return SR830.OFLT[oflt] + + + OFSL = [6, 12, 18, 24] + def set_filter_slope(self, slope_db_oct): + if slope_db_oct not in SR830.OFSL: + raise ValueError(f"Invalid filter slope value: {slope_db_oct}. Must be one of {SR830.OFSL}") + ofsl = SR830.OFSL.index(slope_db_oct) + self.run(f"OFSL {ofsl}") + + def get_filter_slope(self): + ofsl = int(self.query("OFSL?")) + return SR830.OFSL[ofsl] + + def get_wait_time_s(self): + """ + Get the wait time required to reach 99% of the final value. + See Manual 3-21 + :return: + """ + time_const = self.get_time_constant_s() + filter_slope = self.get_filter_slope() + if filter_slope == 6: return 5 * time_const + elif filter_slope == 12: return 7 * time_const + elif filter_slope == 18: return 9 * time_const + elif filter_slope == 24: return 10 * time_const + else: + raise ValueError(f"Invalid filter slope value: {filter_slope}") + + def set_sync_filter(self, sync): + if sync not in [0, 1]: + raise ValueError(f"Invalid sync value: {sync}, must be 0 (off) or 1 (on)") + self.run(f"SYNC {sync}") + + def get_sync_filter(self): + sync = int(self.query("SYNC?")) + return sync + + RMOD = ["High Reserve", "Normal", "Low Noise"] + def set_reserve(self, reserve): + if not reserve in SR830.RMOD: + raise ValueError(f"Invalid reserve value: {reserve}. Must be one of {SR830.RMOD}") + rmod = SR830.RMOD.index(reserve) + self.run(f"RMOD {rmod}") + + def get_reserve(self): + rmod = int(self.query("RMOD?")) + return SR830.RMOD[rmod] + + + CH1 = ["X", "R", "X Noise", "Aux In 1", "Aux In 2"] CH2 = ["Y", "Theta", "Y Noise", "Aux In 3", "Aux In 4"] SRAT = [62.5e-3, 125e-3, 250e-3, 500e-3, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, "Trigger"] - def buffer_setup(self, CH1="R", CH2="Theta", length=None, sample_rate=512): """ Prepare the device for a buffer (curve) measurement @@ -243,6 +354,11 @@ class SR830(MeasurementDevice): # struct.unpack("=f", self.instr.read_bytes(4))[0] if remainder > 0: data[chunks * CHUNK_SIZE:] = np.frombuffer(self.instr.read_bytes(remainder * 4), dtype=np.float32) + try: + read = self.instr.read_raw() + log.warning(f"Unexpected read from device: {read}") + except pyvisa.VisaIOError: + pass return data def auto_gain(self): diff --git a/test.py b/test.py index e891e04..23b4aa4 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,5 @@ +import pyvisa + if __name__ == "__main__": import sys if __package__ is None: @@ -11,9 +13,9 @@ from time import sleep import numpy as np import scipy as scp -from Bentham import Bentham +from Bentham import Bentham, DummyBentham from devices.Xenon import Xenon -from devices.Shutter import ShutterProbe +from devices.Shutter import ShutterProbe, DummyShutter from .update_funcs import Monitor from .measurement_device.impl.sr830 import SR830 @@ -28,7 +30,147 @@ logging.basicConfig( logging.StreamHandler() ] ) +log = logging.getLogger(__name__) +def set_measurement_params(lockin: SR830, p: dict={}, **kwargs): + params = p | kwargs + key_to_setter = { + "time_constant_s": lockin.set_time_constant_s, + "filter_slope": lockin.set_filter_slope, + "sync_filter": lockin.set_sync_filter, + "reserve": lockin.set_reserve, + "sensitivity_volt": lockin.set_sensitivity_volt, + } + for k, v in params.items(): + if k not in key_to_setter.keys(): + raise KeyError(f"Invalid parameter {k}") + key_to_setter[k](v) + + +def set_offset_laser_only(lockin: SR830, shutter: ShutterProbe, wait_time_s): + """ + Set the R offset from the signal when only the laser is on. + This signal should be stray laser light and laser induced PL + :return: Offset as percentage of the full scale R + """ + log.info("Setting offset when the lamp is off.") + shutter.close_() + sleep(wait_time_s + 10) + lockin.run("AOFF 3") # auto offset R + R_offset_fs = float(lockin.query("OEXP? 3").split(",")[0]) # returns R offset and expand + return R_offset_fs + + +def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25), aux_DC="Aux In 4", offset_with_laser_only=True, monitor=None): + data = {} + lockin_params = { + "time_constant_s": 10, + # "time_constant_s": 100e-3, + "sensitivity_volt": 50e-6, + "filter_slope": 12, + "sync_filter": 1, + "reserve": "Normal", + } + measurement_params = { + "measurement_time_s": 30, + "sample_rate_Hz": 512, + } + + set_measurement_params(lockin, lockin_params) + + measurement_time_s = measurement_params["measurement_time_s"] + sample_rate_AC = measurement_params["sample_rate_Hz"] + n_bins_AC = measurement_time_s * sample_rate_AC # x sec messen mit werte pro sekunde + timeout_s = 60 + timeout_interval = 0.5 + # trigger on the falling edge, since the light comes through when the ref signal is low + # could of course also trigger on rising and apply 180° shift + lockin.run("RSLP 2") + # since we dont expect changes in our signal, we can use larger time constants and aggressive filter slope + # for better signal to noise + wait_time_s = lockin.get_wait_time_s() + + def run_lockin_cmd(cmd, n_try=2): + com_success = n_try + e = None + while com_success > 0: + try: + return cmd() + except pyvisa.VisaIOError as e: + lockin.try_recover_from_communication_error(e) + com_success -= 1 + raise e + + # 5s for setting buffer, + # 5s for get values and plot + print(f"Time estimate {(measurement_time_s + wait_time_s + 5 + 5)/60 * ((wl_range[1]-wl_range[0])/wl_range[2])} minutes") + input("Make sure the laser is turned on and press enter > ") + mon = monitor if monitor is not None else Monitor(r"$\lambda$ [nm]", [ + dict(ax=0, ylabel=r"$\Delta R$", color="green"), + dict(ax=1, ylabel=r"$\sigma_{\Delta R}$", color="green"), + dict(ax=2, ylabel=r"$R$", color="blue"), + dict(ax=3, ylabel=r"$\sigma_R$", color="blue"), + dict(ax=4, ylabel=r"$\Delta R/R$", color="red"), + dict(ax=5, ylabel=r"$\sigma_{\Delta R/R}$", color="red"), + ]) + mon.set_fig_title(f"Turn on laser and plug detector into A and {aux_DC} ") + + data["lock-in-params"] = lockin_params + data["measurement-params"] = measurement_params + full_scale_voltage = lockin_params["sensitivity_volt"] + if offset_with_laser_only: + mon.set_fig_title(f"Measuring baseline with lamp off") + R_offset_volt = set_offset_laser_only(lockin, shutter, wait_time_s) * full_scale_voltage + data["R_offset_volt_before"] = R_offset_volt + print(f"R_offset_volt_before {R_offset_volt}") + data["reference_freq_Hz_before"] = lockin.get_frequency() + + shutter.open_() + for i_wl, wl in enumerate(range(*wl_range)): + mon.set_ax_title(f"$\\lambda = {wl}$ nm") + run_lockin_cmd(lambda: lockin.buffer_setup(CH1="R", CH2=aux_DC, length=n_bins_AC, sample_rate=sample_rate_AC)) + mon.set_fig_title(f"Setting wavelength to {wl} nm") + monochromator.drive(wl) + mon.set_fig_title(f"Waiting for signal to stabilize") + # wait the wait time + sleep(wait_time_s) + mon.set_fig_title(f"Measuring...") + run_lockin_cmd(lambda: lockin.buffer_start_fill()) + t = timeout_s + while t > 0: + t -= timeout_interval + sleep(timeout_interval) + if run_lockin_cmd(lambda: lockin.buffer_is_done()): + break + if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish") + arr = run_lockin_cmd(lambda: lockin.buffer_get_data(CH1=True, CH2=True)) + data[wl] = {} + data[wl]["raw"] = arr + # calculate means + means = np.mean(arr, axis=1) + errs = np.std(arr, axis=1) + dR = means[0] + R = means[1] + sdR = errs[1] + sR = errs[1] + data[wl] |= {"dR": dR, "sdR": sdR, "R": R, "sR": sR} + dR_R = dR / R + sdR_R = np.sqrt((sdR / R) + (dR * sR/R**2)) + data[wl] |= {"dR_R": dR_R, "sdR_R": sdR_R} + mon.update(wl, dR, sdR, R, sR, dR_R, sdR_R) + # if it fails, we still want the data returned + try: + if offset_with_laser_only: + mon.set_fig_title(f"Measuring baseline with lamp off") + R_offset_volt = set_offset_laser_only(lockin, shutter, wait_time_s) * full_scale_voltage + data["R_offset_volt_after"] = R_offset_volt + print(f"R_offset_volt_after {R_offset_volt}") + data["reference_freq_Hz_after"] = lockin.get_frequency() + except Exception as e: + print(e) + mon.set_fig_title("Photoreflectance") + mon.set_ax_title("") + return data, mon def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25), AC=True, DC=True, monitor=None): mon = monitor if monitor is not None else Monitor(r"$\lambda$ [nm]", [ @@ -40,34 +182,56 @@ def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, dict(ax=3, ylabel=r"$\sigma_R$", color="blue"), dict(ax=4, ylabel=r"$\Delta R/R$", color="red"), # dict(ax=3, ylabel="R", color="blue"), - dict(ax=5, ylabel="Phase", color="orange", lim=(-180, 180)) + dict(ax=5, ylabel=r"$\theta$", color="orange", lim=(-180, 180)), + dict(ax=6, ylabel=r"$\sigma_\theta$", color="orange") ]) - N_bins = 512 shutter.open_() data_raw = [] data_wl = {} - sample_rate = 512 + # TODO these are only printed, not set! + time_constant = 30e-3 + filter_slope = 24 + sensitivity = 1.0 + + SYNC = 1 + sample_rate_AC = 64 + sample_rate_DC = 512 + n_bins_AC = 3 * sample_rate_AC # x sec messen mit werte pro sekunde + n_bins_DC = 10 * sample_rate_DC timeout_s = 60 timeout_interval = 0.5 - # lockin.run("SENS 17") # 1 mV/nA - lockin.run("SENS 20") # 10 mV/nA + # lockin.run("SENS 17") # 1 mV + # lockin.run("SENS 20") # 10 mV + # lockin.run("SENS 21") # 20 mV + lockin.run("SENS 26") # 1 V # trigger on the falling edge, since the light comes through when the ref signal is low # could of course also trigger on rising and apply 180° shift lockin.run("RSLP 2") - # since we dont expect changes in our signal, we can use large time constants and aggresive filter slope + # since we dont expect changes in our signal, we can use larger time constants and aggressive filter slope # for better signal to noise # lockin.run("OFLT 5") # 3 ms + lockin.run("OFLT 7") # 30 ms + # lockin.run("OFLT 8") # 100 ms + # lockin.run("OFLT 10") # 1 s lockin.run("RMOD 2") # low noise (small reserve) - lockin.run("OFLT 8") # 100 ms - lockin.run("OFSL 3") # 24dB/Oct ms - lockin.run("SYNC 0") # test without sync filter + # lockin.run("OFSL 0") # 6dB/Oct + lockin.run("OFSL 3") # 24dB/Oct + lockin.run(f"SYNC {SYNC}") # sync filter + + print(f"Time estimate {40 * (wl_range[1]-wl_range[0])/(wl_range[2]*60)} minutes") if AC: input("Plug the detector into lock-in port 'A/I' (front panel) and press enter > ") input("Make sure the laser is turned on and press enter > ") + mon.set_fig_title("Turn on laser and plug detector into A") for i_wl, wl in enumerate(range(*wl_range)): - lockin.buffer_setup(CH1="R", CH2="Theta", length=N_bins, sample_rate=sample_rate) + mon.set_ax_title(f"$\\lambda = {wl}$ nm") + lockin.buffer_setup(CH1="R", CH2="Theta", length=n_bins_AC, sample_rate=sample_rate_AC) + mon.set_fig_title(f"Setting wavelength to {wl} nm") monochromator.drive(wl) - sleep(1.5) # need to wait until lock-in R signal is stable + mon.set_fig_title(f"Waiting for signal to stabilize") + # wait time depends on filter and time constant, for 24dB/Oct and Sync on the minimum is ~12 time constants + sleep(1.0) + mon.set_fig_title(f"Measuring...") lockin.buffer_start_fill() t = timeout_s while t > 0: @@ -85,14 +249,19 @@ def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, stheta = scp.stats.circstd(arr[1,:], low=-180, high=180) data_wl[wl] = {"dR": dR, "Theta": theta, "sdR": sdR, "sTheta": stheta} # wl - dR, sdR, R, sR, dR/R, Theta - mon.update(wl, dR, sdR, None, None, None, stheta) + mon.update(wl, dR, sdR, None, None, None, theta, stheta) if DC: + mon.set_ax_title("") + mon.set_fig_title("Turn off laser and plug detector into Aux 1") input("Turn off the laser and press enter > ") input("Plug the detector into lock-in port 'Aux In 1' (rear panel) and press enter > ") for i_wl, wl in enumerate(range(*wl_range)): - lockin.buffer_setup(CH1="Aux In 1", CH2="Theta", length=N_bins, sample_rate=sample_rate) + mon.set_ax_title(f"$\\lambda = {wl}$ nm") + lockin.buffer_setup(CH1="Aux In 1", CH2="Theta", length=n_bins_DC, sample_rate=sample_rate_DC) + mon.set_fig_title(f"Setting wavelength to {wl} nm") monochromator.drive(wl) sleep(0.5) + mon.set_fig_title(f"Measuring...") lockin.buffer_start_fill() t = timeout_s while t > 0: @@ -113,9 +282,11 @@ def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, # wl - dR, sdR, R, sR, dR/R, Theta if AC: dR_R = data_wl[wl]["dR"] / data_wl[wl]["R"] - mon.override(wl, None, None, data_wl[wl]["R"], data_wl[wl]["sR"], dR_R, None) + mon.override(wl, None, None, data_wl[wl]["R"], data_wl[wl]["sR"], dR_R, None, None) else: - mon.update(wl, None, None, data_wl[wl]["R"], data_wl[wl]["sR"], None, None) + mon.update(wl, None, None, data_wl[wl]["R"], data_wl[wl]["sR"], None, None, None) + mon.set_fig_title(f"Time constant = {time_constant} s\nFilter slope = {filter_slope} dB/oct\nSync Filter = {SYNC}\nSensitivity = {sensitivity*1e3} mV") + mon.set_ax_title("") return data_wl, data_raw, mon @@ -126,9 +297,8 @@ def _measure(monochromator: Bentham, lamp: Xenon, lockin: SR830, shutter: Shutte # dict(ax=1, ylabel="Ref", color="blue", lim=(0, 5)), dict(ax=0, ylabel=r"$\Delta R$", color="green"), dict(ax=1, ylabel=r"$\sigma_{\Delta R}$", color="green"), - # dict(ax=3, ylabel="R", color="blue"), dict(ax=2, ylabel="Phase", color="orange", lim=(-180, 180)) - + # dict(ax=3, ylabel="R", color="blue"), ]) N_bins = 100 dt = 0.01 @@ -189,10 +359,15 @@ def measure(wl_range=(400, 500, 2)): def measure_both(**kwargs): return _measure_both(mcm, lockin, shutter, **kwargs) +def measure_both_sim(**kwargs): + return _measure_both_sim(mcm, lockin, shutter, **kwargs) + if __name__ == "__main__": mcm = Bentham() + shutter = ShutterProbe() # mcm.park() lamp = Xenon() lockin = SR830.connect_device(SR830.enumerate_devices()[0]) # lockin = Model7260.connect_device(Model7260.enumerate_devices()[0]) - shutter = ShutterProbe() \ No newline at end of file + # mcm = DummyBentham() + # shutter = DummyShutter() diff --git a/update_funcs.py b/update_funcs.py index b60b262..74ea752 100644 --- a/update_funcs.py +++ b/update_funcs.py @@ -32,6 +32,21 @@ class Monitor: if opt["ylabel"]: ax.set_ylabel(opt["ylabel"]) + def set_ax_title(self, title): + self.ax[0].set_title(title) + self.fig1.canvas.draw() + self.fig1.canvas.flush_events() + + def set_fig_title(self, title): + self.fig1.suptitle(title) + self.fig1.canvas.draw() + self.fig1.canvas.flush_events() + + + def reset(self): + self.xdata = [] + self.ydatas = [[] for _ in range(len(self.ydatas))] + def override(self, xval, *yvals): idx = self.xdata.index(xval) for i, y in enumerate(yvals):