Compare commits
3 Commits
6fd9902aab
...
13d436acef
Author | SHA1 | Date | |
---|---|---|---|
|
13d436acef | ||
|
98a07fd64f | ||
|
2aba5fcd0f |
118
teng-ml/main.py
118
teng-ml/main.py
@ -1,6 +1,3 @@
|
|||||||
import matplotlib.pyplot as plt
|
|
||||||
import pandas as pd
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if __package__ is None:
|
if __package__ is None:
|
||||||
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
# make relative imports work as described here: https://peps.python.org/pep-0366/#proposed-change
|
||||||
@ -10,10 +7,18 @@ if __name__ == "__main__":
|
|||||||
filepath = path.realpath(path.abspath(__file__))
|
filepath = path.realpath(path.abspath(__file__))
|
||||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||||
|
|
||||||
from .util.transform import ConstantInterval
|
import matplotlib.pyplot as plt
|
||||||
|
import pandas as pd
|
||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from torch.utils.data import DataLoader
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv"
|
from .util.transform import ConstantInterval
|
||||||
|
from .util.data_loader import load_datasets, LabelConverter
|
||||||
|
|
||||||
|
def test_interpol():
|
||||||
|
file = "/home/matth/data/2023-04-27_glass_8.2V_179mm000.csv"
|
||||||
# file = "/home/matth/data/test001.csv"
|
# file = "/home/matth/data/test001.csv"
|
||||||
df = pd.read_csv(file)
|
df = pd.read_csv(file)
|
||||||
array = df.to_numpy()
|
array = df.to_numpy()
|
||||||
@ -27,3 +32,104 @@ if __name__ == "__main__":
|
|||||||
ax1.legend()
|
ax1.legend()
|
||||||
plt.show()
|
plt.show()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
device = (
|
||||||
|
"cuda"
|
||||||
|
if torch.cuda.is_available()
|
||||||
|
else "mps"
|
||||||
|
if torch.backends.mps.is_available()
|
||||||
|
else "cpu"
|
||||||
|
)
|
||||||
|
print(f"Using device: {device}")
|
||||||
|
|
||||||
|
labels = LabelConverter(["foam", "glass", "kapton", "foil"])
|
||||||
|
train_set, test_set = load_datasets("/home/matth/data", labels, voltage=8.2)
|
||||||
|
|
||||||
|
# train_loader = iter(DataLoader(train_set))
|
||||||
|
# test_loader = iter(DataLoader(test_set))
|
||||||
|
# sample = next(train_loader)
|
||||||
|
# print(sample)
|
||||||
|
train_loader = iter(DataLoader(train_set))
|
||||||
|
test_loader = iter(DataLoader(test_set))
|
||||||
|
class RNN(nn.Module):
|
||||||
|
def __init__(self, input_size, hidden_size, num_layers, num_classes, if_bidirectional):
|
||||||
|
super(RNN, self).__init__()
|
||||||
|
self.num_layers = num_layers
|
||||||
|
self.hidden_size = hidden_size
|
||||||
|
self.if_bidirectional = if_bidirectional
|
||||||
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=if_bidirectional)
|
||||||
|
|
||||||
|
if if_bidirectional == True:
|
||||||
|
self.fc = nn.Linear(hidden_size * 2, num_classes)
|
||||||
|
else:
|
||||||
|
self.fc = nn.Linear(hidden_size, num_classes)
|
||||||
|
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
D = 2 if self.if_bidirectional == True else 1
|
||||||
|
Batch = x.batch_sizes[0]
|
||||||
|
|
||||||
|
h0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
|
||||||
|
c0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
|
||||||
|
x.to(device)
|
||||||
|
_, (h_n, _) = self.lstm(x, (h0, c0))
|
||||||
|
final_state = h_n.view(self.num_layers, D, Batch, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
|
||||||
|
|
||||||
|
if D == 1:
|
||||||
|
X = final_state.squeeze()
|
||||||
|
elif D == 2:
|
||||||
|
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
|
||||||
|
#X = h_1 + h_2 # Add both states
|
||||||
|
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2)
|
||||||
|
|
||||||
|
output = self.fc(X) # fully-connected layer
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
model = RNN(input_size = 1, hidden_size = 8, num_layers = 3, num_classes = 18, if_bidirectional = True).to(device)
|
||||||
|
loss_func = torch.nn.CrossEntropyLoss()
|
||||||
|
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
|
||||||
|
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
|
||||||
|
|
||||||
|
print(model)
|
||||||
|
|
||||||
|
# training
|
||||||
|
for ep in range(40):
|
||||||
|
train_correct = 0
|
||||||
|
train_total = 0
|
||||||
|
val_correct = 0
|
||||||
|
val_total = 0
|
||||||
|
for (x, y), length in train_loader:
|
||||||
|
batch_size = x.shape[0]
|
||||||
|
v = x.view(batch_size, -1, nFeatrue)
|
||||||
|
data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)
|
||||||
|
# print(data.batch_sizes[0])
|
||||||
|
# print(data)
|
||||||
|
out = model(data)
|
||||||
|
loss = loss_func(out, y)
|
||||||
|
# print(loss)
|
||||||
|
|
||||||
|
optimizer.zero_grad() # clear gradients for next train
|
||||||
|
loss.backward() # backpropagation, compute gradients
|
||||||
|
optimizer.step() # apply gradients
|
||||||
|
|
||||||
|
predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
|
||||||
|
train_total += y.size(0)
|
||||||
|
train_correct += (predicted == y).sum().item()
|
||||||
|
|
||||||
|
|
||||||
|
scheduler.step()
|
||||||
|
|
||||||
|
for (x, y), length in test_loader:
|
||||||
|
batch_size = x.shape[0]
|
||||||
|
v = x.view(batch_size, -1, nFeatrue)
|
||||||
|
data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)
|
||||||
|
out = model(data)
|
||||||
|
loss = loss_func(out, y)
|
||||||
|
|
||||||
|
predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
|
||||||
|
val_total += y.size(0)
|
||||||
|
val_correct += (predicted == y).sum().item()
|
||||||
|
|
||||||
|
print("epoch: ", ep + 1, 'Accuracy of the Train: %.2f %%' % (100 * train_correct / train_total), 'Accuracy of the Test: %.2f %%' % (100 * val_correct / val_total))
|
||||||
|
|
||||||
|
@ -6,7 +6,6 @@ from time import sleep
|
|||||||
from random import choice as r_choice
|
from random import choice as r_choice
|
||||||
from sys import exit
|
from sys import exit
|
||||||
|
|
||||||
from .util.transform import Normalize
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if __package__ is None:
|
if __package__ is None:
|
||||||
@ -16,7 +15,8 @@ if __name__ == "__main__":
|
|||||||
from os import path
|
from os import path
|
||||||
filepath = path.realpath(path.abspath(__file__))
|
filepath = path.realpath(path.abspath(__file__))
|
||||||
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
sys.path.insert(0, path.dirname(path.dirname(filepath)))
|
||||||
from .utility.data import load_dataframe
|
|
||||||
|
from .util.transform import Normalize
|
||||||
|
|
||||||
file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv"
|
file = "/home/matth/data/2023-04-25_kapton_8.2V_179mm002.csv"
|
||||||
|
|
||||||
@ -101,7 +101,7 @@ if __name__ == "__main__":
|
|||||||
Peak identification:
|
Peak identification:
|
||||||
plot, let user choose first, second, last and lowest peak for identification
|
plot, let user choose first, second, last and lowest peak for identification
|
||||||
"""
|
"""
|
||||||
df = load_dataframe(file)
|
df = pd.read_csv(file)
|
||||||
a = df.to_numpy()
|
a = df.to_numpy()
|
||||||
|
|
||||||
# a2 = interpolate_to_linear_time()
|
# a2 = interpolate_to_linear_time()
|
39
teng-ml/rnn.py
Normal file
39
teng-ml/rnn.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
|
||||||
|
# BiLSTM Model
|
||||||
|
|
||||||
|
class RNN(nn.Module):
|
||||||
|
def __init__(self, input_size, hidden_size, num_layers, num_classes, if_bidirectional):
|
||||||
|
super(RNN, self).__init__()
|
||||||
|
self.num_layers = num_layers
|
||||||
|
self.hidden_size = hidden_size
|
||||||
|
self.if_bidirectional = if_bidirectional
|
||||||
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=if_bidirectional)
|
||||||
|
|
||||||
|
if if_bidirectional == True:
|
||||||
|
self.fc = nn.Linear(hidden_size * 2, num_classes)
|
||||||
|
else:
|
||||||
|
self.fc = nn.Linear(hidden_size, num_classes)
|
||||||
|
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
D = 2 if self.if_bidirectional == True else 1
|
||||||
|
Batch = x.batch_sizes[0]
|
||||||
|
|
||||||
|
h0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
|
||||||
|
c0 = torch.zeros(D * self.num_layers, Batch, self.hidden_size).to(device)
|
||||||
|
x.to(device)
|
||||||
|
_, (h_n, _) = self.lstm(x, (h0, c0))
|
||||||
|
final_state = h_n.view(self.num_layers, D, Batch, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
|
||||||
|
|
||||||
|
if D == 1:
|
||||||
|
X = final_state.squeeze()
|
||||||
|
elif D == 2:
|
||||||
|
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
|
||||||
|
#X = h_1 + h_2 # Add both states
|
||||||
|
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2)
|
||||||
|
|
||||||
|
output = self.fc(X) # fully-connected layer
|
||||||
|
|
||||||
|
return output
|
@ -1,25 +1,95 @@
|
|||||||
|
|
||||||
def load_data():
|
from os import path, listdir
|
||||||
# Build the category_lines dictionary, a list of names per language
|
import re
|
||||||
category_lines = {}
|
import numpy as np
|
||||||
all_categories = []
|
import pandas as pd
|
||||||
|
|
||||||
def find_files(path):
|
from sklearn.model_selection import train_test_split
|
||||||
return glob.glob(path)
|
|
||||||
|
|
||||||
# Read a file and split into lines
|
# groups: date, name, voltage, distance, index
|
||||||
def read_lines(filename):
|
re_filename = r"(\d{4}-\d{2}-\d{2})_([a-zA-Z]+)_(\d{1,2}(?:\.\d*)?)V_(\d+(?:\.\d*)?)mm(\d+).csv"
|
||||||
lines = io.open(filename, encoding='utf-8').read().strip().split('\n')
|
|
||||||
return [unicode_to_ascii(line) for line in lines]
|
|
||||||
|
|
||||||
for filename in find_files('data/names/*.txt'):
|
class LabelConverter:
|
||||||
category = os.path.splitext(os.path.basename(filename))[0]
|
def __init__(self, class_labels):
|
||||||
all_categories.append(category)
|
self.class_labels = class_labels.copy()
|
||||||
|
self.class_labels.sort()
|
||||||
|
|
||||||
lines = read_lines(filename)
|
def get_one_hot(self, label):
|
||||||
category_lines[category] = lines
|
"""return one hot vector for given label"""
|
||||||
|
vec = np.zeros(len(self.class_labels), dtype=np.float32)
|
||||||
|
vec[self.class_labels.index(label)] = 1.0
|
||||||
|
return vec
|
||||||
|
|
||||||
return category_lines, all_categories
|
def __getitem__(self, index):
|
||||||
|
return self.class_labels[index]
|
||||||
|
|
||||||
|
def __contains__(self, value):
|
||||||
|
return value in self.class_labels
|
||||||
|
|
||||||
|
def get_labels(self):
|
||||||
|
return self.class_labels.copy()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Datasample:
|
||||||
|
def __init__(self, date: str, label: str, voltage: str, distance: str, index: str, label_vec, datapath: str):
|
||||||
|
self.date = date
|
||||||
|
self.label = label
|
||||||
|
self.voltage = float(voltage)
|
||||||
|
self.distance = float(distance)
|
||||||
|
self.index = int(index)
|
||||||
|
self.label_vec = label_vec
|
||||||
|
self.datapath = datapath
|
||||||
|
self.data = None
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
size = self.data.size if self.data else "Unknown"
|
||||||
|
return f"{self.label}-{self.index}: dimension={size}, recorded at {self.date} with U={self.voltage}V, d={self.distance}mm"
|
||||||
|
|
||||||
|
def _load_data(self):
|
||||||
|
df = pd.read_csv(self.datapath)
|
||||||
|
self.data = df.to_numpy()
|
||||||
|
|
||||||
|
def get_data(self):
|
||||||
|
"""[[timestamps, idata, vdata]]"""
|
||||||
|
if not self.data:
|
||||||
|
self._load_data()
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
class Dataset:
|
||||||
|
"""
|
||||||
|
Store the whole dataset, compatible with torch.data.Dataloader
|
||||||
|
"""
|
||||||
|
def __init__(self, datasamples):
|
||||||
|
self.datasamples = datasamples
|
||||||
|
# self.labels = [ d.label_vec for d in datasamples ]
|
||||||
|
# self.data = [ d.get_data() for d in datasamples ]
|
||||||
|
|
||||||
|
def __getitem__(self, index):
|
||||||
|
return self.datasamples[index].get_data(), self.datasamples[index].label_vec
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.datasamples)
|
||||||
|
|
||||||
|
def load_datasets(datadir, labels: LabelConverter, voltage=None, train_to_test_ratio=0.7, random_state=None):
|
||||||
|
"""
|
||||||
|
load all data from datadir that are in the format: yyyy-mm-dd_label_x.xV_xxxmm.csv
|
||||||
|
"""
|
||||||
|
datasamples = []
|
||||||
|
files = listdir(datadir)
|
||||||
|
files.sort()
|
||||||
|
for file in files:
|
||||||
|
match = re.fullmatch(re_filename, file)
|
||||||
|
if not match: continue
|
||||||
|
|
||||||
|
label = match.groups()[1]
|
||||||
|
if label not in labels: continue
|
||||||
|
|
||||||
|
sample_voltage = float(match.groups()[2])
|
||||||
|
if voltage and voltage != sample_voltage: continue
|
||||||
|
|
||||||
|
datasamples.append(Datasample(*match.groups(), labels.get_one_hot(label), datadir + "/" + file))
|
||||||
|
train_samples, test_samples = train_test_split(datasamples, train_size=train_to_test_ratio, shuffle=True, random_state=random_state)
|
||||||
|
train_dataset = Dataset(train_samples)
|
||||||
|
test_dataset = Dataset(test_samples)
|
||||||
|
return train_dataset, test_dataset
|
||||||
|
Loading…
Reference in New Issue
Block a user