How to use Consul to perform synchronize task on one machine at a time?

828 views Asked by At

I have a system with 10 machines where I need to perform a certain task on each machine one by one in synchronize order. Basically only one machine should do that task at a particular time. We already use Consul for some other purpose but I was thinking can we use Consul to do this as well?

I read more about it and it looks like we can use leader election with consul where each machine will try to acquire lock, do the work and then release the lock. Once work is done, it will release the lock and then other machine will try to acquire lock again and do the same work. This way everything will be synchronized one machine at a time.

I decided to use this C# PlayFab ConsulDotNet library which already has this capability built in looks like but if there is any better option available I am open to that as well. Below Action method in my code base is called on each machine at the same time almost through a watcher mechanism.

 private void Action() {
    // Try to acquire lock using Consul.
    // If lock acquired then DoTheWork() otherwise keep waiting for it until lock is acquired.
    // Once work is done, release the lock
    // so that some other machine can acquire the lock and do the same work.
 }

Now inside that above method I need to do below things -

  • Try to acquire lock. If you cannot acquire the lock wait for it since other machine might have grabbed it before you.
  • If lock acquired then DoTheWork().
  • Once work is done, release the lock so that some other machine can acquire the lock and do the same work.

Idea is all 10 machines should DoTheWork() one at a time in synchronize order. Based on this blog and this blog I decided to modify their example to fit our needs -

Below is my LeaderElectionService class:

public class LeaderElectionService
{
    public LeaderElectionService(string leadershipLockKey)
    {
        this.key = leadershipLockKey;
    }

    public event EventHandler<LeaderChangedEventArgs> LeaderChanged;
    string key;
    CancellationTokenSource cts = new CancellationTokenSource();
    Timer timer;
    bool lastIsHeld = false;
    IDistributedLock distributedLock;

    public void Start()
    {
        timer = new Timer(async (object state) => await TryAcquireLock((CancellationToken)state), cts.Token, 0, Timeout.Infinite);
    }

    private async Task TryAcquireLock(CancellationToken token)
    {
        if (token.IsCancellationRequested)
            return;
        try
        {
            if (distributedLock == null)
            {
                var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.host.domain.com") };
                ConsulClient client = new ConsulClient(clientConfig);
                distributedLock = await client.AcquireLock(new LockOptions(key) { LockTryOnce = true, LockWaitTime = TimeSpan.FromSeconds(3) }, token).ConfigureAwait(false);
            }
            else
            {
                if (!distributedLock.IsHeld)
                {
                    await distributedLock.Acquire(token).ConfigureAwait(false);
                }
            }
        }
        catch (LockMaxAttemptsReachedException ex)
        {
            //this is expected if it couldn't acquire the lock within the first attempt.
            Console.WriteLine(ex.Stacktrace);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Stacktrace);
        }
        finally
        {
            bool lockHeld = distributedLock?.IsHeld == true;
            HandleLockStatusChange(lockHeld);
            //Retrigger the timer after a 10 seconds delay (in this example). Delay for 7s if not held as the AcquireLock call will block for ~3s in every failed attempt.
            timer.Change(lockHeld ? 10000 : 7000, Timeout.Infinite);
        }
    }

    protected virtual void HandleLockStatusChange(bool isHeldNew)
    {
        // Is this the right way to check and do the work here?
        // In general I want to call method "DoTheWork" in "Action" method itself
        // And then release and destroy the session once work is done.
        if (isHeldNew)
        {
            // DoTheWork();
            Console.WriteLine("Hello");
            // And then were should I release the lock so that other machine can try to grab it?
            // distributedLock.Release();
            // distributedLock.Destroy();
        }

        if (lastIsHeld == isHeldNew)
            return;
        else
        {
            lastIsHeld = isHeldNew;
        }

        if (LeaderChanged != null)
        {
            LeaderChangedEventArgs args = new LeaderChangedEventArgs(lastIsHeld);
            foreach (EventHandler<LeaderChangedEventArgs> handler in LeaderChanged.GetInvocationList())
            {
                try
                {
                    handler(this, args);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Stacktrace);
                }
            }
        }
    }
}

And below is my LeaderChangedEventArgs class:

public class LeaderChangedEventArgs : EventArgs
{
    private bool isLeader;

