Reading LAZ to Dask dataframe using delayed loading


Action Reading multiple LAZ point cloud files into a Dask DataFrame.

Problem Decompressing LAZ (compressed) to LAS (uncompressed) requires a lot of memory. Varying file sizes combined with the multiple processes created by Dask result in MemoryErrors.

Attempts

I tried limiting the number of workers by following the guide, but it does not seem to work.

from distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=3)
client = Client(cluster)

# load, meta, lasfiles and part_size are defined in the full example below
dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')

Question How should I go about loading such a large amount of data in a non-standard format?

Example

The following example is my current implementation. It groups the input files into batches of 5 to limit the number of parallel decompressions to at most 5, then repartitions and writes to Parquet to enable further processing. To me, this implementation seems to totally miss the point of Dask.

from pathlib import Path

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def load(file):
    # decompress a single LAZ file fully into memory and return its points as a DataFrame
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df

meta = pd.DataFrame(np.empty(0, dtype=[('X',float),('Y',float),('Z',float),('intensity',float),('raw_classification',int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))

part_size = 5000000

for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0, len(lasfiles), 5)]):
    # process at most 5 files per batch to cap the number of parallel decompressions
    dfs = [load(file) for file in sublasfiles]
    df = dd.from_delayed(dfs, meta=meta)
    df = df.repartition(npartitions=len(df) // part_size)
    df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')

1 Answer

Answered by MRocklin:

Your implementation seems mostly fine to me.

The one thing I would change is the call to len(df), which forces a computation of the entire dataframe (there is no way to determine the length of the dataframe without reading through all of the files).
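
For example, here is a minimal sketch of the loop body with the len(df) call removed (the points-per-file figure is an illustrative guess used only to pick a partition count, not something Dask or laspy provides):

dfs = [load(file) for file in sublasfiles]
df = dd.from_delayed(dfs, meta=meta)

# Estimate the total point count instead of calling len(df), so nothing
# has to be computed up front. approx_points_per_file is a hypothetical value.
approx_points_per_file = 10_000_000
npartitions = max(1, len(sublasfiles) * approx_points_per_file // part_size)

df = df.repartition(npartitions=npartitions)
df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')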

Just to be clear, Dask will not be able to parallelize within your load function (it has no concept of LAZ files), so your parallelism will be limited by the number of files that you have.
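
One way to see this: each delayed object passed to from_delayed becomes exactly one partition, so the number of tasks that can run concurrently equals the number of files (a short sketch using the names from the question):

dfs = [load(file) for file in lasfiles]   # one delayed task per LAZ file
df = dd.from_delayed(dfs, meta=meta)

# from_delayed creates one partition per delayed object, so at most
# len(lasfiles) decompressions can run at the same time.
assert df.npartitions == len(lasfiles)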