I'm trying to use a TensorFlow Dataset with an LSTM model. It is my first project integrating tf.data, so any advice on how to do it better is welcome.
Here are all my functions that read data from many CSV files and process it into a dataset:
def csv_loader(path):
    return tf.data.experimental.CsvDataset(
        path,
        record_defaults=[tf.float32, tf.float32, tf.float32],
        header=header,
        field_delim=',',
        select_cols=[0, 1, 2, 5],
        na_value='nan')
def split_feature_label(x, lbl_position, nb_attrib):
    output = x[self.input_sequence_length - 1, lbl_position + 1]
    # remove output from input set
    sub = list(range(x.shape[1]))
    sub.pop(lbl_position + 1)
    # remove id column used by the function "filter_mixed_csv_sample"
    sub.pop(0)
    x = tf.transpose(tf.gather(tf.transpose(x), sub))
    return {'input1': x[:self.input_sequence_length, :-nb_attrib],
            'input2': x[self.input_sequence_length - 1, -nb_attrib:]}, output
def filter_mixed_csv_sample(x):
    # remove samples with mixed csv values
    y, idx = tf.unique(x[:, 0])
    if len(y) > 1:
        return False
    return True
def filter_nan_missing_values(x):
    # find NaN and reject the sample, not sure it always works...
    try:
        ynan = tf.math.is_nan(x)  # returns True for one or more NaN inside the sample
        return tf.math.logical_not(tf.math.reduce_any(ynan))  # invert the answer to reject with False if NaN
    except:
        return False
def to_timeseries(x):
    # turn dataset into a 3D, LSTM-compatible dataset
    x = x.map(lambda *items: tf.stack(items), num_parallel_calls=tf.data.AUTOTUNE)
    x = x.window(self.input_sequence_length + self.output_sequence_length, shift=1,
                 drop_remainder=True)
    x = x.flat_map(lambda i: i).batch(self.input_sequence_length + self.output_sequence_length)
    return x
def is_test(x, _):
    # split dataset into test and training datasets
    return x % int(self.val_split * 100) == 0

def is_train(x, y):
    return not is_test(x, y)
def apply_scaler(x, y):
    x1_std = (x['input1'] - x1_scaler.data_min_) / (x1_scaler.data_max_ - x1_scaler.data_min_)
    x1_scaled = x1_std * (x1_scaler.feature_range[1] - x1_scaler.feature_range[0]) + x1_scaler.feature_range[0]
    x2_std = (x['input2'] - x2_scaler.data_min_) / (x2_scaler.data_max_ - x2_scaler.data_min_)
    x2_scaled = x2_std * (x2_scaler.feature_range[1] - x2_scaler.feature_range[0]) + x2_scaler.feature_range[0]
    y_std = (y - y_scaler.data_min_) / (y_scaler.data_max_ - y_scaler.data_min_)
    y_scaled = y_std * (y_scaler.feature_range[1] - y_scaler.feature_range[0]) + y_scaler.feature_range[0]
    return {'input1': x1_scaled, 'input2': x2_scaled}, y_scaled
And here is the order in which I apply these processing steps:
tf_list = tf.data.Dataset.list_files(list_files, shuffle=True)
dataset = tf_list.interleave(csv_loader, cycle_length=1)
with tf.device('/cpu:0'):
    dataset = to_timeseries(self.dataset)
    dataset = dataset.ignore_errors()
    dataset = dataset.filter(filter_nan_missing_values)
    dataset = dataset.filter(filter_mixed_csv_sample)
    if shuffle:
        dataset = dataset.shuffle(shuffle_buffer)
    dataset = dataset.map(lambda x: split_feature_label(x, label_index, nb_attributs), num_parallel_calls=tf.data.AUTOTUNE)

    # Split dataset into train/test sets
    if val_split > 0:
        recover = lambda x, y: y
        test_set = dataset.enumerate() \
            .filter(is_test) \
            .map(recover)
        trainning_set = dataset.enumerate() \
            .filter(is_train) \
            .map(recover)
# set up multi-GPU config if available
if gpu:
    strategy = tf.distribute.MirroredStrategy()
    batch_size_per_replica = batch_size * strategy.num_replicas_in_sync
else:
    batch_size_per_replica = batch_size
if val_split == 0:
    dataset = dataset.batch(batch_size_per_replica)
    dataset = dataset.cache()
    dataset = dataset.prefetch(2)
else:
    trainning_set = trainning_set.batch(batch_size_per_replica).cache().prefetch(2)
    test_set = test_set.batch(batch_size_per_replica).cache().prefetch(2)

x1_scaler = load('/artefacts/scalers/x1_scaler.sclr')
x2_scaler = load('/artefacts/scalers/x2_scaler.sclr')
y_scaler = load('/artefacts/scalers/y_scaler.sclr')
dataset = dataset.map(apply_scaler, num_parallel_calls=tf.data.AUTOTUNE)
Finally, here is how I start training:
if val_split > 0:
    history = model.fit(trainning_set, validation_data=test_set,
                        verbose=verbose,
                        epochs=epochs,
                        callbacks=[checkpoint, early_stop],
                        shuffle=shuffle)
else:
    history = model.fit(dataset,
                        verbose=verbose,
                        epochs=epochs,
                        callbacks=[checkpoint, early_stop],
                        shuffle=shuffle)
This code was working in an earlier version. I made some "light" modifications such as adding the multi-GPU optimisation, and reorganized my code to remove leftover test lines... now I get the error below and really don't understand which change caused it, or why:
Exception has occurred: TypeError
unhashable type: 'list'
  File "/home/cy6112/CASTOR-S3/CASTOR-S3-cy6112/Code/timeseries_nn/core.py", line 797, in train
    history = model.fit(dataset,
    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cy6112/CASTOR-S3/CASTOR-S3-cy6112/Code/timeseries_nn/workbench.py", line 33, in <module>
    NN.train(ds, 200, 10, verbose=2, shuffle=True)
TypeError: unhashable type: 'list'
My dataset is:
<_ParallelMapDataset element_spec=({'input1': TensorSpec(shape=(None, None, 3), dtype=tf.float32, name=None), 'input2': TensorSpec(shape=(None, 13), dtype=tf.float32, name=None)}, TensorSpec(shape=(None,), dtype=tf.float32, name=None))>
Thanks for your help, and for any advice if you see something not done the right way with the dataset.
The error you get stems from the dataset instance. Somewhere in your preprocessing you are providing a list as a key to be hashed, but a list is not a hashable type. I cannot say for sure which value you are erroneously using, but one of the lines in your preprocessing looks suspicious.
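For illustration only, here is a hypothetical snippet (not taken from your code) that produces exactly that TypeError:

# A list is mutable and therefore unhashable, so it cannot be used as a
# dictionary key; a tuple works fine.
try:
    bad = {['input1', 'input2']: 0.0}
except TypeError as err:
    print(err)                        # unhashable type: 'list'

ok = {('input1', 'input2'): 0.0}      # tuples are hashable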
I suggest you use breakpoint() in your functions to check your variables and see where it breaks.
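One caveat: functions passed to Dataset.map/filter are normally traced into a graph, so a plain breakpoint() inside them may never be hit. A minimal sketch of one way around that, assuming TF 2.5+ with eager execution (the default), is to enable tf.data debug mode at program start:

import tensorflow as tf

# Must be called before the pipeline is built; map/filter functions then run
# eagerly, so Python breakpoints inside them are actually reached.
tf.data.experimental.enable_debug_mode()

ds = tf.data.Dataset.range(5)

def check(x):
    breakpoint()  # drops into pdb for every element while debugging
    return tf.cast(x, tf.float32) * 2.0

ds = ds.map(check)

# Pulling a single element also lets you inspect shapes and dtypes without a debugger:
for element in ds.take(1):
    print(element)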