How to use JdbcIO to read a large amount of data from Spark SQL?


I am trying to use JdbcIO to read 100 million rows from Spark SQL, but the Spark task fails. The exception is:

java.sql.SQLException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 4 times, most recent failure: Lost task 0.3 in stage 33.0 (TID 95, zdh-5, executor 1): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 24315756. To avoid this, increase spark.kryoserializer.buffer.max value.

Here is my Java code:

private PCollection<Object[]> getSparkData(PBegin pinput) {
    LOG.info("====SparkIO GetSourceRDD begin .\ndatabase:" + this.database + "\nuser:" + this.user + "\nsql:" + this.sql + "\nurl:" + this.url);
    String newUrl = SparkInfo.url + database;
    String domainName = KerberosAction.getDomainName();
    if (KerberosAction.getIsSparkOpen()) {
        String auth = "auth=kerberos";
        newUrl = newUrl + ";principal=" + KerberosAction.getPrincipal() + ";" + auth;
    }
    LOG.info("url: {}, user: {}, pwd: {}", newUrl, user, password);
    return pinput.apply(JdbcIO.<Object[]>read()
            .withDataSourceProviderFn(
                    new MyDataSourceProviderFn(
                            newUrl,
                            KerberosAction.driverName,
                            user,
                            password,
                            KerberosAction.getIsSparkOpen(),
                            domainName
                    )
            )
            .withFetchSize(10000)
            .withQuery(sql)
            .withRowMapper(buildRowMapper())
            .withCoder(ObjectArrayJsonCoder.of())
    );
}

My SQL query is select * from my_table.

Does withFetchSize mean that the specified number of rows is read from the database into memory on each fetch? If so, why does a buffer overflow error occur? Or does JdbcIO cache all 100 million records in the buffer? My spark.kryoserializer.buffer.max value is 2G.

If JdbcIO cannot read such a large amount of data, is there any way I can read 100 million rows into a PCollection<Object[]>?

I do not want to change the spark.kryoserializer.buffer.max value; I doubt that is a good way to solve the problem. Is there any other way? I do not fully understand how JdbcIO reads data. Does it support reading a large amount of data? If it does not, are there better alternatives for reading the data?
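
To make the "other way" concrete, here is a minimal sketch of one possible direction: replacing the single select * with several bounded range queries, so that no single query has to return all 100 million rows. It is only an illustration under assumptions. It assumes my_table has a numeric key column, hypothetically named id here, with known lower and upper bounds. The class RangeQuerySplitter and its method splitQueries are made-up names, not part of Beam or Spark, and whether this avoids the Kryo buffer overflow in this setup has not been verified.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeQuerySplitter {

    /**
     * Builds bounded queries that together cover [lowerBound, upperBound)
     * on keyColumn. Returns at most numPartitions queries.
     * The table and column names are assumptions supplied by the caller.
     */
    public static List<String> splitQueries(
            String table, String keyColumn, long lowerBound, long upperBound, int numPartitions) {
        if (numPartitions <= 0 || upperBound <= lowerBound) {
            throw new IllegalArgumentException(
                    "need numPartitions > 0 and upperBound > lowerBound");
        }
        long total = upperBound - lowerBound;
        // Ceiling division so the ranges cover the whole interval.
        long step = (total + numPartitions - 1) / numPartitions;
        List<String> queries = new ArrayList<>();
        for (long start = lowerBound; start < upperBound; start += step) {
            // The last range is clipped to the exclusive upper bound.
            long end = Math.min(start + step, upperBound);
            queries.add("select * from " + table
                    + " where " + keyColumn + " >= " + start
                    + " and " + keyColumn + " < " + end);
        }
        return queries;
    }

    public static void main(String[] args) {
        // Hypothetical bounds: ids from 0 (inclusive) to 100,000,000 (exclusive), 4 slices.
        for (String q : splitQueries("my_table", "id", 0L, 100_000_000L, 4)) {
            System.out.println(q);
        }
    }
}
```

Each generated query could then be passed to its own JdbcIO read via withQuery, with the resulting PCollections flattened together. Beam's JdbcIO also offers a readWithPartitions() transform built around the same idea; whether it works against a Spark SQL JDBC endpoint would need to be checked.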
