PySpark flatMap iteration generates duplicated API requests


I am using the PySpark flatMap function to make an API request for each record in a dataframe. The dataframe updates as expected, with no duplicated records in the dataframe.

But when I checked the server-side logs, they showed duplicated requests. The dataframe has only five (5) records, yet I can see 25 API requests in the web server logs.

```python
import json
import requests


def api_call(record):
    account_id = record['account_id']

    datax = []

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    payload = json.dumps({
        "method": "GET",
        "url": "/api/search"
    })

    url = f"https://wiremock-dev-na10001-101.io.com/api/search?accountId={account_id}"

    # One GET request per record; the status code is returned in a
    # one-element list so the result can be used with flatMap.
    response = requests.request("GET", url, headers=headers, data=payload)
    datax.append({
        'account_id': account_id,
        'response_status': response.status_code
    })

    return datax
```
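For reference, each record is only expected to produce a single response entry. A minimal local sketch of what `api_call` returns for one record (the `account_id` value is just an illustration, and calling it requires network access to the endpoint above):

```python
# Illustrative record, not real data.
sample_record = {'account_id': 12345}
result = api_call(sample_record)
# result is a one-element list, e.g. [{'account_id': 12345, 'response_status': 200}]
print(result)
```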

```python
def process_api_flatmap(callback_function, data_set, num_of_partitions=number_of_workers):
    # print(f'Number of partitions before: {data_set.getNumPartitions()}')
    if num_of_partitions is not None:
        data_set = data_set.repartition(num_of_partitions).rdd
        # data_set = data_set.repartition(1)
        # print(f'Number of partitions after: {data_set.getNumPartitions()}')

    # flatMap is a lazy transformation: callback_function only runs when
    # the returned RDD is actually evaluated.
    res = data_set.flatMap(lambda x: callback_function(x))
    return res
```
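Note that `flatMap` is lazy, so `callback_function` does not run when `process_api_flatmap` returns; it runs each time the resulting RDD is evaluated by an action (or by schema inference in `toDF()`). A minimal sketch, assuming the duplicate requests come from the lineage being evaluated more than once, of persisting the RDD so each record is processed only a single time:

```python
# Sketch only, not the original code: cache the flatMap output so later
# steps reuse the stored results instead of re-running api_call.
rdd_cached = process_api_flatmap(api_call, df).cache()
rdd_cached.count()          # single pass that materializes the cache
df_new = rdd_cached.toDF()  # reads the cached rows, no extra API calls
```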

The function is called here:

```python
from pyspark.sql import Row

rdd_df = process_api_flatmap(api_call, df)
df_new = rdd_df.toDF()
df_new = df_new.map(lambda l: Row(**dict(l))).toDF()
```
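One way to check whether the callback really runs more than once per record (rather than the dataframe containing duplicate rows) is to count its invocations with an accumulator. A diagnostic sketch, assuming a `SparkSession` named `spark` and the functions above; `counted_api_call` is a hypothetical wrapper introduced only for this check:

```python
# Count how many times api_call actually executes on the executors.
calls = spark.sparkContext.accumulator(0)

def counted_api_call(record):
    calls.add(1)                 # incremented once per invocation
    return api_call(record)

rdd_check = process_api_flatmap(counted_api_call, df)
rdd_check.toDF().count()         # force an action
print(calls.value)               # anything larger than 5 means the lineage ran more than once
```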

Any idea why it is generating duplicated requests?

Thank you.

I tried iterating the dataframe records and adding new columns with the result of the API response, which works as expected. However, there are duplicated API request logs in the web server: the dataframe has only five (5) records, but I can see 25 API requests in the logs.
