SaveState=false not working as expected - Spring batch

7k views Asked by At

We are using a multi-threaded step in Spring Batch to do the following: read multiple rows from a table, then process each row and write it to a separate Excel file.

To make the JdbcCursorItemReader thread-safe, I have done the following: 1) Created a SynchronizingItemReader that delegates a synchronized call to the JdbcCursorItemReader's read method. 2) Set saveState=false on the jdbcItemReader bean. 3) Added a process indicator column to the input table.

I have the following questions. 1) Per my understanding, if we set saveState=false, Spring Batch will not store the state in the ExecutionContext, i.e. READ_COUNT, COMMIT_COUNT, WRITE_COUNT, etc. will stay at 0.

However, even with saveState=false, the counts are still being updated. I have given the XML config below. Any ideas why this is happening?

2) I understand that JdbcBatchItemWriter is thread-safe. But does this mean that only one thread will be writing business data at a time, or is it thread-safe only for the purpose of maintaining state? If only one thread writes to the DB at a time, then I am not sure the performance will be good.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"        xmlns:util="http://www.springframework.org/schema/util"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch 
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/util 
http://www.springframework.org/schema/util/spring-util-3.2.xsd
">


<!-- database settings -->
<import resource="../config/database-context.xml" />
<import resource="../config/launch-context.xml" />

<!-- Job with a single chunk-oriented step: items come from synchronizingItemReader,
     pass through itemProcessor and are handed to excelWriter. -->
<batch:job id="multiThreadedJob">
    <batch:step id="readWriteMultiThreadedStep">
        <!-- The tasklet runs on the taskExecutor bean, so chunks are handled by pool threads. -->
        <batch:tasklet task-executor="taskExecutor">
            <!-- commit-interval="1": one item per chunk. -->
            <batch:chunk reader="synchronizingItemReader" processor="itemProcessor"
                writer="excelWriter" commit-interval="1" />
        </batch:tasklet>
    </batch:step>
</batch:job>
<!-- Thread pool used by the step's tasklet; core and max size are both 5. -->
<bean id="taskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="5" />
</bean>

<!-- Reader used by the chunk: wraps jdbcItemReader and forwards read() to it.
     This bean declares no scope attribute, while its delegate is declared with scope="step". -->
<bean id="synchronizingItemReader" class="org.multithreadedstep.SynchronizingItemReader">
    <property name="delegate" ref="jdbcItemReader" />
</bean>
<!-- Step-scoped processor. threadName is late-bound from the "name" entry of the
     step execution context; baseDAO is injected by reference.
     NOTE(review): nothing in this file puts "name" into the step execution context;
     confirm where that key is populated. -->
<bean id="itemProcessor" class="org.multithreadedstep.UserProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
    <property name="baseDAO" ref="baseDAO" />       
</bean>

<!-- Step-scoped cursor reader over users_test. It selects only rows whose
     processed flag is 'N', ordered by the numeric value of id, and maps each
     row with userRowMapper. -->
<bean id="jdbcItemReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql">
        <value>
            <![CDATA[

            SELECT id,
             user_login,
             user_pass,
             age,
             name
            FROM users_test where processed='N'          
         ORDER BY TO_NUMBER (id) 
            ]]>
        </value>
    </property>
    <property name="rowMapper" ref="userRowMapper" />
     <!-- saveState=false: the reader does not record its own position in the
          ExecutionContext. NOTE(review): step-level counters such as READ_COUNT
          are presumably tracked by the framework on the step execution, not by
          this flag; confirm against the Spring Batch reference. -->
     <property name="saveState" value="false"/>
</bean>
<!-- Row mapper referenced by jdbcItemReader's rowMapper property. -->
<bean id="userRowMapper" class="org.multithreadedstep.UserRowMapper" />

<!-- Step-scoped writer used by the chunk; no properties are configured here. -->
<bean id="excelWriter"
class="org.multithreadedstep.ExcelWriter" scope="step">

</bean>
<!-- Prototype-scoped String bean. NOTE(review): not referenced by any other bean in this file; confirm it is still needed. -->
<bean id="docName" class="java.lang.String" scope="prototype"/>
<!-- DAO injected into itemProcessor; it is given the jdbcTemplate bean. -->
<bean id="baseDAO"
    class="org.multithreadedstep.BaseDAO">
    <property name="jdbcTemplate" ref="jdbcTemplate" />
</bean>


<!-- JdbcTemplate built over the dataSource bean and injected into baseDAO.
     NOTE(review): dataSource is not defined in this file; presumably it comes
     from one of the imported context files. -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
  <constructor-arg ref="dataSource"/>
</bean>

</beans>

SynchronizingItemReader.java
package org.multithreadedstep;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

/**
 * Reader decorator that funnels every {@link #read()} call through this
 * instance's monitor before handing it to the configured delegate.
 *
 * <p>The {@link ItemStream} callbacks ({@code open}, {@code update},
 * {@code close}) are passed on to the delegate only when the delegate itself
 * implements {@link ItemStream}; otherwise they do nothing. Those callbacks
 * are not synchronized.
 */
public class SynchronizingItemReader implements ItemReader<User>, ItemStream {

    /** Reader that actually produces the items; injected through {@link #setDelegate}. */
    private ItemReader<User> delegate;

    /**
     * @return the delegate
     */
    public ItemReader<User> getDelegate() {
        return this.delegate;
    }

    /**
     * @param delegate the delegate to set
     */
    public void setDelegate(ItemReader<User> delegate) {
        this.delegate = delegate;
    }

    /**
     * Reads the next item from the delegate while holding this object's lock,
     * so only one thread at a time is inside the delegate's {@code read()}.
     *
     * @return whatever the delegate's {@code read()} returns
     * @throws Exception whatever the delegate throws
     */
    public User read() throws Exception {
        synchronized (this) {
            return this.delegate.read();
        }
    }

    /**
     * Opens the delegate if it is an {@link ItemStream}; otherwise does nothing.
     *
     * @param context execution context handed through to the delegate
     */
    public void open(ExecutionContext context) throws ItemStreamException {
        if (!(this.delegate instanceof ItemStream)) {
            return;
        }
        ItemStream stream = (ItemStream) this.delegate;
        stream.open(context);
    }

    /**
     * Forwards the update callback to the delegate if it is an
     * {@link ItemStream}; otherwise does nothing.
     *
     * @param context execution context handed through to the delegate
     */
    public void update(ExecutionContext context) throws ItemStreamException {
        if (!(this.delegate instanceof ItemStream)) {
            return;
        }
        ItemStream stream = (ItemStream) this.delegate;
        stream.update(context);
    }

    /**
     * Closes the delegate if it is an {@link ItemStream}; otherwise does nothing.
     */
    public void close() throws ItemStreamException {
        if (!(this.delegate instanceof ItemStream)) {
            return;
        }
        ItemStream stream = (ItemStream) this.delegate;
        stream.close();
    }

}

0

There are 0 answers