Compare commits


No commits in common. "1ffedb5ddc7568418af8c3a01bd7583f04df92c7" and "9660de248aa23ceb4ce5dcdf30673710cf3705bf" have entirely different histories.

17 changed files with 320 additions and 1059 deletions

.gitignore

@ -1,2 +1 @@
*__pycache__*
.old


@ -1,13 +1,2 @@
# Machine Learning stuff for TENG project
(Bi)LSTM for name classification.
More information on the project is available [on my website](https://quintern.xyz/en/teng.html).
## Model training
Adjust the parameters in `main.py` and run it.
All models and the settings they were trained with are automatically serialized with pickle and stored in a subfolder
of the `<model_dir>` that was set in `main.py`.
## Model evaluation
Run `find_best_model.py <model_dir>` with the `<model_dir>` specified in `main.py` during training.
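For example, a typical workflow might look like this (paths are illustrative and depend on your checkout; `<model_dir>` is whatever you configured in `main.py`):

```sh
# 1. train all configured parameter combinations
python3 teng-ml/main.py
# 2. rank the resulting models and inspect them interactively
python3 teng-ml/find_best_model.py <model_dir>
```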


@ -1,316 +0,0 @@
from os import path, listdir
import matplotlib.pyplot as plt
import numpy as np
from sys import exit, argv
if __name__ == "__main__":
if __package__ is None:
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
__package__ = "teng-ml"
import sys
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
from .tracker.epoch_tracker import EpochTracker
from .util.settings import MLSettings
from .util import model_io as mio
from .util.string import cleanup_str, fill_and_center
cache = {}
#
# TOOLS
#
def get_model_dirs(models_dir):
"""
return all model_dirs, relative to models_dir
"""
if "model_dirs" in cache: return cache["model_dirs"].copy()
paths = listdir(models_dir)
model_dirs = []
for p in paths:
if not path.isdir(f"{models_dir}/{p}"): continue
if not path.isfile(f"{models_dir}/{p}/settings.pkl"): continue
if not path.isfile(f"{models_dir}/{p}/tracker_training.pkl"): continue
if not path.isfile(f"{models_dir}/{p}/tracker_validation.pkl"): continue
if not path.isfile(f"{models_dir}/{p}/model.pkl"): continue
model_dirs.append(f"{models_dir}/{p}")
cache["model_dirs"] = model_dirs.copy()
return model_dirs
def resave_images_svg(model_dirs):
"""
open all trackers and save all plots as svg
"""
for model_dir in model_dirs:
val_tracker: EpochTracker = mio.load_tracker_validation(model_dir)
fig, _ = val_tracker.plot_predictions("Validation: Predictions", model_dir=model_dir, name="img_validation_predictions")
train_tracker: EpochTracker = mio.load_tracker_training(model_dir)
fig, _ = train_tracker.plot_predictions("Training: Predictions", model_dir=model_dir, name="img_training_predictions")
fig, _ = train_tracker.plot_training(model_dir=model_dir)
plt.close('all')
#
# MODEL RANKING
#
def get_model_info_md(model_dir):
st: MLSettings = mio.load_settings(model_dir)
validation_tracker = mio.load_tracker_validation(model_dir)
training_tracker = mio.load_tracker_training(model_dir)
s = f"""Model {model_dir[model_dir.rfind('/')+1:]}
Model parameters:
- num_features = {st.num_features}
- num_layers = {st.num_layers}
- hidden_size = {st.hidden_size}
- bidirectional = {st.bidirectional}
Training data:
- transforms = {st.transforms}
- splitter = {st.splitter}
- labels = {st.labels}
Training info:
- optimizer = {cleanup_str(st.optimizer)}
- scheduler = {cleanup_str(st.scheduler)}
- loss_func = {st.loss_func}
- num_epochs = {st.num_epochs}
- batch_size = {st.batch_size}
- n_predictions = {np.sum(training_tracker.get_count_per_label())}
- final accuracy = {training_tracker.accuracies[-1]}
- highest accuracy = {np.max(training_tracker.accuracies)}
Validation info:
- n_predictions = {np.sum(validation_tracker.get_count_per_label())}
- accuracy = {validation_tracker.accuracies[-1]}
"""
return s
def write_model_info(model_dir, model_info=None):
if model_info is None: model_info = get_model_info_md(model_dir)
with open(f"{model_dir}/model_info.md", "w") as file:
file.write(model_info)
def get_model_ranking(model_dirs):
if "model_ranking" in cache: return cache["model_ranking"].copy()
model_ranking = [] # list of (model_dir, validation_accuracy)
for model_dir in model_dirs:
model_ranking.append((model_dir, mio.load_tracker_validation(model_dir).accuracies[-1]))
model_ranking.sort(key=lambda t: t[1]) # sort accuracy
model_ranking.reverse() # best to worst
cache["model_ranking"] = model_ranking.copy()
return model_ranking
def get_model_ranking_md(model_dirs):
model_ranking = get_model_ranking(model_dirs)
ranking_md = ""
for i in range(len(model_ranking)):
model_dir = model_ranking[i][0]
model_name = model_dir[model_dir.rfind("/")+1:]
ranking_md += f"{i+1:3}. Model=`{model_name}`, Validation accuracy={round(model_ranking[i][1], 2):.2f}%\n"
return ranking_md
#
# SETTINGS RANKING
#
def get_settings_ranking(model_dirs, use_ranking_instead_of_accuracy=False):
"""
load the settings for each model and score them based on the performance of the model
This only works when every parameter value was used the same number of times
(example: two batch sizes x and y must each have been used z times for the ranking to make sense)
"""
if "settings_ranking" in cache: return cache["settings_ranking"].copy()
settings_ranking = {} # parameter name: param_value: score
model_ranking = get_model_ranking(model_dirs)
model_ranking.reverse() # worst to best
def score_ranking_based(i, param_name, param_value):
"""
score settings depending on the ranking of the model
eg: best of 32 models has batch_size 10 -> batch_size 10 gets 32 points
"""
param_value = cleanup_str(param_value)
if not param_name in settings_ranking.keys():
settings_ranking[param_name] = {}
if not param_value in settings_ranking[param_name].keys():
settings_ranking[param_name][param_value] = 0
settings_ranking[param_name][param_value] += i # i+1 is reverse place in the ranking, worst model is at i=0
def score_accuracy_based(i, param_name, param_value):
"""
score settings depending on the accuracy of the model
eg: models has batch_size 10 and accuracy 63% -> batch_size 10 gets 63 points
"""
param_value = cleanup_str(param_value)
if not param_name in settings_ranking.keys():
settings_ranking[param_name] = {}
if not param_value in settings_ranking[param_name].keys():
settings_ranking[param_name][param_value] = 0
settings_ranking[param_name][param_value] += int(model_ranking[i][1]) # accuracy
if use_ranking_instead_of_accuracy:
score = lambda i, name, val : score_ranking_based(i, name, val)
else:
score = lambda i, name, val : score_accuracy_based(i, name, val)
for i in range(len(model_ranking)):
st = mio.load_settings(model_ranking[i][0])
score(i, "num_features", st.num_features)
score(i, "num_layers", st.num_layers)
score(i, "hidden_size", st.hidden_size)
score(i, "num_epochs", st.num_epochs)
score(i, "bidirectional", st.bidirectional)
score(i, "optimizer", st.optimizer)
score(i, "scheduler", st.scheduler)
score(i, "loss_func", st.loss_func)
score(i, "transforms", st.transforms)
score(i, "splitter", st.splitter)
score(i, "batch_size", st.batch_size)
# remove parameters with only one value
settings_ranking = { k: v for k, v in settings_ranking.items() if len(v) > 1 }
cache["settings_ranking"] = settings_ranking.copy()
return settings_ranking
def get_settings_ranking_md(model_dirs):
"""
turn the scores dict from rank_settings into a markdown string
"""
settings_ranking = get_settings_ranking(model_dirs)
s = ""
for param_name, d in settings_ranking.items():
s += f"- {param_name}:\n"
sorted_scores = sorted(d.items(), key=lambda x: x[1], reverse=True)
for i in range(len(sorted_scores)):
param_value, score = sorted_scores[i]
s += f"\t{i+1}. `{param_value}` ({score} points)\n"
return s
def interactive_model_inspector(models_dir: str):
model_dirs = get_model_dirs(models_dir)
model_dirs.sort()
model_names = [ mdir[mdir.rfind('/')+1:] for mdir in model_dirs ]
def print_options():
s = fill_and_center("Interactive Model Inspector") + "\n"
for i in range(len(model_names)):
s += f"{i+1:02}: {model_names[i]}\n"
s += """ ---
x: print model info for x. listed model (1-based)
x.: print model info for x. ranked model (1-based)
w: write last model info
wa: write info for all listed models
q: quit
*: name of model or path to model directory. If not found, reprint list."""
print(s)
last_model_info = None
last_model_dir = None
def print_model_info(model_dir):
nonlocal last_model_info, last_model_dir
last_model_dir = model_dir
last_model_info = get_model_info_md(last_model_dir)
print(last_model_info)
print_options()
loop = True
try:
while loop:
answer = input("> ")
if len(answer) == 0: continue
try: # if x -> take x. from listed models
i = int(answer)
if 0 < i and i <= len(model_dirs):
print_model_info(model_dirs[i-1])
continue
except ValueError: pass
if answer.endswith('.'): # if x. -> take x. from model ranking
try:
i = int(answer[:-1])
if 0 < i and i <= len(model_dirs):
model_ranking = get_model_ranking(model_dirs)
print_model_info(model_ranking[i-1][0])
continue
except ValueError: pass
elif answer == "w":
if last_model_info is None:
print("Print a model info first.")
continue
write_model_info(last_model_dir, last_model_info)
elif answer == "wa":
for model_dir in model_dirs:
write_model_info(model_dir)
elif answer == "q":
loop = False
continue
else:
if path.isdir(answer): # if model dir
print_model_info(answer)
elif path.isdir(f"{models_dir}/{answer}"): # if model name
print_model_info(f"{models_dir}/{answer}")
else:
print(f"'{answer}' is not a model name in {models_dir} or path to a model directory.")
print_options()
except KeyboardInterrupt: # if <C-C>
pass
except EOFError: # if <C-D>
exit(0)
return True
if __name__ == "__main__":
if len(argv) != 2:
print(f"Exactly one argument (models directory) is required, but got {len(argv)-1}.")
exit(1)
# models_dir = "/home/matth/Uni/TENG/models_phase_2" # where to save models, settings and results
models_dir = path.abspath(path.expanduser(argv[1]))
model_dirs = get_model_dirs(models_dir)
def save_model_ranking():
model_ranking = get_model_ranking_md(model_dirs)
with open(f"{models_dir}/ranking_models.md", "w") as file:
file.write(model_ranking)
def save_settings_ranking():
with open(f"{models_dir}/ranking_settings.md", "w") as file:
file.write(get_settings_ranking_md(model_dirs))
# if the functions return True, the options are printed again
options = {
'1': ("Print model ranking", lambda: print(get_model_ranking_md(model_dirs))),
'2': ("Save model ranking", save_model_ranking),
'3': ("Print settings ranking", lambda: print(get_settings_ranking_md(model_dirs))),
'4': ("Save settings ranking", save_settings_ranking),
'5': ("Interactive model inspector", lambda: interactive_model_inspector(models_dir)),
'6': ("Resave all images", lambda: resave_images_svg(model_dirs)),
'q': ("quit", exit)
}
def print_options():
print(fill_and_center("Model Evaluator"))
for op, (name, _) in options.items():
print(f"{op:4}: {name}")
print(f"Using models directory '{models_dir}', which contains {len(model_dirs)} models")
print_options()
try:
while True:
answer = input("> ")
if answer in options.keys():
reprint = options[answer][1]()
if reprint == True: print_options()
else:
print(f"Invalid option: '{answer}'")
print_options()
except KeyboardInterrupt: pass
except EOFError: pass


@ -7,22 +7,20 @@ if __name__ == "__main__":
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
from sys import exit
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn_utils
from torch.utils.data import DataLoader
import itertools
import json
import time
from os import makedirs, path
import pickle
from .util.transform import ConstantInterval, Normalize
from .util.data_loader import load_datasets, LabelConverter
from .util.split import DataSplitter
from .util.epoch_tracker import EpochTracker
from .util.settings import MLSettings
from .rnn.rnn import RNN
from .rnn.training import train_validate_save, select_device
def test_interpol():
file = "/home/matth/data/2023-04-27_glass_8.2V_179mm000.csv"
@ -37,92 +35,187 @@ def test_interpol():
ax1.plot(interp_array[:,0], interp_array[:,1], color="r", label="Interpolated")
ax1.scatter(array[:,0], array[:,2], color="g", label="Original")
ax1.legend()
# plt.show()
plt.show()
if __name__ == "__main__":
device = (
"cuda"
if torch.cuda.is_available()
else "mps"
if torch.backends.mps.is_available()
else "cpu"
)
labels = LabelConverter(["foam", "glass", "kapton", "foil", "cloth", "rigid_foam"])
models_dir = "/home/matth/Uni/TENG/models" # where to save models, settings and results
if not path.isdir(models_dir):
makedirs(models_dir)
data_dir = "/home/matth/Uni/TENG/data"
# Test with
num_layers = [ 3 ]
hidden_size = [ 8 ]
bidirectional = [ True ]
t_const_int = ConstantInterval(0.01)
t_norm = Normalize(0, 1)
transforms = [[ t_const_int ]] #, [ t_const_int, t_norm ]]
batch_sizes = [ 64 ] # , 16]
splitters = [ DataSplitter(100) ]
num_epochs = [ 80 ]
transforms = [ t_const_int, t_norm ]
st = MLSettings(num_features=1,
num_layers=1,
hidden_size=1,
bidirectional=True,
transforms=transforms,
num_epochs=40,
batch_size=3,
labels=labels,
)
# num_layers=1,
# hidden_size=1,
# bidirectional=True,
# optimizer=None,
# scheduler=None,
# loss_func=None,
# transforms=[],
# splitter=None,
# num_epochs=10,
# batch_size=5,
print(f"Using device: {device}")
args = [num_layers, hidden_size, bidirectional, [None], [None], [None], transforms, splitters, num_epochs, batch_sizes]
# create settings for every possible combination
settings = [
MLSettings(1, *params, labels) for params in itertools.product(*args)
]
loss_func = nn.CrossEntropyLoss()
train_set, test_set = load_datasets("/home/matth/Uni/TENG/data", labels, voltage=8.2, transforms=st.transforms, train_to_test_ratio=0.7, random_state=42)
optimizers = [
lambda model: torch.optim.Adam(model.parameters(), lr=0.03),
# lambda model: torch.optim.Adam(model.parameters(), lr=0.25),
# lambda model: torch.optim.Adam(model.parameters(), lr=0.50),
]
schedulers = [
lambda optimizer, st: torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9),
lambda optimizer, st: torch.optim.lr_scheduler.StepLR(optimizer, step_size=st.num_epochs // 10, gamma=0.40, verbose=False),
# lambda optimizer, st: torch.optim.lr_scheduler.StepLR(optimizer, step_size=st.num_epochs // 10, gamma=0.75, verbose=False),
]
n_total = len(settings) * len(optimizers) * len(schedulers)
print(f"Testing {n_total} possible configurations")
# scheduler2 =
def create_model(st, optimizer_f, scheduler_f):
model=RNN(input_size=st.num_features, hidden_size=st.hidden_size, num_layers=st.num_layers, num_classes=len(labels), bidirectional=st.bidirectional)
optimizer = optimizer_f(model)
scheduler = scheduler_f(optimizer, st)
return model, optimizer, scheduler
t_begin = time.time()
n = 1
for o in range(len(optimizers)):
for s in range(len(schedulers)):
for i in range(len(settings)):
st = settings[i]
# print(st.get_name())
train_set, test_set = load_datasets(data_dir, labels, voltage=8.2, transforms=st.transforms, split_function=st.splitter, train_to_test_ratio=0.7, random_state=42, num_workers=4)
generator = torch.manual_seed(42)
# train_loader = iter(DataLoader(train_set))
# test_loader = iter(DataLoader(test_set))
train_loader = DataLoader(train_set, batch_size=st.batch_size, shuffle=True, generator=generator)
test_loader = DataLoader(test_set, batch_size=st.batch_size, shuffle=True, generator=generator)
train_loader = DataLoader(train_set, batch_size=st.batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=st.batch_size, shuffle=True)
print(f"Testing {n}/{n_total}: (o={o}, s={s}, i={i})")
model, optimizer, scheduler = create_model(st, optimizers[o], schedulers[s])
device = select_device(force_device="cpu")
try:
train_validate_save(model, optimizer, scheduler, loss_func, train_loader, test_loader, st, models_dir, print_interval=4)
except KeyboardInterrupt:
if input("Cancelled current training. Quit? (q/*): ") == "q":
t_end = time.time()
print(f"Testing took {t_end - t_begin:.2f}s = {(t_end-t_begin)/60:.1f}m")
exit()
n += 1
t_end = time.time()
print(f"Testing took {t_end - t_begin:.2f}s = {(t_end-t_begin)/60:.1f}m")
class RNN(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
super(RNN, self).__init__()
self.num_layers = num_layers
self.hidden_size = hidden_size
self.is_bidirectional = bidirectional
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=bidirectional)
# x = (batch_size, sequence, feature)
if bidirectional == True:
self.fc = nn.Linear(hidden_size * 2, num_classes)
else:
self.fc = nn.Linear(hidden_size, num_classes)
self.softmax = nn.Softmax(dim=1)
def forward(self, x):
# x: batches, length, features
# print(f"forward pass")
D = 2 if self.is_bidirectional == True else 1
# print(f"x({x.shape})=...")
batch_size = x.shape[0]
# print(f"batch_size={batch_size}")
h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
# print(f"h1({h0.shape})=...")
c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
x = x.to(device)
_, (h_n, _) = self.lstm(x, (h0, c0))
# print(f"h_n({h_n.shape})=...")
final_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
# print(f"final_state({final_state.shape})=...")
if D == 1:
X = final_state.squeeze() # TODO what if batch_size == 1
elif D == 2:
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
#X = h_1 + h_2 # Add both states
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2
else:
raise ValueError("D must be 1 or 2")
# print(f"X({X.shape})={X}")
output = self.fc(X) # fully-connected layer
# print(f"out({output.shape})={output}")
output = self.softmax(output)
# print(f"out({output.shape})={output}")
return output
model=RNN(input_size=st.num_features, hidden_size=st.hidden_size, num_layers=st.num_layers, num_classes=len(labels), bidirectional=st.bidirectional).to(device)
loss_func = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
print(f"model:", model)
print(f"loss_func={loss_func}")
print(f"optimizer={optimizer}")
print(f"scheduler={scheduler}")
epoch_tracker = EpochTracker(labels)
print(f"train_loader")
for i, (data, y) in enumerate(train_loader):
print(y)
print(f"{i:3} - {torch.argmax(y, dim=1, keepdim=False)}")
# training
epoch_tracker.train_begin()
for ep in range(st.num_epochs):
for i, (data, y) in enumerate(train_loader):
# print(data, y)
# data = batch, seq, features
# print(f"data({data.shape})={data}")
x = data[:,:,[2]].float() # select voltage data
# print(f"x({x.shape}, {x.dtype})=...")
# print(f"y({y.shape}, {y.dtype})=...")
# length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
# print(f"length({length.shape})={length}")
# batch_size = x.shape[0]
# print(f"batch_size={batch_size}")
# v = x.view(batch_size, -1, feature_count)
# data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)[0]
# print(f"data({data.shape})={data}")
# print(data.batch_sizes[0])
# print(data)
out = model(x)
# print(f"out({out.shape}={out})")
loss = loss_func(out, y)
# print(loss)
optimizer.zero_grad() # clear gradients for next train
loss.backward() # backpropagation, compute gradients
optimizer.step() # apply gradients
# predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"predicted={predicted}, correct={correct}")
# train_total += y.size(0)
# train_correct += (predicted == correct).sum().item()
epoch_tracker.train(correct, predicted)
epoch_tracker.next_epoch(loss)
print(epoch_tracker.get_last_epoch_summary_str())
scheduler.step()
t_end = time.time()
with torch.no_grad():
for i, (data, y) in enumerate(test_loader):
# print(ep, "Test")
x = data[:,:,[2]].float()
out = model(x)
loss = loss_func(out, y)
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"predicted={predicted}, correct={correct}")
# val_total += y.size(0)
# val_correct += (predicted == correct).sum().item()
epoch_tracker.test(correct, predicted)
# print(f"train_total={train_total}, val_total={val_total}")
# if train_total == 0: train_total = -1
# if val_total == 0: val_total = -1
# print(f"epoch={ep+1:3}: Testing accuracy={100 * val_correct / val_total:.2f}")
# print(f"End result: Training accuracy={100 * train_correct / train_total:.2f}%, Testing accuracy={100 * val_correct / val_total:.2f}, training took {t_end - t_begin:.2f} seconds")
epoch_tracker.get_test_statistics()
# epoch_tracker.()
# print(epoch_tracker.get_training_summary_str())
print(epoch_tracker.get_training_count_per_label())
model_name = st.get_name()
# save the settings, results and model
with open(model_name + "_settings.pkl", "wb") as file:
pickle.dump(st, file)
with open(model_name + "_results.pkl", "wb") as file:
pickle.dump(epoch_tracker, file)
with open(model_name + "_model.pkl", "wb") as file:
pickle.dump(model, file)


@ -17,7 +17,6 @@ if __name__ == "__main__":
sys.path.insert(0, path.dirname(path.dirname(filepath)))
from .util.transform import Normalize
from .util.data_loader import get_datafiles
file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv"

teng-ml/rnn.py

@ -0,0 +1,39 @@
import torch
import torch.nn as nn
# BiLSTM Model
class RNN(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_classes, if_bidirectional):
super(RNN, self).__init__()
self.num_layers = num_layers
self.hidden_size = hidden_size
self.if_bidirectional = if_bidirectional
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=if_bidirectional)
if if_bidirectional == True:
self.fc = nn.Linear(hidden_size * 2, num_classes)
else:
self.fc = nn.Linear(hidden_size, num_classes)
def forward(self, x):
D = 2 if self.if_bidirectional == True else 1
Batch = x.batch_sizes[0]
device = x.data.device # run on whatever device the (packed) input lives on
h0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
c0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
x = x.to(device)
_, (h_n, _) = self.lstm(x, (h0, c0))
final_state = h_n.view(self.num_layers, D, Batch, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
if D == 1:
X = final_state.squeeze()
elif D == 2:
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
# X = h_1 + h_2 # Add both states
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2
output = self.fc(X) # fully-connected layer
return output


@ -1,80 +0,0 @@
import torch
import torch.nn as nn
class RNN(nn.Module):
"""
(Bi)LSTM for name classification
"""
def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
super(RNN, self).__init__()
self.num_layers = num_layers
self.hidden_size = hidden_size
self.is_bidirectional = bidirectional
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
# x = (batch_size, sequence, feature)
if bidirectional == True:
self.fc = nn.Linear(hidden_size * 2, num_classes)
else:
self.fc = nn.Linear(hidden_size, num_classes)
self.softmax = nn.Softmax(dim=1)
def forward(self, x):
# x: batches, length, features
# print(f"forward pass")
D = 2 if self.is_bidirectional == True else 1
# print(f"x({x.shape})=...")
batch_size = x.shape[0]
device = x.device
h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
# print(f"h1({h0.shape})=...")
c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
out, (h_n, c_n) = self.lstm(x, (h0, c0))
# out: (N, L, D * hidden_size)
# h_n: (D * num_layers, hidden_size)
# c_n: (D * num_layers, hidden_size)
# print(f"out({out.shape})={out}")
# print(f"h_n({h_n.shape})={h_n}")
# print(f"c_n({c_n.shape})={c_n}")
# print(f"out({out.shape})=...")
# print(f"h_n({h_n.shape})=...")
# print(f"c_n({c_n.shape})=...")
"""
# select only last layer [-1] -> last layer,
last_layer_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1]
if D == 1:
# [1, batch_size, hidden_size] -> [batch_size, hidden_size]
X = last_layer_state.squeeze() # TODO what if batch_size == 1
elif D == 2:
h_1, h_2 = last_layer_state[0], last_layer_state[1] # states of both directions
# concatenate both states, X-size: (Batch, hidden_size * 2
X = torch.cat((h_1, h_2), dim=1)
else:
raise ValueError("D must be 1 or 2")
""" # all this is quivalent to line below
out = out[:,-1,:] # select last time step
# fc: (*, hidden_size) -> (*, num_classes)
# print(f"X({X.shape})={X}")
# print(f"X({X.shape})=...")
out = self.fc(out) # fully-connected layer
# print(f"out({output.shape})={output}")
# print(f"output({output.shape})=...")
# softmax: (*) -> (*)
# out = self.softmax(out)
# print(f"output({output.shape})=...")
# print(f"output({output.shape})={output}")
"""
out(torch.Size([15, 200, 10]))=...
h_n(torch.Size([3, 15, 10]))=...
c_n(torch.Size([3, 15, 10]))=...
X(torch.Size([3, 1, 15, 10]))=...
output(torch.Size([3, 1, 15, 6]))=...
output(torch.Size([3, 1, 15, 6]))=..."""
return out
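For orientation, a minimal forward-pass sketch for this class (hyperparameters and shapes are made up for illustration and are not part of the diff):
import torch
model = RNN(input_size=1, hidden_size=8, num_layers=3, num_classes=6, bidirectional=True)
x = torch.randn(16, 200, 1)  # (batch, sequence length, features): 16 voltage traces of 200 samples each
out = model(x)               # (16, 6): one score per material class for each trace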


@ -1,150 +0,0 @@
from os import makedirs, path
import torch
import pickle
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from ..util.settings import MLSettings
from ..tracker.epoch_tracker import EpochTracker
from ..util.file_io import get_next_digits
from ..util.string import class_str
from ..util import model_io as mio
def select_device(force_device=None):
"""
Select best device and move model
"""
if force_device is not None:
device = force_device
else:
device = torch.device(
"cuda"
if torch.cuda.is_available()
# else "mps"
# if torch.backends.mps.is_available()
else "cpu"
)
# print(device, torch.cuda.get_device_name(device), torch.cuda.get_device_properties(device))
return device
def train(model, optimizer, scheduler, loss_func, train_loader: DataLoader, st: MLSettings, print_interval=1) -> EpochTracker:
epoch_tracker = EpochTracker(st.labels)
epoch_tracker.begin()
for ep in range(st.num_epochs):
loss = -1
for i, (data, y) in enumerate(train_loader):
# print(data, y)
# data = batch, seq, features
# print(f"data({data.shape})={data}")
x = data[:,:,[2]].float() # select voltage data
# print(f"x({x.shape}, {x.dtype})=...")
# print(f"y({y.shape}, {y.dtype})=...")
# length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
# print(f"length({length.shape})={length}")
# batch_size = x.shape[0]
# print(f"batch_size={batch_size}")
# v = x.view(batch_size, -1, feature_count)
# data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)[0]
# print(f"data({data.shape})={data}")
out = model(x)
# print(f"out({out.shape}={out})")
# print(f" y({y.shape}={y})")
with torch.no_grad():
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"predicted={predicted}, correct={correct}")
# train_total += y.size(0)
# train_correct += (predicted == correct).sum().item()
epoch_tracker.add_prediction(correct, predicted)
# predicted2 = torch.argmax(out, dim=1, keepdim=True) # -> [ label_indices ]
# print(f"correct={correct}, y={y}")
loss = loss_func(out, correct)
# loss = loss_func(out, y)
optimizer.zero_grad() # clear gradients for next train
loss.backward() # backpropagation, compute gradients
optimizer.step() # apply gradients
# predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
epoch_tracker.end_epoch(loss, optimizer.param_groups[0]["lr"])
if (ep+1) % print_interval == 0:
print(f"Training:", epoch_tracker.get_epoch_summary_str())
scheduler.step()
print("Training:", epoch_tracker.end())
return epoch_tracker
def validate(model, test_loader: DataLoader, st: MLSettings) -> EpochTracker:
epoch_tracker = EpochTracker(st.labels)
epoch_tracker.begin()
with torch.no_grad():
for i, (data, y) in enumerate(test_loader):
# print(ep, "Test")
x = data[:,:,[2]].float()
out = model(x)
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
correct = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
epoch_tracker.add_prediction(correct, predicted)
print("Validation:", epoch_tracker.end())
return epoch_tracker
def train_validate_save(model, optimizer, scheduler, loss_func, train_loader: DataLoader, test_loader: DataLoader, st: MLSettings, models_dir, print_interval=1, show_plots=False):
# assumes model and data is already on correct device
# train_loader.to(device)
# test_loader.to(device)
# store optimizer, scheduler and loss_func in settings
st.optimizer = class_str(optimizer)
st.scheduler = class_str(scheduler)
st.loss_func = class_str(loss_func)
model_name = st.get_name()
def add_tab(s):
return "\t" + str(s).replace("\n", "\n\t")
print(100 * '=')
print("Model Name:", model_name)
print(f"model:\n", add_tab(model))
# print(f"loss_func:\n", add_tab(class_str(loss_func)))
# print(f"optimizer:\n", add_tab(class_str(optimizer)))
# print(f"scheduler:\n", add_tab(class_str(scheduler)))
print(100 * '-')
training_tracker = train(model, optimizer, scheduler, loss_func, train_loader, st, print_interval=print_interval)
# print("Training: Count per label:", training_tracker.get_count_per_label())
# print("Training: Predictions per label:", training_tracker.get_predictions_per_label())
print(100 * '-')
validation_tracker = validate(model, test_loader, st)
# print("Validation: Count per label:", validation_tracker.get_count_per_label())
# print("Validation: Predictions per label:", validation_tracker.get_predictions_per_label())
digits = get_next_digits(f"{model_name}_", models_dir)
model_dir = f"{models_dir}/{model_name}_{digits}"
# do not put earlier, since the dir should not be created if training is interrupted
if not path.isdir(model_dir): # should always run, if not the digits function did not work
makedirs(model_dir)
fig, _ = validation_tracker.plot_predictions("Validation: Predictions", model_dir=model_dir, name="img_validation_predictions")
fig, _ = training_tracker.plot_predictions("Training: Predictions", model_dir=model_dir, name="img_training_predictions")
fig, _ = training_tracker.plot_training(model_dir=model_dir)
if show_plots:
plt.show()
plt.close('all')
# save the settings, results and model
mio.save_settings(model_dir, st)
mio.save_tracker_validation(model_dir, validation_tracker)
mio.save_tracker_training(model_dir, training_tracker)
mio.save_model(model_dir, model)


@ -1,187 +0,0 @@
from ..util.data_loader import LabelConverter
import matplotlib.pyplot as plt
import time
import torch
import numpy as np
class EpochTracker:
"""
Track accuracy, loss, learning_rate etc. during model training
Can also be used for validation (which will probably be only one epoch)
"""
def __init__(self, labels: LabelConverter):
self.labels = labels
self.times: list[float] = [] # (epoch)
self.predictions = [[]] # (epoch, batch_nr, (correct_indices | predicted_indices), index_nr)
self.loss: list[float] = [] # (epoch)
self.learning_rate: list[float] = [] # (epoch)
self.epochs: list[int] = [] # 1 based for FINISHED epochs
self._current_epoch = 0 # 0 based
# after training
self.accuracies: list[float] = [] # (epoch)
def begin(self):
self.times.append(time.time())
def end(self):
self.times.append(time.time())
# if end_epoch was called before end:
if len(self.predictions[-1]) == 0:
self.predictions.pop()
self._current_epoch -= 1
else: # if end_epoch was not called
self.epochs.append(len(self.epochs) + 1)
self._calculate_accuracies(self._current_epoch)
s = f"Summary: After {self.epochs[-1]} epochs: "
s += f"Accuracy={self.accuracies[-1]:.2f}%"
s += f", Total time={self.get_total_time():.2f}s"
return s
def get_total_time(self):
if len(self.times) > 1: return self.times[-1] - self.times[0]
else: return -1
#
# EPOCH
#
def end_epoch(self, loss, learning_rate):
"""
loss and learning_rate of last epoch
call before scheduler.step()
"""
self.times.append(time.time())
self.epochs.append(len(self.epochs) + 1)
if type(loss) == torch.Tensor: self.loss.append(loss.item())
else: self.loss.append(loss)
self.learning_rate.append(learning_rate)
self._calculate_accuracies(self._current_epoch)
self._current_epoch += 1
self.predictions.append([])
def get_epoch_summary_str(self, ep=-1):
"""call after next_epoch()"""
m = max(ep, 0) # if ep == -1, check if len is > 0
assert(len(self.epochs) > m)
s = f"Epoch {self.epochs[ep]:3}"
if len(self.accuracies) > m: s += f", Accuracy={self.accuracies[ep]:.2f}%"
if len(self.loss) > m: s += f", Loss={self.loss[ep]:.3f}"
if len(self.learning_rate) > m: s += f", lr={self.learning_rate[ep]:.4f}"
if len(self.times) > m+1: s += f", dt={self.times[ep] - self.times[ep-1]:.2f}s"
return s
def add_prediction(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
"""for accuracy calculation"""
self.predictions[self._current_epoch].append((correct_indices.detach().numpy(), predicted_indices.detach().numpy()))
#
# STATISTICS
#
def get_count_per_label(self, epoch=-1):
"""
the number of times where <label> was the correct label, per label
@returns shape: (label)
"""
count_per_label = [ 0 for _ in range(len(self.labels)) ]
for corr, _ in self.predictions[epoch]:
for batch in range(len(corr)):
count_per_label[corr[batch]] += 1
return count_per_label
def get_predictions_per_label(self, epoch=-1):
"""
How often label_i was predicted, when label_j was the correct label
@returns shape: (label_j, label_i)
"""
statistics = [ [ 0 for _ in range(len(self.labels)) ] for _ in range(len(self.labels)) ]
for corr, pred in self.predictions[epoch]:
for batch in range(len(corr)):
statistics[corr[batch]][pred[batch]] += 1
return statistics
def plot_training(self, title="Training Summary", model_dir=None, name="img_training"):
"""
@param model_dir: Optional. If given, save to model_dir as svg
"""
fig, ax = plt.subplots(nrows=3, ncols=1, sharex=True, layout="tight")
ax[0].plot(self.epochs, self.accuracies, color="red")
ax[0].set_ylabel("Accuracy")
ax[1].plot(self.epochs, self.learning_rate, color="green")
ax[1].set_ylabel("Learning Rate")
ax[2].plot(self.epochs, self.loss, color="blue")
ax[2].set_ylabel("Loss")
fig.suptitle(title)
ax[2].set_xlabel("Epoch")
plt.tight_layout()
if model_dir is not None:
fig.savefig(f"{model_dir}/{name}.svg")
return fig, ax
def plot_predictions(self, title="Predictions per Label", ep=-1, model_dir=None, name="img_training_predictions"):
"""
@param model_dir: Optional. If given, save to model_dir as svg
@param ep: Epoch, defaults to last
"""
# Normalize the data
predictions_per_label = self.get_predictions_per_label(ep)
normalized_predictions = predictions_per_label / np.sum(predictions_per_label, axis=1, keepdims=True)
N = len(self.labels)
label_names = self.labels.get_labels()
fig, ax = plt.subplots(layout="tight")
im = ax.imshow(normalized_predictions, cmap='Blues') # cmap='BuPu'
ax.set_xticks(np.arange(N))
ax.set_yticks(np.arange(N))
ax.set_xticklabels(label_names)
ax.set_yticklabels(label_names)
ax.set_xlabel('Predicted Label')
ax.set_ylabel('Correct Label')
# horizontal lines between labels to better show that the sum of a row is 1
for i in range(1, N):
ax.axhline(i-0.5, color='black', linewidth=1)
# rotate the x-axis labels for better readability
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
# create annotations
for i in range(N):
for j in range(N):
text = ax.text(j, i, round(normalized_predictions[i, j], 2),
ha="center", va="center", color="black")
# add colorbar
cbar = ax.figure.colorbar(im, ax=ax)
ax.set_title(title)
plt.tight_layout()
if model_dir is not None:
fig.savefig(f"{model_dir}/{name}.svg")
return fig, ax
#
# CALCULATION
#
def _calculate_accuracies(self, ep):
correct_predictions = 0
total_predictions = 0
for correct_indices, predicted_indices in self.predictions[ep]:
correct_predictions += (predicted_indices == correct_indices).sum().item()
total_predictions += len(predicted_indices)
accuracy = correct_predictions / total_predictions * 100
while len(self.accuracies) <= ep:
self.accuracies.append(-1)
self.accuracies[ep] = accuracy
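A toy driving sequence for this tracker (random indices stand in for real model output; loss and learning rate are made-up constants):
import torch
labels = LabelConverter(["glass", "foil"])  # hypothetical two-class setup
tracker = EpochTracker(labels)
tracker.begin()
for ep in range(3):
    for _ in range(5):  # pretend 5 batches of 8 predictions per epoch
        correct = torch.randint(0, 2, (8,))
        predicted = torch.randint(0, 2, (8,))
        tracker.add_prediction(correct, predicted)
    tracker.end_epoch(loss=0.5, learning_rate=0.01)
    print(tracker.get_epoch_summary_str())
print(tracker.end())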


@ -3,9 +3,6 @@ from os import path, listdir
import re
import numpy as np
import pandas as pd
from scipy.sparse import data
import threading
from sklearn.model_selection import train_test_split
@ -13,7 +10,7 @@ from sklearn.model_selection import train_test_split
re_filename = r"(\d{4}-\d{2}-\d{2})_([a-zA-Z_]+)_(\d{1,2}(?:\.\d*)?)V_(\d+(?:\.\d*)?)mm(\d+).csv"
class LabelConverter:
def __init__(self, class_labels: list[str]):
def __init__(self, class_labels):
self.class_labels = class_labels.copy()
self.class_labels.sort()
@ -35,12 +32,10 @@ class LabelConverter:
def get_labels(self):
return self.class_labels.copy()
def __repr__(self):
return str(self.class_labels)
class Datasample:
def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str, init_data=False):
def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str):
self.date = date
self.label = label
self.voltage = float(voltage)
@ -49,62 +44,50 @@ class Datasample:
self.label_vec = label_vec
self.datapath = datapath
self.data = None
if init_data: self._load_data()
def __repr__(self):
size = self.data.size if self.data is not None else "Unknown"
return f"{self.label}-{self.index}: dimension={size}, recorded at {self.date} with U={self.voltage}V, d={self.distance}mm"
def _load_data(self):
# df = pd.read_csv(self.datapath)
self.data = np.loadtxt(self.datapath, skiprows=1, dtype=np.float32, delimiter=",")
df = pd.read_csv(self.datapath)
self.data = df.to_numpy(dtype=np.float32)
def get_data(self):
"""[[timestamp, idata, vdata]]"""
"""[[timestamps, idata, vdata]]"""
if self.data is None:
self._load_data()
return self.data
class Dataset:
"""
Store the whole dataset, compatible with torch.data.Dataloader
"""
def __init__(self, datasamples, transforms=[], split_function=None):
"""
@param transforms: single callable or list of callables that are applied to the data (before eventual split)
@param split_function: (data) -> [data0, data1...] callable that splits the data
"""
def __init__(self, datasamples, transforms=None):
self.datasamples = datasamples
self.transforms = transforms
self.data = [] # (data, label)
# self.labels = [ d.label_vec for d in datasamples ]
# self.data = [ d.get_data() for d in datasamples ]
for sample in datasamples:
data = self.apply_transforms(sample.get_data())
if split_function is None:
self.data.append((data, sample.label_vec))
else:
for data_split in split_function(data):
self.data.append((data_split, sample.label_vec))
def apply_transforms(self, data):
def __getitem__(self, index):
data, label = self.datasamples[index].get_data(), self.datasamples[index].label_vec
# print(f"loading dataset {self.datasamples[index]}")
if type(self.transforms) == list:
for t in self.transforms:
data = t(data)
elif self.transforms is not None:
elif self.transforms:
data = self.transforms(data)
return data
# TODO
return data[:2000], label
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return len(self.data)
return len(self.datasamples)
def load_datasets(datadir, labels: LabelConverter, transforms=None, voltage=None, train_to_test_ratio=0.7, random_state=None):
def get_datafiles(datadir, labels: LabelConverter, voltage=None):
""" """
get a list of all matching datafiles from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
""" """
datafiles = [] datasamples = []
files = listdir(datadir) files = listdir(datadir)
files.sort() files.sort()
for file in files: for file in files:
@ -116,38 +99,9 @@ def get_datafiles(datadir, labels: LabelConverter, voltage=None):
sample_voltage = float(match.groups()[2])
if voltage and voltage != sample_voltage: continue
datafiles.append((datadir + "/" + file, match, label))
return datafiles
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), datadir + "/" + file))
def load_datasets(datadir, labels: LabelConverter, transforms=None, split_function=None, voltage=None, train_to_test_ratio=0.7, random_state=None, num_workers=None):
"""
load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
"""
datasamples = []
if num_workers == None:
for file, match, label in get_datafiles(datadir, labels, voltage):
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), file))
else:
files = get_datafiles(datadir, labels, voltage)
def worker():
while True:
try:
file, match, label = files.pop()
except IndexError:
# No more files to process
return
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), file, init_data=True))
threads = [threading.Thread(target=worker) for _ in range(num_workers)]
for t in threads:
t.start()
for t in threads:
t.join()
# TODO do the train_test_split after the Dataset split
# problem: needs to be after transforms
train_samples, test_samples = train_test_split(datasamples, train_size=train_to_test_ratio, shuffle=True, random_state=random_state)
train_dataset = Dataset(train_samples, transforms=transforms, split_function=split_function)
test_dataset = Dataset(test_samples, transforms=transforms, split_function=split_function)
train_dataset = Dataset(train_samples, transforms=transforms)
test_dataset = Dataset(test_samples, transforms=transforms)
return train_dataset, test_dataset


@ -0,0 +1,83 @@
from ..util.data_loader import LabelConverter
import time
import torch
class EpochTracker:
"""
Track progress through epochs and generate statistics
"""
def __init__(self, labels: LabelConverter):
# Training
self.accuracy = []
self.loss = []
self.times = [] # timestamps for each epoch end
self.trainings = []
self.training_indices = [[]] # epoch, batch_nr, (correct_indices, predicted_indices), index_nr
self._current_epoch = 0
self.labels = labels
# Testing
self.tests = [] # (correct_indices, predicted_indices)
def train_begin(self):
"""for time tracking"""
self.times.append(time.time())
# TRAINING
def train(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
self.training_indices[self._current_epoch].append((correct_indices, predicted_indices))
def next_epoch(self, loss):
self.times.append(time.time())
self.loss.append(loss)
correct_predictions = 0
total_predictions = 0
for predicted_indices, correct_indices in self.training_indices[self._current_epoch]:
correct_predictions += (predicted_indices == correct_indices).sum().item()
total_predictions += predicted_indices.size(0)
accuracy = 100 * correct_predictions / total_predictions
self.accuracy.append(accuracy)
self._current_epoch += 1
self.training_indices.append([])
def get_last_epoch_summary_str(self):
"""call after next_epoch()"""
return f"Epoch {self._current_epoch:3}: Accuracy={self.accuracy[-1]:.2f}, Loss={self.loss[-1]:.3f}, Training duration={self.times[-1] - self.times[0]:.2f}s"
def get_last_epoch_summary(self):
"""
@returns accuracy, loss, training time
"""
return self.accuracy[-1], self.loss[-1], self.times[-1] - self.times[0]
def get_training_count_per_label(self):
count_per_label = [ 0 for _ in range(len(self.labels)) ]
for i in range(len(self.training_indices)):
for j in range(len(self.training_indices[i])):
for k in range(self.training_indices[i][j][0].size(0)):
# epoch, batch_nr, 0 = correct_indices, correct_index_nr
count_per_label[self.training_indices[i][j][0][k]] += 1
return count_per_label
def __len__(self):
return len(self.accuracy)
def __getitem__(self, idx):
return (self.accuracy[idx], self.loss[idx])
# TESTING
def test(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
"""
@param correct_indices and predicted_indices: 1 dim Tensor
"""
for i in range(correct_indices.size(0)):
self.tests.append((correct_indices[i], predicted_indices[i]))
def get_test_statistics(self):
# label i, label_j was predicted when label_i was correct
statistics = [ [ 0 for _ in range(len(self.labels))] for _ in range(len(self.labels)) ]
for corr, pred in self.tests:
statistics[corr][pred] += 1
print(statistics)
return statistics


@ -1,34 +0,0 @@
from os import listdir, path
def add_zeros(v: int, digits=3):
"""
return v as string, add leading zeros if len(str(v)) < digits
"""
s = str(v)
return '0' * (max(digits - len(s), 0)) + s
def get_next_digits(basename, directory=".", digits=3):
"""
get the next filename digits
example:
basename = file
directory has file001.csv, file002.pkl, file004.csv
-> return 005
"""
files = listdir(directory)
files.sort()
files.reverse()
highest_number = -1
for file in files:
if not file.startswith(basename): continue
try:
dot = file.rfind('.')
if dot > 0: file = file[:dot]
number = int(file.replace(basename, ""))
if number < highest_number: continue
highest_number = number
except ValueError:
continue
return add_zeros(highest_number+1)


@ -1,45 +0,0 @@
from ..tracker.epoch_tracker import EpochTracker
from ..util.settings import MLSettings
import pickle
"""
Load and save model, settings and EpochTrackers from/on disk
"""
def load_tracker_validation(model_dir):
with open(f"{model_dir}/tracker_validation.pkl", "rb") as file:
validation_tracker: EpochTracker = pickle.load(file)
return validation_tracker
def load_tracker_training(model_dir):
with open(f"{model_dir}/tracker_training.pkl", "rb") as file:
training_tracker: EpochTracker = pickle.load(file)
return training_tracker
def load_settings(model_dir):
with open(f"{model_dir}/settings.pkl", "rb") as file:
st: MLSettings = pickle.load(file)
return st
def load_model(model_dir):
with open(f"{model_dir}/model.pkl", "rb") as file:
model = pickle.load(file)
return model
def save_tracker_validation(model_dir, validation_tracker: EpochTracker):
with open(f"{model_dir}/tracker_validation.pkl", "wb") as file:
pickle.dump(validation_tracker, file)
def save_tracker_training(model_dir, training_tracker: EpochTracker):
with open(f"{model_dir}/tracker_training.pkl", "wb") as file:
pickle.dump(training_tracker, file)
def save_settings(model_dir, st):
with open(f"{model_dir}/settings.pkl", "wb") as file:
pickle.dump(st, file)
def save_model(model_dir, model):
with open(f"{model_dir}/model.pkl", "wb") as file:
pickle.dump(model, file)


@ -1,5 +1,4 @@
from ..util.data_loader import LabelConverter
from ..util.split import DataSplitter
class MLSettings:
"""
@ -10,11 +9,7 @@ class MLSettings:
num_layers=1,
hidden_size=1,
bidirectional=True,
optimizer=None,
scheduler=None,
loss_func=None,
transforms=[],
splitter=None,
num_epochs=10,
batch_size=5,
labels=LabelConverter([]),
@ -24,11 +19,7 @@ class MLSettings:
self.hidden_size = hidden_size
self.num_epochs = num_epochs
self.bidirectional = bidirectional
self.optimizer = optimizer
self.scheduler = scheduler
self.loss_func = loss_func
self.transforms = transforms
self.splitter = splitter
self.batch_size = batch_size
self.labels = labels
@ -39,7 +30,6 @@ class MLSettings:
H = hidden_size
B = bidirectional
T = #transforms
S = splitter
E = #epochs
"""
return f"F{self.num_features}L{self.num_layers}H{self.hidden_size:02}B{'1' if self.bidirectional else '0'}T{len(self.transforms)}S{self.splitter.split_size if type(self.splitter) == DataSplitter is not None else 0:03}E{self.num_epochs:03}"
return f"F{self.num_features}L{self.num_layers}H{self.hidden_size}B{'1' if self.bidirectional else '0'}T{len(self.transforms)}"


@ -1,23 +0,0 @@
import numpy as np
class DataSplitter:
r"""
Split a numpy array into smaller arrays of size datapoints_per_split
If data.shape[0] % datapoints_per_split != 0, the remaining datapoints are dropped
"""
def __init__(self, datapoints_per_split):
self.split_size = datapoints_per_split
def __call__(self, data: np.ndarray):
"""
data: [[t, i, v]]
"""
ret_data = []
for i in range(self.split_size, data.shape[0], self.split_size):
ret_data.append(data[i-self.split_size:i, :])
if len(ret_data) == 0:
raise ValueError(f"data has only {data.shape[0]} datapoints, but datapoints_per_split is set to {self.split_size}")
return ret_data
def __repr__(self):
return f"DataSplitter({self.split_size})"


@ -1,51 +0,0 @@
import inspect
import torch.optim.lr_scheduler as sd
import re
def fill_and_center(s: str, fill_char="=", length=100):
rs = fill_char * length
margin = (length - len(s)) // 2
if margin > 1:
rs = f"{fill_char*(margin-1)} {s} {fill_char*(margin-1)}"
if len(rs) == length - 1: rs = rs + fill_char
assert(len(rs) == length)
return rs
else:
return s
def class_str(x):
"""
Return the constructor call of the class of x with its arguments
"""
name = type(x).__name__
signature = inspect.signature(type(x))
params = []
for param_name, param_value in x.__dict__.items():
if param_name not in signature.parameters:
continue
default_value = signature.parameters[param_name].default
if param_value != default_value:
params.append(f"{param_name}={param_value!r}")
if params:
return f"{name}({', '.join(params)})"
else:
return name
def cleanup_str(s):
"""
convert to string if necessary; if it is a scheduler string, remove unnecessary parameters
"""
if not type(s) == str:
s = str(s)
# check if scheduler string
re_scheduler = r"(\w+)\((.*)(optimizer=[A-Za-z]+) \(.*(initial_lr: [\d.]+).*?\)(.*)\)"
# groups: (sched_name, sched_params1, optimizer=Name, initial_lr: digits, sched_params2)
match = re.fullmatch(re_scheduler, s.replace("\n", " "))
if match:
g = match.groups()
s = f"{g[0]}({g[1]}{g[2]}({g[3]}, ...){g[4]})"
return s
return s


@ -54,3 +54,4 @@ class ConstantInterval:
def __repr__(self):
return f"ConstantInterval(interval={self.interval})"