I have been building a GRIB file reading tool using the following code. My files are heterogeneous, so I can't use xarray.open_mfdataset or anything like that.
from typing import List

import cfgrib
import dask
from dask.delayed import Delayed


def create_open_delays(file_list: List[str], expected_dataset_count: int) -> List[Delayed]:
    """
    Run through a list of files and build a list of delayed
    cfgrib.open_datasets calls.
    """
    return [
        dask.delayed(cfgrib.open_datasets, nout=expected_dataset_count)(
            file,
            backend_kwargs={"indexpath": ""},  # don't write .idx files next to the data
            cache=True,
        )
        for file in file_list
    ]
While running the code, I noticed that the thread-parallel scheduler is roughly 10x slower than fully process-parallel (each Dask worker with only one thread). I am guessing this is down to the GIL; no real surprise to anyone, I suppose, and the Dask documentation does highlight this as an optimization opportunity. But having so many single-threaded workers has downsides: each one has limited memory, there is extra overhead in starting them all, and there is more inter-process communication. Each task takes roughly 10 seconds, so I am not concerned about the overhead of dask.delayed itself.
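To make the thread-vs-process observation concrete, here is a minimal stdlib-only illustration (no cfgrib or Dask involved): a pure-Python CPU-bound function, standing in for GIL-bound decoding work, produces identical results under a thread pool but gains no wall-clock speedup, because only one thread can hold the GIL at a time.

```python
# Illustration only: `decode` is a hypothetical stand-in for GIL-bound
# decoding work, not a cfgrib function.
from concurrent.futures import ThreadPoolExecutor


def decode(n: int) -> int:
    # Pure-Python loop: holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i * i
    return total


inputs = [200_000] * 8

# Serial baseline.
serial = [decode(n) for n in inputs]

# Thread pool: same results, but with pure-Python work the threads
# serialize on the GIL, so there is little or no speedup.
with ThreadPoolExecutor(max_workers=8) as pool:
    threaded = list(pool.map(decode, inputs))

assert threaded == serial
```

Code that releases the GIL inside compiled extensions (as NumPy does for large array operations) would scale across threads here; that is exactly the question below about cfgrib/ecCodes.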
I have two questions:
- Is there anything that can be done in the underlying cfgrib/ecCodes packages to improve multi-threading performance? From my vague understanding, NumPy and similar libraries take steps in their compiled code to release the GIL?
- Is it possible to leverage the new asyncio functionality in Python from Dask? (I am not demanding that anyone develop this instantly; I am just interested to know whether something like that exists or is in the works, or whether it's a dumb idea.)
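On the asyncio question: one pattern that works today with only the standard library is to wrap each blocking open in an executor and await them concurrently. The sketch below does this with a thread pool; `fake_open` is a hypothetical stand-in for the blocking cfgrib.open_datasets call, and dask.distributed's Client also offers an async interface (Client(asynchronous=True)) built on the same idea.

```python
# Stdlib-only sketch: `fake_open` is a placeholder for a blocking
# cfgrib.open_datasets call, not a real cfgrib API.
import asyncio
from concurrent.futures import ThreadPoolExecutor


def fake_open(path: str) -> str:
    # Pretend to open a GRIB file and return its datasets.
    return f"datasets-from-{path}"


async def open_all(paths):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Submit every blocking open to the pool, then await them all.
        tasks = [loop.run_in_executor(pool, fake_open, p) for p in paths]
        return await asyncio.gather(*tasks)


results = asyncio.run(open_all(["a.grib", "b.grib"]))
```

Note that for GIL-bound decoding this only overlaps waiting, not computation; swapping the thread pool for a ProcessPoolExecutor would restore true parallelism at the cost of the same per-process overhead discussed above.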
Thanks.
If you haven't already seen it, I recommend looking at xarray, at least to see how they handle GRIB.