Spring batch JdbcPagingItemReader missing un commit records

1.6k views Asked by At

Batch has 4 steps 1. Do some basic task 2. pull records from input table -> process -> out table 3. validate error count, check count for records in input and output tables. 4. pull records from out table and update status is business tables.

I have issue that count of records in out table is coming correct but when step 4 full records is having less records. As i am using Oracle I feel its due to commit not done when records are pull.

Please suggest if any one face similar issue.

Configuration is as follows:

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:util="http://www.springframework.org/schema/util" xmlns:orcl="http://www.springframework.org/schema/data/orcl"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans    
                       http://www.springframework.org/schema/beans/spring-beans.xsd 
                       http://www.springframework.org/schema/context  
                       http://www.springframework.org/schema/context/spring-context-2.5.xsd 
                       http://www.springframework.org/schema/tx       
                       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                       http://www.springframework.org/schema/aop 
                       http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
                       http://www.springframework.org/schema/util 
                       http://www.springframework.org/schema/util/spring-util.xsd
                       http://www.springframework.org/schema/data/orcl
                       http://www.springframework.org/schema/data/orcl/spring-data-orcl-1.0.xsd
                       http://www.springframework.org/schema/batch
                        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd ">

<context:component-scan base-package="com.test." />

<import resource="classpath:context-datasource.xml" />

<bean id="parameterSetter" class="com.test.helper.QueryParamSetter"
    scope="step">
    <constructor-arg value="#{jobParameters['fileName']}" />
</bean>

<bean id="pagingItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="SELECT *" />
            <property name="fromClause" value="from INPUT_TABLE" />
            <property name="whereClause"
                value="FILE_NAME= :fileName AND rownum &lt; 3001" /> 
            <property name="sortKey" value="INPIUT_STG_ID" />

        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="fileName" value="#{jobParameters['fileName']}" />
        </map>
    </property>
    <property name="pageSize" value="100" />
    <property name="rowMapper">
        <bean class="com.test.InputVOMapper" />
    </property>
</bean>

<bean id="postItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="select *" />
            <property name="fromClause" value="from OUTPUT_TABLE" />
            <property name="whereClause"
                value="FILE_NAME= :fileName AND TXN_ID IS NOT NULL AND FILE_DATA IS NOT NULL" />
            <property name="sortKey" value="CREATE_DT" />

        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="fileName" value="#{jobParameters['fileName']}" />
        </map>
    </property>
    <property name="pageSize" value="100" />
    <property name="rowMapper">
        <bean class="com.test.mapper.OutputVOMapper" />
    </property>
</bean>

<!-- JobExecutionListener to perform business logic before and after the 
    job -->
<bean id="jobListener" class="com.test.SettlementJobListener">

    <property name="templateDir" value="templates" />
</bean>

<bean id="processEventHelper" class="com.wdpr.payment.helper.ProcessEventHelper" />

<!-- ItemWriter which writes data to database -->

<bean id="errorWriter"
    class="org.springframework.batch.item.database.JdbcBatchItemWriter">
    <property name="dataSource" ref="dataSource" />
    <property name="sql"
        value="UPDATE INPUT_TABLE SET ERR_CD=:batchErrorCode, ERR_DS=:batchErrorDesc WHERE PROC_ID=:rrn" />
    <property name="itemSqlParameterSourceProvider">
        <bean
            class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>
<bean id="recordWriter"
    class="org.springframework.batch.item.database.JdbcBatchItemWriter">
    <property name="dataSource" ref="dataSource" />
    <property name="sql"
        value="INSERT INTO OUTPUT_TABLE (CARD_SETTL_ID,TXN_ID,CREATE_DT,CREATE_PROC_ID,FILE_DATA,CLIENT_ID,FILE_NAME) values (:a,:b,:c,:d,:e,:f,:g)" />
    <property name="itemSqlParameterSourceProvider">
        <bean
            class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>

<bean id="databaseItemWriter" class="com.test.SettlementDataWriter"
    scope="step">
    <property name="errorWriter" ref="errorWriter" />
    <property name="recordWriter" ref="recordWriter" />
</bean>

<bean id="itemProcessor" class="com.test.SettlementDataProcessor"
    scope="step" />

