I have around 1800 short videos, each around 30 seconds long. I trained a VAE to encode each frame into a latent vector of size 200.
Using this VAE, I then created a TFRecord with an entry for every video. Each entry contains an array of size 830×200 (830 is the number of frames, and 200 is the size of the latent vector), plus an array of just 4 elements (some metadata about the video).
For training, the TFRecord is read into a Dataset, and this Dataset is what is passed to model.fit. To do this I use TFRecordDataset, map my parsing function over it to decode the examples, and then shuffle, prefetch and batch.
I am now using a model similar to the miniature GPT from the Keras documentation to predict the next frame. The model works OK (ignoring the results, which are not good, but at least the data goes in and out and the loss decreases). However, on every step the model takes batches of full sequences and predicts the next frame for every element of the sequence. So if you print the input inside train_step, it shows something like (32, 830, 200), where 32 is the batch size, 830 the length of the sequence, and 200 the features.
What I would like is, instead of the model taking the full sequence of 830 frames, to split this sequence into small overlapping sub-sequences.
First I tried to do this inside train_step, wrapping the usual code (GradientTape and all of that) in a for loop that slices sub-sequences out of the input. But this was painfully slow: it took about 30 minutes before training even started.
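Roughly, the attempt looked like this (a sketch, not my exact code; seq_len, hop and compute_window_loss are placeholders):

def train_step(self, data):
    frames, recipe = data                                  # frames: (32, 830, 200)
    seq_len, hop = 100, 10                                 # illustrative values
    for start in range(0, 830 - seq_len + 1, hop):
        window = frames[:, start:start + seq_len, :]       # (32, 100, 200)
        with tf.GradientTape() as tape:
            loss = self.compute_window_loss(window)        # stand-in for the usual loss code
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

I suspect the Python for loop gets unrolled into one huge graph when train_step is traced, which would explain the long startup time.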
Then I tried to do this in the function that creates the Dataset from the TFRecord. Datasets have a window method, which sounded like exactly what I need. So once the Dataset was loaded/created with TFRecordDataset, and after mapping my function to read the examples, I would call dataset.window(…) and then batch as usual. But this was not working and training never started. I don't know whether the fact that each entry contains two arrays (the 830×200 one and the 4-element one) is a problem here.
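What I tried was roughly this (again a sketch; the window length of 100 and shift of 10 are illustrative):

dataset = tf.data.TFRecordDataset(self.tfrecordfile)
dataset = dataset.map(self.read_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
# each element is a (frames, recipe) tuple, with frames of shape (830, 200)
dataset = dataset.window(100, shift=10, drop_remainder=True)
dataset = dataset.batch(32, drop_remainder=True)

Re-reading the docs, I see two likely problems: window groups consecutive elements of the Dataset (i.e. whole videos) rather than slicing within each 830×200 element, and each window it produces is itself a (nested) Dataset that needs flat_map before batching. I have not managed to put this together.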
So basically what I would like to do is split the 830×200 sequence into overlapping sequences of size 100×200, batch them, and send them to train_step. Ideally, train_step should receive tensors of shape (32, 100, 200).
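One direction I have been looking at, but have not verified, is tf.signal.frame, which seems to produce exactly this kind of overlapping window (the hop of 10 is an arbitrary example, and make_windows is just a name I made up):

def make_windows(frames, recipe):
    # (830, 200) -> (74, 100, 200): windows of 100 frames, starting every 10 frames
    wins = tf.signal.frame(frames, frame_length=100, frame_step=10, axis=0)
    # repeat the recipe once per window to keep the (frames, recipe) structure
    recipes = tf.repeat(recipe[tf.newaxis, :], tf.shape(wins)[0], axis=0)
    return wins, recipes

dataset = dataset.map(make_windows).unbatch()                   # elements: ((100, 200), (4,))
dataset = dataset.shuffle(2048).batch(32, drop_remainder=True)  # -> (32, 100, 200)

But I do not know whether this is the idiomatic way, or whether it plays well with the rest of the pipeline.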
The only way I see now to do this is to write the TFRecords already split like that. But that is a waste of space, since the sequences overlap.
Any help on how to approach this would be very welcome.
You can see here the code I use to write/read the TFRecords:
import glob
import os
import random
import re
import shutil

import cv2
import numpy as np
import tensorflow as tf
import tqdm
from tensorflow.keras import utils

# assuming load_vae_model lives in the project's VAE training code (train_vae.py)
from train_vae import load_vae_model


class TFRecordWriter(object):
"""
Class handling writing the video data (through a VAE) to a TFRecord.
Please check TFRecordReader to see how to read it back
"""
def __init__(self, videopaths, modelpath, dillation,
step_size=1, batch_size=32, train_split=0.8):
"""Creates an object that will be used to save the data into a TFRecord
Args:
videopaths: Path to the folder where all the videos are. The videos must be
binarised, created with the script VideoUtils.py
modelpath: Path to the folder with the trained VAE (created with train_vae.py)
dillation: "normal", "reverse" or "no". Check FramesLoader for more info
            step_size: How many frames to take. For example, step_size=3 will take 1 out of every 3 frames
batch_size: Batch size of the data when we run it against the VAE encoder
train_split: % of data in train_set (the rest goes to test_set)
"""
self.videopaths = videopaths
self.modelpath = modelpath
self.dillation = dillation
self.step_size = step_size
self.batch_size = batch_size
self.train_split = train_split
self.vae, self.input_dim = load_vae_model(self.modelpath)
        # Because the videos have different lengths, we need to break the longer ones
        # into chunks with a fixed number of frames, so that all the serialised
        # examples have the same shape (seq_length, latent_size).
        # This number needs to be adjusted depending on self.batch_size.
self.seq_length = 416 * 2
def serialise_to_tfrecords(self):
""" Serialises all the data into one file with TFRecords."""
        # get all the videos
        pattern = os.path.join(self.videopaths, '*.avi')
        videos = glob.glob(pattern)
random.shuffle(videos)
db_split = int(len(videos)*self.train_split)
train_set = videos[:db_split]
test_set = videos[db_split:]
for ti, tset in enumerate([train_set, test_set]):
if ti==0:
out_path = self.videopaths + "/train.tfrecord"
else:
out_path = self.videopaths + "/test.tfrecord"
writer = tf.io.TFRecordWriter(out_path)
for video in tqdm.tqdm(tset, unit='F'):
frames_latent_vectors = self.video2latentvectors(video)
vector_length = frames_latent_vectors.shape[0]
recipe = self.get_recipe(video)
                # break down the latent_vectors into chunks of length seq_length
for seq in range(0, vector_length, self.seq_length):
start = seq
end = seq + self.seq_length
if end > vector_length:
break
chunk = frames_latent_vectors[start:end]
# self.visualise_latent_reconstructions(frames_latent_vectors)
serialised_video = self.prepare_TFRecord(chunk, recipe)
writer.write(serialised_video)
# move file to either train or test folder
# get the folder where the file is
current_folder = os.path.dirname(video)
# get the name of the file
file_name = os.path.basename(video)
if ti == 0:
dest_folder = current_folder + "/train/"
else:
dest_folder = current_folder + "/test/"
destination = dest_folder + file_name
shutil.move(video, destination)
writer.close()
def get_recipe(self, videopath):
""" from the filename we can get the recipe. For example:
octanoic_0_pentanol_0_octanol_9_dep_90_raw_1_bin.mp4
        means 0% octanoic, 0% pentanol, 9% octanol, 90% DEP"""
# get the file name only, not the full path with folders
file_name = os.path.basename(videopath)
# remove the extension
file_name = os.path.splitext(file_name)[0]
# find the first four numbers, which are the recipe
        recipe = re.findall(r'\d+', file_name)[:4]
# transform to int and to numpy
recipenp = np.array( [int(x) for x in recipe] )
# normalise and return
return recipenp / np.sum(recipenp)
def video2latentvectors(self, videopath):
"""
Given a video, it will use a trained vae to return the latent vectors for each frame
"""
# create dataset and load vae
ds = self.video2dataset(videopath)
# where to store the data as it is generated by the vae
vectors = []
# using vae get latent vectors
for batch in ds:
_, _, latent = self.vae.encoder(batch)
vectors.append(latent)
        # this will go from (n_batches, batch_size, latent_size) to (n_frames, latent_size)
        return np.concatenate(vectors)
def video2dataset(self, videopath):
"""
Given a video, it will return a TF dataset with its frames
"""
AUTOTUNE = tf.data.AUTOTUNE
# Get a numpy array with all the frames
frames = self.frames_from_video_file(videopath)
# convert the numpy array into a tf dataset
dataset = tf.data.Dataset.from_tensor_slices(frames)
# batch it
dataset = dataset.batch(self.batch_size, drop_remainder=True)
# preprocess it
dataset = self.preprocess_dataset(dataset)
# configure for performance
dataset = dataset.prefetch(buffer_size=AUTOTUNE)
return dataset
def preprocess_dataset(self, dataset):
# perform some pre-processing as we did to train the vae
normalization_layer = tf.keras.layers.Rescaling(1./255)
dillation_layer = tf.keras.layers.MaxPool2D(pool_size=5, strides=1, padding='same')
dataset = dataset.map(lambda x: tf.image.resize(
x, (self.input_dim[0], self.input_dim[1]) ))
if self.dillation == "normal":
dataset = dataset.map(lambda x: dillation_layer(x))
elif self.dillation == "reverse":
dataset = dataset.map(lambda x: 1-dillation_layer(1-x))
normalized_ds = dataset.map(lambda x: normalization_layer(x))
return normalized_ds
def frames_from_video_file(self, videopath):
"""
Given a video, it will return the frames in a numpy array
"""
frames = []
video_capture = cv2.VideoCapture(videopath)
while True:
# Take one frame every step_size
for _ in range(self.step_size):
ret, frame = video_capture.read()
if not ret:
break
if not ret:
break
# the following line would convert it from 0..255 to 0..1
# but we do a normalization layer later on, so I will comment this out
# frame = tf.image.convert_image_dtype(frame, tf.float32)
frames.append(frame)
# last bit changes from bgr to rgb
return np.array(frames)[..., [2, 1, 0]]
def prepare_TFRecord(self, frames, recipe):
# Tensorflow nomenclature to serialised data to create the TFRecords
frames_feature = tf.train.Feature(
bytes_list=tf.train.BytesList(value=[
tf.io.serialize_tensor(frames).numpy(),
])
)
recipe_feature = tf.train.Feature(
float_list=tf.train.FloatList(value=recipe),
)
features = tf.train.Features(feature={
'frames': frames_feature,
'recipe': recipe_feature
})
example = tf.train.Example(features=features)
return example.SerializeToString()
def visualise_latent_reconstructions(self, latent_vectors):
"""
creates images of the latent vectors generated, to see if the previous encoding
is correct
"""
batch_size = 32
ds = tf.data.Dataset.from_tensor_slices(latent_vectors)
ds = ds.batch(batch_size)
for entry in ds.take(1):
generated_images = self.vae.decoder(entry)
for i in range(batch_size):
img = utils.array_to_img(generated_images[i])
img.save("writer_img_%03d.png" % (i))
class TFRecordReader(object):
"""
Class handling reading the TFRecord into a dataset to use for training
Please check TFRecordWriter to see how it was saved to disk.
The TFRecord to be read must have been created with TFRecordWriter
"""
def __init__(self, tfrecordfile, batch_size = 64):
self.BATCH_SIZE = batch_size
self.tfrecordfile = tfrecordfile
self.AUTOTUNE = tf.data.AUTOTUNE
self.dataset = self.get_dataset() # this will set self.dataset
self.dataset_iter = iter(self.dataset)
def decode_frames(self, frames):
parsed_data = tf.io.parse_tensor(frames, tf.float32)
parsed_data = tf.reshape(parsed_data, [832, 200]) # explicit size needed for TPU
return parsed_data
def read_tfrecord(self, example):
TFREC_FORMAT = {
"frames": tf.io.FixedLenFeature([], tf.string), # tf.string means bytestring
"recipe": tf.io.FixedLenFeature([4], tf.float32)
}
example = tf.io.parse_single_example(example, TFREC_FORMAT)
video_latent_vectors = self.decode_frames(example['frames'])
return video_latent_vectors, example['recipe']
def load_dataset(self):
""" Loads a TFRecord and uses map to parse it, and stores it into self.dataset
Check https://keras.io/examples/keras_recipes/tfrecord/ "define load methods"
because this is basically a copy paste of that code with small modifications
        Returns:
            dataset: Loaded TFRecord as a tf.data.Dataset
"""
ignore_order = tf.data.Options()
ignore_order.experimental_deterministic = False # disable order, increase speed
dataset = tf.data.TFRecordDataset(
self.tfrecordfile
) # automatically interleaves reads from multiple files
dataset = dataset.with_options(
ignore_order
) # uses data as soon as it streams in, rather than in its original order
dataset = dataset.map(
self.read_tfrecord,
num_parallel_calls=self.AUTOTUNE
)
# returns the dataset as loaded
return dataset
def get_dataset(self):
"""Loads the TFRecord from the paths (filenames), and then shuffles the data and
divides it into batches.
"""
dataset = self.load_dataset()
dataset = dataset.shuffle(2048)
dataset = dataset.prefetch(buffer_size=self.AUTOTUNE)
dataset = dataset.batch(self.BATCH_SIZE, drop_remainder=True)
return dataset # .repeat()
def visualise_latent_reconstructions_and_recipes(self, vaepath):
batch_size = 32
data = next(self.dataset_iter)[0] # returns 576,200 (or whatever latent size)
ds = tf.data.Dataset.from_tensor_slices(data.numpy()[0])
ds = ds.batch(batch_size) # returns for example 9,32,200
vae, _ = load_vae_model(vaepath)
for entry in ds.take(1):
generated_images = vae.decoder(entry)
for i in range(batch_size):
img = utils.array_to_img(generated_images[i])
img.save("reader_img_%03d.png" % (i))