I have a very large amount of data stored as parquet files. There are multiple files, each around 40-50 MB. Every file has the same columns, with identical column names and data types.
I found that there are duplicate rows in these files. I want to remove those duplicates and persist the deduplicated data back to the same location.
I want to write a Python script that takes a list of key-value pairs as input and then removes the duplicate data.
For example, a parquet file contains the following columns and data:
Name, Class, Age, DOB
Ram, 8, 14, 22-01-1999
Ram, 8, 14, 22-01-1999
Shyam, 9, 16, 22-01-1997
Shyam, 9, 16, 22-01-1997
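After deduplication I would expect this sample to become:
Name, Class, Age, DOB
Ram, 8, 14, 22-01-1999
Shyam, 9, 16, 22-01-1997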
This is only sample data; in reality we have around 5000 parquet files, and each file has around 6000 rows that include duplicates.
The parquet files are stored in S3, and each path contains the name and age: s3://bucket-name/Name/age/date/
Each path contains around 7 files because of the partitioning applied.
I have tried using pandas to read the data and deduplicate it, but it takes a lot of time and harms the performance of the system.
I will be providing the list of names and ages as a key-value map.
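To make the input concrete, here is a minimal sketch of the map I have in mind and the S3 prefixes derived from it (the exact values and the bucket name are only illustrative):

# hypothetical Name -> Age map; the real values will be supplied at run time
name_age_map = {"Ram": 14, "Shyam": 16}
# each entry corresponds to a prefix of the form s3://bucket-name/Name/age/date/
paths = [f"s3://bucket-name/{name}/{age}/" for name, age in name_age_map.items()]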
I have tried this code so far but it is taking a lot of time to complete.
import glob
import pandas as pd

data_files_path = "Name/age/date/"
# collect all parquet files under the folder and concatenate them into one DataFrame
files = glob.glob(data_files_path + "*.parquet")
df = pd.concat((pd.read_parquet(f, engine='pyarrow') for f in files), ignore_index=True)
# keep a single copy of each duplicated row
df = df.drop_duplicates()
It is taking a lot of time to complete for one folder path. Please help me make it more efficient.
I am currently using PySpark. This is the code I am using now:
from datetime import datetime
import logging
from pyspark.sql import SparkSession

AppName = "dedup"  # placeholder app name
dt_string = datetime.now().strftime("%Y%m%d_%H%M%S")
logger = logging.getLogger(__name__)

def main():
    # start Spark session
    spark = SparkSession.builder.appName(AppName + "_" + dt_string).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    logger.info("Starting spark application")
    data_files_path = "sample/sample_data/"
    logger.info("Reading parquet files")
    # read all parquet files under the path and drop exact duplicate rows
    df_category = spark.read.parquet(data_files_path).distinct()
    logger.info("Reading parquet files finished")
    logger.info("Previewing parquet file data")
    df_category.show(20, truncate=False)
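For the part I have not figured out yet, here is a rough sketch of what I imagine the per-path loop looking like, assuming each deduplicated prefix is written to a staging location first (the function names, the "_dedup" staging suffix, and the bucket handling are my assumptions):

def dedup_path(spark, path):
    # read every parquet file under the prefix and drop exact duplicate rows
    df = spark.read.parquet(path).distinct()
    # write to a staging prefix first; Spark cannot safely overwrite
    # the same path it is reading from in the same job
    df.write.mode("overwrite").parquet(path.rstrip("/") + "_dedup/")

def run_all(spark, name_age_map, bucket="bucket-name"):
    # loop over the Name -> Age map and deduplicate each S3 prefix
    for name, age in name_age_map.items():
        dedup_path(spark, f"s3://{bucket}/{name}/{age}/")

The staging write is there only because, as far as I understand, reading and overwriting the same S3 prefix in a single Spark job is not safe; the deduplicated files would still need to be moved back over the original prefix afterwards.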
Can this also be done in a Scala Spark script? I am open to any suggestion that works for me.