PySpark - Parallelize filesystem operations


We have a requirement where we need to process files on ADLS Gen2 using python/pyspark code. The files are in ADLS in dated folders (YYYY-MM-DD).

Steps:

  • The list of files in ADLS is identified using dbutils.
  • We loop through the returned FileInfo list, extract the path alone and append it to the source_file_list.
  • Black box - We perform various validations and write the result back to the same list. This step filters out files that have already been processed or errored out, so the list holds the same kind of entries, just fewer of them.
  • We then loop through the list and copy the files from the dated ADLS folder into DBFS.

Issue:

  • When we timed the run, copying 150-odd files of relatively small size (less than 1 MB each) took more than 35 minutes. We expect to have more than 2000 files in the list.

  • We are looking for a way to improve the speed of the copy process.

Steps Tried Already:

  • Using dbutils inside a foreach block - Unfortunately this does not work and fails with the error "You cannot use dbutils within a spark job".

  • Mounting ADLS container in Databricks is not allowed as per policy - Outside our control.

I tried to go through the Parallelize filesystem operations link, but it is in Scala. When I tried to convert it to PySpark, I kept running into pickling issues and problems accessing the FileUtil class from within PySpark.

Any reference to a PySpark implementation of the above scala code or any other suggestions would be greatly helpful. Thank you... Cheers,

import os
source_file_path = "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/landing/<process_name>/<yyyy-mm-dd>/"
source_abfss_file_list = dbutils.fs.ls(source_file_path)
source_file_list = list()
for cur_file_path in source_abfss_file_list:
    source_file_list.append(cur_file_path.path)

# Assume source_file_list now have 500 file names with path
# We do some filtering on various conditions and remove items from list
# source_file_list at this point has 350 file names in the format 
# "abfss://<container_name>@<storage_account_name>/landing/<process_name>/<yyyy-mm-dd>/file_name.extn"
# Assume Target path in DBFS exists...

target_file_path = "dbfs:/FileStore/temp_path/<yyyy-mm-dd>/"

# The for loop below takes 30+ minutes to copy 150-odd files...

for cur_file_name in source_file_list:
    target_file_name = os.path.basename(cur_file_name)
    target_file_name_path = target_file_path + target_file_name
    dbutils.fs.cp(cur_file_name, target_file_name_path)
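
For reference, below is a minimal, hedged sketch of what a driver-side PySpark adaptation of the FileUtil-based Scala approach could look like. It assumes the Hadoop FileSystem/FileUtil classes are reachable through Spark's py4j gateway (spark._jvm) and that dbfs:/ paths resolve through the Hadoop FileSystem on Databricks; it reuses source_file_list, target_file_path, and the os import from the code above. Because the copies run in driver threads rather than inside a Spark job, the pickling problem does not arise. The max_workers value is an assumption to tune for your cluster.

from concurrent.futures import ThreadPoolExecutor

# Sketch only: driver-side copy via the Hadoop FileSystem API (py4j), no dbutils inside Spark tasks
hadoop_conf = spark._jsc.hadoopConfiguration()
jvm = spark._jvm

def copy_one(src_path_str):
    src_path = jvm.org.apache.hadoop.fs.Path(src_path_str)
    dst_path = jvm.org.apache.hadoop.fs.Path(target_file_path + os.path.basename(src_path_str))
    src_fs = src_path.getFileSystem(hadoop_conf)
    dst_fs = dst_path.getFileSystem(hadoop_conf)
    # FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, conf)
    jvm.org.apache.hadoop.fs.FileUtil.copy(src_fs, src_path, dst_fs, dst_path, False, hadoop_conf)

with ThreadPoolExecutor(max_workers=16) as executor:  # 16 is an assumed value; tune for the cluster
    list(executor.map(copy_one, source_file_list))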

Env:

Azure Databricks - Runtime 12.2 LTS
ADLS Gen2

1 Answer

Answered by DileeprajnarayanThumula:

I have also tried spark.sparkContext.parallelize(source_file_list).foreach(copy_file). However, it fails with the error:

PicklingError: Could not serialize object: Exception: You cannot use dbutils within a spark job.

You can improve the speed of the file copy process by parallelizing the copy operations using ThreadPoolExecutor.

Learn more about the Magic of concurrent.futures.

from concurrent.futures import ThreadPoolExecutor
import os

# Authenticate to ADLS Gen2 with the storage account access key (no mount required)
spark.conf.set("fs.azure.account.key.stggen2dilip.dfs.core.windows.net", "<Access Key>")

container_name = "folder02"
folder_path = "new2/2024-02-05"
adls_uri = f"abfss://{container_name}@stggen2dilip.dfs.core.windows.net/{folder_path}"

# List the files in the dated ADLS folder and collect their full paths
source_abfss_file_list = dbutils.fs.ls(adls_uri)
source_file_list = []
for file_info in source_abfss_file_list:
    file_path = file_info.path
    source_file_list.append(file_path)

target_file_path = "dbfs:/FileStore/temp_path/2024-02-05/"

# Copy a single file from ADLS to DBFS; this runs on the driver, so dbutils is allowed here
def copy_file(cur_file_name):
    target_file_name = os.path.basename(cur_file_name)
    target_file_name_path = target_file_path + target_file_name
    dbutils.fs.cp(cur_file_name, target_file_name_path)

# Run the copies concurrently in a thread pool on the driver
with ThreadPoolExecutor() as executor:
    executor.map(copy_file, source_file_list)

Results:

print(f"Files copied: {source_abfss_file_list} -> {target_file_path}")


In the above code, access to ADLS is configured with the storage account key rather than a mount, and the ADLS container name, folder path, and base ADLS URI are defined. The files in the ADLS folder are listed using dbutils, and the FileInfo list is looped through to extract the paths. The target DBFS path is defined, along with a function that copies a single file. ThreadPoolExecutor is then used to run the copy operations in parallel on the driver.
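
If needed, a small hypothetical variant of the loop above caps the pool size and reports per-file failures instead of silently dropping them; max_workers=32 is an assumed tuning value, not something from the original answer.

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=32) as executor:
    # Submit one copy task per file and surface any exception along with the offending path
    futures = {executor.submit(copy_file, f): f for f in source_file_list}
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as exc:
            print(f"Copy failed for {futures[future]}: {exc}")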