I have a simple embarrassingly parallel program that I am successfully running locally on Dask. Yay! Now I want to move it to a cluster and crank up the problem size, in this case on GCP. I have tried it two ways, GCPCluster() and HelmCluster(), and each fails in a different way. (I have successfully run GCE computations before, so we can likely assume that I have all of the security/login credentials solved. Networking is likely a different story.) Here's the main routine:
from dask import delayed
from dask.distributed import Client, wait, as_completed, LocalCluster
from dask_kubernetes import HelmCluster
from dask_cloudprovider.gcp import GCPCluster

from problem.loop import inner_loop
from problem.problemSpec import problemInit

# gRange = 99
gRange = 12


def phase_transition(client: Client):
    p = problemInit()
    m = p.m
    loop = delayed(inner_loop)
    loops = [loop(int(m[i])) for i in range(gRange)]
    # visualize(loops, filename='delayed_results', format='svg')
    futures = client.compute(loops)
    wait(futures)
    for future, result in as_completed(futures, with_results=True):
        print(result)


if __name__ == "__main__":
    # with LocalCluster(dashboard_address='localhost:8787') as cluster:
    with GCPCluster(projectid='random-words-654321', machine_type='n1-standard-4', n_workers=2) as cluster:
        with Client(cluster) as client:
            phase_transition(client)
When using GCPCluster(), the system waits forever for a response from the scheduler. Here are the log messages:
Launching cluster with the following configuration:
Source Image: projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014
Docker Image: daskdev/dask:latest
Machine Type: n1-standard-4
Filesytsem Size: 50
Disk Type: pd-standard
N-GPU Type:
Zone: us-east1-c
Creating scheduler instance
dask-837e1ad1-scheduler
Internal IP: 10.142.0.4
External IP: 35.237.42.13
Waiting for scheduler to run at 35.237.42.13:8786
The scheduler VM is up, and I can SSH into it, so this looks like some kind of network problem. (BTW, I am running this from PyCharm with a Conda environment similar to the one inside daskdev/dask:latest.) Clearly, we never even get as far as installing my local code on the cloud.
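For what it's worth, the only diagnostic I know to run from my side is a raw TCP probe against the scheduler's external IP and port from the log above; a minimal sketch, standard library only:

import socket

def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    # Return True if a plain TCP connection to host:port succeeds.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# IP and port copied from the GCPCluster log above. If this prints False,
# my guess is a GCP firewall rule is blocking inbound 8786 from my machine.
print(can_reach('35.237.42.13', 8786))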
This is the kind of problem that experience with Dask and GCP would resolve, experience I don't yet have. So let me try a different path through the documentation and start a k8s cluster managed by Helm. The only change to my code is:
if __name__ == "__main__":
    cluster = HelmCluster(release_name='gke-dask')
    with Client(cluster) as client:
        phase_transition(client)
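(One aside: I dropped the with block around the cluster here. If HelmCluster behaves like the other cluster classes, the context-manager form below should also tear down the port-forward cleanly; that is an untested assumption on my part.)

from dask.distributed import Client
from dask_kubernetes import HelmCluster

# Same connection, but letting the cluster object clean up its
# kubectl port-forward when the block exits.
with HelmCluster(release_name='gke-dask') as cluster:
    with Client(cluster) as client:
        phase_transition(client)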
This ran much better, but it now fails to find code that lives on my local machine in a subdirectory, problem. Here are the logs:
Forwarding from 127.0.0.1:65410 -> 8786
Forwarding from [::1]:65410 -> 8786
Handling connection for 65410
Handling connection for 65410
/Users/awd/opt/anaconda3/envs/dask-cvxpy/lib/python3.8/site-packages/distributed/client.py:1140: VersionMismatchWarning: Mismatched versions found
+---------+---------------+---------------+---------------+
| Package | client | scheduler | workers |
+---------+---------------+---------------+---------------+
| blosc | None | 1.9.2 | 1.9.2 |
| lz4 | 3.1.3 | 3.1.1 | 3.1.1 |
| msgpack | 1.0.2 | 1.0.0 | 1.0.0 |
| numpy | 1.20.2 | 1.18.1 | 1.18.1 |
| python | 3.8.8.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+---------------+---------------+---------------+
Notes:
- msgpack: Variation is ok, as long as everything is above 0.6
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Handling connection for 65410
Handling connection for 65410
Handling connection for 65410
Traceback (most recent call last):
File "<input>", line 1, in <module>
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
pydev_imports.execfile(filename, global_vars, local_vars) # execute the script
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/awd/Projects/Stats285/ExamplePhaseTransition/main_func.py", line 39, in <module>
phase_transition(client)
File "/Users/awd/Projects/Stats285/ExamplePhaseTransition/main_func.py", line 28, in phase_transition
for future, result in as_completed(futures, with_results=True):
File "/Users/awd/opt/anaconda3/envs/dask-cvxpy/lib/python3.8/site-packages/distributed/client.py", line 4336, in __next__
return self._get_and_raise()
File "/Users/awd/opt/anaconda3/envs/dask-cvxpy/lib/python3.8/site-packages/distributed/client.py", line 4327, in _get_and_raise
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
ModuleNotFoundError: No module named 'problem'
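My working theory: the workers never see my local problem package, so I have to ship it myself. Here is a minimal sketch using Client.upload_file, assuming problem/ is a plain package directory next to the script (the zip name is my own choice, not from any docs):

import shutil
from dask.distributed import Client
from dask_kubernetes import HelmCluster

# Zip the local package; upload_file distributes .py/.zip/.egg files to the
# workers and puts them on sys.path there, which should let the
# `import problem` inside the unpickling step succeed.
shutil.make_archive('problem', 'zip', base_dir='problem')

cluster = HelmCluster(release_name='gke-dask')
with Client(cluster) as client:
    client.upload_file('problem.zip')
    phase_transition(client)

If I read the docs right, upload_file only reaches workers that are connected at call time, so newly scaled workers would need something like a worker plugin instead; I have not verified that.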
In practice, I am looking for help with either problem. I have a slight preference for the GCPCluster() solution.
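And on the GCPCluster side, once the networking is sorted, I assume I should also pin the Docker image so the version table above stops mismatching; a sketch, where the tag is a guess I would still have to match against my Conda env:

from dask_cloudprovider.gcp import GCPCluster

# Pin an explicit image instead of daskdev/dask:latest so the scheduler and
# workers run the same package versions as the local client environment.
cluster = GCPCluster(
    projectid='random-words-654321',
    machine_type='n1-standard-4',
    n_workers=2,
    docker_image='daskdev/dask:2021.4.0',  # hypothetical tag
)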
Same problem with Fargate: it works locally but not on AWS Fargate.