I am creating an in-memory PCollection and writing it to PostgreSQL. When I insert the data into the table, a few records may throw an exception and will not be inserted. How can I extract such failed insert records when I run the pipeline?
Below is the code I have written for the pipeline:
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);

// Prepare dummy data
Collection<Stock> stockList = Arrays.asList(
        new Stock("AAP", 2000, "Apple Inc"),
        new Stock("MSF", 3000, "Microsoft Corporation"),
        new Stock("NVDA", 4000, "NVIDIA Corporation"),
        new Stock("INT", 3200, "Intel Corporation"));

// Read the dummy data and save it into a PCollection<Stock>
PCollection<Stock> data = p.apply(Create.of(stockList)
        .withCoder(SerializableCoder.of(Stock.class)));

// Insert
@SuppressWarnings("unused")
PDone insertData = data.apply(JdbcIO.<Stock>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")
                .withUsername("postgres")
                .withPassword("sachin"))
        .withStatement("insert into stocks values(?, ?, ?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Stock>() {
            private static final long serialVersionUID = 1L;

            public void setParameters(Stock element, PreparedStatement query) throws SQLException {
                query.setString(1, element.getSymbol());
                query.setLong(2, element.getPrice());
                query.setString(3, element.getCompany());
            }
        }));

p.run().waitUntilFinish();
After going through the Apache Beam programming guide I did not find any clue, so I copied JdbcIO and modified its executeBatch logic, separating successfully inserted records from failed inserts by using TupleTags. It is working now.
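Roughly, the idea is the following (this is only a simplified per-record sketch with placeholder names such as InsertStockFn, INSERTED_TAG and FAILED_TAG, not the actual modified JdbcIO, which does the split inside executeBatch): insert each record in a DoFn, catch the SQLException, and emit the record to a failure TupleTag instead of dropping it.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

// Simplified placeholder DoFn: one insert per element, failures go to a side output.
public class InsertStockFn extends DoFn<Stock, Stock> {

    private static final long serialVersionUID = 1L;

    // Main output: successfully inserted records; side output: failed inserts.
    public static final TupleTag<Stock> INSERTED_TAG = new TupleTag<Stock>() {};
    public static final TupleTag<Stock> FAILED_TAG = new TupleTag<Stock>() {};

    private final String url;
    private final String user;
    private final String password;
    private transient Connection connection;

    public InsertStockFn(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    @Setup
    public void setup() throws SQLException {
        // One connection per DoFn instance; the real JdbcIO pools connections and batches statements.
        connection = DriverManager.getConnection(url, user, password);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Stock stock = c.element();
        try (PreparedStatement statement =
                connection.prepareStatement("insert into stocks values(?, ?, ?)")) {
            statement.setString(1, stock.getSymbol());
            statement.setLong(2, stock.getPrice());
            statement.setString(3, stock.getCompany());
            statement.executeUpdate();
            c.output(stock);             // inserted successfully
        } catch (SQLException e) {
            c.output(FAILED_TAG, stock); // failed insert, captured instead of being lost
        }
    }

    @Teardown
    public void teardown() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }
}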
Below is the code for the modified JdbcIO:
And the client code:
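(To show how the tagged outputs are consumed, here is a simplified sketch reusing the placeholder InsertStockFn from above rather than the actual client code: the JdbcIO.write() apply in the pipeline is replaced by a ParDo with output tags, and the failed records come back as an ordinary PCollection.)

// Replace the JdbcIO.write() apply with the tagged ParDo.
PCollectionTuple results = data.apply("InsertStocks",
        ParDo.of(new InsertStockFn(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "sachin"))
            .withOutputTags(InsertStockFn.INSERTED_TAG,
                TupleTagList.of(InsertStockFn.FAILED_TAG)));

// Failed inserts are now a normal PCollection and can be logged, written to a
// dead-letter table, etc.
PCollection<Stock> failedInserts = results.get(InsertStockFn.FAILED_TAG)
        .setCoder(SerializableCoder.of(Stock.class));

failedInserts.apply("LogFailedInserts", ParDo.of(new DoFn<Stock, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println("Insert failed for: " + c.element().getSymbol());
    }
}));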
Below is the output. The record new Stock("NVDA", 4000, "NVIDIA Corporation") intentionally fails to insert, because my DB column accepts only 3 characters ("NVD") and not the 4 characters "NVDA":
Full details and GitHub link