model working

This commit is contained in:
Matthias@Dell 2023-05-05 18:26:44 +02:00
parent c56f51b8e5
commit 341cc39f24
3 changed files with 80 additions and 44 deletions

View File

@ -42,88 +42,104 @@ if __name__ == "__main__":
)
print(f"Using device: {device}")
settings = {}
labels = LabelConverter(["foam", "glass", "kapton", "foil", "cloth", "rigid_foam"])
t_const_int = ConstantInterval(0.01)
t_norm = Normalize(0, 1)
train_set, test_set = load_datasets("/home/matth/Uni/TENG/testdata", labels, voltage=8.2, transforms=[t_const_int], train_to_test_ratio=0.7, random_state=42)
transforms = [ t_const_int, t_norm ]
settings["transforms"] = str(transforms)
train_set, test_set = load_datasets("/home/matth/Uni/TENG/data", labels, voltage=8.2, transforms=[t_const_int], train_to_test_ratio=0.7, random_state=42)
# train_loader = iter(DataLoader(train_set))
# test_loader = iter(DataLoader(test_set))
train_loader = iter(DataLoader(train_set, batch_size=3, shuffle=True))
test_loader = iter(DataLoader(test_set, batch_size=3, shuffle=True))
train_loader = DataLoader(train_set, batch_size=3, shuffle=True)
test_loader = DataLoader(test_set, batch_size=3, shuffle=True)
# , dtype=torch.float32
sample = next(train_loader)
print(sample)
# sample = next(train_loader)
# print(sample)
feature_count = 1
settings["feature_count"] = str(feature_count)
class RNN(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_classes, if_bidirectional):
def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional):
super(RNN, self).__init__()
self.num_layers = num_layers
self.hidden_size = hidden_size
self.if_bidirectional = if_bidirectional
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=if_bidirectional)
self.is_bidirectional = bidirectional
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=bidirectional)
# x = (batch_size, sequence, feature)
if if_bidirectional == True:
if bidirectional == True:
self.fc = nn.Linear(hidden_size * 2, num_classes)
else:
self.fc = nn.Linear(hidden_size, num_classes)
self.softmax = nn.Softmax(dim=1)
def forward(self, x):
# x: batches, length, features
print(f"forward pass")
D = 2 if self.if_bidirectional == True else 1
# print(f"forward pass")
D = 2 if self.is_bidirectional == True else 1
print(f"x({x.shape})=...")
# print(f"x({x.shape})=...")
batch_size = x.shape[0]
print(f"batch_size={batch_size}")
# print(f"batch_size={batch_size}")
h0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
print(f"h0({h0.shape})=...")
# print(f"h1({h0.shape})=...")
c0 = torch.zeros(D * self.num_layers, batch_size, self.hidden_size).to(device)
x.to(device)
_, (h_n, _) = self.lstm(x, (h0, c0))
print(f"h_n({h_n.shape})=...")
# print(f"h_n({h_n.shape})=...")
final_state = h_n.view(self.num_layers, D, batch_size, self.hidden_size)[-1] # num_layers, num_directions, batch, hidden_size
print(f"final_state({final_state.shape})=...")
# print(f"final_state({final_state.shape})=...")
if D == 1:
X = final_state.squeeze()
X = final_state.squeeze() # TODO what if batch_size == 1
elif D == 2:
h_1, h_2 = final_state[0], final_state[1] # forward & backward pass
#X = h_1 + h_2 # Add both states
X = torch.cat((h_1, h_2), 1) # Concatenate both states, X-size: (Batch, hidden_size * 2
else:
raise ValueError("D must be 1 or 2")
# print(f"X({X.shape})={X}")
output = self.fc(X) # fully-connected layer
print(f"out({output.shape})={output}")
# print(f"out({output.shape})={output}")
output = self.softmax(output)
# print(f"out({output.shape})={output}")
return output
model=RNN(input_size=1, hidden_size=8, num_layers=3, num_classes=18, if_bidirectional=True).to(device)
model=RNN(input_size=1, hidden_size=8, num_layers=3, num_classes=len(labels), bidirectional=True).to(device)
loss_func = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
print(model)
print(f"model:", model)
print(f"loss_func={loss_func}")
print(f"optimizer={optimizer}")
print(f"scheduler={scheduler}")
num_epochs = 10
# training
for ep in range(40):
for ep in range(num_epochs):
train_correct = 0
train_total = 0
val_correct = 0
val_total = 0
for data, y in train_loader:
for i, (data, y) in enumerate(train_loader):
# print(data, y)
# data = batch, seq, features
print(ep, "Train")
# print(f"data({data.shape})={data}")
x = data[:,:,[2]].float() # select voltage data
print(f"x({x.shape}, {x.dtype})=...")
print(f"y({y.shape}, {y.dtype})=...")
# print(f"x({x.shape}, {x.dtype})=...")
# print(f"y({y.shape}, {y.dtype})=...")
# length = torch.tensor([x.shape[1] for _ in range(x.shape[0])], dtype=torch.int64)
# print(f"length({length.shape})={length}")
# batch_size = x.shape[0]
@ -134,6 +150,7 @@ if __name__ == "__main__":
# print(data.batch_sizes[0])
# print(data)
out = model(x)
# print(f"out({out.shape}={out})")
loss = loss_func(out, y)
# print(loss)
@ -141,28 +158,38 @@ if __name__ == "__main__":
loss.backward() # backpropagation, compute gradients
optimizer.step() # apply gradients
predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
# predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
actual = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"predicted={predicted}, actual={actual}")
train_total += y.size(0)
train_correct += (predicted == y).sum().item()
train_correct += (predicted == actual).sum().item()
print(f"epoch={ep+1:3}: Training accuracy={100 * train_correct / train_total:.2f}%, loss={loss:3f}")
scheduler.step()
for data, y in test_loader:
print(ep, "Test")
x = data[:,:,[2]]
print(f"x({x.shape})={x}")
# length = torch.tensor(x.shape[1], dtype=torch.int64)
# print(f"length={length}")
# batch_size = x.shape[0]
# print(f"batch_size={batch_size}")
# v = x.view(batch_size, -1, feature_count)
# data = rnn_utils.pack_padded_sequence(v.type(torch.FloatTensor), length, batch_first=True).to(device)
with torch.no_grad():
for i, (data, y) in enumerate(test_loader):
# print(ep, "Test")
x = data[:,:,[2]].float()
out = model(x)
loss = loss_func(out, y)
predicted = torch.max(torch.nn.functional.softmax(out), 1)[1]
predicted = torch.argmax(out, dim=1, keepdim=False) # -> [ label_indices ]
actual = torch.argmax(y, dim=1, keepdim=False) # -> [ label_indices ]
# print(f"predicted={predicted}, actual={actual}")
val_total += y.size(0)
val_correct += (predicted == y).sum().item()
val_correct += (predicted == actual).sum().item()
# print(f"train_total={train_total}, val_total={val_total}")
if train_total == 0: train_total = -1
if val_total == 0: val_total = -1
print(f"epoch={ep+1:3}: Testing accuracy={100 * val_correct / val_total:.2f}")
print(f"End result: Training accuracy={100 * train_correct / train_total:.2f}%, Testing accuracy={100 * val_correct / val_total:.2f}")
settings["model"] = str(model)
with open("settings.txt", "w") as file:
file.write(str(settings))
print("epoch: ", ep + 1, 'Accuracy of the Train: %.2f %%' % (100 * train_correct / train_total), 'Accuracy of the Test: %.2f %%' % (100 * val_correct / val_total))

View File

@ -26,6 +26,9 @@ class LabelConverter:
def __contains__(self, value):
return value in self.class_labels
def __len__(self):
return len(self.class_labels)
def get_labels(self):
return self.class_labels.copy()
@ -52,7 +55,7 @@ class Datasample:
def get_data(self):
"""[[timestamps, idata, vdata]]"""
if not self.data:
if self.data is None:
self._load_data()
return self.data

View File

@ -20,6 +20,9 @@ class Normalize:
a -= self.low
return a
def __repr__(self):
return f"Normalize(low={self.low}, high={self.high})"
class ConstantInterval:
"""
@ -49,3 +52,6 @@ class ConstantInterval:
# sug_interval = 0.5 * avg_interval
# print(f"Average interval: {avg_interval}, Suggestion: {sug_interval}")
def __repr__(self):
return f"ConstantInterval(interval={self.interval})"