InvalidArgumentError when reading parquet files into Keras via Petastorm

537 views Asked by At

I'm trying to read in data from parquet for a language model.

The parquet contains two columns:

  • target (int)
  • feature_vec (int array)

I'm adapting the code from this post (which works for me). When I run the code below, I get an InvalidArgumentError as soon as I try to fit the model.

import random
from pyspark.sql import Row

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Embedding,  LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import  Sequence

from petastorm import make_batch_reader
from petastorm.etl.dataset_metadata import materialize_dataset
import pyarrow.parquet as pq

## build toy dataset
# Size of the token-id vocabulary; also used below as the Embedding input
# dimension, the one-hot depth of the target and the width of the softmax layer.
vocab_size = 250
# Number of token ids stored in each row's feature_vec.
seq_length = 100
# Location the generated parquet dataset is written to (and later read from).
parquet_path = '/dbfs/ml/langmod/petastorm/toy_dataset.parquet'

def row_generator(x):
    """Return a single random entry of the toy dataset.

    The argument is ignored; it is only accepted so the function can be used
    directly as the callable passed to ``RDD.map``.

    Returns:
        A ``Row`` with
        * ``target``: one random token id, and
        * ``feature_vec``: a list of ``seq_length`` random token ids.

    All ids are drawn from ``range(vocab_size)``, i.e. 0 .. vocab_size - 1.
    ``random.randrange`` is used instead of ``random.randint(0, vocab_size)``
    because ``randint`` includes its upper bound and would occasionally emit
    ``vocab_size`` itself, which is not a valid index for an embedding or a
    one-hot encoding of size ``vocab_size``.
    """
    return Row(
        target = random.randrange(vocab_size),
        feature_vec = [random.randrange(vocab_size) for _ in range(seq_length)],
    )


# Number of synthetic rows to generate.
rows_count = 1000
# Produce one Row per index; row_generator does not use the index value itself.
# NOTE(review): `sc` is not defined in this snippet - presumably the notebook's
# SparkContext; confirm.
rows_rdd = sc.parallelize(range(rows_count)).map(row_generator)

# NOTE(review): `spark` is not defined in this snippet either - presumably the
# notebook's SparkSession; confirm.
df = spark.createDataFrame(rows_rdd)


# Write the toy dataset as parquet, replacing any previous output at parquet_path.
df.write.parquet(parquet_path, mode = 'overwrite')


# Add every entry of the dataset directory whose name starts with "_" to
# pyarrow's EXCLUDED_PARQUET_PATHS (presumably so that marker/metadata files
# written next to the data are skipped when the dataset is read - confirm).
# NOTE(review): `os` and `get_local_path` are neither imported nor defined in this
# snippet; they have to come from an earlier cell for this line to run.
underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)
## build model and read in data from parquet, converting to tf.Dataset format
# Steps performed by the block below:
#   1. Open a petastorm batch reader on the parquet dataset (num_epochs = None).
#   2. Wrap it with make_petastorm_dataset and map each element to a
#      (feature_vec tensor, one-hot target of depth vocab_size) pair.
#   3. Unbatch the reader's output and re-batch it into groups of 10,
#      dropping any incomplete final batch.
#   4. Build Embedding(vocab_size, 20) -> LSTM(50) -> Dense(vocab_size, softmax),
#      compile it with categorical cross-entropy and fit directly on the dataset
#      for 2 epochs of 10 steps each.
# NOTE(review): `tf` and `make_petastorm_dataset` are not imported in this snippet;
# presumably an earlier cell provides them - confirm.
# NOTE(review): the reported failure is a rank mismatch inside `lstm1`; it may be
# worth checking which static shapes train_dataset reports for feature_vec before
# it reaches the Embedding layer - TODO confirm, not verified here.

with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    train_dataset = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size)))   \
                                                        .apply(tf.data.experimental.unbatch())                                                            \
                                                        .batch(10, drop_remainder = True)


    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])                                                         
    model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)

Error:

InvalidArgumentError                      Traceback (most recent call last)
<command-2202319388737190> in <module>
     10     model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
     11     model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
---> 12     model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    725         max_queue_size=max_queue_size,
    726         workers=workers,
--> 727         use_multiprocessing=use_multiprocessing)
    728 
    729   def evaluate(self,

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
    673         validation_steps=validation_steps,
    674         validation_freq=validation_freq,
--> 675         steps_name='steps_per_epoch')
    676 
    677   def evaluate(self,

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
    298           else:
    299             actual_inputs = ins()
--> 300           batch_outs = f(actual_inputs)
    301         except errors.OutOfRangeError:
    302           if is_dataset:

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/backend.py in __call__(self, inputs)
   3474 
   3475     fetched = self._callable_fn(*array_vals,
-> 3476                                 run_metadata=self.run_metadata)
   3477     self._call_fetch_callbacks(fetched[-len(self._fetches):])
   3478     output_structure = nest.pack_sequence_as(

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/client/session.py in __call__(self, *args, **kwargs)
   1470         ret = tf_session.TF_SessionRunCallable(self._session._session,
   1471                                                self._handle, args,
-> 1472                                                run_metadata_ptr)
   1473         if run_metadata:
   1474           proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

InvalidArgumentError: 2 root error(s) found.
  (0) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
     [[{{node lstm1_3/transpose}}]]
     [[lstm1_3/TensorArrayUnstack_1/range/_459]]
  (1) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
     [[{{node lstm1_3/transpose}}]]
0 successful operations.
0 derived errors ignored.

This error is surprising, since it seems to point at a shape problem inside the middle layer of the model (the LSTM, `lstm1`), whose input should simply match the output shape of the previous layer.

However, if I pull a single batch out of the dataset through an iterator and pass the resulting X and y arrays to the model directly, it trains as expected on that batch:

# Variant of the training block above: the input pipeline is the same, but
# one batch is materialised with a session and the model is fitted on the
# resulting arrays rather than on the tf.data dataset itself.
with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
  train_dataset = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size)))   \
                                                        .apply(tf.data.experimental.unbatch())                                                            \
                                                        .batch(10, drop_remainder = True)
  # Symbolic handle for the next (features, target) batch of the dataset.
  iterator = train_dataset.make_one_shot_iterator()
  tensor = iterator.get_next()

  with tf.Session() as sess:
    # Evaluate a single batch of 10 rows to concrete values.
    features, target = sess.run(tensor)

    # Same architecture and compile settings as in the dataset-based attempt.
    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])                                                         

    # Fit on the one materialised batch only.
    model.fit(x = features, y = target,  verbose = 1)
10/10 [==============================] - 1s 76ms/sample - loss: 5.5202 - categorical_accuracy: 0.1000

I guess there is some issue with the way the integer array column is read in and converted to tf.Dataset format, but I can't see what might be causing it. I assume the problem lies in this line from the block above:

train_dataset = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size)))   \
                                                        .apply(tf.data.experimental.unbatch())                                                            \
                                                        .batch(10, drop_remainder = True)

I'm running databricks runtime ML 6.2

  • tensorflow 1.15.0
  • petastorm 0.8.0
0

There are 0 answers