commit 6fd9902aab3b630635a8190be762ab22564e9243 Author: Matthias@Dell Date: Thu Apr 27 01:53:47 2023 +0200 initial commit diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..cdd7616 --- /dev/null +++ b/readme.md @@ -0,0 +1,2 @@ +# Machine Learning stuff for TENG project + diff --git a/teng-ml/main.py b/teng-ml/main.py new file mode 100644 index 0000000..17cb8a3 --- /dev/null +++ b/teng-ml/main.py @@ -0,0 +1,29 @@ +import matplotlib.pyplot as plt +import pandas as pd + +if __name__ == "__main__": + if __package__ is None: + # make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change + __package__ = "teng-ml" + import sys + from os import path + filepath = path.realpath(path.abspath(__file__)) + sys.path.insert(0, path.dirname(path.dirname(filepath))) + +from .util.transform import ConstantInterval + +if __name__ == "__main__": + file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv" + # file = "/home/matth/data/test001.csv" + df = pd.read_csv(file) + array = df.to_numpy() + print(ConstantInterval.get_average_interval(array[:,0])) + transformer = ConstantInterval(0.05) + interp_array = transformer(array[:,0], array[:,2]) + + fig1, ax1 = plt.subplots() + ax1.plot(interp_array[:,0], interp_array[:,1], color="r", label="Interpolated") + ax1.scatter(array[:,0], array[:,2], color="g", label="Original") + ax1.legend() + plt.show() + diff --git a/teng-ml/prepare.py b/teng-ml/prepare.py new file mode 100644 index 0000000..18f3456 --- /dev/null +++ b/teng-ml/prepare.py @@ -0,0 +1,145 @@ +import pandas as pd +import numpy as np +import scipy.signal as signal +import matplotlib.pyplot as plt +from time import sleep +from random import choice as r_choice +from sys import exit + +from .util.transform import Normalize + +if __name__ == "__main__": + if __package__ is None: + # make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change + __package__ = "teng-ml" + import sys + from os import path + filepath = path.realpath(path.abspath(__file__)) + sys.path.insert(0, path.dirname(path.dirname(filepath))) +from .utility.data import load_dataframe + +file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv" + +class PeakInfo: + """ + Helper class for "iterating" through selected peaks. + """ + def __init__(self): + self.reset() + + def reset(self): + self._peak_names = [ "first", "second", "last", "lowest" ] + self._peaks = { p: None for p in self._peak_names } + self._iter = 0 + + def current(self): + # return (self._peak_names[self._iter]), self._peaks[self._peak_names[self._iter]] + return self._peaks[self._peak_names[self._iter]] + def name(self): + return self._peak_names[self._iter] + + def next(self): + if self._iter < len(self._peak_names) - 1: self._iter += 1 + return self.current() + def prev(self): + if self._iter > 0: self._iter -= 1 + return self.current() + + def set(self, value): + """Assign a value to the current peak""" + self._peaks[self._peak_names[self._iter]] = value + def is_done(self): + for peak in self._peaks.values(): + if peak is None: return False + return True + + def __getitem__(self, key): + return self._peaks[key] + def __setitem__(self, key, value): + self._peaks[key] = value + def __repr__(self): + return f"{self._peak_names[self._iter]} peak" + + +def find_peaks(a): + peaks = signal.find_peaks(a) + + +def on_click(fig, ax, peaks, event): + """ + Let the user select first, second and last peak by clicking on them in this order. + Right click undos the last selection + """ + select = None + if event.button == 1: # left click + peaks.set((event.xdata, event.ydata)) + print(f"{peaks}: {event.xdata} - {event.ydata}") + ax.set_title(f"{peaks}: {event.xdata} - {event.ydata}") + peaks.next() + elif event.button == 3: # right click + ax.set_title(f"Undo {peaks.name()}") + if not peaks.is_done(): + peaks.prev() + peaks.set(None) + if peaks.is_done(): message = "Close window when done" + else: message = f"Click on {peaks}" + fig.suptitle(message) + fig.canvas.draw() + # fig1.canvas.flush_events() + +def calc_peaks(peaks): + # get the peak points from the information of a Peaks object + # 90% distance between first and second + min_distance = max(1, (peaks["second"][0] - peaks["first"][0]) * 0.9) + min_height = peaks["lowest"][1] * 0.99 + vpeaks = signal.find_peaks(vdata, height=min_height, distance=min_distance) + return vpeaks + + +if __name__ == "__main__": + """ + Peak identification: + plot, let user choose first, second, last and lowest peak for identification + """ + df = load_dataframe(file) + a = df.to_numpy() + + # a2 = interpolate_to_linear_time() + # print(a2) + # exit() + + vdata = Normalize(0, 1)(a[:,2]) + plt.ion() + # vpeaks[0] is the list of the peaks + vpeaks = signal.find_peaks(vdata)[0] + fig, ax = plt.subplots() + ax.plot(vdata) + peak_lines = ax.vlines(vpeaks, 0, 1, colors="r") + ax.grid(True) + fig.suptitle("Click on first peak") + peak_info = PeakInfo() + # handle clicks + fig.canvas.mpl_connect("button_press_event", lambda ev: on_click(fig, ax, peak_info, ev)) + # run until user closes, events are handled with on_click function + print(vdata.size) + while plt.fignum_exists(fig.number): + plt.pause(0.01) + if (peak_info.is_done()): + vpeaks = calc_peaks(peak_info)[0] + x_margin = (a[-1,0] - a[0,0]) * 0.05 # allow some margin if user clicked not close enough on peak + vpeaks = vpeaks[(vpeaks >= peak_info["first"][0] - x_margin) & (vpeaks <= peak_info["last"][0] + x_margin)] # remove peaks before first and after last + peak_lines.remove() + peak_lines = ax.vlines(vpeaks, 0, 1, colors="r") + peak_info.reset() + print(a[:,0], vpeaks) + + # separate peaks + indices = np.arange(0, a[:,0].size) + peak_datas = [] + for i in range(len(vpeaks) - 1): + # TODO: user <= or < + peak_datas.append(vdata[(indices >= vpeaks[i]) & (indices < vpeaks[i+1])]) + plt.plot(peak_datas[i]) + print(peak_datas) + plt.pause(20) + diff --git a/teng-ml/util/__pycache__/transform.cpython-310.pyc b/teng-ml/util/__pycache__/transform.cpython-310.pyc new file mode 100644 index 0000000..9ccaebd Binary files /dev/null and b/teng-ml/util/__pycache__/transform.cpython-310.pyc differ diff --git a/teng-ml/util/data_loader.py b/teng-ml/util/data_loader.py new file mode 100644 index 0000000..d1e7fc3 --- /dev/null +++ b/teng-ml/util/data_loader.py @@ -0,0 +1,25 @@ + +def load_data(): + # Build the category_lines dictionary, a list of names per language + category_lines = {} + all_categories = [] + + def find_files(path): + return glob.glob(path) + + # Read a file and split into lines + def read_lines(filename): + lines = io.open(filename, encoding='utf-8').read().strip().split('\n') + return [unicode_to_ascii(line) for line in lines] + + for filename in find_files('data/names/*.txt'): + category = os.path.splitext(os.path.basename(filename))[0] + all_categories.append(category) + + lines = read_lines(filename) + category_lines[category] = lines + + return category_lines, all_categories + + + diff --git a/teng-ml/util/transform.py b/teng-ml/util/transform.py new file mode 100644 index 0000000..d468c73 --- /dev/null +++ b/teng-ml/util/transform.py @@ -0,0 +1,47 @@ +import numpy as np +from scipy.interpolate import interp1d + +class Normalize: + """ + normalize so that all values are between low and high + """ + def __init__(self, low=0, high=1): + assert(low < high) + self.low = low + self.high = high + def __call__(self, a): + min_ = np.min(a) + a = a - min_ + max_ = np.max(a) + if max_ != 0: + a = (a / max_) + # now normalized between 0 and 1 + a *= (self.high - self.low) + a -= self.low + return a + + +class ConstantInterval: + """ + Interpolate the data to have a constant interval / sample rate, + so that 1 index step is always equivalent to a certain time step + Expects: timestamps, idata, vdata + """ + def __init__(self, interval): + self.interval = interval + + def __call__(self, timestamps, data): + interp = interp1d(timestamps, data) + new_stamps = np.arange(0, timestamps[-1], self.interval) + print(f"old=({timestamps.size}) {timestamps}, new=({new_stamps.size}){new_stamps}") + + new_vals = interp(new_stamps) + return np.vstack((new_stamps, new_vals)).T + @staticmethod + + def get_average_interval(timestamps): + avg_interval = np.average([ timestamps[i] - timestamps[i-1] for i in range(1, len(timestamps))]) + return avg_interval + # sug_interval = 0.5 * avg_interval + # print(f"Average interval: {avg_interval}, Suggestion: {sug_interval}") +