I am trying to build a sequence-to-sequence model in TensorFlow from scratch for English-to-German translation. After training the model, when I run inference on a sentence it never predicts anything other than a single repeated token. I just need a little guidance to complete this project on my own. I have added the necessary comments and code snippets for the problem. Any help will be highly appreciated.
The inference code is shown below:
import os.path
import tensorflow as tf
import spacy
import numpy as np
from Model import Sequence2SequenceModel
import pickle
from DataLoader import Vocabulary
spacy_eng = spacy.load(name='en_core_web_sm')
class Inference:
    def __init__(self, model_path, input_sentence, eng_vocab, de_vocab,
                 max_sequence_length=128):
        self.model_path = model_path
        self.max_sequence_length = max_sequence_length
        self.input_sentence = input_sentence
        # Load the saved model.
        self.model = tf.keras.models.load_model(self.model_path,
                                                custom_objects={'Sequence2SequenceModel': Sequence2SequenceModel})
        self.eng_vocab = eng_vocab
        self.de_vocab = de_vocab
        self.vocab = Vocabulary()
        # Load the vocabulary, which contains the word-to-int and reverse mappings.
        self.load_vocabulary()
    def load_vocabulary(self):
        vocab_path = os.path.join(os.curdir, 'Generated', 'vocabulary.pickle')
        if not os.path.exists(vocab_path):
            raise FileNotFoundError('vocabulary.pickle file missing!')
        with open(vocab_path, 'rb') as file:
            result = pickle.load(file)
        self.vocab.stoi_eng = result[0]  # example: {'A': 23, 'place': 3, ...}
        self.vocab.itos_eng = result[1]  # example: {23: 'A', 3: 'place', ...}
        self.vocab.stoi_de = result[2]   # example: {'und': 30, 'platz': 13, ...}
        self.vocab.itos_de = result[3]   # example: {30: 'und', 13: 'platz', ...}
        print('Vocabulary successfully built!')
    def inference_(self):
        # Wrap the source sentence in <SOS> and <EOS> tokens.
        # Start token first:
        numerical_source = [self.vocab.stoi_eng["<SOS>"]]
        # Convert each word to its index in the vocab; this simply maps every
        # word to its integer using the loaded vocabulary.
        numerical_source += self.vocab.numericalize_source(self.input_sentence)
        # Append the <EOS> token at the end.
        numerical_source.append(self.vocab.stoi_eng["<EOS>"])
        # Pad the sequence to the required length.
        numerical_source = tf.keras.utils.pad_sequences([numerical_source], padding='post',
                                                        maxlen=self.max_sequence_length)
        # Token indices are integers, so use an integer dtype.
        numerical_source = tf.convert_to_tensor(numerical_source, dtype=tf.int32)
        # Retrieve the encoder and the decoder from the loaded model.
        encoder = self.model.get_layer(name='encoder')
        decoder = self.model.get_layer(name='decoder')
        _, hidden, cell = encoder(numerical_source, training=False)
        outputs = [self.vocab.stoi_de["<SOS>"]]
        for _ in range(self.max_sequence_length):
            previous_word = [outputs[-1]]
            previous_word = tf.keras.utils.pad_sequences([previous_word], padding='post',
                                                         maxlen=self.max_sequence_length)
            prediction, _, _ = decoder(previous_word, initial_state=[hidden, cell], training=False)
            # The shape of prediction is (1, 128, 21780), where 21780 is the target vocab
            # size and 128 comes from padding previous_word to max_sequence_length.
            prediction = tf.nn.softmax(logits=prediction, axis=-1)
            # To take the final guess I chose the argmax of the last time step along the
            # second dimension. I thought it might make sense to take the last step after
            # looking at the full prediction, but that is only a gut feeling. Maybe I am
            # wrong here!
            best_guess = tf.argmax(prediction[0, -1, :])
            outputs.append(best_guess.numpy())
            if best_guess.numpy() == self.vocab.stoi_de['<EOS>']:
                break
        translated_sentence = [self.vocab.itos_de[idx] for idx in outputs]
        # Remove the start token.
        return translated_sentence[1:]
if __name__ == '__main__':
    model_path = os.path.join(os.curdir, 'Generated', 'best_trained_weights.keras')
    input_sentence = 'sit down'
    inference = Inference(model_path=model_path,
                          input_sentence=input_sentence, eng_vocab=12133, de_vocab=21780)
    translation = inference.inference_()
    print(translation)
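For reference, Vocabulary.numericalize_source comes from my DataLoader module, which is not posted in full. It behaves roughly like the sketch below; the lowercasing and the <UNK> fallback token name are illustrative details, not necessarily the exact implementation:

# Rough sketch of Vocabulary.numericalize_source: tokenize the sentence with
# spaCy and map every token to its integer id via the stoi mapping.
# (<UNK> is an assumed out-of-vocabulary token name.)
def numericalize_source(self, sentence):
    tokens = [token.text.lower() for token in spacy_eng.tokenizer(sentence)]
    unknown_id = self.stoi_eng.get("<UNK>")
    return [self.stoi_eng.get(token, unknown_id) for token in tokens]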
The model architecture is defined as follows:
import tensorflow as tf
@tf.keras.saving.register_keras_serializable(package="MyLayers")
class Encoder(tf.keras.Model):
    def __init__(self, source_vocabulary, embedding_dimension, hidden_units):
        super(Encoder, self).__init__()
        self.source_vocabulary = source_vocabulary
        self.embedding_dimension = embedding_dimension
        self.hidden_units = hidden_units
        # Embedding layer
        self.embedding = tf.keras.layers.Embedding(self.source_vocabulary, self.embedding_dimension)
        # LSTM layer
        self.lstm = tf.keras.layers.LSTM(self.hidden_units, return_state=True, return_sequences=True)

    def call(self, input_vector, training=False):
        # input_vector: (batch_size, max_len) -- batch_size sentences, max_len words per sentence.
        input_vector = self.embedding(input_vector)
        # input_vector: (batch_size, max_len, embedding_dimension) -- every word in a
        # sentence is now an embedding vector.
        output, hidden_state, cell_state = self.lstm(input_vector, training=training)
        return output, hidden_state, cell_state

    def get_config(self):
        base_config = super().get_config()
        base_config.update({
            'source_vocabulary': self.source_vocabulary,
            'hidden_units': self.hidden_units,
            'embedding_dimension': self.embedding_dimension,
        })
        return base_config
@tf.keras.saving.register_keras_serializable(package="MyLayers")
class Decoder(tf.keras.Model):
    def __init__(self, target_vocab, embedding_dimension, output_dimension, hidden_units):
        super(Decoder, self).__init__()
        self.hidden_units = hidden_units
        self.output_dimension = output_dimension
        self.embedding_dimension = embedding_dimension
        self.target_vocab = target_vocab
        # Embedding layer
        self.embedding = tf.keras.layers.Embedding(self.target_vocab, self.embedding_dimension)
        # LSTM layer
        self.lstm = tf.keras.layers.LSTM(self.hidden_units, return_state=True, return_sequences=True)
        # Projects the LSTM output onto the target vocabulary (raw logits, no activation).
        self.fc = tf.keras.layers.Dense(output_dimension)

    def call(self, input_vector, initial_state, training=False):
        input_vector = self.embedding(input_vector)
        output, hidden_state, cell_state = self.lstm(input_vector, initial_state=initial_state, training=training)
        prediction = self.fc(output)
        return prediction, hidden_state, cell_state

    def get_config(self):
        base_config = super().get_config()
        base_config.update({
            'hidden_units': self.hidden_units,
            'output_dimension': self.output_dimension,
            'embedding_dimension': self.embedding_dimension,
            'target_vocab': self.target_vocab
        })
        return base_config
class Sequence2SequenceModel:
    def __init__(self, encoder, decoder, max_sequence_length):
        super(Sequence2SequenceModel, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.max_sequence_length = max_sequence_length

    def return_model(self):
        encoder_input_layer = tf.keras.Input(shape=[self.max_sequence_length])
        decoder_input_layer = tf.keras.Input(shape=[self.max_sequence_length - 1])
        _, hidden, cell = self.encoder(encoder_input_layer)
        prediction, _, _ = self.decoder(decoder_input_layer, initial_state=[hidden, cell])
        model = tf.keras.Model(inputs=[encoder_input_layer, decoder_input_layer], outputs=prediction)
        return model
if __name__ == '__main__':
    # Quick smoke test with random token ids.
    input_vector = tf.random.uniform(shape=(32, 128), minval=0, maxval=1000, dtype=tf.int32)
    target_vector = tf.random.uniform(shape=(32, 128), minval=0, maxval=1000, dtype=tf.int32)
    encoder = Encoder(source_vocabulary=1000, embedding_dimension=256, hidden_units=64)
    decoder = Decoder(target_vocab=1000, embedding_dimension=256, hidden_units=64, output_dimension=1000)
    out, hidden, cell = encoder(input_vector)
    out_, hidden, cell = decoder(target_vector, initial_state=[hidden, cell])
    model_object = Sequence2SequenceModel(encoder=encoder, decoder=decoder, max_sequence_length=128)
    model = model_object.return_model()
    # The built model's decoder input is one step shorter, so drop the last target token.
    output = model([input_vector, target_vector[:, :-1]])
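For completeness, the training script (not shown) wires the data roughly as in the sketch below: teacher forcing with the target shifted by one step, which is why the decoder input of the built model has length max_sequence_length - 1. The optimizer and loss here are illustrative assumptions; from_logits=True matches the Decoder's Dense layer, which has no activation:

# Rough sketch of the assumed training setup (illustrative, not the exact script).
max_len = 128
source = tf.random.uniform(shape=(32, max_len), minval=0, maxval=1000, dtype=tf.int32)
target = tf.random.uniform(shape=(32, max_len), minval=0, maxval=1000, dtype=tf.int32)

encoder = Encoder(source_vocabulary=1000, embedding_dimension=256, hidden_units=64)
decoder = Decoder(target_vocab=1000, embedding_dimension=256, hidden_units=64, output_dimension=1000)
model = Sequence2SequenceModel(encoder=encoder, decoder=decoder, max_sequence_length=max_len).return_model()

decoder_input = target[:, :-1]  # teacher forcing: <SOS> w1 ... w_{n-1}
labels = target[:, 1:]          # next-word targets: w1 ... w_n <EOS>

# The Decoder's Dense layer outputs raw logits, hence from_logits=True.
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
model.fit([source, decoder_input], labels, epochs=1)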