Concurrent Read/Write in LMDB (Lightning.NET) causes BadCommand error


I'm just starting out with LMDB in C# using Lightning.NET, and I'm hitting an issue where I get a BadCommand error when a read and a write happen concurrently, even if they're unrelated.

I was able to reproduce it with the following code (note that the semaphores are only there to force the read to happen before the write commits; obviously they're not in my real code):

using LightningDB;

namespace LmdbTest;

public class Program
{
    public static async Task Main(string[] args)
    {
        using var environment = new LightningEnvironment("test") { MaxDatabases = 1 };
        environment.Open();

        Console.WriteLine("Doing WriteTask without semaphores");
        WriteTask(environment, 0x22, 0xEE, null, null);

        Console.WriteLine("Doing ReadTask without semaphores");
        ReadTask(environment, 0x22, null, null);

        var readStartSemaphore = new SemaphoreSlim(0, 1);
        var readDoneSemaphore = new SemaphoreSlim(0, 1);

        Console.WriteLine("Starting WriteTask with semaphores");
        var writeTask = Task.Run(() => WriteTask(environment, 0x55, 0xAA, readStartSemaphore, readDoneSemaphore));

        Console.WriteLine("Starting ReadTask with semaphores");
        var readTask = Task.Run(() => ReadTask(environment, 0x22, readStartSemaphore, readDoneSemaphore));

        Console.WriteLine("Awaiting ReadTask");
        await readTask;

        Console.WriteLine("Awaiting WriteTask");
        await writeTask;

        Console.WriteLine("Complete");
    }

    private static void WriteTask(
        LightningEnvironment environment,
        byte key,
        byte value,
        SemaphoreSlim? readStartSemaphore,
        SemaphoreSlim? readDoneSemaphore
    )
    {
        Console.WriteLine("[Write] starting");

        byte[] keyBytes = { key };
        byte[] valueBytes = { value };

        Console.WriteLine("[Write] BeginTransaction");
        using var tx = environment.BeginTransaction();

        Console.WriteLine("[Write] OpenDatabase");
        using var db = tx.OpenDatabase(
            name: "db1",
            configuration: new()
            {
                Flags = DatabaseOpenFlags.Create,
            }
        );

        Console.WriteLine($"[Write] Put 0x{key:X2} = 0x{value:X2}");
        var result = tx.Put(db, keyBytes, valueBytes);

        Console.WriteLine($"[Write] Result = {result}");

        if (readStartSemaphore != null && readDoneSemaphore != null)
        {
            Console.WriteLine("[Write] Releasing readStartSemaphore");
            readStartSemaphore.Release();

            Console.WriteLine("[Write] Waiting for readDoneSemaphore");
            readDoneSemaphore.Wait();
            Console.WriteLine("[Write] Got readDoneSemaphore");
        }

        Console.WriteLine("[Write] Commit");
        tx.Commit();

        Console.WriteLine("[Write] Complete");
    }

    private static void ReadTask(
        LightningEnvironment environment,
        byte key,
        SemaphoreSlim? readStartSemaphore,
        SemaphoreSlim? readDoneSemaphore
    )
    {
        Console.WriteLine("[Read] starting");

        byte[] keyBytes = { key };

        if (readStartSemaphore != null && readDoneSemaphore != null)
        {
            Console.WriteLine("[Read] Waiting for readStartSemaphore");
            readStartSemaphore.Wait();
            Console.WriteLine("[Read] Got readStartSemaphore");
        }

        Console.WriteLine("[Read] BeginTransaction");
        using var tx = environment.BeginTransaction(TransactionBeginFlags.ReadOnly);

        Console.WriteLine("[Read] OpenDatabase");
        using var db = tx.OpenDatabase(
            name: "db1"
        );

        Console.WriteLine($"[Read] Get 0x{key:X2}");
        var (result, keyOut, valueOut) = tx.Get(db, keyBytes);

        if (result == MDBResultCode.Success)
        {
            Console.WriteLine($"[Read] Result = {result} (0x{keyOut.AsSpan()[0]:X2} == 0x{valueOut.AsSpan()[0]:X2} )");
        }
        else
        {
            Console.WriteLine($"[Read] Result = {result}  <<<<<< UNEXPECTED");
        }

        if (readStartSemaphore != null && readDoneSemaphore != null)
        {
            Console.WriteLine("[Read] Releasing readDoneSemaphore");
            readDoneSemaphore.Release();
        }

        Console.WriteLine("[Read] Complete");
    }
}

This gives me the following output:

Doing WriteTask without semaphores
[Write] starting
[Write] BeginTransaction
[Write] OpenDatabase
[Write] Put 0x22 = 0xEE
[Write] Result = Success
[Write] Commit
[Write] Complete
Doing ReadTask without semaphores
[Read] starting
[Read] BeginTransaction
[Read] OpenDatabase
[Read] Get 0x22
[Read] Result = Success (0x22 == 0xEE )
[Read] Complete
Starting WriteTask with semaphores
Starting ReadTask with semaphores
[Write] starting
[Write] BeginTransaction
[Write] OpenDatabase
Awaiting ReadTask
[Read] starting
[Read] Waiting for readStartSemaphore
[Write] Put 0x55 = 0xAA
[Write] Result = Success
[Write] Releasing readStartSemaphore
[Write] Waiting for readDoneSemaphore
[Read] Got readStartSemaphore
[Read] BeginTransaction
[Read] OpenDatabase
[Read] Get 0x22
[Read] Result = BadCommand  <<<<<< UNEXPECTED
[Read] Releasing readDoneSemaphore
[Read] Complete
[Write] Got readDoneSemaphore
[Write] Commit
[Write] Complete
Awaiting WriteTask
Complete

I'm not sure what I'm doing wrong. Is there something I need to enable to allow concurrent read/write operations?

It appears to be related to using a named database: if I do the same without the name in OpenDatabase(), it works as expected. Simply removing the name is not an option, however, as I need more than one DB in my environment.

Answer by Andy:

So, I noticed this comment in the docs for mdb_dbi_open:

This function must not be called from multiple concurrent transactions in the same process. A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function.

This confused me, because the LMDB docs also say:

The library is fully thread-aware and supports concurrent read/write access from multiple processes and threads.

So how do you do concurrent reads and writes without concurrently opening the DB? Googling turned up a number of similar questions going back years, with no helpful answers, until I found this one. It says to keep hold of the DB handle from the initial transaction and keep using it, rather than opening a new one in every transaction. That kind of makes sense, but it feels unintuitive: the DB handle is obtained through the original transaction, yet it is then used outside of it.

Anyway, changing the test code to this worked:

using LightningDB;

namespace LmdbTest;

public class Program
{
    public static async Task Main(string[] args)
    {
        using var environment = new LightningEnvironment("test") { MaxDatabases = 1 };
        environment.Open();

        Console.WriteLine("Creating DB");
        using var db = CreateDatabase(environment, "db1");

        Console.WriteLine("Doing WriteTask without semaphores");
        WriteTask(environment, db, 0x22, 0xEE, null, null);

        Console.WriteLine("Doing ReadTask without semaphores");
        ReadTask(environment, db, 0x22, null, null);

        var readStartSemaphore = new SemaphoreSlim(0, 1);
        var readDoneSemaphore = new SemaphoreSlim(0, 1);

        Console.WriteLine("Starting WriteTask with semaphores");
        var writeTask = Task.Run(() => WriteTask(environment, db, 0x55, 0xAA, readStartSemaphore, readDoneSemaphore));

        Console.WriteLine("Starting ReadTask with semaphores");
        var readTask = Task.Run(() => ReadTask(environment, db, 0x22, readStartSemaphore, readDoneSemaphore));

        Console.WriteLine("Awaiting ReadTask");
        await readTask;

        Console.WriteLine("Awaiting WriteTask");
        await writeTask;

        Console.WriteLine("Complete");
    }

    private static LightningDatabase CreateDatabase(LightningEnvironment environment, string name)
    {
        Console.WriteLine("[Create] starting");

        Console.WriteLine("[Create] BeginTransaction");
        using var tx = environment.BeginTransaction();

        Console.WriteLine("[Create] OpenDatabase");
        var db = tx.OpenDatabase(
            name: name,
            configuration: new()
            {
                Flags = DatabaseOpenFlags.Create,
            }
        );

        Console.WriteLine("[Create] Commit");
        tx.Commit();

        Console.WriteLine("[Create] Complete");

        return db;
    }

    private static void WriteTask(
        LightningEnvironment environment,
        LightningDatabase db,
        byte key,
        byte value,
        SemaphoreSlim? readStartSemaphore,
        SemaphoreSlim? readDoneSemaphore
    )
    {
        Console.WriteLine("[Write] starting");

        byte[] keyBytes = { key };
        byte[] valueBytes = { value };

        Console.WriteLine("[Write] BeginTransaction");
        using var tx = environment.BeginTransaction();

        Console.WriteLine($"[Write] Put 0x{key:X2} = 0x{value:X2}");
        var result = tx.Put(db, keyBytes, valueBytes);

        Console.WriteLine($"[Write] Result = {result}");

        if (readStartSemaphore != null && readDoneSemaphore != null)
        {
            Console.WriteLine("[Write] Releasing readStartSemaphore");
            readStartSemaphore.Release();

            Console.WriteLine("[Write] Waiting for readDoneSemaphore");
            readDoneSemaphore.Wait();
            Console.WriteLine("[Write] Got readDoneSemaphore");
        }

        Console.WriteLine("[Write] Commit");
        tx.Commit();

        Console.WriteLine("[Write] Complete");
    }

    private static void ReadTask(
        LightningEnvironment environment,
        LightningDatabase db,
        byte key,
        SemaphoreSlim? readStartSemaphore,
        SemaphoreSlim? readDoneSemaphore
    )
    {
        Console.WriteLine("[Read] starting");

        byte[] keyBytes = { key };

        if (readStartSemaphore != null && readDoneSemaphore != null)
        {
            Console.WriteLine("[Read] Waiting for readStartSemaphore");
            readStartSemaphore.Wait();
            Console.WriteLine("[Read] Got readStartSemaphore");
        }

        Console.WriteLine("[Read] BeginTransaction");
        using var tx = environment.BeginTransaction(TransactionBeginFlags.ReadOnly);

        Console.WriteLine($"[Read] Get 0x{key:X2}");
        var (result, keyOut, valueOut) = tx.Get(db, keyBytes);

        if (result == MDBResultCode.Success)
        {
            Console.WriteLine($"[Read] Result = {result} (0x{keyOut.AsSpan()[0]:X2} == 0x{valueOut.AsSpan()[0]:X2} )");
        }
        else
        {
            Console.WriteLine($"[Read] Result = {result}  <<<<<< UNEXPECTED");
        }

        if (readStartSemaphore != null && readDoneSemaphore != null)
        {
            Console.WriteLine("[Read] Releasing readDoneSemaphore");
            readDoneSemaphore.Release();
        }

        Console.WriteLine("[Read] Complete");
    }
}

Now to see if I can change my actual code to do the same...
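Since the question mentions needing more than one DB in the environment, here is a rough sketch of how the same idea might extend to several named databases. As far as I understand the LMDB docs, a handle opened inside a transaction becomes usable by other transactions once that transaction commits, so all the handles can be opened in a single startup transaction and reused afterwards. DatabaseRegistry is a hypothetical helper I made up for illustration, not part of Lightning.NET; it only uses the calls already shown above, and MaxDatabases on the environment has to be at least the number of names passed in.

```csharp
using System;
using System.Collections.Generic;
using LightningDB;

// Hypothetical helper (not part of Lightning.NET): opens every named database
// once, inside a single transaction, and keeps the handles for later reuse.
public sealed class DatabaseRegistry : IDisposable
{
    private readonly Dictionary<string, LightningDatabase> _databases = new();

    public DatabaseRegistry(LightningEnvironment environment, params string[] names)
    {
        // One write transaction opens all the databases, so OpenDatabase is
        // never called from two live transactions at the same time.
        using var tx = environment.BeginTransaction();

        foreach (var name in names)
        {
            _databases[name] = tx.OpenDatabase(
                name: name,
                configuration: new() { Flags = DatabaseOpenFlags.Create }
            );
        }

        // Assumption based on the LMDB docs: after this commit the handles
        // can be used by other transactions in the same environment.
        tx.Commit();
    }

    // Look up a previously opened handle; throws KeyNotFoundException for
    // a name that was not passed to the constructor.
    public LightningDatabase this[string name] => _databases[name];

    public void Dispose()
    {
        foreach (var db in _databases.Values)
        {
            db.Dispose();
        }

        _databases.Clear();
    }
}
```

With something like this, the test program would create the registry right after environment.Open() (with MaxDatabases raised to match) and pass registry["db1"] to WriteTask and ReadTask instead of calling OpenDatabase inside each transaction. The dictionary is only written in the constructor, so reading it from several threads afterwards should be fine.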