    public LeaderChangedEventArgs(bool isHeld)
    {
        isLeader = isHeld;
    }

    public bool IsLeader { get { return isLeader; } }
}

In the above code there are lot of pieces which might not be needed for my use case but idea is same.

Problem Statement

Now in my Action method I would like to use above class and perform the task as soon as lock is acquired otherwise keep waiting for the lock. Once work is done, release and destroy the session so that other machine can grab it and do the work. I am kinda confuse on how to use above class properly in my below method.

 private void Action() {
    LeaderElectionService electionService = new LeaderElectionService("data/process");
    // electionService.LeaderChanged += (source, arguments) => Console.WriteLine(arguments.IsLeader ? "Leader" : "Slave");
    electionService.Start();

    // now how do I wait for the lock to be acquired here indefinitely
    // And once lock is acquired, do the work and then release and destroy the session
    // so that other machine can grab the lock and do the work
 }

I recently started working with C# so that's why kinda confuse on how to make this work efficiently in production by using Consul and this library.

Update

I tried with below code as per your suggestion and I think I tried this earlier as well but for some reason as soon as it goes to this line await distributedLock.Acquire(cancellationToken);, it just comes back to main method automatically. It never moves forward to my Doing Some Work! print out. Does CreateLock actually works? I am expecting that it will create data/lock on consul (since it is not there) and then try to acquire the lock on it and if acquired, then do the work and then release it for other machines?

private static CancellationTokenSource cts = new CancellationTokenSource();

public static void Main(string[] args)
{
    Action(cts.Token);
    Console.WriteLine("Hello World");
}

private static async Task Action(CancellationToken cancellationToken)
{
    const string keyName = "data/lock";

    var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.test.host.com") };
    ConsulClient client = new ConsulClient(clientConfig);
    var distributedLock = client.CreateLock(keyName);

    while (true)
    {
        try
        {
            // Try to acquire lock
            // As soon as it comes to this line,
            // it just goes back to main method automatically. not sure why
            await distributedLock.Acquire(cancellationToken);

            // Lock is acquired
            // DoTheWork();
            Console.WriteLine("Doing Some Work!");

            // Work is done. Jump out of loop to release the lock
            break;
        }
        catch (LockHeldException)
        {
            // Cannot acquire the lock. Wait a while then retry
            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
        }
        catch (Exception)
        {
            // TODO: Handle exception thrown by DoTheWork method

            // Here we jump out of the loop to release the lock
            // But you can try to acquire the lock again based on your requirements
            break;
        }
    }

    // Release and destroy the lock
    // So that other machine can grab the lock and do the work
    await distributedLock.Release(cancellationToken);
    await distributedLock.Destroy(cancellationToken);
}
1

There are 1 answers

6
Han Zhao On BEST ANSWER

IMO, LeaderElectionService from those blogs is an overkill in your case.

Update 1

There is no need to do while loop because:

  1. ConsulClient is local variable
    • No need to check IsHeld property
  2. Acquire will block indefinitely unless
    • Set LockTryOnce true in LockOptions
    • Set timeout to CancellationToken

Side note, it is not necessary to invoke Destroy method after you call Release on the distributed lock (reference).

private async Task Action(CancellationToken cancellationToken)
{
    const string keyName = "YOUR_KEY";

    var client = new ConsulClient();
    var distributedLock = client.CreateLock(keyName);

    try
    {
        // Try to acquire lock
        // NOTE:
        //   Acquire method will block indefinitely unless
        //     1. Set LockTryOnce = true in LockOptions
        //     2. Pass a timeout to cancellation token
        await distributedLock.Acquire(cancellationToken);

        // Lock is acquired
        DoTheWork();
    }
    catch (Exception)
    {
        // TODO: Handle exception thrown by DoTheWork method
    }

    // Release the lock (not necessary to invoke Destroy method), 
    // so that other machine can grab the lock and do the work
    await distributedLock.Release(cancellationToken);
}

Update 2

The reason why OP's code just returns back to Main method is that, Action method is not awaited. You can use async Main if you use C# 7.1, and put await on Action method.

public static async Task Main(string[] args)
{
    await Action(cts.Token);
    Console.WriteLine("Hello World");
}