This question applies broadly but the specific use case is concatenating all fragmented files from a number of directories.
The crux of the question is optimizing/inspecting parallelism in how Databricks performs file system operations.
Notes:
The cluster has 128 cores on the driver and 1 worker with 8. The rationale is that file system operations don't run on executors, so the worker side can be throttled.
All files are in an external S3 bucket, not DBFS.
fragmented_files/
    entity_1/
        fragment_1.csv
        ...
        fragment_n.csv
    ...
    entity_n/
        fragment_1.csv
        ...
        fragment_n.csv
merged_files/
    entity_1/
        merged.csv
    ...
    entity_n/
        merged.csv
I have working code, the gist of which is:

```python
import concurrent.futures
import os

def concat_files(fragged_dir, new):
    with open(new, "wb") as nout:
        for orig_frag_file in sorted(os.listdir(fragged_dir)):
            with open(os.path.join(fragged_dir, orig_frag_file), "rb") as o_file:
                nout.write(o_file.read())

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map takes one iterable per argument of concat_files;
    # all_merged_file_paths holds the corresponding output paths
    results = executor.map(concat_files, all_directories_with_fragmented_files, all_merged_file_paths)
```
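A variant of the same concat that streams each fragment in chunks with `shutil.copyfileobj`, so a large fragment never has to fit in memory at once (a sketch; the function and parameter names here are illustrative, not from the original code):

```python
import os
import shutil

def concat_files(fragged_dir, new_path):
    """Stream every fragment in fragged_dir into new_path, in sorted order."""
    with open(new_path, "wb") as nout:
        for name in sorted(os.listdir(fragged_dir)):
            with open(os.path.join(fragged_dir, name), "rb") as frag:
                shutil.copyfileobj(frag, nout)  # chunked copy, constant memory
```

`copyfileobj` copies in fixed-size chunks, so memory use stays flat no matter how big each fragment is.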
Questions:
For file system operations, or anything that doesn't show up in the Spark UI, how can I verify that I'm actually using all the driver cores rather than just queueing everything up to run on one?
How would ThreadPoolExecutor vs. ProcessPoolExecutor vary here?
Is there an advantage to using the dbutils API vs. regular Python?
By using ThreadPoolExecutor, the threads run in the same process and share the same memory space. Due to the Global Interpreter Lock (GIL) in CPython, this may not give as much parallelism as separate processes with ProcessPoolExecutor, especially for CPU-bound tasks. For I/O-bound work like reading and writing files, threads release the GIL while blocked on I/O, so the difference is much smaller.
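The GIL effect is easy to see with a deliberately CPU-bound task such as a naive Fibonacci calculation, timed under both executors (a sketch; the sizes are arbitrary, and process timings will vary with platform and worker start-up cost):

```python
import concurrent.futures
import time

def fib(n):
    """Naive recursion: deliberately CPU-bound, holds the GIL the whole time."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def timed(executor_cls, n=28, tasks=8, workers=4):
    """Run `tasks` copies of fib(n) on the given executor and return wall time."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as ex:
        list(ex.map(fib, [n] * tasks))
    return time.perf_counter() - start

if __name__ == "__main__":  # guard required for ProcessPoolExecutor on spawn platforms
    t_threads = timed(concurrent.futures.ThreadPoolExecutor)
    t_procs = timed(concurrent.futures.ProcessPoolExecutor)
    print(f"threads: {t_threads:.2f}s  processes: {t_procs:.2f}s")
```

On a multi-core machine the process pool typically finishes well ahead of the thread pool here, because the threads serialize on the GIL while the processes run truly in parallel.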
In your specific case of concatenating fragmented files from external S3 storage, the Databricks dbutils API doesn't offer significant advantages. Since you're dealing with file system operations on external storage (the details depend on the storage layer, but here it's S3 buckets), regular Python provides portability and flexibility and can be used both within and outside the Databricks environment. It's well suited to your task, and you can achieve parallelism using ThreadPoolExecutor as you've demonstrated (or ProcessPoolExecutor if the work turns out to be CPU-bound).
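On the first question (verifying that the driver cores are actually used when nothing shows in the Spark UI), one option is to instrument the pool yourself: record which worker thread handled each task and the peak number running at once. A minimal sketch, with `time.sleep` standing in for the S3 reads:

```python
import concurrent.futures
import threading
import time

active = 0
peak = 0
lock = threading.Lock()

def task(i):
    """Stand-in for one concat job; tracks concurrency as a side effect."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulated file I/O
    with lock:
        active -= 1
    return threading.current_thread().name

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    thread_names = set(executor.map(task, range(32)))

print(f"distinct worker threads: {len(thread_names)}, peak concurrency: {peak}")
```

If `peak` stays at 1, the work really is being serialized; on a real run you can log `threading.current_thread().name` from inside the actual concat function instead of a stand-in task.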