I'm using a vanilla Dask-Kubernetes setup with two workers and one scheduler to iterate over the lines of a JSON file (and apply some functions, omitted here for simplicity). However, only one worker is ever busy, where I'd expect to see both of them working.
Hoping that repartitioning would help, I've experimented with different values for bag.repartition(num), which return different numbers of lines, but they change nothing about the worker imbalance: memory consumption stays concentrated on a single worker.
I think I don't understand the relationship between partitions and workers, and I couldn't find anything about it in the Dask documentation. Any help or pointers are highly welcome!
import dask.bag as db

def grep_buildings():
    base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
    b = db.read_text(f"{base}Alabama.zip")  # base already ends with "/"
    # b = b.repartition(2)
    lines = b.take(3_000_000)
    return lines

len(grep_buildings())
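For what it's worth, here is how I looked at the partition counts (a minimal sketch; npartitions is the bag attribute, and 2 is just an example value):

import dask.bag as db

base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
b = db.read_text(f"{base}Alabama.zip")
print(b.npartitions)                 # 1: the whole zip ends up in a single partition
print(b.repartition(2).npartitions)  # 2 after repartitioning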
In your example, you are opening one file, and it is compressed.
Dask can open and process multiple files in parallel, with at least one partition per file. Dask can also split a single file into chunks (the blocksize parameter), but this only works for uncompressed data. The reason is that, for whole-file compression methods, the only way to reach a given point in the uncompressed stream is to read from the start, so every partition would end up reading most of the data. Finally, repartition doesn't help when you start with a single partition: the whole file has to be read before the data can be split into pieces for downstream tasks.
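As a sketch of what that means in practice (assuming you first download and extract the archive to an uncompressed file, say Alabama.geojson, on storage that all workers can reach): with a blocksize, Dask creates several partitions up front, so both workers get tasks.

import dask.bag as db

# Hypothetical local path: the zip extracted to an uncompressed text file.
b = db.read_text("Alabama.geojson", blocksize="64MB")  # cut into ~64 MB chunks
print(b.npartitions)  # several partitions, one task per partition

# Alternatively, many smaller files give at least one partition each:
# b = db.read_text("buildings/*.json")

Either approach gives the scheduler multiple tasks to distribute, which is what lets the second worker participate.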