Single transaction using multiple connections (MySQL/JDBC)

The application I'm working on is a Java-based ETL process that loads data into multiple tables. The DBMS is Infobright (a MySQL-based DBMS geared for data warehousing).

The data loading should be done atomically; however, for performance reasons, I want to load data into multiple tables at the same time (using a LOAD DATA INFILE command). This means I need to open multiple connections.

Is there any solution that allows me to do the loads atomically and in parallel? (I suspect the answer depends on the storage engine of the tables I load into; most of them use Brighthouse, which supports transactions but not XA or savepoints.)

To clarify further, I want to avoid the following situation:

  • I load data into 5 tables
  • I commit the loads for the first 4 tables
  • The commit for the 5th table fails

In this situation, I can't roll back the first 4 loads, because they are already committed.

Answer by Kohányi Róbert

Intro

As promised, I've put together a complete example. I used MySQL and created three tables, test1, test2 and test3, with the following structure:

CREATE TABLE `test1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;
CREATE TABLE `test2` LIKE `test1`;
CREATE TABLE `test3` LIKE `test1`;

test2 contains a single row initially.

INSERT INTO `test2` (`data`) VALUES ('a');

(I've posted the full code to http://pastebin.com.)

The example does the following:

  1. Sets threads to 3, which determines how many jobs run in parallel.
  2. Opens threads connections, one per job.
  3. Generates sample data for every table (by default the value a for each table).
  4. Creates threads jobs and assigns each one a table and its data.
  5. Runs the jobs on threads threads and waits for all of them to complete (successfully or not).
  6. If no exception occurred, commits every connection; otherwise rolls back each of them.
  7. Closes the connections (they could also be reused).

(Note that SQLTask.call() uses Java 7's try-with-resources statement, also known as automatic resource management.)

Logic

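public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  // getConnections, commitConnections, rollbackConnections and closeConnections
  // are in the full code. getConnections must disable auto-commit, otherwise
  // every INSERT is committed as soon as it runs and there is nothing to roll back.
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    // The connections are committed one at a time, so this step is not atomic:
    // if a later commit fails, the earlier ones already stand.
    commitConnections(connections);
  } catch (ExecutionException ex) {
    // At least one job failed: undo the work on every connection.
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}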
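main() calls four helpers that only appear in the pastebin code. Below is a minimal sketch of what they might look like, not the author's actual code; the JDBC URL, user name and password are placeholders. The essential detail is setAutoCommit(false). rollbackConnections and closeConnections keep going when one connection fails, so a single bad connection doesn't leave the others open or uncommitted.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the helpers used by main(); the answer's own
// versions are only in the pastebin code.
final class ConnectionHelpers {

  // Placeholder connection settings (assumptions, not from the answer).
  static final String URL = "jdbc:mysql://localhost:3306/test";
  static final String USER = "user";
  static final String PASSWORD = "secret";

  // Opens `count` connections with auto-commit disabled, so nothing becomes
  // permanent until commitConnections() runs.
  static List<Connection> getConnections(int count) throws SQLException {
    List<Connection> connections = new ArrayList<>();
    try {
      for (int i = 0; i < count; i++) {
        Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
        connections.add(connection);
        connection.setAutoCommit(false);
      }
    } catch (SQLException ex) {
      try {
        closeConnections(connections); // don't leak the ones already opened
      } catch (SQLException closeFailure) {
        ex.addSuppressed(closeFailure);
      }
      throw ex;
    }
    return connections;
  }

  // Commits one connection after another. This is NOT atomic across
  // connections: if the n-th commit fails, the first n-1 stay committed.
  static void commitConnections(List<Connection> connections) throws SQLException {
    for (Connection connection : connections)
      connection.commit();
  }

  static void rollbackConnections(List<Connection> connections) throws SQLException {
    applyToAll(connections, Connection::rollback);
  }

  static void closeConnections(List<Connection> connections) throws SQLException {
    applyToAll(connections, Connection::close);
  }

  private interface ConnectionAction {
    void apply(Connection connection) throws SQLException;
  }

  // Applies the action to every connection even if some of them fail, then
  // rethrows the first failure with the others attached as suppressed.
  private static void applyToAll(List<Connection> connections, ConnectionAction action)
      throws SQLException {
    SQLException first = null;
    for (Connection connection : connections) {
      try {
        action.apply(connection);
      } catch (SQLException ex) {
        if (first == null) first = ex;
        else first.addSuppressed(ex);
      }
    }
    if (first != null) throw first;
  }
}
```

commitConnections is exactly where the weakness discussed further down lives: the commits are sequential, so a failure on the third connection leaves the first two committed.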

Data

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

Tasks

private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

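  @Override
  public Void call() throws SQLException {
    // Identifiers can't be bound as parameters, so the table name is formatted
    // in; the value itself is bound to avoid quoting bugs and SQL injection.
    String sql = String.format("INSERT INTO `%s` (data) VALUES (?)", table);
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
      statement.setString(1, data);
      statement.executeUpdate();
    }
    // Nothing is committed here; main() commits or rolls back every connection.
    return null;
  }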
}

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

Run

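private static void runTasks(List<SQLTask> tasks)
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  try {
    // invokeAll blocks until every task has finished, successfully or not.
    List<Future<Void>> futures = executorService.invokeAll(tasks);
    // get() rethrows a task's failure wrapped in an ExecutionException.
    for (Future<Void> future : futures)
      future.get();
  } finally {
    // Also shuts the pool down if invokeAll is interrupted.
    executorService.shutdown();
  }
}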

Result

Given the default data returned by getTableData(...)

test1 -> `a`
test2 -> `a`
test3 -> `a`

and the fact that test2 already contains a (and the data column is unique), the job that inserts into test2 fails with a duplicate-key error. runTasks() rethrows that failure as an ExecutionException, so every connection is rolled back.

If getTableData(...) returns b instead of a for every table, no constraint is violated and all connections are committed.

This can be done similarly with LOAD DATA.
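A minimal sketch of such a LOAD DATA job, written as a drop-in alternative to SQLTask. LoadDataTask, loadDataSql and the comma-separated file format are assumptions for illustration. LOAD DATA is not among the statements MySQL can run as a server-side prepared statement, so the file name is quoted and escaped rather than bound. With LOCAL, Connector/J may need allowLoadLocalInfile=true on the connection URL and the server needs local_infile enabled; whether Infobright accepts LOCAL, and whether a Brighthouse load can actually be rolled back, is worth checking in the Infobright documentation.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;

// Hypothetical LOAD DATA counterpart of SQLTask. Assumes comma-separated files
// with one row per line; adjust the FIELDS/LINES clauses to the real format.
final class LoadDataTask implements Callable<Void> {

  private final Connection connection; // auto-commit must be off
  private final String table;
  private final String file;

  LoadDataTask(Connection connection, String table, String file) {
    this.connection = connection;
    this.table = table;
    this.file = file;
  }

  // Builds the statement text: the file name is a quoted, escaped string
  // literal and the table name a backtick-quoted identifier.
  static String loadDataSql(String table, String file) {
    String quotedFile = "'" + file.replace("\\", "\\\\").replace("'", "\\'") + "'";
    String quotedTable = "`" + table.replace("`", "``") + "`";
    return "LOAD DATA LOCAL INFILE " + quotedFile
        + " INTO TABLE " + quotedTable
        + " FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'";
  }

  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.execute(loadDataSql(table, file));
    }
    return null; // committed or rolled back later by main()
  }
}
```

Each LoadDataTask gets its own connection exactly like SQLTask, so runTasks, commitConnections and rollbackConnections can be used unchanged.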


After the OP's response to my answer, I realized that what they want to do isn't possible in a simple, clean way.

The problem is that once a commit succeeds, its data can no longer be rolled back: a committed transaction is final. Since this case needs one commit per connection, rolling back everything is impossible unless you track all the data written in every transaction and, if something goes wrong, delete everything that was already committed.
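That tracking approach amounts to a compensating transaction. A minimal sketch, with Participant and commitAll as hypothetical names: each participant wraps one connection, commit() commits it, and compensate() undoes an already-committed load, for example with a DELETE keyed on a batch-id column written during the load (an assumed schema change, and only possible if the engine in use supports DELETE). Unlike a real rollback, other sessions can see the partial data until compensation finishes, and a compensation that itself fails still needs manual cleanup.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Sketch of "delete what was already committed" as a compensating transaction.
final class CompensatingCommit {

  // Hypothetical participant: one per connection/table. compensate() might run
  // something like DELETE FROM <table> WHERE load_id = ? (assumed column).
  interface Participant {
    void commit() throws Exception;
    void compensate() throws Exception;
  }

  // Commits the participants in order. If one commit fails, those already
  // committed are compensated newest first and the original failure is
  // rethrown. Participants after the failing one were never committed and
  // still need an ordinary rollback (not shown).
  static void commitAll(List<? extends Participant> participants) throws Exception {
    Deque<Participant> committed = new ArrayDeque<>();
    for (Participant participant : participants) {
      try {
        participant.commit();
      } catch (Exception commitFailure) {
        while (!committed.isEmpty()) {
          try {
            committed.pop().compensate();
          } catch (Exception compensateFailure) {
            commitFailure.addSuppressed(compensateFailure);
          }
        }
        throw commitFailure;
      }
      committed.push(participant);
    }
  }
}
```

With the question's five tables and a failing fifth commit, this commits tables 1 to 4, then compensates 4, 3, 2 and 1 before rethrowing the error.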

There is a nice answer relating to the issue of commits and rollbacks.

Answer by Craig Trombly

Newer versions of IEE (Infobright Enterprise Edition, not ICE, the Community Edition) include an additional feature called DLP (Distributed Load Processing). A PDF describing it is linked from this page:

http://www.infobright.com/Products/Features/