Inner product of two unsigned byte vectors using AVX2 in C/C++

506 views Asked by At

I'd like to implement a fast correlation coefficient computation using SSE/AVX2. The operands are two unsigned char vectors. The function should be mostly equivalent to this:

float correlate_simple(const unsigned char* vec1, const unsigned char* vec2, size_t length)
{
    int sum1 = 0;
    int sum2 = 0;
    int sum11 = 0;
    int sum22 = 0;
    int sum12 = 0;

    for (size_t i = length; i > 0; --i, ++vec1, ++vec2) {
        sum1 += *vec1;
        sum2 += *vec2;
        sum11 += *vec1  * *vec1;
        sum22 += *vec2  * *vec2;
        sum12 += *vec1  * *vec2;
    }
    double mean1 = double(sum1) / double(length);
    double mean2 = double(sum2) / double(length);
    double mean11 = double(sum11) / double(length);
    double mean22 = double(sum22) / double(length);
    double mean12 = double(sum12) / double(length);

    double b = (mean11 - mean1 * mean1) * (mean22 - mean2 * mean2);
    if (b <= 0.0)
        return 0.0f;
    double a = (mean12 - mean1 * mean2);

    return float(a / sqrt(b));
}

The parameter length ranges from 1 to less than 1000.

In order to do this, I researched on how to implement an inner product of two unsigned byte arrays. However, I could not come up with a solution that does not involve converting all the unsigned 8 bit values to signed 16 bit values.

