In this notebook, we will explore the Double Pendulum Chaotic dataset from the Data Asset Exchange. This dataset consists of recordings of the chaotic movement of a double pendulum, along with frame-by-frame annotations of the pendulum components.
Using these data, we will attempt to predict the 'chaotic' movement of the pendulum components.
!wget https://dax-cdn.cdn.appdomain.cloud/dax-double-pendulum-chaotic/2.0.1/double-pendulum-chaotic.tar.gz
!tar xzf double-pendulum-chaotic.tar.gz
# @hidden_cell
from ibm_botocore.client import Config
import ibm_boto3
def download_file_cos(credentials, local_file_name=None, key=None):
    if local_file_name is None:
        local_file_name = credentials['FILE']
    if key is None:
        key = local_file_name
    cos = ibm_boto3.client(service_name='s3',
                           ibm_api_key_id=credentials['IBM_API_KEY_ID'],
                           ibm_service_instance_id=credentials['IAM_SERVICE_ID'],
                           ibm_auth_endpoint=credentials['IBM_AUTH_ENDPOINT'],
                           config=Config(signature_version='oauth'),
                           endpoint_url=credentials['ENDPOINT'])
    try:
        res = cos.download_file(Bucket=credentials['BUCKET'], Key=key, Filename=local_file_name)
    except Exception as e:
        print(Exception, e)
    else:
        print("Downloaded:", key, 'from IBM COS to local:', local_file_name)
    return local_file_name
# @hidden_cell
# The following code contains the credentials for a file in your IBM Cloud Object Storage.
# You might want to remove those credentials before you share your notebook.
credentials_1 = {
    'IAM_SERVICE_ID': 'iam-ServiceId-4e4c7fd2-d9d4-4a4f-a24b-b784e5f86276',
    'IBM_API_KEY_ID': '6N8T7TC5Uq4sFkAdfq7B57W_wy-UKylzVlUaKqRV_-tY',
    'ENDPOINT': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'IBM_AUTH_ENDPOINT': 'https://iam.ng.bluemix.net/oidc/token',
    'BUCKET': 'samplecollaboration-donotdelete-pr-jcxcqc478wcyqj',
    'FILE': 'double-pendulum-example.mkv'
}
download_file_cos(credentials_1)
from IPython.display import Video
Video("double-pendulum-example.mkv")
Let's begin by installing and loading in the dependencies.
!pip install opencv-python
import os
import shutil
import random; random.seed(42)
import pandas as pd
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
import tensorflow as tf # tensorflow-gpu==2.0.0
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())
import cv2
tf.__version__
print("GPU Available: ", tf.test.is_gpu_available())
After unzipping the dataset archive, we can specify the location of the training, test, and validation directories.
# training data
train_dir = 'train_and_test_split/dpc_dataset_traintest_4_200_csv/train'
train_dir_video = 'train_and_test_split/dpc_dataset_traintest_4_200_h264/train'
# test data
test_inputs_dir = 'train_and_test_split/dpc_dataset_traintest_4_200_csv/test_inputs/'
test_targets_dir = 'train_and_test_split/dpc_dataset_traintest_4_200_csv/test_targets/'
test_targets_video = 'train_and_test_split/dpc_dataset_traintest_4_200_h264/test_targets/'
# validation data
validation_inputs_dir = 'train_and_test_split/dpc_dataset_traintest_4_200_csv/validation_inputs/'
validation_targets_dir = 'train_and_test_split/dpc_dataset_traintest_4_200_csv/validation_targets/'
validation_targets_video = 'train_and_test_split/dpc_dataset_traintest_4_200_h264/validation_targets/'
In this notebook we will attempt to predict the next 200 video frames from the preceding 4 video frames.
Essentially, this means predicting an image from another image or a group of images. First, the pix2pixhd model will be investigated. This model is able to predict an image from a source image, and it will be used to predict the next frame from the preceding frame.
Secondly, we will take a look at how we can do better. The marker position annotations can be used to predict the annotations in the next frame. Once we can reliably predict the position of the pendulum components in the next frame, we can develop a 'decoder' network to generate an image from the raw marker positions.
According to the dataset description, the stored coordinates of the pendulum markers (x_red, y_red, x_green, y_green, x_blue, y_blue, respectively) are the pixel coordinates multiplied by 5. The function absolute_pos_to_pixels defined below maps the coordinates in the annotation file to pixel coordinates. With the plot_annotated_video function, we can overlay the annotations on the frames of the video.
def absolute_pos_to_pixels(l):
'''Convert the absolute marker position to the pixel coordinates.'''
assert isinstance(l, list)
return [round(int(x)/5) for x in l]
def plot_annotated_frame(frame, marker_pixel_coordinates):
    '''Plot the frame with the marker annotations.'''
    # set the canvas size
    plt.figure(figsize=(20, 20))
    # convert BGR to RGB
    frame = frame[..., ::-1]
    # show the image
    plt.imshow(frame)
    ax = plt.gca()
    # add annotations
    x_red, y_red, x_green, y_green, x_blue, y_blue = marker_pixel_coordinates
    red_marker = plt.Circle((y_red, x_red), 10, color='red', fill=False)
    green_marker = plt.Circle((y_green, x_green), 10, color='green', fill=False)
    blue_marker = plt.Circle((y_blue, x_blue), 10, color='blue', fill=False)
    ax.add_artist(red_marker)
    ax.add_artist(green_marker)
    ax.add_artist(blue_marker)
def plot_annotated_video(video_file, marker_positions_file, delim_whitespace=True):
    '''Plot the video frame by frame with the marker annotations.'''
    # load the video
    vidcap = cv2.VideoCapture(video_file)
    # load the marker positions
    marker_positions = pd.read_csv(marker_positions_file, header=None, delim_whitespace=delim_whitespace).values.tolist()
    # for every frame in the video
    frame_count = 0
    while vidcap.isOpened():
        success, frame = vidcap.read()
        if success:
            # add the marker positions to the frame
            x_red, y_red, x_green, y_green, x_blue, y_blue = absolute_pos_to_pixels(marker_positions[frame_count])
            cv2.circle(frame, (y_red, x_red), 3, (0, 0, 255), -1)
            cv2.circle(frame, (y_blue, x_blue), 3, (255, 0, 0), -1)
            cv2.circle(frame, (y_green, x_green), 3, (0, 255, 0), -1)
            # plot the annotated frame
            cv2.imshow('frames', frame)
            # wait for a key press to go to the next frame
            cv2.waitKey(0)
            frame_count += 1
        else:
            break
    cv2.destroyAllWindows()
    vidcap.release()
def video_to_frames(video_path, max_amount=np.Inf):
    '''Convert a video into its frames.'''
    frames = []
    # load the video
    vidcap = cv2.VideoCapture(video_path)
    while vidcap.isOpened():
        success, frame = vidcap.read()
        if not success:
            # stop when the video has no more frames
            break
        frames.append(frame)
        if len(frames) >= max_amount:
            break
    vidcap.release()
    return frames
def save_annotated_video_from_frames(path, frames, marker_positions=None, fps=60, size=(480, 480)):
    '''Combine a series of frames into a video.'''
    video = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
    for frame_count, frame in enumerate(frames):
        if marker_positions is not None:
            x_red, y_red, x_green, y_green, x_blue, y_blue = absolute_pos_to_pixels(marker_positions[frame_count])
            cv2.circle(frame, (y_red, x_red), 3, (0, 0, 255), -1)
            cv2.circle(frame, (y_blue, x_blue), 3, (255, 0, 0), -1)
            cv2.circle(frame, (y_green, x_green), 3, (0, 255, 0), -1)
        video.write(frame)
    video.release()
Use opencv-python to show a couple of annotated frames of the pendulum.
# Load the video frames
video_frames = video_to_frames('train_and_test_split/dpc_dataset_traintest_4_200_h264/train/0.mkv', max_amount=10)
# Load the marker positions
marker_positions = pd.read_csv('train_and_test_split/dpc_dataset_traintest_4_200_csv/train/0.csv', header=None, delim_whitespace=True).values.tolist()
# Select a frame and annotations and plot
video_frame = video_frames[0]
markers = absolute_pos_to_pixels(marker_positions[0])
plot_annotated_frame(video_frame, markers)
pix2pixhd: Image-based prediction
Inspired by this Medium article, the pix2pixhd model (original repository) was trained on 17k+ frames for 20 epochs using the 'scheduled sampling' technique mentioned in the article.
Because only the previous frame is used as input for the next frame, no temporal motion information is available to the model. This results in a strong divergence from reality as governed by the laws of physics.
An example video is shown below.
# @hidden_cell
# The following code contains the credentials for a file in your IBM Cloud Object Storage.
# You might want to remove those credentials before you share your notebook
credentials_2 = {
    'IAM_SERVICE_ID': 'iam-ServiceId-4e4c7fd2-d9d4-4a4f-a24b-b784e5f86276',
    'IBM_API_KEY_ID': '6N8T7TC5Uq4sFkAdfq7B57W_wy-UKylzVlUaKqRV_-tY',
    'ENDPOINT': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'IBM_AUTH_ENDPOINT': 'https://iam.ng.bluemix.net/oidc/token',
    'BUCKET': 'samplecollaboration-donotdelete-pr-jcxcqc478wcyqj',
    'FILE': 'double-pendulum-pix2pixhd_20.mp4'
}
download_file_cos(credentials_2)
Video('double-pendulum-pix2pixhd_20.mp4')
As illustrated by the above video, the lack of temporal information is problematic. The recently (2019) released vid2vid network promises significant improvements, although training and optimizing this network in a notebook is beyond the scope of this project.
Therefore, in accordance with the paper, predicting the next pendulum marker coordinates based on a set of preceding marker coordinates is more likely to succeed within the constraints of a notebook.
The authors of the dataset have provided a train and test split for the data. Based on 4 frames of marker coordinates, the goal is to predict the next 200 frames of marker coordinates.
The annotation files are formatted as follows:
x_red, y_red, x_green, y_green, x_blue, y_blue
All coordinates are multiplied by a factor of 5, so the pixel coordinates can be obtained by dividing by the same factor (e.g. a stored value of 1200 corresponds to pixel coordinate 240).
A couple data transformation functions are defined below.
Data transformation functions
# some constants
DEFAULT_X_RED, DEFAULT_Y_RED = (240, 240)
PIXEL_DISTANCE_GREEN_TO_RED = 118 # approx. value | calculated with the Pythagorean theorem and averaged: np.sqrt((y_green-y_red)**2 + (x_green-x_red)**2)
PIXEL_DISTANCE_BLUE_TO_GREEN = 90 # approx. value | calculated with the Pythagorean theorem and averaged: np.sqrt((y_blue-y_green)**2 + (x_blue-x_green)**2)
def raw_to_pixel(l):
    '''Convert the raw coordinates to pixel coordinates.'''
    assert isinstance(l, list)
    return [x/5 for x in l]

def pixel_to_raw(l):
    '''Convert the pixel coordinates to raw coordinates.'''
    assert isinstance(l, list)
    return [x*5 for x in l]

def raw_cartesian_to_polar_angles(l):
    '''Convert the cartesian coordinates to polar coordinates.'''
    assert isinstance(l, list)
    x_red, y_red, x_green, y_green, x_blue, y_blue = raw_to_pixel(l)
    angle_green_red = np.arctan((y_green-y_red)/(x_green-x_red+0.001))
    angle_blue_green = np.arctan((y_blue-y_green)/(x_blue-x_green+0.001))
    return [np.sin(angle_green_red), np.cos(angle_green_red), np.sin(angle_blue_green), np.cos(angle_blue_green)]

def polar_angles_to_raw_cartesian(l):
    '''Convert the polar coordinates back to cartesian coordinates.'''
    assert isinstance(l, list)
    sin_angle_green_red, cos_angle_green_red, sin_angle_blue_green, cos_angle_blue_green = l
    y_green = PIXEL_DISTANCE_GREEN_TO_RED * sin_angle_green_red + DEFAULT_Y_RED
    x_green = PIXEL_DISTANCE_GREEN_TO_RED * cos_angle_green_red + DEFAULT_X_RED
    y_blue = PIXEL_DISTANCE_BLUE_TO_GREEN * sin_angle_blue_green + y_green
    x_blue = PIXEL_DISTANCE_BLUE_TO_GREEN * cos_angle_blue_green + x_green
    return pixel_to_raw([DEFAULT_X_RED, DEFAULT_Y_RED, x_green, y_green, x_blue, y_blue])
Verify that the raw -> pixel and pixel -> raw conversions work as intended.
raw_coordinates = list(np.array([240, 240, 357.4438349670886, 228.55685234634907, 444.41827493559794, 205.41712909467287])*5)
pixel_coordinates = raw_to_pixel(raw_coordinates)
new_raw_coordinates = pixel_to_raw(pixel_coordinates)
assert raw_coordinates == new_raw_coordinates, '`Raw -> Pixel` and `Pixel -> Raw` coordinate conversion methods are malfunctioning.'
Verify that the cartesian -> polar and polar -> cartesian conversions work as intended.
raw_cartesian = list(np.array([240, 240, 357.4438349670886, 228.55685234634907, 444.41827493559794, 205.41712909467287])*5)
polar = raw_cartesian_to_polar_angles(raw_cartesian)
new_raw_cartesian = polar_angles_to_raw_cartesian(polar)
assert [round(x) for x in raw_cartesian] == [round(x) for x in new_raw_cartesian], 'Cartesian to Polar and Polar to Cartesian methods are malfunctioning.'
Data reading functions
def parse_training_annotations(csv_file):
    '''Parse the training annotations from a CSV file.'''
    X_data = []
    y_data = []
    f = pd.read_csv(csv_file, header=None, delim_whitespace=True, engine='python')
    temp = []
    for i, row in f.iterrows():
        if len(temp) < 4:
            # convert the cartesian pixel coordinates to polar coordinates
            temp.append(raw_cartesian_to_polar_angles(row.to_list()))
        else:
            # the output frame
            # convert the cartesian pixel coordinates to polar coordinates
            next_frame = raw_cartesian_to_polar_angles(row.to_list())
            # save
            X_data.append(temp.copy())
            y_data.append(next_frame.copy())
            # add the output frame to the inputs and remove the first
            temp.pop(0)
            temp.append(next_frame)
    return X_data, y_data
Load in data
BATCH_SIZE = 4000
# load in all separate files
X = []
y = []
for filename in tqdm([x for x in os.listdir(train_dir) if not x.startswith('.')]):
    # load in a file
    X_data, y_data = parse_training_annotations(os.path.join(train_dir, filename))
    # extract sequential batches and add them to the training data
    for i in range(len(X_data) // BATCH_SIZE):
        X_batch = X_data[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
        y_batch = y_data[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
        X.append(X_batch)
        y.append(y_batch)
num_batches = len(X)
num_records = num_batches * BATCH_SIZE
print(f'{num_records} training records spread over {num_batches} batches of size {BATCH_SIZE}')
# convert the data to a tf.dataset
X = np.array(X)
y = np.array(y)
tfdata_X = tf.data.Dataset.from_tensor_slices(X)
tfdata_y = tf.data.Dataset.from_tensor_slices(y)
training_data = tf.data.Dataset.zip((tfdata_X, tfdata_y))
# shuffle the batches in the dataset (not within the batch)
training_data = training_data.shuffle(buffer_size = num_batches, seed = 42)
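As a quick sanity check, we can inspect the shape of one element of the dataset. This is a minimal sketch that assumes TF 2.x eager execution; each element is one pre-built batch of size BATCH_SIZE.
# sketch: inspect one batch from the training pipeline
for X_batch, y_batch in training_data.take(1):
    print('inputs: ', X_batch.shape)   # expected (4000, 4, 4) with BATCH_SIZE = 4000
    print('targets:', y_batch.shape)   # expected (4000, 4)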
Train a model
Define the model first. Because this is a notebook running on basic compute in the cloud, we will keep the model and training process very basic.
# RNN
rnn_model = tf.keras.Sequential([
    tf.keras.layers.LSTM(32, input_shape=(4, 4), return_sequences=True),
    tf.keras.layers.LSTM(32, return_sequences=True),
    tf.keras.layers.LSTM(32, return_sequences=True),
    tf.keras.layers.LSTM(32, return_sequences=False),
    tf.keras.layers.Dense(4)  # the output layer
])
adam_optimizer = tf.keras.optimizers.Adam(1e-3)
rnn_model.compile(optimizer=adam_optimizer, loss='mean_squared_error', metrics=['mean_squared_error'])
model = rnn_model
model.summary()
Define a function to visualize model training.
def plot_history(history):
    """Plot the training process of a model."""
    hist = pd.DataFrame(history.history)
    hist['epoch'] = history.epoch
    plt.figure()
    plt.xlabel('Epoch')
    plt.ylabel('Mean Squared Error [MSE]')
    plt.plot(hist['epoch'], hist['mean_squared_error'].tolist(), label='Train Error')
    plt.legend()
Train the model. Feel free to adjust and extend the training process, as this will likely increase the performance of the model.
%%time
EPOCHS = 10
# TF 1.X
if tf.__version__[0] == '1':
    sess = tf.Session()
    history = model.fit(training_data.repeat(), epochs=EPOCHS, steps_per_epoch=num_records//BATCH_SIZE)
# TF 2.0
elif tf.__version__[0] == '2':
    history = model.fit(training_data, epochs=EPOCHS)
plot_history(history)
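One way to extend the training process, as suggested above, is to add Keras callbacks. This is a minimal sketch only; the checkpoint file name is a hypothetical example.
# sketch: optional callbacks to extend training (not used in the run above)
callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint('rnn_model_best.h5', monitor='loss', save_best_only=True),  # hypothetical file name
]
# history = model.fit(training_data, epochs=50, callbacks=callbacks)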
Plot the prediction results
With the model trained, we can show the performance by overlaying the prediction results (marker coordinates) with the actual frames.
# load 204 video frames
input_video = 'train_and_test_split/dpc_dataset_traintest_4_200_h264/train/0.mkv'
videoframes = video_to_frames(input_video, max_amount=204)
# load the 4 input marker positions
input_annotation_file = 'train_and_test_split/dpc_dataset_traintest_4_200_csv/train/0.csv'
input_marker_positions = pd.read_csv(input_annotation_file, header=None, delim_whitespace=True).values.tolist()[:4]
# predict the next 200 positions
predicted_positions = input_marker_positions.copy()
for _ in tqdm(range(200)):
    # predict
    input_vector = np.array([raw_cartesian_to_polar_angles(x) for x in input_marker_positions]).reshape((1, 4, 4))
    next_pos = polar_angles_to_raw_cartesian(model.predict(input_vector)[0].tolist())
    predicted_positions.append(next_pos)
    # update inputs
    input_marker_positions.pop(0)
    input_marker_positions.append(next_pos)
# generate video
save_annotated_video_from_frames(path='temp.mp4', frames=videoframes, fps=5, marker_positions=predicted_positions)
Video('temp.mp4')
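As an optional check, we can also compare the predicted marker positions against the ground-truth annotations of the same recording. This is a minimal sketch that reuses the annotation file loaded above.
# sketch: mean absolute pixel error of the predicted marker positions per frame
ground_truth = pd.read_csv(input_annotation_file, header=None, delim_whitespace=True).values.tolist()[:len(predicted_positions)]
pred = np.array([raw_to_pixel(p) for p in predicted_positions])
truth = np.array([raw_to_pixel(t) for t in ground_truth])
frame_error = np.abs(pred - truth).mean(axis=1)
plt.figure()
plt.plot(frame_error)
plt.xlabel('frame')
plt.ylabel('mean absolute pixel error')
plt.show()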
# clean up variables
del videoframes
del training_data
del X
del y
del tfdata_X
del tfdata_y
If the prediction results are deemed sufficiently accurate, the predicted marker coordinates can be used to generate a corresponding image. Combining the 200 generated frames, a video can be obtained and compared to the ground truth.
In order to convert a vector of pixel coordinates to an image, a decoder network must be developed, which is a very challenging task in itself. Decoder or 'generator' networks are typically part of Generative Adversarial Networks (GANs), albeit using a random vector as input. GANs can therefore be used as inspiration.
Training a decoder/generator network requires training data, for which data reading functions are defined below.
Data reading functions
Standardizing an image for deep learning typically involves subtracting the mean of the array and dividing by the maximum value. Because the majority of the image is black (rgb(0, 0, 0)), the mean of the image is close to zero. Because some pixels in the image are white (rgb(255, 255, 255)), we can assume that the maximum value is 255. Scaling can therefore be simplified to a division by 255.
def video_to_decoder_training_data(video_file, marker_positions_file, save_every_x_frames=1, delim_whitespace=True):
    """
    Load the videoframes and corresponding annotations.
    The value of `save_every_x_frames` determines the temporal 'spacing' between the saved frames.
    """
    # load the video
    vidcap = cv2.VideoCapture(video_file)
    # load the marker positions
    marker_positions = pd.read_csv(marker_positions_file, header=None, delim_whitespace=delim_whitespace).values.tolist()
    # for every frame in the video
    frame_count = 0
    while vidcap.isOpened():
        success, frame = vidcap.read()
        if success:
            # get the marker pixel coordinates (x_red, y_red, x_green, y_green, x_blue, y_blue)
            m = np.array(raw_to_pixel(marker_positions[frame_count]), dtype=np.float32).reshape((1, 6))
            # save the frame and marker positions
            if frame_count % save_every_x_frames == 0:
                frame = frame.astype(np.float32)
                if frame_count == 0:
                    # create a new sequence
                    all_coordinates = np.expand_dims(m.copy(), axis=0)
                    all_frames = np.expand_dims(frame.copy(), axis=0)
                else:
                    # add to the existing sequence
                    all_coordinates = np.append(all_coordinates, np.expand_dims(m.copy(), axis=0), axis=0)
                    all_frames = np.append(all_frames, np.expand_dims(frame.copy(), axis=0), axis=0)
            # add count to frame_count
            frame_count += 1
        else:
            break
    return all_coordinates, all_frames
def scale_image_array(frame):
    """Scale an image array."""
    return frame / 255

def unscale_image_array(frame):
    """Unscale an image array."""
    return frame * 255

def postprocess_frame_array(array):
    """Ensure the maximum and minimum value in the array are 255 and 0, respectively."""
    array[array > 255] = 255
    array[array < 0] = 0
    return array.astype(np.int16)

def normalize_coordinates(coordinates):
    """Normalize an array of pixel coordinates by dividing by the maximal pixel value."""
    return coordinates / 480

def denormalize_coordinates(coordinates):
    """Denormalize an array of pixel coordinates by multiplying with the maximal pixel value."""
    return coordinates * 480
Load video frames and corresponding annotation files
When training a model for production, we want as much data as possible. In this notebook, however, we will limit ourselves to a few thousand frames.
SAVE_EVERY_X_FRAMES = 25  # lower this to reduce the temporal separation between training frames
TRAINING_FRAMES_DESIRED = 3_000  # lower this for fewer training examples (faster processing)
ENTRIES_PER_TFRECORD_FILE = 100  # lower this for lower memory usage

if os.path.exists('tfrecords'):
    shutil.rmtree('tfrecords')
os.mkdir('tfrecords')

total_files_saved = 0
for i, filename in tqdm(enumerate(os.listdir(train_dir_video))):
    video_file = os.path.join(train_dir_video, filename)
    marker_positions_file = os.path.join(train_dir, filename.replace('mkv', 'csv'))
    # load the training data
    if i == 0:
        all_coordinates, all_frames = video_to_decoder_training_data(video_file, marker_positions_file, save_every_x_frames=SAVE_EVERY_X_FRAMES)
    else:
        c, f = video_to_decoder_training_data(video_file, marker_positions_file, save_every_x_frames=SAVE_EVERY_X_FRAMES)
        all_coordinates = np.append(all_coordinates, c, axis=0)
        all_frames = np.append(all_frames, f, axis=0)
    while all_coordinates.shape[0] >= ENTRIES_PER_TFRECORD_FILE:
        # take a subset
        coordinate_buffer = all_coordinates[:ENTRIES_PER_TFRECORD_FILE, :].copy()
        frame_buffer = all_frames[:ENTRIES_PER_TFRECORD_FILE, :].copy()
        # write a TFRecord file
        record_file = f'tfrecords/coordinate_to_image_set{total_files_saved+1}.tfrecord'
        with tf.io.TFRecordWriter(record_file) as writer:
            for c, f in zip(coordinate_buffer, frame_buffer):
                c = c[0]
                # convert a set of coordinates and a frame to a tf.Example
                image_bytestring = cv2.imencode('.png', f)[1].tobytes()
                feature = {
                    'coordinates': tf.train.Feature(float_list=tf.train.FloatList(value=c)),
                    'frame': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_bytestring]))
                }
                tf_example = tf.train.Example(features=tf.train.Features(feature=feature))
                writer.write(tf_example.SerializeToString())
        # update the remaining arrays
        all_coordinates = all_coordinates[ENTRIES_PER_TFRECORD_FILE:, :]
        all_frames = all_frames[ENTRIES_PER_TFRECORD_FILE:, :]
        # another file saved
        total_files_saved += 1
    total_data_entries = total_files_saved * ENTRIES_PER_TFRECORD_FILE
    print(f'Data entries collected: {total_data_entries}')
    if total_data_entries >= TRAINING_FRAMES_DESIRED:
        break
def extract_fn(data_record):
    features = {
        # Extract features using keys set during creation
        'coordinates': tf.io.FixedLenFeature([6], tf.float32),  # 6 because there are 6 coordinates
        'frame': tf.io.FixedLenFeature([], tf.string)
    }
    return tf.io.parse_single_example(data_record, features)

def normalize_fn(data_record):
    return [tf.reshape(normalize_coordinates(data_record['coordinates']), (1, 6)),
            scale_image_array(tf.image.decode_png(data_record['frame'], channels=3))]
# Initialize all tfrecord paths
tfrecord_files = [os.path.join('tfrecords', x) for x in os.listdir('tfrecords') if x.endswith('.tfrecord')]
dataset = tf.data.TFRecordDataset(tfrecord_files)
dataset = dataset.map(extract_fn)
dataset = dataset.map(normalize_fn)
decoder_training_data = dataset.shuffle(buffer_size=min(2_000, total_data_entries))
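As a quick visual check, we can pull one example from the pipeline and display it. This is a minimal sketch that assumes TF 2.x eager execution.
# sketch: inspect one (coordinates, frame) pair from the decoder training pipeline
for coords, img in decoder_training_data.take(1):
    print('pixel coordinates:', denormalize_coordinates(coords.numpy()))
    plt.figure(figsize=(5, 5))
    plt.imshow(img.numpy()[..., ::-1])  # OpenCV stores frames as BGR; flip to RGB for matplotlib
    plt.show()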
In addition, verify proper functionality of the image standardization functions.
# select a frame
frame = all_frames[0, ...]
# standardize it
std_frame = scale_image_array(frame)
# destandardize it
new_frame = unscale_image_array(std_frame)
# postprocessing
new_frame_pp = postprocess_frame_array(new_frame)
# the original frame should be (almost) equal to the output frame;
# allow a tolerance of 1 for the float round-trip and the integer cast
assert np.allclose(frame, new_frame_pp, atol=1)
Build the decoder network
This section contains a very basic example of a decoder network. Feel free to experiment and extend this network as you wish.
INPUT_VECTOR_SHAPE = (1, 6)
OUTPUT_IMAGE_SHAPE = (480, 480, 3)
def build_base_decoder(input_vector_shape=INPUT_VECTOR_SHAPE, output_image_shape=OUTPUT_IMAGE_SHAPE):
    """Build the decoder network."""
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(256, input_shape=input_vector_shape),
        tf.keras.layers.LeakyReLU(alpha=0.2),
        tf.keras.layers.BatchNormalization(momentum=0.8),
        tf.keras.layers.Dense(512),
        tf.keras.layers.LeakyReLU(alpha=0.2),
        tf.keras.layers.BatchNormalization(momentum=0.8),
        tf.keras.layers.Dense(np.prod(output_image_shape), activation='tanh'),
        tf.keras.layers.Reshape(output_image_shape)
    ])
    optimizer = tf.keras.optimizers.Adam(0.0001, 0.5)
    model.compile(loss='binary_crossentropy', optimizer=optimizer)
    return model
decoder = build_base_decoder()
decoder.summary()
The following function helps us follow the training process. Another option is to use TensorBoard in the notebook, which requires some additional setup; a minimal sketch is shown after the function below.
def plot_decoder_training(history):
    """Plot the change in loss over the training epochs."""
    hist = pd.DataFrame(history.history)
    hist['epoch'] = history.epoch
    plt.figure()
    plt.xlabel('Epoch')
    plt.ylabel('Binary Cross-Entropy [BCE]')
    plt.plot(hist['epoch'], hist['loss'], label='loss')
    plt.legend()
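As mentioned above, TensorBoard is an alternative way to follow the training process. A minimal sketch, assuming TF 2.x and a writable logs directory:
# sketch: monitor training with TensorBoard instead of matplotlib
# %load_ext tensorboard               # run once in a separate cell
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir='logs')
# pass callbacks=[tensorboard_cb] to decoder.fit(...) below,
# then run `%tensorboard --logdir logs` in a new cell to open the dashboard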
Train the model.
%%time
BATCH_SIZE = 32
EPOCHS = 10
# TF 1.X
if tf.__version__[0] == '1':
    sess = tf.Session()
    training_history = decoder.fit(decoder_training_data.batch(batch_size=BATCH_SIZE).repeat(),
                                   epochs=EPOCHS, steps_per_epoch=total_data_entries//BATCH_SIZE)
# TF 2.0
elif tf.__version__[0] == '2':
    training_history = decoder.fit(decoder_training_data.batch(batch_size=BATCH_SIZE), epochs=EPOCHS)
plot_decoder_training(training_history)
Once the model is trained, its performance can be evaluated with a set of marker coordinates used in the training process. For a more representative assessment, validation data is typically used. However, during early development of the decoder network, it is far easier to work with training data to determine which architectures are able to provide accurate representations of the double pendulum.
# select a random set of coordinates from the training data
random_index = np.random.randint(low=0, high=all_coordinates.shape[0])
input_pixel_coordinates = all_coordinates[random_index, ...][0]
ground_truth_img = all_frames[random_index, ...].astype(np.int32)
print(f"The input pixel coordinates:\n{input_pixel_coordinates}")
# normalize the coordinates
input_vector = normalize_coordinates(np.array(input_pixel_coordinates).reshape((1,1,6)))
# generate the output image
output_img_standardized = decoder.predict(input_vector)[0]
# destandardize the output
output_img = unscale_image_array(output_img_standardized)
# post-process the output image
output_img = postprocess_frame_array(output_img)
print(np.max(output_img), np.min(output_img))
assert np.max(output_img) <= 255 and np.min(output_img) >= 0
# plot the result
plot_annotated_frame(output_img, input_pixel_coordinates)
plot_annotated_frame(ground_truth_img, input_pixel_coordinates)
With model development complete, the combination of predicting future marker coordinates and generating an image based on the new set of coordinates is straightforward.
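A minimal sketch of that combination is given below; predict_next_frame is a hypothetical helper that assumes the trained model and decoder from above, plus the conversion functions defined earlier.
# sketch: predict the next marker coordinates, then render them with the decoder
def predict_next_frame(last_four_raw_positions):
    """Predict the next frame image from the four most recent raw marker positions."""
    # 1. next coordinates (polar representation) from the RNN
    rnn_input = np.array([raw_cartesian_to_polar_angles(p)
                          for p in last_four_raw_positions]).reshape((1, 4, 4))
    next_raw = polar_angles_to_raw_cartesian(model.predict(rnn_input)[0].tolist())
    # 2. image from the decoder
    pixel_coords = raw_to_pixel(next_raw)
    decoder_input = normalize_coordinates(np.array(pixel_coords).reshape((1, 1, 6)))
    frame = postprocess_frame_array(unscale_image_array(decoder.predict(decoder_input)[0]))
    return next_raw, frame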
In this notebook, we have attempted to predict the chaotic movement of a double pendulum using two steps: first, predicting the future marker coordinates from the preceding marker coordinates with a recurrent neural network, and second, generating an image from each new set of marker coordinates with a decoder network.
This notebook is merely an example analysis. A couple of things that could be improved are: training on more data and for more epochs, experimenting with more extensive model architectures, and replacing the image-based prediction with the vid2vid model.