Convert grib to netcdf file

280 views Asked by At

I tried to convert grib to nc file using xarray wit this code:

import xarray

data = xarray.open_dataset('E:/Thesis Dataset/Forecast/2017/dis_fore_012017.grib', engine='cfgrib') 
data.to_netcdf('E:/Thesis Dataset/Forecast/2017/dis_fore_012017.nc')

I have output like this:

MemoryError                               Traceback (most recent call last)
Cell In[14], line 1
----> 1 data.to_netcdf('E:/Thesis Dataset/Forecast/2017/dis_fore_012017.nc')

File ~\anaconda3\lib\site-packages\xarray\core\dataset.py:1901, in Dataset.to_netcdf(self, path, mode, format, group, engine, encoding, unlimited_dims, compute, invalid_netcdf)
   1898     encoding = {}
   1899 from ..backends.api import to_netcdf
-> 1901 return to_netcdf(
   1902     self,
   1903     path,
   1904     mode,
   1905     format=format,
   1906     group=group,
   1907     engine=engine,
   1908     encoding=encoding,
   1909     unlimited_dims=unlimited_dims,
   1910     compute=compute,
   1911     invalid_netcdf=invalid_netcdf,
   1912 )

File ~\anaconda3\lib\site-packages\xarray\backends\api.py:1081, in to_netcdf(dataset, path_or_file, mode, format, group, engine, encoding, unlimited_dims, compute, multifile, invalid_netcdf)
   1078 if multifile:
   1079     return writer, store
-> 1081 writes = writer.sync(compute=compute)
   1083 if path_or_file is None:
   1084     store.sync()

File ~\anaconda3\lib\site-packages\xarray\backends\common.py:166, in ArrayWriter.sync(self, compute)
    160 import dask.array as da
    162 # TODO: consider wrapping targets with dask.delayed, if this makes
    163 # for any discernible difference in perforance, e.g.,
    164 # targets = [dask.delayed(t) for t in self.targets]
--> 166 delayed_store = da.store(
    167     self.sources,
    168     self.targets,
    169     lock=self.lock,
    170     compute=compute,
    171     flush=True,
    172     regions=self.regions,
    173 )
    174 self.sources = []
    175 self.targets = []

File ~\anaconda3\lib\site-packages\dask\array\core.py:981, in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
    978 result = Delayed(name, dsk)
    980 if compute:
--> 981     result.compute(**kwargs)
    982     return None
    983 else:

File ~\anaconda3\lib\site-packages\dask\base.py:167, in DaskMethodsMixin.compute(self, **kwargs)
    143 def compute(self, **kwargs):
    144     """Compute this dask collection
    145 
    146     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    165     dask.base.compute
    166     """
--> 167     (result,) = compute(self, traverse=False, **kwargs)
    168     return result

File ~\anaconda3\lib\site-packages\dask\base.py:452, in compute(*args, **kwargs)
    449     keys.append(x.__dask_keys__())
    450     postcomputes.append(x.__dask_postcompute__())
--> 452 results = schedule(dsk, keys, **kwargs)
    453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~\anaconda3\lib\site-packages\dask\threaded.py:76, in get(dsk, result, cache, num_workers, pool, **kwargs)
     73             atexit.register(pool.close)
     74             pools[thread][num_workers] = pool
---> 76 results = get_async(
     77     pool.apply_async,
     78     len(pool._pool),
     79     dsk,
     80     result,
     81     cache=cache,
     82     get_id=_thread_get_id,
     83     pack_exception=pack_exception,
     84     **kwargs
     85 )
     87 # Cleanup pools associated to dead threads
     88 with pools_lock:

File ~\anaconda3\lib\site-packages\dask\local.py:486, in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484         _execute_task(task, data)  # Re-execute locally
    485     else:
--> 486         raise_exception(exc, tb)
    487 res, worker_id = loads(res_info)
    488 state["cache"][key] = res

File ~\anaconda3\lib\site-packages\dask\local.py:316, in reraise(exc, tb)
    314 if exc.__traceback__ is not tb:
    315     raise exc.with_traceback(tb)
--> 316 raise exc

File ~\anaconda3\lib\site-packages\dask\local.py:222, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220 try:
    221     task, data = loads(task_info)
--> 222     result = _execute_task(task, data)
    223     id = get_id()
    224     result = dumps((result, id))

