Spring Batch - Getting a DeadlockLoserDataAccessException when trying to read/write to the same table

I am working on a Spring Batch application that will read unprocessed data from Table A, process the data, insert the processed data into Table B, and then update the row in Table A to PROCESSED. However, while inserting the data into Table B works fine, I keep getting a DeadlockLoserDataAccessException every time I try to update Table A. I believe this is because the cursor from the JdbcCursorItemReader used to read Table A is getting in the way of updating the table. How would I go about fixing this?

I use a JdbcCursorItemReader and a CompositeItemWriter in the Spring Batch job. The chunk size is 1.

There are 3 answers

Mahmoud Ben Hassine (BEST ANSWER)

I believe this is because the cursor from the JdbcCursorItemReader used to read Table A is getting in the way of updating the table. How would I go about fixing this?

This should not cause a problem as long as the read, the insert and the update all happen within the same transaction (which is the case when you use a chunk-oriented step).

I use a JdbcCursorItemReader and a CompositeItemWriter in the Spring Batch job. The chunk size is 1.

Here is a quick (self-contained) example with the same config as you mentioned:

import java.util.Arrays;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person where processed = false")
                .beanRowMapper(Person.class)
                .saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
                .build();
    }

    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        return item -> new Person(item.getId(), item.getName().toUpperCase());
    }

    @Bean
    public CompositeItemWriter<Person> itemWriter() {
        return new CompositeItemWriterBuilder<Person>()
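                // delegates are invoked in order: insert into the people table first, then mark the person row as processed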
                .delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
                .ignoreItemStream(true)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> peopleItemWriter() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("insert into people (name) values (:name)")
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> personItemUpdater() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("update person set processed = true where id = :id")
                .build();
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Person, Person>chunk(1)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    public static void main(String[] args) throws Exception {

        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
        jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
        jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
        jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
        jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");

        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());

        Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
        Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
        System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
        System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
        Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
        System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
    }

    public static class Person {

        private int id;

        private String name;

        public Person() {
        }

        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

}

It reads persons from a Person table (Table A in your case), uppercases their names and writes the result to a People table (Table B in your case). Then it updates the processed flag on the Person table.

If you run the sample, you should see:

nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2

without any deadlock exception.

Hope this helps.

MichaelTiefenbacher

The architecture is ETL-like: reading data from a source, processing it and writing it to a target. I try to avoid this kind of update logic in my processes, as it introduces significant overhead and the problems you describe. So maybe you could re-think the architecture ...
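As a purely illustrative sketch (my own assumption, not part of the original answer): if Table B keeps the key of the originating Table A row, say in a source_id column, the reader query can select "unprocessed" rows with an anti-join instead of maintaining and updating a processed flag:

-- hypothetical table/column names (tablea, tableb, source_id) for illustration only
-- rows count as "unprocessed" as long as they have no counterpart in tableb,
-- so no update of tablea is needed after the insert
select a.id, a.name
from tablea a
where not exists (
    select 1
    from tableb b
    where b.source_id = a.id
);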

If not, I really recommend having an appropriate index for the update, depending on the search condition you use. This not only makes the update cheaper, but also lets the SQL access just the one row, avoiding additional table scans for the update.
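For example, assuming the update looks like update tablea set status = 'PROCESSED' where id = ? (the names here are assumptions, not taken from the question), an index on the search column keeps the update to a single-row access:

-- hypothetical names; the indexed column(s) must match the WHERE clause of the update
create index tablea_id_ix on tablea (id);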

Mark Barinstein

I'd suggest redesigning the transaction logic to "lock" the necessary Table A rows by marking them as PROCESSED at the very beginning of the transaction, instead of updating them one more time at the end of the transaction. See the example below.

-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after setting them for the changes to take effect.

create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is available starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- an easy way to force an index scan, disregarding the collected table statistics
insert into test (id) values 1,2;

-- Every session starts its transaction by locking its own set of rows (only one in this example),
-- which become invisible to the same statement issued by other concurrent transactions
-- due to the registry variables set above.
-- No lock waits are expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end