Read data from Postgres in parallel using Spark for a table without an integer primary key column


I am reading data from a Postgres table that contains 102 million records for a specific quarter; the table holds data for multiple quarters. Right now I read the data through the Spark JDBC connector and it takes too long: around 15-20 minutes to load whenever I perform an action on the DataFrame (like count()). The data is loaded and processed by a single task, so I want to read/process it in parallel.

I am using the code below to build the connection and read the data:

import java.util.Properties

// Builds the JDBC connection properties for the given environment
def getDbConnectionProperties(environment: Int): Properties = {
  val connectionProps = new Properties()
  connectionProps.setProperty("user", user)
  connectionProps.setProperty("password", password)
  connectionProps.setProperty("driver", "org.postgresql.Driver")
  // "stringtype=unspecified" lets string columns in the DataFrame be saved into UUID columns
  connectionProps.setProperty("stringtype", "unspecified")
  connectionProps
}

val jdbcUrl = "jdbc:postgresql://xxxxx:5432/test"
val connectionProps = getDbConnectionProperties(environment)

// Partially applied function: pass a table/subquery string to get a DataFrame
val readPGSqlData = spark.read.jdbc(jdbcUrl, _: String, connectionProps)

val query = s"""(select Column_names from TableName where Period = '2020Q1') a"""
val PGExistingRecords = readPGSqlData(query)
PGExistingRecords.count()  // takes 15-20 minutes

I know data can be read in parallel if you specify a partition column along with a lower bound, an upper bound and a number of partitions, but the partition column needs to be an integer. In my case I don't have any column of integer type, and the primary key is of type GUID.
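For reference, this is roughly what that partitioned read would look like if I had a numeric column (the column name and bounds below are just placeholders, not columns that exist in my table):

// Hypothetical sketch only: the built-in partitioned read, assuming a numeric column named "numeric_id"
val partitionedRead = spark.read.jdbc(
  url = jdbcUrl,
  table = s"""(select Column_names from TableName where Period = '2020Q1') a""",
  columnName = "numeric_id",      // placeholder: must be a numeric column, which my table does not have
  lowerBound = 0L,
  upperBound = 102000000L,        // placeholder bounds covering the key range
  numPartitions = 16,
  connectionProperties = connectionProps
)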

Any way I can read the data faster, or read it across parallel tasks, would be helpful. Suggestions for a third-party library with this functionality, or a way to do it with the native JDBC connector, are welcome.


1 Answer

Answered by falcon-le0:

For a GUID column you can split the data by its first hexadecimal character:

val tableData =
  List("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f")
    .map { prefix =>
      // Cast the uuid column to text so LIKE applies; Postgres renders uuids as lowercase hex,
      // hence the lowercase letter prefixes.
      val sql = s"(select * from table_name where guid_col::text like '$prefix%') t"
      spark.read.jdbc(url = url, table = sql, properties = connectionProps)
    }
    .reduce(_.union(_))

Performance note: for best performance you should have a clustered index on the GUID column, or a non-clustered index that includes all the other columns you read. That way every read thread can use an index seek with mostly sequential I/O; otherwise each thread may fall back to a full table scan or random I/O, which can be slower than reading the table in a single thread.
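Alternatively, the same split can be expressed with the predicates overload of spark.read.jdbc, which creates one partition per predicate and avoids unioning 16 DataFrames. This is only a sketch under the same assumptions (a Postgres uuid column named guid_col; the cast to text is an assumption about the schema, needed because LIKE does not apply to uuid directly):

// Sketch: one JDBC partition per predicate instead of a union of per-prefix DataFrames
val guidPredicates = (('0' to '9') ++ ('a' to 'f'))
  .map(c => s"guid_col::text like '$c%'")   // uuid rendered as text is lowercase hex
  .toArray

val tableDataViaPredicates = spark.read.jdbc(
  url = url,
  table = "(select * from table_name) t",
  predicates = guidPredicates,
  connectionProperties = connectionProps
)

Each predicate becomes its own Spark task, so the read runs on 16 parallel connections just like the union version.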