I'm trying to build a Horovod TorchEstimator for a Spark pipeline, but fitting the data fails with an error whose cause I don't understand. I've left the full stack trace here, but the final part is as follows:
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/mpirun_exec_fn.py", line 52, in <module>
    main(codec.loads_base64(sys.argv[1]), codec.loads_base64(sys.argv[2]))
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/mpirun_exec_fn.py", line 45, in main
    task_exec(driver_addresses, settings, 'OMPI_COMM_WORLD_RANK', 'OMPI_COMM_WORLD_LOCAL_RANK')
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/__init__.py", line 61, in task_exec
    result = fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 432, in train
    'train': _train(epoch)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 373, in _train
    inputs, labels, sample_weights = prepare_batch(row)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 306, in prepare_batch
    for col, shape in zip(feature_columns, input_shapes)]
TypeError: 'NoneType' object is not iterable
Unfortunately, I don't really know how to give a minimal reproducible example, but I'll try to give the most relevant and minimal information possible to understand the context.
I'm working in a Google Colab notebook.
I created the estimator following the documentation, like so:
import shutil
import uuid

import horovod.spark.torch as hvd
from horovod.spark.common.store import DBFSLocalStore
from torch import optim

uuid_str = str(uuid.uuid4())
work_dir = "/dbfs/horovod_spark_estimator/" + uuid_str

# Set up the store for intermediate data
store = DBFSLocalStore(work_dir)

optimizer = optim.Adam(model.parameters(), lr=1.0e-3)

torch_estimator = hvd.TorchEstimator(
    store=store,
    num_proc=1,
    model=model,
    optimizer=optimizer,
    feature_cols=['Windows'],
    label_cols=['Labels'],
    verbose=1)
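For completeness, the estimator is then fit on the DataFrame shown below. Roughly (train_df is just a placeholder name for my prepared DataFrame; running this needs a live Spark session with Horovod):

```python
# fit() returns a Spark Transformer wrapping the trained PyTorch model
torch_model = torch_estimator.fit(train_df)
```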
The model is from this repo: a basic LSTM in PyTorch with some minor customizations. I also updated the requirements to use the latest library versions.
The DataFrame fed to the fit function looks like this:
+--------------------+------+---+
| Windows|Labels| id|
+--------------------+------+---+
|[0, 0, 0, 0, 1, 2...| 1| 0|
|[0, 0, 0, 1, 2, 2...| 1| 1|
|[0, 0, 1, 2, 2, 2...| 1| 2|
|[0, 1, 2, 2, 2, 2...| 0| 3|
|[1, 2, 2, 2, 2, 2...| 0| 4|
|[2, 2, 2, 2, 2, 2...| 0| 5|
|[2, 2, 2, 2, 2, 2...| 0| 6|
|[2, 2, 2, 2, 2, 2...| 0| 7|
+--------------------+------+---+
Each Windows entry is an array/vector of 10 ids, with one label per window.
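For context on how these rows relate to each other: consecutive windows are fixed-length slices that shift by one id. A minimal sketch in plain Python (the id sequence here is made up for illustration; the real ids come from my dataset):

```python
# Made-up id sequence for illustration only
ids = [0, 0, 0, 0, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
window_size = 10

# Slide a fixed-length window over the sequence: one window per DataFrame row
windows = [ids[i:i + window_size] for i in range(len(ids) - window_size + 1)]

print(len(windows))  # 8 windows for a 17-id sequence
print(windows[0])    # [0, 0, 0, 0, 1, 2, 2, 2, 2, 2]
```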
My attempts to solve this issue
My first hypothesis, from the error log, was a mismatch between the input data and the model definition. If so, I don't understand how the mismatch is possible, since the model expects as features the list of ids and a label of 1 or 0 for the whole window (list). Thinking it might be caused by the column names, I also tried renaming them to exactly what the plain torch DataLoader version expects, but with no success.
The second thing that stood out was that NoneType object, as if Horovod isn't finding the correct data or structure to pass to the model.
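To convince myself where that None could come from, I reproduced the failing pattern in isolation. My assumption is that input_shapes inside prepare_batch stays None when it's never set, and iterating over a None raises exactly this error:

```python
input_shapes = None  # assumption: what the trainer sees when input_shapes isn't passed

try:
    # prepare_batch pairs feature columns with their shapes; with None this blows up
    for shape in input_shapes:
        pass
except TypeError as e:
    print(e)  # 'NoneType' object is not iterable
```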
I searched the web for similar situations, but Horovod doesn't seem to be widely used, so I found little to nothing helpful.
Any help in finding a solution to this would be greatly appreciated, but I'm also open to any alternative to horovod that can integrate a torch model into a spark pipeline or just be fed a spark dataframe.
EDIT 1:
Thanks to @pSoLT I managed to get past the NoneType error by setting the input shapes in the constructor of hvd.TorchEstimator (in my case input_shapes=[[-1, 10]]).
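For anyone hitting the same thing, the constructor call then becomes (same variables as in the snippet above; reading -1 as the batch dimension is my interpretation of the docs):

```python
torch_estimator = hvd.TorchEstimator(
    store=store,
    num_proc=1,
    model=model,
    optimizer=optimizer,
    input_shapes=[[-1, 10]],  # one shape per feature column; -1 leaves the batch dimension free
    feature_cols=['Windows'],
    label_cols=['Labels'],
    verbose=1)
```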
Unfortunately, I immediately stumbled upon another one:
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/__init__.py", line 61, in task_exec
    result = fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 432, in train
    'train': _train(epoch)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 374, in _train
    outputs, loss = train_minibatch(model, optimizer, transform_outputs,
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 468, in train_minibatch
    loss = loss_fn(outputs, labels, sample_weights)
  File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 351, in loss_fn
    loss = calculate_loss(outputs, labels, loss_weights, loss_fns, sample_weights)
NameError: free variable 'loss_fns' referenced before assignment in enclosing scope
As I mentioned in the comments, I'll play around more if I have the chance, but I'll try switching to SparkTorch for the immediate future.