Link to the dataset: https://www.kaggle.com/datasets/risangbaskoro/wlasl-processed
This is the dataset preprocessing code (data_preprocess.py):
# IMPORTS
import os
import sys
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import v2
import numpy as np
# import matplotlib.pyplot as plt
# import torch.nn as nn
# from sklearn.metrics import classification_report
import time
# from torchvision import models
####
import json
import cv2
import random
# from torch.cuda.amp import autocast, GradScaler
# Transformations to apply to each frame
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
transformations = v2.Compose([
v2.ToImage(),
v2.ToDtype(torch.float32, scale=True), # Scale to range [0, 1]
    v2.Normalize(mean=mean, std=std)  # Normalize with the ImageNet mean/std, since the pretrained ResNet expects these values
])
def preprocess_videos(bbox, fps, frame_end, frame_start, video_path):
cap = cv2.VideoCapture(video_path)
    duration_frames = 1 * 30  # 1 second at 30 fps = 30 frames
if not cap.isOpened():
print("[INFO] Warning: Could not open video." + video_path)
return
frames = []
current_frame = 1
while True:
ret, frame = cap.read()
if not ret:
break
if (current_frame >= frame_start) and (frame_end == -1 or current_frame <= frame_end):
# Apply bounding box
x_min, y_min, x_max, y_max = bbox
cropped_frame = frame[y_min:y_max, x_min:x_max]
# Resize frame
output_frame_size = (500, 500) # Arbitrary
resized_frame = cv2.resize(cropped_frame,
output_frame_size) ## I can specify the interpolation method for better resizing such as BiCubic, Bilinear, etc.
# Randomly flipping
if random.random() > 0.5:
                resized_frame = cv2.flip(resized_frame, 0)  # flipCode 0 flips around the x-axis; 1 would flip around the y-axis
# Randomly rotate the frame
rotation_choices = [0, cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_180, cv2.ROTATE_90_COUNTERCLOCKWISE]
rotation = random.choice(rotation_choices)
if rotation != 0:
resized_frame = cv2.rotate(resized_frame, rotation)
# Convert cv2 BGR to RGB format
resized_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
# Transpose the frame from [height, width, channels] to [channels, height, width]
resized_frame = np.transpose(resized_frame, (2, 0, 1)) # This format is PyTorch Native
# Tensor casting
# Maybe no need for numpy to tensor as transformations take ndarray directly.
tensor_frame = torch.from_numpy(resized_frame) # Change numpy array to tensor # No need for .float() here as I am using transform above
frame_transformed = transformations(tensor_frame) # Apply transformations
frames.append(frame_transformed)
current_frame += 1
cap.release()
    if not frames:
        # Nothing was extracted (e.g., frame_start beyond the video length); let the caller skip this sample
        return None
    if len(frames) < duration_frames:
        # If fewer frames than desired, repeat the last frame
        frames += [frames[-1]] * (duration_frames - len(frames))
elif len(frames) > duration_frames:
# If more frames than desired, sample evenly from the extracted frames
indices = torch.linspace(0, len(frames) - 1, duration_frames, dtype=torch.int)
frames = [frames[i] for i in indices]
return frames
class VideoSignLanguageDataset(Dataset):
def __init__(self, df, root):
self.data_frame = df
# self.transform = transform
self.root_path = root
def __len__(self):
return len(self.data_frame)
def __getitem__(self, idx):
metadata = self.data_frame.iloc[idx]
bbox = metadata['bbox']
fps = metadata['fps']
frame_end = metadata['frame_end']
frame_start = metadata['frame_start']
split = metadata['split']
video_id = metadata['video_id']
gloss = metadata['gloss']
# label = self.data_frame.iloc[idx, -1]
label = metadata['labels']
video_path = f'{self.root_path}/{video_id}.mp4'
frames = preprocess_videos(bbox, fps, frame_end, frame_start, video_path)
        # label = torch.tensor(label).long()  ## Equivalent to the line below
label = torch.tensor(label, dtype=torch.int64)
if frames:
# .stack() RULE: Need to ensure all frames are of the same shape and tensor type for stacking
tensor_frames = torch.stack(frames)
# if self.transform:
# frames = self.transform(frames)
# print("tf_type = ", tensor_frames.dtype)
# print("tf_size = ", tensor_frames.size())
return tensor_frames, label
# Custom collate function to remove None values
def custom_collate_fn(batch):
batch = list(filter(lambda x: x is not None, batch)) # Filter out None values
if not batch:
# Return None or a custom signal that indicates an empty batch
return None ## This return None is not very good, I have to handle this better so it returns something default_collate() can work with.
return torch.utils.data.dataloader.default_collate(batch)
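# One possible refinement (a sketch of my own, not used yet; the name safe_collate_fn is illustrative):
# return empty tensors instead of None for an all-filtered batch, so the training loop can
# skip it with a simple `if X_batch.numel() == 0: continue` check.
def safe_collate_fn(batch):
    batch = [sample for sample in batch if sample is not None]
    if not batch:
        # Empty tensors signal "nothing usable in this batch" without breaking callers
        return torch.empty(0), torch.empty(0, dtype=torch.int64)
    return torch.utils.data.dataloader.default_collate(batch)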
if __name__ == '__main__':
print(torch.__version__)
print(torch.cuda.get_device_name(0))
DEVICE = torch.device(
"cuda:0" if torch.cuda.is_available() else "cpu") ## No need for cuda:0 here as it only has one GPU and 0 is default
print(DEVICE)
tot_mem = torch.cuda.get_device_properties(DEVICE).total_memory
resv_mem = torch.cuda.memory_reserved(DEVICE)
alloc_mem = torch.cuda.memory_allocated(DEVICE)
print(tot_mem, resv_mem, alloc_mem, sep="/")
# Set RNGs to same values every time including CUDA operations
seed = 10
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
# torch.cuda.manual_seed_all(seed) ## For multiple GPUs
# Load the metadata from the JSON file
with open('WLASL_v0.3.json', 'r') as file:
metadata = json.load(file)
folder_path = r"C:Users2811rOneDriveDesktoparchive"
ROOT = folder_path + "/videos"
file_extension = ".mp4"
# Define a function to check if the file exists
def check_file_exists(row):
file_name = f"{row['video_id']}{file_extension}"
file_path = os.path.join(ROOT, file_name)
return os.path.exists(file_path)
labels = {'book': 0, 'drink': 1, 'computer': 2, 'before': 3, 'chair': 4, 'go': 5}
len_labels = len(labels)
print(len_labels)
# Dataframe -start
df = pd.DataFrame()
for header in metadata:
action = header['gloss']
if action in labels.keys():
temp = pd.json_normalize(header, record_path=['instances'], meta=['gloss']).drop(
['source', 'variation_id', 'url', 'signer_id', 'instance_id'], axis=1)
temp['labels'] = labels[action]
            # Remove rows that have missing video files
mask = temp.apply(check_file_exists, axis=1) ## axis 1 is row
temp = temp[mask]
# Concat the dataframes
df = pd.concat([df, temp])
print(df) # with truncation
# print(df.to_string()) # This is for displaying entire dataframe without truncation
df.to_pickle("./final_dataframe.pkl")
# BATCH_SIZE = 32
# training_dataset = VideoSignLanguageDataset(df=df, root=ROOT)
# train_dataloader = DataLoader(training_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4,
# collate_fn=custom_collate_fn)
#
#
# for X, y in train_dataloader:
# print('X :::: ', X)
# print('y :::: ', y)
# break
# print(y_batch)
# unique_tensor = torch.unique(y_batch, return_counts=True)
# print(unique_tensor, len(unique_tensor))
This is the model code (model.py):
# IMPORTS
import torch
from torchvision import models
import torch.nn as nn
####
class ConvLSTMNet(nn.Module):
def __init__(self, input_channels, output_classes):
super().__init__()
self.resnet_cnn_model = models.resnet18(weights='ResNet18_Weights.DEFAULT')
# Replace the final fully connected layer of ResNet with an Identity layer
# This is because we only want to use ResNet as a feature extractor
self.resnet_cnn_model.fc = nn.Identity()
        # With fc replaced by Identity, ResNet outputs a flat [batch_size, 512] feature vector
# self.input_size = 512 # Based on ResNet
        # This snippet programmatically determines the CNN output size to use as the LSTM input size
dummy_input = torch.randn(1, input_channels, 100, 100) ## [batch_size, channels, height, width]
dummy_output = self.resnet_cnn_model(dummy_input)
cnn_output = dummy_output.view(dummy_output.size(0), -1).size(1) # This would give 512 as its output
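        # Note: ResNet18 ends with an AdaptiveAvgPool2d((1, 1)) layer, so this feature size (512)
        # does not depend on the dummy input's spatial size; 100x100 works even though the actual frames are 500x500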
# LSTM Parameters
self.output_classes = output_classes # number of output classes # 27 is num of actions available
self.num_layers = 2 # number of stacked layers of LSTM
#self.input_size = 512
self.input_size = cnn_output
self.hidden_size = 1024 # Arbitrary
# self.seq_length = 60 # Frames length ## Maybe use dummy_output.size(1)
self.LSTM = nn.LSTM(input_size=self.input_size, hidden_size=self.hidden_size, num_layers=self.num_layers, batch_first=True)
# Maybe add few more dense layers to improve accuracy
self.fc_1 = nn.Linear(in_features=self.hidden_size, out_features=512)
self.relu_1 = nn.ReLU()
self.fc_2 = nn.Linear(in_features=512, out_features=256)
self.relu_2 = nn.ReLU()
self.fc_3 = nn.Linear(in_features=256, out_features=128)
self.relu_3 = nn.ReLU()
self.fc_4 = nn.Linear(in_features=128, out_features=64)
self.relu_4 = nn.ReLU()
self.fc_5 = nn.Linear(in_features=64, out_features=self.output_classes)
def forward(self, x):
# print(f"Original X shape: {x.shape}") # Debugging
batch_size, seq_length, c, h, w = x.size()
x = x.view(-1, c, h, w) ## Converting 5D tensor to 4D as CNN expects
# print(f"Reshaped X for ResNet shape: {x.shape}") # Debugging
x = self.resnet_cnn_model(x) ## Feeding into ResNet CNN
# print(f"Output from ResNet shape: {x.shape}") # Debugging
# Reshape CNN output for LSTM input
x = x.view(batch_size, seq_length, -1) ## [batch_size, frames_of_video, features]
# print(f"Reshaped X for LSTM shape: {x.shape}") # Debugging
# Single Video Processing
outputs = []
for i in range(batch_size):
# Extract the i-th video in the batch
video_seq = x[i].unsqueeze(0)
# Reset hidden states for each batch
h_0 = torch.zeros(self.num_layers, 1, self.hidden_size).to(x.device)
c_0 = torch.zeros(self.num_layers, 1, self.hidden_size).to(x.device)
lstm_out, (hn, cn) = self.LSTM(video_seq, (h_0, c_0)) ## Feed into LSTM
# Get the output from the last time step
last_time_step_output = lstm_out[:, -1, :]
outputs.append(last_time_step_output)
x = torch.cat(outputs, dim=0)
# x = lstm_out[:, -1, :]
# Feed LSTM 'extracted' output to the following dense layers
x = self.fc_1(x)
x = self.relu_1(x)
x = self.fc_2(x)
x = self.relu_2(x)
x = self.fc_3(x)
x = self.relu_3(x)
x = self.fc_4(x)
x = self.relu_4(x)
x = self.fc_5(x)
return x
if __name__ == '__main__':
model = ConvLSTMNet(input_channels=3, output_classes=3)
print(model)
This is the training script:
import os
from torch.cuda.amp import GradScaler, autocast
import model
import data_preprocess
from data_preprocess import VideoSignLanguageDataset
import torch
from model import ConvLSTMNet
from torchvision import models
import torch.nn as nn
import time
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import v2
if __name__ == "__main__":
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
print(torch.__version__)
print(torch.cuda.get_device_name(0))
DEVICE = torch.device(
"cuda:0" if torch.cuda.is_available() else "cpu") ## No need for cuda:0 here as it only has one GPU and 0 is default
print(DEVICE)
tot_mem = torch.cuda.get_device_properties(DEVICE).total_memory
resv_mem = torch.cuda.memory_reserved(DEVICE)
alloc_mem = torch.cuda.memory_allocated(DEVICE)
print(tot_mem, resv_mem, alloc_mem, sep="/")
# Set RNGs to same values every time including CUDA operations
seed = 10
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
# torch.cuda.manual_seed_all(seed) ## For multiple GPUs
# Creating instance of model
input_channels = 3 # RGB
output_classes = 6 # ASL # 27 is num of actions available in the dataset
conv_lstm_model = ConvLSTMNet(input_channels, output_classes).to(DEVICE)
print(conv_lstm_model)
print(next(conv_lstm_model.parameters()).device)
# Loss function and Optimizer
scaler = GradScaler()
LRN_RATE = 0.001
loss_function = nn.CrossEntropyLoss() # This loss function itself does LogSoftmax (+ NLLLoss)
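    # CrossEntropyLoss expects raw logits (it applies LogSoftmax internally), which matches the model: fc_5 has no softmax after it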
g_descent_optimizer = torch.optim.Adam(conv_lstm_model.parameters(),
lr=LRN_RATE) ##Adam is a type of gradient descent
# Training Model
overall_train_losses = []
overall_train_accuracy = []
# test_losses = []
# test_correct = []
folder_path = r"C:Users2811rOneDriveDesktoparchive"
ROOT = folder_path + "/videos"
file_extension = ".mp4"
df = pd.read_pickle("final_dataframe.pkl")
BATCH_SIZE = 15
training_dataset = VideoSignLanguageDataset(df=df, root=ROOT)
train_dataloader = DataLoader(training_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=3,
collate_fn=data_preprocess.custom_collate_fn)
# Train Start
EPOCHS = 50
total_dataset = len(train_dataloader.dataset)
print("Total Dataset: ", total_dataset)
print("[INFO] Training the network...")
start_time = time.time()
for e in range(EPOCHS):
single_epoch_st_time = time.time()
conv_lstm_model.train() ## Set model to train mode
# These variables are for entire dataset once (all batches)
total_train_loss = 0
total_train_correct = 0
for X_batch, y_batch in train_dataloader:
# Push Data Tensors to the GPU
(X_batch, y_batch) = X_batch.to(DEVICE), y_batch.to(DEVICE)
g_descent_optimizer.zero_grad()
with autocast(): ## Maybe no need to write arguments here; could just leave ()
pred = conv_lstm_model(X_batch) ## Predicted values of y
loss = loss_function(pred, y_batch)
# g_descent_optimizer.zero_grad()
scaler.scale(loss).backward()
# loss.backward()
scaler.step(g_descent_optimizer)
# g_descent_optimizer.step()
scaler.update()
total_train_loss += loss.item()
total_train_correct += (pred.argmax(1) == y_batch).type(torch.float).sum().item()
## For one epoch
avg_train_accuracy = total_train_correct / len(train_dataloader.dataset)
avg_train_loss = total_train_loss / len(train_dataloader)
overall_train_accuracy.append(avg_train_accuracy)
overall_train_losses.append(avg_train_loss)
print("[INFO] EPOCH: {}/{}".format(e + 1, EPOCHS))
print("Train loss: {:.6f}, Train accuracy: {:.4f}".format(avg_train_loss, avg_train_accuracy))
print(f'{total_train_correct}/{len(train_dataloader.dataset)}')
print("Time took for one epoch: ", (time.time() - single_epoch_st_time) / 60)
end_time = time.time()
total_time = end_time - start_time
print(f'Total Training Time: {total_time / 60} Minutes.')
torch.save(conv_lstm_model.state_dict(), './model_weights.pth')
I am currently classifying 6 sign gestures. Each video is 30 fps. My total dataset has 73 videos.
[('book', 40), ('drink', 35), ('computer', 30), ('before', 26), ('chair', 26), ('go', 26)]
This list shows each gesture name and the number of videos available for it in the dataset.
I tried training the model with different parameters, but the training accuracy never rose above 10% over 10 epochs. I also tweaked the video dimensions, the video transformations, etc. I tried not resetting the hidden states at all, resetting the hidden states once per batch, and (currently) resetting the hidden states for each video in a batch.
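For reference, the per-batch variant corresponds roughly to this sketch (instead of the per-video loop in forward(); nn.LSTM zero-initializes (h_0, c_0) for the whole batch when no initial state is passed, and x has shape [batch_size, seq_length, features]):

    lstm_out, (hn, cn) = self.LSTM(x)  # whole batch at once; zero-initialized hidden state by default
    x = lstm_out[:, -1, :]             # last time step output for every video in the batch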
Below is a screenshot of the training accuracies for a few epochs (I stopped training midway because the accuracy kept bouncing around the same values without increasing).
[Accuracies screenshot]