The intrinsic _mm256_maddubs_epi16(a, b) expects b to be a signed byte. This would not be a problem in this case since subtracting some constant (here: 127) from b does not change the correlation coefficient. Unfortunately I could not find an intrinsic that would allow me to subtract 127 from unsigned bytes producing signed bytes (not relying on some two's complement magic).

// vec: const unsigned char*
auto x = _mm256_load_si256((const __m256i*) vec);
auto v = _mm256_set1_epi8(127);

// wrong if vec[i] is less than 127:
auto x_centered = _mm256_sub_epi8 (x, v);

What would be the best approach here to compute inner products (and finally a correlation coefficient)?

Addendum:

Below is my current implementation of a pure inner product. I decided to convert to 16 bit integer to avoid overflow errors. Update: Changed from reading 128 bits to 256 bits at a time.

int accumulate_i32(__m256i x)
{
    auto tmp1 = _mm256_srli_si256(x, 8);
    x = _mm256_add_epi32(x, tmp1);
    auto tmp2 = _mm256_extractf128_si256(x, 1);
    tmp2 = _mm_add_epi32(tmp2, _mm256_castsi256_si128(x));
    
    return _mm_cvtsi128_si32(tmp2) + _mm_extract_epi32(tmp2, 1);
}

int inner_product_avx(const unsigned char* vec1, const unsigned char* vec2, unsigned int length)
{    
    constexpr unsigned int memoryAlignmentBytes = 32;
    constexpr unsigned int bytesPerPack = 256 / 8;

    assert((reinterpret_cast<std::uintptr_t>(vec1) % memoryAlignmentBytes) == 0);
    assert((reinterpret_cast<std::uintptr_t>(vec2) % memoryAlignmentBytes) == 0);    
    

    // compute middle part via AVX2    
    unsigned int packCount = length / bytesPerPack;
    const __m256i zeros = _mm256_setzero_si256();
    auto sumlo = _mm256_setzero_si256();
    auto sumhi = _mm256_setzero_si256();

    for (unsigned int packIdx = 0; packIdx < packCount; ++packIdx) {
        auto x1 = _mm256_load_si256((const __m256i*)vec1);
        auto x2 = _mm256_load_si256((const __m256i*)vec2);

        auto x1lo = _mm256_unpacklo_epi8(x1, zeros);
        auto x1hi = _mm256_unpackhi_epi8(x1, zeros);
        auto x2lo = _mm256_unpacklo_epi8(x2, zeros);
        auto x2hi = _mm256_unpackhi_epi8(x2, zeros);
        
        auto tmplo = _mm256_madd_epi16(x1lo, x2lo);
        auto tmphi = _mm256_madd_epi16(x1hi, x2hi);

        sumlo = _mm256_add_epi32(sumlo, tmplo);
        sumhi = _mm256_add_epi32(sumhi, tmphi);

        vec1 += bytesPerPack;
        vec2 += bytesPerPack;
    }

    int sum = accumulate_i32(sumlo) + accumulate_i32(sumhi);

    
    // compute remaining part that cannot be represented as a 
    // whole packed integer    
    unsigned int packRestCount = length % bytesPerPack;
    for (size_t i = packRestCount; i > 0; --i, ++vec1, ++vec2)
        sum += int(*vec1) * int(*vec2);

    
    return sum;
}

This takes roughly 20 % of the time of the simple C++ implementation (see below). Considering the fact that the AVX code works on 16 16-bit integers simultaneously, I would have expected a higher gain. - Is this reasonable or did I miss something?

Unrolling the last loop in the AVX code did not descrease computation time.

int inner_product_simple(const unsigned char* vec1, const unsigned char* vec2, size_t length)
{
    int sum = 0;

    for (size_t i = length; i > 0; --i, ++vec1, ++vec2)
        sum += int(*vec1) * int(*vec2);

    return sum;
}
2

There are 2 answers

16
Soonts On BEST ANSWER

I would start from something like that. It uses 32-bit accumulators just like your current code. Untested.

namespace
{
    // Compute sum of the 64-bit lanes, convert to double
    inline double hadd_epi64( const __m256i i64 )
    {
        __m128i res = _mm256_castsi256_si128( i64 );
        res = _mm_add_epi64( res, _mm256_extractf128_si256( i64, 1 ) );
        res = _mm_add_epi64( res, _mm_unpackhi_epi64( res, res ) );
        return (double)_mm_cvtsi128_si64( res );
    }

    // Convert 32-bit lanes into 64-bit, compute sum of the 8, convert to double
    inline double hadd_epu32( __m256i x )
    {
        const __m256i zero = _mm256_setzero_si256();
        __m256i i64 = _mm256_unpacklo_epi32( x, zero );
        i64 = _mm256_add_epi64( i64, _mm256_unpackhi_epi32( x, zero ) );
        return hadd_epi64( i64 );
    };
}

class InnerProduct
{
    // These fields are interpreted as 64-bit integers
    __m256i a, b;
    // These fields are interpreted as 32-bit integers
    __m256i aa, bb, ab;

    // Accumulate products of 16-bit numbers, 16 of them at once
    inline void add16( __m256i x, __m256i y )
    {
        const __m256i x2 = _mm256_madd_epi16( x, x );
        const __m256i y2 = _mm256_madd_epi16( y, y );
        const __m256i prod = _mm256_madd_epi16( x, y );

        aa = _mm256_add_epi32( aa, x2 );
        bb = _mm256_add_epi32( bb, y2 );
        ab = _mm256_add_epi32( ab, prod );
    }

public:

    InnerProduct()
    {
        a = b = aa = bb = ab = _mm256_setzero_si256();
    }

    // Handle 32 bytes
    inline void addBytes( __m256i x, __m256i y )
    {
        // Accumulate values
        const __m256i zero = _mm256_setzero_si256();
        a = _mm256_add_epi64( a, _mm256_sad_epu8( x, zero ) );
        b = _mm256_add_epi64( b, _mm256_sad_epu8( y, zero ) );

        // Split the vectors into 2 sets of 16-bit numbers, accumulate products
        const __m256i z = _mm256_unpacklo_epi8( x, zero );
        const __m256i w = _mm256_unpacklo_epi8( y, zero );
        add16( z, w );

        x = _mm256_unpackhi_epi8( x, zero );
        y = _mm256_unpackhi_epi8( y, zero );
        add16( x, y );
    }

    // Compute the result
    float compute( size_t count ) const
    {
        const double div = (double)count;
        const double mean1 = hadd_epi64( a ) / div;
        const double mean2 = hadd_epi64( b ) / div;
        const double mean11 = hadd_epu32( aa ) / div;
        const double mean22 = hadd_epu32( bb ) / div;
        const double mean12 = hadd_epu32( ab ) / div;

        const double b = ( mean11 - mean1 * mean1 ) * ( mean22 - mean2 * mean2 );
        if( b <= 0 )
            return 0;
        const double a = ( mean12 - mean1 * mean2 );
        return float( a / sqrt( b ) );
    }
};

// Load 1-31 bytes into AVX register, zero out unused higher bytes
inline __m256i loadPartial( const uint8_t* p, size_t length )
{
    alignas( 32 ) std::array<uint8_t, 32> arr{};
    memcpy( arr.data(), p, length );
    return _mm256_load_si256( ( const __m256i* )arr.data() );
}

float correlate_simple( const uint8_t* vec1, const uint8_t* vec2, const size_t length )
{
    InnerProduct ip;
    const uint8_t* const vec1End = vec1 + ( ( length / 32 ) * 32 );
    for( ; vec1 < vec1End; vec1 += 32, vec2 += 32 )
    {
        const __m256i x = _mm256_loadu_si256( ( const __m256i * )vec1 );
        const __m256i y = _mm256_loadu_si256( ( const __m256i * )vec2 );
        ip.addBytes( x, y );
    }
    const size_t remainder = length % 32;
    if( remainder > 0 )
    {
        const __m256i x = loadPartial( vec1, remainder );
        const __m256i y = loadPartial( vec2, remainder );
        ip.addBytes( x, y );
    }
    return ip.compute( length );
}
0
Maxim Egorushkin On

Unfortunately I could not find an intrinsic that would allow me to subtract 127 from unsigned bytes producing signed bytes (not relying on some two's complement magic).

Intel x86 CPUs use 2's compliment representation of signed numbers, this is why there are no separate versions of SIMD intrinsics for signed/unsigned packed integers. Intel SIMD intrinsics are outside the scope of C++ standard and have a specific well-defined behaviour.