Read data from a specific column value in a dask dataframe

Context: I have around 2,000 parquet files in an S3 bucket. The data has around 20 columns, one of which is called restaurant_id, and I need to retrieve all of the data for each restaurant_id. From the data of a particular restaurant_id I must create a json file, i.e. one json file per restaurant_id.

What I did so far: I started with the pandas library, but I got stuck because after reading parquet file number 700 the process ran out of memory. My first attempt looked roughly like the sketch below (the bucket and file names there are just placeholders).
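
import pandas as pd

# Placeholder list of the ~2,000 object paths; in reality these come from
# listing the bucket.
parquet_paths = [f"s3://my-bucket/data/part-{i:04d}.parquet" for i in range(2000)]

frames = []
for path in parquet_paths:
    frames.append(pd.read_parquet(path, engine="pyarrow"))
    # Around file 700 this loop fails with an out-of-memory error.
df = pd.concat(frames, ignore_index=True)

I found on the internet that I should use the dask library because it can handle more data than pandas. That is why I implemented the following code: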

import dask.dataframe as dd
from dask.dataframe import DataFrame


def read_parquet_files_with_dask(bucket_name: str,
                                 access_key_id: str,
                                 secret_access_key: str,
                                 prefix: str,
                                 columns_to_delete: list) -> DataFrame:

    # Glob over every parquet file under the prefix.
    s3_path = "s3://" + bucket_name + "/" + prefix + "*.parquet"

    # Read lazily from S3; calculate_divisions should populate the divisions
    # from the parquet metadata of the index column.
    dask_df = dd.read_parquet(s3_path,
                              engine='pyarrow',
                              index='restaurant_id',
                              calculate_divisions=True,
                              storage_options={
                                  'key': access_key_id,
                                  'secret': secret_access_key
                              })

    print(dask_df)
    dask_df = dask_df.drop(columns_to_delete, axis=1)

    return dask_df

def main():
    # ... some stuff before ...
    df = read_parquet_files_with_dask(bucket_name, access_key_id,
                                      secret_access_key, prefix,
                                      columns_to_delete)

    unique_ids = df.divisions
    print(unique_ids)
    for group_value in unique_ids:
        group_df = df.get_partition(group_value).compute()
Issue: When I retrieve the divisions of the dask dataframe, all of the values are None, and I do not know what I am doing wrong here. My idea was that the divisions of the dask dataframe would be all the restaurant_ids (i.e. 1, 2, 3, 4, ...), and that with get_partition I could then retrieve the data for one particular restaurant_id and finally export that data to a json file. How can I achieve this goal?
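
For illustration, this is roughly the end result I am after, shown with plain pandas on a tiny made-up sample (every column except restaurant_id is invented here; my real data has about 20 columns and does not fit in memory, which is the whole problem):

import pandas as pd

# Tiny invented sample, only to illustrate the desired output.
sample = pd.DataFrame({
    "restaurant_id": [1, 1, 2],
    "order_total": [10.5, 7.0, 22.3],
})

# One json file per restaurant_id.
for restaurant_id, group in sample.groupby("restaurant_id"):
    group.to_json(f"restaurant_{restaurant_id}.json", orient="records")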

Update 2: I forgot to mention that the parquet files are compressed; the file names end in XXX.gz.parquet. Could that be the reason why I am getting None values? I changed the glob in my s3_path variable to *gz.parquet, but I still get the same issue.
