I am trying to reconstruct the ECG signal from PPG using a vanilla Transformer (encoder-decoder) model, but as the attached image shows, the reconstruction is not successful. I have only trained for 1 epoch so far, yet the prediction does not follow the ECG shape at all, which makes me suspect there is a problem with the model itself. I use -3 as the start token for the decoder input.
[attached image: reconstruction result (PPG input, predicted ECG, ground-truth ECG)]
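For context, each PPG input window has 200 samples and the decoder input has 201 samples (the ECG window with the -3 start token prepended), so the training target is the ECG window without the start token. My exact preprocessing code is not shown here; the sketch below only illustrates the idea, and ecg_windows is a placeholder name:

import numpy as np
START_TOKEN = -3
# ecg_windows: (N, 200, 1) ECG segments aligned with the PPG windows (placeholder name)
starts = np.full((ecg_windows.shape[0], 1, 1), START_TOKEN, dtype=ecg_windows.dtype)
y_train_ecg = np.concatenate([starts, ecg_windows], axis=1)   # decoder input, (N, 201, 1)
# the fit() call further down then uses y_train_ecg[:, 1:, :] (N, 200, 1) as the target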
#%% Transformer model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Self-attention sub-layer
    x = layers.MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(inputs, inputs)
    x = layers.Dropout(dropout)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    x = layers.Add()([inputs, x])
    # Position-wise feed-forward sub-layer
    res = layers.Dense(ff_dim, activation='relu')(x)
    res = layers.Dense(inputs.shape[-1])(res)
    res = layers.LayerNormalization(epsilon=1e-6)(res)
    res = layers.Add()([x, res])
    return res
def transformer_decoder(inputs, encoder_outputs, head_size, num_heads, ff_dim, dropout=0):
    # Causal (look-ahead) mask so position t can only attend to positions <= t
    seq_len = tf.shape(inputs)[1]
    look_ahead_mask = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    look_ahead_mask = look_ahead_mask[tf.newaxis, tf.newaxis, :, :]
    # Masked self-attention sub-layer
    attention_output = layers.MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(
        inputs, inputs, attention_mask=look_ahead_mask
    )
    attention_output = layers.Dropout(dropout)(attention_output)
    attention_output = layers.LayerNormalization(epsilon=1e-6)(attention_output)
    out1 = layers.Add()([inputs, attention_output])
    # Cross-attention over the encoder outputs
    attention_output = layers.MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(
        out1, encoder_outputs
    )
    attention_output = layers.Dropout(dropout)(attention_output)
    attention_output = layers.LayerNormalization(epsilon=1e-6)(attention_output)
    out2 = layers.Add()([out1, attention_output])
    # Position-wise feed-forward sub-layer
    res = layers.Dense(ff_dim, activation='relu')(out2)
    res = layers.Dense(inputs.shape[-1])(res)
    res = layers.LayerNormalization(epsilon=1e-6)(res)
    out3 = layers.Add()([out2, res])
    return out3
def get_positional_encoding(seq_length, d_model):
    # Standard sinusoidal positional encoding
    position = np.arange(seq_length)[:, np.newaxis]
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    positional_encoding = np.zeros((seq_length, d_model))
    positional_encoding[:, 0::2] = np.sin(position * div_term)
    # Slice div_term so this also broadcasts when d_model is odd (e.g. d_model == 1)
    positional_encoding[:, 1::2] = np.cos(position * div_term[: d_model // 2])
    return tf.cast(positional_encoding, dtype=tf.float32)
def build_encoder_decoder_model(
    ppg_input_shape,
    ecg_input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    dropout=0,
    mlp_units=[128],
    mlp_dropout=0.4,
):
    # (mlp_units and mlp_dropout are currently unused in this function)
    ppg_seq_length, ppg_d_model = ppg_input_shape
    ecg_seq_length, ecg_d_model = ecg_input_shape
    encoder_inputs = keras.Input(shape=(ppg_seq_length, ppg_d_model), name="encoder_inputs")
    decoder_inputs = keras.Input(shape=(ecg_seq_length, ecg_d_model), name="decoder_inputs")
    # Encoder
    positional_encoding = get_positional_encoding(ppg_seq_length, ppg_d_model)
    x = encoder_inputs + positional_encoding
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)
    encoder_outputs = x
    # Decoder
    positional_encoding = get_positional_encoding(ecg_seq_length, ecg_d_model)
    x = decoder_inputs + positional_encoding
    for _ in range(num_transformer_blocks):
        x = transformer_decoder(x, encoder_outputs, head_size, num_heads, ff_dim, dropout)
    x = layers.Dense(ff_dim, activation='relu')(x)
    x = layers.Dense(ecg_d_model)(x)
    # Drop the first (start-token) position so the output length matches the target y[:, 1:, :]
    x = x[:, 1:, :]
    model = keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=x)
    return model
# Example usage:
ppg_input_shape = (200, 1)
ecg_input_shape = (201, 1)
head_size = 64
num_heads = 8
ff_dim = 128
num_transformer_blocks = 4
dropout = 0.1
mlp_units = [128]
mlp_dropout = 0.4
model = build_encoder_decoder_model(
    ppg_input_shape,
    ecg_input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    dropout,
    mlp_units,
    mlp_dropout
)
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss='mse',
    metrics=['mae']
)
model.summary()
# X_train_ppg: (N, 200, 1) PPG windows; y_train_ecg: (N, 201, 1) ECG windows with the
# start token prepended. The decoder receives the full ground-truth sequence (teacher
# forcing) and the loss is computed against y_train_ecg[:, 1:, :].
hist = model.fit(
    [X_train_ppg, y_train_ecg],
    y_train_ecg[:, 1:, :],
    epochs=1,
    validation_data=([X_val_ppg, y_val_ecg], y_val_ecg[:, 1:, :]),
    batch_size=4,
)
import matplotlib.pyplot as plt

def mean_squared_error(y_true, y_pred):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    mse = np.mean((y_true - y_pred) ** 2)
    return mse
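# Note: add_elements_to_samples is my own helper (not shown above); roughly, it
# prepends the start-token value along the time axis, turning (batch, 200, 1)
# into (batch, 201, 1). A minimal sketch of that assumed behaviour:
def add_elements_to_samples(samples, value):
    start = np.full((samples.shape[0], 1, samples.shape[2]), value, dtype=samples.dtype)
    return np.concatenate([start, samples], axis=1)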
# One test window; the decoder input is built from zeros plus the start token,
# and model.predict is called once (no step-by-step decoding).
ppg_test = X_test_ppg[1].reshape(1, 200, 1)
decoder_input = add_elements_to_samples(np.zeros(ppg_test.shape), -3)
predictions = model.predict([ppg_test, decoder_input])
print(mean_squared_error(y_test_ecg[1, 1:, :], predictions))
plt.plot(ppg_test.reshape(-1))              # PPG input
plt.plot(predictions.reshape(-1))           # predicted ECG
plt.plot(y_test_ecg[1, 1:, :].reshape(-1))  # ground-truth ECG
The dataset consists of roughly 1.8 million samples, so I believe the amount of training data should be sufficient; still, the model does not follow the shape of the ECG at all, which makes me suspect there is a problem with the model itself rather than the data.