How to quickly subtract one ushort array from another in C#?


I need to quickly subtract each value in ushort arrayA from the corresponding index value in ushort arrayB which has an identical length.

In addition, if the difference is negative, I need to store a zero, not the negative difference.

(Length = 327680 to be exact, since I'm subtracting a 640x512 image from another image of identical size).

The code below is currently taking ~20ms and I'd like to get it down under ~5ms if possible. Unsafe code is ok, but please provide an example, as I'm not super-skilled at writing unsafe code.

Thank you!

public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}

UPDATE: While it's not strictly C#, for the benefit of others who read this, I finally ended up adding a C++ CLR Class Library to my solution with the following code. It runs in ~3.1ms. If an unmanaged C++ library is used, it runs in ~2.2ms. I decided to go with the managed library since the time difference was small.

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}

Then I call it like this:

    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }

There are 6 answers

Best answer (nick_w)

Some benchmarks.

  1. SubtractBackgroundFromBuffer: this is the original method from the question.
  2. SubtractBackgroundFromBufferWithCalcOpt: this is the original method augmented with TTat's idea for improving the calculation speed.
  3. SubtractBackgroundFromBufferParallelFor: the solution from Selman22's answer.
  4. SubtractBackgroundFromBufferBlockParallelFor: my answer. Similar to 3., but breaks the processing up into blocks of 4096 values.
  5. SubtractBackgroundFromBufferPartitionedParallelForEach: the Partitioner-based method from Ergwun's answer.
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack: the branch-free "hack" variant from Ergwun's answer.

Updates

Interestingly, I can get a small speed increase (~6%) for SubtractBackgroundFromBufferBlockParallelFor by using (as suggested by Bruno Costa)

Buffer[i] = (ushort)Math.Max(difference, 0);

instead of

if (difference >= 0)
    Buffer[i] = (ushort)difference;
else
    Buffer[i] = 0;

Results

Note that this is the total time for the 1000 iterations in each run.

SubtractBackgroundFromBuffer(ms):                                 2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60

So it seems from those results that the best approach combines the calculation optimizations for a small gain and makes use of Parallel.For to operate on chunks of the image. Your mileage will of course vary, and the performance of parallel code is sensitive to the CPU you are running on.
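
To make that combination concrete, below is a sketch of the partitioned Parallel.ForEach (method 5) with the Math.Max form of the inner loop. The method name is made up, and this exact variant was not benchmarked separately above.

public static void SubtractBackgroundFromBufferCombined(ushort[] backgroundBuffer)
{
    // Requires System.Threading.Tasks and System.Collections.Concurrent (for Partitioner).
    // Partitioner.Create(0, N) hands each task a contiguous range of indices,
    // and Math.Max clamps negative differences to zero.
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];
            Buffer[i] = (ushort)Math.Max(difference, 0);
        }
    });
}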

Test Harness

I ran this for each method in Release mode. I am starting and stopping the Stopwatch this way to ensure only processing time is measured.

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);

for (int i = 0; i < 1000; i++)
{
    Buffer = GenerateRandomBuffer(327680, 128011992);                

    sw.Start();
    SubtractBackgroundFromBuffer(bgImg);
    sw.Stop();
}

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));


public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
    ushort[] buffer = new ushort[size];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
    }

    return buffer;
}

The Methods

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }
}

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        if (Buffer[index] < backgroundBuffer[index])
        {
            Buffer[index] = 0;
        }
        else
        {
            Buffer[index] -= backgroundBuffer[index];
        }
    }
}

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
    Parallel.For(0, Buffer.Length, (i) =>
    {
        int difference = Buffer[i] - backgroundBuffer[i];
        if (difference >= 0)
            Buffer[i] = (ushort)difference;
        else
            Buffer[i] = 0;
    });
}        

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
    int blockSize = 4096;

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
    {
        // Clamp the last block so the loop never runs past the end of the buffer
        // when the length is not an exact multiple of blockSize.
        int end = Math.Min((j + 1) * blockSize, Buffer.Length);

        for (int i = j * blockSize; i < end; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];

            Buffer[i] = (ushort)Math.Max(difference, 0);                    
        }
    });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        }
    });
}
Answer (Jodrell)

What about,

Enumerable.Range(0, Buffer.Length).AsParallel().ForAll(i =>
    {
        unsafe
        {
            var nonNegative = Buffer[i] > backgroundBuffer[i];
            Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                *((int*)(&nonNegative)));
        }
    });
Answer (Maximum Cookie)

You may get a minor performance increase by checking whether the result will be negative before actually performing the subtraction, so the subtraction is skipped whenever it isn't needed. Example:

if (Buffer[index] > backgroundBuffer[index])
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]);
else
    Buffer[index] = 0;
Answer (Selman Genç)

You can try Parallel.For :

Parallel.For(0, Buffer.Length, (i) =>
{
    int difference = Buffer[i] - backgroundBuffer[i];
    if (difference >= 0)
          Buffer[i] = (ushort) difference;
    else
         Buffer[i] = 0;
}); 

Update: I have tried it and there is only a minimal difference in your case, but as the array gets bigger, the difference gets bigger too.

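For anyone who wants to reproduce that comparison, here is a rough sketch of a size-scaling test (the sizes and seed are arbitrary, and the numbers you get will depend on your CPU):

// Sketch only: compare the plain loop against Parallel.For at a few array sizes.
// Requires System.Threading.Tasks for Parallel.For.
foreach (int size in new[] { 327680, 2000000, 20000000 })
{
    var random = new Random(0);
    ushort[] sequentialBuffer = new ushort[size];
    ushort[] background = new ushort[size];

    for (int i = 0; i < size; i++)
    {
        sequentialBuffer[i] = (ushort)random.Next(ushort.MaxValue);
        background[i] = (ushort)random.Next(ushort.MaxValue);
    }

    // Work on a copy for the parallel run so both runs see identical input.
    ushort[] parallelBuffer = (ushort[])sequentialBuffer.Clone();

    var sw = System.Diagnostics.Stopwatch.StartNew();
    for (int i = 0; i < size; i++)
    {
        int difference = sequentialBuffer[i] - background[i];
        sequentialBuffer[i] = difference >= 0 ? (ushort)difference : (ushort)0;
    }
    Console.WriteLine("Sequential   " + size + ": " + sw.Elapsed.TotalMilliseconds.ToString("N2") + " ms");

    sw.Restart();
    Parallel.For(0, size, i =>
    {
        int difference = parallelBuffer[i] - background[i];
        parallelBuffer[i] = difference >= 0 ? (ushort)difference : (ushort)0;
    });
    Console.WriteLine("Parallel.For " + size + ": " + sw.Elapsed.TotalMilliseconds.ToString("N2") + " ms");
}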

Answer (Steven Liekens)

Here's a solution that uses Zip():

Buffer = Buffer.Zip<ushort, ushort, ushort>(backgroundBuffer, (x, y) =>
{
    return (ushort)Math.Max(0, x - y);
}).ToArray();

It doesn't perform as well as the other answers, but it's definitely the shortest solution.

Answer (Ergwun)

This is an interesting question.

Only performing the subtraction after testing that the result won't be negative (as suggested by TTat and Maximum Cookie) has negligible impact, as this optimization can already be performed by the JIT compiler.

Parallelizing the task (as suggested by Selman22) is a good idea, but when the loop is as fast as it is in this case, the overhead ends up outweighing the gains, so Selman22's implementation actually runs slower in my testing. I suspect that nick_w's benchmarks were produced with the debugger attached, hiding this fact.

Parallelizing the task in larger chunks (as suggested by nick_w) deals with the overhead problem, and can actually produce faster performance, but you don't have to calculate the chunks yourself - you can use Partitioner to do this for you:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

The above method consistently outperforms nick_w's hand-rolled chunking in my testing.

But wait! There's more to it than that.

The real culprit in slowing down your code is not the assignment or the arithmetic. It's the if statement. How much it costs depends heavily on the nature of the data you are processing.

nick_w's benchmarking generates random data of the same magnitude for both buffers. However, it is very likely that the data in your background buffer actually has a lower average magnitude. This detail can be significant due to branch prediction (as explained in this classic SO answer).

When the value in the background buffer is usually smaller than the value in the buffer, the hardware branch predictor can learn this and predict the branch correctly almost every time. When the data in each buffer comes from the same random population, there is no way to guess the outcome of the if statement with better than 50% accuracy. It is this latter scenario that nick_w is benchmarking, and under those conditions we could potentially optimize your method further by using unsafe code to convert a bool to an integer and avoid branching altogether. (Note that the following code relies on an implementation detail of how bools are represented in memory; while it works for your scenario in .NET 4.5, it is not necessarily a good idea, and it is shown here for illustrative purposes.)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((int*)(&nonNegative)));
                }
            }
        });
}

If you are really looking to shave a bit more time off, then you can follow this approach in a safer manner by switching language to C++/CLI, as that will let you use a boolean in an arithmetic expression without resorting to unsafe code:

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}

You can create a purely managed DLL using C++/CLI exposing the above static method, and then use it in your C# code:

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            Buffer[i] = 
                MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
        }
    });
}

This outperforms the hacky unsafe C# code above. In fact, it is so fast that you could write the whole method in C++/CLI, forget about parallelization, and still outperform the other techniques.

Using nick_w's test harness, the above method will outperform any of the other suggestions published here so far. Here are the results I get (1-4 are the cases he tried, and 5-7 are the ones outlined in this answer):

1. SubtractBackgroundFromBuffer(ms):                               2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                    2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms):                    3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms):               1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):     1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    499.27

However, in the scenario I expect you actually have, where background values are typically smaller, successful branch prediction improves results across the board, and the 'hack' to avoid the if statement is actually slower.

Here are the results I get using nick_w's test harness when I restrict values in the background buffer to the range 0-6500 (roughly 10% of the full ushort range):

1. SubtractBackgroundFromBuffer(ms):                                 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                      915.91
3. SubtractBackgroundFromBufferParallelFor(ms):                    2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms):                 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):       658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    494.12

You can see that results 1-5 have all dramatically improved since they are now benefiting from better branch prediction. Results 6 & 7 haven't changed much, since they have avoided branching.

This change in the data completely changes things. In this scenario, even the fastest all-C# solution is now only about 15% faster than your original code.
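
For reference, the restricted background data can be generated with a small variant of nick_w's GenerateRandomBuffer helper. The maxValue parameter below is a hypothetical addition for illustration, not the exact code used to produce the numbers above:

// Sketch only: like the accepted answer's GenerateRandomBuffer, but with an
// upper bound so the background buffer can be restricted to e.g. 0-6500.
public static ushort[] GenerateRandomBuffer(int size, int randomSeed, int maxValue)
{
    ushort[] buffer = new ushort[size];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        // Random.Next's upper bound is exclusive.
        buffer[i] = (ushort)random.Next(0, maxValue + 1);
    }

    return buffer;
}

// e.g. restrict the background buffer to 0-6500:
// ushort[] bgImg = GenerateRandomBuffer(327680, 818687447, 6500);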

Bottom line: be sure to test any method you pick with representative data, or your results will be meaningless.