<bean id="preProcessor" class="com.test.PreProcessor" />
<!-- Step will need a transaction manager -->
<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 

<bean id="statusItemWriter" class="com.test.StatusItemWriter" scope="step">
    <property name="processorHelper" ref="processorHelper" />
</bean>

<bean id="errorCheck" class="com.test.ErrorCheckTasklet"
    scope="step">
    <property name="jdbcTemplate" ref="jdbcTemplate" />
    <property name="sql"
        value="SELECT count(*) from INPUT_TABLE where FILE_NAME= ? AND ERR_CD IS NOT NULL" />
    <property name="footerCount"
        value="SELECT count(*) from OUTPUT_TABLE where FILE_NAME= ? AND FILE_DATA IS NOT NULL" />
    <property name="countCheck"
        value="SELECT count(*) from OUTPUT_TABLE where FILE_NAME= ? AND TXN_ID IS NOT NULL AND FILE_DATA IS NOT NULL" />    
    <!-- <property name="preparedStatementSetter" ref="parameterSetter" /> -->
    <property name="recordWriter" ref="recordWriter" />
</bean>

<bean id="preProcessReader" class="com.test.PreProcessReader"
    scope="step">
    <property name="jdbcTemplate" ref="jdbcTemplate" />
    <property name="sql1"
        value="select count(PROC_ID) from INPUT_TABLE where FILE_NAME=? AND rownum &lt; 101" />
    <property name="sql2"
        value="select count(distinct(PROC_ID)) from INPUT_TABLE where FILE_NAME=? AND rownum &lt; 101" />
    <property name="preparedStatementSetter" ref="parameterSetter" />
</bean>

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" >
    <property name="concurrencyLimit" value="100" />
</bean>

<!-- Actual Job -->
<batch:job id="settelmentJob">

    <batch:step id="step1" next="step2">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="preProcessReader" processor="preProcessor"
                writer="recordWriter" commit-interval="1" />
        </batch:tasklet>
    </batch:step>

    <batch:step id="step2" next="step3" >
        <batch:tasklet transaction-manager="transactionManager"
            task-executor="taskExecutor" throttle-limit="100"  ><!-- throttle-limit="100" -->
            <batch:chunk reader="pagingItemReader" writer="databaseItemWriter"
                processor="itemProcessor"  commit-interval="100"  /> <!-- commit-interval="100"  -->
        </batch:tasklet>
    </batch:step>

    <batch:step id="step3" >
        <batch:tasklet transaction-manager="transactionManager" 
            ref="errorCheck" >
        </batch:tasklet>
    </batch:step>

    <batch:step id="step4">
        <batch:tasklet transaction-manager="transactionManager"
            task-executor="taskExecutor" throttle-limit="100" >
            <batch:chunk reader="postItemReader" writer="statusItemWriter"
                commit-interval="100" />

        </batch:tasklet>

        </batch:step>

    <batch:listeners>
        <batch:listener ref="jobListener" />
    </batch:listeners>
</batch:job>

<!-- JobRepository and JobLauncher are configuration/setup classes -->

<bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
</bean>

<bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

Data source

bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${driverClassName}" />
<property name="url" value="${url}" />
<property name="username" value="${username}" />
<property name="password" value="${password}" />
<property name="initialSize" value="30" />
<property name="maxIdle" value="10" />
<property name="validationQuery" value="select 1 from dual" />
<property name="maxTotal" value="50"></property>
<property name="testWhileIdle" value="true"/>
<property name="timeBetweenEvictionRunsMillis" value="10000"/>
<property name="removeAbandonedOnBorrow" value="true"/>
2

There are 2 answers

0
saddy dj On

I find the issue was not with commit but with jdbcpagination config is returning different result than the query. I have created below issue.

JdbcPagingItemReader in Spring batch is not giving correct results

6
Michael Minella On

I believe the issue is that you're using a ResourcelessTransactionManager. There are two issues with this:

  1. It means you aren't really using transactions (since it doesn't work with any resources).
  2. It means that Spring Batch won't wait for the commit to complete before proceeding since a true commit isn't being called.

You're already using the Map based JobRepository. You should use a TransactionManager for the DataSource you're working with Oracle on. That will fix your issue.