I'm trying to read in data from Parquet for a language model.
The Parquet file contains two columns:
- target (int)
- feature_vec (int array)
I'm adapting the code from this post (which works for me). When I run the code below, I get an InvalidArgumentError as soon as I try to fit the model.
import os
import random

import pyarrow.parquet as pq
import tensorflow as tf
from pyspark.sql import Row
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Embedding, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence
from petastorm import make_batch_reader
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.tf_utils import make_petastorm_dataset

## build toy dataset
vocab_size = 250
seq_length = 100
parquet_path = '/dbfs/ml/langmod/petastorm/toy_dataset.parquet'

def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    # randint is inclusive on both ends, so cap at vocab_size - 1 to keep ids valid for the Embedding layer
    return Row(target = random.randint(0, vocab_size - 1),
               feature_vec = [random.randint(0, vocab_size - 1) for i in range(seq_length)])

# sc and spark are provided by the Databricks notebook environment
rows_count = 1000
rows_rdd = sc.parallelize(range(rows_count)).map(row_generator)
df = spark.createDataFrame(rows_rdd)
df.write.parquet(parquet_path, mode = 'overwrite')

# get_local_path (helper not shown) maps the DBFS path to its local filesystem path
underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)

## build model and read in data from parquet, converting to tf.Dataset format
with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    train_dataset = make_petastorm_dataset(train_reader) \
        .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)

    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
    model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)
Error:
InvalidArgumentError Traceback (most recent call last)
<command-2202319388737190> in <module>
10 model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
11 model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
---> 12 model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
725 max_queue_size=max_queue_size,
726 workers=workers,
--> 727 use_multiprocessing=use_multiprocessing)
728
729 def evaluate(self,
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
673 validation_steps=validation_steps,
674 validation_freq=validation_freq,
--> 675 steps_name='steps_per_epoch')
676
677 def evaluate(self,
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
298 else:
299 actual_inputs = ins()
--> 300 batch_outs = f(actual_inputs)
301 except errors.OutOfRangeError:
302 if is_dataset:
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/backend.py in __call__(self, inputs)
3474
3475 fetched = self._callable_fn(*array_vals,
-> 3476 run_metadata=self.run_metadata)
3477 self._call_fetch_callbacks(fetched[-len(self._fetches):])
3478 output_structure = nest.pack_sequence_as(
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/client/session.py in __call__(self, *args, **kwargs)
1470 ret = tf_session.TF_SessionRunCallable(self._session._session,
1471 self._handle, args,
-> 1472 run_metadata_ptr)
1473 if run_metadata:
1474 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
InvalidArgumentError: 2 root error(s) found.
(0) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
[[{{node lstm1_3/transpose}}]]
[[lstm1_3/TensorArrayUnstack_1/range/_459]]
(1) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
[[{{node lstm1_3/transpose}}]]
0 successful operations.
0 derived errors ignored.
This error is surprising, since it seems to point to a shape problem inside the LSTM layer, whose input should simply be whatever shape the Embedding layer outputs.
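For reference, the static shapes and types that the pipeline reports could be inspected with something like this (TF 1.x dataset attributes; I haven't captured the actual output here):

# diagnostic sketch: check whether feature_vec comes out as (batch, seq_length) or with an extra dimension
with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    train_dataset = make_petastorm_dataset(train_reader) \
        .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)
    print(train_dataset.output_shapes)
    print(train_dataset.output_types)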
However, if I convert the dataset to an iterator, pull a single batch, and pass the features and targets to fit separately, it trains as expected on that batch:
with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    train_dataset = make_petastorm_dataset(train_reader) \
        .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)

    # pull a single batch out of the dataset as plain numpy arrays
    iterator = train_dataset.make_one_shot_iterator()
    tensor = iterator.get_next()
    with tf.Session() as sess:
        features, target = sess.run(tensor)

    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
    model.fit(x = features, y = target, verbose = 1)
10/10 [==============================] - 1s 76ms/sample - loss: 5.5202 - categorical_accuracy: 0.1000
I guess there is some issue with the way the integer-array column is read in and converted to tf.data.Dataset format, but I can't see what's causing it. I assume the problem is in this line from the block above:
train_dataset = make_petastorm_dataset(train_reader) \
    .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
    .apply(tf.data.experimental.unbatch()) \
    .batch(10, drop_remainder = True)
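If the problem is that the static shape of feature_vec is lost somewhere in that chain, one possible variant (just a sketch, not verified) would be to pin the sequence length explicitly inside the map:

# sketch: force a known per-example shape of (seq_length,) before unbatching and re-batching
train_dataset = make_petastorm_dataset(train_reader) \
    .map(lambda x: (tf.reshape(x.feature_vec, [-1, seq_length]), tf.one_hot(x.target, depth = vocab_size))) \
    .apply(tf.data.experimental.unbatch()) \
    .batch(10, drop_remainder = True)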
I'm running Databricks Runtime 6.2 ML, with:
- tensorflow 1.15.0
- petastorm 0.8.0