Fixed model, restructured files

matthias@arch 2023-05-10 22:44:14 +02:00
parent 9660de248a
commit f61d88e0d8
16 changed files with 743 additions and 320 deletions

.gitignore

@@ -1 +1,2 @@
 *__pycache__*
+.old

README.md

@@ -1,2 +1,13 @@
 # Machine Learning stuff for TENG project
+(Bi)LSTM for name classification.
+More information on the project is available [on my website](https://quintern.xyz/en/teng.html).
+
+## Model training
+Adjust the parameters in `main.py` and run it.
+All models and the settings they were trained with are automatically serialized with pickle and stored in a subfolder
+of the `<model_dir>` that was set in `main.py`.
+
+## Model evaluation
+Run `find_best_model.py <model_dir>` with the `<model_dir>` specified in `main.py` during training.

teng-ml/main.py

@@ -7,20 +7,22 @@ if __name__ == "__main__":
     filepath = path.realpath(path.abspath(__file__))
     sys.path.insert(0, path.dirname(path.dirname(filepath)))
 
+from sys import exit
 import matplotlib.pyplot as plt
 import pandas as pd
 import torch
 import torch.nn as nn
-import torch.nn.utils.rnn as rnn_utils
 from torch.utils.data import DataLoader
-import json
+import itertools
 import time
-import pickle
+from os import makedirs, path
 
 from .util.transform import ConstantInterval, Normalize
 from .util.data_loader import load_datasets, LabelConverter
-from .util.epoch_tracker import EpochTracker
+from .util.split import DataSplitter
 from .util.settings import MLSettings
+from .rnn.rnn import RNN
+from .rnn.training import train_validate_save, select_device
 
 def test_interpol():
     file = "/home/matth/data/2023-04-27_glass_8.2V_179mm000.csv"
@@ -35,187 +37,92 @@ def test_interpol():
     ax1.plot(interp_array[:,0], interp_array[:,1], color="r", label="Interpolated")
     ax1.scatter(array[:,0], array[:,2], color="g", label="Original")
     ax1.legend()
-    plt.show()
+    # plt.show()
 
 if __name__ == "__main__":
-    device = (
-        "cuda"
-        if torch.cuda.is_available()
-        else "mps"
-        if torch.backends.mps.is_available()
-        else "cpu"
-    )
     labels = LabelConverter(["foam", "glass", "kapton", "foil", "cloth", "rigid_foam"])
+    models_dir = "/home/matth/Uni/TENG/models"  # where to save models, settings and results
+    if not path.isdir(models_dir):
+        makedirs(models_dir)
+    data_dir = "/home/matth/Uni/TENG/data"
+
+    # Test with
+    num_layers = [ 3 ]
+    hidden_size = [ 8 ]
+    bidirectional = [ True ]
+
     t_const_int = ConstantInterval(0.01)
     t_norm = Normalize(0, 1)
-    transforms = [ t_const_int, t_norm ]
-    st = MLSettings(num_features=1,
-            num_layers=1,
-            hidden_size=1,
-            bidirectional=True,
-            transforms=transforms,
-            num_epochs=40,
-            batch_size=3,
-            labels=labels,
-        )
-    print(f"Using device: {device}")
-
-    train_set, test_set = load_datasets("/home/matth/Uni/TENG/data", labels, voltage=8.2, transforms=st.transforms, train_to_test_ratio=0.7, random_state=42)
-
-    # train_loader = iter(DataLoader(train_set))
-    # test_loader = iter(DataLoader(test_set))
-    train_loader = DataLoader(train_set, batch_size=st.batch_size, shuffle=True)
-    test_loader = DataLoader(test_set, batch_size=st.batch_size, shuffle=True)
-
-    class RNN(nn.Module):
-        def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
-            super(RNN, self).__init__()
-            self.num_layers = num_layers
-            self.hidden_size = hidden_size
-            self.is_bidirectional = bidirectional
-            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=bidirectional)
-            # x = (batch_size, sequence, feature)
-            if bidirectional == True:
-                self.fc = nn.Linear(hidden_size * 2, num_classes)
-            else:
-                self.fc = nn.Linear(hidden_size, num_classes)
-            self.softmax = nn.Softmax(dim=1)
-
-        def forward(self, x):
-            # x: batches, length, features
-            # print(f"forward pass")
-            D = 2 if self.is_bidirectional == True else 1
-            # print(f"x({x.shape})=...")
-            batch_size = x.shape[0]
-            # print(f"batch_size={batch_size}")
-            h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
-            # print(f"h1({h0.shape})=...")
-            c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
-            x.to(device)
-            _, (h_n, _) = self.lstm(x, (h0, c0))
-            # print(f"h_n({h_n.shape})=...")
-            final_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1]  # num_layers, num_directions, batch, hidden_size
-            # print(f"final_state({final_state.shape})=...")
-            if D == 1:
-                X = final_state.squeeze()  # TODO what if batch_size == 1
-            elif D == 2:
-                h_1, h_2 = final_state[0], final_state[1]  # forward & backward pass
-                # X = h_1 + h_2  # Add both states
-                X = torch.cat((h_1, h_2), 1)  # Concatenate both states, X-size: (Batch, hidden_size * 2
-            else:
-                raise ValueError("D must be 1 or 2")
-            # print(f"X({X.shape})={X}")
-            output = self.fc(X)  # fully-connected layer
-            # print(f"out({output.shape})={output}")
-            output = self.softmax(output)
-            # print(f"out({output.shape})={output}")
-            return output
-
-    model=RNN(input_size=st.num_features, hidden_size=st.hidden_size, num_layers=st.num_layers, num_classes=len(labels), bidirectional=st.bidirectional).to(device)
-    loss_func = torch.nn.CrossEntropyLoss()
-    optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
-    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
-    print(f"model:", model)
-    print(f"loss_func={loss_func}")
-    print(f"optimizer={optimizer}")
-    print(f"scheduler={scheduler}")
-
-    epoch_tracker = EpochTracker(labels)
-
-    print(f"train_loader")
-    for i, (data, y) in enumerate(train_loader):
-        print(y)
-        print(f"{i:3} - {torch.argmax(y, dim=1, keepdim=False)}")
-
-    # training
-    epoch_tracker.train_begin()
-    for ep in range(st.num_epochs):
-        for i, (data, y) in enumerate(train_loader):
-            # print(data, y)
-            # data = batch, seq, features
-            # print(f"data({data.shape})={data}")
-            x = data[:,:,[2]].float()  # select voltage data
-            # print(f"x({x.shape}, {x.dtype})=...")
-            # print(f"y({y.shape}, {y.dtype})=...")
-            # length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
-            # print(f"length({length.shape})={length}")
-            # batch_size = x.shape[0]
-            # print(f"batch_size={batch_size}")
-            # v = x.view(batch_size, -1, feature_count)
-            # data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)[0]
-            # print(f"data({data.shape})={data}")
-            # print(data.batch_sizes[0])
-            # print(data)
-            out = model(x)
-            # print(f"out({out.shape}={out})")
-            loss = loss_func(out, y)
-            # print(loss)
-
-            optimizer.zero_grad()  # clear gradients for next train
-            loss.backward()        # backpropagation, compute gradients
-            optimizer.step()       # apply gradients
-
-            # predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
-            predicted = torch.argmax(out, dim=1, keepdim=False)  # -> [ label_indices ]
-            correct = torch.argmax(y, dim=1, keepdim=False)      # -> [ label_indices ]
-            # print(f"predicted={predicted}, correct={correct}")
-            # train_total += y.size(0)
-            # train_correct += (predicted == correct).sum().item()
-            epoch_tracker.train(correct, predicted)
-        epoch_tracker.next_epoch(loss)
-        print(epoch_tracker.get_last_epoch_summary_str())
-        scheduler.step()
-    t_end = time.time()
-
-    with torch.no_grad():
-        for i, (data, y) in enumerate(test_loader):
-            # print(ep, "Test")
-            x = data[:,:,[2]].float()
-            out = model(x)
-            loss = loss_func(out, y)
-            predicted = torch.argmax(out, dim=1, keepdim=False)  # -> [ label_indices ]
-            correct = torch.argmax(y, dim=1, keepdim=False)      # -> [ label_indices ]
-            # print(f"predicted={predicted}, correct={correct}")
-            # val_total += y.size(0)
-            # val_correct += (predicted == correct).sum().item()
-            epoch_tracker.test(correct, predicted)
-
-    # print(f"train_total={train_total}, val_total={val_total}")
-    # if train_total == 0: train_total = -1
-    # if val_total == 0: val_total = -1
-    # print(f"epoch={ep+1:3}: Testing accuracy={100 * val_correct / val_total:.2f}")
-    # print(f"End result: Training accuracy={100 * train_correct / train_total:.2f}%, Testing accuracy={100 * val_correct / val_total:.2f}, training took {t_end - t_begin:.2f} seconds")
-    epoch_tracker.get_test_statistics()
-    # epoch_tracker.()
-    # print(epoch_tracker.get_training_summary_str())
-    print(epoch_tracker.get_training_count_per_label())
-
-    model_name = st.get_name()
-    # save the settings, results and model
-    with open(model_name + "_settings.pkl", "wb") as file:
-        pickle.dump(st, file)
-    with open(model_name + "_results.pkl", "wb") as file:
-        pickle.dump(epoch_tracker, file)
-    with open(model_name + "_model.pkl", "wb") as file:
-        pickle.dump(model, file)
+    transforms = [[ t_const_int ]]  #, [ t_const_int, t_norm ]]
+    batch_sizes = [ 64 ]  # , 16]
+    splitters = [ DataSplitter(100) ]
+    num_epochs = [ 80 ]
+
+    # num_layers=1,
+    # hidden_size=1,
+    # bidirectional=True,
+    # optimizer=None,
+    # scheduler=None,
+    # loss_func=None,
+    # transforms=[],
+    # splitter=None,
+    # num_epochs=10,
+    # batch_size=5,
+    args = [num_layers, hidden_size, bidirectional, [None], [None], [None], transforms, splitters, num_epochs, batch_sizes]
+    # create settings for every possible combination
+    settings = [
+        MLSettings(1, *params, labels) for params in itertools.product(*args)
+    ]
+
+    loss_func = nn.CrossEntropyLoss()
+    optimizers = [
+        lambda model: torch.optim.Adam(model.parameters(), lr=0.03),
+        # lambda model: torch.optim.Adam(model.parameters(), lr=0.25),
+        # lambda model: torch.optim.Adam(model.parameters(), lr=0.50),
+    ]
+    schedulers = [
+        lambda optimizer, st: torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9),
+        lambda optimizer, st: torch.optim.lr_scheduler.StepLR(optimizer, step_size=st.num_epochs // 10, gamma=0.40, verbose=False),
+        # lambda optimizer, st: torch.optim.lr_scheduler.StepLR(optimizer, step_size=st.num_epochs // 10, gamma=0.75, verbose=False),
+    ]
+    n_total = len(settings) * len(optimizers) * len(schedulers)
+    print(f"Testing {n_total} possible configurations")
+    # scheduler2 =
+
+    def create_model(st, optimizer_f, scheduler_f):
+        model=RNN(input_size=st.num_features, hidden_size=st.hidden_size, num_layers=st.num_layers, num_classes=len(labels), bidirectional=st.bidirectional)
+        optimizer = optimizer_f(model)
+        scheduler = scheduler_f(optimizer, st)
+        return model, optimizer, scheduler
+
+    t_begin = time.time()
+    n = 1
+    for o in range(len(optimizers)):
+        for s in range(len(schedulers)):
+            for i in range(len(settings)):
+                st = settings[i]
+                # print(st.get_name())
+                train_set, test_set = load_datasets(data_dir, labels, voltage=8.2, transforms=st.transforms, split_function=st.splitter, train_to_test_ratio=0.7, random_state=42, num_workers=4)
+                generator = torch.manual_seed(42)
+                # train_loader = iter(DataLoader(train_set))
+                # test_loader = iter(DataLoader(test_set))
+                train_loader = DataLoader(train_set, batch_size=st.batch_size, shuffle=True, generator=generator)
+                test_loader = DataLoader(test_set, batch_size=st.batch_size, shuffle=True, generator=generator)
+
+                print(f"Testing {n}/{n_total}: (o={o}, s={s}, i={i})")
+                model, optimizer, scheduler = create_model(st, optimizers[o], schedulers[s])
+                device = select_device(force_device="cpu")
+                try:
+                    train_validate_save(model, optimizer, scheduler, loss_func, train_loader, test_loader, st, models_dir, print_interval=4)
+                except KeyboardInterrupt:
+                    if input("Cancelled current training. Quit? (q/*): ") == "q":
+                        t_end = time.time()
+                        print(f"Testing took {t_end - t_begin:.2f}s = {(t_end-t_begin)/60:.1f}m")
+                        exit()
+                n += 1
+
+    t_end = time.time()
+    print(f"Testing took {t_end - t_begin:.2f}s = {(t_end-t_begin)/60:.1f}m")
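
The grid construction above packs every hyperparameter axis into `args` and lets `itertools.product` enumerate the Cartesian product. A reduced sketch of the same pattern (axis values here are illustrative, not the commit's):

```python
import itertools

# Each list is one hyperparameter axis; product() yields one tuple per combination.
num_layers = [3]
hidden_size = [8, 16]
bidirectional = [True]
num_epochs = [80]

args = [num_layers, hidden_size, bidirectional, num_epochs]
combinations = list(itertools.product(*args))
print(combinations)       # [(3, 8, True, 80), (3, 16, True, 80)]
print(len(combinations))  # 2 = 1 * 2 * 1 * 1
```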

@@ -17,6 +17,7 @@ if __name__ == "__main__":
     sys.path.insert(0, path.dirname(path.dirname(filepath)))
 
 from .util.transform import Normalize
+from .util.data_loader import get_datafiles
 
 file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv"

@@ -1,39 +0,0 @@
import torch
import torch.nn as nn

# BiLSTM Model
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, if_bidirectional):
        super(RNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.if_bidirectional = if_bidirectional
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=if_bidirectional)

        if if_bidirectional == True:
            self.fc = nn.Linear(hidden_size * 2, num_classes)
        else:
            self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        D = 2 if self.if_bidirectional == True else 1
        Batch = x.batch_sizes[0]

        h0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
        c0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
        x.to(device)
        _, (h_n, _) = self.lstm(x, (h0, c0))
        final_state = h_n.view(self.num_layers, D, Batch, self.hidden_size)[-1]  # num_layers, num_directions, batch, hidden_size

        if D == 1:
            X = final_state.squeeze()
        elif D == 2:
            h_1, h_2 = final_state[0], final_state[1]  # forward & backward pass
            # X = h_1 + h_2  # Add both states
            X = torch.cat((h_1, h_2), 1)  # Concatenate both states, X-size: (Batch, hidden_size * 2

        output = self.fc(X)  # fully-connected layer
        return output

teng-ml/rnn/rnn.py (new file)

@@ -0,0 +1,80 @@
import torch
import torch.nn as nn

class RNN(nn.Module):
    """
    (Bi)LSTM for name classification
    """
    def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
        super(RNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.is_bidirectional = bidirectional
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
        # x = (batch_size, sequence, feature)
        if bidirectional == True:
            self.fc = nn.Linear(hidden_size * 2, num_classes)
        else:
            self.fc = nn.Linear(hidden_size, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # x: batches, length, features
        # print(f"forward pass")
        D = 2 if self.is_bidirectional == True else 1
        # print(f"x({x.shape})=...")
        batch_size = x.shape[0]
        device = x.device
        h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
        # print(f"h1({h0.shape})=...")
        c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
        out, (h_n, c_n) = self.lstm(x, (h0, c0))
        # out: (N, L, D * hidden_size)
        # h_n: (D * num_layers, hidden_size)
        # c_n: (D * num_layers, hidden_size)
        # print(f"out({out.shape})={out}")
        # print(f"h_n({h_n.shape})={h_n}")
        # print(f"c_n({c_n.shape})={c_n}")
        # print(f"out({out.shape})=...")
        # print(f"h_n({h_n.shape})=...")
        # print(f"c_n({c_n.shape})=...")
        """
        # select only last layer [-1] -> last layer
        last_layer_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1]
        if D == 1:
            # [1, batch_size, hidden_size] -> [batch_size, hidden_size]
            X = last_layer_state.squeeze()  # TODO what if batch_size == 1
        elif D == 2:
            h_1, h_2 = last_layer_state[0], last_layer_state[1]  # states of both directions
            # concatenate both states, X-size: (Batch, hidden_size * 2)
            X = torch.cat((h_1, h_2), dim=1)
        else:
            raise ValueError("D must be 1 or 2")
        """  # all this is equivalent to the line below
        out = out[:,-1,:]  # select last time step
        # fc: (*, hidden_size) -> (*, num_classes)
        # print(f"X({X.shape})={X}")
        # print(f"X({X.shape})=...")
        out = self.fc(out)  # fully-connected layer
        # print(f"out({output.shape})={output}")
        # print(f"output({output.shape})=...")
        # softmax: (*) -> (*)
        # out = self.softmax(out)
        # print(f"output({output.shape})=...")
        # print(f"output({output.shape})={output}")
        """
        out(torch.Size([15, 200, 10]))=...
        h_n(torch.Size([3, 15, 10]))=...
        c_n(torch.Size([3, 15, 10]))=...
        X(torch.Size([3, 1, 15, 10]))=...
        output(torch.Size([3, 1, 15, 6]))=...
        output(torch.Size([3, 1, 15, 6]))=..."""
        return out
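
For the unidirectional case, the commented-out block and `out[:,-1,:]` really are interchangeable: the last time step of `out` is the last layer's final hidden state. A standalone sanity check (a sketch, not part of the commit; shapes chosen arbitrarily):

```python
import torch
import torch.nn as nn

lstm = nn.LSTM(input_size=1, hidden_size=8, num_layers=3, batch_first=True)
x = torch.randn(15, 200, 1)  # (batch, seq_len, features)
out, (h_n, c_n) = lstm(x)
# last time step of the outputs == final hidden state of the last layer
assert torch.allclose(out[:, -1, :], h_n[-1])
```

For a bidirectional LSTM the two differ: the backward half of `out[:, -1, :]` has only seen the final time step, while `h_n` holds the backward state after the whole sequence.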

teng-ml/rnn/training.py (new file)

@@ -0,0 +1,150 @@
from os import makedirs, path
import torch
import pickle
import matplotlib.pyplot as plt

from torch.utils.data import DataLoader

from ..util.settings import MLSettings
from ..tracker.epoch_tracker import EpochTracker
from ..util.file_io import get_next_digits
from ..util.string import class_str
from ..util import model_io as mio

def select_device(force_device=None):
    """
    Select the best available device
    """
    if force_device is not None:
        device = force_device
    else:
        device = torch.device(
            "cuda"
            if torch.cuda.is_available()
            # else "mps"
            # if torch.backends.mps.is_available()
            else "cpu"
        )
    # print(device, torch.cuda.get_device_name(device), torch.cuda.get_device_properties(device))
    return device

def train(model, optimizer, scheduler, loss_func, train_loader: DataLoader, st: MLSettings, print_interval=1) -> EpochTracker:
    epoch_tracker = EpochTracker(st.labels)
    epoch_tracker.begin()
    for ep in range(st.num_epochs):
        loss = -1
        for i, (data, y) in enumerate(train_loader):
            # print(data, y)
            # data = batch, seq, features
            # print(f"data({data.shape})={data}")
            x = data[:,:,[2]].float()  # select voltage data
            # print(f"x({x.shape}, {x.dtype})=...")
            # print(f"y({y.shape}, {y.dtype})=...")
            # length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
            # print(f"length({length.shape})={length}")
            # batch_size = x.shape[0]
            # print(f"batch_size={batch_size}")
            # v = x.view(batch_size, -1, feature_count)
            # data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)[0]
            # print(f"data({data.shape})={data}")
            out = model(x)
            # print(f"out({out.shape}={out})")
            # print(f"  y({y.shape}={y})")
            with torch.no_grad():
                predicted = torch.argmax(out, dim=1, keepdim=False)  # -> [ label_indices ]
                correct = torch.argmax(y, dim=1, keepdim=False)      # -> [ label_indices ]
                # print(f"predicted={predicted}, correct={correct}")
                # train_total += y.size(0)
                # train_correct += (predicted == correct).sum().item()
                epoch_tracker.add_prediction(correct, predicted)
            # predicted2 = torch.argmax(out, dim=1, keepdim=True)  # -> [ label_indices ]
            # print(f"correct={correct}, y={y}")
            loss = loss_func(out, correct)
            # loss = loss_func(out, y)

            optimizer.zero_grad()  # clear gradients for next train
            loss.backward()        # backpropagation, compute gradients
            optimizer.step()       # apply gradients
            # predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
        epoch_tracker.end_epoch(loss, optimizer.param_groups[0]["lr"])
        if (ep+1) % print_interval == 0:
            print(f"Training:", epoch_tracker.get_epoch_summary_str())
        scheduler.step()
    print("Training:", epoch_tracker.end())
    return epoch_tracker

def validate(model, test_loader: DataLoader, st: MLSettings) -> EpochTracker:
    epoch_tracker = EpochTracker(st.labels)
    epoch_tracker.begin()
    with torch.no_grad():
        for i, (data, y) in enumerate(test_loader):
            # print(ep, "Test")
            x = data[:,:,[2]].float()
            out = model(x)
            predicted = torch.argmax(out, dim=1, keepdim=False)  # -> [ label_indices ]
            correct = torch.argmax(y, dim=1, keepdim=False)      # -> [ label_indices ]
            epoch_tracker.add_prediction(correct, predicted)
    print("Validation:", epoch_tracker.end())
    return epoch_tracker

def train_validate_save(model, optimizer, scheduler, loss_func, train_loader: DataLoader, test_loader: DataLoader, st: MLSettings, models_dir, print_interval=1, show_plots=False):
    # assumes model and data is already on correct device
    # train_loader.to(device)
    # test_loader.to(device)
    # store optimizer, scheduler and loss_func in settings
    st.optimizer = class_str(optimizer)
    st.scheduler = class_str(scheduler)
    st.loss_func = class_str(loss_func)

    model_name = st.get_name()
    def add_tab(s):
        return "\t" + str(s).replace("\n", "\n\t")
    print(100 * '=')
    print("Model Name:", model_name)
    print(f"model:\n", add_tab(model))
    # print(f"loss_func:\n", add_tab(class_str(loss_func)))
    # print(f"optimizer:\n", add_tab(class_str(optimizer)))
    # print(f"scheduler:\n", add_tab(class_str(scheduler)))
    print(100 * '-')
    training_tracker = train(model, optimizer, scheduler, loss_func, train_loader, st, print_interval=print_interval)
    # print("Training: Count per label:", training_tracker.get_count_per_label())
    # print("Training: Predictions per label:", training_tracker.get_predictions_per_label())
    print(100 * '-')
    validation_tracker = validate(model, test_loader, st)
    # print("Validation: Count per label:", validation_tracker.get_count_per_label())
    # print("Validation: Predictions per label:", validation_tracker.get_predictions_per_label())

    digits = get_next_digits(f"{model_name}_", models_dir)
    model_dir = f"{models_dir}/{model_name}_{digits}"
    # do not put earlier, since the dir should not be created if training is interrupted
    if not path.isdir(model_dir):  # should always run, if not the digits function did not work
        makedirs(model_dir)

    fig, _ = validation_tracker.plot_predictions("Validation: Predictions", model_dir=model_dir, name="img_validation_predictions")
    fig, _ = training_tracker.plot_predictions("Training: Predictions", model_dir=model_dir, name="img_training_predictions")
    fig, _ = training_tracker.plot_training(model_dir=model_dir)
    if show_plots:
        plt.show()
    plt.close('all')

    # save the settings, results and model
    mio.save_settings(model_dir, st)
    mio.save_tracker_validation(model_dir, validation_tracker)
    mio.save_tracker_training(model_dir, training_tracker)
    mio.save_model(model_dir, model)
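
How the pieces fit together (a minimal sketch mirroring main.py; the RNN class comes from teng-ml/rnn/rnn.py, and the optimizer/scheduler settings are illustrative assumptions):

```python
import torch
import torch.nn as nn

def run_one(st, train_loader, test_loader, models_dir):
    # st is an MLSettings instance; the loaders come from load_datasets()
    model = RNN(input_size=st.num_features, hidden_size=st.hidden_size,
                num_layers=st.num_layers, num_classes=len(st.labels),
                bidirectional=st.bidirectional)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.03)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    loss_func = nn.CrossEntropyLoss()
    train_validate_save(model, optimizer, scheduler, loss_func,
                        train_loader, test_loader, st, models_dir, print_interval=4)
```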

teng-ml/tracker/epoch_tracker.py (new file)

@@ -0,0 +1,187 @@
from ..util.data_loader import LabelConverter
import matplotlib.pyplot as plt
import time
import torch
import numpy as np

class EpochTracker:
    """
    Track accuracy, loss, learning_rate etc. during model training
    Can also be used for validation (which will probably be only one epoch)
    """
    def __init__(self, labels: LabelConverter):
        self.labels = labels

        self.times: list[float] = []          # (epoch)
        self.predictions = [[]]               # (epoch, batch_nr, (correct_indices | predicted_indices), index_nr)
        self.loss: list[float] = []           # (epoch)
        self.learning_rate: list[float] = []  # (epoch)
        self.epochs: list[int] = []           # 1 based for FINISHED epochs
        self._current_epoch = 0               # 0 based
        # after training
        self.accuracies: list[float] = []     # (epoch)

    def begin(self):
        self.times.append(time.time())

    def end(self):
        self.times.append(time.time())
        # if end_epoch was called before end:
        if len(self.predictions[-1]) == 0:
            self.predictions.pop()
            self._current_epoch -= 1
        else:  # if end_epoch was not called
            self.epochs.append(len(self.epochs) + 1)
            self._calculate_accuracies(self._current_epoch)

        s = f"Summary: After {self.epochs[-1]} epochs: "
        s += f"Accuracy={self.accuracies[-1]:.2f}%"
        s += f", Total time={self.get_total_time():.2f}s"
        return s

    def get_total_time(self):
        if len(self.times) > 1: return self.times[-1] - self.times[0]
        else: return -1

    #
    # EPOCH
    #
    def end_epoch(self, loss, learning_rate):
        """
        loss and learning_rate of last epoch
        call before scheduler.step()
        """
        self.times.append(time.time())
        self.epochs.append(len(self.epochs) + 1)
        if type(loss) == torch.Tensor: self.loss.append(loss.item())
        else: self.loss.append(loss)
        self.learning_rate.append(learning_rate)
        self._calculate_accuracies(self._current_epoch)

        self._current_epoch += 1
        self.predictions.append([])

    def get_epoch_summary_str(self, ep=-1):
        """call after end_epoch()"""
        m = max(ep, 0)  # if ep == -1, check if len is > 0
        assert(len(self.epochs) > m)
        s = f"Epoch {self.epochs[ep]:3}"
        if len(self.accuracies) > m:    s += f", Accuracy={self.accuracies[ep]:.2f}%"
        if len(self.loss) > m:          s += f", Loss={self.loss[ep]:.3f}"
        if len(self.learning_rate) > m: s += f", lr={self.learning_rate[ep]:.4f}"
        if len(self.times) > m+1:       s += f", dt={self.times[ep] - self.times[ep-1]:.2f}s"
        return s

    def add_prediction(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
        """for accuracy calculation"""
        self.predictions[self._current_epoch].append((correct_indices.detach().numpy(), predicted_indices.detach().numpy()))

    #
    # STATISTICS
    #
    def get_count_per_label(self, epoch=-1):
        """
        the number of times where <label> was the correct label, per label
        @returns shape: (label)
        """
        count_per_label = [ 0 for _ in range(len(self.labels)) ]
        for corr, _ in self.predictions[epoch]:
            for batch in range(len(corr)):
                count_per_label[corr[batch]] += 1
        return count_per_label

    def get_predictions_per_label(self, epoch=-1):
        """
        How often label_i was predicted, when label_j was the correct label
        @returns shape: (label_j, label_i)
        """
        statistics = [ [ 0 for _ in range(len(self.labels)) ] for _ in range(len(self.labels)) ]
        for corr, pred in self.predictions[epoch]:
            for batch in range(len(corr)):
                statistics[corr[batch]][pred[batch]] += 1
        return statistics

    def plot_training(self, title="Training Summary", model_dir=None, name="img_training"):
        """
        @param model_dir: Optional. If given, save to model_dir as svg
        """
        fig, ax = plt.subplots(nrows=3, ncols=1, sharex=True, layout="tight")
        ax[0].plot(self.epochs, self.accuracies, color="red")
        ax[0].set_ylabel("Accuracy")
        ax[1].plot(self.epochs, self.learning_rate, color="green")
        ax[1].set_ylabel("Learning Rate")
        ax[2].plot(self.epochs, self.loss, color="blue")
        ax[2].set_ylabel("Loss")
        fig.suptitle(title)
        ax[2].set_xlabel("Epoch")
        plt.tight_layout()
        if model_dir is not None:
            fig.savefig(f"{model_dir}/{name}.svg")
        return fig, ax

    def plot_predictions(self, title="Predictions per Label", ep=-1, model_dir=None, name="img_training_predictions"):
        """
        @param model_dir: Optional. If given, save to model_dir as svg
        @param ep: Epoch, defaults to last
        """
        # Normalize the data
        predictions_per_label = self.get_predictions_per_label(ep)
        normalized_predictions = predictions_per_label / np.sum(predictions_per_label, axis=1, keepdims=True)
        N = len(self.labels)
        label_names = self.labels.get_labels()

        fig, ax = plt.subplots(layout="tight")
        im = ax.imshow(normalized_predictions, cmap='Blues')  # cmap='BuPu'
        ax.set_xticks(np.arange(N))
        ax.set_yticks(np.arange(N))
        ax.set_xticklabels(label_names)
        ax.set_yticklabels(label_names)
        ax.set_xlabel('Predicted Label')
        ax.set_ylabel('Correct Label')
        # horizontal lines between labels to better show that the sum of a row is 1
        for i in range(1, N):
            ax.axhline(i-0.5, color='black', linewidth=1)
        # rotate the x-axis labels for better readability
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
        # create annotations
        for i in range(N):
            for j in range(N):
                text = ax.text(j, i, round(normalized_predictions[i, j], 2),
                               ha="center", va="center", color="black")
        # add colorbar
        cbar = ax.figure.colorbar(im, ax=ax)
        ax.set_title(title)
        plt.tight_layout()
        if model_dir is not None:
            fig.savefig(f"{model_dir}/{name}.svg")
        return fig, ax

    #
    # CALCULATION
    #
    def _calculate_accuracies(self, ep):
        correct_predictions = 0
        total_predictions = 0
        for correct_indices, predicted_indices in self.predictions[ep]:
            correct_predictions += (predicted_indices == correct_indices).sum().item()
            total_predictions += len(predicted_indices)
        accuracy = correct_predictions / total_predictions * 100
        while len(self.accuracies) <= ep:
            self.accuracies.append(-1)
        self.accuracies[ep] = accuracy
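
The intended call order is begin → add_prediction per batch → end_epoch per epoch → end. A sketch with dummy index tensors (assumes EpochTracker and LabelConverter are importable from this package):

```python
import torch

tracker = EpochTracker(LabelConverter(["glass", "foam"]))
tracker.begin()
for ep in range(3):
    for _ in range(4):                         # 4 batches per epoch
        correct = torch.randint(0, 2, (8,))    # correct label indices
        predicted = torch.randint(0, 2, (8,))  # predicted label indices
        tracker.add_prediction(correct, predicted)
    tracker.end_epoch(loss=0.5, learning_rate=0.03)
    print(tracker.get_epoch_summary_str())
print(tracker.end())  # Summary: After 3 epochs: Accuracy=..., Total time=...
```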

teng-ml/util/data_loader.py

@@ -3,6 +3,9 @@ from os import path, listdir
 import re
 import numpy as np
 import pandas as pd
+from scipy.sparse import data
+import threading
+
 from sklearn.model_selection import train_test_split
@@ -10,7 +13,7 @@ from sklearn.model_selection import train_test_split
 re_filename = r"(\d{4}-\d{2}-\d{2})_([a-zA-Z_]+)_(\d{1,2}(?:\.\d*)?)V_(\d+(?:\.\d*)?)mm(\d+).csv"
 
 class LabelConverter:
-    def __init__(self, class_labels):
+    def __init__(self, class_labels: list[str]):
         self.class_labels = class_labels.copy()
         self.class_labels.sort()
@@ -32,10 +35,12 @@ class LabelConverter:
     def get_labels(self):
         return self.class_labels.copy()
 
+    def __repr__(self):
+        return str(self.class_labels)
+
 class Datasample:
-    def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str):
+    def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str, init_data=False):
         self.date = date
         self.label = label
         self.voltage = float(voltage)
@@ -44,50 +49,62 @@ class Datasample:
         self.label_vec = label_vec
         self.datapath = datapath
         self.data = None
+        if init_data: self._load_data()
 
     def __repr__(self):
         size = self.data.size if self.data is not None else "Unknown"
         return f"{self.label}-{self.index}: dimension={size}, recorded at {self.date} with U={self.voltage}V, d={self.distance}mm"
 
     def _load_data(self):
-        df = pd.read_csv(self.datapath)
-        self.data = df.to_numpy(dtype=np.float32)
+        # df = pd.read_csv(self.datapath)
+        self.data = np.loadtxt(self.datapath, skiprows=1, dtype=np.float32, delimiter=",")
 
     def get_data(self):
-        """[[timestamps, idata, vdata]]"""
+        """[[timestamp, idata, vdata]]"""
        if self.data is None:
            self._load_data()
        return self.data
 
 class Dataset:
     """
     Store the whole dataset, compatible with torch.data.Dataloader
     """
-    def __init__(self, datasamples, transforms=None):
-        self.datasamples = datasamples
+    def __init__(self, datasamples, transforms=[], split_function=None):
+        """
+        @param transforms: single callable or list of callables that are applied to the data (before eventual split)
+        @param split_function: (data) -> [data0, data1...] callable that splits the data
+        """
         self.transforms = transforms
-        # self.labels = [ d.label_vec for d in datasamples ]
-        # self.data = [ d.get_data() for d in datasamples ]
+        self.data = []  # (data, label)
+        for sample in datasamples:
+            data = self.apply_transforms(sample.get_data())
+            if split_function is None:
+                self.data.append((data, sample.label_vec))
+            else:
+                for data_split in split_function(data):
+                    self.data.append((data_split, sample.label_vec))
 
-    def __getitem__(self, index):
-        data, label = self.datasamples[index].get_data(), self.datasamples[index].label_vec
-        # print(f"loading dataset {self.datasamples[index]}")
+    def apply_transforms(self, data):
         if type(self.transforms) == list:
             for t in self.transforms:
                 data = t(data)
-        elif self.transforms:
+        elif self.transforms is not None:
             data = self.transforms(data)
-        # TODO
-        return data[:2000], label
+        return data
+
+    def __getitem__(self, index):
+        return self.data[index]
 
     def __len__(self):
-        return len(self.datasamples)
+        return len(self.data)
 
-def load_datasets(datadir, labels: LabelConverter, transforms=None, voltage=None, train_to_test_ratio=0.7, random_state=None):
+def get_datafiles(datadir, labels: LabelConverter, voltage=None):
     """
-    load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
+    get a list of all matching datafiles from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
     """
-    datasamples = []
+    datafiles = []
     files = listdir(datadir)
     files.sort()
     for file in files:
@@ -99,9 +116,38 @@ def load_datasets(datadir, labels: LabelConverter, transforms=None, voltage=None
         sample_voltage = float(match.groups()[2])
         if voltage and voltage != sample_voltage: continue
 
-        datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), datadir + "/" + file))
+        datafiles.append((datadir + "/" + file, match, label))
+    return datafiles
+
+def load_datasets(datadir, labels: LabelConverter, transforms=None, split_function=None, voltage=None, train_to_test_ratio=0.7, random_state=None, num_workers=None):
+    """
+    load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
+    """
+    datasamples = []
+    if num_workers == None:
+        for file, match, label in get_datafiles(datadir, labels, voltage):
+            datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), file))
+    else:
+        files = get_datafiles(datadir, labels, voltage)
+        def worker():
+            while True:
+                try:
+                    file, match, label = files.pop()
+                except IndexError:
+                    # No more files to process
+                    return
+                datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), file, init_data=True))
+        threads = [threading.Thread(target=worker) for _ in range(num_workers)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+    # TODO do the train_test_split after the Dataset split
+    # problem: needs to be after transforms
     train_samples, test_samples = train_test_split(datasamples, train_size=train_to_test_ratio, shuffle=True, random_state=random_state)
-    train_dataset = Dataset(train_samples, transforms=transforms)
-    test_dataset = Dataset(test_samples, transforms=transforms)
+    train_dataset = Dataset(train_samples, transforms=transforms, split_function=split_function)
+    test_dataset = Dataset(test_samples, transforms=transforms, split_function=split_function)
     return train_dataset, test_dataset
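
The filename convention enforced by `re_filename`, demonstrated on one of the paths used in this commit (sketch; the regex is copied from above):

```python
import re

re_filename = r"(\d{4}-\d{2}-\d{2})_([a-zA-Z_]+)_(\d{1,2}(?:\.\d*)?)V_(\d+(?:\.\d*)?)mm(\d+).csv"
m = re.fullmatch(re_filename, "2023-04-27_glass_8.2V_179mm000.csv")
print(m.groups())  # ('2023-04-27', 'glass', '8.2', '179', '000')
```

The five groups feed `Datasample(date, label, voltage, distance, index, ...)`.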

teng-ml/util/epoch_tracker.py (deleted)

@@ -1,83 +0,0 @@
from ..util.data_loader import LabelConverter
import time
import torch

class EpochTracker:
    """
    Track progress through epochs and generate statistics
    """
    def __init__(self, labels: LabelConverter):
        # Training
        self.accuracy = []
        self.loss = []
        self.times = []  # timestamps for each epoch end
        self.trainings = []
        self.training_indices = [[]]  # epoch, batch_nr, (correct_indices, predicted_indices), index_nr
        self._current_epoch = 0
        self.labels = labels
        # Testing
        self.tests = []  # (correct_indices, predicted_indices)

    def train_begin(self):
        """for time tracking"""
        self.times.append(time.time())

    # TRAINING
    def train(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
        self.training_indices[self._current_epoch].append((correct_indices, predicted_indices))

    def next_epoch(self, loss):
        self.times.append(time.time())
        self.loss.append(loss)
        correct_predictions = 0
        total_predictions = 0
        for predicted_indices, correct_indices in self.training_indices[self._current_epoch]:
            correct_predictions += (predicted_indices == correct_indices).sum().item()
            total_predictions += predicted_indices.size(0)
        accuracy = 100 * correct_predictions / total_predictions
        self.accuracy.append(accuracy)
        self._current_epoch += 1
        self.training_indices.append([])

    def get_last_epoch_summary_str(self):
        """call after next_epoch()"""
        return f"Epoch {self._current_epoch:3}: Accuracy={self.accuracy[-1]:.2f}, Loss={self.loss[-1]:.3f}, Training duration={self.times[-1] - self.times[0]:.2f}s"

    def get_last_epoch_summary(self):
        """
        @returns accuracy, loss, training time
        """
        return self.accuracy[-1], self.loss[-1], self.times[-1] - self.times[0]

    def get_training_count_per_label(self):
        count_per_label = [ 0 for _ in range(len(self.labels)) ]
        for i in range(len(self.training_indices)):
            for j in range(len(self.training_indices[i])):
                for k in range(self.training_indices[i][j][0].size(0)):
                    # epoch, batch_nr, 0 = correct_indices, correct_index_nr
                    count_per_label[self.training_indices[i][j][0][k]] += 1
        return count_per_label

    def __len__(self):
        return len(self.accuracy)

    def __getitem__(self, idx):
        return (self.accuracy[idx], self.loss[idx])

    # TESTING
    def test(self, correct_indices: torch.Tensor, predicted_indices: torch.Tensor):
        """
        @param correct_indices and predicted_indices: 1 dim Tensor
        """
        for i in range(correct_indices.size(0)):
            self.tests.append((correct_indices[i], predicted_indices[i]))

    def get_test_statistics(self):
        # label i, label_j was predicted when label_i was correct
        statistics = [ [ 0 for _ in range(len(self.labels))] for _ in range(len(self.labels)) ]
        for corr, pred in self.tests:
            statistics[corr][pred] += 1
        print(statistics)
        return statistics

teng-ml/util/file_io.py (new file)

@@ -0,0 +1,34 @@
from os import listdir, path

def add_zeros(v: int, digits=3):
    """
    return v as string, add leading zeros if len(str(v)) < digits
    """
    s = str(v)
    return '0' * (max(digits - len(s), 0)) + s

def get_next_digits(basename, directory=".", digits=3):
    """
    get the next filename digits
    example:
        basename = file
        directory has file001.csv, file002.pkl, file004.csv
        -> return 005
    """
    files = listdir(directory)
    files.sort()
    files.reverse()
    highest_number = -1
    for file in files:
        if not file.startswith(basename): continue
        try:
            dot = file.rfind('.')
            if dot > 0: file = file[:dot]
            number = int(file.replace(basename, ""))
            if number < highest_number: continue
            highest_number = number
        except ValueError:
            continue
    return add_zeros(highest_number+1, digits)
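
Sketch of the run numbering (directory contents are hypothetical):

```python
# add_zeros(5) -> "005"
# With /tmp/models containing F1L3H08B1T1S100E080_000 and ..._001,
# the next free suffix is returned:
digits = get_next_digits("F1L3H08B1T1S100E080_", "/tmp/models")
print(digits)  # "002"
```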

teng-ml/util/model_io.py (new file)

@@ -0,0 +1,45 @@
from ..tracker.epoch_tracker import EpochTracker
from ..util.settings import MLSettings

import pickle

"""
Load and save model, settings and EpochTrackers from/on disk
"""

def load_tracker_validation(model_dir):
    with open(f"{model_dir}/tracker_validation.pkl", "rb") as file:
        validation_tracker: EpochTracker = pickle.load(file)
    return validation_tracker

def load_tracker_training(model_dir):
    with open(f"{model_dir}/tracker_training.pkl", "rb") as file:
        training_tracker: EpochTracker = pickle.load(file)
    return training_tracker

def load_settings(model_dir):
    with open(f"{model_dir}/settings.pkl", "rb") as file:
        st: MLSettings = pickle.load(file)
    return st

def load_model(model_dir):
    with open(f"{model_dir}/model.pkl", "rb") as file:
        model = pickle.load(file)
    return model

def save_tracker_validation(model_dir, validation_tracker: EpochTracker):
    with open(f"{model_dir}/tracker_validation.pkl", "wb") as file:
        pickle.dump(validation_tracker, file)

def save_tracker_training(model_dir, training_tracker: EpochTracker):
    with open(f"{model_dir}/tracker_training.pkl", "wb") as file:
        pickle.dump(training_tracker, file)

def save_settings(model_dir, st):
    with open(f"{model_dir}/settings.pkl", "wb") as file:
        pickle.dump(st, file)

def save_model(model_dir, model):
    with open(f"{model_dir}/model.pkl", "wb") as file:
        pickle.dump(model, file)
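
Restoring a finished run for inspection might look like this (sketch; the directory name is hypothetical, see MLSettings.get_name()):

```python
model_dir = "/home/matth/Uni/TENG/models/F1L3H08B1T1S100E080_000"  # hypothetical run
st = load_settings(model_dir)
model = load_model(model_dir)
validation_tracker = load_tracker_validation(model_dir)
print(st.get_name(), validation_tracker.accuracies[-1])
```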

teng-ml/util/settings.py

@@ -1,4 +1,5 @@
 from ..util.data_loader import LabelConverter
+from ..util.split import DataSplitter
 
 class MLSettings:
     """
@@ -9,7 +10,11 @@ class MLSettings:
             num_layers=1,
             hidden_size=1,
             bidirectional=True,
+            optimizer=None,
+            scheduler=None,
+            loss_func=None,
             transforms=[],
+            splitter=None,
             num_epochs=10,
             batch_size=5,
             labels=LabelConverter([]),
@@ -19,7 +24,11 @@ class MLSettings:
         self.hidden_size = hidden_size
         self.num_epochs = num_epochs
         self.bidirectional = bidirectional
+        self.optimizer = optimizer
+        self.scheduler = scheduler
+        self.loss_func = loss_func
         self.transforms = transforms
+        self.splitter = splitter
         self.batch_size = batch_size
         self.labels = labels
@@ -30,6 +39,7 @@ class MLSettings:
         H = hidden_size
         B = bidirectional
         T = #transforms
+        S = splitter
         E = #epochs
         """
-        return f"F{self.num_features}L{self.num_layers}H{self.hidden_size}B{'1' if self.bidirectional else '0'}T{len(self.transforms)}"
+        return f"F{self.num_features}L{self.num_layers}H{self.hidden_size:02}B{'1' if self.bidirectional else '0'}T{len(self.transforms)}S{self.splitter.split_size if type(self.splitter) == DataSplitter else 0:03}E{self.num_epochs:03}"

teng-ml/util/split.py (new file)

@@ -0,0 +1,23 @@
import numpy as np

class DataSplitter:
    r"""
    Split a numpy array into smaller arrays of size datapoints_per_split
    If data.shape[0] % datapoints_per_split != 0, the remaining datapoints are dropped
    """
    def __init__(self, datapoints_per_split):
        self.split_size = datapoints_per_split

    def __call__(self, data: np.ndarray):
        """
        data: [[t, i, v]]
        """
        ret_data = []
        for i in range(self.split_size, data.shape[0], self.split_size):
            ret_data.append(data[i-self.split_size:i, :])
        if len(ret_data) == 0:
            raise ValueError(f"data has only {data.shape[0]} datapoints, but datapoints_per_split is set to {self.split_size}")
        return ret_data

    def __repr__(self):
        return f"DataSplitter({self.split_size})"

teng-ml/util/string.py (new file)

@@ -0,0 +1,51 @@
import inspect
import torch.optim.lr_scheduler as sd
import re

def fill_and_center(s: str, fill_char="=", length=100):
    rs = fill_char * length
    margin = (length - len(s)) // 2
    if margin > 1:
        rs = f"{fill_char*(margin-1)} {s} {fill_char*(margin-1)}"
        if len(rs) == length - 1: rs = rs + fill_char
        assert(len(rs) == length)
        return rs
    else:
        return s

def class_str(x):
    """
    Return the constructor of the class of x with arguments
    """
    name = type(x).__name__
    signature = inspect.signature(type(x))
    params = []
    for param_name, param_value in x.__dict__.items():
        if param_name not in signature.parameters:
            continue
        default_value = signature.parameters[param_name].default
        if param_value != default_value:
            params.append(f"{param_name}={param_value!r}")
    if params:
        return f"{name}({', '.join(params)})"
    else:
        return name

def cleanup_str(s):
    """
    convert to string if necessary and
    if scheduler string:
        remove unnecessary parameters
    """
    if not type(s) == str:
        s = str(s)
    # check if scheduler string
    re_scheduler = r"(\w+)\((.*)(optimizer=[A-Za-z]+) \(.*(initial_lr: [\d.]+).*?\)(.*)\)"
    # groups: (sched_name, sched_params1, optimizer=Name, initial_lr: digits, sched_params2)
    match = re.fullmatch(re_scheduler, s.replace("\n", " "))
    if match:
        g = match.groups()
        s = f"{g[0]}({g[1]}{g[2]}({g[3]}, ...){g[4]})"
        return s
    return s
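
`class_str` and `cleanup_str` work together when train_validate_save() serializes the optimizer and scheduler into MLSettings; a sketch (exact output depends on the torch version):

```python
import torch

opt = torch.optim.Adam([torch.zeros(1, requires_grad=True)], lr=0.03)
sched = torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.9)
s = class_str(sched)   # e.g. "ExponentialLR(optimizer=Adam (...), gamma=0.9, ...)"
print(cleanup_str(s))  # optimizer repr collapsed to "Adam(initial_lr: 0.03, ...)"
```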

teng-ml/util/transform.py

@@ -54,4 +54,3 @@ class ConstantInterval:
     def __repr__(self):
         return f"ConstantInterval(interval={self.interval})"