pysftp hangs while uploading files in parallel using ThreadPoolExecutor


I am facing issues uploading a Spark DataFrame's data to an SFTP server.

I am using a function getConnection() to get the pysftp connection.

import pysftp

def getConnection(self, cnx_details):
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None  # Disable host key verification
    sftp = pysftp.Connection(**cnx_details, cnopts=cnopts)
    return sftp

I have a backend metadata table that holds the extract queries and their respective filenames. The data returned by each extract query is saved on the SFTP server under its filename. There are hundreds of extract queries in the backend table, so I implemented a ThreadPoolExecutor with max_workers=5, meaning 5 extracts run at a time. The ThreadPoolExecutor calls the upload_files function below in parallel.

# module-level imports used here
import traceback
from tqdm import tqdm

def upload_files(self, sftp_conn, spark_df, abs_path_of_target_file):
    try:
        # collect the Spark DataFrame to pandas and stream the CSV into the SFTP file handle
        pdf = spark_df.toPandas()
        with sftp_conn.open(abs_path_of_target_file, 'w+', 32768) as f:
            chunksize = 100
            with tqdm(total=len(pdf)) as progbar:
                pdf.to_csv(f, sep='~', index=False, chunksize=chunksize)
                progbar.update(len(pdf))  # to_csv returns only after the full write
    except Exception as e:
        raise Exception(traceback.format_exc()) from e

sftp_conn is the connection returned by getConnection(), spark_df is the output of the extract query, and abs_path_of_target_file is the fully qualified SFTP path, including the filename, where the file has to be created.

Note: throughout the run, only one sftp_conn is passed to all parallel tasks.
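
For context, the submission loop looks roughly like this; the metadata lookup (get_extract_metadata) and the spark.sql call are simplified placeholders, not the actual code:

from concurrent.futures import ThreadPoolExecutor, as_completed

sftp_conn = self.getConnection(cnx_details)   # single shared connection
extracts = self.get_extract_metadata()        # placeholder: [(query, target_path), ...]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(self.upload_files, sftp_conn, spark.sql(query), target_path)
        for query, target_path in extracts
    ]
    for future in as_completed(futures):
        future.result()  # re-raise any exception from the task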

When I execute the job, the ThreadPoolExecutor starts 5 parallel tasks and submits the remaining ones as slots become available, so at any given point only 5 tasks are running. The issue is that the files appear to be created but the job hangs forever. Some tasks finish successfully and their files contain the data; other tasks never move ahead.

I also tried the logic below in the upload_files function, but there was no change.

from io import BytesIO

filelikeobj = BytesIO(bytes(spark_df.toPandas().to_csv(index=False), 'ascii'))
filelikeobj.seek(0)
sftp_conn.putfo(filelikeobj, abs_path_of_target_file)

I am not sure whether the single pysftp session is getting jammed. If so, please suggest the optimal way to upload the data.

I need help uploading the data to the SFTP server without having to store the query results in a local file first.

Note: None of the queries returns a huge amount of data; each returns only a handful of records. The database being queried is always idle, so a busy server is not the issue. Python version is 3.11, pysftp version is 0.2.9, and paramiko version is 3.1.0.
