We are using a multi-threaded step in Spring Batch to do the following: read multiple rows from a table, where each row is processed and written to a separate Excel file.
To make the JdbcCursorItemReader thread-safe, I have done the following: 1) Created a SynchronizingItemReader which delegates, through a synchronized call, to the JdbcCursorItemReader's read method. 2) Set saveState=false on the jdbcItemReader bean. 3) Added a process indicator column to the input table.
I have the following questions. 1) As I understand it, if we set saveState=false, Spring Batch will not store the state in the ExecutionContext, i.e. READ_COUNT, COMMIT_COUNT, WRITE_COUNT etc. will stay at 0.
However, even with saveState=false the counts are being updated. I have given the XML config below. Any ideas why this is happening?
2) I understand that JdbcBatchItemWriter is thread-safe. But does this mean that only one thread will be writing the business data at a time, or is it thread-safe only for the purpose of maintaining state? If only one thread writes to the DB at a time, then I am not sure the performance will be good.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:util="http://www.springframework.org/schema/util"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.2.xsd
">
<!-- database settings -->
<import resource="../config/database-context.xml" />
<import resource="../config/launch-context.xml" />
<!--
  Job with a single chunk-oriented step. Because the tasklet declares a
  task-executor, chunk processing is handed to that executor instead of
  running on the launching thread. commit-interval="1" means every chunk
  holds exactly one item, so each item is committed on its own.
-->
<batch:job id="multiThreadedJob">
<batch:step id="readWriteMultiThreadedStep">
<batch:tasklet task-executor="taskExecutor">
<batch:chunk reader="synchronizingItemReader" processor="itemProcessor"
writer="excelWriter" commit-interval="1" />
</batch:tasklet>
</batch:step>
</batch:job>
<!--
  Thread pool used by readWriteMultiThreadedStep. Core and max size are both
  5, so the pool does not grow or shrink.
-->
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="5" />
</bean>
<!--
  Reader referenced by the chunk. It wraps jdbcItemReader and forwards
  read() to it through a synchronized method.
-->
<bean id="synchronizingItemReader" class="org.multithreadedstep.SynchronizingItemReader">
<property name="delegate" ref="jdbcItemReader" />
</bean>
<!--
  Processor for the chunk. It is step-scoped so that the
  #{stepExecutionContext[name]} expression is resolved late, against the
  execution context of the running step; the value stored under "name" is
  injected as threadName.
  NOTE(review): nothing in this file puts a "name" entry into the step
  execution context. Confirm it is populated elsewhere.
-->
<bean id="itemProcessor" class="org.multithreadedstep.UserProcessor" scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}" />
<property name="baseDAO" ref="baseDAO" />
</bean>
<!--
  Cursor-based reader over users_test. Only rows whose processed column is
  'N' are selected, ordered by the numeric value of id; each row is mapped
  by userRowMapper.
  saveState=false stops the reader from recording its own position in the
  step ExecutionContext (that position is what a restart would use). It does
  not switch off the read/write/commit counters of the step, which are kept
  on the StepExecution itself.
-->
<bean id="jdbcItemReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader"
scope="step">
<property name="dataSource" ref="dataSource" />
<property name="sql">
<value>
<![CDATA[
SELECT id,
user_login,
user_pass,
age,
name
FROM users_test where processed='N'
ORDER BY TO_NUMBER (id)
]]>
</value>
</property>
<property name="rowMapper" ref="userRowMapper" />
<property name="saveState" value="false"/>
</bean>
<!-- Row mapper referenced by jdbcItemReader through its rowMapper property. -->
<bean id="userRowMapper" class="org.multithreadedstep.UserRowMapper" />
<!--
  Step-scoped writer referenced by the chunk in readWriteMultiThreadedStep.
  No properties are configured on it here.
-->
<bean id="excelWriter"
class="org.multithreadedstep.ExcelWriter" scope="step">
</bean>
<!--
  Prototype-scoped String bean with no constructor argument, so each lookup
  yields a new empty string.
  NOTE(review): no bean in this file references docName. Confirm where it is
  used.
-->
<bean id="docName" class="java.lang.String" scope="prototype"/>
<!-- DAO injected into itemProcessor; it is given the jdbcTemplate bean. -->
<bean id="baseDAO"
class="org.multithreadedstep.BaseDAO">
<property name="jdbcTemplate" ref="jdbcTemplate" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<constructor-arg ref="dataSource"/>
SynchronizingItemReader.java
package org.multithreadedstep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
/**
 * Item reader that funnels every {@code read()} call to a delegate reader
 * through a synchronized method, so concurrent callers reach the delegate one
 * at a time.
 * <p>
 * Only {@link #read()} is synchronized. The {@link ItemStream} callbacks are
 * passed on to the delegate when the delegate is itself an {@link ItemStream}
 * and do nothing otherwise.
 */
public class SynchronizingItemReader implements ItemReader<User>, ItemStream {

    /** Reader that performs the actual reads; supplied via {@link #setDelegate}. */
    private ItemReader<User> delegate;

    /**
     * Sets the reader that calls are forwarded to.
     *
     * @param delegate the delegate to set
     */
    public void setDelegate(ItemReader<User> delegate) {
        this.delegate = delegate;
    }

    /**
     * Returns the reader that calls are forwarded to.
     *
     * @return the delegate
     */
    public ItemReader<User> getDelegate() {
        return this.delegate;
    }

    /**
     * Reads the next item from the delegate while holding this object's
     * monitor.
     *
     * @return whatever the delegate's {@code read()} returns
     * @throws Exception any exception raised by the delegate
     */
    public synchronized User read() throws Exception {
        final User next = this.delegate.read();
        return next;
    }

    /**
     * Opens the delegate if it is an {@link ItemStream}.
     *
     * @param context execution context handed through to the delegate
     * @throws ItemStreamException if the delegate fails to open
     */
    public void open(ExecutionContext context) throws ItemStreamException {
        final ItemStream stream = delegateAsStream();
        if (stream == null) {
            return;
        }
        stream.open(context);
    }

    /**
     * Forwards the update callback to the delegate if it is an
     * {@link ItemStream}.
     *
     * @param context execution context handed through to the delegate
     * @throws ItemStreamException if the delegate fails to update
     */
    public void update(ExecutionContext context) throws ItemStreamException {
        final ItemStream stream = delegateAsStream();
        if (stream == null) {
            return;
        }
        stream.update(context);
    }

    /**
     * Closes the delegate if it is an {@link ItemStream}.
     *
     * @throws ItemStreamException if the delegate fails to close
     */
    public void close() throws ItemStreamException {
        final ItemStream stream = delegateAsStream();
        if (stream == null) {
            return;
        }
        stream.close();
    }

    /**
     * Views the delegate as an {@link ItemStream}.
     *
     * @return the delegate cast to {@link ItemStream}, or {@code null} when it
     *         does not implement that interface
     */
    private ItemStream delegateAsStream() {
        return (this.delegate instanceof ItemStream) ? (ItemStream) this.delegate : null;
    }
}