Dask Memory leakage issue with json and requests


This is a minimal test that reproduces a memory leak on a remote Dask Kubernetes cluster.

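import sys

import requests

def load_geojson(pid):
    # Download and parse the GeoJSON; `pid` is only used to fan out the tasks
    r = requests.get("https://github.com/datasets/geo-countries/raw/master/data/countries.geojson")
    temp = r.json()
    # Note: sys.getsizeof is shallow, it reports only the top-level dict
    size_temp = sys.getsizeof(temp)
    del temp
    return size_temp

# `client` is an existing dask.distributed.Client connected to the cluster
L_geojson = client.map(load_geojson, range(200))

del L_geojson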

Observation: Worker memory (Bytes Storage) increases steadily by approx. 30 MB on each run and keeps growing until all memory is used. In another test, using urllib instead, memory increased and decreased randomly from run to run.

Desired behavior: Memory should be cleaned up after the reference L_geojson is deleted.

Could someone please help with this memory leakage issue?


There are 2 answers

mdurant

I can confirm an increase in memory and "full garbage collections took X% CPU time recently" messages. If I let the futures run, memory still increases, but more slowly.

fsspec does not have this problem, much as you found with urllib, and it is what Dask typically uses for its IO (fsspec switched from requests to aiohttp for communication).

Your modified function might look like

import json
import sys

import fsspec

def load_geojson(pid):
    fs = fsspec.filesystem("http")  # or use fsspec.open
    r = fs.cat("https://github.com/datasets/geo-countries/raw/master/data/countries.geojson")  # get bytes
    temp = json.loads(r)
    size_temp = sys.getsizeof(temp)  # shallow size of the top-level dict
    del temp
    return size_temp

but you still get garbage collection warnings.
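One way to narrow this down is to check whether Python objects are actually being retained, or whether the growth happens below the Python level. The following is a minimal sketch using only the standard library, not a Dask-specific tool. The synthetic payload and the helper names `parse_and_drop` and `python_heap_growth` are assumptions made for illustration; they stand in for the downloaded GeoJSON and the task function.

```python
import gc
import json
import tracemalloc


def parse_and_drop(payload: bytes) -> int:
    """Parse JSON, discard the parsed object, return its number of top-level keys."""
    temp = json.loads(payload)
    n_keys = len(temp)
    del temp
    return n_keys


def python_heap_growth(payload: bytes, runs: int = 20) -> int:
    """Net bytes of Python-level allocations still alive after `runs` parse calls."""
    parse_and_drop(payload)  # warm-up, so one-time caches are not counted
    gc.collect()
    tracemalloc.start()
    before, _peak = tracemalloc.get_traced_memory()
    for _ in range(runs):
        parse_and_drop(payload)
    gc.collect()
    after, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return after - before


# Synthetic stand-in for the GeoJSON document (hypothetical data).
payload = json.dumps(
    {"features": [{"id": i, "coords": [[i, i + 1.5]] * 10} for i in range(2000)]}
).encode()

growth = python_heap_growth(payload)
print(f"Python allocations retained after all runs: {growth} bytes")
```

If the retained figure stays small while the worker's process memory keeps climbing, that suggests the objects are being freed and the growth comes from how freed memory is (or is not) returned to the operating system, rather than from lingering references. Note that tracemalloc only sees allocations made through Python's allocator.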

deeplook

I've also tried your code with fsspec but could not see any significant change. I'm observing this effect with much simpler code, as shown in the GIF. (It would be helpful to have a simple line plot widget for tracking some values over time in the Dask JupyterLab dashboard extension, instead of dynamic bar charts.)

Dask memory issue GIF

I wonder how much of an issue this is in practice for long-running clusters and applications? I know you can restart the cluster, but I don't know if this can be done in some smart way, such as periodically when no tasks are running or waiting to be scheduled. I wonder what people recommend?
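A rough sketch of the "restart only when idle" idea, assuming `client` is a `dask.distributed.Client`. It relies on `Client.processing()` and `Client.restart()`; the helper name `restart_if_idle` is hypothetical. The check only looks at tasks currently assigned to workers, so it is a heuristic and not a guarantee that nothing is pending.

```python
def restart_if_idle(client) -> bool:
    """Restart all workers only if no worker reports tasks in progress.

    `client` is expected to behave like a dask.distributed.Client.
    Returns True if a restart was triggered, False otherwise.
    """
    # Mapping of worker address -> keys of the tasks that worker is processing
    processing = client.processing()
    if any(processing.values()):
        return False  # at least one worker is busy, so leave the cluster alone
    # Caution: restart() also drops all futures and results held in worker memory
    client.restart()
    return True
```

This could be called from a periodic job, for example `restart_if_idle(client)` every few minutes. Because a restart discards results held on the workers, it only suits workloads that have already persisted or gathered whatever they need.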

In fact, I've found this "lifetime" option, which might work in practice, although a runtime solution would be nice, too (see "Memory clean up of Dask workers"). I wonder how this is handled in large cluster installations working at tera-/petabyte scale?
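For reference, the lifetime option is exposed on the worker command line as `--lifetime`, `--lifetime-stagger` and `--lifetime-restart`. The sketch below only assembles such a command, for instance for a container spec. The helper name `worker_command`, the scheduler address and the durations are illustrative assumptions, and older releases spell the executable `dask-worker` rather than `dask worker`.

```python
import shlex


def worker_command(scheduler_address: str,
                   lifetime: str = "1 hour",
                   stagger: str = "5 minutes") -> list[str]:
    """Build a `dask worker` command that retires and restarts the worker periodically."""
    return [
        "dask", "worker", scheduler_address,
        "--lifetime", lifetime,          # shut the worker down gracefully after this long
        "--lifetime-stagger", stagger,   # random offset so workers do not all stop together
        "--lifetime-restart",            # start a fresh worker process afterwards
    ]


# Hypothetical scheduler address, for illustration only
command = worker_command("tcp://scheduler:8786")
print(shlex.join(command))
```

The stagger matters on a multi-worker cluster: without it, all workers started at the same time would also retire at the same time.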