How to perform inference for sequence-to-sequence models in TensorFlow?

I am trying to build a sequence-to-sequence model in TensorFlow from scratch for English-to-German translation. After training the model, when I try to run inference on a sentence it doesn't predict anything other than the token. I just need a little guidance to complete this project on my own. I have added the necessary comments and code snippets for the given problem. Any help would be highly appreciated.

The inference code is shown below:

import os.path
import tensorflow as tf
import spacy
import numpy as np

from Model import Sequence2SequenceModel

import pickle
from DataLoader import Vocabulary

spacy_eng = spacy.load(name='en_core_web_sm')


class Inference:
    def __init__(self, model_path, input_sentence, eng_vocab, de_vocab,
                 max_sequence_length=128):
        self.model_path = model_path
        self.max_sequence_length = max_sequence_length
        self.input_sentence = input_sentence

        # Loads the saved model

        self.model = tf.keras.models.load_model(self.model_path,
                                                custom_objects={'Sequence2SequenceModel': Sequence2SequenceModel})
        self.eng_vocab = eng_vocab
        self.de_vocab = de_vocab
        self.vocab = Vocabulary()

        # Loads the vocabulary, which contains the word-to-int and reverse mappings.

        self.load_vocabulary()

    def load_vocabulary(self):
        vocab_path = os.path.join(os.curdir, 'Generated', 'vocabulary.pickle')
        if not os.path.exists(vocab_path):
            raise FileNotFoundError('vocabulary.pickle file missing!')
        with open(vocab_path, 'rb') as file:
            result = pickle.load(file)
        self.vocab.stoi_eng = result[0]  # example {'A': 23, 'place': 3, ...}
        self.vocab.itos_eng = result[1]  # example {23: 'A', 3: 'place', ...}
        self.vocab.stoi_de = result[2]   # example {'und': 30, 'platz': 13, ...}
        self.vocab.itos_de = result[3]   # example {30: 'und', 13: 'platz', ...}
        print('Vocabulary successfully built!')

    def inference_(self):
        # Add the <SOS> start token to the source sequence

        numerical_source = [self.vocab.stoi_eng["<SOS>"]]
        
        # Convert each word to its index in the vocab; this helper simply maps
        # every word to its integer id using the loaded vocabulary (a sketch of
        # this helper appears after the inference code below)

        numerical_source += self.vocab.numericalize_source(self.input_sentence)
        
        # Add the <EOS> token at the end

        numerical_source.append(self.vocab.stoi_eng["<EOS>"])

        # Pad the sequence to the required length

        numerical_source = tf.keras.utils.pad_sequences([numerical_source], padding='post',
                                                        maxlen=self.max_sequence_length)
        numerical_source = tf.convert_to_tensor(numerical_source, dtype=tf.int32)


        # Retrieving the encoder and the decoder

        encoder = self.model.get_layer(name='encoder')
        decoder = self.model.get_layer(name='decoder')

        _, hidden, cell = encoder(numerical_source, training=False)
        outputs = [self.vocab.stoi_de["<SOS>"]]

        for _ in range(self.max_sequence_length):
            previous_word = [outputs[-1]]
            previous_word = tf.keras.utils.pad_sequences([previous_word], padding='post',
                                                         maxlen=self.max_sequence_length)
            prediction, _, _ = decoder(previous_word, initial_state=[hidden, cell], training=False)

            # The prediction shape is (1, 128, 21780): 128 timesteps because the
            # decoder input is padded to max_sequence_length, and 21780 is the
            # target vocab size produced by the final Dense layer.

            prediction = tf.nn.softmax(logits=prediction, axis=-1)

            # For the final guess I take the argmax at the last timestep of the
            # second dimension. After looking at the full prediction I thought it
            # makes sense to take the last position, but this is mostly a gut
            # feeling. Maybe I am wrong here!
            best_guess = tf.argmax(prediction[0, -1, :]) 
            outputs.append(best_guess.numpy())

            if best_guess.numpy() == self.vocab.stoi_de['<EOS>']:
                break

        translated_sentence = [self.vocab.itos_de[idx] for idx in outputs]

        # remove the start token
        return translated_sentence[1:]


if __name__ == '__main__':
    model_path = os.path.join(os.curdir, 'Generated', 'best_trained_weights.keras')
    input_sentence = 'sit down'
    inference = Inference(model_path=model_path,
                          input_sentence=input_sentence, eng_vocab=12133, de_vocab=21780)
    translation = inference.inference_()
    print(translation)
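
For reference, numericalize_source is used above but not defined in this file. Below is a minimal sketch of what such a method on Vocabulary might look like, using the spacy tokenizer loaded at the top; the lowercasing and the <UNK> fallback are assumptions, not taken from the original DataLoader:

    # Sketch only -- the real method in DataLoader.Vocabulary may differ.
    def numericalize_source(self, sentence):
        # Tokenize with spacy, then map each token to its integer id,
        # falling back to the (assumed) <UNK> id for unknown words.
        tokens = [tok.text.lower() for tok in spacy_eng.tokenizer(sentence)]
        unk = self.stoi_eng.get('<UNK>')
        return [self.stoi_eng.get(token, unk) for token in tokens]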

The model architecture is defined as follows:

import tensorflow as tf


@tf.keras.saving.register_keras_serializable(package="MyLayers")
class Encoder(tf.keras.Model):
    def __init__(self, source_vocabulary, embedding_dimension, hidden_units):
        super(Encoder, self).__init__()
        self.source_vocabulary = source_vocabulary
        self.embedding_dimension = embedding_dimension
        self.hidden_units = hidden_units

        # Embedding layer
        self.embedding = tf.keras.layers.Embedding(self.source_vocabulary, self.embedding_dimension)

        # LSTM layer
        self.lstm = tf.keras.layers.LSTM(self.hidden_units, return_state=True, return_sequences=True)

    def call(self, input_vector, training=False):
        # input_vector = (batch_size, max_len) --> batch_size = # sentences, max_len = # words in sentence
        input_vector = self.embedding(input_vector)
        # input_vector = (batch_size, max_len, embedding_dimension) --> every word in a
        # sentence is now represented by an embedding vector.
        output, hidden_state, cell_state = self.lstm(input_vector, training=training)
        return output, hidden_state, cell_state

    def get_config(self):
        base_config = super().get_config()
        base_config.update({
            'source_vocabulary': self.source_vocabulary,
            'hidden_units': self.hidden_units,
            'embedding_dimension': self.embedding_dimension,
        })
        return base_config


@tf.keras.saving.register_keras_serializable(package="MyLayers")
class Decoder(tf.keras.Model):
    def __init__(self, target_vocab, embedding_dimension, output_dimension, hidden_units):
        super(Decoder, self).__init__()
        self.hidden_units = hidden_units
        self.output_dimension = output_dimension
        self.embedding_dimension = embedding_dimension
        self.target_vocab = target_vocab

        # Embedding
        self.embedding = tf.keras.layers.Embedding(self.target_vocab, self.embedding_dimension)
        self.lstm = tf.keras.layers.LSTM(self.hidden_units, return_state=True, return_sequences=True)
        self.fc = tf.keras.layers.Dense(output_dimension)

    def call(self, input_vector, initial_state, training=False):
        input_vector = self.embedding(input_vector)
        output, hidden_state, cell_state = self.lstm(input_vector, initial_state=initial_state, training=training)
        prediction = self.fc(output)
        return prediction, hidden_state, cell_state

    def get_config(self):
        base_config = super().get_config()
        base_config.update({
            'hidden_units': self.hidden_units,
            'output_dimension': self.output_dimension,
            'embedding_dimension': self.embedding_dimension,
            'target_vocab': self.target_vocab,
        })
        return base_config


class Sequence2SequenceModel:
    def __init__(self, encoder, decoder, max_sequence_length):
        super(Sequence2SequenceModel, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.max_sequence_length = max_sequence_length

    def return_model(self):
        encoder_input_layer = tf.keras.Input(shape=[self.max_sequence_length])
        decoder_input_layer = tf.keras.Input(shape=[self.max_sequence_length-1])

        _, hidden, cell = self.encoder(encoder_input_layer)
        prediction, _, _ = self.decoder(decoder_input_layer, initial_state=[hidden, cell])
        model = tf.keras.Model(inputs=[encoder_input_layer, decoder_input_layer], outputs=prediction)
        return model


if __name__ == '__main__':
    input_vector = tf.random.uniform(shape=(32, 128), minval=0, maxval=1000, dtype=tf.int32)
    target_vector = tf.random.uniform(shape=(32, 128), minval=0, maxval=1000, dtype=tf.int32)

    encoder = Encoder(source_vocabulary=1000, embedding_dimension=256, hidden_units=64)
    decoder = Decoder(target_vocab=1000, embedding_dimension=256, hidden_units=64, output_dimension=1000)

    out, hidden, cell = encoder(input_vector)
    out_, hidden, cell = decoder(input_vector=input_vector, initial_state=[hidden, cell])

    model_object = Sequence2SequenceModel(encoder=encoder, decoder=decoder, max_sequence_length=128)
    model = model_object.return_model()
    output = model([input_vector, target_vector])
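
For comparison, a common greedy-decoding pattern for this kind of encoder/decoder feeds a single token of shape (1, 1) per step and carries the LSTM state forward between steps, so the argmax is taken over a real timestep rather than a padded position. The sketch below is written against the layer signatures above and is only illustrative; greedy_decode and its arguments are assumed names, not part of the project:

import tensorflow as tf

def greedy_decode(encoder, decoder, source_ids, stoi_de, itos_de, max_steps=128):
    # Encode the full source once and reuse its final states.
    _, hidden, cell = encoder(source_ids, training=False)
    token = tf.constant([[stoi_de['<SOS>']]], dtype=tf.int32)  # shape (1, 1)
    words = []
    for _ in range(max_steps):
        # One timestep in, one timestep out; thread the state forward.
        logits, hidden, cell = decoder(token, initial_state=[hidden, cell],
                                       training=False)
        # logits has shape (1, 1, target_vocab_size); argmax over the vocab axis.
        next_id = int(tf.argmax(logits[0, -1, :]).numpy())
        if next_id == stoi_de['<EOS>']:
            break
        words.append(itos_de[next_id])
        token = tf.constant([[next_id]], dtype=tf.int32)  # feed the prediction back in
    return words

Since softmax is monotonic, taking the argmax directly over the logits selects the same token as applying softmax first.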