#StackBounty: #c# #locking #consul #consul-kv #leader-election How to use Consul to run a synchronized task on one machine at a time?

Bounty: 100

I have a system with 10 machines where I need to perform a certain task on each machine, one at a time, in a synchronized order. In other words, only one machine should be doing the task at any given time. We already use Consul for other purposes, and I was wondering whether we can use it for this as well.

From what I have read, it looks like we can use leader election with Consul: each machine tries to acquire a lock, does the work, and then releases the lock. Once the lock is released, another machine acquires it and does the same work. This way the work is synchronized, one machine at a time.

I decided to use the C# PlayFab ConsulDotNet library, which appears to have this capability built in, but I am open to a better option if one exists. The Action method below is called on every machine at almost the same time through a watcher mechanism.

 private void Action() {
    // Try to acquire lock using Consul.
    // If lock acquired then DoTheWork() otherwise keep waiting for it until lock is acquired.
    // Once work is done, release the lock
    // so that some other machine can acquire the lock and do the same work.
 }

Inside that method I need to do the following:

  • Try to acquire the lock. If the lock cannot be acquired, wait for it, since another machine might have grabbed it first.
  • If the lock is acquired, call DoTheWork().
  • Once the work is done, release the lock so that another machine can acquire it and do the same work.
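A rough sketch of these three steps might look like the following. It is an illustration under assumptions, not a tested implementation: it assumes the Consul NuGet package (the ConsulDotNet library mentioned above), reuses the agent address and the "data/process" key that appear in the code further down, and uses a hypothetical DoTheWork placeholder. With default lock options, Acquire is assumed to keep waiting until the lock is obtained or the token is cancelled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Consul;

public static class ExclusiveWork
{
    // Hypothetical placeholder for the real task.
    private static void DoTheWork()
    {
        Console.WriteLine("Working on " + Environment.MachineName);
    }

    public static async Task RunAsync(CancellationToken token)
    {
        // Address copied from the question; adjust it for your environment.
        using (var client = new ConsulClient(cfg => cfg.Address = new Uri("http://consul.host.domain.com")))
        {
            // Creates the lock handle only; nothing is acquired yet.
            IDistributedLock consulLock = client.CreateLock("data/process");

            // Step 1: wait for the lock (other machines may hold it first).
            await consulLock.Acquire(token).ConfigureAwait(false);
            try
            {
                // Step 2: only the current lock holder reaches this point.
                DoTheWork();
            }
            finally
            {
                // Step 3: release so the next waiting machine can proceed.
                await consulLock.Release().ConfigureAwait(false);
            }
        }
    }
}
```

The try/finally is there so the lock is released even if DoTheWork throws. Note that a lock only guarantees mutual exclusion; it does not by itself guarantee that each of the 10 machines runs exactly once or in any particular order.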

The idea is that all 10 machines should call DoTheWork() one at a time, in a synchronized order. Based on this blog and this blog, I decided to modify their example to fit our needs.

Below is my LeaderElectionService class:

using System;
using System.Threading;
using System.Threading.Tasks;
using Consul;

public class LeaderElectionService
{
    public LeaderElectionService(string leadershipLockKey)
    {
        this.key = leadershipLockKey;
    }

    public event EventHandler<LeaderChangedEventArgs> LeaderChanged;
    string key;
    CancellationTokenSource cts = new CancellationTokenSource();
    Timer timer;
    bool lastIsHeld = false;
    IDistributedLock distributedLock;

    public void Start()
    {
        timer = new Timer(async (object state) => await TryAcquireLock((CancellationToken)state), cts.Token, 0, Timeout.Infinite);
    }

    private async Task TryAcquireLock(CancellationToken token)
    {
        if (token.IsCancellationRequested)
            return;
        try
        {
            if (distributedLock == null)
            {
                var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.host.domain.com") };
                ConsulClient client = new ConsulClient(clientConfig);
                distributedLock = await client.AcquireLock(new LockOptions(key) { LockTryOnce = true, LockWaitTime = TimeSpan.FromSeconds(3) }, token).ConfigureAwait(false);
            }
            else
            {
                if (!distributedLock.IsHeld)
                {
                    await distributedLock.Acquire(token).ConfigureAwait(false);
                }
            }
        }
        catch (LockMaxAttemptsReachedException ex)
        {
            //this is expected if it couldn't acquire the lock within the first attempt.
            Console.WriteLine(ex.StackTrace);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.StackTrace);
        }
        finally
        {
            bool lockHeld = distributedLock?.IsHeld == true;
            HandleLockStatusChange(lockHeld);
            //Retrigger the timer after a 10-second delay (in this example). Delay for 7s if the lock is not held, as the AcquireLock call blocks for ~3s on every failed attempt.
            timer.Change(lockHeld ? 10000 : 7000, Timeout.Infinite);
        }
    }

    protected virtual void HandleLockStatusChange(bool isHeldNew)
    {
        // Is this the right way to check and do the work here?
        // In general I want to call method "DoTheWork" in "Action" method itself
        // And then release and destroy the session once work is done.
        if (isHeldNew)
        {
            // DoTheWork();
            Console.WriteLine("Hello");
            // And then where should I release the lock so that another machine can try to grab it?
            // distributedLock.Release();
            // distributedLock.Destroy();
        }

        if (lastIsHeld == isHeldNew)
            return;
        else
        {
            lastIsHeld = isHeldNew;
        }

        if (LeaderChanged != null)
        {
            LeaderChangedEventArgs args = new LeaderChangedEventArgs(lastIsHeld);
            foreach (EventHandler<LeaderChangedEventArgs> handler in LeaderChanged.GetInvocationList())
            {
                try
                {
                    handler(this, args);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.StackTrace);
                }
            }
        }
    }
}

And below is my LeaderChangedEventArgs class:

public class LeaderChangedEventArgs : EventArgs
{
    private bool isLeader;

    public LeaderChangedEventArgs(bool isHeld)
    {
        isLeader = isHeld;
    }

    public bool IsLeader { get { return isLeader; } }
}

The code above has a lot of pieces that might not be needed for my use case, but the idea is the same.

Problem Statement

In my Action method, I would like to use the class above to perform the task as soon as the lock is acquired, and otherwise keep waiting for the lock. Once the work is done, I want to release the lock and destroy the session so that another machine can grab it and do the work. I am confused about how to use the class properly in the method below.

 private void Action() {
    LeaderElectionService electionService = new LeaderElectionService("data/process");
    // electionService.LeaderChanged += (source, arguments) => Console.WriteLine(arguments.IsLeader ? "Leader" : "Slave");
    electionService.Start();

    // now how do I wait for the lock to be acquired here indefinitely
    // And once lock is acquired, do the work and then release and destroy the session
    // so that other machine can grab the lock and do the work
 }
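For comparison, one possible way to fill in Action without the timer-based class is the library's ExecuteLocked helper, which, as far as I can tell from the library, acquires the lock, runs a delegate, and releases the lock afterwards. The sketch below rests on assumptions: the Worker class and DoTheWork are hypothetical placeholders, and blocking with GetAwaiter().GetResult() is used only because Action is synchronous here.

```csharp
using System;
using Consul;

public class Worker
{
    // Hypothetical placeholder for the real task.
    private void DoTheWork()
    {
        Console.WriteLine("Hello");
    }

    private void Action()
    {
        // Address and key copied from the question.
        using (var client = new ConsulClient(cfg => cfg.Address = new Uri("http://consul.host.domain.com")))
        {
            // Waits for the lock, runs the delegate while holding it,
            // and releases the lock when the delegate returns or throws.
            client.ExecuteLocked("data/process", () => DoTheWork())
                  .GetAwaiter()
                  .GetResult(); // blocks the calling thread until the work is done
        }
    }
}
```

If the watcher that calls Action can await a Task instead, awaiting ExecuteLocked directly would avoid blocking a thread while waiting for the lock.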

I recently started working with C#, which is why I am confused about how to make this work efficiently in production using Consul and this library.

