How do Dask bag partitions and workers correlate?


I'm using a vanilla Dask-Kubernetes setup with two workers and one scheduler to iterate over the lines of some JSON file (and apply some functions which I've omitted here for simplicity). I only ever see one worker doing any work, where I'd expect to see both of them busy.

Hoping that repartitioning would help, I've experimented with different values for bag.repartition(num), which return different numbers of lines, but none of them changes the worker imbalance: memory consumption stays concentrated on a single worker.

I think I don't understand the correlation between partitions and workers, and I could not find anything in the Dask documentation about it. Any help or pointers are highly welcome!

import dask.bag as db

def grep_buildings():
    base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
    b = db.text.read_text(f"{base}/Alabama.zip")
    # b = b.repartition(2)
    lines = b.take(3_000_000)
    return lines

len(grep_buildings())
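For what it's worth, inspecting the partition count directly (a minimal sketch reusing the URL above) shows the bag starts out with a single partition, even when I repartition afterwards:

import dask.bag as db

base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
b = db.text.read_text(f"{base}/Alabama.zip")

print(b.npartitions)                 # 1: a single compressed file gives a single partition
print(b.repartition(2).npartitions)  # 2: more partitions downstream, but the initial read is unchanged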

1 Answer

Answered by mdurant:

In your example, you are opening one file, and it is compressed:

db.text.read_text(f"{base}/Alabama.zip")

Dask is able to open and process multiple files in parallel, with at least one partition per file. Dask is also able to split a single file into chunks (the blocksize parameter), but this only works for uncompressed data. The reason is that, for whole-file compression methods, the only way to get to a given point in the uncompressed stream is to read from the start, so every partition would end up reading most of the data.
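For example, either several input files or a single uncompressed file read with a blocksize gives you multiple partitions (a minimal sketch; db.read_text is the same function as db.text.read_text, and the local file names below are hypothetical):

import dask.bag as db

# Several files: at least one partition per file, so tasks can spread across workers.
b_many = db.read_text("data/buildings-*.json")

# One uncompressed file: blocksize cuts it into chunks, one partition per chunk.
b_split = db.read_text("data/Alabama.json", blocksize="64MB")

print(b_many.npartitions, b_split.npartitions)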

Finally, repartition doesn't help you when you start with a single partition: the whole file still has to be read in a single task before the data can be split into pieces for downstream tasks.
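One workaround, assuming you can stage the data yourself, is to download and decompress the archive first and then read the plain text with a blocksize, so the bag starts with many partitions (a sketch; the local path and file name are hypothetical):

import dask.bag as db

# After unzipping Alabama.zip to local or shared storage, the uncompressed file
# can be split at read time into many partitions that both workers can process.
b = db.read_text("/data/Alabama.geojson", blocksize="64MB")
print(b.npartitions)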