I would like to be able to process very large files in Polars without running out of memory. In the documentation they suggest using scanning, lazyframes and sinks, but it is hard to find proper documentation of how to do this in practice. Hopefully some experts on here can help.
Here I provide an example of what works for "smaller" files that can be handled in memory.
1. Setup
# Imports
import pandas as pd
import polars as pl
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.fs import HadoopFileSystem
# Setting up HDFS file system
hdfs_filesystem = HadoopFileSystem('default')
hdfs_out_path_1 = "scanexample.parquet"
hdfs_out_path_2 = "scanexample2.parquet"
2. Creating data
# Dataset
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})
# Writing to Hadoop
pq_table = pa.Table.from_pandas(df)
pq_writer = pq.ParquetWriter(hdfs_out_path_1,
                             schema=pq_table.schema,
                             filesystem=hdfs_filesystem)
# Appending to parquet file
pq_writer.write_table(pq_table)
pq_writer.close()
3. Reading parquet into polars dataframe (in memory)
# Read file
pq_df = pl.read_parquet(source=hdfs_out_path_1,
                        use_pyarrow=True,
                        pyarrow_options={"filesystem": hdfs_filesystem})
4. Making transforms and writing to file
# Transforms and write
pq_df.filter(pl.col('A') > 9000)\
     .write_parquet(file=hdfs_out_path_2, use_pyarrow=True, pyarrow_options={"filesystem": hdfs_filesystem})
5. Now doing the same with low memory
# Scanning file: Attempt 1
scan_df = pl.scan_parquet(source=hdfs_out_path_2)
ERROR: Cannot find file
# Scanning file: Attempt 2
scan_df = pl.scan_parquet(source=hdfs_filesystem.open_input_stream(hdfs_out_path_1))
ERROR: expected str, bytes or os.PathLike object, not pyarrow.lib.NativeFile
According to the polars documentation, scan_parquet does not take pyarrow arguments, but it talks about taking some "storage options", which I guess is what I need to use. But how?
6. Example without Hadoop
# Writing to parquet
df.to_parquet(path="testlocal.parquet")
# Read lazily
lazy_df = pl.scan_parquet(source="testlocal.parquet")
# Transforms and write
lazy_df.filter(pl.col('A') > 9000).sink_parquet(path="testlocal.out.parquet")
UPDATE!
While the accepted answer lets you load your data into a LazyFrame, that LazyFrame comes with limited functionality: it cannot sink the data to a file without first collecting it all into memory!
# Reading into LazyFrame
import pyarrow.dataset as ds
pq_lf = pl.scan_pyarrow_dataset(
    ds.dataset(hdfs_out_path_1,
               filesystem=hdfs_filesystem))
# Attempt at sinking to parquet
pq_lf.filter(pl.col('A') > 9000).sink_parquet(path="testlocal.out.parquet")
PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
Bad news first: there's not a way to sink to a filesystem directly. You'd have to sink to local storage and then copy if you want to use sink_parquet. It looks like that will change soon with this PR, but that isn't in the current release. For scanning (i.e. read-only) you have two options.
First option: use scan_pyarrow_dataset. That might look like this, reusing hdfs_filesystem and hdfs_out_path_1 from the setup above:
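# Wrap a pyarrow dataset on HDFS in a Polars LazyFrame
import pyarrow.dataset as ds

lazy_df = pl.scan_pyarrow_dataset(
    ds.dataset(hdfs_out_path_1,
               filesystem=hdfs_filesystem))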
and now you have a lazy frame. That's probably the best way, as you're already using pyarrow.fs, which seems to be independent of fsspec, which is how polars accesses cloud files.
Second option: use scan_parquet's storage_options, which uses fsspec.open under the hood. That usage might look like this:
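This is only a sketch: the hdfs:// URL form and the storage_options keys shown below are assumptions that depend on your cluster and on how fsspec's HDFS backend is configured.

# Scanning through fsspec; the host/port values here are illustrative only
scan_df = pl.scan_parquet(
    source="hdfs://scanexample.parquet",
    storage_options={"host": "default", "port": 8020},
)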
What your storage_options look like relative to what you do currently in hdfs_filesystem = HadoopFileSystem('default') is going to be determined by your HDFS provider and/or how fsspec interacts with HDFS systems, so if it doesn't just work you'd have to figure that out separately.

Quasi sinking workaround
You can do something like this: process and sink (or write) the result to a local file first, then copy that file up to HDFS.
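A sketch of that, assuming pyarrow.fs.copy_files for the upload step; the local temp filename is illustrative, and if sink_parquet hits the PanicException from the update above, .collect().write_parquet() to the local file plays the same role:

import pyarrow.dataset as ds
from pyarrow.fs import LocalFileSystem, copy_files

# Lazily scan from HDFS, filter, and sink to a local temporary file
lazy_df = pl.scan_pyarrow_dataset(
    ds.dataset(hdfs_out_path_1, filesystem=hdfs_filesystem))
lazy_df.filter(pl.col('A') > 9000).sink_parquet("local_tmp.parquet")

# Copy the local file up to HDFS
copy_files("local_tmp.parquet", hdfs_out_path_2,
           source_filesystem=LocalFileSystem(),
           destination_filesystem=hdfs_filesystem)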
Pyarrow "sinking" (no polars)
polars is primarily focused on being an in-memory dataframe library. It has some support for doing some things out of memory but it can't do everything. Apparently, sink_parquet is not yet capable of streaming results from upstream filesystems. pyarrow has more streaming support even with remote non-local filesystems. For instance you can do this...
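A sketch; the output directory name here is illustrative:

import pyarrow.dataset as ds

# Stream record batches from HDFS and write them straight back to HDFS,
# without materializing the whole table in memory
source_ds = ds.dataset(hdfs_out_path_1, filesystem=hdfs_filesystem)
ds.write_dataset(
    source_ds.scanner(filter=ds.field("A") > 9000).to_reader(),
    "scanexample_out",       # must be a directory, not a file path
    format="parquet",
    filesystem=hdfs_filesystem,
)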
The path needs to be a directory and then it will create files in the directory named as "part-{x}.parquet". Whether it'll create a single file or multiple is dependent on your data. See this for more info and tweaks. The good news about the above is that you can pass filesystem=hdfs_filesystem to ds.write_dataset and avoid touching the local filesystem. Of course then you're limited to the operations that pyarrow can perform, so you've just got to pick your poison.

Side notes:
Statistics: When polars writes parquet files, even through pyarrow, it does so with statistics off, which will preclude future read optimizations with row groups, so you need to explicitly specify statistics=True if you want statistics.

Compression: pyarrow uses snappy compression by default but polars uses zstd. I think, at this point, pyarrow's default is just one of those things that has been the default for so long that they can't change it. I say that because zstd seems to be supported by everything I've tried to use and produces smaller files than snappy without really sacrificing performance, so if you're using a pyarrow writer then I'd set compression='zstd'.

Row Groups: When you use the ParquetWriter like above, each call to write_table will produce a distinct row group in the resulting file, so you can really optimize future reads by having a chunking strategy that aligns with how you'll read the files later (so long as you turn on statistics).
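A sketch pulling these notes together, reusing the objects from the question; the chunk size is illustrative:

# Polars side: turn statistics on explicitly when writing through pyarrow
pq_df.filter(pl.col('A') > 9000)\
     .write_parquet(file=hdfs_out_path_2, use_pyarrow=True, statistics=True,
                    pyarrow_options={"filesystem": hdfs_filesystem})

# Pyarrow side: zstd compression, plus one row group per write_table call
pq_writer = pq.ParquetWriter(hdfs_out_path_1,
                             schema=pq_table.schema,
                             filesystem=hdfs_filesystem,
                             compression='zstd')
for batch in pq_table.to_batches(max_chunksize=2500):  # illustrative chunk size
    pq_writer.write_table(pa.Table.from_batches([batch]))
pq_writer.close()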