How does LocalCluster() affect the number of tasks?


Do intermediate operations (such as the dask method dd.merge) need to be defined inside or outside the LocalCluster context? Do final computations (such as .compute()) need to run inside or outside it?

My main question is: how does LocalCluster() affect the number of tasks?

My colleague and I noticed that placing dd.merge outside the LocalCluster() block reduced the number of tasks significantly (roughly 10x). What is the reason for that?

Pseudo-example

Many tasks:

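import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# input_path, other_df, "key" and output_path are placeholders for the real inputs
df = dd.read_parquet(input_path, index=False)

with LocalCluster(
    n_workers=8,
    processes=True,
    threads_per_worker=1,
    memory_limit="10GB",
    ip="tcp://localhost:9895",
) as cluster, Client(cluster) as client:
    # the merge is defined while the Client is active
    merged = dd.merge(df, other_df, on="key")
    merged.to_parquet(
        output_path, engine="fastparquet", compression="snappy"
    )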

Few tasks:

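import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# input_path, other_df, "key" and output_path are placeholders for the real inputs
df = dd.read_parquet(input_path, index=False)
# the merge is defined before any Client exists
merged = dd.merge(df, other_df, on="key")

with LocalCluster(
    n_workers=8,
    processes=True,
    threads_per_worker=1,
    memory_limit="10GB",
    ip="tcp://localhost:9895",
) as cluster, Client(cluster) as client:
    # to_parquet triggers the computation, which runs on the cluster
    merged.to_parquet(
        output_path, engine="fastparquet", compression="snappy"
    )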

Answer by Brian Larsen:

The performance difference is due to the difference in the schedulers being used.

According to the dask docs:

The dask collections each have a default scheduler

dask.dataframe use the threaded scheduler by default

The default scheduler is the one used when no other scheduler has been registered.

Additionally, according to the dask distributed docs:

When we create a Client object it registers itself as the default Dask scheduler. All .compute() methods will automatically start using the distributed system.

So inside the cluster's context manager, where the Client is alive, computations implicitly use the distributed scheduler; outside it, dask.dataframe falls back to its default threaded scheduler.

A couple of additional notes. First, the default threaded scheduler may be using more threads than the local cluster you are defining (8 workers with 1 thread each). Second, a significant part of the performance difference may come from the overhead of inter-process communication between the cluster's worker processes, which the threaded scheduler does not incur because all of its threads share one process. More information is available in the dask documentation on schedulers.
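To put rough numbers on the thread-count note: the question's cluster provides 8 × 1 = 8 worker threads, while dask's threaded scheduler, when not told otherwise, sizes its pool from the machine's CPU count. The helpers below are hypothetical, and `os.cpu_count()` only approximates dask's own CPU count, which may also account for CPU affinity and container limits.

```python
import os


def threaded_default_threads(num_workers=None):
    """Approximate pool size of dask's threaded scheduler.

    Assumption: when num_workers is not given, the pool is sized from the CPU
    count; os.cpu_count() is used here as an approximation of that value.
    """
    return num_workers or os.cpu_count() or 1


def local_cluster_threads(n_workers, threads_per_worker):
    """Total worker threads in a LocalCluster: workers times threads per worker."""
    return n_workers * threads_per_worker


# Settings from the question: 8 single-threaded worker processes.
cluster_threads = local_cluster_threads(n_workers=8, threads_per_worker=1)
default_threads = threaded_default_threads()

print(f"LocalCluster: {cluster_threads} threads spread over 8 processes")
print(f"threaded scheduler: {default_threads} threads sharing one process")
if default_threads > cluster_threads:
    print("the default scheduler has more parallel slots than this cluster")
```

On a 16-core machine, for example, the threaded default would run twice as many tasks at once as this cluster, and without any inter-process communication between them.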