Action Reading multiple LAZ point cloud files to a Dask DataFrame.
Problem
Unzipping LAZ (compressed) to LAS (uncompressed) requires a lot of memory. Varying file sizes and the multiple processes created by Dask result in MemoryErrors.
Attempts
I tried limiting the number of workers following the guide, but it does not seem to work.
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)
dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')
Question How can I load such a large amount of data in a non-standard format?
Example
The following example is my current implementation. It groups the input files into batches of 5 so that at most 5 files are decompressed in parallel, then repartitions each batch and writes it to Parquet to enable further processing. To me this implementation seems to miss the point of Dask entirely.
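from pathlib import Path

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def load(file):
    # Decompress one LAZ file and return its point records as a DataFrame
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df

# Empty frame describing the expected columns and dtypes
meta = pd.DataFrame(np.empty(0, dtype=[('X',float),('Y',float),('Z',float),('intensity',float),('raw_classification',int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))
part_size = 5000000

# Process the files in batches of 5 to cap the number of parallel decompressions
for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0,len(lasfiles),5)]):
    try:
        dfs = [load(file) for file in sublasfiles]
        df = dd.from_delayed(dfs, meta=meta)
        df = df.repartition(npartitions=len(df) // part_size)
        df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')
    except MemoryError:
        # Assumed handler (the original snippet shows no except clause):
        # report the failed batch and continue with the next one
        print('MemoryError in batch', idx)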
Your implementation seems mostly fine to me.
The one thing I would change here is that I would avoid the call to len(df), which will force a computation of the entire dataframe (there is no way to determine the length of the dataframe without reading through all of the files).
Just to be clear, Dask will not be able to parallelize within your load function (it has no concept of LAZ files), so your parallelism will be limited by the number of files that you have.
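One way to drop len(df) is to derive the partition count from point counts that are already known before any file is decompressed. The sketch below assumes you can get a per-file point count cheaply, for example from the point-record count stored in each LAS header; how you read that field depends on your library version and is not shown. The helper name partition_count and the example counts are hypothetical. Rounding up and clamping to 1 also avoids passing npartitions=0, which integer division would produce for a batch smaller than part_size.

```python
import math


def partition_count(point_counts, part_size):
    """Return how many partitions are needed for sum(point_counts) rows.

    point_counts: iterable of per-file row counts (assumed known up front).
    part_size: target number of rows per partition.
    """
    total_points = sum(point_counts)
    # Round up so the average partition stays at or below part_size,
    # and never return 0 for a small or empty batch.
    return max(1, math.ceil(total_points / part_size))


# Hypothetical point counts for one batch of five files.
example_counts = [12_000_000, 3_500_000, 800_000, 7_250_000, 1_000_000]
npartitions = partition_count(example_counts, part_size=5_000_000)
print(npartitions)
```

The result could then be passed as df.repartition(npartitions=npartitions) in place of len(df) // part_size, so building the task graph no longer triggers a full read of the batch.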