I’m using Keras 3 / TensorFlow 2.16.1 and would like to adapt this example (https://keras.io/examples/vision/semisupervised_simclr/) for semi-supervised image classification with contrastive pretraining (SimCLR) from 2D images to 3D data.
After adjusting the code from https://keras.io/examples/vision/semisupervised_simclr/, I get this error message:
OperatorNotAllowedInGraphError: Iterating over a symbolic tf.Tensor
is not allowed. You can attempt the following resolutions to the problem: If you are running in Graph mode, use Eager execution mode or decorate this function with @tf.function. If you are using AutoGraph, you can try decorating this function with @tf.function. If that does not work, then you may be using an unsupported feature or your source code may not be visible to AutoGraph. See https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/autograph/g3doc/reference/limitations.md#access-to-source-code for more information.
This is my minimal code example:
import numpy as np
import tensorflow as tf
import tensorflow.keras
from tensorflow.keras import layers, utils
import math
from keras import ops  # keras.ops (Keras 3) is used in the contrastive loss below
dataset_list = []
for _ in range(100):
array = np.random.random((10,10,10))
dataset_list.append(array)
data = np.asarray(dataset_list)
label = np.random.randint(2, size=100)
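# data has shape (100, 10, 10, 10): one hundred random 10x10x10 volumes;
# label is a length-100 vector of random 0/1 class labels.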
def prepare_dataset(data, labels, batch_size):
dataset_size = len(data)
steps_per_epoch = dataset_size // batch_size
train_size = int(0.8 * dataset_size)
train_data, train_labels = data[:train_size], labels[:train_size]
test_data, test_labels = data[train_size:], labels[train_size:]
labeled_train_dataset = tf.data.Dataset.from_tensor_slices((train_data, train_labels))
labeled_train_dataset = labeled_train_dataset.shuffle(buffer_size=dataset_size).batch(batch_size)
unlabeled_train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
unlabeled_train_dataset = unlabeled_train_dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_data, test_labels))
test_dataset = test_dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)
train_dataset = tf.data.Dataset.zip(
(unlabeled_train_dataset, labeled_train_dataset)
).prefetch(buffer_size=tf.data.AUTOTUNE)
return train_dataset, labeled_train_dataset, test_dataset
data = data.reshape(data.shape[0], data.shape[1], data.shape[2], data.shape[3], 1)  # add a single channel dimension
train_dataset, labeled_train_dataset, test_dataset = prepare_dataset(data, label, 30)
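# Note: train_dataset yields nested batches of the form (unlabeled_images, (labeled_images, labels)),
# while labeled_train_dataset and test_dataset yield plain (images, labels) batches.
# print(train_dataset.element_spec)  # optional sanity check of the batch structure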
def get_augmenter(min_area):
zoom_factor = 1.0 - math.sqrt(min_area)
return tf.keras.Sequential(
[
layers.Rescaling(1 / 255),
layers.RandomFlip("horizontal"),
layers.RandomTranslation(zoom_factor / 2, zoom_factor / 2),
layers.RandomZoom((-zoom_factor, 0.0), (-zoom_factor, 0.0)),
            RandomGeometricTransform(),  # custom augmentation layer, defined further down
]
)
def get_encoder():
return tensorflow.keras.Sequential(
[
layers.Conv3D(16, kernel_size=(3, 3, 3), activation="relu"),
layers.MaxPooling3D(pool_size=(2, 2, 2)),
layers.Conv3D(32, kernel_size=(3, 3, 3), activation="relu"),
layers.MaxPooling3D(pool_size=(2, 2, 2)),
layers.Dropout(0.5),
layers.Flatten(),
layers.Dense(128, activation="relu"),
layers.Dropout(0.5),
layers.Dense(1, activation="sigmoid"),
],
name="encoder",
)
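# Note: this encoder ends in a single sigmoid unit (used directly by baseline_model below),
# while the projection head and linear probe further down expect width-dimensional features.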
baseline_model = tensorflow.keras.Sequential(
[
get_encoder(),
],
name="baseline_model",
)
baseline_model.compile(
    optimizer=tensorflow.keras.optimizers.Adam(learning_rate=0.0001),
loss="binary_crossentropy",
metrics=["binary_accuracy"],
)
num_epochs = 20
batch_size = 525
width = 128
temperature = 0.1
contrastive_augmentation = {"min_area": 0.25}
classification_augmentation = {"min_area": 0.75}
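# A smaller min_area gives a larger zoom_factor in get_augmenter, i.e. stronger augmentation:
# 0.25 is used for contrastive pretraining, 0.75 for the supervised probe.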
class RandomGeometricTransform(layers.Layer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.seed_generator = tf.random.experimental.Generator.from_seed(1337)
def get_config(self):
config = super().get_config()
return config
def call(self, images, training=True):
if training:
images = tf.image.random_flip_left_right(images, seed=self.seed_generator.make_seeds(2)[0][0])
return images
def compute_output_shape(self, input_shape):
return input_shape
class ContrastiveModel(tensorflow.keras.Model):
def __init__(self):
super().__init__()
self.temperature = temperature
self.contrastive_augmenter = get_augmenter(**contrastive_augmentation)
self.classification_augmenter = get_augmenter(**classification_augmentation)
self.encoder = get_encoder()
self.projection_head = tensorflow.keras.Sequential(
[
tensorflow.keras.Input(shape=(width,)),
layers.Dense(width, activation="relu"),
layers.Dense(width),
],
name="projection_head",
)
# Single dense layer for linear probing
self.linear_probe = tensorflow.keras.Sequential(
[layers.Input(shape=(width,)), layers.Dense(10)],
name="linear_probe",
)
def compile(self, contrastive_optimizer, probe_optimizer, **kwargs):
super().compile(**kwargs)
self.contrastive_optimizer = contrastive_optimizer
self.probe_optimizer = probe_optimizer
self.probe_loss = tensorflow.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
self.contrastive_loss_tracker = tensorflow.keras.metrics.Mean(name="c_loss")
self.contrastive_accuracy = tensorflow.keras.metrics.SparseCategoricalAccuracy(
name="c_acc"
)
self.probe_loss_tracker = tensorflow.keras.metrics.Mean(name="p_loss")
self.probe_accuracy = tensorflow.keras.metrics.SparseCategoricalAccuracy(name="p_acc")
@property
def metrics(self):
return [
self.contrastive_loss_tracker,
self.contrastive_accuracy,
self.probe_loss_tracker,
self.probe_accuracy,
]
def contrastive_loss(self, projections_1, projections_2):
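        # NT-Xent / InfoNCE-style loss, as in the original keras.io example:
        # cosine similarities between the two sets of projections are used as logits,
        # with "same index in the batch" as the target class.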
projections_1 = ops.normalize(projections_1, axis=1)
projections_2 = ops.normalize(projections_2, axis=1)
similarities = (
ops.matmul(projections_1, ops.transpose(projections_2)) / self.temperature
)
batch_size = ops.shape(projections_1)[0]
contrastive_labels = ops.arange(batch_size)
self.contrastive_accuracy.update_state(contrastive_labels, similarities)
self.contrastive_accuracy.update_state(
contrastive_labels, ops.transpose(similarities)
)
loss_1_2 = tensorflow.keras.losses.sparse_categorical_crossentropy(
contrastive_labels, similarities, from_logits=True
)
loss_2_1 = tensorflow.keras.losses.sparse_categorical_crossentropy(
contrastive_labels, ops.transpose(similarities), from_logits=True
)
return (loss_1_2 + loss_2_1) / 2
def train_step(self, data):
(unlabeled_images, _), (labeled_images, labels) = data
# Both labeled and unlabeled images are used, without labels
images = ops.concatenate((unlabeled_images, labeled_images), axis=0)
# Each image is augmented twice, differently
augmented_images_1 = self.contrastive_augmenter(images, training=True)
augmented_images_2 = self.contrastive_augmenter(images, training=True)
with tf.GradientTape() as tape:
features_1 = self.encoder(augmented_images_1, training=True)
features_2 = self.encoder(augmented_images_2, training=True)
projections_1 = self.projection_head(features_1, training=True)
projections_2 = self.projection_head(features_2, training=True)
contrastive_loss = self.contrastive_loss(projections_1, projections_2)
gradients = tape.gradient(
contrastive_loss,
self.encoder.trainable_weights + self.projection_head.trainable_weights,
)
self.contrastive_optimizer.apply_gradients(
zip(
gradients,
self.encoder.trainable_weights + self.projection_head.trainable_weights,
)
)
self.contrastive_loss_tracker.update_state(contrastive_loss)
preprocessed_images = self.classification_augmenter(
labeled_images, training=True
)
with tf.GradientTape() as tape:
features = self.encoder(preprocessed_images, training=False)
class_logits = self.linear_probe(features, training=True)
probe_loss = self.probe_loss(labels, class_logits)
gradients = tape.gradient(probe_loss, self.linear_probe.trainable_weights)
self.probe_optimizer.apply_gradients(
zip(gradients, self.linear_probe.trainable_weights)
)
        self.probe_loss_tracker.update_state(probe_loss)
        self.probe_accuracy.update_state(labels, class_logits)
        return {m.name: m.result() for m in self.metrics}
def test_step(self, data):
labeled_images, labels = data
preprocessed_images = self.classification_augmenter(
labeled_images, training=False
)
features = self.encoder(preprocessed_images, training=False)
class_logits = self.linear_probe(features, training=False)
probe_loss = self.probe_loss(labels, class_logits)
        self.probe_loss_tracker.update_state(probe_loss)
        self.probe_accuracy.update_state(labels, class_logits)
        return {m.name: m.result() for m in self.metrics}
pretraining_model = ContrastiveModel()
pretraining_model.compile(
contrastive_optimizer=tensorflow.keras.optimizers.Adam(),
probe_optimizer=tensorflow.keras.optimizers.Adam(),
)
pretraining_history = pretraining_model.fit(
labeled_train_dataset, epochs=num_epochs, validation_data=test_dataset
)
Do you have any idea how to fix this? I would be very grateful!