rename prsctrl

This commit is contained in:
JohannesDittloff 2025-05-08 13:07:22 +02:00
parent 5894b5c778
commit 7f7561e4d9
91 changed files with 1388 additions and 490 deletions

5
.gitignore vendored
View File

@ -4,5 +4,6 @@
__pychache__/ __pychache__/
__pychache__/** __pychache__/**
cpdctrl cpdctrl
benhw64.dll prsctrl/benhw64.dll
test2.py prsctrl/test2.py
*benhw64.dll*

View File

@ -1,64 +0,0 @@
from pylablib.devices import NI
class Shutter():
def __init__(self):
self.daq_name = 'Dev1'
class DummyShutter(Shutter):
def __init__(self):
super().__init__()
def open_(self): print("Dummy shutter open")
def close_(self): print("Dummy shutter close")
class ShutterPump(Shutter):
"""
Controls shutter between opo and sample.
Intended to be used instead of switching laser on and off.
Not yet in use.
"""
def __init__(self):
super().__init__()
def open_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao1', (0,5))
shutter.set_voltage_outputs('vout', 5)
print('Pump shutter opened')
def close_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao1', (0,5))
shutter.set_voltage_outputs('vout', 0)
print('Pump shutter closed')
class ShutterProbe(Shutter):
"""
Controls shutter between bentham TMC300 and sample.
Blocks probe light when not needed.
"""
def __init__(self):
super().__init__()
def open_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao0', (0,5))
shutter.set_voltage_outputs('vout', 5)
print('Probe shutter opened')
def close_(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', 'ao0', (0,5))
shutter.set_voltage_outputs('vout', 0)
print('Probe shutter closed')
if __name__ == '__main__':
from DeviceManager import DeviceManager
dm = DeviceManager()
#dm.shutter_probe.open_()
dm.shutter_probe.close_()
#dm.shutter_pump.open_()
#dm.shutter_pump.close_()

View File

@ -1,40 +0,0 @@
from .base import MeasurementDevice
TYPENAME_TEST = "Test"
TYPENAME_SR830 = "SR830"
TYPENAME_MODEL7260 = "Model7260"
try:
from .impl.sr830 import SR830
except ImportError:
pass
from .impl.test import TestVoltageMeasurementDevice
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_TEST: ["Measurement Dummy Device"],
}
try:
from .impl.sr830 import SR830
devices[TYPENAME_SR830] = SR830.enumerate_devices()
except ImportError:
pass
try:
from .impl.model7260 import Model7260
devices[TYPENAME_MODEL7260] = Model7260.enumerate_devices()
except ImportError:
pass
return devices
def connect_device(type_name: str, device_name: str) -> MeasurementDevice:
if type_name == TYPENAME_TEST:
return TestVoltageMeasurementDevice()
elif type_name == TYPENAME_SR830:
try:
from .impl.sr830 import SR830
return SR830.connect_device(device_name)
except ImportError as e:
raise ValueError(f"SR830 not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

View File

@ -1,62 +0,0 @@
from abc import ABC, abstractmethod
from typing import Callable
"""
Created on Tue Jan 21 16:19:01 2025
@author: Matthias Quintern
"""
class MeasurementDevice(ABC):
@abstractmethod
def test_connection(self) -> None:
"""
Verify that the device is still properly connected.
If not, raises ConnectionError
"""
pass
# RUN COMMANDS ON THE DEVICE
@abstractmethod
def run(self, code, verbose=False):
pass
@abstractmethod
def reset(self, verbose=False):
pass
@abstractmethod
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
pass
# def measure(self, interval: int, update_func: Callable[None, [int, float, float]]|None=None, max_measurements:int|None=None):
@abstractmethod
def measureTODO():
"""
Take voltage readings after <interval> milliseconds.
Parameters
----------
interval : int
Number of milliseconds to wait between readings.
update_func : Callable[None, [int, float, float]] or None, optional
A function that is called after each reading with parameters <n_reading>, <time>, <voltage>. The default is None.
max_measurements : int or None, optional
Number of readings to perform. Set to None for infinite. The default is None.
Returns
-------
None.
"""
pass
@abstractmethod
def __str__(self):
pass

View File

@ -1,61 +0,0 @@
from ..base import MeasurementDevice
from typing import Callable
from time import time as now
import numpy as np
class TestVoltageMeasurementDevice(MeasurementDevice):
def __init__(self, amplitude: float=1.0, frequency: float=20.0):
super().__init__()
self.amplitude = amplitude
self.frequency = frequency
self.t0 = now()
def test_connection(self) -> None:
pass
# RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False):
pass
def run_script(self, script_path, verbose=False):
pass
def reset(self, verbose=False):
pass
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
t = now() - self.t0
v = self.amplitude * np.sin(2 * np.pi * t / self.frequency)
return t, v
# def measure(self, interval: int, update_func: Callable[None, [int, float, float]] | None = None,
# max_measurements: int | None = None):
def measureTODO():
"""
Take voltage readings after <interval> milliseconds.
Parameters
----------
interval : int
Number of milliseconds to wait between readings.
update_func : Callable[None, [int, float, float]] or None, optional
A function that is called after each reading with parameters <n_reading>, <time>, <voltage>. The default is None.
max_measurements : int or None, optional
Number of readings to perform. Set to None for infinite. The default is None.
Returns
-------
None.
"""
pass
def __str__(self):
return "Simulated Voltage Measurement Device"

0
prsctrl/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

Binary file not shown.

View File

@ -0,0 +1,34 @@
from .base import Lamp
TYPENAME_DUMMY = "Dummy"
TYPENAME_XENON = "Xenon"
try:
from .impl.sr830 import SR830
except ImportError:
pass
from .impl.dummy import DummyLamp
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_DUMMY: ["Dummy Lamp"],
TYPENAME_XENON: ["Xenon"]
}
try:
from .impl.daq import ShutterDAQ
devices[TYPENAME_XENON] = SR830.enumerate_devices()
except ImportError:
pass
return devices
def connect_device(type_name: str, device_name: str) -> Lamp:
if type_name == TYPENAME_DUMMY:
return DummyLamp()
elif type_name == TYPENAME_XENON:
try:
from .impl.xenon import Xenon
return Xenon()
except ImportError as e:
raise ValueError(f"Xenon not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

Binary file not shown.

View File

@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
class Lamp:
@abstractmethod
def on(self):
pass
@abstractmethod
def off(self):
pass
@abstractmethod
def is_on(self) -> bool:
pass
@abstractmethod
def __repr__(self):
pass

View File

View File

@ -0,0 +1,14 @@
import logging
log = logging.getLogger(__name__)
from ..base import Lamp
class DummyLamp(Lamp):
def __init__(self):
super().__init__()
def open(self):
log.info("Dummy-Lamp on")
def close(self):
log.info("Dummy-Lamp off")
def __repr__(self):
return "Dummy-Lamp"

View File

@ -1,32 +1,40 @@
import bendev import bendev
from Bentham import Bentham from prsctrl.Bentham import Bentham
import time import time
import datetime import datetime
class Xenon(): import logging
log = logging.getLogger(__name__)
from ..base import Lamp
class Xenon(Lamp):
def __init__(self): def __init__(self):
super().__init__()
self.ps_serial_num = '32540/2' self.ps_serial_num = '32540/2'
self.state = 0 self.state = 0
self.default_current = 5.4010 self.default_current = 5.4010
def on(self):
psu = self.apply_current(self.default_current)
psu.query(":SYST:ERR:COUNT?")
psu.write("OUTP ON"); psu.query("OUTP?")
log.info("Xenon lamp switched on")
self.state = 1
def start(self): def start(self):
""" """
Starts xenon probe light lamp. Assumes lamp is connected Starts xenon probe light lamp. Assumes lamp is connected
to given power supply. to given power supply.
Bentham TMC300 is also parked to ensure later operation. Bentham TMC300 is also parked to ensure later operation.
""" """
psu = self.apply_current(self.default_current) self.on()
psu.query(":SYST:ERR:COUNT?")
psu.write("OUTP ON"); psu.query("OUTP?")
print('Xenon lamp switched on')
Bentham().park() Bentham().park()
self.state = 1
def stop(self): def off(self):
psu = bendev.Device(self.ps_serial_num) psu = bendev.Device(self.ps_serial_num)
psu.query(":SYST:ERR:COUNT?") psu.query(":SYST:ERR:COUNT?")
psu.write("OUTP OFF"); psu.query("OUTP?") psu.write("OUTP OFF"); psu.query("OUTP?")
print('Xenon lamp switched off') log.info('Xenon lamp switched off')
self.state = 0 self.state = 0
def reset_burntime(self): def reset_burntime(self):
@ -80,15 +88,3 @@ class Xenon():
time.sleep(1) time.sleep(1)
print('') print('')
self.start() self.start()
if __name__ == '__main__':
from DeviceManager import DeviceManager
dm = DeviceManager()
#dm.xenon.apply_current(5.41)
#dm.xenon.initialize()
dm.xenon.stop()
#dm.xenon.initialize()
#dm.xenon.preset_start(8,00)

View File

@ -0,0 +1,28 @@
from .base import Lock_In_Amp
TYPENAME_DUMMY = "Dummy"
TYPENAME_SR830 = "SR830"
from .impl.dummy import DummyLockInAmp
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_DUMMY: ["Dummy Lock-In Amplifier"],
}
try:
from .impl.sr830 import SR830
devices[TYPENAME_SR830] = SR830.enumerate_devices()
except ImportError:
pass
return devices
def connect_device(type_name: str, device_name: str) -> Lock_In_Amp:
if type_name == TYPENAME_DUMMY:
return DummyLockInAmp()
elif type_name == TYPENAME_SR830:
try:
from .impl.sr830 import SR830
return SR830.connect_device(device_name)
except ImportError as e:
raise ValueError(f"SR830 not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

View File

@ -0,0 +1,34 @@
from abc import ABC, abstractmethod
from typing import Callable
class Lock_In_Amp(ABC):
@abstractmethod
def test_connection(self) -> None:
"""
Verify that the device is still properly connected.
If not, raises ConnectionError
"""
pass
# RUN COMMANDS ON THE DEVICE
@abstractmethod
def run(self, code, verbose=False):
pass
@abstractmethod
def reset(self, verbose=False):
pass
@abstractmethod
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
pass
@abstractmethod
def __str__(self):
pass

View File

View File

@ -0,0 +1,134 @@
from ..base import Lock_In_Amp
from typing import Callable
from time import time as now
import numpy as np
class DummyLockInAmp(Lock_In_Amp):
def __init__(self):
super().__init__()
def test_connection(self) -> None:
pass
# RUN COMMANDS ON THE DEVICE
def run(self, code, verbose=False):
pass
def reset(self, verbose=False):
pass
def read_value(self) -> tuple[float, float]:
"""
Read a single value
Returns
-------
[timestamp, voltage]
"""
t = now() - self.t0
v = self.amplitude * np.sin(2 * np.pi * t / self.frequency)
return t, v
def query(self, query):
return "-1"
def snap(self, what="3,4,5,7"):
vals = [-1.0 for _ in what.split(",")]
return vals
def try_recover_from_communication_error(self, original_error):
pass
def check_overloads(self) -> bool | str:
return False
def read_value(self):
"""Read the value of R"""
return float(self.query("OUTP? 3"))
def reset(self):
pass
def test_connection(self):
pass
# REFERENCE
def set_reference(self, reference):
self.reference = reference
def set_frequency_Hz(self, frequency_Hz):
self.frequency = frequency_Hz
def get_frequency_Hz(self) -> float:
return self.frequency
def set_reference_trigger(self, trigger):
self.reference_trigger = trigger
def set_sensitivity_volt(self, volt):
self.sensitivity_volt = volt
def get_sensitivity_volt(self):
return self.sensitivity_volt
def set_time_constant_s(self, dt):
self.time_constant_s = dt
def get_time_constant_s(self):
return self.time_constant_s
def set_filter_slope(self, slope_db_oct):
self.filter_slope = slope_db_oct
def get_filter_slope(self):
return self.filter_slope
def get_wait_time_s(self):
"""
Get the wait time required to reach 99% of the final value.
See Manual 3-21
:return:
"""
return 1.0
def set_sync_filter(self, sync):
self.sync_filter = sync
def get_sync_filter(self):
return self.sync_filter
def set_reserve(self, reserve):
self.reserve = reserve
def get_reserve(self):
return self.reserve
max_length = 16383
def buffer_setup(self, CH1="R", CH2="Theta", length=None, sample_rate=512):
if length is None:
length = self.max_length
self._buffer_length = length
def buffer_start_fill(self):
if self._buffer_length is None: raise RuntimeError(f"Buffer not set up, call buffer_setup() first.")
def buffer_is_done(self) -> bool:
return True
def buffer_get_n_points(self) -> int:
return self._buffer_length
def buffer_get_data(self, CH1=True, CH2=True):
data = []
if CH1:
data.append(np.arange(self._buffer_length))
if CH2:
data.append(np.arange(self._buffer_length))
return data
def auto_gain(self):
pass
def __str__(self):
return "Dummy Lock-In"

View File

@ -1,18 +1,15 @@
import pyvisa import pyvisa
from time import sleep
# import pkg_resources # import pkg_resources
import os
from typing import Callable
from ..base import MeasurementDevice from ..base import Lock_In_Amp
from ...utility.visa import enumerate_devices from prsctrl.util.visa import enumerate_devices
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
import numpy as np import numpy as np
class Model7260(MeasurementDevice): class Model7260(Lock_In_Amp):
""" """
Wrapper class for the Model 7260 DSP Lock-In controlled via pyvisa Wrapper class for the Model 7260 DSP Lock-In controlled via pyvisa
""" """

View File

@ -1,19 +1,16 @@
import pyvisa import pyvisa
import struct # for converting bytes to float import struct # for converting bytes to float
from time import sleep
# import pkg_resources # import pkg_resources
import os
from typing import Callable
import numpy as np import numpy as np
from ..base import MeasurementDevice from ..base import Lock_In_Amp
from ...utility.visa import enumerate_devices from prsctrl.utility.visa import enumerate_devices
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class SR830(MeasurementDevice): class SR830(Lock_In_Amp):
""" """
Wrapper class for the SR830 controlled via pyvisa Wrapper class for the SR830 controlled via pyvisa
""" """
@ -145,14 +142,14 @@ class SR830(MeasurementDevice):
raise RuntimeError(f"Failed to recover from the following exception:\n{original_error}\nThe following exception occurred while querying the device status:\n{e}") raise RuntimeError(f"Failed to recover from the following exception:\n{original_error}\nThe following exception occurred while querying the device status:\n{e}")
log.info(f"Recovered from error") log.info(f"Recovered from error")
def check_overloads(self) -> bool: def check_overloads(self) -> bool|str:
status_lia = int(self.query("LIAS?")) status_lia = int(self.query("LIAS?"))
if status_lia & (1 << 0): # input amplifier overload if status_lia & (1 << 0): # input amplifier overload
return True return "Input Amplifier"
elif status_lia & (1 << 1): # time constant filter overlaid elif status_lia & (1 << 1): # time constant filter overlaid
return True return "Time Constant Filter"
elif status_lia & (1 << 2): # output overload elif status_lia & (1 << 2): # output overload
return True return "Output"
return False return False
def measureTODO(self): pass def measureTODO(self): pass
@ -166,10 +163,29 @@ class SR830(MeasurementDevice):
def test_connection(self): def test_connection(self):
pass pass
# PARAMETERS # REFERENCE
def get_frequency(self) -> float: FMOD = ["External", "Internal"]
def set_reference(self, reference):
if not reference in self.FMOD:
raise ValueError(f"Invalid reference: {reference}. Must be one of {self.FMOD}")
fmod = self.FMOD.index(reference)
self.run(f"FMOD {fmod}")
def set_frequency_Hz(self, frequency_Hz):
self.run(f"FREQ {frequency_Hz}")
def get_frequency_Hz(self) -> float:
return float(self.query("FREQ?")) return float(self.query("FREQ?"))
RSLP = ["Sine", "Rising Edge", "Falling Edge"]
def set_reference_trigger(self, trigger):
if not trigger in self.RSLP:
raise ValueError(f"Invalid trigger: {trigger}. Must be one of {self.RSLP}")
rslp = self.RSLP.index(trigger)
self.run(f"RSLP {rslp}")
# PARAMETERS
SENS = [ SENS = [
2e-9, 5e-9, 10e-9, 20e-9, 50e-9, 100e-9, 200e-9, 500e-9, 2e-9, 5e-9, 10e-9, 20e-9, 50e-9, 100e-9, 200e-9, 500e-9,
1e-6, 2e-6, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6, 500e-6, 1e-6, 2e-6, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6, 500e-6,

View File

@ -0,0 +1,34 @@
from .base import Monochromator
TYPENAME_DUMMY = "Dummy"
TYPENAME_BENTHAM = "Bentham"
try:
from .impl.sr830 import SR830
except ImportError:
pass
from .impl.dummy import DummyMonochromator
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_DUMMY: ["Dummy Monochromator"],
}
try:
from .impl.bentham_tmc300 import TMC300
devices[TYPENAME_BENTHAM] = ["Bentham TMC300"]
except ImportError:
pass
return devices
def connect_device(type_name: str, device_name: str) -> Monochromator:
if type_name == TYPENAME_DUMMY:
return DummyMonochromator()
elif type_name == TYPENAME_BENTHAM:
try:
from .impl.bentham_tmc300 import TMC300
return TMC300()
except ImportError as e:
raise ValueError(f"TMC300 not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

View File

@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
class Monochromator:
@abstractmethod
def reset(self):
pass
@abstractmethod
def set_wavelength_nm(self, wavelength_nm):
pass
@abstractmethod
def get_wavelength_nm(self):
pass
@abstractmethod
def __repr__(self):
pass

View File

@ -0,0 +1,59 @@
# proprietary package :/
import pyBen
from os import path
import logging
log = logging.getLogger(__name__)
from ..base import Monochromator
class TMC300(Monochromator):
"""
Controls the Bentham TMC300 monochromator.
This is the source of the probe light beam
"""
def __init__(self):
self.path_cfg = path.abspath('system.cfg')
self.path_atr = path.abspath('systemAll-1nm.atr')
self.parked = -1
self.wavelength_nm = -1
def return_parameters(self):
"""
Returns parameters relevant for this device.
New parameters to be saved can be added here
"""
parameters = {'bentham cfg_path': self.path_cfg,
'bentham atr_path': self.path_atr,
'bentham wavelength': self.wavelength}
return parameters
def reset(self):
"""
Parks the TMC300. Important to do at least once after new lamp was started
"""
pyBen.build_system_model(self.path_cfg)
pyBen.load_setup(self.path_atr)
pyBen.initialise()
pyBen.park()
log.info('TMC300 initialized and parked')
self.parked = 1
self.wavelength_nm = -1
def set_wavelength_nm(self, wavelength_nm):
"""
Changes the probe wavelength
"""
pyBen.build_system_model(self.path_cfg)
pyBen.load_setup(self.path_atr)
pyBen.initialise()
pyBen.select_wavelength(wavelength_nm, 0)
log.info('TMC300 wavelength set to: %0.1f' % wavelength_nm + 'nm')
self.wavelength = wavelength_nm
self.parked = 0
def get_wavelength_nm(self):
return self.wavelength
def __repr__(self):
return 'TMC300'

View File

@ -0,0 +1,23 @@
import logging
log = logging.getLogger(__name__)
from ..base import Monochromator
class DummyMonochromator(Monochromator):
def __init__(self):
super().__init__()
self.wavelength_nm = -1
def reset(self):
log.info("Dummy-Monochromator reset")
self.wavelength_nm = -1
def set_wavelength_nm(self, wavelength_nm):
log.info("Dummy-Monochromator set to {wl} nm")
self.wavelength_nm = wavelength_nm
def get_wavelength_nm(self):
return self.wavelength_nm
def __repr__(self):
return "Dummy-Monochromator"

View File

@ -0,0 +1,34 @@
from .base import Shutter
TYPENAME_DUMMY = "Dummy"
TYPENAME_DAQ = "DAQ"
try:
from .impl.sr830 import SR830
except ImportError:
pass
from .impl.dummy import DummyShutter
def list_devices() -> dict[str,list[str]]:
devices = {
TYPENAME_DUMMY: ["Dummy Shutter"],
}
try:
from .impl.daq import ShutterDAQ
devices[TYPENAME_DAQ] = ShutterDAQ.enumerate_devices()
except ImportError:
pass
return devices
def connect_device(type_name: str, device_name: str) -> Shutter:
if type_name == TYPENAME_DUMMY:
return DummyShutter()
elif type_name == TYPENAME_DAQ:
try:
from .impl.daq import ShutterDAQ
return ShutterDAQ.connect_device(device_name)
except ImportError as e:
raise ValueError(f"ShutterDAQ not available: {e}")
raise ValueError(f"Unknown device type {type_name}")

View File

@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
class Shutter:
@abstractmethod
def close(self):
pass
@abstractmethod
def open(self):
pass
@abstractmethod
def is_open(self) -> bool:
pass
@abstractmethod
def __repr__(self):
pass

View File

View File

@ -0,0 +1,41 @@
from pylablib.devices import NI
import logging
log = logging.getLogger(__name__)
from ..base import Shutter
class ShutterDAQ(Shutter):
def __init__(self, channel):
self.daq_name = 'Dev1'
self.channel = channel
def open(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', self.channel, (0,5))
shutter.set_voltage_outputs('vout', 5)
log.info('Shutter opened')
def close(self):
with NI.NIDAQ(self.daq_name) as shutter:
shutter.add_voltage_output('vout', self.channel, (0,5))
shutter.set_voltage_outputs('vout', 0)
log.info('Shutter closed')
def __repr__(self):
return f'ShutterDAQ({self.daq_name}/{self.channel})'
# name: channel
shutters = {
"TAS Lamp Shutter": "ao0",
"TAS Pump Shutter": "ao1",
}
@staticmethod
def enumerate_devices():
return list(ShutterDAQ.shutters.keys())
@staticmethod
def connect_device(name):
if name not in ShutterDAQ.shutters:
raise ValueError(f"Unknown shutter {name}. Must be one of {ShutterDAQ.shutters}")
return ShutterDAQ(ShutterDAQ.shutters[name])

View File

@ -0,0 +1,14 @@
import logging
log = logging.getLogger(__name__)
from ..base import Shutter
class DummyShutter(Shutter):
def __init__(self):
super().__init__()
def open(self):
log.info("Dummy-Shutter open")
def close(self):
log.info("Dummy-Shutter close")
def __repr__(self):
return "Dummy-Shutter"

186
prsctrl/measurement.py Normal file
View File

@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 24 15:18:31 2025
@author: Matthias Quintern
"""
from .measurement_device.base import VoltageMeasurementDevice
from .led_control_device.base import LedControlDevice
from .led_script import LedScript
from .utility.prsdata import DataCollector
import time
import datetime
from queue import Queue
import logging
log = logging.getLogger(__name__)
def measure(
vm_dev: VoltageMeasurementDevice,
led_dev: LedControlDevice,
led_script: LedScript,
data: DataCollector,
delta_t: float=0.1,
flush_after:int|None=None,
use_buffer=False,
max_measurements: int=None,
stop_on_script_end: bool=False,
verbose: bool=False,
command_queue: None|Queue=None,
data_queue: None|Queue=None,
add_measurement_info_to_metadata=True
):
"""
Perform a measurement
Parameters
----------
vm_dev : VoltageMeasurementDevice
DESCRIPTION.
led_dev : LedControlDevice
DESCRIPTION.
led_script : LedScript
DESCRIPTION.
data : DataCollector
DESCRIPTION.
delta_t : float, optional
Target interval between measurements and led updates. The default is 0.1.
flush_after : int|None, optional
If int, flush values to disk after <flush_after>. The default is None.
use_buffer : TYPE, optional
If True, use the buffer measurement mode. The default is False.
max_measurements : int, optional
Number of measurements to perform before returning.
Note: If use_buffer=True, a few more than max_measurements might be performed
The default is None.
stop_on_script_end : bool, optional
Stop when the script end is reached.
verbose : bool, optional
If True, print some messages. The default is False.
command_queue : None|Connection, optional
A queue to receive to commands from.
Commands may be:
"stop" -> stops the measurement
("led_script", <LedScript object>) a new led script to use
The default is None.
data_queue : None|Queue, optional
A queue to put data in. The default is None.
add_measurement_info_to_metadata : bool, optional
If True, add measurement info to the metadata:
time, measurement_interval, measurement_use_buffer, measurement_voltage_device, measurement_led_device
The default is True.
Returns
-------
None.
"""
get_time = lambda: datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
if add_measurement_info_to_metadata:
data.metadata["measurement_interval"] = str(delta_t) + " s"
data.metadata["measurement_use_buffer"] = str(use_buffer)
data.metadata["measurement_voltage_measurement_device"] = str(vm_dev)
data.metadata["measurement_led_control_device"] = str(led_dev)
led_name = led_dev.get_led_name()
if led_name: data.metadata["measurement_led_lamp"] = led_name
data.metadata["measurement_time_start"] = get_time()
# write metadata to disk
data.write_metadata()
vm_dev.reset(True)
if use_buffer:
vm_dev.buffer_measure(delta_t, verbose=True)
# allow 0 instead of None
if max_measurements == 0: max_measurements = None
if flush_after == 0: flush_after = None
try:
i = 0
led_val = led_script.start()
try:
led_dev.set_level(led_val)
except Exception as e:
log.error(f"Error setting led to {led_val:03}%: {e}")
raise e
t_iter_start = time.time()
while True:
# using while True and if, to be able to log the stop reason
if max_measurements is not None and i >= max_measurements:
log.info(f"Reached maximum number of measurements ({i}{max_measurements}), stopping measurement")
break
# 1) read value(s)
if use_buffer:
try:
values = vm_dev.buffer_read_new_values()
except ValueError as e:
# print(f"Error in buffer measurement {i}:", e)
values = []
else:
values = [vm_dev.read_value()]
# print(values)
# 2) process value(s)
for (tval, vval) in values:
if i == 0:
t0 = tval
tval -= t0
current_data = (i, tval, vval, led_val)
data.add_data(*current_data)
# 3) write data
if verbose: print(f"n = {i:6d}, t = {tval: .2f} s, U = {vval: .5f} V, LED = {led_val:03}%" + " "*10, end='\r')
if flush_after is not None and (i+1) % flush_after == 0:
data.flush(verbose=verbose)
# if a queue was given, put the data
if data_queue is not None:
data_queue.put(current_data)
i += 1
# if a pipe was given, check for messages
if command_queue is not None and command_queue.qsize() > 0:
recv = command_queue.get(block=False)
if recv == "stop":
log.info(f"Received 'stop', stopping measurement")
break
elif type(recv) == tuple and recv[0] == "led_script":
log.info(f"Received 'led_script', replacing script")
led_script = recv[1]
elif type(recv) == tuple and recv[0] == "metadata":
log.info(f"Received 'metadata', updating metadata")
data.metadata |= recv[1]
data.write_metadata()
else:
log.error(f"Received invalid message: '{recv}'")
# 4) sleep
# subtract the execution time from the sleep time for a more
# accurate frequency
dt_sleep = delta_t - (time.time() - t_iter_start)
if dt_sleep > 0:
# print(f"Sleeping for {dt_sleep}")
time.sleep(dt_sleep)
t_iter_start = time.time()
# 5) update LED
if stop_on_script_end and led_script.is_done(t_iter_start):
log.info("Reached led script end, stopping measurement")
break
new_led_val = led_script.get_state(t_iter_start)
if new_led_val != led_val:
try:
led_dev.set_level(new_led_val)
led_val = new_led_val
except Exception as e:
log.error(f"Error setting led to {new_led_val:03}%: {e}")
raise e
except KeyboardInterrupt:
log.info("Keyboard interrupt, stopping measurement")
except Exception as e:
log.critical(f"Unexpected error, stopping measurement. Error: {e}")
if command_queue is not None:
command_queue.put(("exception", e))
if add_measurement_info_to_metadata:
data.metadata["measurement_time_stop"] = get_time()
# Write again after having updated the stop time
data.write_metadata()
data.flush()
led_dev.off()

View File

@ -1,27 +1,16 @@
""" """
run this before using this library: run this before using this library:
ipython -i cpdctrl_interactive.py ipython -i prctrl_interactive.py
always records iv-t curves
i-data -> smua.nvbuffer1
v-data -> smua.nvbuffer2
""" """
version = "0.1" version = "0.1"
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime as dtime from datetime import datetime as dtime
from sys import exit
from time import sleep
from os import path, makedirs from os import path, makedirs
import pickle as pkl
import json
import atexit
import threading as mt import threading as mt
import multiprocessing as mp import multiprocessing as mp
# from multiprocessing.managers import BaseManager
import argparse import argparse
@ -29,27 +18,28 @@ if __name__ == "__main__":
import sys import sys
if __package__ is None: if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change # make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "cpdctrl" __package__ = "prsctrl"
from os import path from os import path
filepath = path.realpath(path.abspath(__file__)) filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath))) sys.path.insert(0, path.dirname(path.dirname(filepath)))
from . import led_control_device # import device modules
from . import voltage_measurement_device from .devices import shutter as mod_shutter
from .voltage_measurement_device.base import VoltageMeasurementDevice from .devices import lock_in as mod_lock_in
from .voltage_measurement_device.impl import keithley2700 as _volt from .devices import lamp as mod_lamp
from .led_control_device.base import LedControlDevice from .devices import monochromator as mod_monochromator
from .led_control_device.impl import thorlabs_ledd1b as _led # import base classes
from .led_script import LedScript from .devices.lock_in import Lock_In_Amp
from .devices.shutter import Shutter
from .devices.lamp import Lamp
from .devices.monochromator import Monochromator
from .measurement import measure as _measure # from .measurement import measure as _measure
from .utility.data import DataCollector from .utility.data_collector import PrsDataCollector
from .utility.data import plot_cpd_data as data_plot
from .utility.config_file import ConfigFile from .utility.config_file import ConfigFile
from .utility import file_io from .utility.device_select import select_device_interactive, connect_device_from_config_or_interactive
from .utility.device_select import select_device_interactive from .update_funcs import Monitor
from .update_funcs import _Monitor, _update_print
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -60,15 +50,13 @@ _runtime_vars = {
} }
# defaults, these may be overridden by a config file # defaults, these may be overridden by a config file
settings = { settings = {
"datadir": path.expanduser("~/data"), "datadir": path.expanduser("~/Desktop/PR/data"),
"name": "interactive-test", "name": "interactive-test",
"led": "unknown",
"interval": 0.5,
"flush_after": 3000, "flush_after": 3000,
"use_buffer": False, "use_buffer": False,
} }
cfilename: str = "cpdctrl.yaml" cfilename: str = "photoreflectance.yaml"
config_path: str = "" config_path: str = ""
config_file: ConfigFile = ConfigFile("") config_file: ConfigFile = ConfigFile("")
@ -76,13 +64,20 @@ test = False
# DEVICES # DEVICES
# global variable for the instrument/client returned by pyvisa/bleak # global variable for the instrument/client returned by pyvisa/bleak
dev: VoltageMeasurementDevice|None = None lockin: Lock_In_Amp|None = None
led: LedControlDevice|None = None shutter: Shutter|None = None
data_collector = DataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", add_number_if_dir_exists=True) lamp: Lamp|None = None
mcm: Monochromator|None = None
data_collector = PrsDataCollector(data_path=settings["datadir"], data_name="interactive", dirname="interactive_test", add_number_if_dir_exists=True)
t0 = 0 t0 = 0
data = None data = None
md = None md = None
from .test_measurement import _measure_both_sim
def measure_both_sim(**kwargs):
return _measure_both_sim(mcm, lockin, shutter, **kwargs)
def monitor(script: str|int=0, interval: float|None=None, metadata:dict={}, flush_after: int|None=None, use_buffer: bool|None=None, max_measurements=None, stop_on_script_end: bool=False, max_points_shown=None): def monitor(script: str|int=0, interval: float|None=None, metadata:dict={}, flush_after: int|None=None, use_buffer: bool|None=None, max_measurements=None, stop_on_script_end: bool=False, max_points_shown=None):
""" """
Monitor the voltage with matplotlib. Monitor the voltage with matplotlib.
@ -131,9 +126,9 @@ def monitor(script: str|int=0, interval: float|None=None, metadata:dict={}, flus
metadata["led_script"] = str(script) metadata["led_script"] = str(script)
print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.") print(f"Starting measurement with:\n\tinterval = {interval}s\nUse <C-c> to stop. Save the data using 'data.save_csv()' afterwards.")
plt.ion() plt.ion()
plt_monitor = _Monitor(use_print=False, max_points_shown=max_points_shown) plt_monitor = Monitor(max_points_shown=max_points_shown)
led_script = LedScript(script=script, auto_update=True, verbose=True) led_script = LedScript(script=script, auto_update=True, verbose=True)
data_collector = DataCollector(metadata=metadata, data_path=settings["datadir"], data_name=settings["name"]) data_collector = PrsDataCollector(metadata=metadata, data_path=settings["datadir"], data_name=settings["name"])
# data_collector.clear() # data_collector.clear()
data_queue = mp.Queue() data_queue = mp.Queue()
command_queue = mp.Queue() command_queue = mp.Queue()
@ -190,7 +185,7 @@ def data_load(dirname:str) -> tuple[np.ndarray, dict]:
dirpath = dirname dirpath = dirname
else: else:
dirpath = path.join(settings["datadir"], dirname) dirpath = path.join(settings["datadir"], dirname)
data, md = DataCollector.load_data_from_dir(dirpath, verbose=True) data, md = PrsDataCollector.load_data_from_dir(dirpath, verbose=True)
# SETTINGS # SETTINGS
def set(setting, value): def set(setting, value):
@ -208,7 +203,8 @@ def name(s:str):
def save_settings(): def save_settings():
global settings global settings
config_file.set_values(settings) for k, v in settings.items():
config_file.set(k, v)
config_file.save() config_file.save()
def load_settings(): def load_settings():
@ -242,7 +238,7 @@ Run 'help("topic")' to see more information on a topic""")
Functions: Functions:
name("<name>") - short for set("name", "<name>") name("<name>") - short for set("name", "<name>")
set("setting", value) - set a setting to a value set("setting", value) - set a setting to a value
save_settings() - store the settings as "cpdctrl.json" in the working directory save_settings() - store the settings in the config file
load_settings() - load settings from a file load_settings() - load settings from a file
Upon startup, settings are loaded from the config file. Upon startup, settings are loaded from the config file.
@ -250,7 +246,7 @@ The global variable 'config_path' determines the path used by save/load_settings
The search path is: The search path is:
<working-dir>/{cfilename} <working-dir>/{cfilename}
$XDG_CONFIG_HOME/{cfilename} $XDG_CONFIG_HOME/{cfilename}
~/.config/cpdctrl/{cfilename} ~/.config/photoreflectance/{cfilename}
The current file path is: The current file path is:
{config_path} {config_path}
@ -265,20 +261,27 @@ The current file path is:
print(topic.__doc__.strip(" ").strip("\n")) print(topic.__doc__.strip(" ").strip("\n"))
def connect_devices():
global lockin, shutter, lamp, mcm
lockin = mod_lock_in.connect_device(*select_device_interactive(mod_lock_in.list_devices(), "Select Lock-In-Amplifier: "))
shutter = mod_shutter.connect_device(*select_device_interactive(mod_shutter.list_devices(), "Select Shutter: "))
lamp = mod_lamp.connect_device(*select_device_interactive(mod_lamp.list_devices(), "Select Lamp: "))
mcm = mod_monochromator.connect_device(*select_device_interactive(mod_monochromator.list_devices(), "Select Monochromator: "))
def init(): def init():
global dev, led, settings, config_path, config_file global lockin, shutter, lamp, mcm, settings, config_path, config_file
print(r""" .___ __ .__ print(r""" __ .__
____ ______ __| _/_____/ |________| | _____________ ______ _____/ |________| |
_/ ___\\____ \ / __ |/ ___\ __\_ __ \ | \____ \_ __ \/ ___// ___\ __\_ __ \ |
\ \___| |_> > /_/ \ \___| | | | \/ |__ | |_> > | \/\___ \\ \___| | | | \/ |__
\___ > __/\____ |\___ >__| |__| |____/ | __/|__| /____ >\___ >__| |__| |____/
\/|__| \/ \/ """ + f"""{version} |__| \/ \/ """ + f"""{version}
Interactive Shell for CPD measurements with Keithley 2700B Interactive Shell for Photoreflectance measurements
--- ---
Enter 'help()' for a list of commands""") Enter 'help()' for a list of commands""")
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="cpdctrl", prog="prsctrl",
description="measure voltage using a Keithley SMU", description="measure photoreflectance",
) )
backend_group = parser.add_mutually_exclusive_group(required=False) backend_group = parser.add_mutually_exclusive_group(required=False)
parser.add_argument("-c", "--config", action="store", help="alternate path to config file") parser.add_argument("-c", "--config", action="store", help="alternate path to config file")
@ -289,10 +292,9 @@ Enter 'help()' for a list of commands""")
if path.isfile(cfilename): if path.isfile(cfilename):
config_path = cfilename config_path = cfilename
elif 'XDG_CONFIG_HOME' in environ.keys(): elif 'XDG_CONFIG_HOME' in environ.keys():
# and path.isfile(environ["XDG_CONFIG_HOME"] + "/cpdctrl.json"): config_path = path.join(environ["XDG_CONFIG_HOME"], "prsctrl", cfilename)
config_path = path.join(environ["XDG_CONFIG_HOME"], "cpdctrl", cfilename)
else: else:
config_path = path.join(path.expanduser("~/.config/cpdctrl"), cfilename) config_path = path.join(path.expanduser("~/.config/prsctrl"), cfilename)
if args["config"]: if args["config"]:
config_path = args["config"] config_path = args["config"]
@ -300,7 +302,7 @@ Enter 'help()' for a list of commands""")
load_settings() load_settings()
# setup logging # setup logging
log_path = path.expanduser(config_file.get_or("path_log", "~/.cache/cpdctrl-interactive.log")) log_path = path.expanduser(config_file.get_or("path_log", "~/.cache/prsctrl-interactive.log"))
makedirs(path.dirname(log_path), exist_ok=True) makedirs(path.dirname(log_path), exist_ok=True)
logging.basicConfig( logging.basicConfig(
level=logging.WARN, level=logging.WARN,
@ -315,42 +317,10 @@ Enter 'help()' for a list of commands""")
makedirs(settings["datadir"]) makedirs(settings["datadir"])
# init the devices # init the devices
last_vm_type = config_file.get_or("last_dev_vm_type", None) shutter = connect_device_from_config_or_interactive(config_file, "shutter", "Shutter", mod_shutter, log=log)
last_vm_name = config_file.get_or("last_dev_vm_name", None) lockin = connect_device_from_config_or_interactive(config_file, "lock-in", "Lock-In Amplifier", mod_lock_in, log=log)
if last_vm_name and last_vm_type: lamp = connect_device_from_config_or_interactive(config_file, "lamp", "Lamp", mod_lamp, log=log)
try: mcm = connect_device_from_config_or_interactive(config_file, "monochromator", "Monochromator", mod_monochromator, log=log)
dev = voltage_measurement_device.connect_device(last_vm_type, last_vm_name)
except:
log.error(f"Failed to connect to last used device {last_vm_type}::{last_vm_name}")
while dev is None:
devs = voltage_measurement_device.list_devices()
print("-" * 50)
vm_dev_type, vm_dev_name = select_device_interactive(devs, "Select voltage measurement device: ")
try:
dev = voltage_measurement_device.connect_device(vm_dev_type, vm_dev_name)
except:
log.error(f"Failed to connect to device {vm_dev_type}::{vm_dev_name}")
config_file.set("last_dev_vm_type", vm_dev_type)
config_file.set("last_dev_vm_name", vm_dev_name)
# init the devices
last_led_type = config_file.get_or("last_dev_led_type", None)
last_led_name = config_file.get_or("last_dev_led_name", None)
if last_led_name and last_led_type:
try:
led = led_control_device.connect_device(last_led_type, last_led_name)
except:
log.error(f"Failed to connect to last used device {last_led_type}::{last_led_name}")
while led is None:
devs = led_control_device.list_devices()
print("-" * 50)
led_dev_type, led_dev_name = select_device_interactive(devs, "Select LED control device: ")
try:
led = led_control_device.connect_device(led_dev_type, led_dev_name)
except:
log.error(f"Failed to connect to device {led_dev_type}::{led_dev_name}")
config_file.set("last_dev_led_type", led_dev_type)
config_file.set("last_dev_led_name", led_dev_name)
# atexit.register(_backend.exit, dev) # atexit.register(_backend.exit, dev)

View File

@ -0,0 +1,211 @@
import pyvisa
if __name__ == "__main__":
import sys
if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "photoreflectance"
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, 'C:\\Users\Administrator\Desktop\Software\Python\Python\github')
from time import sleep, time as now
import numpy as np
import matplotlib.pyplot as plt
from Bentham import Bentham
from prsctrl.devices.lamp.impl.xenon import Xenon
from prsctrl.devices.shutter import ShutterProbe
from .update_funcs import Monitor
from prsctrl.devices.lock_in.impl.sr830 import SR830
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
handlers=[
# logging.FileHandler(log_path),
logging.StreamHandler()
]
)
log = logging.getLogger(__name__)
def set_measurement_params(lockin: SR830, p: dict={}, **kwargs):
params = p | kwargs
key_to_setter = {
"time_constant_s": lockin.set_time_constant_s,
"filter_slope": lockin.set_filter_slope,
"sync_filter": lockin.set_sync_filter,
"reserve": lockin.set_reserve,
"sensitivity_volt": lockin.set_sensitivity_volt,
}
for k, v in params.items():
if k not in key_to_setter.keys():
raise KeyError(f"Invalid parameter {k}")
key_to_setter[k](v)
def set_offset_laser_only(lockin: SR830, shutter: ShutterProbe, wait_time_s):
"""
Set the R offset from the signal when only the laser is on.
This signal should be stray laser light and laser induced PL
:return: Offset as percentage of the full scale R
"""
log.info("Setting offset when the lamp is off.")
shutter.close_()
sleep(wait_time_s + 10)
lockin.run("AOFF 3") # auto offset R
R_offset_fs = float(lockin.query("OEXP? 3").split(",")[0]) # returns R offset and expand
return R_offset_fs
def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, wl=550, aux_DC="Aux In 4", monitor=None):
data = {}
lockin_params = {
"time_constant_s": 10,
# "time_constant_s": 100e-3,
"sensitivity_volt": 50e-6,
"filter_slope": 12,
"sync_filter": 1,
"reserve": "Normal",
}
measurement_params = {
"measurement_time_s": 30,
"sample_rate_Hz": 512,
}
set_measurement_params(lockin, lockin_params)
measurement_time_s = measurement_params["measurement_time_s"]
sample_rate_AC = measurement_params["sample_rate_Hz"]
n_bins_AC = measurement_time_s * sample_rate_AC # x sec messen mit <sample_rate> werte pro sekunde
timeout_s = 60
timeout_interval = 0.5
# trigger on the falling edge, since the light comes through when the ref signal is low
# could of course also trigger on rising and apply 180° shift
lockin.run("RSLP 2")
# since we dont expect changes in our signal, we can use larger time constants and aggressive filter slope
# for better signal to noise
def run_lockin_cmd(cmd, n_try=2):
com_success = n_try
e = None
while com_success > 0:
try:
return cmd()
except pyvisa.VisaIOError as e:
lockin.try_recover_from_communication_error(e)
com_success -= 1
raise e
# 5s for setting buffer,
# 5s for get values and plot
input("Make sure the laser is turned off and press enter > ")
mon = monitor if monitor is not None else Monitor(r"$t$ [s]", [
dict(ax=0, ylabel=r"$R [V]$", color="green"),
])
data["lock-in-params"] = lockin_params
data["measurement-params"] = measurement_params
N_runs = 60
shutter.open_()
monochromator.drive(wl)
sleep(10)
t0 = now()
j = 0
data['R'] = []
data['t'] = []
for i in range(N_runs):
mon.set_ax_title(f"{i+1}/{N_runs}")
run_lockin_cmd(lambda: lockin.buffer_setup(CH1="R", CH2=aux_DC, length=n_bins_AC, sample_rate=sample_rate_AC))
ti = now() - t0
run_lockin_cmd(lambda: lockin.buffer_start_fill())
t = timeout_s
while t > 0:
t -= timeout_interval
sleep(timeout_interval)
if run_lockin_cmd(lambda: lockin.buffer_is_done()):
break
if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish")
R = run_lockin_cmd(lambda: lockin.buffer_get_data(CH1=False, CH2=True))[0]
data['R'].append(R)
tdata = np.arange(n_bins_AC) * 1/sample_rate_AC * measurement_time_s + ti
data['t'].append(tdata)
mon.update_array(tdata, R)
mon.set_fig_title("Background")
mon.set_ax_title("")
return data, mon
lockin = None
lamp = None
mcm = None
shutter = None
def measure_both_sim(**kwargs):
return _measure_both_sim(mcm, lockin, shutter, **kwargs)
def _load_process():
import os; import pickle
with open(os.path.expanduser("~/Desktop/PR/2025-05-07_Xenon_1.pkl"), "rb") as file:
data = pickle.load(file)
process_data(data['R'], data['t'])
def process_data(data_R: list[np.ndarray], data_t: list[np.ndarray]) -> np.ndarray:
assert len(data_R) == len(data_t)
n_points_2 = 512 * 5
n_points_averaged = data_R[0].shape[0]-2*n_points_2
l = len(data_R) * n_points_averaged
# Rs = np.empty(l, dtype=float)
# ts = np.empty(l, dtype=float)
Rs = []
ts = []
fig, ax = plt.subplots()
ax.set_xlabel("$t$ [s]")
ax.set_ylabel(r"$R [V]$")
for i in range(len(data_R)):
Rs_i = moving_average(data_R[i], n_points_2)
new_t_data = np.arange(data_R[i].shape[0], dtype=int) /512 + (i * 30)
ts_i = new_t_data[n_points_2:-n_points_2]
# ts_i = data_t[i][n_points_2:-n_points_2]
# Rs[i*n_points_averaged:(i+1)*n_points_averaged] = Rs_i
# ts[i*n_points_averaged:(i+1)*n_points_averaged] = ts_i
Rs.append(Rs_i)
ts.append(ts_i)
ax.plot(ts_i, Rs_i, color="blue")
plt.show()
return fig
def moving_average(a: np.ndarray, n_points_2=2):
"""
:param a:
:param n_points_2: Number of points to take to the left and right of the averaged point, leading to 2*n_points+1 per value
:return:
"""
l = a.shape[0]
lret = l-2*n_points_2
ret = np.empty(lret)
for i in range(lret):
around_i = n_points_2 + i
begin = around_i - n_points_2
end = around_i + n_points_2 + 1
ret[i] = np.sum(a[begin:end]) / (2*n_points_2 + 1)
# print(i, begin, end, ret[i])
return ret
if __name__ == "__main__":
mcm = Bentham()
shutter = ShutterProbe()
# mcm.park()
lamp = Xenon()
#lockin = SR830.connect_device(SR830.enumerate_devices()[0])
# lockin = Model7260.connect_device(Model7260.enumerate_devices()[0])
# mcm = DummyBentham()
# shutter = DummyShutter()

View File

@ -1,5 +1,3 @@
import pyvisa
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
if __package__ is None: if __package__ is None:
@ -10,18 +8,20 @@ if __name__ == "__main__":
sys.path.insert(0, 'C:\\Users\Administrator\Desktop\Software\Python\Python\github') sys.path.insert(0, 'C:\\Users\Administrator\Desktop\Software\Python\Python\github')
from time import sleep from time import sleep
import pyvisa
import numpy as np import numpy as np
import scipy as scp import scipy as scp
from Bentham import Bentham, DummyBentham
from devices.Xenon import Xenon from Bentham import Bentham
from devices.Shutter import ShutterProbe, DummyShutter from prsctrl.devices.lamp.impl.xenon import Xenon
from prsctrl.devices.shutter import Shutter
from prsctrl.devices.lamp.impl.xenon import Xenon
from .update_funcs import Monitor from .update_funcs import Monitor
from .measurement_device.impl.sr830 import SR830 from prsctrl.devices.lock_in.impl.sr830 import SR830
from .measurement_device.impl.model7260 import Model7260 from prsctrl.devices.lock_in.impl.model7260 import Model7260
import logging
import logging
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
@ -47,21 +47,21 @@ def set_measurement_params(lockin: SR830, p: dict={}, **kwargs):
key_to_setter[k](v) key_to_setter[k](v)
def set_offset_laser_only(lockin: SR830, shutter: ShutterProbe, wait_time_s): def set_offset_laser_only(lockin: SR830, shutter: Shutter, wait_time_s):
""" """
Set the R offset from the signal when only the laser is on. Set the R offset from the signal when only the laser is on.
This signal should be stray laser light and laser induced PL This signal should be stray laser light and laser induced PL
:return: Offset as percentage of the full scale R :return: Offset as percentage of the full scale R
""" """
log.info("Setting offset when the lamp is off.") log.info("Setting offset when the lamp is off.")
shutter.close_() shutter.close()
sleep(wait_time_s + 10) sleep(wait_time_s + 10)
lockin.run("AOFF 3") # auto offset R lockin.run("AOFF 3") # auto offset R
R_offset_fs = float(lockin.query("OEXP? 3").split(",")[0]) # returns R offset and expand R_offset_fs = float(lockin.query("OEXP? 3").split(",")[0]) # returns R offset and expand
return R_offset_fs return R_offset_fs
def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25), aux_DC="Aux In 4", offset_with_laser_only=True, monitor=None): def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: Shutter, wl_range=(400, 750, 25), aux_DC="Aux In 4", offset_with_laser_only=True, monitor=None):
data = {} data = {}
lockin_params = { lockin_params = {
"time_constant_s": 10, "time_constant_s": 10,
@ -125,7 +125,7 @@ def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterPro
print(f"R_offset_volt_before {R_offset_volt}") print(f"R_offset_volt_before {R_offset_volt}")
data["reference_freq_Hz_before"] = lockin.get_frequency() data["reference_freq_Hz_before"] = lockin.get_frequency()
shutter.open_() shutter.open()
for i_wl, wl in enumerate(range(*wl_range)): for i_wl, wl in enumerate(range(*wl_range)):
mon.set_ax_title(f"$\\lambda = {wl}$ nm") mon.set_ax_title(f"$\\lambda = {wl}$ nm")
run_lockin_cmd(lambda: lockin.buffer_setup(CH1="R", CH2=aux_DC, length=n_bins_AC, sample_rate=sample_rate_AC)) run_lockin_cmd(lambda: lockin.buffer_setup(CH1="R", CH2=aux_DC, length=n_bins_AC, sample_rate=sample_rate_AC))
@ -143,6 +143,7 @@ def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterPro
if run_lockin_cmd(lambda: lockin.buffer_is_done()): if run_lockin_cmd(lambda: lockin.buffer_is_done()):
break break
if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish") if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish")
# ToDo Phase messen
arr = run_lockin_cmd(lambda: lockin.buffer_get_data(CH1=True, CH2=True)) arr = run_lockin_cmd(lambda: lockin.buffer_get_data(CH1=True, CH2=True))
data[wl] = {} data[wl] = {}
data[wl]["raw"] = arr data[wl]["raw"] = arr
@ -151,7 +152,7 @@ def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterPro
errs = np.std(arr, axis=1) errs = np.std(arr, axis=1)
dR = means[0] dR = means[0]
R = means[1] R = means[1]
sdR = errs[1] sdR = errs[0]
sR = errs[1] sR = errs[1]
data[wl] |= {"dR": dR, "sdR": sdR, "R": R, "sR": sR} data[wl] |= {"dR": dR, "sdR": sdR, "R": R, "sR": sR}
dR_R = dR / R dR_R = dR / R
@ -172,7 +173,7 @@ def _measure_both_sim(monochromator: Bentham, lockin: SR830, shutter: ShutterPro
mon.set_ax_title("") mon.set_ax_title("")
return data, mon return data, mon
def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25), AC=True, DC=True, monitor=None): def _measure_both(monochromator: Bentham, lockin: SR830, shutter: Shutter, wl_range=(400, 750, 25), AC=True, DC=True, monitor=None):
mon = monitor if monitor is not None else Monitor(r"$\lambda$ [nm]", [ mon = monitor if monitor is not None else Monitor(r"$\lambda$ [nm]", [
# dict(ax=0, ylabel="Wavelength [nm]", color="red"), # dict(ax=0, ylabel="Wavelength [nm]", color="red"),
# dict(ax=1, ylabel="Ref", color="blue", lim=(0, 5)), # dict(ax=1, ylabel="Ref", color="blue", lim=(0, 5)),
@ -185,7 +186,7 @@ def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe,
dict(ax=5, ylabel=r"$\theta$", color="orange", lim=(-180, 180)), dict(ax=5, ylabel=r"$\theta$", color="orange", lim=(-180, 180)),
dict(ax=6, ylabel=r"$\sigma_\theta$", color="orange") dict(ax=6, ylabel=r"$\sigma_\theta$", color="orange")
]) ])
shutter.open_() shutter.open()
data_raw = [] data_raw = []
data_wl = {} data_wl = {}
# TODO these are only printed, not set! # TODO these are only printed, not set!
@ -290,7 +291,7 @@ def _measure_both(monochromator: Bentham, lockin: SR830, shutter: ShutterProbe,
return data_wl, data_raw, mon return data_wl, data_raw, mon
def _measure(monochromator: Bentham, lamp: Xenon, lockin: SR830, shutter: ShutterProbe, wl_range=(400, 750, 25)): def _measure(monochromator: Bentham, lamp: Xenon, lockin: SR830, shutter: Shutter, wl_range=(400, 750, 25)):
data = [] data = []
mon = Monitor(r"$\lambda$ [nm]", [ mon = Monitor(r"$\lambda$ [nm]", [
# dict(ax=0, ylabel="Wavelength [nm]", color="red"), # dict(ax=0, ylabel="Wavelength [nm]", color="red"),
@ -303,7 +304,7 @@ def _measure(monochromator: Bentham, lamp: Xenon, lockin: SR830, shutter: Shutte
N_bins = 100 N_bins = 100
dt = 0.01 dt = 0.01
i = 0 i = 0
shutter.open_() shutter.open()
if isinstance(lockin, SR830): if isinstance(lockin, SR830):
lockin.run("SENS 17") # 1 mV/nA lockin.run("SENS 17") # 1 mV/nA
lockin.run("OFLT 5") lockin.run("OFLT 5")
@ -345,7 +346,7 @@ def _measure(monochromator: Bentham, lamp: Xenon, lockin: SR830, shutter: Shutte
data.append((wl, arr)) data.append((wl, arr))
i += 1 i += 1
shutter.close_() shutter.close()
return data, mon return data, mon
lockin = None lockin = None
@ -364,7 +365,7 @@ def measure_both_sim(**kwargs):
if __name__ == "__main__": if __name__ == "__main__":
mcm = Bentham() mcm = Bentham()
shutter = ShutterProbe() shutter = module_shutter.connect_device(module_shutter.TYPENAME_DAQ, "TAS Lamp Shutter")
# mcm.park() # mcm.park()
lamp = Xenon() lamp = Xenon()
lockin = SR830.connect_device(SR830.enumerate_devices()[0]) lockin = SR830.connect_device(SR830.enumerate_devices()[0])

184
prsctrl/test_measurement.py Normal file
View File

@ -0,0 +1,184 @@
import numpy as np
import scipy as scp
from time import sleep
import pyvisa
import logging
log = logging.getLogger(__name__)
from prsctrl.devices.lamp import Lamp
from prsctrl.devices.shutter import Shutter
from prsctrl.devices.monochromator import Monochromator
from .update_funcs import Monitor
from prsctrl.devices.lock_in import Lock_In_Amp
from prsctrl.devices.lock_in.impl.sr830 import SR830
def set_measurement_params(lockin: SR830, p: dict={}, **kwargs):
params = p | kwargs
key_to_setter = {
"time_constant_s": lockin.set_time_constant_s,
"filter_slope": lockin.set_filter_slope,
"sync_filter": lockin.set_sync_filter,
"reserve": lockin.set_reserve,
"sensitivity_volt": lockin.set_sensitivity_volt,
"frequency_Hz": lockin.set_frequency_Hz,
"reference": lockin.set_reference,
"reference_trigger": lockin.set_reference_trigger,
}
for k, v in params.items():
if k not in key_to_setter.keys():
raise KeyError(f"Invalid parameter {k}")
key_to_setter[k](v)
def set_offsets_laser_only(lockin: SR830, shutter: Shutter, wait_time_s, R=True, phase=True):
"""
Set the R offset from the signal when only the laser is on.
This signal should be stray laser light and laser induced PL
:param phase: If True, use the Auto-Phase function to offset the phase
:param R: If True, use the Auto-Offset function to offset R
:return: Offset as percentage of the full scale R, Phase offset in degrees
"""
log.info("Setting offset when the lamp is off.")
shutter.close()
sleep(wait_time_s + 10)
lockin.run("AOFF 3") # auto offset R
# R must come before phase, because after auto-phase the signal needs to stabilize again
if R:
R_offset_fs = float(lockin.query("OEXP? 3").split(",")[0]) # returns R offset and expand
if phase:
lockin.run("APHS")
phase_offset_deg = float(lockin.query("PHAS? 3")) # returns R offset and expand
return R_offset_fs, phase_offset_deg
def _measure_both_sim(monochromator: Monochromator, lockin: SR830, shutter: Shutter, wl_range=(400, 750, 25), aux_DC="Aux In 4", offset_with_laser_only=True, monitor=None, laser_power_mW=None):
data = {}
lockin_params = {
"time_constant_s": 10,
# "time_constant_s": 300e-3,
"sensitivity_volt": 500e-6,
"filter_slope": 12,
"sync_filter": 1,
"reserve": "Normal",
"reference": "Internal",
"reference_trigger": "Falling Edge",
"frequency_Hz": 173,
}
measurement_params = {
"measurement_time_s": 30,
"sample_rate_Hz": 512,
}
if laser_power_mW:
measurement_params["laser_power_mW"] = laser_power_mW
set_measurement_params(lockin, lockin_params)
measurement_time_s = measurement_params["measurement_time_s"]
sample_rate_AC = measurement_params["sample_rate_Hz"]
n_bins_AC = measurement_time_s * sample_rate_AC # x sec messen mit <sample_rate> werte pro sekunde
timeout_s = 60
timeout_interval = 0.5
# trigger on the falling edge, since the light comes through when the ref signal is low
# could of course also trigger on rising and apply 180° shift
lockin.run("RSLP 2")
# since we dont expect changes in our signal, we can use larger time constants and aggressive filter slope
# for better signal to noise
wait_time_s = lockin.get_wait_time_s()
def run_lockin_cmd(cmd, n_try=2):
com_success = n_try
e = None
while com_success > 0:
try:
return cmd()
except pyvisa.VisaIOError as e:
lockin.try_recover_from_communication_error(e)
com_success -= 1
raise e
# 5s for setting buffer,
# 5s for get values and plot
print(f"Time estimate {(measurement_time_s + wait_time_s + 10 + 5 + 5)/60 * ((wl_range[1]-wl_range[0])/wl_range[2])} minutes")
input("Make sure the laser is turned on and press enter > ")
mon = monitor if monitor is not None else Monitor(r"$\lambda$ [nm]", [
dict(ax=0, ylabel=r"$\Delta R$", color="green"),
dict(ax=1, ylabel=r"$\sigma_{\Delta R}$", color="green"),
dict(ax=2, ylabel=r"$R$", color="blue"),
dict(ax=3, ylabel=r"$\sigma_R$", color="blue"),
dict(ax=4, ylabel=r"$\Delta R/R$", color="red"),
dict(ax=5, ylabel=r"$\sigma_{\Delta R/R}$", color="red"),
dict(ax=6, ylabel=r"$\theta$", color="pink"),
])
mon.set_fig_title(f"Turn on laser and plug detector into A and {aux_DC} ")
data["lock-in-params"] = lockin_params
data["measurement-params"] = measurement_params
full_scale_voltage = lockin_params["sensitivity_volt"]
def set_offsets(name):
shutter.close()
mon.set_fig_title(f"Measuring baseline with lamp off")
R_offset_fs, phase_offset_deg = set_offsets_laser_only(lockin, shutter, wait_time_s)
R_offset_volt = R_offset_fs * full_scale_voltage
data[f"R_offset_volt_{name}"] = R_offset_volt
data[f"phase_offset_deg_{name}"] = phase_offset_deg
print(f"R_offset_volt_{name} {R_offset_volt}")
print(f"phase_offset_deg_{name}: {phase_offset_deg}")
if offset_with_laser_only: set_offsets("before")
data["reference_freq_Hz_before"] = lockin.get_frequency_Hz()
data["info"] = []
shutter.open()
for i_wl, wl in enumerate(range(*wl_range)):
mon.set_ax_title(f"$\\lambda = {wl}$ nm")
run_lockin_cmd(lambda: lockin.buffer_setup(CH1="R", CH2=aux_DC, length=n_bins_AC, sample_rate=sample_rate_AC))
mon.set_fig_title(f"Setting wavelength to {wl} nm")
monochromator.set_wavelength_nm(wl)
mon.set_fig_title(f"Waiting for signal to stabilize")
# wait the wait time
sleep(wait_time_s + 10)
overload = run_lockin_cmd(lambda: lockin.check_overloads())
if overload:
msg = f"Overload of {overload} at {wl} nm"
log.warning(msg)
data["info"].append(msg)
theta = []
mon.set_fig_title(f"Measuring...")
theta.append(run_lockin_cmd(lambda: float(lockin.query("OUTP? 4"))))
run_lockin_cmd(lambda: lockin.buffer_start_fill())
t = timeout_s
while t > 0:
t -= timeout_interval
sleep(timeout_interval)
if run_lockin_cmd(lambda: lockin.buffer_is_done()):
break
if t < 0: raise RuntimeError("Timed out waiting for buffer measurement to finish")
theta.append(run_lockin_cmd(lambda: float(lockin.query("OUTP? 4"))))
arr = run_lockin_cmd(lambda: lockin.buffer_get_data(CH1=True, CH2=True))
data[wl] = {}
data[wl]["raw"] = arr
data[wl]["theta"] = theta
# calculate means
means = np.mean(arr, axis=1)
errs = np.std(arr, axis=1)
dR = means[0]
R = means[1]
sdR = errs[0]
sR = errs[1]
data[wl] |= {"dR": dR, "sdR": sdR, "R": R, "sR": sR}
dR_R = dR / R
sdR_R = np.sqrt((sdR / R) + (dR * sR/R**2))
data[wl] |= {"dR_R": dR_R, "sdR_R": sdR_R}
mon.update(wl, dR, sdR, R, sR, dR_R, sdR_R, theta[0])
# if it fails, we still want the data returned
try:
if offset_with_laser_only: set_offsets("before")
data["reference_freq_Hz_after"] = lockin.get_frequency_Hz()
except Exception as e:
print(e)
mon.set_fig_title("Photoreflectance")
mon.set_ax_title("")
return data, mon

View File

@ -16,7 +16,9 @@ class Monitor:
self.lines = [] self.lines = []
n_ax = max(self.options, key=lambda v: v["ax"])["ax"] + 1 n_ax = max(self.options, key=lambda v: v["ax"])["ax"] + 1
plt.ion() plt.ion()
self.fig1, self.ax = plt.subplots(n_ax, 1, figsize=(7, 8), sharex=True) self.fig1, self.ax = plt.subplots(n_ax, 1, figsize=(7, 8), sharex=True, squeeze=False)
self.ax = self.ax[:,0]
self.ax[-1].set_xlabel(xlabel)
for i in range(len(self.ydatas)): for i in range(len(self.ydatas)):

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -4,11 +4,19 @@ import os
import datetime import datetime
import pickle import pickle
import logging import logging
from abc import abstractmethod
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename from ..utility.file_io import get_next_filename, sanitize_filename
from cpdctrl.utility.data import CpdData, FLUSH_TYPE, FLUSH_PREFIX, METADATA_FILENAME from ..utility.prsdata import PrsData, FLUSH_TYPE, FLUSH_PREFIX, METADATA_FILENAME
"""
Wollen:
- Daten während der Messung hinzufügen und in Snippets auf die Disk schreiben
- Daten nach der Messung laden, aus Rohdaten (directory), aus Berechneten Daten (csv)
"""
class DataCollector: class DataCollector:
""" """
@ -16,13 +24,15 @@ class DataCollector:
""" """
def __init__(self, def __init__(self,
data_path: str, data_path: str,
data_name: str="CPData", data_name: str="PRS",
metadata: dict[str, str]={}, metadata: dict[str, str]={},
dirname: str|None=None, dirname: str|None=None,
add_number_if_dir_exists=True, add_number_if_dir_exists=True,
data_container=list,
): ):
self.data = [] self.data_type = data_container
self.cpd_data = None # if loaded, this contains the final numpy array self.data = data_container()
self.full_data = None # if loaded, this contains the final numpy array
self.name = data_name self.name = data_name
self.metadata = metadata self.metadata = metadata
self.path = os.path.abspath(os.path.expanduser(data_path)) self.path = os.path.abspath(os.path.expanduser(data_path))
@ -50,12 +60,60 @@ class DataCollector:
# OPERATION # OPERATION
def clear(self): def clear(self):
self.data = [] self.data = []
self.cpd_data = None self.full_data = None
def add_data(self, i, t, v, l):
self.data.append((i, t, v, l))
self.cpd_data = None # no longer up to date
def assert_directory_exists(self):
if not os.path.isdir(self.dirpath):
os.makedirs(self.dirpath)
def get_data(self) -> PrsData:
"""
Load the full data and return it together with the metadata
Returns
-------
tuple[np.ndarray, dict]
The full data and the metadata
"""
if self.full_data is None:
self.full_data = PrsData(path=self.dirpath, metadata=self.metadata)
return self.full_data
def save_csv_in_dir(self, sep=",", verbose=False):
"""Save full data as csv inside the directory with temporary data"""
self.get_data()
filepath = os.path.join(self.dirpath, self.dirname + ".csv")
self.full_data.save_csv_at(filepath, sep, verbose)
def write_metadata(self):
f"""
Write the metadata to the disk as '{METADATA_FILENAME}'
Returns
-------
None.
"""
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
log.debug(f"Writing metadata to {filepath}")
with open(filepath, "wb") as file:
pickle.dump(self.metadata, file)
class PrsDataCollector(DataCollector):
def __init__(self,
data_path: str,
data_name: str="PRS",
metadata: dict[str, str]={},
dirname: str|None=None,
add_number_if_dir_exists=True,
):
super().__init__(data_path, data_name, metadata, dirname, add_number_if_dir_exists, dict)
@abstractmethod
def add_data(self, wavelength, raw):
self.data[wavelength] = raw
self.full_data = None # no longer up to date
@abstractmethod
def flush(self, verbose: bool = False): def flush(self, verbose: bool = False):
""" """
Write the current data to a file and clear the internal data Write the current data to a file and clear the internal data
@ -79,63 +137,27 @@ class DataCollector:
if len(self.data) == 0: if len(self.data) == 0:
return return
self.assert_directory_exists() self.assert_directory_exists()
for key, key_data in self.data.items():
if FLUSH_TYPE == "csv": if FLUSH_TYPE == "csv":
filename = self._get_flush_filename() + ".csv" filename = self._get_flush_filename(key) + ".csv"
filepath = os.path.join(self.dirpath, filename) filepath = os.path.join(self.dirpath, filename)
log.info(f"Flushing data to {filepath}") log.info(f"Flushing data to {filepath}")
if verbose: print(f"Flushing data to {filepath}") if verbose: print(f"Flushing data to {filepath}")
df = pd.DataFrame(key_data, columns=PrsData.columns)
df = pd.DataFrame(self.data, columns=CpdData.columns)
df.meta = str(self.metadata) df.meta = str(self.metadata)
df.to_csv(filepath, sep=",", index=False, metadata=True) df.to_csv(filepath, sep=",", index=False, metadata=True)
elif FLUSH_TYPE == "pickle-ndarray": elif FLUSH_TYPE == "pickle-ndarray":
filename = self._get_flush_filename() + ".ndarray.pkl" filename = self._get_flush_filename(key) + ".ndarray.pkl"
filepath = os.path.join(self.dirpath, filename) filepath = os.path.join(self.dirpath, filename)
log.info(f"Flushing data to {filepath}") log.info(f"Flushing data to {filepath}")
if verbose: print(f"Flushing data to {filepath}") if verbose: print(f"Flushing data to {filepath}")
with open(filepath, "wb") as file: with open(filepath, "wb") as file:
pickle.dump(np.array(self.data), file) pickle.dump(np.array(key_data), file)
else: else:
raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'") raise ValueError(f"Invalid FLUSH_TYPE: '{FLUSH_TYPE}'")
self.clear() self.clear()
# File IO # File IO
def _get_flush_filename(self): def _get_flush_filename(self, key):
"""Get the filename of the next partial file, incrementing the number every time""" return sanitize_filename(self.name + "_" + str(key))
return sanitize_filename(get_next_filename(FLUSH_PREFIX + self.name, self.dirpath, digits=5))
def assert_directory_exists(self):
if not os.path.isdir(self.dirpath):
os.makedirs(self.dirpath)
def get_data(self) -> CpdData:
"""
Load the full data and return it together with the metadata
Returns
-------
tuple[np.ndarray, dict]
The full data and the metadata
"""
if self.cpd_data is None:
self.cpd_data = CpdData(path=self.dirpath, metadata=self.metadata)
return self.cpd_data
def save_csv_in_dir(self, sep=",", verbose=False):
"""Save full data as csv inside the directory with temporary data"""
self.get_data()
filepath = os.path.join(self.dirpath, self.dirname + ".csv")
self.cpd_data.save_csv_at(filepath, sep, verbose)
def write_metadata(self):
f"""
Write the metadata to the disk as '{METADATA_FILENAME}'
Returns
-------
None.
"""
filepath = os.path.join(self.dirpath, METADATA_FILENAME)
log.debug(f"Writing metadata to {filepath}")
with open(filepath, "wb") as file:
pickle.dump(self.metadata, file)

View File

@ -0,0 +1,72 @@
def select_device_interactive(type_devices_dict: dict[str, list[str]], prompt="Select an instrument: ") -> tuple[str, str]:
"""
Select a device interactively from the command line
Parameters
----------
type_devices_dict
A dictionary of device types and their corresponding device names
-------
The type and name of the selected device.
These can be passed to the connect_device method of the led_control_device or voltage_measurement_device libraries
"""
res = type_devices_dict
flat_res = [ (t, v) for t, l in res.items() for v in l ]
for i, (t,v) in enumerate(flat_res):
print(f"{i+1:02}: {t} - {v}")
while len(flat_res) > 0:
try:
instr = int(input(prompt)) - 1
if instr < 0 or instr >= len(flat_res):
raise ValueError
return flat_res[instr]
except ValueError:
print(f"Enter a number between 1 and {len(flat_res)}")
continue
raise Exception("No devices found")
from .config_file import ConfigFile
import logging
def connect_device_from_config_or_interactive(config_file: ConfigFile, device_typename: str, device_typename_prompt: str, module, config_key_template="last_dev_<device>_<type/name>", log: logging.Logger=logging.getLogger(__name__), n_interactive_tries=2):
"""
Connect a device from the type and name stored in a config file, and if that fails prompt the user on the command line
:param config_file: ConfigFile object
:param device_typename: device type name to use in the config file key
:param device_typename_prompt: device type name to use in interactive prompt
:param module: python module supporting .connect_device(type, name) and .enumerate_devices()
:param config_key_template: template for the config key string
:param log: Logger
:param n_interactive_tries: how often to try the interactive selection
:return: device
"""
# get the last connected from the config file
key_type = config_key_template.replace("<device>", device_typename).replace("<type/name>", "type")
key_name = config_key_template.replace("<device>", device_typename).replace("<type/name>", "name")
last_shutter_type = config_file.get_or(key_type, None)
last_shutter_name = config_file.get_or(key_name, None)
dev = None
# try to connect
if last_shutter_name and last_shutter_type:
try:
dev = module.connect_device(last_shutter_type, last_shutter_name)
except:
log.error(f"Failed to connect to last used device {last_shutter_type}::{last_shutter_name}")
while dev is None and n_interactive_tries > 0:
devs = module.list_devices()
print("-" * 50)
dev_type, dev_name = select_device_interactive(devs, f"Select {device_typename_prompt}: ")
try:
dev = module.connect_device(dev_type, dev_name)
config_file.set(key_type, dev_type)
config_file.set(key_name, dev_name)
except:
log.error(f"Failed to connect to device {dev_type}::{dev_name}")
n_interactive_tries -= 1
if dev is None:
raise Exception("Failed to connect a device")
return dev

View File

@ -6,19 +6,19 @@ import pickle
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from cpdctrl.utility.file_io import get_next_filename, sanitize_filename from ..utility.file_io import get_next_filename, sanitize_filename
FLUSH_TYPE = "pickle-ndarray" FLUSH_TYPE = "pickle-ndarray"
FLUSH_PREFIX = "PART_" FLUSH_PREFIX = "PART_"
METADATA_FILENAME = FLUSH_PREFIX + "measurement_metadata.pkl" METADATA_FILENAME = FLUSH_PREFIX + "measurement_metadata.pkl"
class CpdData:
class PrsData:
""" """
Class managing data and metadata. Class managing data and metadata.
Can be initialized from data directly, or a file or directory path. Can be initialized from data directly, or a file or directory path.
""" """
columns = ["idx", "t [s]", "V [V]", "LED [%]"] def __init__(self, path:str|None=None, data:tuple|None=None, metadata:dict|None=None, verbose=False):
def __init__(self, path:str|None=None, data:np.ndarray|None=None, metadata:dict|None=None, verbose=False):
self.data = data self.data = data
if type(metadata) == dict: if type(metadata) == dict:
self.metadata = metadata self.metadata = metadata
@ -30,14 +30,14 @@ class CpdData:
raise ValueError("Either path or data must be defined, but not both.") raise ValueError("Either path or data must be defined, but not both.")
if path is not None: # load from file if path is not None: # load from file
if os.path.isdir(path): if os.path.isdir(path):
self.data, md = CpdData.load_data_from_dir(path, verbose=verbose) self.data, md = PrsData.load_data_from_dir(path, verbose=verbose)
self.metadata |= md self.metadata |= md
elif os.path.isfile(path): elif os.path.isfile(path):
if path.endswith(".csv"): if path.endswith(".csv"):
self.data, md = CpdData.load_data_from_csv(path) self.data, md = PrsData.load_data_from_csv(path)
self.metadata |= md self.metadata |= md
elif path.endswith(".pkl"): elif path.endswith(".pkl"):
self.data, md = CpdData.load_data_from_pkl(path) self.data, md = PrsData.load_data_from_pkl(path)
self.metadata |= md self.metadata |= md
else: else:
raise NotImplementedError(f"Only .csv and .pkl files are supported") raise NotImplementedError(f"Only .csv and .pkl files are supported")
@ -48,13 +48,13 @@ class CpdData:
# Convert data # Convert data
def to_dataframe(self): def to_dataframe(self):
df = pd.DataFrame(self.data, columns=CpdData.columns) df = pd.DataFrame(self.data, columns=self.columns)
df.meta = str(self.metadata) df.meta = str(self.metadata)
return df return df
def to_csv(self, sep=","): def to_csv(self, sep=","):
# self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True) # self.to_dataframe().to_csv(os.path.join(self.path, self.name + ".csv"), index=False, metadata=True)
return CpdData.get_csv(self.data, self.metadata, sep=sep) return PrsData.get_csv(self.data, self.metadata, sep=sep)
def save_csv_at(self, filepath, sep=",", verbose=False): def save_csv_at(self, filepath, sep=",", verbose=False):
@ -74,7 +74,7 @@ class CpdData:
csv = "" csv = ""
for k, v in metadata.items(): for k, v in metadata.items():
csv += f"# {k}: {v}\n" csv += f"# {k}: {v}\n"
csv += "".join(f"{colname}{sep}" for colname in CpdData.columns).strip(sep) + "\n" csv += "".join(f"{colname}{sep}" for colname in PrsData.columns).strip(sep) + "\n"
for i in range(data.shape[0]): for i in range(data.shape[0]):
csv += f"{i}{sep}{data[i,1]}{sep}{data[i,2]}{sep}{data[i,3]}\n" csv += f"{i}{sep}{data[i,1]}{sep}{data[i,2]}{sep}{data[i,3]}\n"
return csv.strip("\n") return csv.strip("\n")
@ -118,8 +118,8 @@ class CpdData:
data = np.loadtxt(f, delimiter=sep) data = np.loadtxt(f, delimiter=sep)
return data, metadata return data, metadata
@staticmethod @classmethod
def load_data_from_pkl(filepath:str) -> tuple[np.ndarray, dict]: def load_data_from_pkl(cls, filepath:str) -> tuple[np.ndarray, dict]:
""" """
Loads data from a single csv file. Loads data from a single csv file.
Lines with this format are interpreted as metadata: Lines with this format are interpreted as metadata:
@ -130,8 +130,6 @@ class CpdData:
---------- ----------
filepath filepath
Path to the csv file. Path to the csv file.
sep
csv separator
Returns Returns
------- -------
data data
@ -155,8 +153,8 @@ class CpdData:
else: else:
raise ValueError(f"Pickled object must be either numpy.ndarray or (numpy.ndarray, dict), but is of type {type(obj)}") raise ValueError(f"Pickled object must be either numpy.ndarray or (numpy.ndarray, dict), but is of type {type(obj)}")
# must be loaded by now # must be loaded by now
if not len(data.shape) == 2 and data.shape[1] == 4: if not len(data.shape) == 2 and data.shape[1] == len(cls.columns):
raise ValueError(f"numpy.ndarray has invalid shape: {data.shape}, however the shape must be (N, 4)") raise ValueError(f"numpy.ndarray has invalid shape: {data.shape}, however the shape must be (N, {len(cls.columns)})")
if not isinstance(metadata, dict): if not isinstance(metadata, dict):
raise ValueError(f"Metadata is not a of type dict") raise ValueError(f"Metadata is not a of type dict")
return data, metadata return data, metadata
@ -239,7 +237,7 @@ def plot_cpd_data(data: str or pd.DataFrame or np.ndarray, t: str="seconds", tit
Matplotlib figure object. Matplotlib figure object.
""" """
if type(data) == str: if type(data) == str:
_data, _ = CpdData.load_data_from_dir(data) _data, _ = PrsData.load_data_from_dir(data)
else: else:
_data = data _data = data
fig, ax = plt.subplots() fig, ax = plt.subplots()

View File

@ -1,26 +0,0 @@
def select_device_interactive(type_devices_dict: dict[str, list[str]], prompt="Select an instrument: ") -> tuple[str, str]:
"""
Select a device interactively from the command line
Parameters
----------
type_devices_dict
A dictionary of device types and their corresponding device names
-------
The type and name of the selected device.
These can be passed to the connect_device method of the led_control_device or voltage_measurement_device libraries
"""
res = type_devices_dict
flat_res = [ (t, v) for t, l in res.items() for v in l ]
for i, (t,v) in enumerate(flat_res):
print(f"{i+1:02}: {t} - {v}")
while len(flat_res) > 0:
try:
instr = int(input(prompt)) - 1
if instr < 0 or instr >= len(flat_res):
raise ValueError
return flat_res[instr]
except ValueError:
print(f"Enter a number between 1 and {len(flat_res)}")
continue
raise Exception("No devices found")