I'm trying to distribute a large Dask Dataframe across multiple machines for (later) distributed computations on the dataframe. I'm using dask-distributed for this.

All the dask-distributed examples and docs I have seen populate the initial data load from a network resource (HDFS, S3, etc.) and do not appear to extend the DAG optimization to the load step; they seem to assume that a network load is a necessary evil and simply eat the initial cost. This is underscored in the answer to another question: Does Dask communicate with HDFS to optimize for data locality?

However, I can see cases where we would want this. For example, if we have a sharded database with dask workers co-located on the nodes of this DB, we would want records from the local shard only to be loaded into the local dask workers. From the documentation and examples, network criss-cross seems to be treated as an unavoidable cost. Is it possible to force parts of a single dataframe to be obtained from specific workers?

The alternative, which I've tried, is to force each worker to run a function (submitted to each worker in turn) that loads only the data local to that machine/shard. This works, and I have a bunch of optimally local dataframes with the same column schema. However, I now have n dataframes rather than a single one. Is it possible to merge/fuse dataframes across multiple machines so there is a single dataframe reference, but portions have affinity (within reason, as decided by the task DAG) to specific machines?

3 Answers

3
mdurant On Best Solutions

You can produce dask "collections" such as a dataframe from futures and delayed objects, which inter-operate nicely with each other.

For each partition, where you know which machine should load it, you can produce a future as follows:

f = c.submit(make_part_function, args, workers={'my.worker.ip'})

where c is the dask client and the address is the machine you want the load to run on. You can also pass allow_other_workers=True if this is a preference rather than a requirement.
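As a minimal sketch of the step above, the helper below submits one load task per shard and pins each to a chosen worker. The names `submit_pinned`, `loader`, `shard_to_worker` and `strict` are hypothetical and not part of dask; only `submit` with its `workers=` and `allow_other_workers=` keywords comes from the answer. The `client` argument is assumed to be a `dask.distributed.Client` connected to your scheduler.

```python
from typing import Any, Callable, Dict, List


def submit_pinned(
    client: Any,
    loader: Callable[[str], Any],
    shard_to_worker: Dict[str, str],
    strict: bool = True,
) -> List[Any]:
    """Submit one load task per shard, pinned to the worker next to that shard.

    `client` is assumed to behave like dask.distributed.Client.
    `loader` is a hypothetical function that reads one shard and returns a
    pandas DataFrame. `shard_to_worker` maps a shard name to a worker address.
    """
    futures = []
    for shard, worker in shard_to_worker.items():
        future = client.submit(
            loader,
            shard,
            workers=[worker],
            # strict=True makes the placement a requirement;
            # strict=False makes it a preference only.
            allow_other_workers=not strict,
        )
        futures.append(future)
    return futures
```

With a live cluster this might be called as `submit_pinned(c, load_shard, {"shard-0": "tcp://10.0.0.1:40001"})`, where the loader name and the address are placeholders for your own.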

To make a dataframe from a list of such futures, you could do

df = dd.from_delayed([dask.delayed(f) for f in futures])

and ideally provide a meta=, giving a description of the expected dataframe. Now, further operations on a given partition will prefer to be scheduled on the same worker which already holds the data.

1
jpjenk On

I am also interested in being able to restrict computation to a specific node (with the data localized to that node). I have tried to implement the above with a simple script (see below), but inspecting the resulting dataframe raises the following error (from dask/dataframe/utils.py::check_meta()):

ValueError: Metadata mismatch found in `from_delayed`.

Expected partition of type `DataFrame` but got `DataFrame`

Example:

from dask.distributed import Client
import dask.dataframe as dd
import dask

client = Client(address='<scheduler_ip>:8786')
client.restart()

filename_1 = 'http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv'
filename_2 = 'http://samplecsvs.s3.amazonaws.com/SalesJan2009.csv'

future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
future_2 = client.submit(dd.read_csv, filename_2, workers='w2')

client.has_what()
# Returns: {'tcp://<w1_ip>:41942': ('read_csv-c08b231bb22718946756cf46b2e0f5a1',),
#           'tcp://<w2_ip>:41942': ('read_csv-e27881faa0f641e3550a8d28f8d0e11d',)}

df = dd.from_delayed([dask.delayed(f) for f in [future_1, future_2]])

type(df)
# Returns: dask.dataframe.core.DataFrame

df.head()
# Returns:
#      ValueError: Metadata mismatch found in `from_delayed`.
#      Expected partition of type `DataFrame` but got `DataFrame`

Note: The dask environment has two worker nodes (aliased to w1 and w2) and a scheduler node, and the script is running on an external host. dask==1.2.2, distributed==1.28.1

0
MRocklin On

It is odd to call many dask dataframe functions in parallel. Perhaps you meant to make many Pandas read_csv calls in parallel instead?

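import pandas

# Before: each task returned a lazy dask dataframe rather than a partition
# future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
# future_2 = client.submit(dd.read_csv, filename_2, workers='w2')
# After: each task returns one concrete pandas DataFrame
future_1 = client.submit(pandas.read_csv, filename_1, workers='w1')
future_2 = client.submit(pandas.read_csv, filename_2, workers='w2')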

See https://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections for more information