I am not highly acquainted with AI/ML. I was trying to make an AI project using IP102 dataset and CNN-KAN model. The model is relatively new. When I start running the project code the jupyter notebook showing that the code is running but the training is not starting and it doesn’t show any result. It feels as if the code is running on Loop. Can anybody help me ?
The link of the dataset:
https://www.kaggle.com/datasets/rtlmhjbn/ip02-dataset
The code is given below:
import torch
torch.cuda.empty_cache()
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import math
import matplotlib.pyplot as plt
from torchsummary import summary
import pandas as pd
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from torch.amp import autocast, GradScaler
# Define KANLinear class
class KANLinear(torch.nn.Module):
def __init__(
self,
in_features,
out_features,
grid_size=5,
spline_order=3,
scale_noise=0.1,
scale_base=1.0,
scale_spline=1.0,
enable_standalone_scale_spline=True,
base_activation=torch.nn.SiLU,
grid_eps=0.02,
grid_range=[-1, 1],
):
super(KANLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.grid_size = grid_size
self.spline_order = spline_order
h = (grid_range[1] - grid_range[0]) / grid_size
grid = (
(
torch.arange(-spline_order, grid_size + spline_order + 1) * h
+ grid_range[0]
)
.expand(in_features, -1)
.contiguous()
)
self.register_buffer("grid", grid)
self.base_weight = torch.nn.Parameter(torch.Tensor(out_features, in_features))
self.spline_weight = torch.nn.Parameter(
torch.Tensor(out_features, in_features, grid_size + spline_order)
)
if enable_standalone_scale_spline:
self.spline_scaler = torch.nn.Parameter(
torch.Tensor(out_features, in_features)
)
self.scale_noise = scale_noise
self.scale_base = scale_base
self.scale_spline = scale_spline
self.enable_standalone_scale_spline = enable_standalone_scale_spline
self.base_activation = base_activation()
self.grid_eps = grid_eps
self.reset_parameters()
def reset_parameters(self):
torch.nn.init.kaiming_uniform_(self.base_weight, a=math.sqrt(5) * self.scale_base)
with torch.no_grad():
noise = (
(
torch.rand(self.grid_size + 1, self.in_features, self.out_features)
- 1 / 2
)
* self.scale_noise
/ self.grid_size
)
self.spline_weight.data.copy_(
(self.scale_spline if not self.enable_standalone_scale_spline else 1.0)
* self.curve2coeff(
self.grid.T[self.spline_order : -self.spline_order],
noise,
)
)
if self.enable_standalone_scale_spline:
# torch.nn.init.constant_(self.spline_scaler, self.scale_spline)
torch.nn.init.kaiming_uniform_(self.spline_scaler, a=math.sqrt(5) * self.scale_spline)
def b_splines(self, x: torch.Tensor):
"""
Compute the B-spline bases for the given input tensor.
Args:
x (torch.Tensor): Input tensor of shape (batch_size, in_features).
Returns:
torch.Tensor: B-spline bases tensor of shape (batch_size, in_features, grid_size + spline_order).
"""
assert x.dim() == 2 and x.size(1) == self.in_features
grid: torch.Tensor = (
self.grid
) # (in_features, grid_size + 2 * spline_order + 1)
x = x.unsqueeze(-1)
bases = ((x >= grid[:, :-1]) & (x < grid[:, 1:])).to(x.dtype)
for k in range(1, self.spline_order + 1):
bases = (
(x - grid[:, : -(k + 1)])
/ (grid[:, k:-1] - grid[:, : -(k + 1)])
* bases[:, :, :-1]
) + (
(grid[:, k + 1 :] - x)
/ (grid[:, k + 1 :] - grid[:, 1:(-k)])
* bases[:, :, 1:]
)
assert bases.size() == (
x.size(0),
self.in_features,
self.grid_size + self.spline_order,
)
return bases.contiguous()
def curve2coeff(self, x: torch.Tensor, y: torch.Tensor):
"""
Compute the coefficients of the curve that interpolates the given points.
Args:
x (torch.Tensor): Input tensor of shape (batch_size, in_features).
y (torch.Tensor): Output tensor of shape (batch_size, in_features, out_features).
Returns:
torch.Tensor: Coefficients tensor of shape (out_features, in_features, grid_size + spline_order).
"""
assert x.dim() == 2 and x.size(1) == self.in_features
assert y.size() == (x.size(0), self.in_features, self.out_features)
A = self.b_splines(x).transpose(
0, 1
) # (in_features, batch_size, grid_size + spline_order)
B = y.transpose(0, 1) # (in_features, batch_size, out_features)
solution = torch.linalg.lstsq(
A, B
).solution # (in_features, grid_size + spline_order, out_features)
result = solution.permute(
2, 0, 1
) # (out_features, in_features, grid_size + spline_order)
assert result.size() == (
self.out_features,
self.in_features,
self.grid_size + self.spline_order,
)
return result.contiguous()
@property
def scaled_spline_weight(self):
return self.spline_weight * (
self.spline_scaler.unsqueeze(-1)
if self.enable_standalone_scale_spline
else 1.0
)
def forward(self, x: torch.Tensor):
assert x.dim() == 2 and x.size(1) == self.in_features
base_output = F.linear(self.base_activation(x), self.base_weight)
spline_output = F.linear(
self.b_splines(x).view(x.size(0), -1),
self.scaled_spline_weight.view(self.out_features, -1),
)
return base_output + spline_output
@torch.no_grad()
def update_grid(self, x: torch.Tensor, margin=0.01):
assert x.dim() == 2 and x.size(1) == self.in_features
batch = x.size(0)
splines = self.b_splines(x) # (batch, in, coeff)
splines = splines.permute(1, 0, 2) # (in, batch, coeff)
orig_coeff = self.scaled_spline_weight # (out, in, coeff)
orig_coeff = orig_coeff.permute(1, 2, 0) # (in, coeff, out)
unreduced_spline_output = torch.bmm(splines, orig_coeff) # (in, batch, out)
unreduced_spline_output = unreduced_spline_output.permute(
1, 0, 2
) # (batch, in, out)
# sort each channel individually to collect data distribution
x_sorted = torch.sort(x, dim=0)[0]
grid_adaptive = x_sorted[
torch.linspace(
0, batch - 1, self.grid_size + 1, dtype=torch.int64, device=x.device
)
]
uniform_step = (x_sorted[-1] - x_sorted[0] + 2 * margin) / self.grid_size
grid_uniform = (
torch.arange(
self.grid_size + 1, dtype=torch.float32, device=x.device
).unsqueeze(1)
* uniform_step
+ x_sorted[0]
- margin
)
grid = self.grid_eps * grid_uniform + (1 - self.grid_eps) * grid_adaptive
grid = torch.concatenate(
[
grid[:1]
- uniform_step
* torch.arange(self.spline_order, 0, -1, device=x.device).unsqueeze(1),
grid,
grid[-1:]
+ uniform_step
* torch.arange(1, self.spline_order + 1, device=x.device).unsqueeze(1),
],
dim=0,
)
self.grid.copy_(grid.T)
self.spline_weight.data.copy_(self.curve2coeff(x, unreduced_spline_output))
def regularization_loss(self, regularize_activation=1.0, regularize_entropy=1.0):
"""
Compute the regularization loss.
This is a dumb simulation of the original L1 regularization as stated in the
paper, since the original one requires computing absolutes and entropy from the
expanded (batch, in_features, out_features) intermediate tensor, which is hidden
behind the F.linear function if we want an memory efficient implementation.
The L1 regularization is now computed as mean absolute value of the spline
weights. The authors implementation also includes this term in addition to the
sample-based regularization.
"""
l1_fake = self.spline_weight.abs().mean(-1)
regularization_loss_activation = l1_fake.sum()
p = l1_fake / regularization_loss_activation
regularization_loss_entropy = -torch.sum(p * p.log())
return (
regularize_activation * regularization_loss_activation
+ regularize_entropy * regularization_loss_entropy
)
# CNN-KAN model for IP102 dataset
class CNNKAN(nn.Module):
def __init__(self, num_classes):
super(CNNKAN, self).__init__()
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
self.pool1 = nn.MaxPool2d(2)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.pool2 = nn.MaxPool2d(2)
self.kan1 = KANLinear(64 * 56 * 56, 256) # Adjust input size to fit 224x224 images
self.kan2 = KANLinear(256, num_classes)
def forward(self, x):
x = F.selu(self.conv1(x))
x = self.pool1(x)
x = F.selu(self.conv2(x))
x = self.pool2(x)
x = x.reshape(x.size(0), -1)
x = self.kan1(x)
x = self.kan2(x)
return x
# Load class labels (from your IP102 dataset)
f = open('/kaggle/input/ip02-dataset/classes.txt')
label = []
name = []
for line in f.readlines():
label.append(int(line.split()[0]))
name.append(' '.join(line.split()[1:]))
classes = pd.DataFrame([label, name]).T
classes.columns = ['label', 'name']
# Load train, validation, and test data
train_df = pd.read_csv('/kaggle/input/ip02-dataset/train.txt', sep=' ', header=None, engine='python')
train_df.columns = ['image_path', 'label']
test_df = pd.read_csv('../input/ip02-dataset/test.txt', sep=' ', header=None, engine='python')
test_df.columns = ['image_path', 'label']
val_df = pd.read_csv('../input/ip02-dataset/val.txt', sep=' ', header=None, engine='python')
val_df.columns = ['image_path', 'label']
# Filter specific classes
choose_label_index = (15, 22, 24, 39, 50, 51, 58, 67, 70, 101)
class_filter = classes.loc[classes['label'].isin(choose_label_index)]
train_df_img = train_df[train_df['label'].isin(choose_label_index)].copy().reset_index(drop=True)
val_df_img = val_df[val_df['label'].isin(choose_label_index)].copy().reset_index(drop=True)
test_df_img = test_df[test_df['label'].isin(choose_label_index)].copy().reset_index(drop=True)
TRAIN_DIR = '../input/ip02-dataset/classification/train'
TEST_DIR = '../input/ip02-dataset/classification/test'
VAL_DIR = '../input/ip02-dataset/classification/val'
train_df_img['image_fullpath'] = TRAIN_DIR + "/" + train_df_img['label'].astype(str) + "/" + train_df_img['image_path']
val_df_img['image_fullpath'] = VAL_DIR + "/" + val_df_img['label'].astype(str) + "/" + val_df_img['image_path']
test_df_img['image_fullpath'] = TEST_DIR + "/" + test_df_img['label'].astype(str) + "/" + test_df_img['image_path']
train_df_img['label'] = train_df_img['label'].astype(str)
val_df_img['label'] = val_df_img['label'].astype(str)
test_df_img['label'] = test_df_img['label'].astype(str)
# Parameters
IMAGE_SIZE = 224
BATCH_SIZE = 16
EPOCHS = 50
NUM_CLASSES = len(choose_label_index) # Number of classes
# Create data generators for training, validation, and test sets
train_datagen = ImageDataGenerator(
rescale=1/255,
width_shift_range=0.2,
height_shift_range=0.2,
horizontal_flip=True,
zoom_range=[.8, 1],
channel_shift_range=30,
fill_mode='reflect'
)
valid_datagen = ImageDataGenerator(rescale=1/255)
train_gen = train_datagen.flow_from_dataframe(
dataframe=train_df_img,
x_col='image_fullpath',
y_col='label',
target_size=(IMAGE_SIZE, IMAGE_SIZE),
class_mode='categorical',
batch_size=BATCH_SIZE,
shuffle=True
)
valid_gen = valid_datagen.flow_from_dataframe(
dataframe=val_df_img,
x_col='image_fullpath',
y_col='label',
target_size=(IMAGE_SIZE, IMAGE_SIZE),
class_mode='categorical',
batch_size=BATCH_SIZE,
shuffle=False
)
# Manually track the size of your dataset if using a generator
def get_dataset_length(generator):
return sum(1 for _ in generator)
# Convert TensorFlow data generator into PyTorch-compatible data loader
def generator_to_loader(generator):
for data_batch in generator:
x_batch = torch.Tensor(data_batch[0]).permute(0, 3, 1, 2) # (batch_size, channels, height, width)
y_batch = torch.Tensor(data_batch[1])
yield x_batch, y_batch
# Create CNN-KAN model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNKAN(num_classes=NUM_CLASSES).to(device)
# Optimizer and loss function
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-3)
loss_function = nn.CrossEntropyLoss()
accumulation_steps = 4 # Accumulate gradients over this many batches
scaler = GradScaler() # For mixed precision training
# Training function
def train_model(model, device, train_loader, optimizer, epoch, dataset_size):
model.train()
optimizer.zero_grad() # Initialize optimizer's gradients
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
# Using mixed precision
with autocast('cuda'):
output = model(data)
loss = loss_function(output, target)
# Scale the loss for mixed precision
scaler.scale(loss).backward()
# Perform the optimizer step after backward pass
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad() # Reset gradients after updating
# Print the progress
if batch_idx % 10 == 0:
processed_samples = batch_idx * len(data)
print(f'Epoch: {epoch} [{processed_samples}/{dataset_size}]tLoss: {loss.item():.6f}')
# Evaluation function
def evaluate_model(model, device, val_loader):
model.eval()
val_loss = 0
correct = 0
with torch.no_grad():
for data, target in val_loader:
data, target = data.to(device), target.to(device)
output = model(data)
val_loss += loss_function(output, target).item() # Sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # Get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
val_loss /= len(val_loader)
print(f'Validation loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader)} ({100. * correct / len(val_loader):.2f}%)n')
# Get dataset length from the generator
dataset_size = get_dataset_length(train_gen)
# Training and evaluating the model
train_loader = generator_to_loader(train_gen)
val_loader = generator_to_loader(valid_gen)
# Main training loop
for epoch in range(1, EPOCHS + 1):
train_model(model, device, train_loader, optimizer, epoch, dataset_size)
evaluate_model(model, device, val_loader)
# Save model
torch.save(model.state_dict(), 'cnn_kan_ip102.pth')
I started the training. I expected that CNN-KAN will start training but feel like it is stuck on a loop.
PIAL GHOSH is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.