I am trying to use JdbcIO to read 100 million rows of data on the Spark runner, but the Spark job fails. The exception is:
java.sql.SQLException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 4 times, most recent failure: Lost task 0.3 in stage 33.0 (TID 95, zdh-5, executor 1): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 24315756. To avoid this, increase spark.kryoserializer.buffer.max value.
Here is my Java code:
private PCollection<Object[]> getSparkData(PBegin pinput) {
    LOG.info("====SparkIO GetSourceRDD begin.\ndatabase:" + this.database
        + "\nuser:" + this.user + "\nsql:" + this.sql + "\nurl:" + this.url);
    String newUrl = SparkInfo.url + database;
    String domainName = KerberosAction.getDomainName();
    if (KerberosAction.getIsSparkOpen()) {
        String auth = "auth=kerberos";
        newUrl = newUrl + ";principal=" + KerberosAction.getPrincipal() + ";" + auth;
    }
    LOG.info("url: {}, user: {}, pwd: {}", newUrl, user, password);
    return pinput.apply(JdbcIO.<Object[]>read()
        .withDataSourceProviderFn(
            new MyDataSourceProviderFn(
                newUrl,
                KerberosAction.driverName,
                user,
                password,
                KerberosAction.getIsSparkOpen(),
                domainName))
        .withFetchSize(10000)
        .withQuery(sql)
        .withRowMapper(buildRowMapper())
        .withCoder(ObjectArrayJsonCoder.of()));
}
My SQL query is: select * from my_table
Does withFetchSize mean that only the specified number of rows is read from the database into memory at a time? If so, why does the buffer overflow occur? Or does JdbcIO buffer all one hundred million rows at once? My spark.kryoserializer.buffer.max is already 2G.
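My mental model of fetch size, as a toy illustration (this is plain Java, not Beam code, and the names here are made up for the example): JDBC's fetch size only hints how many rows the driver pulls per round trip to the database, while rows are still handed to the caller one at a time, so at most one batch should sit in memory:

```java
import java.util.ArrayList;
import java.util.List;

public class FetchSizeDemo {
    // Counts how many "round trips" to the fake database were made.
    static int roundTrips = 0;

    // Toy cursor: pulls rows from a "table" in batches of fetchSize,
    // the way a JDBC driver batches rows behind a ResultSet.
    static List<Integer> fetchBatch(List<Integer> table, int offset, int fetchSize) {
        roundTrips++;
        int end = Math.min(offset + fetchSize, table.size());
        return new ArrayList<>(table.subList(offset, end));
    }

    public static void main(String[] args) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            table.add(i);
        }

        int fetchSize = 10;
        long consumed = 0;
        for (int offset = 0; offset < table.size(); offset += fetchSize) {
            for (int row : fetchBatch(table, offset, fetchSize)) {
                consumed++; // at most fetchSize rows are held in memory here
            }
        }
        System.out.println("rows=" + consumed + " roundTrips=" + roundTrips);
        // prints "rows=100 roundTrips=10"
    }
}
```

If this model is right, a fetch size of 10000 should bound what comes off the wire per round trip, which is why the Kryo buffer overflow on the Spark side confuses me.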
Is there any way to read 100 million rows into a PCollection<Object[]> if JdbcIO cannot handle such a large amount of data?
I do not want to increase the spark.kryoserializer.buffer.max value; I suspect that is not a good way to solve this. Is there another way?
I don't fully understand how JdbcIO reads data. Does it support reading such a large amount of data? If it doesn't, are there better alternatives?
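For context, one alternative I have seen mentioned is JdbcIO.readWithPartitions, which splits the table on a numeric column so each worker reads only a slice instead of one task materializing everything. A rough sketch of what I imagine that would look like (assuming my_table has a numeric id column; the bounds, partition count, and provider here are placeholders, not tested code):

    // Hypothetical partitioned read over my_table, split on an assumed numeric "id" column.
    PCollection<Object[]> rows = pinput.apply(
        JdbcIO.<Object[]>readWithPartitions()
            .withDataSourceProviderFn(dataSourceProviderFn) // same provider as above
            .withTable("my_table")
            .withPartitionColumn("id")      // assumed numeric key column
            .withNumPartitions(200)         // placeholder: ~500k rows per partition
            .withLowerBound(0L)             // placeholder bounds
            .withUpperBound(100_000_000L)
            .withRowMapper(buildRowMapper())
            .withCoder(ObjectArrayJsonCoder.of()));

Would something along these lines avoid the Kryo buffer problem, or does the serialization issue remain regardless of how the read is partitioned?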