I am trying to convert this PyTorch implementation of Capsule-Forensics to TensorFlow. I thought I succeeded in converting the model when I was able to compile the model and view its summary, as shown below.
Model: "model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input (InputLayer) [(None, 256, 256, 1)] 0
_________________________________________________________________
conv0 (Conv2D) (None, 256, 256, 1) 26
_________________________________________________________________
vgg16_block1_conv1 (Conv2D) (None, 256, 256, 64) 640
_________________________________________________________________
vgg16_block1_conv2 (Conv2D) (None, 256, 256, 64) 36928
_________________________________________________________________
vgg16_block1_pool (MaxPoolin (None, 128, 128, 64) 0
_________________________________________________________________
vgg16_block2_conv1 (Conv2D) (None, 128, 128, 128) 73856
_________________________________________________________________
vgg16_block2_conv2 (Conv2D) (None, 128, 128, 128) 147584
_________________________________________________________________
vgg16_block2_pool (MaxPoolin (None, 64, 64, 128) 0
_________________________________________________________________
vgg16_block3_conv1 (Conv2D) (None, 64, 64, 256) 295168
_________________________________________________________________
vgg16_block3_conv2 (Conv2D) (None, 64, 64, 256) 590080
_________________________________________________________________
vgg16_block3_conv3 (Conv2D) (None, 64, 64, 256) 590080
_________________________________________________________________
vgg16_block3_pool (MaxPoolin (None, 32, 32, 256) 0
_________________________________________________________________
primary_caps (PrimaryCaps) (None, 10, 8) 1584210
_________________________________________________________________
class_caps (ClassCaps) (None, 2, 4) 640
_________________________________________________________________
y (Lambda) (None, 2) 0
=================================================================
Total params: 3,319,212
Trainable params: 1,583,070
Non-trainable params: 1,736,142
_________________________________________________________________
However, I was met with an error when I attempted to train it:
ValueError: Dimensions must be equal, but are 32 and 4 for '{{node margin_loss/mul}} = Mul[T=DT_FLOAT](IteratorGetNext:1, margin_loss/Square)' with input shapes: [32,2], [4,2].
I printed the tensors to debug and I think the problem is the reshaping layer (View class) at the end of the primary capsules, as shown below. The shape of the input to the View is (32, 1, 1), where 32 is the batch size. After reshaping to (-1, 8), the output is (4, 8). Is my implementation of the reshaping layer wrong or is the error due to the shape of the input tensor after the 1D convolutional layers? I have tried setting a fixed batch size to no avail. Maybe there is another thing I overlooked?
x reshaped in StatsNet
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/stats_net_7/Reshape:0", shape=(32, 32, 512), dtype=float32)
stats output
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/stats_net_7/stack:0", shape=(32, 2, 32), dtype=float32)
view inputs
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/batch_normalization_31/batchnorm/add_1:0", shape=(32, 1, 1), dtype=float32)
view output
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/view_7/Reshape:0", shape=(4, 8), dtype=float32)
For your reference, I have included below the View class from the original Capsule-Forensics in PyTorch, as well as my implementation of the model, including its layers, in TensorFlow.
View class of Capsule-Forensics in Pytorch:
class View(nn.Module):
def __init__(self, *shape):
super(View, self).__init__()
self.shape = shape
def forward(self, input):
return input.view(self.shape)
View class in my TensorFlow implementation:
class View(Layer):
def __init__(self, shape):
super(View, self).__init__()
self.shape = shape
def call(self, inputs):
print("nview inputs")
print(inputs)
output = tf.reshape(inputs, self.shape)
print("nview output")
print(output)
return output
StatsNet class in my TF implementation:
class StatsNet(Layer):
def __init__(self):
super(StatsNet, self).__init__()
def call(self, x):
# Reshape x to have shape [batch_size, height, width, channels]
height = x.shape[1]
width = x.shape[2]
channels = x.shape[3]
x_reshaped = tf.reshape(x, [-1, height, width * channels]) # Dynamic batch size
print("nx reshaped in StatsNet");
print(x_reshaped)
# Calculate mean and standard deviation along the last dimension
mean = tf.reduce_mean(x_reshaped, axis=-1)
std = tf.math.reduce_std(x_reshaped, axis=-1)
# Stack mean and std along a new dimension
stats = tf.stack([mean, std], axis=1)
print("nstats output");
print(stats)
return stats
Primary and class capsule layers in my TF implementation. They correspond to the original PyTorch implementation’s FeatureExtractor and RoutingLayer classes, respectively:
class PrimaryCaps(Layer):
def __init__(self, num_capsules):
super(PrimaryCaps, self).__init__()
self.num_capsules = num_capsules
def get_config(self):
config = super().get_config().copy()
config.update(
{
"num_capsules": self.num_capsules,
}
)
return config
def build(self, input):
self.capsules = [
self.create_capsule() for _ in range(self.num_capsules)
]
self.built = True
def create_capsule(self):
return Sequential([
Conv2D(64, kernel_size=3, strides=1, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
BatchNormalization(),
ReLU(),
Conv2D(16, kernel_size=3, strides=1, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
BatchNormalization(beta_initializer='zeros', gamma_initializer=initializers.RandomNormal(mean=1.0, stddev=0.02)),
ReLU(),
StatsNet(),
Conv1D(8, kernel_size=5, strides=2, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
BatchNormalization(beta_initializer='zeros', gamma_initializer=initializers.RandomNormal(mean=1.0, stddev=0.02)),
Conv1D(1, kernel_size=3, strides=1, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
BatchNormalization(beta_initializer='zeros', gamma_initializer=initializers.RandomNormal(mean=1.0, stddev=0.02)),
View(shape=(-1, 8)),
])
def squash(self, tensor, axis):
squared_norm = tf.reduce_sum(tf.square(tensor), axis=axis, keepdims=True)
scale = squared_norm / (1 + squared_norm)
return scale * tensor / tf.sqrt(squared_norm)
def call(self, inputs):
outputs = [capsule(inputs) for capsule in self.capsules]
print("noutput of primary capsules")
print(outputs)
output = tf.stack(outputs, axis=-1)
# output: [batch_size, data, in_caps]
output = tf.transpose(output, perm=[0, 2, 1])
# output: [batch_size, in_caps, data]
print("noutput of primary capsule layer")
print(output)
return self.squash(output, axis=-1)
class ClassCaps(Layer):
def __init__(self, num_input_capsules, num_output_capsules, data_in, data_out, num_iterations, dropout_rate=0.05):
super(ClassCaps, self).__init__()
self.num_iterations = num_iterations
self.route_weights = tf.Variable(tf.random.normal(shape=(num_output_capsules, num_input_capsules, data_out, data_in), stddev=0.01))
self.dropout_rate = dropout_rate
def get_config(self):
config = super().get_config().copy()
config.update(
{
"num_iterations": self.num_iterations,
"route_weights": self.route_weights,
"dropout_rate": self.dropout_rate
}
)
return config
def squash(self, tensor, axis):
squared_norm = tf.reduce_sum(tf.square(tensor), axis=axis, keepdims=True)
scale = squared_norm / (1 + squared_norm)
return scale * tensor / tf.sqrt(squared_norm)
def call(self, x, random=True):
# x: [batch_size, in_caps, data]
print("nx input")
print(x)
if random:
noise = tf.random.normal(self.route_weights.shape, stddev=0.01)
route_weights = self.route_weights + noise
else:
route_weights = self.route_weights
# route_weights: [out_caps, in_caps, data_out, data_in]
print("nroute_weights")
print(route_weights)
# priors = route_weights[:, None, :, :, :] @ x[None, :, :, :, None]
# # route_weights [out_caps , 1 , in_caps , data_out , data_in]
# # x [ 1 , b , in_caps , data_in , 1 ]
# # priors [out_caps , b , in_caps , data_out, 1 ]
# route: 2 10 4 8, should be 2 1 10 4 8
# x: b 10 8, should be 1 b 10 8 1
print("nbefore matmul")
print("route_weights times x")
print(route_weights[:, None, :, :, :], x[None, :, :, :, None])
priors = tf.matmul(route_weights[:, None, :, :, :], x[None, :, :, :, None])
# priors: [out_caps, batch_size, in_caps, data_out, 1]
print("npriors, after matmul")
print(priors)
priors = tf.transpose(priors, perm=[1, 0, 2, 3, 4])
# priors: [batch_size, out_caps, ,in_caps, data_out, 1]
print("npriors transposed")
print(priors)
if self.dropout_rate > 0.0:
drop = tf.cast(tf.random.uniform(tf.shape(priors)) > self.dropout_rate, dtype=tf.float32)
priors = priors * drop
logits = tf.zeros_like(priors)
# logits: [batch_size, out_caps, in_caps, data_out, 1]
print("nlogits")
print(logits)
for i in range(self.num_iterations):
probs = tf.nn.softmax(logits, axis=2)
outputs = self.squash(tf.reduce_sum(probs * priors, axis=2, keepdims=True), axis=3)
if i != self.num_iterations - 1:
delta_logits = priors * outputs
logits = logits + delta_logits
print("noutputs after dynamic routing")
print(outputs)
# outputs: [batch_size, out_caps, 1, data_out, 1]
outputs = tf.squeeze(outputs, [2, 4])
print("noutputs after squeeze")
print(outputs)
# if len(outputs.shape) == 3:
# outputs = tf.transpose(outputs, perm=[0, 2, 1])
# else:
# outputs = tf.expand_dims(outputs, axis=0)
# outputs = tf.transpose(outputs, perm=[0, 2, 1])
# Do not transpose
if len(outputs.shape) == 3:
pass
else:
outputs = tf.expand_dims(outputs, axis=0)
print("noutputs of class capsule")
print(outputs)
return outputs
My TF implementation of the model using a customized VGG16 for feature extraction:
def CapsuleForensics(input_shape, n_class, name="CapsuleForensics"):
tf.keras.backend.clear_session()
# --- Encoder ---
# Input
x = Input(shape=input_shape, name='input')
# Noise enhancement using HPF
conv0 = tf.keras.layers.Conv2D(1, (5,5), strides=(1,1), padding='same', activation=None, kernel_initializer=HighPassFilterInitializer(F0), trainable=False, name='conv0')(x)
# Custom VGG16
# Block 1
vgg16_block1_conv1 = Conv2D(64, (3, 3), weights=new_block1_conv1, activation='relu', padding='same', trainable=False, name='vgg16_block1_conv1', use_bias=True)(conv0)
vgg16_block1_conv2 = Conv2D(64, (3, 3), weights=vgg16_weights['vgg16_block1_conv2'], activation='relu', padding='same', trainable=False, name='vgg16_block1_conv2', use_bias=True)(vgg16_block1_conv1)
vgg16_block1_pool = MaxPooling2D((2, 2), strides=(2, 2), name='vgg16_block1_pool')(vgg16_block1_conv2)
# Block 2
vgg16_block2_conv1 = Conv2D(128, (3, 3), weights=vgg16_weights['vgg16_block2_conv1'], activation='relu', padding='same', trainable=False, name='vgg16_block2_conv1', use_bias=True)(vgg16_block1_pool)
vgg16_block2_conv2 = Conv2D(128, (3, 3), weights=vgg16_weights['vgg16_block2_conv2'], activation='relu', padding='same', trainable=False, name='vgg16_block2_conv2', use_bias=True)(vgg16_block2_conv1)
vgg16_block2_pool = MaxPooling2D((2, 2), strides=(2, 2), name='vgg16_block2_pool')(vgg16_block2_conv2)
# Block 3
vgg16_block3_conv1 = Conv2D(256, (3, 3), weights=vgg16_weights['vgg16_block3_conv1'], activation='relu', padding='same', trainable=False, name='vgg16_block3_conv1', use_bias=True)(vgg16_block2_pool)
vgg16_block3_conv2 = Conv2D(256, (3, 3), weights=vgg16_weights['vgg16_block3_conv2'], activation='relu', padding='same', trainable=False, name='vgg16_block3_conv2', use_bias=True)(vgg16_block3_conv1)
vgg16_block3_conv3 = Conv2D(256, (3, 3), weights=vgg16_weights['vgg16_block3_conv3'], activation='relu', padding='same', trainable=False, name='vgg16_block3_conv3', use_bias=True)(vgg16_block3_conv2)
vgg16_block3_pool = MaxPooling2D((2, 2), strides=(2, 2), name='vgg16_block3_pool')(vgg16_block3_conv3)
primary_caps = PrimaryCaps(num_capsules=10)(vgg16_block3_pool)
class_caps = ClassCaps(num_input_capsules=10, num_output_capsules=n_class, data_in=8, data_out=4, num_iterations=2, dropout_rate=0.05)(primary_caps)
# y = Softmax(axis=-1, name='out')(class_caps)
# Convenience layer to calculate vectors' length (from Capsnet-Keras implementation)/to compute final prediction as probabilities
y = Lambda(lambda x: tf.sqrt(tf.reduce_sum(tf.square(x), axis=-1)), name="y")(class_caps)
model = Model(inputs=[x], outputs=[y], name=name)
return model
Any input is greatly appreciated. Thank you so much.