Compare commits
No commits in common. "1ffedb5ddc7568418af8c3a01bd7583f04df92c7" and "9660de248aa23ceb4ce5dcdf30673710cf3705bf" have entirely different histories.
1ffedb5ddc
...
9660de248a
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1 @@
|
||||
*__pycache__*
|
||||
.old
|
||||
|
11
readme.md
11
readme.md
@ -1,13 +1,2 @@
|
||||
# Machine Learning stuff for TENG project
|
||||
(Bi)LSTM for name classification.
|
||||
More information on the project are [on my website](https://quintern.xyz/en/teng.html).
|
||||
|
||||
## Model training
|
||||
Adjust the parameters in `main.py` and run it.
|
||||
All models and the settings they were trained with are automatically serialized with pickle and stored in a subfolder
|
||||
of the `<model_dir>` that was set in `main.py`.
|
||||
|
||||
|
||||
## Model evaluation
|
||||
Run `find_best_model.py <model_dir>` with the `<model_dir>` specified in `main.py` during training.
|
||||
|
||||
|
@ -1,316 +0,0 @@
|
||||
from os import path, listdir
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from sys import exit, argv
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __package__ is None:
|
||||
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
||||
__package__ = "teng-ml"
|
||||
import sys
|
||||
from os import path
|
||||
filepath = path.realpath(path.abspath(__file__))
|
||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||
|
||||
from .tracker.epoch_tracker import EpochTracker
|
||||
from .util.settings import MLSettings
|
||||
from .util import model_io as mio
|
||||
from .util.string import cleanup_str, fill_and_center
|
||||
|
||||
|
||||
cache = {}
|
||||
|
||||
#
|
||||
# TOOLS
|
||||
#
|
||||
def get_model_dirs(models_dir):
|
||||
"""
|
||||
return all model_dirs, relative to models_dir
|
||||
"""
|
||||
if "model_dirs" in cache: return cache["model_dirs"].copy()
|
||||
paths = listdir(models_dir)
|
||||
model_dirs = []
|
||||
for p in paths:
|
||||
if not path.isdir(f"{models_dir}/{p}"): continue
|
||||
if not path.isfile(f"{models_dir}/{p}/settings.pkl"): continue
|
||||
if not path.isfile(f"{models_dir}/{p}/tracker_training.pkl"): continue
|
||||
if not path.isfile(f"{models_dir}/{p}/tracker_validation.pkl"): continue
|
||||
if not path.isfile(f"{models_dir}/{p}/model.pkl"): continue
|
||||
model_dirs.append(f"{models_dir}/{p}")
|
||||
cache["model_dirs"] = model_dirs.copy()
|
||||
return model_dirs
|
||||
|
||||
|
||||
def resave_images_svg(model_dirs):
|
||||
"""
|
||||
open all trackers and save all plots as svg
|
||||
"""
|
||||
for model_dir in model_dirs:
|
||||
val_tracker: EpochTracker = mio.load_tracker_validation(model_dir)
|
||||
fig, _ = val_tracker.plot_predictions("Validation: Predictions", model_dir=model_dir, name="img_validation_predictions")
|
||||
train_tracker: EpochTracker = mio.load_tracker_training(model_dir)
|
||||
fig, _ = train_tracker.plot_predictions("Training: Predictions", model_dir=model_dir, name="img_training_predictions")
|
||||
fig, _ = train_tracker.plot_training(model_dir=model_dir)
|
||||
plt.close('all')
|
||||
|
||||
|
||||
|
||||
#
|
||||
# MODEL RANKING
|
||||
#
|
||||
def get_model_info_md(model_dir):
|
||||
st: MLSettings = mio.load_settings(model_dir)
|
||||
validation_tracker = mio.load_tracker_validation(model_dir)
|
||||
training_tracker = mio.load_tracker_training(model_dir)
|
||||
|
||||
s = f"""Model {model_dir[model_dir.rfind('/')+1:]}
|
||||
Model parameters:
|
||||
- num_features = {st.num_features}
|
||||
- num_layers = {st.num_layers}
|
||||
- hidden_size = {st.hidden_size}
|
||||
- bidirectional = {st.bidirectional}
|
||||
Training data:
|
||||
- transforms = {st.transforms}
|
||||
- splitter = {st.splitter}
|
||||
- labels = {st.labels}
|
||||
Training info:
|
||||
- optimizer = {cleanup_str(st.optimizer)}
|
||||
- scheduler = {cleanup_str(st.scheduler)}
|
||||
- loss_func = {st.loss_func}
|
||||
- num_epochs = {st.num_epochs}
|
||||
- batch_size = {st.batch_size}
|
||||
- n_predictions = {np.sum(training_tracker.get_count_per_label())}
|
||||
- final accuracy = {training_tracker.accuracies[-1]}
|
||||
- highest accuracy = {np.max(training_tracker.accuracies)}
|
||||
Validation info:
|
||||
- n_predictions = {np.sum(validation_tracker.get_count_per_label())}
|
||||
- accuracy = {validation_tracker.accuracies[-1]}
|
||||
"""
|
||||
return s
|
||||
|
||||
|
||||
def write_model_info(model_dir, model_info=None):
|
||||
if model_info is None: model_info = get_model_info_md(model_dir)
|
||||
with open(f"{model_dir}/model_info.md", "w") as file:
|
||||
file.write(model_info)
|
||||
|
||||
|
||||
def get_model_ranking(model_dirs):
|
||||
if "model_ranking" in cache: return cache["model_ranking"].copy()
|
||||
model_ranking = [] # model, (model_dir | validation accuracy)
|
||||
for model_dir in model_dirs:
|
||||
model_ranking.append((model_dir, mio.load_tracker_validation(model_dir).accuracies[-1]))
|
||||
model_ranking.sort(key=lambda t: t[1]) # sort accuracy
|
||||
model_ranking.reverse() # best to worst
|
||||
cache["model_ranking"] = model_ranking.copy()
|
||||
return model_ranking
|
||||
|
||||
def get_model_ranking_md(model_dirs):
|
||||
model_ranking = get_model_ranking(model_dirs)
|
||||
ranking_md = ""
|
||||
for i in range(len(model_ranking)):
|
||||
model_dir = model_ranking[i][0]
|
||||
model_name = model_dir[model_dir.rfind("/")+1:]
|
||||
ranking_md += f"{i+1:3}. Model=`{model_name}`, Validation accuaracy={round(model_ranking[i][1], 2):.2f}%\n"
|
||||
return ranking_md
|
||||
|
||||
#
|
||||
# SETTINGS RANKING
|
||||
#
|
||||
def get_settings_ranking(model_dirs, use_ranking_instead_of_accuracy=False):
|
||||
"""
|
||||
load the settings for each model and score them based on the performance of the model
|
||||
This only works when all settings were the same number of times
|
||||
(Example: 2 batch sizes x and y have to both to be used z times for the ranking to make sense)
|
||||
"""
|
||||
if "settings_ranking" in cache: return cache["settings_ranking"].copy()
|
||||
settings_ranking = {} # parameter name: param_value: score
|
||||
|
||||
model_ranking = get_model_ranking(model_dirs)
|
||||
model_ranking.reverse() # worst to best
|
||||
def score_ranking_based(i, param_name, param_value):
|
||||
"""
|
||||
score settings depending on the ranking of the model
|
||||
eg: best of 32 models has batch_size 10 -> batch_size 10 gets 32 points
|
||||
"""
|
||||
param_value = cleanup_str(param_value)
|
||||
if not param_name in settings_ranking.keys():
|
||||
settings_ranking[param_name] = {}
|
||||
if not param_value in settings_ranking[param_name].keys():
|
||||
settings_ranking[param_name][param_value] = 0
|
||||
settings_ranking[param_name][param_value] += i # i+1 is reverse place in the ranking, worst model is at i=0
|
||||
|
||||
def score_accuracy_based(i, param_name, param_value):
|
||||
"""
|
||||
score settings depending on the accuracy of the model
|
||||
eg: models has batch_size 10 and accuracy 63% -> batch_size 10 gets 63 points
|
||||
"""
|
||||
param_value = cleanup_str(param_value)
|
||||
if not param_name in settings_ranking.keys():
|
||||
settings_ranking[param_name] = {}
|
||||
if not param_value in settings_ranking[param_name].keys():
|
||||
settings_ranking[param_name][param_value] = 0
|
||||
settings_ranking[param_name][param_value] += int(model_ranking[i][1]) # accuracy
|
||||
|
||||
if use_ranking_instead_of_accuracy:
|
||||
score = lambda i, name, val : score_ranking_based(i, name, val)
|
||||
else:
|
||||
score = lambda i, name, val : score_accuracy_based(i, name, val)
|
||||
for i in range(len(model_ranking)):
|
||||
st = mio.load_settings(model_ranking[i][0])
|
||||
score(i, "num_features", st.num_features)
|
||||
score(i, "num_layers", st.num_layers)
|
||||
score(i, "hidden_size", st.hidden_size)
|
||||
score(i, "num_epochs", st.num_epochs)
|
||||
score(i, "bidirectional", st.bidirectional)
|
||||
score(i, "optimizer", st.optimizer)
|
||||
score(i, "scheduler", st.scheduler)
|
||||
score(i, "loss_func", st.loss_func)
|
||||
score(i, "transforms", st.transforms)
|
||||
score(i, "splitter", st.splitter)
|
||||
score(i, "batch_size", st.batch_size)
|
||||
# remove parameters with only one value
|
||||
settings_ranking = { k: v for k, v in settings_ranking.items() if len(v) > 1 }
|
||||
cache["settings_ranking"] = settings_ranking.copy()
|
||||
return settings_ranking
|
||||
|
||||
def get_settings_ranking_md(model_dirs):
|
||||
"""
|
||||
turn the scores dict from rank_settings into a markdown string
|
||||
"""
|
||||
settings_ranking = get_settings_ranking(model_dirs)
|
||||
s = ""
|
||||
for param_name, d in settings_ranking.items():
|
||||
s += f"- {param_name}:\n"
|
||||
sorted_scores = sorted(d.items(), key=lambda x: x[1], reverse=True)
|
||||
for i in range(len(sorted_scores)):
|
||||
param_value, score = sorted_scores[i]
|
||||
s += f"\t{i+1}. `{param_value}` ({score} points)\n"
|
||||
return s
|
||||
|
||||
|
||||
|
||||
|
||||
def interactive_model_inspector(models_dir: str):
|
||||
model_dirs = get_model_dirs(models_dir)
|
||||
model_dirs.sort()
|
||||
model_names = [ mdir[mdir.rfind('/')+1:] for mdir in model_dirs ]
|
||||
|
||||
def print_options():
|
||||
s = fill_and_center("Interactive Model Inspector") + "\n"
|
||||
for i in range(len(model_names)):
|
||||
s += f"{i+1:02}: {model_names[i]}\n"
|
||||
s += """ ---
|
||||
x: print model info for x. listed model (1-based)
|
||||
x.: print model info for x. ranked model (1-based)
|
||||
w: write last model info
|
||||
wa: write info for all listed models
|
||||
q: quit
|
||||
*: name of model or path to model directory. If not found, reprint list."""
|
||||
print(s)
|
||||
last_model_info = None
|
||||
last_model_dir = None
|
||||
|
||||
def print_model_info(model_dir):
|
||||
last_model_dir = model_dir
|
||||
last_model_info = get_model_info_md(last_model_dir)
|
||||
print(last_model_info)
|
||||
print_options()
|
||||
loop = True
|
||||
try:
|
||||
while loop:
|
||||
answer = input("> ")
|
||||
if len(answer) == 0: continue
|
||||
try: # if x -> take x. from listed models
|
||||
i = int(answer)
|
||||
if 0 < i and i <= len(model_dirs):
|
||||
|
||||
print_model_info(model_dirs[i-1])
|
||||
continue
|
||||
except ValueError: pass
|
||||
if answer.endswith('.'): # if x. -> take x. from model ranking
|
||||
try:
|
||||
i = int(answer[:-1])
|
||||
if 0 < i and i <= len(model_dirs):
|
||||
model_ranking = get_model_ranking(model_dirs)
|
||||
print_model_info(model_ranking[i-1][0])
|
||||
continue
|
||||
except ValueError: pass
|
||||
|
||||
elif answer == "w":
|
||||
if last_model_info is None:
|
||||
print("Print a model info first.")
|
||||
continue
|
||||
write_model_info(last_model_dir, last_model_info)
|
||||
elif answer == "wa":
|
||||
for model_dir in model_dirs:
|
||||
write_model_info(model_dir)
|
||||
elif answer == "q":
|
||||
loop = False
|
||||
continue
|
||||
else:
|
||||
if path.isdir(answer): # if model dir
|
||||
print_model_info(answer)
|
||||
elif path.isdir(f"{models_dir}/{answer}"): # if model name
|
||||
print_model_info(f"{models_dir}/{answer}")
|
||||
else:
|
||||
print(f"'{answer}' is not a model name in {models_dir} or path to a model directory.")
|
||||
print_options()
|
||||
except KeyboardInterrupt: # if <C-C>
|
||||
pass
|
||||
except EOFError: # if <C-D>
|
||||
exit(0)
|
||||
return True
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(argv) != 2:
|
||||
print(f"Exactly one argument (models directory) is required, but got {len(argv)-1}.")
|
||||
exit(1)
|
||||
|
||||
# models_dir = "/home/matth/Uni/TENG/models_phase_2" # where to save models, settings and results
|
||||
models_dir = path.abspath(path.expanduser(argv[1]))
|
||||
|
||||
model_dirs = get_model_dirs(models_dir)
|
||||
|
||||
def save_model_ranking():
|
||||
model_ranking = get_model_ranking_md(model_dirs)
|
||||
with open(f"{models_dir}/ranking_models.md", "w") as file:
|
||||
file.write(model_ranking)
|
||||
|
||||
def save_settings_ranking():
|
||||
scores = get_settings_ranking(model_dirs)
|
||||
with open(f"{models_dir}/ranking_settings.md", "w") as file:
|
||||
file.write(get_settings_ranking_md(scores))
|
||||
|
||||
|
||||
# if the functions return True, the options are printed again
|
||||
options = {
|
||||
'1': ("Print model ranking", lambda: print(get_model_ranking_md(model_dirs))),
|
||||
'2': ("Save model ranking", save_model_ranking),
|
||||
'3': ("Print settings ranking", lambda: print(get_settings_ranking_md(model_dirs))),
|
||||
'4': ("Save settings ranking", save_settings_ranking),
|
||||
'5': ("Interactive model inspector", lambda: interactive_model_inspector(models_dir)),
|
||||
'6': ("Resave all images", lambda: resave_images_svg(model_dirs)),
|
||||
'q': ("quit", exit)
|
||||
}
|
||||
|
||||
def print_options():
|
||||
print(fill_and_center("Model Evaluator"))
|
||||
for op, (name, _) in options.items():
|
||||
print(f"{op:4}: {name}")
|
||||
print(f"Using models directory '{models_dir}', which contains {len(model_dirs)} models")
|
||||
print_options()
|
||||
try:
|
||||
while True:
|
||||
answer = input("> ")
|
||||
if answer in options.keys():
|
||||
reprint = options[answer][1]()
|
||||
if reprint == True: print_options()
|
||||
else:
|
||||
print(f"Invalid option: '{answer}'")
|
||||
print_options()
|
||||
except KeyboardInterrupt: pass
|
||||
except EOFError: pass
|
257
teng-ml/main.py
257
teng-ml/main.py
@ -7,22 +7,20 @@ if __name__ == "__main__":
|
||||
filepath = path.realpath(path.abspath(__file__))
|
||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||
|
||||
from sys import exit
|
||||
import matplotlib.pyplot as plt
|
||||
import pandas as pd
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.utils.rnn as rnn_utils
|
||||
from torch.utils.data import DataLoader
|
||||
import itertools
|
||||
import json
|
||||
import time
|
||||
from os import makedirs, path
|
||||
import pickle
|
||||
|
||||
from .util.transform import ConstantInterval, Normalize
|
||||
from .util.data_loader import load_datasets, LabelConverter
|
||||
from .util.split import DataSplitter
|
||||
from .util.epoch_tracker import EpochTracker
|
||||
from .util.settings import MLSettings
|
||||
from .rnn.rnn import RNN
|
||||
from .rnn.training import train_validate_save, select_device
|
||||
|
||||
def test_interpol():
|
||||
file = "/home/matth/data/2023-04-27_glass_8.2V_179mm000.csv"
|
||||
@ -37,92 +35,187 @@ def test_interpol():
|
||||
ax1.plot(interp_array[:,0], interp_array[:,1], color="r", label="Interpolated")
|
||||
ax1.scatter(array[:,0], array[:,2], color="g", label="Original")
|
||||
ax1.legend()
|
||||
# plt.show()
|
||||
|
||||
plt.show()
|
||||
|
||||
if __name__ == "__main__":
|
||||
device = (
|
||||
"cuda"
|
||||
if torch.cuda.is_available()
|
||||
else "mps"
|
||||
if torch.backends.mps.is_available()
|
||||
else "cpu"
|
||||
)
|
||||
|
||||
|
||||
labels = LabelConverter(["foam", "glass", "kapton", "foil", "cloth", "rigid_foam"])
|
||||
models_dir = "/home/matth/Uni/TENG/models" # where to save models, settings and results
|
||||
if not path.isdir(models_dir):
|
||||
makedirs(models_dir)
|
||||
data_dir = "/home/matth/Uni/TENG/data"
|
||||
|
||||
|
||||
# Test with
|
||||
num_layers = [ 3 ]
|
||||
hidden_size = [ 8 ]
|
||||
bidirectional = [ True ]
|
||||
t_const_int = ConstantInterval(0.01)
|
||||
t_norm = Normalize(0, 1)
|
||||
transforms = [[ t_const_int ]] #, [ t_const_int, t_norm ]]
|
||||
batch_sizes = [ 64 ] # , 16]
|
||||
splitters = [ DataSplitter(100) ]
|
||||
num_epochs = [ 80 ]
|
||||
transforms = [ t_const_int, t_norm ]
|
||||
st = MLSettings(num_features=1,
|
||||
num_layers=1,
|
||||
hidden_size=1,
|
||||
bidirectional=True,
|
||||
transforms=transforms,
|
||||
num_epochs=40,
|
||||
batch_size=3,
|
||||
labels=labels,
|
||||
)
|
||||
|
||||
# num_layers=1,
|
||||
# hidden_size=1,
|
||||
# bidirectional=True,
|
||||
# optimizer=None,
|
||||
# scheduler=None,
|
||||
# loss_func=None,
|
||||
# transforms=[],
|
||||
# splitter=None,
|
||||
# num_epochs=10,
|
||||
# batch_size=5,
|
||||
args = [num_layers, hidden_size, bidirectional, [None], [None], [None], transforms, splitters, num_epochs, batch_sizes]
|
||||
print(f"Using device: {device}")
|
||||
|
||||
# create settings for every possible combination
|
||||
settings = [
|
||||
MLSettings(1, *params, labels) for params in itertools.product(*args)
|
||||
]
|
||||
|
||||
loss_func = nn.CrossEntropyLoss()
|
||||
optimizers = [
|
||||
lambda model: torch.optim.Adam(model.parameters(), lr=0.03),
|
||||
# lambda model: torch.optim.Adam(model.parameters(), lr=0.25),
|
||||
# lambda model: torch.optim.Adam(model.parameters(), lr=0.50),
|
||||
]
|
||||
schedulers = [
|
||||
lambda optimizer, st: torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9),
|
||||
lambda optimizer, st: torch.optim.lr_scheduler.StepLR(optimizer, step_size=st.num_epochs // 10, gamma=0.40, verbose=False),
|
||||
# lambda optimizer, st: torch.optim.lr_scheduler.StepLR(optimizer, step_size=st.num_epochs // 10, gamma=0.75, verbose=False),
|
||||
]
|
||||
train_set, test_set = load_datasets("/home/matth/Uni/TENG/data", labels, voltage=8.2, transforms=st.transforms, train_to_test_ratio=0.7, random_state=42)
|
||||
|
||||
n_total = len(settings) * len(optimizers) * len(schedulers)
|
||||
print(f"Testing {n_total} possible configurations")
|
||||
# scheduler2 =
|
||||
def create_model(st, optimizer_f, scheduler_f):
|
||||
model=RNN(input_size=st.num_features, hidden_size=st.hidden_size, num_layers=st.num_layers, num_classes=len(labels), bidirectional=st.bidirectional)
|
||||
optimizer = optimizer_f(model)
|
||||
scheduler = scheduler_f(optimizer, st)
|
||||
return model, optimizer, scheduler
|
||||
|
||||
t_begin = time.time()
|
||||
n = 1
|
||||
for o in range(len(optimizers)):
|
||||
for s in range(len(schedulers)):
|
||||
for i in range(len(settings)):
|
||||
st = settings[i]
|
||||
# print(st.get_name())
|
||||
train_set, test_set = load_datasets(data_dir, labels, voltage=8.2, transforms=st.transforms, split_function=st.splitter, train_to_test_ratio=0.7, random_state=42, num_workers=4)
|
||||
|
||||
generator = torch.manual_seed(42)
|
||||
# train_loader = iter(DataLoader(train_set))
|
||||
# test_loader = iter(DataLoader(test_set))
|
||||
train_loader = DataLoader(train_set, batch_size=st.batch_size, shuffle=True, generator=generator)
|
||||
test_loader = DataLoader(test_set, batch_size=st.batch_size, shuffle=True, generator=generator)
|
||||
print(f"Testing {n}/{n_total}: (o={o}, s={s}, i={i})")
|
||||
model, optimizer, scheduler = create_model(st, optimizers[o], schedulers[s])
|
||||
device = select_device(force_device="cpu")
|
||||
try:
|
||||
train_validate_save(model, optimizer, scheduler, loss_func, train_loader, test_loader, st, models_dir, print_interval=4)
|
||||
except KeyboardInterrupt:
|
||||
if input("Cancelled current training. Quit? (q/*): ") == "q":
|
||||
t_end = time.time()
|
||||
print(f"Testing took {t_end - t_begin:.2f}s = {(t_end-t_begin)/60:.1f}m")
|
||||
exit()
|
||||
n += 1
|
||||
train_loader = DataLoader(train_set, batch_size=st.batch_size, shuffle=True)
|
||||
test_loader = DataLoader(test_set, batch_size=st.batch_size, shuffle=True)
|
||||
|
||||
t_end = time.time()
|
||||
print(f"Testing took {t_end - t_begin:.2f}s = {(t_end-t_begin)/60:.1f}m")
|
||||
class RNN(nn.Module):
|
||||
def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
|
||||
super(RNN, self).__init__()
|
||||
self.num_layers = num_layers
|
||||
self.hidden_size = hidden_size
|
||||
self.is_bidirectional = bidirectional
|
||||
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=bidirectional)
|
||||
# x = (batch_size, sequence, feature)
|
||||
|
||||
if bidirectional == True:
|
||||
self.fc = nn.Linear(hidden_size * 2, num_classes)
|
||||
else:
|
||||
self.fc = nn.Linear(hidden_size, num_classes)
|
||||
|
||||
self.softmax = nn.Softmax(dim=1)
|
||||
|
||||
def forward(self, x):
|
||||
# x: batches, length, features
|
||||
# print(f"forward pass")
|
||||
D = 2 if self.is_bidirectional == True else 1
|
||||
|
||||
# print(f"x({x.shape})=...")
|
||||
batch_size = x.shape[0]
|
||||
# print(f"batch_size={batch_size}")
|
||||
|
||||
h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
|
||||
# print(f"h1({h0.shape})=...")
|
||||
c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
|
||||
x.to(device)
|
||||
_, (h_n, _) = self.lstm(x, (h0, c0))
|
||||
# print(f"h_n({h_n.shape})=...")
|
||||
final_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
|
||||
# print(f"final_state({final_state.shape})=...")
|
||||
|
||||
if D == 1:
|
||||
X = final_state.squeeze() # TODO what if batch_size == 1
|
||||
elif D == 2:
|
||||
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
|
||||
#X = h_1 + h_2 # Add both states
|
||||
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2)
|
||||
else:
|
||||
raise ValueError("D must be 1 or 2")
|
||||
# print(f"X({X.shape})={X}")
|
||||
output = self.fc(X) # fully-connected layer
|
||||
# print(f"out({output.shape})={output}")
|
||||
output = self.softmax(output)
|
||||
# print(f"out({output.shape})={output}")
|
||||
return output
|
||||
|
||||
model=RNN(input_size=st.num_features, hidden_size=st.hidden_size, num_layers=st.num_layers, num_classes=len(labels), bidirectional=st.bidirectional).to(device)
|
||||
loss_func = torch.nn.CrossEntropyLoss()
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
|
||||
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
|
||||
|
||||
print(f"model:", model)
|
||||
print(f"loss_func={loss_func}")
|
||||
print(f"optimizer={optimizer}")
|
||||
print(f"scheduler={scheduler}")
|
||||
|
||||
|
||||
|
||||
epoch_tracker = EpochTracker(labels)
|
||||
|
||||
print(f"train_loader")
|
||||
for i, (data, y) in enumerate(train_loader):
|
||||
print(y)
|
||||
print(f"{i:3} - {torch.argmax(y, dim=1, keepdim=False)}")
|
||||
|
||||
|
||||
# training
|
||||
epoch_tracker.train_begin()
|
||||
for ep in range(st.num_epochs):
|
||||
for i, (data, y) in enumerate(train_loader):
|
||||
# print(data, y)
|
||||
# data = batch, seq, features
|
||||
# print(f"data({data.shape})={data}")
|
||||
x = data[:,:,[2]].float() # select voltage data
|
||||
# print(f"x({x.shape}, {x.dtype})=...")
|
||||
# print(f"y({y.shape}, {y.dtype})=...")
|
||||
# length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
|
||||
# print(f"length({length.shape})={length}")
|
||||
# batch_size = x.shape[0]
|
||||
# print(f"batch_size={batch_size}")
|
||||
# v = x.view(batch_size, -1, feature_count)
|
||||
# data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)[0]
|
||||
# print(f"data({data.shape})={data}")
|
||||
# print(data.batch_sizes[0])
|
||||
# print(data)
|
||||
out = model(x)
|
||||
# print(f"out({out.shape}={out})")
|
||||
loss = loss_func(out, y)
|
||||
# print(loss)
|
||||
|
||||
optimizer.zero_grad() # clear gradients for next train
|
||||
loss.backward() # backpropagation, compute gradients
|
||||
optimizer.step() # apply gradients
|
||||
|
||||
# predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
|
||||
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
# print(f"predicted={predicted}, correct={correct}")
|
||||
# train_total += y.size(0)
|
||||
# train_correct += (predicted == correct).sum().item()
|
||||
epoch_tracker.train(correct, predicted)
|
||||
epoch_tracker.next_epoch(loss)
|
||||
print(epoch_tracker.get_last_epoch_summary_str())
|
||||
scheduler.step()
|
||||
t_end = time.time()
|
||||
|
||||
with torch.no_grad():
|
||||
for i, (data, y) in enumerate(test_loader):
|
||||
# print(ep, "Test")
|
||||
x = data[:,:,[2]].float()
|
||||
out = model(x)
|
||||
loss = loss_func(out, y)
|
||||
|
||||
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
# print(f"predicted={predicted}, correct={correct}")
|
||||
# val_total += y.size(0)
|
||||
# val_correct += (predicted == correct).sum().item()
|
||||
|
||||
epoch_tracker.test(correct, predicted)
|
||||
|
||||
# print(f"train_total={train_total}, val_total={val_total}")
|
||||
# if train_total == 0: train_total = -1
|
||||
# if val_total == 0: val_total = -1
|
||||
|
||||
# print(f"epoch={ep+1:3}: Testing accuracy={100 * val_correct / val_total:.2f}")
|
||||
# print(f"End result: Training accuracy={100 * train_correct / train_total:.2f}%, Testing accuracy={100 * val_correct / val_total:.2f}, training took {t_end - t_begin:.2f} seconds")
|
||||
|
||||
epoch_tracker.get_test_statistics()
|
||||
# epoch_tracker.()
|
||||
|
||||
# print(epoch_tracker.get_training_summary_str())
|
||||
print(epoch_tracker.get_training_count_per_label())
|
||||
|
||||
model_name = st.get_name()
|
||||
# save the settings, results and model
|
||||
with open(model_name + "_settings.pkl", "wb") as file:
|
||||
pickle.dump(st, file)
|
||||
|
||||
with open(model_name + "_results.pkl", "wb") as file:
|
||||
pickle.dump(epoch_tracker, file)
|
||||
|
||||
with open(model_name + "_model.pkl", "wb") as file:
|
||||
pickle.dump(model, file)
|
||||
|
||||
|
@ -17,7 +17,6 @@ if __name__ == "__main__":
|
||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||
|
||||
from .util.transform import Normalize
|
||||
from .util.data_loader import get_datafiles
|
||||
|
||||
file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv"
|
||||
|
||||
|
39
teng-ml/rnn.py
Normal file
39
teng-ml/rnn.py
Normal file
@ -0,0 +1,39 @@
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
|
||||
# BiLSTM Model
|
||||
|
||||
class RNN(nn.Module):
|
||||
def __init__(self, input_size, hidden_size, num_layers, num_classes, if_bidirectional):
|
||||
super(RNN, self).__init__()
|
||||
self.num_layers = num_layers
|
||||
self.hidden_size = hidden_size
|
||||
self.if_bidirectional = if_bidirectional
|
||||
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=if_bidirectional)
|
||||
|
||||
if if_bidirectional == True:
|
||||
self.fc = nn.Linear(hidden_size * 2, num_classes)
|
||||
else:
|
||||
self.fc = nn.Linear(hidden_size, num_classes)
|
||||
|
||||
|
||||
def forward(self, x):
|
||||
D = 2 if self.if_bidirectional == True else 1
|
||||
Batch = x.batch_sizes[0]
|
||||
|
||||
h0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
|
||||
c0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
|
||||
x.to(device)
|
||||
_, (h_n, _) = self.lstm(x, (h0, c0))
|
||||
final_state = h_n.view(self.num_layers, D, Batch, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
|
||||
|
||||
if D == 1:
|
||||
X = final_state.squeeze()
|
||||
elif D == 2:
|
||||
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
|
||||
# X = h_1 + h_2 # Add both states
|
||||
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2)
|
||||
|
||||
output = self.fc(X) # fully-connected layer
|
||||
|
||||
return output
|
@ -1,80 +0,0 @@
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
|
||||
class RNN(nn.Module):
|
||||
"""
|
||||
(Bi)LSTM for name classification
|
||||
"""
|
||||
def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
|
||||
super(RNN, self).__init__()
|
||||
self.num_layers = num_layers
|
||||
self.hidden_size = hidden_size
|
||||
self.is_bidirectional = bidirectional
|
||||
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
|
||||
# x = (batch_size, sequence, feature)
|
||||
|
||||
if bidirectional == True:
|
||||
self.fc = nn.Linear(hidden_size * 2, num_classes)
|
||||
else:
|
||||
self.fc = nn.Linear(hidden_size, num_classes)
|
||||
|
||||
self.softmax = nn.Softmax(dim=1)
|
||||
|
||||
def forward(self, x):
|
||||
# x: batches, length, features
|
||||
# print(f"forward pass")
|
||||
D = 2 if self.is_bidirectional == True else 1
|
||||
# print(f"x({x.shape})=...")
|
||||
batch_size = x.shape[0]
|
||||
|
||||
device = x.device
|
||||
|
||||
h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
|
||||
# print(f"h1({h0.shape})=...")
|
||||
c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
|
||||
|
||||
out, (h_n, c_n) = self.lstm(x, (h0, c0))
|
||||
# out: (N, L, D * hidden_size)
|
||||
# h_n: (D * num_layers, hidden_size)
|
||||
# c_n: (D * num_layers, hidden_size)
|
||||
# print(f"out({out.shape})={out}")
|
||||
# print(f"h_n({h_n.shape})={h_n}")
|
||||
# print(f"c_n({c_n.shape})={c_n}")
|
||||
# print(f"out({out.shape})=...")
|
||||
# print(f"h_n({h_n.shape})=...")
|
||||
# print(f"c_n({c_n.shape})=...")
|
||||
|
||||
"""
|
||||
# select only last layer [-1] -> last layer,
|
||||
last_layer_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1]
|
||||
if D == 1:
|
||||
# [1, batch_size, hidden_size] -> [batch_size, hidden_size]
|
||||
X = last_layer_state.squeeze() # TODO what if batch_size == 1
|
||||
elif D == 2:
|
||||
h_1, h_2 = last_layer_state[0], last_layer_state[1] # states of both directions
|
||||
# concatenate both states, X-size: (Batch, hidden_size * 2)
|
||||
X = torch.cat((h_1, h_2), dim=1)
|
||||
else:
|
||||
raise ValueError("D must be 1 or 2")
|
||||
""" # all this is quivalent to line below
|
||||
out = out[:,-1,:] # select last time step
|
||||
|
||||
# fc: (*, hidden_size) -> (*, num_classes)
|
||||
# print(f"X({X.shape})={X}")
|
||||
# print(f"X({X.shape})=...")
|
||||
out = self.fc(out) # fully-connected layer
|
||||
# print(f"out({output.shape})={output}")
|
||||
# print(f"output({output.shape})=...")
|
||||
# softmax: (*) -> (*)
|
||||
# out = self.softmax(out)
|
||||
# print(f"output({output.shape})=...")
|
||||
# print(f"output({output.shape})={output}")
|
||||
|
||||
"""
|
||||
out(torch.Size([15, 200, 10]))=...
|
||||
h_n(torch.Size([3, 15, 10]))=...
|
||||
c_n(torch.Size([3, 15, 10]))=...
|
||||
X(torch.Size([3, 1, 15, 10]))=...
|
||||
output(torch.Size([3, 1, 15, 6]))=...
|
||||
output(torch.Size([3, 1, 15, 6]))=..."""
|
||||
return out
|
@ -1,150 +0,0 @@
|
||||
from os import makedirs, path
|
||||
import torch
|
||||
import pickle
|
||||
import matplotlib.pyplot as plt
|
||||
from torch.utils.data import DataLoader
|
||||
|
||||
from ..util.settings import MLSettings
|
||||
from ..tracker.epoch_tracker import EpochTracker
|
||||
from ..util.file_io import get_next_digits
|
||||
from ..util.string import class_str
|
||||
|
||||
from ..util import model_io as mio
|
||||
|
||||
|
||||
def select_device(force_device=None):
|
||||
"""
|
||||
Select best device and move model
|
||||
"""
|
||||
if force_device is not None:
|
||||
device = force_device
|
||||
else:
|
||||
device = torch.device(
|
||||
"cuda"
|
||||
if torch.cuda.is_available()
|
||||
# else "mps"
|
||||
# if torch.backends.mps.is_available()
|
||||
else "cpu"
|
||||
)
|
||||
# print(device, torch.cuda.get_device_name(device), torch.cuda.get_device_properties(device))
|
||||
return device
|
||||
|
||||
|
||||
def train(model, optimizer, scheduler, loss_func, train_loader: DataLoader, st: MLSettings, print_interval=1) -> EpochTracker:
|
||||
epoch_tracker = EpochTracker(st.labels)
|
||||
epoch_tracker.begin()
|
||||
for ep in range(st.num_epochs):
|
||||
loss = -1
|
||||
for i, (data, y) in enumerate(train_loader):
|
||||
# print(data, y)
|
||||
# data = batch, seq, features
|
||||
# print(f"data({data.shape})={data}")
|
||||
x = data[:,:,[2]].float() # select voltage data
|
||||
# print(f"x({x.shape}, {x.dtype})=...")
|
||||
# print(f"y({y.shape}, {y.dtype})=...")
|
||||
# length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
|
||||
# print(f"length({length.shape})={length}")
|
||||
# batch_size = x.shape[0]
|
||||
# print(f"batch_size={batch_size}")
|
||||
# v = x.view(batch_size, -1, feature_count)
|
||||
# data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)[0]
|
||||
# print(f"data({data.shape})={data}")
|
||||
out = model(x)
|
||||
|
||||
# print(f"out({out.shape}={out})")
|
||||
# print(f" y({y.shape}={y})")
|
||||
with torch.no_grad():
|
||||
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
# print(f"predicted={predicted}, correct={correct}")
|
||||
# train_total += y.size(0)
|
||||
# train_correct += (predicted == correct).sum().item()
|
||||
epoch_tracker.add_prediction(correct, predicted)
|
||||
# predicted2 = torch.argmax(out, dim=1, keepdim=True) # -> [ label_indices ]
|
||||
# print(f"correct={correct}, y={y}")
|
||||
loss = loss_func(out, correct)
|
||||
# loss = loss_func(out, y)
|
||||
|
||||
|
||||
optimizer.zero_grad() # clear gradients for next train
|
||||
loss.backward() # backpropagation, compute gradients
|
||||
optimizer.step() # apply gradients
|
||||
|
||||
# predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
|
||||
epoch_tracker.end_epoch(loss, optimizer.param_groups[0]["lr"])
|
||||
if ep+1 % print_interval == 0:
|
||||
print(f"Training:", epoch_tracker.get_epoch_summary_str())
|
||||
scheduler.step()
|
||||
print("Training:", epoch_tracker.end())
|
||||
return epoch_tracker
|
||||
|
||||
|
||||
def validate(model, test_loader: DataLoader, st: MLSettings) -> EpochTracker:
|
||||
epoch_tracker = EpochTracker(st.labels)
|
||||
epoch_tracker.begin()
|
||||
with torch.no_grad():
|
||||
for i, (data, y) in enumerate(test_loader):
|
||||
# print(ep, "Test")
|
||||
x = data[:,:,[2]].float()
|
||||
out = model(x)
|
||||
|
||||
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
|
||||
|
||||
epoch_tracker.add_prediction(correct, predicted)
|
||||
print("Validation:", epoch_tracker.end())
|
||||
return epoch_tracker
|
||||
|
||||
|
||||
def train_validate_save(model, optimizer, scheduler, loss_func, train_loader: DataLoader, test_loader: DataLoader, st: MLSettings, models_dir, print_interval=1, show_plots=False):
|
||||
# assumes model and data is already on correct device
|
||||
# train_loader.to(device)
|
||||
# test_loader.to(device)
|
||||
|
||||
# store optimizer, scheduler and loss_func in settings
|
||||
st.optimizer = class_str(optimizer)
|
||||
st.scheduler = class_str(scheduler)
|
||||
st.loss_func = class_str(loss_func)
|
||||
|
||||
model_name = st.get_name()
|
||||
|
||||
def add_tab(s):
|
||||
return "\t" + str(s).replace("\n", "\n\t")
|
||||
print(100 * '=')
|
||||
print("Model Name:", model_name)
|
||||
print(f"model:\n", add_tab(model))
|
||||
# print(f"loss_func:\n", add_tab(class_str(loss_func)))
|
||||
# print(f"optimizer:\n", add_tab(class_str(optimizer)))
|
||||
# print(f"scheduler:\n", add_tab(class_str(scheduler)))
|
||||
|
||||
|
||||
print(100 * '-')
|
||||
training_tracker = train(model, optimizer, scheduler, loss_func, train_loader, st, print_interval=print_interval)
|
||||
# print("Training: Count per label:", training_tracker.get_count_per_label())
|
||||
# print("Training: Predictions per label:", training_tracker.get_predictions_per_label())
|
||||
|
||||
print(100 * '-')
|
||||
validation_tracker = validate(model, test_loader, st)
|
||||
# print("Validation: Count per label:", validation_tracker.get_count_per_label())
|
||||
# print("Validation: Predictions per label:", validation_tracker.get_predictions_per_label())
|
||||
|
||||
|
||||
digits = get_next_digits(f"{model_name}_", models_dir)
|
||||
model_dir = f"{models_dir}/{model_name}_{digits}"
|
||||
# do not put earlier, since the dir should not be created if training is interrupted
|
||||
if not path.isdir(model_dir): # should always run, if not the digits function did not work
|
||||
makedirs(model_dir)
|
||||
|
||||
fig, _ = validation_tracker.plot_predictions("Validation: Predictions", model_dir=model_dir, name="img_validation_predictions")
|
||||
fig, _ = training_tracker.plot_predictions("Training: Predictions", model_dir=model_dir, name="img_training_predictions")
|
||||
fig, _ = training_tracker.plot_training(model_dir=model_dir)
|
||||
|
||||
if show_plots:
|
||||
plt.show()
|
||||
plt.close('all')
|
||||
|
||||
# save the settings, results and model
|
||||
mio.save_settings(model_dir, st)
|
||||
mio.save_tracker_validation(model_dir, validation_tracker)
|
||||
mio.save_tracker_training(model_dir, training_tracker)
|
||||
mio.save_model(model_dir, model)
|
@ -1,187 +0,0 @@
|
||||
from ..util.data_loader import LabelConverter
|
||||
import matplotlib.pyplot as plt
|
||||
import time
|
||||
import torch
|
||||
import numpy as np
|
||||
|
||||
class EpochTracker:
|
||||
"""
|
||||
Track accuracy, loss, learning_rate etc. during model training
|
||||
Can also be used for validation (which will probably be only one epoch)
|
||||
"""
|
||||
def __init__(self, labels: LabelConverter):
|
||||
self.labels = labels
|
||||
|
||||
self.times: list[float] = [] # (epoch)
|
||||
self.predictions = [[]] # (epoch, batch_nr, (correct_indices | predicted_indices), ind:ex_nr)
|
||||
self.loss: list[float] = [] # (epoch)
|
||||
self.learning_rate: list[float] = [] # (epoch)
|
||||
self.epochs: list[int] = [] # 1 based for FINISHED epochs
|
||||
self._current_epoch = 0 # 0 based
|
||||
|
||||
# after training
|
||||
self.accuracies: list[float] = [] # (epoch)
|
||||
|
||||
def begin(self):
|
||||
self.times.append(time.time())
|
||||
|
||||
def end(self):
|
||||
self.times.append(time.time())
|
||||
# if end_epoch was called before end:
|
||||
if len(self.predictions[-1]) == 0:
|
||||
self.predictions.pop()
|
||||
self._current_epoch -= 1
|
||||
else: # if end_epoch was not called
|
||||
self.epochs.append(len(self.epochs) + 1)
|
||||
self._calculate_accuracies(self._current_epoch)
|
||||
|
||||
|
||||
s = f"Summary: After {self.epochs[-1]} epochs: "
|
||||
s += f"Accuracy={self.accuracies[-1]:.2f}%"
|
||||
s += f", Total time={self.get_total_time():.2f}s"
|
||||
return s
|
||||
|
||||
|
||||
|
||||
def get_total_time(self):
|
||||
if len(self.times) > 1: return self.times[-1] - self.times[0]
|
||||
else: return -1
|
||||
|
||||
#
|
||||
# EPOCH
|
||||
#
|
||||
def end_epoch(self, loss, learning_rate):
|
||||
"""
|
||||
loss and learning_rate of last epoch
|
||||
call before scheduler.step()
|
||||
"""
|
||||
self.times.append(time.time())
|
||||
self.epochs.append(len(self.epochs) + 1)
|
||||
if type(loss) == torch.Tensor: self.loss.append(loss.item())
|
||||
else: self.loss.append(loss)
|
||||
self.learning_rate.append(learning_rate)
|
||||
self._calculate_accuracies(self._current_epoch)
|
||||
|
||||
self._current_epoch += 1
|
||||
self.predictions.append([])
|
||||
|
||||
def get_epoch_summary_str(self, ep=-1):
|
||||
"""call after next_epoch()"""
|
||||
m = max(ep, 0) # if ep == -1, check if len is > 0
|
||||
assert(len(self.epochs) > m)
|
||||
s = f"Epoch {self.epochs[ep]:3}"
|
||||
if len(self.accuracies) > m:s += f", Accuracy={self.accuracies[ep]:.2f}%"
|
||||
if len(self.loss) > m: s += f", Loss={self.loss[ep]:.3f}"
|
||||
if len(self.loss) > m: s += f", lr={self.learning_rate[ep]:.4f}"
|
||||
if len(self.times) > m+1: s += f", dt={self.times[ep] - self.times[ep-1]:.2f}s"
|
||||
return s
|
||||
|
||||
def add_prediction(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
|
||||
"""for accuracy calculation"""
|
||||
self.predictions[self._current_epoch].append((correct_indices.detach().numpy(), predicted_indices.detach().numpy()))
|
||||
|
||||
#
|
||||
# STATISTICS
|
||||
#
|
||||
def get_count_per_label(self, epoch=-1):
|
||||
"""
|
||||
the number of times where <label> was the correct label, per label
|
||||
@returns shape: (label)
|
||||
"""
|
||||
count_per_label = [ 0 for _ in range(len(self.labels)) ]
|
||||
for corr, _ in self.predictions[epoch]:
|
||||
for batch in range(len(corr)):
|
||||
count_per_label[corr[batch]] += 1
|
||||
return count_per_label
|
||||
|
||||
def get_predictions_per_label(self, epoch=-1):
|
||||
"""
|
||||
How often label_i was predicted, when label_j was the correct label
|
||||
@returns shape: (label_j, label_i)
|
||||
"""
|
||||
statistics = [ [ 0 for _ in range(len(self.labels)) ] for _ in range(len(self.labels)) ]
|
||||
for corr, pred in self.predictions[epoch]:
|
||||
for batch in range(len(corr)):
|
||||
statistics[corr[batch]][pred[batch]] += 1
|
||||
return statistics
|
||||
|
||||
def plot_training(self, title="Training Summary", model_dir=None, name="img_training"):
|
||||
"""
|
||||
@param model_dir: Optional. If given, save to model_dir as svg
|
||||
"""
|
||||
fig, ax = plt.subplots(nrows=3, ncols=1, sharex=True, layout="tight")
|
||||
|
||||
ax[0].plot(self.epochs, self.accuracies, color="red")
|
||||
ax[0].set_ylabel("Accuracy")
|
||||
|
||||
ax[1].plot(self.epochs, self.learning_rate, color="green")
|
||||
ax[1].set_ylabel("Learning Rate")
|
||||
|
||||
ax[2].plot(self.epochs, self.loss, color="blue")
|
||||
ax[2].set_ylabel("Loss")
|
||||
|
||||
fig.suptitle(title)
|
||||
ax[2].set_xlabel("Epoch")
|
||||
plt.tight_layout()
|
||||
if model_dir is not None:
|
||||
fig.savefig(f"{model_dir}/{name}.svg")
|
||||
|
||||
return fig, ax
|
||||
|
||||
def plot_predictions(self, title="Predictions per Label", ep=-1, model_dir=None, name="img_training_predictions"):
|
||||
"""
|
||||
@param model_dir: Optional. If given, save to model_dir as svg
|
||||
@param ep: Epoch, defaults to last
|
||||
"""
|
||||
# Normalize the data
|
||||
predictions_per_label = self.get_predictions_per_label(ep)
|
||||
normalized_predictions = predictions_per_label / np.sum(predictions_per_label, axis=1, keepdims=True)
|
||||
|
||||
N = len(self.labels)
|
||||
label_names = self.labels.get_labels()
|
||||
|
||||
fig, ax = plt.subplots(layout="tight")
|
||||
|
||||
im = ax.imshow(normalized_predictions, cmap='Blues') # cmap='BuPu'
|
||||
ax.set_xticks(np.arange(N))
|
||||
ax.set_yticks(np.arange(N))
|
||||
ax.set_xticklabels(label_names)
|
||||
ax.set_yticklabels(label_names)
|
||||
ax.set_xlabel('Predicted Label')
|
||||
ax.set_ylabel('Correct Label')
|
||||
|
||||
# horizontal lines between labels to better show that the sum of a row is 1
|
||||
for i in range(1, N):
|
||||
ax.axhline(i-0.5, color='black', linewidth=1)
|
||||
|
||||
# rotate the x-axis labels for better readability
|
||||
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
|
||||
|
||||
# create annotations
|
||||
for i in range(N):
|
||||
for j in range(N):
|
||||
text = ax.text(j, i, round(normalized_predictions[i, j], 2),
|
||||
ha="center", va="center", color="black")
|
||||
|
||||
# add colorbar
|
||||
cbar = ax.figure.colorbar(im, ax=ax)
|
||||
|
||||
ax.set_title(title)
|
||||
plt.tight_layout()
|
||||
if model_dir is not None:
|
||||
fig.savefig(f"{model_dir}/{name}.svg")
|
||||
return fig, ax
|
||||
|
||||
#
|
||||
# CALCULATION
|
||||
#
|
||||
def _calculate_accuracies(self, ep):
|
||||
correct_predictions = 0
|
||||
total_predictions = 0
|
||||
for correct_indices, predicted_indices in self.predictions[ep]:
|
||||
correct_predictions += (predicted_indices == correct_indices).sum().item()
|
||||
total_predictions += len(predicted_indices)
|
||||
accuracy = correct_predictions / total_predictions * 100
|
||||
while len(self.accuracies) <= ep:
|
||||
self.accuracies.append(-1)
|
||||
self.accuracies[ep] = accuracy
|
@ -3,9 +3,6 @@ from os import path, listdir
|
||||
import re
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from scipy.sparse import data
|
||||
|
||||
import threading
|
||||
|
||||
from sklearn.model_selection import train_test_split
|
||||
|
||||
@ -13,7 +10,7 @@ from sklearn.model_selection import train_test_split
|
||||
re_filename = r"(\d{4}-\d{2}-\d{2})_([a-zA-Z_]+)_(\d{1,2}(?:\.\d*)?)V_(\d+(?:\.\d*)?)mm(\d+).csv"
|
||||
|
||||
class LabelConverter:
|
||||
def __init__(self, class_labels: list[str]):
|
||||
def __init__(self, class_labels):
|
||||
self.class_labels = class_labels.copy()
|
||||
self.class_labels.sort()
|
||||
|
||||
@ -35,12 +32,10 @@ class LabelConverter:
|
||||
def get_labels(self):
|
||||
return self.class_labels.copy()
|
||||
|
||||
def __repr__(self):
|
||||
return str(self.class_labels)
|
||||
|
||||
|
||||
class Datasample:
|
||||
def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str, init_data=False):
|
||||
def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str):
|
||||
self.date = date
|
||||
self.label = label
|
||||
self.voltage = float(voltage)
|
||||
@ -49,62 +44,50 @@ class Datasample:
|
||||
self.label_vec = label_vec
|
||||
self.datapath = datapath
|
||||
self.data = None
|
||||
if init_data: self._load_data()
|
||||
|
||||
def __repr__(self):
|
||||
size = self.data.size if self.data is not None else "Unknown"
|
||||
return f"{self.label}-{self.index}: dimension={size}, recorded at {self.date} with U={self.voltage}V, d={self.distance}mm"
|
||||
|
||||
def _load_data(self):
|
||||
# df = pd.read_csv(self.datapath)
|
||||
self.data = np.loadtxt(self.datapath, skiprows=1, dtype=np.float32, delimiter=",")
|
||||
df = pd.read_csv(self.datapath)
|
||||
self.data = df.to_numpy(dtype=np.float32)
|
||||
|
||||
def get_data(self):
|
||||
"""[[timestamp, idata, vdata]]"""
|
||||
"""[[timestamps, idata, vdata]]"""
|
||||
if self.data is None:
|
||||
self._load_data()
|
||||
return self.data
|
||||
|
||||
|
||||
class Dataset:
|
||||
"""
|
||||
Store the whole dataset, compatible with torch.data.Dataloader
|
||||
"""
|
||||
def __init__(self, datasamples, transforms=[], split_function=None):
|
||||
"""
|
||||
@param transforms: single callable or list of callables that are applied to the data (before eventual split)
|
||||
@param split_function: (data) -> [data0, data1...] callable that splits the data
|
||||
"""
|
||||
def __init__(self, datasamples, transforms=None):
|
||||
self.datasamples = datasamples
|
||||
self.transforms = transforms
|
||||
self.data = [] # (data, label)
|
||||
for sample in datasamples:
|
||||
data = self.apply_transforms(sample.get_data())
|
||||
if split_function is None:
|
||||
self.data.append((data, sample.label_vec))
|
||||
else:
|
||||
for data_split in split_function(data):
|
||||
self.data.append((data_split, sample.label_vec))
|
||||
# self.labels = [ d.label_vec for d in datasamples ]
|
||||
# self.data = [ d.get_data() for d in datasamples ]
|
||||
|
||||
def apply_transforms(self, data):
|
||||
def __getitem__(self, index):
|
||||
data, label = self.datasamples[index].get_data(), self.datasamples[index].label_vec
|
||||
# print(f"loading dataset {self.datasamples[index]}")
|
||||
if type(self.transforms) == list:
|
||||
for t in self.transforms:
|
||||
data = t(data)
|
||||
elif self.transforms is not None:
|
||||
elif self.transforms:
|
||||
data = self.transforms(data)
|
||||
return data
|
||||
|
||||
def __getitem__(self, index):
|
||||
return self.data[index]
|
||||
# TODO
|
||||
return data[:2000], label
|
||||
|
||||
def __len__(self):
|
||||
return len(self.data)
|
||||
return len(self.datasamples)
|
||||
|
||||
|
||||
def get_datafiles(datadir, labels: LabelConverter, voltage=None):
|
||||
def load_datasets(datadir, labels: LabelConverter, transforms=None, voltage=None, train_to_test_ratio=0.7, random_state=None):
|
||||
"""
|
||||
get a list of all matching datafiles from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
|
||||
load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
|
||||
"""
|
||||
datafiles = []
|
||||
datasamples = []
|
||||
files = listdir(datadir)
|
||||
files.sort()
|
||||
for file in files:
|
||||
@ -116,38 +99,9 @@ def get_datafiles(datadir, labels: LabelConverter, voltage=None):
|
||||
|
||||
sample_voltage = float(match.groups()[2])
|
||||
if voltage and voltage != sample_voltage: continue
|
||||
datafiles.append((datadir + "/" + file, match, label))
|
||||
return datafiles
|
||||
|
||||
|
||||
def load_datasets(datadir, labels: LabelConverter, transforms=None, split_function=None, voltage=None, train_to_test_ratio=0.7, random_state=None, num_workers=None):
|
||||
"""
|
||||
load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
|
||||
"""
|
||||
datasamples = []
|
||||
if num_workers == None:
|
||||
for file, match, label in get_datafiles(datadir, labels, voltage):
|
||||
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), file))
|
||||
else:
|
||||
files = get_datafiles(datadir, labels, voltage)
|
||||
def worker():
|
||||
while True:
|
||||
try:
|
||||
file, match, label = files.pop()
|
||||
except IndexError:
|
||||
# No more files to process
|
||||
return
|
||||
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), file, init_data=True))
|
||||
threads = [threading.Thread(target=worker) for _ in range(num_workers)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
|
||||
# TODO do the train_test_split after the Dataset split
|
||||
# problem: needs to be after transforms
|
||||
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), datadir + "/" + file))
|
||||
train_samples, test_samples = train_test_split(datasamples, train_size=train_to_test_ratio, shuffle=True, random_state=random_state)
|
||||
train_dataset = Dataset(train_samples, transforms=transforms, split_function=split_function)
|
||||
test_dataset = Dataset(test_samples, transforms=transforms, split_function=split_function)
|
||||
train_dataset = Dataset(train_samples, transforms=transforms)
|
||||
test_dataset = Dataset(test_samples, transforms=transforms)
|
||||
return train_dataset, test_dataset
|
||||
|
83
teng-ml/util/epoch_tracker.py
Normal file
83
teng-ml/util/epoch_tracker.py
Normal file
@ -0,0 +1,83 @@
|
||||
from ..util.data_loader import LabelConverter
|
||||
import time
|
||||
import torch
|
||||
|
||||
class EpochTracker:
|
||||
"""
|
||||
Track progress through epochs and generate statistics
|
||||
"""
|
||||
def __init__(self, labels: LabelConverter):
|
||||
# Training
|
||||
self.accuracy = []
|
||||
self.loss = []
|
||||
self.times = [] # timestamps for each epoch end
|
||||
self.trainings = []
|
||||
self.training_indices = [[]] # epoch, batch_nr, (correct_indices, predicted_indices), ind:ex_nr
|
||||
self._current_epoch = 0
|
||||
|
||||
self.labels = labels
|
||||
|
||||
# Testing
|
||||
self.tests = [] # (correct_indices, predicted_indices)
|
||||
|
||||
def train_begin(self):
|
||||
"""for time tracking"""
|
||||
self.times.append(time.time())
|
||||
|
||||
# TRAINING
|
||||
def train(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
|
||||
self.training_indices[self._current_epoch].append((correct_indices, predicted_indices))
|
||||
|
||||
def next_epoch(self, loss):
|
||||
self.times.append(time.time())
|
||||
self.loss.append(loss)
|
||||
correct_predictions = 0
|
||||
total_predictions = 0
|
||||
for predicted_indices, correct_indices in self.training_indices[self._current_epoch]:
|
||||
correct_predictions += (predicted_indices == correct_indices).sum().item()
|
||||
total_predictions += predicted_indices.size(0)
|
||||
accuracy = 100 * correct_predictions / total_predictions
|
||||
self.accuracy.append(accuracy)
|
||||
self._current_epoch += 1
|
||||
self.training_indices.append([])
|
||||
|
||||
def get_last_epoch_summary_str(self):
|
||||
"""call after next_epoch()"""
|
||||
return f"Epoch {self._current_epoch:3}: Accuracy={self.accuracy[-1]:.2f}, Loss={self.loss[-1]:.3f}, Training duration={self.times[-1] - self.times[0]:.2f}s"
|
||||
def get_last_epoch_summary(self):
|
||||
"""
|
||||
@returns accuracy, loss, training time
|
||||
"""
|
||||
return self.accuracy[-1], self.loss[-1], self.times[-1] - self.times[0]
|
||||
|
||||
def get_training_count_per_label(self):
|
||||
count_per_label = [ 0 for _ in range(len(self.labels)) ]
|
||||
for i in range(len(self.training_indices)):
|
||||
for j in range(len(self.training_indices[i])):
|
||||
for k in range(self.training_indices[i][j][0].size(0)):
|
||||
# epoch, batch_nr, 0 = correct_indices, correct_index_nr
|
||||
count_per_label[self.training_indices[i][j][0][k]] += 1
|
||||
return count_per_label
|
||||
|
||||
def __len__(self):
|
||||
return len(self.accuracy)
|
||||
|
||||
def __getitem__(self, idx):
|
||||
return (self.accuracy[idx], self.loss[idx])
|
||||
|
||||
# TESTING
|
||||
def test(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
|
||||
"""
|
||||
@param correct_indices and predicted_indices: 1 dim Tensor
|
||||
"""
|
||||
for i in range(correct_indices.size(0)):
|
||||
self.tests.append((correct_indices[i], predicted_indices[i]))
|
||||
|
||||
|
||||
def get_test_statistics(self):
|
||||
# label i, label_j was predicted when label_i was correct
|
||||
statistics = [ [ 0 for _ in range(len(self.labels))] for _ in range(len(self.labels)) ]
|
||||
for corr, pred in self.tests:
|
||||
statistics[corr][pred] += 1
|
||||
print(statistics)
|
||||
return statistics
|
@ -1,34 +0,0 @@
|
||||
from os import listdir, path
|
||||
|
||||
def add_zeros(v: int, digits=3):
|
||||
"""
|
||||
return v as string, add leading zeros if len(str(v)) < digits
|
||||
"""
|
||||
s = str(v)
|
||||
return '0' * (max(digits - len(s), 0)) + s
|
||||
|
||||
|
||||
def get_next_digits(basename, directory=".", digits=3):
|
||||
"""
|
||||
get the next filename digits
|
||||
example:
|
||||
basename = file
|
||||
directory has file001.csv, file002.pkl, file004.csv
|
||||
-> return 005
|
||||
"""
|
||||
files = listdir(directory)
|
||||
files.sort()
|
||||
files.reverse()
|
||||
lowest_number = -1
|
||||
for file in files:
|
||||
if not file.startswith(basename): continue
|
||||
try:
|
||||
dot = file.rfind('.')
|
||||
if dot > 0: file = file[:dot]
|
||||
number = int(file.replace(basename, ""))
|
||||
if number < lowest_number: continue
|
||||
lowest_number = number
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
return add_zeros(lowest_number+1)
|
@ -1,45 +0,0 @@
|
||||
from ..tracker.epoch_tracker import EpochTracker
|
||||
from ..util.settings import MLSettings
|
||||
import pickle
|
||||
|
||||
|
||||
"""
|
||||
Load and save model, settings and EpochTrackers from/on disk
|
||||
"""
|
||||
|
||||
def load_tracker_validation(model_dir):
|
||||
with open(f"{model_dir}/tracker_validation.pkl", "rb") as file:
|
||||
validation_tracker: EpochTracker = pickle.load(file)
|
||||
return validation_tracker
|
||||
|
||||
def load_tracker_training(model_dir):
|
||||
with open(f"{model_dir}/tracker_training.pkl", "rb") as file:
|
||||
training_tracker: EpochTracker = pickle.load(file)
|
||||
return training_tracker
|
||||
|
||||
def load_settings(model_dir):
|
||||
with open(f"{model_dir}/settings.pkl", "rb") as file:
|
||||
st: MLSettings = pickle.load(file)
|
||||
return st
|
||||
|
||||
def load_model(model_dir):
|
||||
with open(f"{model_dir}/model.pkl", "rb") as file:
|
||||
model = pickle.load(file)
|
||||
return model
|
||||
|
||||
|
||||
def save_tracker_validation(model_dir, validation_tracker: EpochTracker):
|
||||
with open(f"{model_dir}/tracker_validation.pkl", "wb") as file:
|
||||
pickle.dump(validation_tracker, file)
|
||||
|
||||
def save_tracker_training(model_dir, training_tracker: EpochTracker):
|
||||
with open(f"{model_dir}/tracker_training.pkl", "wb") as file:
|
||||
pickle.dump(training_tracker, file)
|
||||
|
||||
def save_settings(model_dir, st):
|
||||
with open(f"{model_dir}/settings.pkl", "wb") as file:
|
||||
pickle.dump(st, file)
|
||||
|
||||
def save_model(model_dir, model):
|
||||
with open(f"{model_dir}/model.pkl", "wb") as file:
|
||||
pickle.dump(model, file)
|
@ -1,5 +1,4 @@
|
||||
from ..util.data_loader import LabelConverter
|
||||
from ..util.split import DataSplitter
|
||||
|
||||
class MLSettings:
|
||||
"""
|
||||
@ -10,11 +9,7 @@ class MLSettings:
|
||||
num_layers=1,
|
||||
hidden_size=1,
|
||||
bidirectional=True,
|
||||
optimizer=None,
|
||||
scheduler=None,
|
||||
loss_func=None,
|
||||
transforms=[],
|
||||
splitter=None,
|
||||
num_epochs=10,
|
||||
batch_size=5,
|
||||
labels=LabelConverter([]),
|
||||
@ -24,11 +19,7 @@ class MLSettings:
|
||||
self.hidden_size = hidden_size
|
||||
self.num_epochs = num_epochs
|
||||
self.bidirectional = bidirectional
|
||||
self.optimizer = optimizer
|
||||
self.scheduler = scheduler
|
||||
self.loss_func = loss_func
|
||||
self.transforms = transforms
|
||||
self.splitter = splitter
|
||||
self.batch_size = batch_size
|
||||
self.labels = labels
|
||||
|
||||
@ -39,7 +30,6 @@ class MLSettings:
|
||||
H = hidden_size
|
||||
B = bidirectional
|
||||
T = #transforms
|
||||
S = splitter
|
||||
E = #epochs
|
||||
"""
|
||||
return f"F{self.num_features}L{self.num_layers}H{self.hidden_size:02}B{'1' if self.bidirectional else '0'}T{len(self.transforms)}S{self.splitter.split_size if type(self.splitter) == DataSplitter is not None else 0:03}E{self.num_epochs:03}"
|
||||
return f"F{self.num_features}L{self.num_layers}H{self.hidden_size}B{'1' if self.bidirectional else '0'}T{len(self.transforms)}"
|
||||
|
@ -1,23 +0,0 @@
|
||||
import numpy as np
|
||||
|
||||
class DataSplitter:
|
||||
r"""
|
||||
Split a numpy array into smaller arrays of size datapoints_per_split
|
||||
If data.shape(0) % datapoints_per_split != 0, the remaining datapoints are dropped
|
||||
"""
|
||||
def __init__(self, datapoints_per_split):
|
||||
self.split_size = datapoints_per_split
|
||||
|
||||
def __call__(self, data: np.ndarray):
|
||||
"""
|
||||
data: [[t, i, v]]
|
||||
"""
|
||||
ret_data = []
|
||||
for i in range(self.split_size, data.shape[0], self.split_size):
|
||||
ret_data.append(data[i-self.split_size:i, :])
|
||||
if len(ret_data) == 0:
|
||||
raise ValueError(f"data has only {data.shape[0]}, but datapoints_per_split is set to {self.split_size}")
|
||||
return ret_data
|
||||
|
||||
def __repr__(self):
|
||||
return f"DataSplitter({self.split_size})"
|
@ -1,51 +0,0 @@
|
||||
import inspect
|
||||
import torch.optim.lr_scheduler as sd
|
||||
import re
|
||||
|
||||
def fill_and_center(s: str, fill_char="=", length=100):
|
||||
rs = fill_char * length
|
||||
margin = (length - len(s)) // 2
|
||||
if margin > 1:
|
||||
rs = f"{fill_char*(margin-1)} {s} {fill_char*(margin-1)}"
|
||||
if len(rs) == 99: rs = rs + "="
|
||||
assert(len(rs) == 100)
|
||||
return rs
|
||||
else:
|
||||
return s
|
||||
|
||||
def class_str(x):
|
||||
"""
|
||||
Return the constructor of the class of x with arguemnts
|
||||
"""
|
||||
name = type(x).__name__
|
||||
signature = inspect.signature(type(x))
|
||||
params = []
|
||||
for param_name, param_value in x.__dict__.items():
|
||||
if param_name not in signature.parameters:
|
||||
continue
|
||||
default_value = signature.parameters[param_name].default
|
||||
if param_value != default_value:
|
||||
params.append(f"{param_name}={param_value!r}")
|
||||
if params:
|
||||
return f"{name}({', '.join(params)})"
|
||||
else:
|
||||
return name
|
||||
|
||||
|
||||
def cleanup_str(s):
|
||||
"""
|
||||
convert to string if necessary and
|
||||
if scheduler string:
|
||||
remove unnecessary parameters
|
||||
"""
|
||||
if not type(s) == str:
|
||||
s = str(s)
|
||||
# check if scheduler string
|
||||
re_scheduler = r"(\w+)\((.*)(optimizer=[A-Za-z]+) \(.*(initial_lr: [\d.]+).*?\)(.*)\)"
|
||||
# groups: (sched_name, sched_params1, optimizer=Name, initial_lr: digits, sched_params2)
|
||||
match = re.fullmatch(re_scheduler, s.replace("\n", " "))
|
||||
if match:
|
||||
g = match.groups()
|
||||
s = f"{g[0]}({g[1]}{g[2]}({g[3]}, ...){g[4]})"
|
||||
return s
|
||||
return s
|
@ -54,3 +54,4 @@ class ConstantInterval:
|
||||
|
||||
def __repr__(self):
|
||||
return f"ConstantInterval(interval={self.interval})"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user