Can a data race happen with triggers in MariaDB with InnoDB?


My application processes a very large volume of real-time data (more than 200 million records a day), and I need to aggregate it in real time to keep reporting performant. The data arrives in no particular order and is processed concurrently by several server threads.

I use MariaDB 10.5.6 with InnoDB 10.5.6.

  • Is the trigger thread-safe, i.e. can a data race happen at all? In other words, if 10 connections perform 1000 increment-only updates within one second on the same columns of a single row, will the data stay consistent, so that the result is the same as if a single connection had summed the values in series?

  • How does row-level locking work? Is it automatic, or can it be enforced manually?
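
For reference, the guarantee asked about in the first question can be pictured with a small in-memory analogy. In InnoDB, a single statement such as UPDATE report SET c = c + ? WHERE id = ? takes an exclusive lock on the row and reads its current value under that lock, so concurrent increments queue up rather than overwrite each other. The Java sketch below is not MariaDB code; CounterRow and IncrementDemo are hypothetical names, and a ReentrantLock stands in for the row lock. Each increment is a read-modify-write performed while holding the lock, so 10 threads doing 100 increments each end at exactly the serial sum.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// In-memory analogy of one row: a counter column guarded by an exclusive "row lock".
class CounterRow {
    private final ReentrantLock rowLock = new ReentrantLock();
    private long value;

    // Plays the role of: UPDATE report SET value = value + ? WHERE id = ?
    // The read, the addition and the write all happen while the lock is held.
    void increment(long delta) {
        rowLock.lock();
        try {
            value = value + delta;
        } finally {
            rowLock.unlock();
        }
    }

    long get() {
        rowLock.lock();
        try {
            return value;
        } finally {
            rowLock.unlock();
        }
    }
}

class IncrementDemo {
    // 'connections' threads each apply 'perConnection' increments of 1 to the same row.
    static long run(int connections, int perConnection) {
        CounterRow row = new CounterRow();
        ExecutorService pool = Executors.newFixedThreadPool(connections);
        for (int c = 0; c < connections; c++) {
            pool.execute(() -> {
                for (int i = 0; i < perConnection; i++) {
                    row.increment(1);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return row.get();
    }
}
```

The price of this correctness is contention: every writer of that row waits for the lock, which is where the throughput problems start.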

Sharing some of your relevant bookmarks would also be appreciated, because I could not find anything succinct and useful on Google.

UPDATE

I added an after-insert trigger that created a new record in the reporting table if the record did not exist and then updated its columns with an update statement of the form update table set field=value+delta where condition. Neither the database nor the application sending the data (Java, Hibernate) coped with this, and the application started throwing exceptions:

  • This is unrelated to the row Hibernate tried to insert, because Hibernate did not try to update anything, so it is clearly coming from the MariaDB trigger: Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
  • I am not sure why this occurred, but I also got some of these: Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
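
The deadlock message itself suggests the remedy: InnoDB rolls back one of the deadlocked transactions, and the application is expected to run it again. A minimal retry wrapper is sketched below, assuming the driver reports the deadlock as an SQLException with SQLSTATE 40001 (MariaDB error 1213, ER_LOCK_DEADLOCK) somewhere in the cause chain. DeadlockRetry and withRetry are hypothetical names; in a Hibernate application the whole unit of work (a fresh Session and transaction) has to be retried, not just the failed statement.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

class DeadlockRetry {
    // SQLSTATE used by MariaDB/MySQL for ER_LOCK_DEADLOCK (error 1213).
    static final String DEADLOCK_SQLSTATE = "40001";

    // Walks the cause chain, because ORMs usually wrap the driver's SQLException.
    static boolean isDeadlock(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SQLException
                    && DEADLOCK_SQLSTATE.equals(((SQLException) cur).getSQLState())) {
                return true;
            }
        }
        return false;
    }

    // Runs the whole transaction; retries it only when it failed because of a deadlock.
    static <T> T withRetry(Callable<T> transaction, int maxAttempts) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return transaction.call();
            } catch (Exception e) {
                if (!isDeadlock(e) || attempt >= maxAttempts) {
                    throw e;
                }
                // Small, growing back-off so the competing transaction can finish first.
                Thread.sleep(10L * attempt);
            }
        }
    }
}
```

Other failures, including Hibernate's StaleObjectStateException, are rethrown immediately because they need a different response than a blind retry.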

Answer by ben (accepted):

What I found is that the trigger was not thread-safe, in the sense that the database started throwing different errors for concurrent updates to the same row:

  • Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
  • Deadlock found when trying to get lock; try restarting transaction
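
The first of these errors is Hibernate's optimistic-locking check rather than an InnoDB lock error: for an entity with a version attribute, Hibernate issues an UPDATE that also matches on the version it read, and raises StaleObjectStateException when the statement reports an unexpected row count (typically zero). The in-memory sketch below shows that compare-version-then-write rule; OptimisticTable and VersionedRow are hypothetical names, and this is not Hibernate's implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Immutable snapshot of a row: a counter plus its optimistic-lock version.
final class VersionedRow {
    final long value;
    final long version;

    VersionedRow(long value, long version) {
        this.value = value;
        this.version = version;
    }
}

class OptimisticTable {
    private final AtomicReference<VersionedRow> row =
            new AtomicReference<>(new VersionedRow(0, 0));

    VersionedRow read() {
        return row.get();
    }

    // Analogue of: UPDATE t SET value = ?, version = version + 1 WHERE id = ? AND version = ?
    // Returns the number of "affected rows": 1 on success, 0 if someone else wrote first.
    int update(VersionedRow seen, long newValue) {
        VersionedRow next = new VersionedRow(newValue, seen.version + 1);
        return row.compareAndSet(seen, next) ? 1 : 0;
    }
}
```

Two writers that both read version 0 cannot both succeed: the second one gets zero affected rows, which is exactly the situation Hibernate reports as a stale object. For pure counters, a single UPDATE with c = c + delta avoids this read-modify-write round trip altogether.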

I tried to introduce row-level locking, but it did not work at all. I believe the lock was either ignored or the rows were not locked at all. These were my server settings and the steps I tried:

    $ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service
    ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION

  • Autocommit was disabled (--autocommit=0).
  • The transaction isolation level was changed to READ COMMITTED.
  • Row-level locking was attempted with SELECT ... FROM ... WHERE ... FOR UPDATE on the primary keys.
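
One common cause of "Deadlock found" with SELECT ... FOR UPDATE is two transactions locking the same rows in different orders (one locks row 1 then row 2, the other row 2 then row 1). A standard mitigation, which the answer does not report trying, is to lock rows in a consistent order, for example ascending primary key. Note also that InnoDB holds FOR UPDATE locks only until the enclosing transaction commits or rolls back, so the SELECT and the UPDATE must run in the same transaction. The in-memory sketch below uses hypothetical names, with ReentrantLock standing in for InnoDB record locks, and sorts the keys before locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class OrderedRowLocker {
    private final Map<Long, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
    private final Map<Long, Long> rows = new ConcurrentHashMap<>();

    // Locks every requested row in ascending key order, applies the delta, then unlocks.
    // Duplicate ids are collapsed by the TreeSet, so each row gets the delta once.
    void incrementAll(List<Long> ids, long delta) {
        SortedSet<Long> ordered = new TreeSet<>(ids);   // consistent global lock order
        List<ReentrantLock> held = new ArrayList<>();
        try {
            for (Long id : ordered) {
                ReentrantLock lock = rowLocks.computeIfAbsent(id, k -> new ReentrantLock());
                lock.lock();
                held.add(lock);
            }
            for (Long id : ordered) {
                rows.merge(id, delta, Long::sum);
            }
        } finally {
            // Release in reverse order of acquisition.
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock();
            }
        }
    }

    long get(long id) {
        return rows.getOrDefault(id, 0L);
    }
}
```

Two threads asking for rows {1, 2} and {2, 1} both end up locking row 1 first, so neither can hold one row while waiting for the other.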

I also tried a solution equivalent to table-level locking, persisting all the data from a single thread, but it could not cope with my data volume.

The solution I went for separates feed processing from persistence at the thread level: several threads process the incoming data feed and create entity objects, which another set of threads saves to the database. This lets me experiment to find the optimal number of threads for each stage on my platform; currently I am testing with 8 threads that process the feed and create entity objects for 4 threads that save them to the database.

For the persister threads, I introduced some smart segregation and custom locking of sets of entities in the application layer to ensure that no two threads try to update the same row at the same time. This seems to work; I now just need to find the right number of threads for both stages.
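
A simpler way to get the same "no two threads write the same row" guarantee, offered here as an alternative rather than what the answer implemented, is to route every row key to a fixed single-threaded persister chosen by hashing the key. Updates to one row are then serialized by construction and need no application-level locks. In this minimal sketch, PartitionedPersister and the write callback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class PartitionedPersister<K, V> {
    private final List<ExecutorService> lanes = new ArrayList<>();
    private final BiConsumer<K, V> write;   // hypothetical: performs the actual DB write

    PartitionedPersister(int threads, BiConsumer<K, V> write) {
        for (int i = 0; i < threads; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
        this.write = write;
    }

    // The same key always hashes to the same lane, so its writes never run concurrently.
    void submit(K key, V value) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(lane).execute(() -> write.accept(key, value));
    }

    void shutdownAndWait() throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        for (ExecutorService lane : lanes) {
            lane.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

The trade-off is load balance: a single very hot row can saturate its lane while the others sit idle, whereas lock-based segregation lets any free thread pick up work.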

This is the consumer class that produces the backlog for the DB writers:

    // Subclasses decide how the local backlog is split; each map key becomes a lock bucket.
    protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);

    private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
        // Hand each bucket to the persister, which queues it for the DB writer threads.
        for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
            persister.saveAll(dao, entry.getKey(), entry.getValue());
        }
    }
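
breakDownToBatchesForPersistance is abstract in the answer. One plausible implementation, assuming each entity can report the reporting-table row it affects, groups the backlog so that all entities touching the same row land in the same bucket and are therefore saved under the same lock. ReportEntry, Batching and the rowKeyOf function are hypothetical.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class Batching {
    // Hypothetical entity: the reporting row it aggregates into, plus the increment.
    // No equals()/hashCode() override, so every instance is distinct inside a Set.
    static final class ReportEntry {
        final String rowKey;
        final long delta;

        ReportEntry(String rowKey, long delta) {
            this.rowKey = rowKey;
            this.delta = delta;
        }
    }

    // Groups the local backlog by the row each entity updates: one bucket per row key.
    static <E> Map<String, Set<E>> breakDown(Collection<E> backlog, Function<E, String> rowKeyOf) {
        return backlog.stream()
                .collect(Collectors.groupingBy(rowKeyOf, Collectors.toSet()));
    }
}
```

Because the buckets are Sets, the entities need identity-based or id-based equals(); with value-based equality, two identical increments for the same row would silently collapse into one.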

This is the backlog for the DB writers:

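    // Keys waiting to be written, in arrival order; each key is queued at most once at a time.
    private LinkedBlockingQueue<Key> keys;
    // Pending entities per key; guarded by synchronized (backlog).
    private Map<Key, Set> backlog;

    public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
        Key<ENTITY> key = Key.get(dao, bucket);
        // Lock order is always key, then backlog (the same as in the writer), so the two cannot deadlock.
        synchronized (key) {
            synchronized (backlog) {
                if (backlog.containsKey(key)) {
                    // Key already queued: merge into the pending batch instead of queuing it again.
                    backlog.get(key).addAll(entitySet);
                } else {
                    // Copy, so later merges never mutate (or fail on) the caller's set.
                    backlog.put(key, new HashSet<>(entitySet));
                    try {
                        keys.put(key);
                    } catch (InterruptedException ex) {
                        // Do not swallow the interrupt; restore the flag for the caller.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }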
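
The effect of saveAll is to coalesce: while a key is waiting in the queue, further entities for the same key are merged into its pending set instead of being queued again, so each writer pass persists one combined batch per key. A stripped-down, self-contained version of that idea makes the behaviour easy to check; CoalescingBacklog is a hypothetical name, and plain strings stand in for the DAO keys and entities.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

class CoalescingBacklog {
    private final LinkedBlockingQueue<String> keys = new LinkedBlockingQueue<>();
    private final Map<String, Set<String>> backlog = new HashMap<>();

    // Merge into the pending batch if the key is already queued; otherwise queue the key once.
    void add(String key, Set<String> items) {
        synchronized (backlog) {
            Set<String> pending = backlog.get(key);
            if (pending != null) {
                pending.addAll(items);
            } else {
                backlog.put(key, new HashSet<>(items));
                keys.add(key);   // unbounded queue, so add() never fails
            }
        }
    }

    // Writer side: take the next key and detach its whole batch.
    Set<String> takeBatch() throws InterruptedException {
        String key = keys.take();
        synchronized (backlog) {
            return backlog.remove(key);
        }
    }

    int queuedKeys() {
        return keys.size();
    }
}
```

This version omits the per-key lock. In the original, holding synchronized (key) for the whole save is what stops a second writer from saving a fresh batch for the same key while the first batch is still being written.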

This is the core of the DB writer:

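    private void processDBBatchUpdate(Key key) {
        // Holding the key for the whole save blocks saveAll() for this key, so no two
        // threads ever write rows of the same bucket at the same time.
        synchronized (key) {
            Set set;
            synchronized (backlog) {
                // Detach the whole pending batch; new entities will start a fresh one.
                set = backlog.remove(key);
            }
            if (set != null && !set.isEmpty()) {
                key.getDao().saveAll(set);
            }
        }
    }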
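
The answer does not show the loop that feeds processDBBatchUpdate. A plausible shape is sketched below, assuming the writer threads share the keys queue and stop when interrupted; WriterLoop and the process callback are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class WriterLoop<K> implements Runnable {
    private final BlockingQueue<K> keys;
    private final Consumer<K> process;   // e.g. the answer's processDBBatchUpdate

    WriterLoop(BlockingQueue<K> keys, Consumer<K> process) {
        this.keys = keys;
        this.process = process;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                K key = keys.take();          // blocks until a key is queued
                try {
                    process.accept(key);
                } catch (RuntimeException e) {
                    // Keep the writer alive; a real system would log and re-queue or dead-letter.
                    System.err.println("Batch for " + key + " failed: " + e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status and exit
        }
    }
}
```

Starting four such threads over the same queue matches the "4 persister threads" setup described above; interrupting them is the shutdown signal.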

This is the Key class used for locking:

    private final IDefaultEntityDAO<ENTITY> dao;
    private final String bucket;
    // One canonical Key per (dao, bucket); guarded by the static synchronized get().
    private static final Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();

    private Key(IDefaultEntityDAO<ENTITY> dao, String bucket) {
        this.dao = dao;
        this.bucket = bucket;
    }

    // Always returns the same instance for the same (dao, bucket), so synchronized (key)
    // locks the same monitor everywhere, and even identity-based equals()/hashCode()
    // are enough for the backlog map.
    public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
        return keys.computeIfAbsent(dao, d -> new HashMap<>())
                   .computeIfAbsent(bucket, b -> new Key<>(dao, bucket));
    }
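
Key.get is a static synchronized method, so every producer and writer briefly serializes on the Key class just to look up its lock object. A common alternative, assuming Java 8 or later, is a ConcurrentHashMap with computeIfAbsent, which applies the mapping function at most once per key and so yields one canonical lock object per (dao, bucket) without a global monitor. In this sketch LockKey and LockRegistry are hypothetical names, and a plain Object stands in for IDefaultEntityDAO.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class LockKey {
    final Object dao;      // stands in for IDefaultEntityDAO
    final String bucket;

    LockKey(Object dao, String bucket) {
        this.dao = dao;
        this.bucket = bucket;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LockKey)) return false;
        LockKey other = (LockKey) o;
        return Objects.equals(dao, other.dao) && Objects.equals(bucket, other.bucket);
    }

    @Override
    public int hashCode() {
        return Objects.hash(dao, bucket);
    }
}

class LockRegistry {
    private static final ConcurrentMap<LockKey, LockKey> CANONICAL = new ConcurrentHashMap<>();

    // Returns the single shared instance for (dao, bucket); safe to use with synchronized (...).
    static LockKey get(Object dao, String bucket) {
        LockKey probe = new LockKey(dao, bucket);
        return CANONICAL.computeIfAbsent(probe, k -> k);
    }
}
```

As with the original static map, entries are never removed, so the registry grows with the number of distinct buckets.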