Can Interlocked CompareExchange be used correctly in this multithreaded round-robin implementation?

1.1k views Asked by At

I need to round-robin some calls between N different connections because of some rate limits in a multithreaded context. I've decided to implement this functionality using a list and a "counter," which is supposed to "jump by one" between instances on each call.

I'll illustrate this concept with a minimal example (using a class called A to stand in for the connections)

class A
{
    public A()
    {
        var newIndex = Interlocked.Increment(ref index);
        ID = newIndex.ToString();
    }
    private static int index;
    public string ID;
}

static int crt = 0;
static List<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static int itemsCount = Items.Count;

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

static void Test()
{
    var sw = Stopwatch.StartNew();

    var tasks = Enumerable.Range(1, 1000000).Select(i => Task.Run(GetInstance)).ToArray();
    Task.WaitAll(tasks);
}

This works as expected in that it ensures that calls are round-robin-ed between the connections. I will probably stick to this implementation in the "real" code (with a long instead of an int for the counter)

However, even if it is unlikely to reach int.MaxValue in my use case, I wondered if there is a way to "safely overflow" the counter.

I know that "%" in C# is "Remainder" rather than "Modulus," which would mean that some ?: gymnastics would be required to always return positives, which I want to avoid.

So what I wanted to cume up with is instead something like:

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    Interlocked.CompareExchange(ref crt, 0, itemsCount); //?? the return value is the original value, how to know if it succeeded
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

What I am expecting is that Interlocked.CompareExchange(ref crt, 0, itemsCount) would be "won" by only one thread, setting the counter back to 0 once it reaches the number of connections available. However, I don't know how to use this in this context.

Can CompareExchange or another mechanism in Interlocked be used here?

3

There are 3 answers

13
xanatos On BEST ANSWER

You could probably:

static int crt = -1;
static readonly IReadOnlyList<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static readonly int itemsCount = Items.Count;
static readonly int maxItemCount = itemsCount * 100;

static A GetInstance()
{
    int newIndex;

    while (true)
    {
        newIndex = Interlocked.Increment(ref crt);

        if (newIndex >= itemsCount)
        {
            while (newIndex >= itemsCount && Interlocked.CompareExchange(ref crt, -1, newIndex) != newIndex)
            {
                // There is an implicit memory barrier caused by the Interlockd.CompareExchange around the
                // next line
                // See for example https://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
                // A full memory barrier is the strongest and interesting one. At least all of the following generate a full memory barrier implicitly:
                // Interlocked class mehods
                newIndex = crt;
            }

            continue;
        }

        break;
    }

    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

But I have to say the truth... I'm not sure if it is correct (it should be), and explaining it is hard, and if anyone touches it in any way it will break.

The basic idea is to have a "low" ceiling for crt (we don't want to overflow, it would break everything... so we want to keep veeeeeery far from int.MaxValue, or you could use uint).

The maximum possible value is:

maxItemCount = (int.MaxValue - MaximumNumberOfThreads) / itemsCount * itemsCount;

The / itemsCount * itemsCount is because we want the rounds to be equally distributed. In the example I give I use a probably much lower number (itemsCount * 100) because lowering this ceiling will only cause the reset more often, but the reset isn't so much slow that it is truly important (it depends on what you are doing on the threads. If they are very small threads that only use cpu then the reset is slow, but if not then it isn't).

Then when we overflow this ceiling we try to move it back to -1 (our starting point). We know that at the same time other bad bad threads could Interlocked.Increment it and create a race on this reset. Thanks to the Interlocked.CompareExchange only one thread can successfully reset the counter, but the other racing threads will immediately see this and break from their attempts.

Mmmh... The if can be rewritten as:

if (newIndex >= itemsCount)
{
    int newIndex2;
    while (newIndex >= itemsCount && (newIndex2 = Interlocked.CompareExchange(ref crt, 0, newIndex)) != newIndex)
    {
        // If the Interlocked.CompareExchange is successfull, the while will end and so we won't be here,
        // if it fails, newIndex2 is the current value of crt
        newIndex = newIndex2;
    }

    continue;
}
4
Theodor Zoulias On

No, the Interlocked class offers no mechanism that would allow you to restore an Int32 value back to zero in case it overflows. The reason is that it is possible for two threads to invoke concurrently the var newIndex = Interlocked.Increment(ref crt); statement, in which case both with overflow the counter, and then none will succeed in updating the value back to zero. This functionality is just beyond the capabilities of the Interlocked class. To make such complex operations atomic you'll need to use some other synchronization mechanism, like a lock.


Update: xanatos's answer proves that the above statement is wrong. It is also proven wrong by the answers of this 9-year old question. Below are two implementation of an InterlockedIncrementRoundRobin method. The first is a simplified version of this answer, by Alex Sorokoletov:

public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
    // Arguments validation omitted (the modulo should be a positive number)
    uint current = unchecked((uint)Interlocked.Increment(ref location));
    return (int)(current % modulo);
}

This implementation is very efficient, but it has the drawback that the backing int value is not directly usable, since it circles through the whole range of the Int32 type (including negative values). The usable information comes by the return value of the method itself, which is guaranteed to be in the range [0..modulo]. If you want to read the current value without incrementing it, you would need another similar method that does the same int -> uint -> int conversion:

public static int InterlockedRoundRobinRead(ref int location, int modulo)
{
    uint current = unchecked((uint)Volatile.Read(ref location));
    return (int)(current % modulo);
}

It also has the drawback that once every 4,294,967,296 increments, and unless the modulo is a power of 2, it returns a 0 value prematurely, before having reached the modulo - 1 value. In other words the rollover logic is technically flawed. This may or may not be a big issue, depending on the application.

The second implementation is a modified version of xanatos's algorithm:

public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
    // Arguments validation omitted (the modulo should be a positive number)
    while (true)
    {
        int current = Interlocked.Increment(ref location);
        if (current >= 0 && current < modulo) return current;

        // Overflow. Try to zero the number.
        while (true)
        {
            int current2 = Interlocked.CompareExchange(ref location, 0, current);
            if (current2 == current) return 0; // Success
            current = current2;
            if (current >= 0 && current < modulo)
            {
                break; // Another thread zeroed the number. Retry increment.
            }
        }
    }
}

This is slightly less efficient (especially for small modulo values), because once in a while an Interlocked.Increment operation results to an out-of-range value, and the value is rejected and the operation repeated. It does have the advantage though that the backing int value remains in the [0..modulo] range, except for some very brief time spans, during some of this method's invocations.

1
Alexandru Clonțea On

An alternative to using CompareExchange would be to simply let the values overflow.

I have tested this and could not prove it wrong (so far), but of course that does not mean that it isn't.

 //this incurs some cost, but "should" ensure that the int range
 // is mapped to the unit range (int.MinValue is mapped to 0 in the uint range)
 static ulong toPositive(int i) => (uint)1 + long.MaxValue + (uint)i;

 static A GetInstance()
 {
    //this seems to overflow safely without unchecked
    var newCounter = Interlocked.Increment(ref crt);
    //convert the counter to a list index, that is map the unsigned value
    //to a signed range and get the value modulus the itemCount value
    var newIndex = (int)(toPositive(newCounter) % (ulong)itemsCount);
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, Got instance: {instance.ID}");
    return instance;
 }

PS: Another part of the xy problem part of my question: At a friend's suggestion I am currently investigating using a LinkedList or something similar (with locks) to achieve the same purpose.