File ~\anaconda3\lib\site-packages\dask\core.py:121, in _execute_task(arg, cache, dsk)
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File ~\anaconda3\lib\site-packages\dask\array\core.py:102, in getter(a, b, asarray, lock)
    100     c = a[b]
    101     if asarray:
--> 102         c = np.asarray(c)
    103 finally:
    104     if lock:

File ~\anaconda3\lib\site-packages\xarray\core\indexing.py:358, in ImplicitToExplicitIndexingAdapter.__array__(self, dtype)
    357 def __array__(self, dtype=None):
--> 358     return np.asarray(self.array, dtype=dtype)

File ~\anaconda3\lib\site-packages\xarray\core\indexing.py:522, in CopyOnWriteArray.__array__(self, dtype)
    521 def __array__(self, dtype=None):
--> 522     return np.asarray(self.array, dtype=dtype)

File ~\anaconda3\lib\site-packages\xarray\core\indexing.py:423, in LazilyIndexedArray.__array__(self, dtype)
    421 def __array__(self, dtype=None):
    422     array = as_indexable(self.array)
--> 423     return np.asarray(array[self.key], dtype=None)

File ~\anaconda3\lib\site-packages\cfgrib\xarray_plugin.py:156, in CfGribArrayWrapper.__getitem__(self, key)
    152 def __getitem__(
    153     self,
    154     key: xr.core.indexing.ExplicitIndexer,
    155 ) -> np.ndarray:
--> 156     return xr.core.indexing.explicit_indexing_adapter(
    157         key, self.shape, xr.core.indexing.IndexingSupport.BASIC, self._getitem
    158     )

File ~\anaconda3\lib\site-packages\xarray\core\indexing.py:712, in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    690 """Support explicit indexing by delegating to a raw indexing method.
    691 
    692 Outer and/or vectorized indexers are supported by indexing a second time
   (...)
    709 Indexing result, in the form of a duck numpy-array.
    710 """
    711 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 712 result = raw_indexing_method(raw_key.tuple)
    713 if numpy_indices.tuple:
    714     # index the loaded np.ndarray
    715     result = NumpyIndexingAdapter(np.asarray(result))[numpy_indices]

File ~\anaconda3\lib\site-packages\cfgrib\xarray_plugin.py:165, in CfGribArrayWrapper._getitem(self, key)
    160 def _getitem(
    161     self,
    162     key: T.Tuple[T.Any, ...],
    163 ) -> np.ndarray:
    164     with self.datastore.lock:
--> 165         return self.array[key]

File ~\anaconda3\lib\site-packages\cfgrib\dataset.py:347, in OnDiskArray.__getitem__(self, item)
    345 header_item = [{ix: i for i, ix in enumerate(it)} for it in header_item_list]
    346 array_field_shape = tuple(len(i) for i in header_item_list) + self.shape[-self.geo_ndim :]
--> 347 array_field = np.full(array_field_shape, fill_value=np.nan, dtype="float32")
    348 for header_indexes, message_ids in self.field_id_index.items():
    349     try:

File ~\anaconda3\lib\site-packages\numpy\core\numeric.py:343, in full(shape, fill_value, dtype, order, like)
    341     fill_value = asarray(fill_value)
    342     dtype = fill_value.dtype
--> 343 a = empty(shape, dtype, order)
    344 multiarray.copyto(a, fill_value, casting='unsafe')
    345 return a

MemoryError: Unable to allocate 38.8 GiB for an array with shape (51, 215, 950, 1000) and data type float32

My computer have 16 GB RAM, and when I check the performance there is still memory available. Also my python already for 64 bit. The data is EFAS river dicharge data from copernicus. Thanks for your help.

1

There are 1 answers

8
Michael Delgado On

Your data might be 12.5 GB on disk, where it is compressed, but a (51, 215, 950, 1000) array of float32s is 51 * 215 * 950 * 1000 * 4 bytes/float32 / 1024^3 = 39GB in memory.

Try processing the data in chunks, eg by passing chunks={dimname: chunksize} to open_dataset, eg:


data = xarray.open_dataset('E:/Thesis Dataset/Forecast/2017/dis_fore_012017.grib', engine='cfgrib', chunks={"time": 10})
data.to_netcdf('E:/Thesis Dataset/Forecast/2017/dis_fore_012017.nc')

Or whatever your dimensions are.