cpdctrl-gui/cpdctrl_gui/ui/main_window.py
2025-03-17 16:42:25 +01:00

587 lines
26 KiB
Python

''' cpdctrl_gui/ui/main_window.py '''
import os.path
from PyQt6.QtCore import Qt, QTimer, QFileSystemWatcher
from PyQt6.QtWidgets import QMainWindow, QWidget, QHBoxLayout, QLabel, QStatusBar, QFileDialog, \
QVBoxLayout
from PyQt6.QtWidgets import QTabWidget
from PyQt6.QtGui import QIcon, QPixmap, QAction, QKeySequence, QDragEnterEvent
from PyQt6.QtWidgets import QDialog, QDialogButtonBox, QMessageBox
from ..resources import get_resource_path
from .widgets.menubar import MenuBar
from .widgets.toolbar import ToolBar
from .widgets.metadata_input import MetadataInput
from cpdctrl_gui.ui.widgets.settings import MeasurementSettings, AppSettings
from .widgets.plot import Plot
from .widgets.device_select import ListChoice
from .widgets.about import MarkdownView
from .widgets.led_script import LedScriptViewer
# from .widgets.treeview import TreeView
import time
import pickle
import logging
log = logging.getLogger(__name__)
import time
import multiprocessing as mp
import threading as mt
from ..utility.config import AppConfig
import cpdctrl.voltage_measurement_device as vmd
import cpdctrl.led_control_device as ledd
from cpdctrl.led_script import LedScript, InvalidScript
from cpdctrl.utility.data import DataCollector
from cpdctrl.measurement import measure
class MainWindow(QMainWindow):
"""
The main window of the app.
Contains most logic as well as measurement functionality with live updates.
"""
def __init__(self) -> None:
"""
Initialize the Main-Window.
"""
super().__init__()
# Window-Settings
self.setWindowTitle(AppConfig.APP_NAME)
x = AppConfig.MAIN_CFG.get_or("geometry_x", 100)
y = AppConfig.MAIN_CFG.get_or("geometry_y", 100)
width = AppConfig.MAIN_CFG.get_or("geometry_width", 1000)
height = AppConfig.MAIN_CFG.get_or("geometry_height", 700)
self.setGeometry(x, y, width, height)
central_widget = QWidget(self)
self.setCentralWidget(central_widget)
# add qt actions that open help and about dialog
self.a_open_help = QAction(text="Help", parent=self)
self.a_open_help.setIcon(QIcon.fromTheme(QIcon.ThemeIcon.HelpFaq))
self.a_open_help.setShortcut(QKeySequence("F1"))
self.a_open_help.triggered.connect(lambda: self.app_open_help())
self.a_open_about = QAction(text="About", parent=self)
self.a_open_about.setIcon(QIcon.fromTheme(get_resource_path("icons/icon.svg")))
self.a_open_about.triggered.connect(lambda: self.app_open_about())
self.create_toolbars()
self.setMenuBar(MenuBar(self)) # must come after toolbars
self.setStatusBar(QStatusBar(self))
layout = QHBoxLayout(central_widget)
central_widget.setLayout(layout)
# Left: Toolbox
self.w_lefttab = QTabWidget(self)
self.w_lefttab.setMinimumWidth(300)
self.w_lefttab.setMaximumWidth(400)
layout.addWidget(self.w_lefttab)
metadata_init_dict = AppConfig.MEAS_CFG.get_or("metadata", {})
# Measurement settings
self.w_measurement_settings = MeasurementSettings()
self.w_lefttab.addTab(self.w_measurement_settings, "Measurement Settings")
# Measurement metadata
self.w_metadata = MetadataInput(metadata_init_dict)
self.w_metadata.metadataChanged.connect(self._metadata_updated)
self.w_lefttab.addTab(self.w_metadata, "Measurement Metadata")
# TODO: the call_f solution isnt pretty
# TODO: only accept single file?
class RightTab(QTabWidget):
def __init__(self, call_f, parent=None):
super().__init__(parent)
self.setAcceptDrops(True)
self.call_f = call_f
def dragEnterEvent(self, event):
if not event.mimeData().hasUrls():
event.ignore()
if len(event.mimeData().urls()) != 1:
event.ignore()
event.setDropAction(Qt.DropAction.LinkAction)
event.accept()
def dropEvent(self, event):
for url in event.mimeData().urls():
file_path = url.path()[1:]
self.call_f(file_path)
# Right: Tabs: Script, Plot
self.w_right_tab = RightTab(self.measurement_load)
layout.addWidget(self.w_right_tab)
max_data_points = AppConfig.MAIN_CFG.get_or("plot_max_data_points", 40000)
self.w_plot = Plot(max_data_points=max_data_points)
self.w_right_tab.addTab(self.w_plot, "Plot")
# LED SCRIPT
self.w_led_script_viewer = LedScriptViewer(LedScript(0))
self.w_led_script_viewer.scriptUpdated.connect(self._led_script_updated)
self.w_lefttab.addTab(self.w_led_script_viewer, "LED Script")
self.w_measurement_settings.w_led_script_load.loadScript.connect(self.led_script_load)
self.verbose = True
# devices
self.vmdev = None
self.leddev = None
self.vmdev_autoconnect()
self.leddev_autoconnect()
# Measurement
self.measurement_timer = None
self.led_script = None
self.led_script_watcher = None
self.data_collector = None
self.data_queue = None
self.proc_measure = None
self.set_status("Ready")
self.menuBar().m_file.addAction(self.a_open_about)
self.menuBar().m_file.addAction(self.a_open_help)
def set_status(self, msg):
self.statusBar().showMessage(msg)
def create_toolbars(self) -> None:
"""
Creates and adds the top and right toolbars to the main window.
"""
# Top Toolbar [PyQt6.QtWidgets.QToolBar]
self.topbar = ToolBar(self, orientation=Qt.Orientation.Horizontal,
style=Qt.ToolButtonStyle.ToolButtonTextUnderIcon, icon_size=(24, 24))
# Top Toolbar Buttons
self.topbar.add_button("connect_vmdev", "CPD Devices", get_resource_path("icons/volt_device.svg"), self.vmdev_connect_from_dialog)
self.topbar.add_button("connect_leddev", "LED Devices", get_resource_path("icons/led_device.svg"), self.leddev_connect_from_dialog)
self.topbar.add_button("meas_start", "Start", QIcon.fromTheme(QIcon.ThemeIcon.MediaPlaybackStart), self.measure_start)
self.topbar.add_button("meas_stop", "Stop", QIcon.fromTheme(QIcon.ThemeIcon.MediaPlaybackStop), self.measure_stop)
self.topbar.add_button("meas_save", "Save", QIcon.fromTheme(QIcon.ThemeIcon.DocumentSaveAs), self.measurement_save)
self.topbar.add_button("meas_load", "Load", QIcon.fromTheme(QIcon.ThemeIcon.DocumentOpen), self.measurement_load_dialog)
self.topbar.add_separator()
self.topbar.add_button("app_settings", "Settings", QIcon.fromTheme(QIcon.ThemeIcon.DocumentProperties), self.app_open_settings)
self.topbar.addAction(self.a_open_help)
self.topbar.addAction(self.a_open_about)
self.topbar.add_button("app_exit", "Exit", QIcon.fromTheme(QIcon.ThemeIcon.ApplicationExit), self.app_exit)
self.addToolBar(Qt.ToolBarArea.TopToolBarArea, self.topbar)
# disable the Stop and Save buttons
self.topbar.disable_button("meas_stop")
self.topbar.disable_button("meas_save")
# Right Toolbar [PyQt6.QtWidgets.QToolBar]
# self.rightbar = ToolBar(self, orientation=Qt.Orientation.Vertical, style=Qt.ToolButtonStyle.ToolButtonIconOnly, icon_size=(24, 24))
# self.rightbar.add_separator()
# self.rightbar.add_button("Settings", "resources/assets/icons/windows/shell32-315.ico", self.settings_window)
# self.addToolBar(Qt.ToolBarArea.RightToolBarArea, self.rightbar)
# def create_treeview(self) -> TreeView:
"""
Creates and adds the tree view widget to the main window.
"""
# return TreeView(self)
# LED DEVICE MANAGEMENT
def leddev_connect(self, leddev_type, leddev_name):
log.info(f"Connecting to LED device {leddev_name} ({leddev_type})")
self.leddev = ledd.connect_device(leddev_type, leddev_name)
AppConfig.MAIN_CFG.set("led_device_last.type", leddev_type)
AppConfig.MAIN_CFG.set("led_device_last.name", leddev_name)
# Update the settings widget value
self.w_measurement_settings.set_value("device_led_controller", str(self.leddev))
led_name = self.leddev.get_led_name()
if not led_name: led_name = "Unknown"
self.w_measurement_settings.set_value("device_led", led_name)
def leddev_autoconnect(self):
if AppConfig.MAIN_CFG.get_or("led_device_auto_reconnect", False):
try:
leddev_type = AppConfig.MAIN_CFG.get("led_device_last.type")
leddev_name = AppConfig.MAIN_CFG.get("led_device_last.name")
self.leddev_connect(leddev_type, leddev_name)
except KeyError:
pass
except Exception as e:
log.error(f"Failed to auto-connect to LED device: {e}")
def leddev_connect_from_dialog(self):
"""
Open a dialog that lets the user choose a LED control device,
and then connects to the device.
"""
devices = ledd.list_devices()
device_dialog = ListChoice(devices, window_title="Select LED device")
if device_dialog.exec() == QDialog.DialogCode.Accepted:
leddev_type, leddev_name = device_dialog.get_selected()
self.leddev_connect(leddev_type, leddev_name)
# VOLTAGE DEVICE MANAGEMENT
def vmdev_connect(self, vmdev_type, vmdev_name):
log.info(f"Connecting to voltage measurement device {vmdev_name} ({vmdev_type})")
self.vmdev = vmd.connect_device(vmdev_type, vmdev_name)
AppConfig.MAIN_CFG.set("voltage_measurement_device_last.type", vmdev_type)
AppConfig.MAIN_CFG.set("voltage_measurement_device_last.name", vmdev_name)
# Update the settings widget value
self.w_measurement_settings.set_value("device_voltage_measurement", str(self.vmdev))
def vmdev_autoconnect(self):
if AppConfig.MAIN_CFG.get_or("voltage_measurement_device_auto_reconnect", False):
try:
vmdev_type = AppConfig.MAIN_CFG.get("voltage_measurement_device_last.type")
vmdev_name = AppConfig.MAIN_CFG.get("voltage_measurement_device_last.name")
self.vmdev_connect(vmdev_type, vmdev_name)
except KeyError:
pass
except Exception as e:
log.error(f"Failed to auto-connect to voltage measurement device: {e}")
def vmdev_connect_from_dialog(self):
"""
Open a dialog that lets the user choose a voltage measurement device,
and then connects to the device.
"""
devices = vmd.list_devices()
device_dialog = ListChoice(devices, window_title="Select CPD Measurement Device")
if device_dialog.exec() == QDialog.DialogCode.Accepted:
vmdev_type, vmdev_name = device_dialog.get_selected()
self.vmdev_connect(vmdev_type, vmdev_name)
# MEASUREMENT
def measure_start(self):
if self.vmdev is None:
self.vmdev_connect_from_dialog()
if self.vmdev is None:
raise ValueError("No measurement device selected")
if self.leddev is None:
self.leddev_connect_from_dialog()
if self.leddev is None:
raise ValueError("No led control device selected")
name = self.w_measurement_settings.get_value("name")
script = self.w_measurement_settings.get_value("led_script")
flush_after = self.w_measurement_settings.get_value("flush_after")
use_buffer = self.w_measurement_settings.get_value("use_buffer")
# check if device supports buffer mode
if use_buffer and not callable(getattr(self.vmdev, 'buffer_measure', None)):
# show warning dialog
ret = QMessageBox.warning(self, "Buffer mode not supported", "The selected voltage measurement device does not support the buffer measurement mode.\nClick OK to continue without buffer measurement mode.", buttons=QMessageBox.StandardButton.Ok|QMessageBox.StandardButton.Cancel, defaultButton=QMessageBox.StandardButton.Ok)
if ret == QMessageBox.StandardButton.Ok:
log.warning("Device does not support buffer mode, disabling")
use_buffer = False
else:
log.info("Cancelled measurement due to buffer mode not being available")
return
max_measurements = self.w_measurement_settings.get_value("max_measurements")
stop_on_script_end = self.w_measurement_settings.get_value("stop_on_script_end")
interval = self.w_measurement_settings.get_value("interval")
auto_add_metadata = AppConfig.MAIN_CFG.get_or("metadata_auto_add", True)
metadata = self.w_metadata.get_dict()
metadata["name"] = name
metadata["led_script"] = str(script)
log.info(f"Starting measurement with:\n\tinterval = {interval}\n\tflush_after = {flush_after}\n\tuse_buffer = {use_buffer}\n\tmax_measurements = {max_measurements}\n\tstop_on_script_end = {stop_on_script_end}")
self.topbar.disable_button("meas_start")
self.topbar.disable_button("connect_vmdev")
self.topbar.disable_button("connect_leddev")
self.topbar.disable_button("meas_save")
self.topbar.disable_button("meas_load")
self.topbar.enable_button("meas_stop")
self.w_measurement_settings.setEnabled(False)
if not AppConfig.MAIN_CFG.get_or("metadata_auto_update", True):
self.w_metadata.setEnabled(False)
self.w_plot.clear_data()
# have the led script member be the only auto-updating script,
# and pass a non-updating copy to the measurement thread
if self.led_script is None:
self.led_script_load()
led_script_no_update = self.led_script.copy()
self.data_collector = DataCollector(metadata=metadata, data_path=AppConfig.MAIN_CFG.get("dir_cache"), data_name=name)
# data_collector.clear()
self.data_queue = mp.Queue()
self.command_queue = mp.Queue()
# Argument order must match the definition
self.proc_measure = mt.Thread(target=measure, args=(
self.vmdev,
self.leddev,
led_script_no_update,
self.data_collector,
interval,
flush_after,
use_buffer,
max_measurements,
stop_on_script_end,
False, # verbose
self.command_queue,
self.data_queue,
auto_add_metadata,
))
self.proc_measure.start()
self.measurement_timer = QTimer(self)
self.measurement_timer.timeout.connect(self.measure_update)
self.measurement_timer.start(AppConfig.MAIN_CFG.get_or("plot_update_interval_ms", 200))
# the time left estimation might be a little to short, since the actual measurement is started a few lines of
# code later
self.led_script.start()
self.w_led_script_viewer.update_time_predictions()
def measure_stop(self):
log.info("Stopping measurement")
if not self.measurement_is_running():
raise RuntimeError("measure_stop: Measurement is not running")
self.set_status("Stopping measurement")
self.command_queue.put("stop")
self.measurement_timer.stop()
self.proc_measure.join()
self.set_status("Saving data...")
self.data_collector.save_csv_in_dir()
self.proc_measure = None
# don't update w_led_script, keep displaying the last values
self.led_script.reset()
# store a copy of the led script
if AppConfig.MAIN_CFG.get_or("led_script_save_copy", True):
filename = self.led_script.filepath
extra_header = []
if filename is None:
filename = "led_script.led"
else:
filename = os.path.basename(filename)
extra_header.append(f"# Original file path: {self.led_script.filepath}")
filename = "led_script.led" # always use led_script.led
datadir = self.data_collector.dirpath
filepath = os.path.join(datadir, filename)
led_script = self.led_script.to_file(add_default_header=True, add_extra_header=extra_header)
with open(filepath, "w") as f:
f.write(led_script)
self.topbar.enable_button("meas_start")
self.topbar.enable_button("connect_vmdev")
self.topbar.enable_button("connect_leddev")
self.topbar.enable_button("meas_save")
self.topbar.enable_button("meas_load")
self.topbar.disable_button("meas_stop")
self.w_measurement_settings.setEnabled(True)
self.w_metadata.setEnabled(True)
self.set_status("Ready")
def measure_update(self):
self.w_led_script_viewer.update_time(time.time())
if self.proc_measure.is_alive():
while not self.data_queue.empty():
# print(data_queue.qsize(), "\n\n")
current_data = self.data_queue.get(block=False)
i, tval, vval, led_val = current_data
# print(f"Data {i:03}: {tval}s, {vval}V, {led_val}%")
self.set_status(f"Data {i:03}: {tval:.3f}s, {vval:.10f}V, {led_val:03}%")
# update the plot
self.w_plot.update_plot(tval, vval, led_val)
else: # measurement might have stopped after max N or script end
self.measure_stop()
def measurement_is_running(self):
return self.proc_measure is not None
def measurement_save(self) -> None:
"""
Save the last measurement data to a file.
"""
if self.data_collector is None:
raise RuntimeError("Can not save, not data collector initialized")
# if not loaded already, this will load the data into memory, which might take a while
# this should be done before the dialog
self.data_collector.get_data()
# open file dialog with previous in directory with previous file extension
last_dir = AppConfig.MAIN_CFG.get_or("tmp_last_measurement_save_dir", "")
ext = AppConfig.MAIN_CFG.get_or("tmp_last_measurement_save_ext", "csv")
if ext not in ["csv", "pkl"]:
ext = "csv"
# data_collector.dirname gets the name, not path
init_path = os.path.join(last_dir, self.data_collector.dirname + "." + ext)
file_path, _ = QFileDialog.getSaveFileName(self, "Save File", init_path, "CSV Files (*.csv);;Pickle Files (*.pkl)")
if file_path:
AppConfig.MAIN_CFG.set("tmp_last_measurement_load_dir", os.path.dirname(file_path))
if file_path.endswith(".csv"):
AppConfig.MAIN_CFG.set("tmp_last_measurement_save_ext", "csv")
csv = self.data_collector.to_csv()
with open(file_path, "w") as f:
f.write(csv)
elif file_path.endswith(".pkl"):
AppConfig.MAIN_CFG.set("tmp_last_measurement_save_ext", "pkl")
data = self.data_collector.get_data()
with open(file_path, "wb") as f:
pickle.dump(data, f)
else:
raise ValueError(f"Unknown file extension in path: '{file_path}'")
self.set_status(f"Saved data to {file_path}")
else:
self.set_status(f"Aborted saving data, no file selected")
def measurement_load(self, filepath):
"""
Load measurement data from the filepath. Only updates the plot.
Parameters
----------
filepath
The path to the file or directory to load the data from.
File may be csv or pickle containing (data, metadata)
"""
log.info(f"Loading measurement data from '{filepath}'")
if self.measurement_is_running():
QMessageBox.critical(self, "Measurement running", "Can not load data while measurement is running")
return
if os.path.isfile(filepath):
if filepath.endswith(".pkl"):
with open(filepath, "rb") as file:
data, mdata = pickle.load(file)
elif filepath.endswith(".csv"):
data, mdata = DataCollector.load_data_from_csv(filepath)
else:
raise NotImplementedError(f"Unknown file extension in path: '{filepath}'.\nOnly .pkl and .csv can be loaded")
elif os.path.isdir(filepath):
data, mdata = DataCollector.load_data_from_dir(filepath)
else:
raise FileNotFoundError(f"No such file or directory: '{filepath}'")
self.w_plot.set_data(data[:,1], data[:,2], data[:,3])
def measurement_load_dialog(self):
"""
Open a file dialog to load measurement data. This only updates the plot
"""
if self.measurement_is_running():
QMessageBox.critical(self, "Measurement running", "Can not load data while measurement is running")
return
last_dir = AppConfig.MAIN_CFG.get_or("tmp_last_measurement_load_dir", "")
file_path, _ = QFileDialog.getOpenFileName(self, "Load File", last_dir, "CSV Files (*.csv)")
if file_path:
dir_name = os.path.dirname(file_path)
AppConfig.MAIN_CFG.set("tmp_last_measurement_load_dir", dir_name)
self.measurement_load(file_path)
else:
self.set_status(f"Aborted loading data, no file selected")
self.measurement_load(file_path)
def led_script_load(self):
"""
Load a new led script from the value in the measurement settings widget
"""
script = self.w_measurement_settings.get_value("led_script")
script_type = self.w_measurement_settings.get_value("led_script_type")
auto_update = AppConfig.MAIN_CFG.get_or("led_script_watch_file", False)
log.info(f"Loading led script of type '{script_type}' with auto-update {auto_update}")
try:
self.led_script = LedScript(script=script, auto_update=False, verbose=True)
except InvalidScript as e:
# show qt error
QMessageBox.critical(self, "Invalid script", str(e))
return
if auto_update and script_type == "file":
# can not use "integrated" auto update function because we cant receive qt signals
# from the watchdog thread -> use QFileSystemwatcher
self.led_script_watcher = QFileSystemWatcher()
self.led_script_watcher.addPath(script)
self.led_script_watcher.fileChanged.connect(self._led_script_update_from_file)
else:
self.led_script_watcher = None
self.w_led_script_viewer.set_script(self.led_script)
self._led_script_updated()
def _led_script_update_from_file(self):
"""
Update the led script when its file has changed
"""
self.led_script.update_from_file()
self._led_script_updated()
def _led_script_updated(self):
"""
Send the new led script to the measurement thread and update the gui widget.
"""
# update gui
self.w_led_script_viewer.update_time_predictions()
# update the measurement led script
if self.measurement_is_running():
self.command_queue.put(("led_script", self.led_script.copy()))
def _metadata_updated(self):
"""
Send the new metadata to the measurement thread.
"""
if self.measurement_is_running() and AppConfig.MAIN_CFG.get_or("metadata_update_live", True):
metadata = self.w_metadata.get_dict()
self.command_queue.put(("metadata", metadata))
def app_exit(self) -> None:
"""
Closes the application.
"""
self.close()
def app_open_about(self) -> None:
dialog = QDialog()
dialog.setWindowTitle("About cpdctrl-gui")
buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok)
buttons.accepted.connect(dialog.accept)
dialog.setLayout(QVBoxLayout())
# show the logo via a pixmap in a label
img_path = get_resource_path("icons/logo.svg")
pixmap = QPixmap(img_path)
pixmap = pixmap.scaled(128, 128, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation)
# qt cant find the file
label = QLabel()
label.setPixmap(pixmap)
label.setAlignment(Qt.AlignmentFlag.AlignCenter) # center the image
dialog.layout().addWidget(label)
# show about.md
dialog.layout().addWidget(MarkdownView("about.md"))
dialog.layout().addWidget(buttons)
dialog.exec()
def app_open_help(self) -> None:
dialog = QDialog()
dialog.setWindowTitle("Help")
buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok)
buttons.accepted.connect(dialog.accept)
dialog.setLayout(QVBoxLayout())
# show help.md
#dialog.layout().addWidget(MarkdownView("troubleshooting.md"))
dialog.layout().addWidget(MarkdownView("user_guide.md"))
dialog.layout().addWidget(buttons)
# set larger window size
dialog.resize(800, 600)
dialog.exec()
def app_open_settings(self) -> None:
dialog = QDialog()
dialog.setWindowTitle("Settings")
layout = QVBoxLayout()
w_app_settings = AppSettings()
layout.addWidget(w_app_settings)
buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok)
buttons.accepted.connect(dialog.accept)
layout.addWidget(buttons)
dialog.setLayout(layout)
dialog.exec()
def closeEvent(self, event):
if self.measurement_is_running():
self.measure_stop()
# save the metadata
metadata = self.w_metadata.get_dict()
AppConfig.MEAS_CFG.set("metadata", metadata)
# store the geometry as text values in config
geo = self.geometry()
AppConfig.MAIN_CFG.set("geometry_x", geo.x())
AppConfig.MAIN_CFG.set("geometry_y", geo.y())
AppConfig.MAIN_CFG.set("geometry_width", geo.width())
AppConfig.MAIN_CFG.set("geometry_height", geo.height())